Skip to main content

Worker processes - Rust SDK

How to run Worker Processes

The Worker Process is where Workflow Functions and Activity Functions are executed.

Each Worker must:

  • Connect to the Temporal Service
  • Register the exact Workflow Types and Activity Types it will execute
  • Associate itself with one or more Task Queues
  • Be started to begin polling for work

A Worker can handle multiple Workflow and Activity types. Multiple Workers can listen to the same Task Queue for scalability and high availability.

Create and Start a Worker

Use the Worker struct to create a Worker in Rust. You'll need:

  1. A Temporal Client connection
  2. A Core Runtime
  3. Worker Options with registered Workflows and Activities
use temporalio_client::{Client, ClientOptions, Connection, envconfig::LoadClientConfigProfileOptions};
use temporalio_sdk::{Worker, WorkerOptions};
use temporalio_sdk_core::{CoreRuntime, RuntimeOptions};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create the SDK core runtime
let runtime = CoreRuntime::new_assume_tokio(RuntimeOptions::builder().build()?)?;

// Load client configuration from environment or config file
let (conn_options, client_options) = ClientOptions::load_from_config(
LoadClientConfigProfileOptions::default()
)?;

let connection = Connection::connect(conn_options).await?;
let client = Client::new(connection, client_options);

// Create Worker Options
let worker_options = WorkerOptions::new("my-task-queue")
// Register your activities
.register_activities(GreetingActivities)
// Register your workflows
.register_workflow::<GreetingWorkflow>()
.build();

// Create and run the worker
Worker::new(&runtime, client, worker_options)?.run().await?;

Ok(())
}

Register Types

All Workers listening to the same Task Queue must be registered to handle the exact same Workflow Types and Activity Types.

If a Worker polls a Task for a Workflow Type or Activity Type it does not know about, it fails that Task.

Register Workflows

Use .register_workflow::<T>() to register a Workflow type:

let worker_options = WorkerOptions::new("my-task-queue")
.register_workflow::<GreetingWorkflow>()
.register_workflow::<ProcessingWorkflow>()
.register_workflow::<MyOtherWorkflow>()
.build();

Register Activities

Use .register_activities() to register Activity implementations. Activities are stateless and thread-safe, so you pass an instance:

let greeting_activities = GreetingActivities;
let processing_activities = ProcessingActivities::new();

let worker_options = WorkerOptions::new("my-task-queue")
.register_activities(greeting_activities)
.register_activities(processing_activities)
.build();

Multiple Activity instances can be registered with the same Worker.

Worker Configuration Options

You can customize Worker behavior with various options:

use std::time::Duration;

let worker_options = WorkerOptions::new("my-task-queue")
.register_workflow::<MyWorkflow>()
.register_activities(MyActivities)
// Max number of Workflows to hold in cache
.max_cached_workflows(1000)
// How many Workflow tasks to poll concurrently
.max_concurrent_workflow_task_polls(10)
// How many Activity tasks to poll concurrently
.max_concurrent_activity_task_polls(10)
// How long to wait for tasks from the service
.pollers_config(PollerConfig {
max_tasks_per_poll: 100,
..Default::default()
})
// Detect nondeterministic patterns at runtime
.detect_nondeterministic_futures(true)
// Gracefully shut down after this duration
.graceful_shutdown_period(Duration::from_secs(30))
.build();

Worker::new(&runtime, client, worker_options)?.run().await?;

Important Configuration Options

  • max_cached_workflows - Maximum number of Workflow instances to keep in cache. Larger values mean faster replay but use more memory.
  • max_concurrent_workflow_task_polls - Number of Workflow tasks to process concurrently.
  • max_concurrent_activity_task_polls - Number of Activity tasks to process concurrently.
  • detect_nondeterministic_futures - Enable runtime detection of nondeterministic code patterns (default: true).
  • graceful_shutdown_period - How long to wait for in-flight tasks to complete before force-shutting down.

Multiple Task Queues

You can create multiple Workers listening to different Task Queues in the same process:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let runtime = CoreRuntime::new_assume_tokio(RuntimeOptions::builder().build()?)?;

let (conn_options, client_options) = ClientOptions::load_from_config(
LoadClientConfigProfileOptions::default()
)?;

let connection = Connection::connect(conn_options).await?;
let client = Client::new(connection, client_options);

// Fast task queue
let fast_worker_options = WorkerOptions::new("fast-tasks")
.register_workflow::<QuickTaskWorkflow>()
.register_activities(QuickTaskActivities)
.max_concurrent_activity_task_polls(20)
.build();

let fast_worker = Worker::new(&runtime, client.clone(), fast_worker_options)?;

// Slow task queue
let slow_worker_options = WorkerOptions::new("slow-tasks")
.register_workflow::<LongTaskWorkflow>()
.register_activities(LongTaskActivities)
.max_concurrent_activity_task_polls(5)
.build();

let slow_worker = Worker::new(&runtime, client, slow_worker_options)?;

// Run both workers concurrently
tokio::select! {
_ = fast_worker.run() => println!("Fast worker stopped"),
_ = slow_worker.run() => println!("Slow worker stopped"),
}

Ok(())
}

Scale Workers Horizontally

You can run multiple Worker processes on different machines, all listening to the same Task Queue:

# Machine 1
cargo run --bin my-worker

# Machine 2
cargo run --bin my-worker

# Machine 3
cargo run --bin my-worker

All three Workers process tasks from the same Task Queue, automatically load-balancing Workflow and Activity Executions across the processes.

Graceful Shutdown

Workers support graceful shutdown, waiting for in-flight tasks to complete:

use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let runtime = CoreRuntime::new_assume_tokio(RuntimeOptions::builder().build()?)?;

let (conn_options, client_options) = ClientOptions::load_from_config(
LoadClientConfigProfileOptions::default()
)?;

let connection = Connection::connect(conn_options).await?;
let client = Client::new(connection, client_options);

let worker_options = WorkerOptions::new("my-task-queue")
.register_workflow::<MyWorkflow>()
.register_activities(MyActivities)
.graceful_shutdown_period(Duration::from_secs(30))
.build();

let worker = Worker::new(&runtime, client, worker_options)?;

// Run worker and wait for shutdown signal
tokio::select! {
_ = worker.run() => println!("Worker stopped"),
_ = signal::ctrl_c() => {
println!("Received shutdown signal, gracefully shutting down...");
// Worker will complete in-flight tasks
}
}

Ok(())
}

Connection Configuration

Workers connect to the Temporal Service using environment variables or a configuration file. You can customize the connection:

use temporalio_client::{ClientOptions, WorkflowServiceStubsOptions};
use std::time::Duration;

let conn_options = WorkflowServiceStubsOptions::new()
.target_url("localhost:7233".parse()?) // Custom server address
.client_cert_path("path/to/cert.pem".into()) // mTLS certificate
.client_key_path("path/to/key.pem".into()) // mTLS key
.build();

let client_options = ClientOptions::new()
.namespace("production");

let connection = Connection::connect(conn_options).await?;
let client = Client::new(connection, client_options);

For more configuration options, see Environment Configuration.

Complete Worker Example

Here's a complete example of a Worker with multiple Workflows and Activities:

use temporalio_client::{Client, ClientOptions, Connection, envconfig::LoadClientConfigProfileOptions};
use temporalio_sdk::{Worker, WorkerOptions};
use temporalio_sdk_core::{CoreRuntime, RuntimeOptions};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Starting Temporal Worker...");

// Create core runtime
let runtime = CoreRuntime::new_assume_tokio(RuntimeOptions::builder().build()?)?;

// Load configuration from environment
let (conn_options, client_options) = ClientOptions::load_from_config(
LoadClientConfigProfileOptions::default()
)?;

// Connect to Temporal Service
let connection = Connection::connect(conn_options).await?;
let client = Client::new(connection, client_options);

// Configure worker
let worker_options = WorkerOptions::new("my-task-queue")
// Register workflows
.register_workflow::<GreetingWorkflow>()
.register_workflow::<ProcessingWorkflow>()
// Register activities
.register_activities(GreetingActivities)
.register_activities(ProcessingActivities::new())
// Performance tuning
.max_cached_workflows(1000)
.max_concurrent_workflow_task_polls(15)
.max_concurrent_activity_task_polls(15)
// Graceful shutdown
.graceful_shutdown_period(Duration::from_secs(30))
.build();

// Create and run worker
println!("Registering workflows and activities...");
let worker = Worker::new(&runtime, client, worker_options)?;

println!("Worker listening on task queue: my-task-queue");
println!("Press Ctrl+C to shutdown gracefully...\n");

worker.run().await?;

Ok(())
}

Best Practices

  1. Use Task Queues for scaling: Different Task Queues for different types of work
  2. Monitor Worker health: Check logs and metrics to ensure Workers are healthy
  3. Configure appropriate concurrency: Match concurrency settings to your machine capabilities
  4. Use graceful shutdown: Allow in-flight tasks to complete before stopping
  5. Keep Workflow/Activity code stateless: Avoid storing state in Worker process memory
  6. Run Workers in containers: For easy deployment and scaling

For more information, see Worker Tuning Guide.