Skip to main content

Worker Services

Worker Services are ideal for dedicated background processing tasks like outbox processing, projections, and CDC handlers. They run continuously without HTTP overhead.

Before You Start

  • .NET 8.0+ (or .NET 9/10 for latest features)
  • Install the required packages:
    dotnet add package Excalibur.Dispatch
    dotnet add package Microsoft.Extensions.Hosting
  • Familiarity with getting started and .NET Generic Host

When to Use Worker Services

ScenarioWorker ServiceASP.NET Core
Outbox processing✅ Dedicated✅ Integrated
Projections✅ Best choice⚠️ Possible
CDC handlers✅ Best choice⚠️ Possible
Data processing✅ Best choice✅ Integrated
Saga orchestration✅ Best choice✅ Integrated
Long-running tasks✅ Best choice❌ Avoid

Basic Setup

var builder = Host.CreateApplicationBuilder(args);

var connectionString = builder.Configuration.GetConnectionString("EventStore")!;

builder.Services.AddExcalibur(excalibur =>
{
excalibur.AddEventSourcing(es =>
{
es.UseSqlServer(opts => opts.ConnectionString = connectionString);
es.AddRepository<OrderAggregate, OrderId>();
});

excalibur.AddOutbox(outbox =>
{
outbox.UseSqlServer(opts => opts.ConnectionString = connectionString)
.EnableBackgroundProcessing()
.WithProcessing(p =>
{
p.BatchSize(500)
.PollingInterval(TimeSpan.FromSeconds(1))
.EnableParallelProcessing(4);
});
});
});

var host = builder.Build();
await host.RunAsync();

Outbox Worker

Excalibur provides OutboxBackgroundService, a built-in BackgroundService for outbox processing:

// The Basic Setup example above already enables the outbox background service
// via EnableBackgroundProcessing(). Here's the detailed configuration:

builder.Services.AddExcalibur(excalibur =>
{
excalibur.AddOutbox(outbox =>
{
outbox.UseSqlServer(opts => opts.ConnectionString = connectionString)
.EnableBackgroundProcessing(options =>
{
options.PollingInterval = TimeSpan.FromSeconds(1);
options.MaxRetries = 3;
options.ProcessScheduledMessages = true;
options.RetryFailedMessages = true;
options.DrainTimeoutSeconds = 30;
});
});
});

The built-in OutboxBackgroundService provides:

  • Polling with configurable intervals
  • Pending message processing
  • Scheduled message processing
  • Automatic retry for failed messages
  • Graceful shutdown with drain timeout
  • Health state integration
  • Metrics recording

Projection Worker

Excalibur supports two projection processing modes. Choose based on your consistency requirements:

Inline Projections (Immediate Consistency)

Inline projections run during SaveAsync() and guarantee read-after-write consistency without a background worker:

builder.Services.AddExcaliburEventSourcing(es =>
{
es.AddAggregate<OrderAggregate>(agg => agg.UseSqlServerStore(connectionString));

// Inline: updated synchronously during SaveAsync()
es.AddProjection<OrderProjection>(p => p
.Inline()
.When<OrderCreated>((proj, e) => { proj.Status = "Created"; proj.CustomerId = e.CustomerId; })
.When<OrderShipped>((proj, e) => { proj.Status = "Shipped"; }));
});

See Projections -- Failure Handling for failure handling and recovery.

Async Projections (Background Processing)

For eventually-consistent projections, use GlobalStreamProjectionHost which processes events via checkpoint-based background polling:

builder.Services.AddExcaliburEventSourcing(es =>
{
es.AddAggregate<OrderAggregate>(agg => agg.UseSqlServerStore(connectionString));

// Async: updated by background host
es.AddProjection<OrderProjection>(p => p
.Async()
.When<OrderCreated>((proj, e) => { proj.Status = "Created"; })
.When<OrderShipped>((proj, e) => { proj.Status = "Shipped"; }));
});

CDC Worker

Excalibur provides CdcProcessingHostedService, a built-in BackgroundService for CDC processing:

// Setup in your worker service
builder.Services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing(options =>
{
options.PollingInterval = TimeSpan.FromSeconds(1);
options.DrainTimeoutSeconds = 30;
});
});

CDC Change Handler

Register handlers for CDC events:

public class OrderCdcHandler : ICdcChangeHandler<OrderChangedEvent>
{
private readonly IDispatcher _dispatcher;

public OrderCdcHandler(IDispatcher dispatcher)
=> _dispatcher = dispatcher;

public async Task HandleAsync(OrderChangedEvent change, CancellationToken ct)
{
// Forward changes to integration event
await _dispatcher.DispatchAsync(new OrderDataChangedIntegration
{
OrderId = change.OrderId,
ChangeType = change.Operation,
ChangedAt = change.Timestamp
}, ct);
}
}

Data Processing Worker

For batch data processing pipelines, use the built-in background service instead of Quartz jobs:

var builder = Host.CreateApplicationBuilder(args);
var connectionString = builder.Configuration.GetConnectionString("DefaultConnection")!;

// Register orchestration connection
builder.Services.AddKeyedSingleton(
DataProcessingKeys.OrchestrationConnection,
(_, _) => (Func<IDbConnection>)(() => new SqlConnection(connectionString)));

// Register processor and handler
builder.Services.AddDataProcessor<OrderDataProcessor>(
builder.Configuration, "DataProcessing");
builder.Services.AddRecordHandler<OrderRecordHandler, OrderRecord>();

// Enable background polling service (bind from appsettings.json)
builder.Services.EnableDataProcessingBackgroundService(
builder.Configuration, "DataProcessingService");

var host = builder.Build();
await host.RunAsync();

The hosted service polls IDataOrchestrationManager.ProcessDataTasksAsync() on each interval, with built-in health tracking and graceful drain on shutdown. See Data Processing for full configuration options and Database Setup for the required SQL schema.

Leader Election

For multi-instance deployments, ensure only one worker processes:

public class LeaderElectedWorker : BackgroundService
{
private readonly ILeaderElection _leaderElection;
private readonly IOutboxProcessor _processor;
private readonly ILogger<LeaderElectedWorker> _logger;

public LeaderElectedWorker(
ILeaderElection leaderElection,
IOutboxProcessor processor,
ILogger<LeaderElectedWorker> logger)
{
_leaderElection = leaderElection;
_processor = processor;
_logger = logger;

// Subscribe to leadership events
_leaderElection.OnBecameLeader += (_, args) =>
_logger.LogInformation("Became leader: {CandidateId}", args.CandidateId);

_leaderElection.OnLostLeadership += (_, args) =>
_logger.LogInformation("Lost leadership: {CandidateId}", args.CandidateId);
}

protected override async Task ExecuteAsync(CancellationToken ct)
{
// Start participating in leader election
await _leaderElection.StartAsync(ct);

try
{
while (!ct.IsCancellationRequested)
{
if (_leaderElection.IsLeader)
{
await _processor.DispatchPendingMessagesAsync(ct);
}

await Task.Delay(TimeSpan.FromSeconds(1), ct);
}
}
finally
{
// Stop participating in election on shutdown
await _leaderElection.StopAsync(CancellationToken.None);
}
}
}

Health Checks

Liveness and Readiness

builder.Services.AddHealthChecks()
.AddSqlServer(connectionString, name: "database")
.AddCheck<OutboxHealthCheck>("outbox")
.AddCheck<WorkerHealthCheck>("worker");

// Expose health endpoint
builder.Services.Configure<HealthCheckPublisherOptions>(options =>
{
options.Period = TimeSpan.FromSeconds(30);
});

// For Kubernetes
builder.Services.AddSingleton<IHealthCheckPublisher, TcpHealthCheckPublisher>();

Worker Health Check

public class WorkerHealthCheck : IHealthCheck
{
private static DateTime _lastProcessed = DateTime.UtcNow;

public static void RecordActivity() =>
_lastProcessed = DateTime.UtcNow;

public Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken ct)
{
var elapsed = DateTime.UtcNow - _lastProcessed;

if (elapsed > TimeSpan.FromMinutes(5))
{
return Task.FromResult(
HealthCheckResult.Unhealthy($"No activity for {elapsed}"));
}

if (elapsed > TimeSpan.FromMinutes(1))
{
return Task.FromResult(
HealthCheckResult.Degraded($"No activity for {elapsed}"));
}

return Task.FromResult(HealthCheckResult.Healthy());
}
}

Configuration

appsettings.json

{
"ConnectionStrings": {
"EventStore": "Server=localhost;Database=EventStore;..."
},
"Worker": {
"ProcessorId": "worker-01",
"BatchSize": 500,
"PollingInterval": "00:00:01",
"ParallelProcessors": 4,
"LeaderElection": {
"Enabled": true,
"LeaseDuration": "00:00:30"
}
},
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.Hosting.Lifetime": "Information"
}
}
}

Graceful Shutdown

Handle shutdown signals properly:

var host = builder.Build();

host.Services.GetRequiredService<IHostApplicationLifetime>()
.ApplicationStopping.Register(() =>
{
var logger = host.Services.GetRequiredService<ILogger<Program>>();
logger.LogInformation("Shutdown requested, completing current batch...");
});

await host.RunAsync();

Docker Deployment

Dockerfile

FROM mcr.microsoft.com/dotnet/runtime:8.0 AS base
WORKDIR /app

FROM mcr.microsoft.com/dotnet/sdk:8.0 AS build
WORKDIR /src
COPY ["OutboxWorker.csproj", "./"]
RUN dotnet restore
COPY . .
RUN dotnet build -c Release -o /app/build

FROM build AS publish
RUN dotnet publish -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "OutboxWorker.dll"]

Kubernetes Deployment

apiVersion: apps/v1
kind: Deployment
metadata:
name: outbox-worker
spec:
replicas: 2 # Multiple for HA with leader election
selector:
matchLabels:
app: outbox-worker
template:
metadata:
labels:
app: outbox-worker
spec:
containers:
- name: worker
image: myregistry/outbox-worker:latest
env:
- name: ConnectionStrings__EventStore
valueFrom:
secretKeyRef:
name: db-secrets
key: connection-string
- name: Worker__ProcessorId
valueFrom:
fieldRef:
fieldPath: metadata.name
livenessProbe:
tcpSocket:
port: 8080
initialDelaySeconds: 10
periodSeconds: 30
resources:
requests:
memory: "256Mi"
cpu: "100m"
limits:
memory: "512Mi"
cpu: "500m"

Scaling Strategies

Horizontal Scaling with Leader Election

// Add SQL Server leader election
builder.Services.AddSqlServerLeaderElection(
connectionString,
"outbox-processor", // Lock resource name
options =>
{
options.LeaseDuration = TimeSpan.FromSeconds(30);
options.RenewInterval = TimeSpan.FromSeconds(10);
});

Scaled-Out Processing

For high-volume scenarios, run multiple worker instances with unique processor IDs to prevent duplicate processing:

public class OutboxWorker : BackgroundService
{
private readonly IOutboxProcessor _processor;

public OutboxWorker(IOutboxProcessor processor, IConfiguration config)
{
_processor = processor;
_processor.Init(config["Worker:ProcessorId"] ?? Environment.MachineName);
}

protected override async Task ExecuteAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var dispatched = await _processor.DispatchPendingMessagesAsync(ct);
if (dispatched == 0)
{
await Task.Delay(TimeSpan.FromSeconds(1), ct);
}
}
}
}

Best Practices

PracticeReason
Use leader electionPrevent duplicate processing
Implement health checksKubernetes liveness/readiness
Handle graceful shutdownComplete in-flight work
Set resource limitsPrevent resource exhaustion
Use unique processor IDsDebugging and monitoring
Log processing metricsMonitor throughput

See Also