Cancellation - Rust SDK
Workflows and Activities in the Temporal Rust SDK support cancellation. This page shows how to handle cancellation in your Workflows and Activities.
Detect Workflow Cancellation
A Workflow can detect that it has been cancelled using the ctx.cancelled() method. This returns a future that resolves with the cancellation reason when the Workflow is cancelled:
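use std::time::Duration;
use temporalio_sdk::workflows::select;
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<String> {
    // `input` is assumed to be defined earlier in the Workflow.
    select! {
        // Branch 1: the Activity finishes first.
        result = ctx.activity(
            MyActivities::long_running_operation,
            input,
            ActivityOptions {
                start_to_close_timeout: Some(Duration::from_secs(300)),
                ..Default::default()
            }
        ) => {
            Ok(format!("Activity completed: {:?}", result?))
        }
        // Branch 2: a cancellation request arrives first.
        reason = ctx.cancelled() => {
            // Shortened for illustration: production code should usually return an error here.
            Ok(format!("Workflow cancelled: {}", reason))
        }
    }
}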
When the Workflow receives a cancellation request, ctx.cancelled() will resolve with the cancellation reason.
Graceful Cancellation Handling
When a Workflow is cancelled, it should stop starting new Activities and clean up gracefully:
use std::time::Duration;
use temporalio_sdk::workflows::select;
#[run]
async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<Vec<String>> {
    // `run` has no `self`, so read the items from Workflow state via the context.
    let items = ctx.state(|s| s.items.clone());
    let mut results = Vec::new();
    for item in items {
        select! {
            result = ctx.activity(
                MyActivities::process_item,
                item,
                ActivityOptions {
                    start_to_close_timeout: Some(Duration::from_secs(60)),
                    ..Default::default()
                }
            ) => {
                results.push(result??);
            }
            reason = ctx.cancelled() => {
                // Workflow was cancelled: stop scheduling further Activities.
                workflow::logger.info(format!("Processing cancelled: {}", reason));
                return Err(WorkflowExecutionError::new_with_reason(
                    "cancelled",
                    &reason
                ));
            }
        }
    }
    Ok(results)
}
Handle Cancellation in Activities
Activities can allow graceful cancellation by regularly recording heartbeats:
use temporalio_sdk::activities::{ActivityContext, ActivityError};
use std::time::Duration;
#[activity]
pub async fn process_many_items(
    ctx: ActivityContext,
    items: Vec<String>,
) -> Result<usize, ActivityError> {
    let mut processed = 0;
    for item in items {
        // Process the item
        do_work(&item).await?;
        processed += 1;
        // Record heartbeat to detect cancellation
        ctx.record_heartbeat(processed);
        // Small delay to simulate work
        tokio::time::sleep(Duration::from_millis(100)).await;
    }
    Ok(processed)
}
When an Activity is cancelled:
- The Activity will be notified through its heartbeat mechanism
- The Activity should then clean up any resources and exit
- The Workflow will receive an error with ActivityError::Cancelled
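The notify, clean up, and exit sequence above can be illustrated without the SDK. The following is a minimal sketch that uses only the Rust standard library: the `cancel_requested` flag, the `BatchOutcome` type, and the `process_items` function are hypothetical stand-ins, and the flag check marks the point where a real Activity would record a heartbeat and learn about cancellation. It is not the Temporal API.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical outcome type: the batch either finished or stopped early.
#[derive(Debug, PartialEq)]
enum BatchOutcome {
    Completed(usize),
    Cancelled { processed: usize },
}

/// Processes items one at a time and checks a (hypothetical) cancellation
/// flag at each checkpoint, where a real Activity would heartbeat.
fn process_items(items: &[String], cancel_requested: &AtomicBool) -> BatchOutcome {
    let mut processed = 0;
    for _item in items {
        // Checkpoint: do not start more work once cancellation is requested.
        if cancel_requested.load(Ordering::SeqCst) {
            // Resource clean-up (closing connections, etc.) would go here.
            return BatchOutcome::Cancelled { processed };
        }
        // Stand-in for the real per-item work.
        processed += 1;
    }
    BatchOutcome::Completed(processed)
}

fn main() {
    let items: Vec<String> = vec!["a".into(), "b".into(), "c".into()];
    let cancel = AtomicBool::new(false);
    println!("{:?}", process_items(&items, &cancel));

    // Simulate a cancellation request arriving before the next run.
    cancel.store(true, Ordering::SeqCst);
    println!("{:?}", process_items(&items, &cancel));
}
```

The point of the design is that cancellation is cooperative: the work loop only stops at the checkpoints it chooses, so frequent checkpoints (heartbeats, in a real Activity) bound how long cancellation takes to be noticed.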
Workflow Cancellation Semantics
When a Workflow is cancelled:
- The cancellation request is delivered to the Workflow task
- ctx.cancelled() resolves with the cancellation reason
- The Workflow should stop starting new Activities and clean up
- The Workflow Execution closes with a cancelled status
Immediate termination
You can also use Workflow termination to immediately stop the Workflow without allowing it to handle cancellation:
// This terminates the Workflow Execution immediately
// from outside the Workflow (e.g., from a client)
let handle = client.get_workflow_handle::<MyWorkflow>("workflow-id");
handle
    .terminate(
        TerminateWorkflowOptions::builder()
            .reason("Emergency stop")
            .build(),
    )
    .await?;
Cancellation Best Practices
- Always handle cancellation: Wrap long-running operations in select! to detect cancellation
- Clean up resources: When cancelled, make sure to properly close connections, stop Child Workflows, etc.
- Record heartbeats in Activities: For long Activities, record heartbeats regularly so cancellation can be detected
- Don't ignore cancellation: Returning Ok after cancellation will mislead downstream systems
- Be idempotent: Ensure cancelled operations can be safely retried if the Workflow retries
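The idempotency point can be made concrete with a small standard-library sketch. It assumes each item has a stable key and that a record of applied keys survives between attempts; `AppliedKeys`, `mark_if_new`, and `run_attempt` are hypothetical names, and the in-memory `HashSet` stands in for what would need to be a durable store in practice.

```rust
use std::collections::HashSet;

/// Hypothetical record of item keys whose side effects were already applied.
/// A real system would keep this in a durable store, not in memory.
#[derive(Default)]
struct AppliedKeys {
    seen: HashSet<String>,
}

impl AppliedKeys {
    /// Returns true if `key` is new and the caller should do the work,
    /// false if an earlier attempt already did it.
    fn mark_if_new(&mut self, key: &str) -> bool {
        // `HashSet::insert` returns false when the value was already present.
        self.seen.insert(key.to_string())
    }
}

/// Runs one attempt over `items`, stopping before index `cancel_after` if set.
/// Returns how many items were newly processed in this attempt.
fn run_attempt(items: &[&str], applied: &mut AppliedKeys, cancel_after: Option<usize>) -> usize {
    let mut newly_processed = 0;
    for (index, item) in items.iter().enumerate() {
        if cancel_after == Some(index) {
            break; // simulated cancellation between items
        }
        if applied.mark_if_new(item) {
            newly_processed += 1; // stand-in for the real side effect
        }
    }
    newly_processed
}

fn main() {
    let items = ["a", "b", "c"];
    let mut applied = AppliedKeys::default();
    // First attempt is cancelled before the third item.
    let first = run_attempt(&items, &mut applied, Some(2));
    // The retry skips work that was already applied.
    let retry = run_attempt(&items, &mut applied, None);
    println!("first attempt: {first}, retry: {retry}");
}
```

Because the retry consults the record before acting, repeating the whole batch after a cancellation applies each side effect at most once.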
Cancellation Flow Example
Here's a complete example of handling cancellation in a Workflow that runs multiple Activities:
#[workflow]
pub struct ProcessingWorkflow {
    items: Vec<String>,
}
#[workflow_methods]
impl ProcessingWorkflow {
    #[init]
    fn new(_ctx: &WorkflowContextView, items: Vec<String>) -> Self {
        Self { items }
    }
    #[run]
    async fn run(ctx: &mut workflow::WorkflowContext<Self>) -> WorkflowResult<ProcessingResult> {
        let items = ctx.state(|s| s.items.clone());
        let mut successful = 0;
        let mut failed = 0;
        for item in items {
            select! {
                result = ctx.activity(
                    MyActivities::process_item,
                    item,
                    ActivityOptions {
                        start_to_close_timeout: Some(Duration::from_secs(60)),
                        ..Default::default()
                    }
                ) => {
                    match result? {
                        Ok(_) => successful += 1,
                        Err(_) => failed += 1,
                    }
                }
                reason = ctx.cancelled() => {
                    return Err(WorkflowExecutionError::new(
                        "cancelled",
                        format!("Processing cancelled after {} items. Reason: {}", successful + failed, reason)
                    ));
                }
            }
        }
        Ok(ProcessingResult {
            successful,
            failed,
            total: successful + failed,
        })
    }
}
This Workflow processes items but will stop and report progress if cancelled.
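The example returns a `ProcessingResult` without defining it. The sketch below shows one plausible shape for that type, assuming unsigned 32-bit counters; the `from_outcomes` helper is hypothetical and only mirrors the tallying done in the loop. A real Workflow result would likely also need to be serializable, which is omitted here.

```rust
/// Assumed shape of the result type used by the example Workflow.
#[derive(Debug, Clone, PartialEq)]
pub struct ProcessingResult {
    pub successful: u32,
    pub failed: u32,
    pub total: u32,
}

impl ProcessingResult {
    /// Hypothetical helper: tallies per-item outcomes and derives `total`
    /// from the two counters, as the Workflow loop does.
    pub fn from_outcomes(outcomes: &[Result<(), String>]) -> Self {
        let successful = outcomes.iter().filter(|o| o.is_ok()).count() as u32;
        let failed = outcomes.len() as u32 - successful;
        Self {
            successful,
            failed,
            total: successful + failed,
        }
    }
}

fn main() {
    let outcomes = vec![Ok(()), Err("timeout".to_string()), Ok(())];
    let summary = ProcessingResult::from_outcomes(&outcomes);
    println!("{:?}", summary);
}
```

Deriving `total` from the counters, instead of tracking it separately, keeps the three fields consistent by construction.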