Worker processes - Rust SDK
How to run Worker Processes
The Worker Process is where Workflow Functions and Activity Functions are executed.
Each Worker must:
- Connect to the Temporal Service
- Register the exact Workflow Types and Activity Types it will execute
- Associate itself with one or more Task Queues
- Be started to begin polling for work
A Worker can handle multiple Workflow and Activity types. Multiple Workers can listen to the same Task Queue for scalability and high availability.
Create and Start a Worker
Use the Worker struct to create a Worker in Rust. You'll need:
- A Temporal Client connection
- A Core Runtime
- Worker Options with registered Workflows and Activities
```rust
use temporalio_client::{Client, ClientOptions, Connection, envconfig::LoadClientConfigProfileOptions};
use temporalio_sdk::{Worker, WorkerOptions};
use temporalio_sdk_core::{CoreRuntime, RuntimeOptions};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create the SDK core runtime
    let runtime = CoreRuntime::new_assume_tokio(RuntimeOptions::builder().build()?)?;

    // Load client configuration from environment or config file
    let (conn_options, client_options) = ClientOptions::load_from_config(
        LoadClientConfigProfileOptions::default(),
    )?;
    let connection = Connection::connect(conn_options).await?;
    let client = Client::new(connection, client_options);

    // Create Worker Options
    let worker_options = WorkerOptions::new("my-task-queue")
        // Register your activities
        .register_activities(GreetingActivities)
        // Register your workflows
        .register_workflow::<GreetingWorkflow>()
        .build();

    // Create and run the worker
    Worker::new(&runtime, client, worker_options)?.run().await?;

    Ok(())
}
```
Register Types
All Workers listening to the same Task Queue must be registered to handle the exact same Workflow Types and Activity Types.
If a Worker polls a Task for a Workflow Type or Activity Type it does not know about, it fails that Task.
Register Workflows
Use .register_workflow::<T>() to register a Workflow type:
```rust
let worker_options = WorkerOptions::new("my-task-queue")
    .register_workflow::<GreetingWorkflow>()
    .register_workflow::<ProcessingWorkflow>()
    .register_workflow::<MyOtherWorkflow>()
    .build();
```
Register Activities
Use .register_activities() to register Activity implementations. Activities are stateless and thread-safe, so you pass an instance:
```rust
let greeting_activities = GreetingActivities;
let processing_activities = ProcessingActivities::new();

let worker_options = WorkerOptions::new("my-task-queue")
    .register_activities(greeting_activities)
    .register_activities(processing_activities)
    .build();
```
Multiple Activity instances can be registered with the same Worker.
Worker Configuration Options
You can customize Worker behavior with various options:
```rust
use std::time::Duration;

let worker_options = WorkerOptions::new("my-task-queue")
    .register_workflow::<MyWorkflow>()
    .register_activities(MyActivities)
    // Max number of Workflows to hold in cache
    .max_cached_workflows(1000)
    // How many Workflow tasks to poll concurrently
    .max_concurrent_workflow_task_polls(10)
    // How many Activity tasks to poll concurrently
    .max_concurrent_activity_task_polls(10)
    // Tune how tasks are polled from the service
    .pollers_config(PollerConfig {
        max_tasks_per_poll: 100,
        ..Default::default()
    })
    // Detect nondeterministic patterns at runtime
    .detect_nondeterministic_futures(true)
    // Allow this long for in-flight tasks to finish on shutdown
    .graceful_shutdown_period(Duration::from_secs(30))
    .build();

Worker::new(&runtime, client, worker_options)?.run().await?;
```
Important Configuration Options
- `max_cached_workflows`: Maximum number of Workflow instances to keep in cache. Larger values mean faster replay but use more memory.
- `max_concurrent_workflow_task_polls`: Number of Workflow tasks to process concurrently.
- `max_concurrent_activity_task_polls`: Number of Activity tasks to process concurrently.
- `detect_nondeterministic_futures`: Enable runtime detection of nondeterministic code patterns (default: true).
- `graceful_shutdown_period`: How long to wait for in-flight tasks to complete before force-shutting down.
Multiple Task Queues
You can create multiple Workers listening to different Task Queues in the same process:
```rust
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runtime = CoreRuntime::new_assume_tokio(RuntimeOptions::builder().build()?)?;
    let (conn_options, client_options) = ClientOptions::load_from_config(
        LoadClientConfigProfileOptions::default(),
    )?;
    let connection = Connection::connect(conn_options).await?;
    let client = Client::new(connection, client_options);

    // Fast task queue
    let fast_worker_options = WorkerOptions::new("fast-tasks")
        .register_workflow::<QuickTaskWorkflow>()
        .register_activities(QuickTaskActivities)
        .max_concurrent_activity_task_polls(20)
        .build();
    let fast_worker = Worker::new(&runtime, client.clone(), fast_worker_options)?;

    // Slow task queue
    let slow_worker_options = WorkerOptions::new("slow-tasks")
        .register_workflow::<LongTaskWorkflow>()
        .register_activities(LongTaskActivities)
        .max_concurrent_activity_task_polls(5)
        .build();
    let slow_worker = Worker::new(&runtime, client, slow_worker_options)?;

    // Run both workers concurrently
    tokio::select! {
        _ = fast_worker.run() => println!("Fast worker stopped"),
        _ = slow_worker.run() => println!("Slow worker stopped"),
    }

    Ok(())
}
```
Scale Workers Horizontally
You can run multiple Worker processes on different machines, all listening to the same Task Queue:
```shell
# Machine 1
cargo run --bin my-worker

# Machine 2
cargo run --bin my-worker

# Machine 3
cargo run --bin my-worker
```
All three Workers process tasks from the same Task Queue, automatically load-balancing Workflow and Activity Executions across the processes.
Graceful Shutdown
Workers support graceful shutdown, waiting for in-flight tasks to complete:
```rust
use std::time::Duration;
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let runtime = CoreRuntime::new_assume_tokio(RuntimeOptions::builder().build()?)?;
    let (conn_options, client_options) = ClientOptions::load_from_config(
        LoadClientConfigProfileOptions::default(),
    )?;
    let connection = Connection::connect(conn_options).await?;
    let client = Client::new(connection, client_options);

    let worker_options = WorkerOptions::new("my-task-queue")
        .register_workflow::<MyWorkflow>()
        .register_activities(MyActivities)
        .graceful_shutdown_period(Duration::from_secs(30))
        .build();

    let worker = Worker::new(&runtime, client, worker_options)?;

    // Run worker and wait for shutdown signal
    tokio::select! {
        _ = worker.run() => println!("Worker stopped"),
        _ = signal::ctrl_c() => {
            println!("Received shutdown signal, gracefully shutting down...");
            // Worker will complete in-flight tasks
        }
    }

    Ok(())
}
```
Connection Configuration
Workers connect to the Temporal Service using environment variables or a configuration file. You can customize the connection:
```rust
use temporalio_client::{ClientOptions, WorkflowServiceStubsOptions};

let conn_options = WorkflowServiceStubsOptions::new()
    .target_url("localhost:7233".parse()?)       // Custom server address
    .client_cert_path("path/to/cert.pem".into()) // mTLS certificate
    .client_key_path("path/to/key.pem".into())   // mTLS key
    .build();

let client_options = ClientOptions::new()
    .namespace("production");

let connection = Connection::connect(conn_options).await?;
let client = Client::new(connection, client_options);
```
For more configuration options, see Environment Configuration.
Complete Worker Example
Here's a complete example of a Worker with multiple Workflows and Activities:
```rust
use temporalio_client::{Client, ClientOptions, Connection, envconfig::LoadClientConfigProfileOptions};
use temporalio_sdk::{Worker, WorkerOptions};
use temporalio_sdk_core::{CoreRuntime, RuntimeOptions};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    println!("Starting Temporal Worker...");

    // Create core runtime
    let runtime = CoreRuntime::new_assume_tokio(RuntimeOptions::builder().build()?)?;

    // Load configuration from environment
    let (conn_options, client_options) = ClientOptions::load_from_config(
        LoadClientConfigProfileOptions::default(),
    )?;

    // Connect to Temporal Service
    let connection = Connection::connect(conn_options).await?;
    let client = Client::new(connection, client_options);

    // Configure worker
    let worker_options = WorkerOptions::new("my-task-queue")
        // Register workflows
        .register_workflow::<GreetingWorkflow>()
        .register_workflow::<ProcessingWorkflow>()
        // Register activities
        .register_activities(GreetingActivities)
        .register_activities(ProcessingActivities::new())
        // Performance tuning
        .max_cached_workflows(1000)
        .max_concurrent_workflow_task_polls(15)
        .max_concurrent_activity_task_polls(15)
        // Graceful shutdown
        .graceful_shutdown_period(Duration::from_secs(30))
        .build();

    // Create and run worker
    println!("Registering workflows and activities...");
    let worker = Worker::new(&runtime, client, worker_options)?;
    println!("Worker listening on task queue: my-task-queue");
    println!("Press Ctrl+C to shutdown gracefully...\n");

    worker.run().await?;

    Ok(())
}
```
Best Practices
- Use Task Queues for scaling: Different Task Queues for different types of work
- Monitor Worker health: Check logs and metrics to ensure Workers are healthy
- Configure appropriate concurrency: Match concurrency settings to your machine capabilities
- Use graceful shutdown: Allow in-flight tasks to complete before stopping
- Keep Workflow/Activity code stateless: Avoid storing state in Worker process memory
- Run Workers in containers: For easy deployment and scaling
For more information, see Worker Tuning Guide.