Data Processing

Excalibur's data processing module provides a producer-consumer pipeline for batch processing database records. It handles orchestration (task tracking, progress, retries), while you supply the data fetching and record handling logic.

Before You Start

  • .NET 10.0
  • Install the required package:
    dotnet add package Excalibur.Data.DataProcessing
  • A SQL database for orchestration tables (task requests, progress tracking)
  • Familiarity with dependency injection and IOptions pattern

Architecture

flowchart LR
subgraph Orchestration
DM[DataOrchestrationManager]
DB[(Orchestration DB)]
DM --> DB
end

subgraph Pipeline
DP["DataProcessor‹TRecord›"]
Q["Channel‹TRecord›"]
RH["IRecordHandler‹TRecord›"]
DP -- produces --> Q
Q -- consumes --> RH
end

DM --> DP

The DataOrchestrationManager tracks data task requests in a SQL table. Each DataProcessor<TRecord> runs a producer-consumer loop: the producer fetches batches from the source database and writes to an in-memory Channel<TRecord>, while the consumer reads from the channel and delegates to IRecordHandler<TRecord> implementations.
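
The channel mechanics can be sketched directly on System.Threading.Channels, which the pipeline builds on (the types and counts below are illustrative, not library code):

```csharp
using System;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Bounded channel = the in-memory queue between producer and consumer;
// FullMode.Wait gives the producer backpressure when the queue is full.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 16)
{
    FullMode = BoundedChannelFullMode.Wait
});

// Producer: fetch "batches" from a source and write each record to the channel.
var producer = Task.Run(async () =>
{
    foreach (var record in Enumerable.Range(1, 100))
    {
        await channel.Writer.WriteAsync(record);
    }
    channel.Writer.Complete(); // no more data; lets the consumer finish
});

// Consumer: drain the channel, delegating each record to a handler
// (the stand-in here just counts, where the real pipeline calls
// IRecordHandler<TRecord>.ProcessAsync).
var processed = 0;
await foreach (var record in channel.Reader.ReadAllAsync())
{
    processed++;
}

await producer;
Console.WriteLine(processed); // 100
```

The bounded capacity is what QueueSize configures: when the consumer falls behind, the producer's WriteAsync awaits instead of growing memory without bound.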

Quick Start

1. Define a record type

public record CustomerRecord(int Id, string Name, string Email);

2. Implement a data processor

public class CustomerProcessor : DataProcessor<CustomerRecord>
{
    private readonly Func<IDbConnection> _connectionFactory;

    public CustomerProcessor(
        [FromKeyedServices("customers")] Func<IDbConnection> connectionFactory,
        IHostApplicationLifetime appLifetime,
        IOptions<DataProcessingOptions> configuration,
        IServiceProvider serviceProvider,
        ILogger<CustomerProcessor> logger)
        : base(appLifetime, configuration, serviceProvider, logger)
    {
        _connectionFactory = connectionFactory;
    }

    public override async Task<IEnumerable<CustomerRecord>> FetchBatchAsync(
        long skip, int batchSize, CancellationToken cancellationToken)
    {
        using var connection = _connectionFactory();
        return await connection.Ready().ResolveAsync(
            new SelectCustomerBatch(skip, batchSize, cancellationToken));
    }
}

3. Implement a record handler

public class CustomerMigrationHandler : IRecordHandler<CustomerRecord>
{
    private readonly ILogger<CustomerMigrationHandler> _logger;

    public CustomerMigrationHandler(ILogger<CustomerMigrationHandler> logger)
    {
        _logger = logger;
    }

    public async Task ProcessAsync(CustomerRecord record, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Processing customer {Id}: {Name}", record.Id, record.Name);
        // Transform, validate, write to target database, etc.
        await Task.CompletedTask;
    }
}

4. Register services

// AOT-safe explicit registration (recommended)
builder.Services.AddDataProcessor<CustomerProcessor>(
    builder.Configuration, "DataProcessing");
builder.Services.AddRecordHandler<CustomerMigrationHandler, CustomerRecord>();

// Register the source database connection factory
builder.Services.AddKeyedSingleton<Func<IDbConnection>>(
    "customers",
    (_, _) => () => new SqlConnection(customersConnectionString));

DI Registration

Assembly Scanning (Reflection-Based)

Discovers all IDataProcessor and IRecordHandler<T> implementations via assembly scanning. Registers the orchestration connection factory as a keyed singleton.

builder.Services.AddDataProcessing(
    () => new SqlConnection(orchestrationConnectionString),
    builder.Configuration,
    "DataProcessing",
    typeof(Program).Assembly);

AOT Compatibility

AddDataProcessing uses reflection-based assembly scanning and is annotated with [RequiresUnreferencedCode] and [RequiresDynamicCode]. For AOT-safe deployments, use the explicit generic overloads below.

AOT-Safe Explicit Registration

Register individual processors and handlers without assembly scanning:

// Bare registration (no configuration)
builder.Services.AddDataProcessor<CustomerProcessor>();
builder.Services.AddRecordHandler<CustomerMigrationHandler, CustomerRecord>();

// With inline configuration object
builder.Services.AddDataProcessor<CustomerProcessor>(new DataProcessingOptions
{
    QueueSize = 128,
    ProducerBatchSize = 50,
    ConsumerBatchSize = 20
});

// With IConfiguration binding (recommended for production)
builder.Services.AddDataProcessor<CustomerProcessor>(
    builder.Configuration, "DataProcessing");
builder.Services.AddRecordHandler<CustomerMigrationHandler, CustomerRecord>(
    builder.Configuration, "DataProcessing");

Registration API Reference

Method | Configuration | Validation
AddDataProcessor<T>() | None | None
AddDataProcessor<T>(DataProcessingOptions) | Inline object | ValidateDataAnnotations + ValidateOnStart + cross-property
AddDataProcessor<T>(IConfiguration, string) | Bind from section | ValidateDataAnnotations + ValidateOnStart
AddRecordHandler<T,R>() | None | None
AddRecordHandler<T,R>(DataProcessingOptions) | Inline object | ValidateDataAnnotations + ValidateOnStart + cross-property
AddRecordHandler<T,R>(IConfiguration, string) | Bind from section | ValidateDataAnnotations + ValidateOnStart
AddDataProcessing(Func, IConfig, string, Assembly[]) | Assembly scanning + bind | ValidateDataAnnotations + ValidateOnStart
EnableDataProcessingBackgroundService(Action?) | Optional configure action | ValidateDataAnnotations + ValidateOnStart + cross-property
EnableDataProcessingBackgroundService(IConfiguration, string) | Bind from section | ValidateDataAnnotations + ValidateOnStart + cross-property

Configuration

DataProcessingOptions

Property | Type | Default | Description
SchemaName | string | "DataProcessor" | SQL schema for the orchestration table
TableName | string | "DataTaskRequests" | SQL table name for orchestration task records
QualifiedTableName | string | (computed) | Read-only [SchemaName].[TableName] with bracket-escaping
QueueSize | int | 5000 | In-memory channel capacity between producer and consumer
ProducerBatchSize | int | 100 | Records fetched per producer iteration
ConsumerBatchSize | int | 10 | Records dequeued per consumer iteration
MaxAttempts | int | 3 | Maximum retry attempts per data task
DispatcherTimeoutMilliseconds | int | 60000 | Timeout for a dispatcher to process tasks (ms)
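
The bracket-escaping behind QualifiedTableName can be pictured with a small helper (this mirrors the documented behavior; it is not the library's implementation):

```csharp
using System;

// Doubling "]" is how T-SQL escapes a closing bracket inside a
// bracket-delimited identifier.
static string Qualify(string schema, string table) =>
    $"[{schema.Replace("]", "]]")}].[{table.Replace("]", "]]")}]";

Console.WriteLine(Qualify("DataProcessor", "DataTaskRequests"));
// [DataProcessor].[DataTaskRequests]
```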

All numeric properties require values > 0 (enforced by [Range(1, int.MaxValue)]).
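
Because these are standard DataAnnotations attributes, the per-property rule can be exercised with Validator directly; the options class below is a hypothetical stand-in, not the shipped type:

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

var bad = new PipelineOptionsSketch { QueueSize = 0 }; // violates [Range(1, int.MaxValue)]
var results = new List<ValidationResult>();
var ok = Validator.TryValidateObject(
    bad, new ValidationContext(bad), results, validateAllProperties: true);

Console.WriteLine(ok);            // False
Console.WriteLine(results.Count); // 1

public sealed class PipelineOptionsSketch // stand-in for DataProcessingOptions
{
    [Range(1, int.MaxValue)]
    public int QueueSize { get; set; } = 5000;
}
```

With ValidateOnStart, the same check runs when the host builds, so a zero in appsettings.json fails at startup rather than mid-run.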

appsettings.json

{
  "DataProcessing": {
    "SchemaName": "DataProcessor",
    "TableName": "DataTaskRequests",
    "QueueSize": 128,
    "ProducerBatchSize": 50,
    "ConsumerBatchSize": 20,
    "MaxAttempts": 3,
    "DispatcherTimeoutMilliseconds": 60000
  }
}

Cross-Property Validation

An IValidateOptions<DataProcessingOptions> validator enforces inter-property constraints at startup:

Rule | Constraint
ProducerBatchSize must not exceed QueueSize | Prevents the producer from overwhelming the channel
ConsumerBatchSize must not exceed QueueSize | Prevents impossible dequeue sizes
DispatcherTimeoutMilliseconds must be 1,000–3,600,000 | Enforces 1 second to 1 hour range

If any constraint fails, the application throws OptionsValidationException at startup (fail-fast).
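
The three constraints reduce to simple comparisons; a plain sketch of the checks (not the library's validator, which runs through IValidateOptions):

```csharp
using System;

// Returns the first violated rule, or null when the options are consistent.
static string? CheckCrossProperty(
    int queueSize, int producerBatchSize, int consumerBatchSize, int dispatcherTimeoutMs)
{
    if (producerBatchSize > queueSize)
        return "ProducerBatchSize must not exceed QueueSize";
    if (consumerBatchSize > queueSize)
        return "ConsumerBatchSize must not exceed QueueSize";
    if (dispatcherTimeoutMs is < 1_000 or > 3_600_000)
        return "DispatcherTimeoutMilliseconds must be between 1,000 and 3,600,000";
    return null;
}

Console.WriteLine(CheckCrossProperty(128, 50, 20, 60_000) ?? "valid");  // valid
Console.WriteLine(CheckCrossProperty(128, 200, 20, 60_000));
// ProducerBatchSize must not exceed QueueSize
```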

Database Setup

The data processing system requires one table in your SQL Server database. Create the schema and table before starting the application:

-- Create the schema (if it doesn't exist)
IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE name = 'DataProcessor')
BEGIN
    EXEC('CREATE SCHEMA [DataProcessor]');
END
GO

-- Create the data task requests table
IF NOT EXISTS (SELECT 1 FROM sys.objects
               WHERE object_id = OBJECT_ID(N'[DataProcessor].[DataTaskRequests]') AND type = N'U')
BEGIN
    CREATE TABLE [DataProcessor].[DataTaskRequests]
    (
        [DataTaskId] UNIQUEIDENTIFIER NOT NULL,
        [CreatedAt] DATETIMEOFFSET NOT NULL,
        [RecordType] NVARCHAR(256) NOT NULL,
        [Attempts] INT NOT NULL DEFAULT 0,
        [MaxAttempts] INT NOT NULL DEFAULT 3,
        [CompletedCount] INT NOT NULL DEFAULT 0,

        CONSTRAINT [PK_DataTaskRequests] PRIMARY KEY CLUSTERED ([DataTaskId])
    );

    -- Index for the polling query (WHERE Attempts < MaxAttempts ORDER BY CreatedAt)
    -- Note: filtered indexes cannot reference other columns, so we use a
    -- covering index with CreatedAt for ORDER BY efficiency instead.
    CREATE NONCLUSTERED INDEX [IX_DataTaskRequests_Pending]
        ON [DataProcessor].[DataTaskRequests] ([CreatedAt])
        INCLUDE ([DataTaskId], [RecordType], [Attempts], [MaxAttempts], [CompletedCount]);
END
GO

If you customize SchemaName or TableName in DataProcessingOptions, update the script accordingly. A complete setup script is included in the DataProcessingBackgroundService sample.

Orchestration Connection

The AddDataProcessing method registers the orchestration database connection factory as a keyed singleton under DataProcessingKeys.OrchestrationConnection. This connection is used by DataOrchestrationManager to manage data task records.

// Resolve explicitly in your own services
public class MyService(
    [FromKeyedServices(DataProcessingKeys.OrchestrationConnection)]
    Func<IDbConnection> orchestrationFactory)
{
    // orchestrationFactory creates connections to the orchestration database
}

The key value is "Excalibur.DataProcessing.Orchestration".

Multi-Database

When processors need different source databases, use keyed services (available since .NET 8):

var orchestrationDb = builder.Configuration.GetConnectionString("Orchestration");
var customersDb = builder.Configuration.GetConnectionString("CustomersDb");
var inventoryDb = builder.Configuration.GetConnectionString("InventoryDb");

// Orchestration database (registered as keyed singleton automatically)
builder.Services.AddDataProcessing(
    () => new SqlConnection(orchestrationDb),
    builder.Configuration,
    "DataProcessing",
    typeof(Program).Assembly);

// Source database factories for individual processors
builder.Services.AddKeyedSingleton<Func<IDbConnection>>(
    "customers",
    (_, _) => () => new SqlConnection(customersDb));

builder.Services.AddKeyedSingleton<Func<IDbConnection>>(
    "inventory",
    (_, _) => () => new SqlConnection(inventoryDb));

Each processor injects its keyed factory:

public class CustomerProcessor : DataProcessor<CustomerRecord>
{
    private readonly Func<IDbConnection> _connectionFactory;

    public CustomerProcessor(
        [FromKeyedServices("customers")] Func<IDbConnection> connectionFactory,
        IHostApplicationLifetime appLifetime,
        IOptions<DataProcessingOptions> configuration,
        IServiceProvider serviceProvider,
        ILogger<CustomerProcessor> logger)
        : base(appLifetime, configuration, serviceProvider, logger)
    {
        _connectionFactory = connectionFactory;
    }

    public override async Task<IEnumerable<CustomerRecord>> FetchBatchAsync(
        long skip, int batchSize, CancellationToken cancellationToken)
    {
        using var connection = _connectionFactory();
        return await connection.Ready().ResolveAsync(
            new SelectCustomerBatch(skip, batchSize, cancellationToken));
    }
}

See Multi-Database Support for full details including configuration examples.

Key Abstractions

IDataProcessor

The core processing interface. Implementations run the producer-consumer pipeline.

public interface IDataProcessor : IAsyncDisposable, IDisposable
{
    Task<long> RunAsync(
        long completedCount,
        UpdateCompletedCount updateCompletedCount,
        CancellationToken cancellationToken);
}

DataProcessor<TRecord>

Abstract base class providing the producer-consumer pipeline. You implement FetchBatchAsync to supply records:

public abstract class DataProcessor<TRecord> : IDataProcessor, IRecordFetcher<TRecord>
{
    // You implement this:
    public abstract Task<IEnumerable<TRecord>> FetchBatchAsync(
        long skip, int batchSize, CancellationToken cancellationToken);
}

The base class handles:

  • Channel-based producer-consumer coordination
  • Batch sizing (configurable via DataProcessingOptions)
  • Progress tracking (via UpdateCompletedCount delegate)
  • Graceful shutdown on application stop
  • Logging via [LoggerMessage] source generation
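
The graceful-shutdown item hinges on linked cancellation: the base class ties its work token to the host's shutdown signal, so in-flight batches observe cancellation when the application stops. A minimal stand-alone illustration, with ApplicationStopping simulated by a plain CancellationTokenSource:

```csharp
using System;
using System.Threading;

// Stand-in for IHostApplicationLifetime.ApplicationStopping.
using var appStopping = new CancellationTokenSource();

// Work token linked to the shutdown signal: cancelling either source
// cancels the linked token.
using var workCts = CancellationTokenSource.CreateLinkedTokenSource(appStopping.Token);

// Host begins shutdown...
appStopping.Cancel();

// ...and every loop awaiting on workCts.Token sees the request immediately.
Console.WriteLine(workCts.IsCancellationRequested); // True
```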

IRecordHandler<TRecord>

Processes individual records from the consumer side of the channel:

public interface IRecordHandler<in TRecord>
{
    Task ProcessAsync(TRecord record, CancellationToken cancellationToken);
}

Best Practices

Practice | Recommendation
Registration | Use AOT-safe AddDataProcessor<T> for production; assembly scanning for prototyping
Configuration | Use IConfiguration binding with ValidateOnStart for fail-fast startup
Connection management | Use Func<IDbConnection> factories with keyed services for multi-database
Batch sizes | Set ProducerBatchSize and ConsumerBatchSize at or below QueueSize (validated at startup)
Timeouts | Keep DispatcherTimeoutMilliseconds at 1000 ms or above; the 60 s default suits most cases
Error handling | Orchestration DB calls are retried automatically when Excalibur.Data.SqlServer is registered; implement domain-level retry in IRecordHandler<T>.ProcessAsync; MaxAttempts controls task-level retries

Background Processing

Instead of scheduling data processing via Quartz jobs, you can use the built-in BackgroundService that polls for pending data tasks on a configurable interval.

Enable Background Service

// Enable with defaults (5s polling interval)
builder.Services.EnableDataProcessingBackgroundService();

// Enable with custom options via delegate
builder.Services.EnableDataProcessingBackgroundService(options =>
{
    options.PollingInterval = TimeSpan.FromSeconds(10);
    options.DrainTimeoutSeconds = 60;
    options.UnhealthyThreshold = 5;
});

// Or bind from appsettings.json section (AOT-safe)
builder.Services.EnableDataProcessingBackgroundService(
    builder.Configuration, "DataProcessingService");

The hosted service calls IDataOrchestrationManager.ProcessDataTasksAsync() on each polling cycle. It works with both the assembly-scanning registration path (AddDataProcessing) and the AOT-safe explicit registration path (AddDataProcessor<T>).

Two Separate Options Classes

Configuration is split into two concerns:

  • DataProcessingOptions -- pipeline tuning: SchemaName, TableName, QueueSize, ProducerBatchSize, ConsumerBatchSize, MaxAttempts
  • DataProcessingHostedServiceOptions -- polling lifecycle: PollingInterval, Enabled, DrainTimeoutSeconds, UnhealthyThreshold

// Pipeline tuning (queue sizes, batching, table config)
builder.Services.AddDataProcessor<OrderDataProcessor>(new DataProcessingOptions
{
    SchemaName = "DataProcessor",
    TableName = "DataTaskRequests",
    QueueSize = 500,
    ProducerBatchSize = 50,
    ConsumerBatchSize = 10,
    MaxAttempts = 3,
});
builder.Services.AddRecordHandler<OrderRecordHandler, OrderRecord>();

// Or bind pipeline options from appsettings.json:
// builder.Services.AddDataProcessor<OrderDataProcessor>(
//     builder.Configuration, "DataProcessing");

// Polling/lifecycle tuning (separate concern) -- delegate
builder.Services.EnableDataProcessingBackgroundService(options =>
{
    options.PollingInterval = TimeSpan.FromSeconds(10);
    options.DrainTimeoutSeconds = 60;
});

// Or bind polling/lifecycle from appsettings.json:
// builder.Services.EnableDataProcessingBackgroundService(
//     builder.Configuration, "DataProcessingService");

Both can also be configured entirely via appsettings.json:

{
  "DataProcessing": {
    "SchemaName": "DataProcessor",
    "TableName": "DataTaskRequests",
    "QueueSize": 500,
    "ProducerBatchSize": 50,
    "ConsumerBatchSize": 10,
    "MaxAttempts": 3
  },
  "DataProcessingService": {
    "PollingInterval": "00:00:10",
    "Enabled": true,
    "DrainTimeoutSeconds": 60,
    "UnhealthyThreshold": 5
  }
}

DataProcessingHostedServiceOptions

Property | Type | Default | Description
PollingInterval | TimeSpan | 5 seconds | Interval between polling cycles
Enabled | bool | true | Whether the background processor is active
DrainTimeoutSeconds | int | 30 | Seconds to wait for in-flight processing during shutdown
UnhealthyThreshold | int | 3 | Consecutive errors before the service is considered unhealthy

Computed Property

DrainTimeout is a read-only TimeSpan computed from DrainTimeoutSeconds. Use DrainTimeoutSeconds in configuration.
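
The computed property is a one-line projection over the integer setting; an illustrative shape (not the library source):

```csharp
using System;

var options = new HostedOptionsSketch { DrainTimeoutSeconds = 60 };
Console.WriteLine(options.DrainTimeout); // 00:01:00

public sealed class HostedOptionsSketch // stand-in for DataProcessingHostedServiceOptions
{
    public int DrainTimeoutSeconds { get; set; } = 30;

    // Read-only projection: configuration binds the int, code consumes the TimeSpan.
    public TimeSpan DrainTimeout => TimeSpan.FromSeconds(DrainTimeoutSeconds);
}
```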

Cross-Property Validation

An IValidateOptions<DataProcessingHostedServiceOptions> validator enforces at startup:

Rule | Constraint
PollingInterval must be positive | Prevents zero or negative polling
DrainTimeout must exceed PollingInterval | Ensures the drain window covers at least one full cycle
UnhealthyThreshold must be >= 1 | At least one error before marking unhealthy

Health Checks

The data processing subsystem integrates with ASP.NET Core health checks via a 3-tier model:

Status | Condition
Healthy | Service is running and processing within normal intervals
Degraded | No activity for longer than DegradedInactivityTimeout (default 5 minutes)
Unhealthy | Service stopped, or no activity for longer than UnhealthyInactivityTimeout (default 10 minutes)

Register the health check:

builder.Services.AddHealthChecks()
    .AddDataProcessingHealthCheck();

// Or with custom options:
builder.Services.AddHealthChecks()
    .AddDataProcessingHealthCheck(
        name: "data_processing",
        failureStatus: HealthStatus.Degraded,
        tags: new[] { "ready" });

// Configure inactivity thresholds:
builder.Services.Configure<DataProcessingHealthCheckOptions>(options =>
{
    options.DegradedInactivityTimeout = TimeSpan.FromMinutes(3);
    options.UnhealthyInactivityTimeout = TimeSpan.FromMinutes(8);
});

The health check reports diagnostic data including IsRunning, TotalProcessed, TotalFailed, TotalCycles, LastActivityTime, and InactivitySeconds.

Internal Tracking

The hosted service also tracks basic health state internally (IsHealthy, ConsecutiveErrors, LastSuccessfulProcessing) and marks itself unhealthy after UnhealthyThreshold consecutive errors. The IHealthCheck integration is the recommended approach for production monitoring.

Resilience

When Excalibur.Data.SqlServer (or another provider that registers IDataAccessPolicyFactory) is present, all orchestration database operations are automatically wrapped in a comprehensive resilience policy:

  • Retry — 3 attempts with exponential backoff and jitter for transient SQL failures (timeouts, deadlocks, connection resets, Azure SQL throttling)
  • Circuit breaker — Opens after sustained failure (50% failure rate over 60 seconds, minimum 5 requests) and stays open for 30 seconds before allowing probe requests

This covers all five orchestration DB operations: task insertion, pending task selection, attempt count updates, completed count checkpoints, and task deletion.

When no IDataAccessPolicyFactory is registered (e.g., when using a non-SQL Server backend without a Polly provider), operations execute directly without retry or circuit breaker wrapping.

// Resilience is automatic when SqlServer is registered:
builder.Services.AddExcaliburDataSqlServer(options =>
{
    options.ConnectionString = connectionString;
});

// IDataAccessPolicyFactory is now available — DataProcessing
// automatically discovers and uses it for all DB operations.
builder.Services.AddDataProcessing(
    () => new SqlConnection(connectionString),
    builder.Configuration,
    "DataProcessing",
    typeof(Program).Assembly);
builder.Services.EnableDataProcessingBackgroundService();

Database Restore Survivability

The data processing pipeline is designed to survive database unavailability and data replacement (e.g., restoring from a production backup):

During database unavailability (restore in progress):

  • All DB operations are guarded with try-catch — the processing loop continues without crashing
  • Attempt count and delete operations log failures but don't propagate exceptions
  • The resilience policy (when registered) retries transient failures before surfacing errors
  • The health check transitions to Degraded, then Unhealthy based on inactivity duration

After data replacement (restored from backup):

  • Stale task detection — Each completed-count checkpoint verifies the task row still exists (checks rows affected). If the row was replaced or deleted during a restore, the checkpoint returns 0 rows and the processor aborts that task cleanly
  • Consecutive failure threshold — If a processor encounters 5 consecutive record-level failures (e.g., foreign key violations from mismatched data), it breaks out of the current batch instead of burning through the entire queue
  • Idempotent re-processing — Tasks that fail to be deleted after completion are re-selected on the next poll; processors should handle re-processing idempotently via the completedCount resume parameter
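
The resume contract in the last item shows up in the paging arithmetic alone: the producer's first skip equals the persisted completedCount, so a retried task re-fetches only the unprocessed tail. A pure sketch, with no library types:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Yields the record ids each producer iteration would fetch, starting
// from the checkpointed completedCount rather than zero.
static IEnumerable<int[]> Batches(int total, long completedCount, int batchSize)
{
    for (var skip = completedCount; skip < total; skip += batchSize)
    {
        var take = (int)Math.Min(batchSize, total - skip);
        yield return Enumerable.Range((int)skip, take).ToArray();
    }
}

// A run checkpointed 70 of 100 records before failing; the re-selected
// task resumes at record 70 instead of reprocessing from the start.
var resumed = Batches(total: 100, completedCount: 70, batchSize: 50).ToList();
Console.WriteLine(resumed.Count);     // 1 (a single final batch of the remaining 30)
Console.WriteLine(resumed[0][0]);     // 70
Console.WriteLine(resumed[0].Length); // 30
```

This only holds when FetchBatchAsync orders by a stable, unique key; otherwise a resumed run may skip or repeat rows.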

Configuration via appsettings.json

{
  "DataProcessingService": {
    "PollingInterval": "00:00:10",
    "Enabled": true,
    "DrainTimeoutSeconds": 60,
    "UnhealthyThreshold": 5
  }
}

builder.Services.EnableDataProcessingBackgroundService(
    builder.Configuration, "DataProcessingService");

Complete Example

var builder = WebApplication.CreateBuilder(args);
var connectionString = builder.Configuration.GetConnectionString("DefaultConnection")!;

// 1. Register orchestration connection
builder.Services.AddKeyedSingleton(
    DataProcessingKeys.OrchestrationConnection,
    (_, _) => (Func<IDbConnection>)(() => new SqlConnection(connectionString)));

// 2. Register processor and handler (AOT-safe)
builder.Services.AddDataProcessor<OrderDataProcessor>(
    builder.Configuration, "DataProcessing");
builder.Services.AddRecordHandler<OrderRecordHandler, OrderRecord>();

// 3. Enable background service (bind from appsettings.json)
builder.Services.EnableDataProcessingBackgroundService(
    builder.Configuration, "DataProcessingService");

// 4. Register health check (optional but recommended)
builder.Services.AddHealthChecks()
    .AddDataProcessingHealthCheck();

var app = builder.Build();
app.MapHealthChecks("/health");
app.Run();

Quartz vs Background Service

Aspect | Quartz Job | Background Service
Scheduling | Cron expressions, complex schedules | Fixed polling interval
Dependencies | Requires Excalibur.Jobs + Quartz.NET | Built-in, no extra packages
Concurrency control | Per-job [DisallowConcurrentExecution] | Single hosted service instance
Health monitoring | Via Quartz job listeners | Built-in IHealthCheck with Healthy/Degraded/Unhealthy tiers
Graceful shutdown | Quartz scheduler shutdown | DrainTimeout with linked cancellation
Best for | Complex schedules, multi-job orchestration | Simple polling, minimal dependencies

See Also