Data Processing

Excalibur's data processing module provides a producer-consumer pipeline for batch-processing database records. It handles orchestration (task tracking, progress, retries); you supply the data-fetching and record-handling logic.

Before You Start

  • .NET 8.0+ (or .NET 9/10 for latest features)
  • Install the required package:
    dotnet add package Excalibur.Data.DataProcessing
  • A SQL database for orchestration tables (task requests, progress tracking)
  • Familiarity with dependency injection and the IOptions pattern

Architecture

flowchart LR
    subgraph Orchestration
        DM[DataOrchestrationManager]
        DB[(Orchestration DB)]
        DM --> DB
    end

    subgraph Pipeline
        DP["DataProcessor‹TRecord›"]
        Q["Channel‹TRecord›"]
        RH["IRecordHandler‹TRecord›"]
        DP -- produces --> Q
        Q -- consumes --> RH
    end

    DM --> DP

The DataOrchestrationManager tracks data task requests in a SQL table. Each DataProcessor<TRecord> runs a producer-consumer loop: the producer fetches batches from the source database and writes to an in-memory Channel<TRecord>, while the consumer reads from the channel and delegates to IRecordHandler<TRecord> implementations.
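
The producer-consumer loop described above can be sketched with a bounded channel from System.Threading.Channels. This is a minimal illustration of the pattern, not Excalibur's internal implementation: the FetchBatch function, the in-memory source list, and the sizes are assumptions made up for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical in-memory "source database" holding 25 records.
var source = Enumerable.Range(1, 25).ToList();

// Hypothetical fetch function: up to batchSize records starting at skip.
IEnumerable<int> FetchBatch(long skip, int batchSize) =>
    source.Skip((int)skip).Take(batchSize);

// Bounded channel: the producer waits when the queue is full (back-pressure).
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 8)
{
    FullMode = BoundedChannelFullMode.Wait
});

var producer = Task.Run(async () =>
{
    long skip = 0;
    const int batchSize = 5;
    while (true)
    {
        var batch = FetchBatch(skip, batchSize).ToList();
        if (batch.Count == 0) break;              // source exhausted
        foreach (var record in batch)
            await channel.Writer.WriteAsync(record);
        skip += batch.Count;
    }
    channel.Writer.Complete();                    // signal "no more records"
});

var processed = new List<int>();
var consumer = Task.Run(async () =>
{
    // ReadAllAsync ends once the writer completes and the queue drains.
    await foreach (var record in channel.Reader.ReadAllAsync())
        processed.Add(record);                    // stand-in for a record handler
});

await Task.WhenAll(producer, consumer);
Console.WriteLine($"Processed {processed.Count} records"); // Processed 25 records
```

The bounded capacity is what keeps memory use flat: a slow consumer makes WriteAsync wait rather than letting the queue grow without limit.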

Quick Start

1. Define a record type

public record CustomerRecord(int Id, string Name, string Email);

2. Implement a data processor

public class CustomerProcessor : DataProcessor<CustomerRecord>
{
    private readonly Func<IDbConnection> _connectionFactory;

    public CustomerProcessor(
        [FromKeyedServices("customers")] Func<IDbConnection> connectionFactory,
        IHostApplicationLifetime appLifetime,
        IOptions<DataProcessingOptions> configuration,
        IServiceProvider serviceProvider,
        ILogger<CustomerProcessor> logger)
        : base(appLifetime, configuration, serviceProvider, logger)
    {
        _connectionFactory = connectionFactory;
    }

    public override async Task<IEnumerable<CustomerRecord>> FetchBatchAsync(
        long skip, int batchSize, CancellationToken cancellationToken)
    {
        using var connection = _connectionFactory();
        return await connection.Ready().ResolveAsync(
            new SelectCustomerBatch(skip, batchSize, cancellationToken));
    }
}

3. Implement a record handler

public class CustomerMigrationHandler : IRecordHandler<CustomerRecord>
{
    private readonly ILogger<CustomerMigrationHandler> _logger;

    public CustomerMigrationHandler(ILogger<CustomerMigrationHandler> logger)
    {
        _logger = logger;
    }

    public async Task ProcessAsync(CustomerRecord record, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Processing customer {Id}: {Name}", record.Id, record.Name);
        // Transform, validate, write to target database, etc.
        await Task.CompletedTask;
    }
}

4. Register services

// AOT-safe explicit registration (recommended)
builder.Services.AddDataProcessor<CustomerProcessor>(
    builder.Configuration, "DataProcessing");
builder.Services.AddRecordHandler<CustomerMigrationHandler, CustomerRecord>();

// Register the source database connection factory
builder.Services.AddKeyedSingleton<Func<IDbConnection>>(
    "customers",
    (_, _) => () => new SqlConnection(customersConnectionString));

DI Registration

Assembly Scanning (Reflection-Based)

AddDataProcessing discovers all IDataProcessor and IRecordHandler<T> implementations in the supplied assemblies and registers the orchestration connection factory as a keyed singleton.

builder.Services.AddDataProcessing(
    () => new SqlConnection(orchestrationConnectionString),
    builder.Configuration,
    "DataProcessing",
    typeof(Program).Assembly);

AOT Compatibility

AddDataProcessing uses reflection-based assembly scanning and is annotated with [RequiresUnreferencedCode] and [RequiresDynamicCode]. For AOT-safe deployments, use the explicit generic overloads below.

AOT-Safe Explicit Registration

Register individual processors and handlers without assembly scanning:

// Bare registration (no configuration)
builder.Services.AddDataProcessor<CustomerProcessor>();
builder.Services.AddRecordHandler<CustomerMigrationHandler, CustomerRecord>();

// With inline configuration object
builder.Services.AddDataProcessor<CustomerProcessor>(new DataProcessingOptions
{
    QueueSize = 128,
    ProducerBatchSize = 50,
    ConsumerBatchSize = 20
});

// With IConfiguration binding (recommended for production)
builder.Services.AddDataProcessor<CustomerProcessor>(
    builder.Configuration, "DataProcessing");
builder.Services.AddRecordHandler<CustomerMigrationHandler, CustomerRecord>(
    builder.Configuration, "DataProcessing");

Registration API Reference

| Method | Configuration | Validation |
| --- | --- | --- |
| AddDataProcessor<T>() | None | None |
| AddDataProcessor<T>(DataProcessingOptions) | Inline object | ValidateDataAnnotations + ValidateOnStart + cross-property |
| AddDataProcessor<T>(IConfiguration, string) | Bind from section | ValidateDataAnnotations + ValidateOnStart |
| AddRecordHandler<T,R>() | None | None |
| AddRecordHandler<T,R>(DataProcessingOptions) | Inline object | ValidateDataAnnotations + ValidateOnStart + cross-property |
| AddRecordHandler<T,R>(IConfiguration, string) | Bind from section | ValidateDataAnnotations + ValidateOnStart |
| AddDataProcessing(Func, IConfig, string, Assembly[]) | Assembly scanning + bind | ValidateDataAnnotations + ValidateOnStart |

Configuration

DataProcessingOptions

| Property | Type | Default | Description |
| --- | --- | --- | --- |
| TableName | string | "DataProcessor.DataTaskRequests" | SQL table for orchestration task records |
| QueueSize | int | 5000 | In-memory channel capacity between producer and consumer |
| ProducerBatchSize | int | 100 | Records fetched per producer iteration |
| ConsumerBatchSize | int | 10 | Records dequeued per consumer iteration |
| MaxAttempts | int | 3 | Maximum retry attempts per data task |
| DispatcherTimeoutMilliseconds | int | 60000 | Timeout for a dispatcher to process tasks (ms) |

All numeric properties require values > 0 (enforced by [Range(1, int.MaxValue)]).
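
To see what a [Range(1, int.MaxValue)] constraint does in practice, the sketch below runs the standard System.ComponentModel.DataAnnotations validator against a stand-in class. SampleOptions is a hypothetical type that mirrors two of the documented properties; it is not the library's DataProcessingOptions.

```csharp
using System;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;

// QueueSize = 0 violates the [Range] attribute; ProducerBatchSize is valid.
var options = new SampleOptions { QueueSize = 0, ProducerBatchSize = 100 };

var results = new List<ValidationResult>();
bool valid = Validator.TryValidateObject(
    options,
    new ValidationContext(options),
    results,
    validateAllProperties: true); // without this flag only [Required] is checked

Console.WriteLine($"valid={valid}, failures={results.Count}"); // valid=False, failures=1

// Hypothetical stand-in for illustration only.
public sealed class SampleOptions
{
    [Range(1, int.MaxValue)]
    public int QueueSize { get; set; } = 5000;

    [Range(1, int.MaxValue)]
    public int ProducerBatchSize { get; set; } = 100;
}
```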

appsettings.json

{
  "DataProcessing": {
    "TableName": "DataProcessor.DataTaskRequests",
    "QueueSize": 128,
    "ProducerBatchSize": 50,
    "ConsumerBatchSize": 20,
    "MaxAttempts": 3,
    "DispatcherTimeoutMilliseconds": 60000
  }
}

Cross-Property Validation

An IValidateOptions<DataProcessingOptions> validator enforces inter-property constraints at startup:

| Rule | Constraint |
| --- | --- |
| ProducerBatchSize must not exceed QueueSize | Prevents the producer from overwhelming the channel |
| ConsumerBatchSize must not exceed QueueSize | Prevents impossible dequeue sizes |
| DispatcherTimeoutMilliseconds must be 1,000–3,600,000 | Enforces 1 second to 1 hour range |

If any constraint fails, the application throws OptionsValidationException at startup (fail-fast).
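
The three rules above amount to a handful of comparisons. The following sketch expresses them as a plain function so the logic is easy to check by hand. CheckCrossPropertyRules and its message strings are hypothetical; the library's own validator implements IValidateOptions<DataProcessingOptions> and may word its failures differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper: returns one message per violated rule.
static IReadOnlyList<string> CheckCrossPropertyRules(
    int queueSize, int producerBatchSize, int consumerBatchSize, int dispatcherTimeoutMs)
{
    var failures = new List<string>();
    if (producerBatchSize > queueSize)
        failures.Add("ProducerBatchSize must not exceed QueueSize.");
    if (consumerBatchSize > queueSize)
        failures.Add("ConsumerBatchSize must not exceed QueueSize.");
    if (dispatcherTimeoutMs < 1_000 || dispatcherTimeoutMs > 3_600_000)
        failures.Add("DispatcherTimeoutMilliseconds must be between 1,000 and 3,600,000.");
    return failures;
}

// Producer batch larger than the queue and a timeout under one second: two failures.
var bad = CheckCrossPropertyRules(
    queueSize: 128, producerBatchSize: 200, consumerBatchSize: 20, dispatcherTimeoutMs: 500);

// Values taken from the appsettings.json example: no failures.
var good = CheckCrossPropertyRules(
    queueSize: 128, producerBatchSize: 50, consumerBatchSize: 20, dispatcherTimeoutMs: 60_000);

Console.WriteLine($"bad: {bad.Count} failure(s), good: {good.Count} failure(s)");
// bad: 2 failure(s), good: 0 failure(s)
```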

Orchestration Connection

The AddDataProcessing method registers the orchestration database connection factory as a keyed singleton under DataProcessingKeys.OrchestrationConnection. This connection is used by DataOrchestrationManager to manage data task records.

// Resolve explicitly in your own services
public class MyService(
    [FromKeyedServices(DataProcessingKeys.OrchestrationConnection)]
    Func<IDbConnection> orchestrationFactory)
{
    // orchestrationFactory creates connections to the orchestration database
}

The key value is "Excalibur.DataProcessing.Orchestration".

Multi-Database

When processors need different source databases, use .NET 8 keyed services:

var orchestrationDb = builder.Configuration.GetConnectionString("Orchestration");
var customersDb = builder.Configuration.GetConnectionString("CustomersDb");
var inventoryDb = builder.Configuration.GetConnectionString("InventoryDb");

// Orchestration database (registered as keyed singleton automatically)
builder.Services.AddDataProcessing(
    () => new SqlConnection(orchestrationDb),
    builder.Configuration,
    "DataProcessing",
    typeof(Program).Assembly);

// Source database factories for individual processors
builder.Services.AddKeyedSingleton<Func<IDbConnection>>(
    "customers",
    (_, _) => () => new SqlConnection(customersDb));

builder.Services.AddKeyedSingleton<Func<IDbConnection>>(
    "inventory",
    (_, _) => () => new SqlConnection(inventoryDb));

Each processor injects its keyed factory:

public class CustomerProcessor : DataProcessor<CustomerRecord>
{
    private readonly Func<IDbConnection> _connectionFactory;

    public CustomerProcessor(
        [FromKeyedServices("customers")] Func<IDbConnection> connectionFactory,
        IHostApplicationLifetime appLifetime,
        IOptions<DataProcessingOptions> configuration,
        IServiceProvider serviceProvider,
        ILogger<CustomerProcessor> logger)
        : base(appLifetime, configuration, serviceProvider, logger)
    {
        _connectionFactory = connectionFactory;
    }

    public override async Task<IEnumerable<CustomerRecord>> FetchBatchAsync(
        long skip, int batchSize, CancellationToken cancellationToken)
    {
        using var connection = _connectionFactory();
        return await connection.Ready().ResolveAsync(
            new SelectCustomerBatch(skip, batchSize, cancellationToken));
    }
}

See Multi-Database Support for full details including configuration examples.

Key Abstractions

IDataProcessor

The core processing interface. Implementations run the producer-consumer pipeline.

public interface IDataProcessor : IAsyncDisposable, IDisposable
{
    Task<long> RunAsync(
        long completedCount,
        UpdateCompletedCount updateCompletedCount,
        CancellationToken cancellationToken);
}

DataProcessor<TRecord>

Abstract base class providing the producer-consumer pipeline. You implement FetchBatchAsync to supply records:

public abstract class DataProcessor<TRecord> : IDataProcessor, IRecordFetcher<TRecord>
{
    // You implement this:
    public abstract Task<IEnumerable<TRecord>> FetchBatchAsync(
        long skip, int batchSize, CancellationToken cancellationToken);
}

The base class handles:

  • Channel-based producer-consumer coordination
  • Batch sizing (configurable via DataProcessingOptions)
  • Progress tracking (via UpdateCompletedCount delegate)
  • Graceful shutdown on application stop
  • Logging via [LoggerMessage] source generation
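
One way to picture how skip, batchSize, and progress tracking fit together is a paging loop that resumes from a previously completed count. The sketch below is an assumption about the general shape, not the library's code: RunFromAsync, the use of completedCount as the initial skip, and the Func<long, Task> progress callback are all hypothetical stand-ins (the real UpdateCompletedCount delegate signature is not shown in this document).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical loop: resume at completedCount, fetch until a batch comes back empty,
// and report the running total after every batch.
static async Task<long> RunFromAsync(
    long completedCount,
    int batchSize,
    Func<long, int, CancellationToken, Task<IEnumerable<int>>> fetchBatch,
    Func<long, Task> reportProgress,
    CancellationToken cancellationToken)
{
    while (!cancellationToken.IsCancellationRequested)
    {
        var batch = (await fetchBatch(completedCount, batchSize, cancellationToken)).ToList();
        if (batch.Count == 0) break;          // nothing left to fetch
        completedCount += batch.Count;
        await reportProgress(completedCount); // checkpoint so a restart can resume here
    }
    return completedCount;
}

// 12 source records, 4 of which were completed by an earlier (hypothetical) run.
var source = Enumerable.Range(1, 12).ToList();
var checkpoints = new List<long>();

var total = await RunFromAsync(
    4,
    5,
    (skip, take, _) => Task.FromResult(source.Skip((int)skip).Take(take)),
    count => { checkpoints.Add(count); return Task.CompletedTask; },
    CancellationToken.None);

Console.WriteLine($"total={total}, checkpoints={string.Join(",", checkpoints)}");
// total=12, checkpoints=9,12
```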

IRecordHandler<TRecord>

Processes individual records from the consumer side of the channel:

public interface IRecordHandler<in TRecord>
{
    Task ProcessAsync(TRecord record, CancellationToken cancellationToken);
}

Best Practices

| Practice | Recommendation |
| --- | --- |
| Registration | Use AOT-safe AddDataProcessor<T> for production; assembly scanning for prototyping |
| Configuration | Use IConfiguration binding with ValidateOnStart for fail-fast startup |
| Connection management | Use Func<IDbConnection> factories with keyed services for multi-database |
| Batch sizes | Set ProducerBatchSize and ConsumerBatchSize at or below QueueSize (validated at startup) |
| Timeouts | Keep DispatcherTimeoutMilliseconds at 1,000 ms or above; the default of 60 s is suitable for most cases |
| Error handling | Implement retry logic in IRecordHandler<T>.ProcessAsync; MaxAttempts controls task-level retries |
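
For the error-handling recommendation, record-level retries inside ProcessAsync could look like the sketch below. RetryAsync is a hypothetical helper written for this example, not part of Excalibur. It retries with exponential backoff, lets cancellation pass through untouched, and rethrows once the attempts are used up, at which point the task-level MaxAttempts setting would apply.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: run an action, retrying failures with exponential backoff.
static async Task RetryAsync(
    Func<CancellationToken, Task> action,
    int maxAttempts,
    TimeSpan baseDelay,
    CancellationToken cancellationToken)
{
    for (var attempt = 1; ; attempt++)
    {
        try
        {
            await action(cancellationToken);
            return;
        }
        // The filter stops retrying on the last attempt and never swallows cancellation.
        catch (Exception ex) when (attempt < maxAttempts && ex is not OperationCanceledException)
        {
            // Delay grows as baseDelay, 2x, 4x, ...
            var delay = TimeSpan.FromMilliseconds(
                baseDelay.TotalMilliseconds * Math.Pow(2, attempt - 1));
            await Task.Delay(delay, cancellationToken);
        }
    }
}

// Simulated handler work that fails twice before succeeding.
var calls = 0;
await RetryAsync(
    _ =>
    {
        calls++;
        if (calls < 3) throw new TimeoutException("simulated transient failure");
        return Task.CompletedTask;
    },
    maxAttempts: 5,
    baseDelay: TimeSpan.FromMilliseconds(10),
    cancellationToken: CancellationToken.None);

Console.WriteLine($"Succeeded after {calls} calls"); // Succeeded after 3 calls
```

In a real handler, the action passed to the helper would wrap the write to the target database, and the exception filter would typically be narrowed to errors known to be transient.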

See Also