Change Data Capture (CDC)
CDC captures row-level changes from your database and publishes them as events, enabling near-real-time data synchronization without modifying application code.
Before You Start
- .NET 10.0
- Install the required packages:
dotnet add package Excalibur.Cdc.SqlServer # or Excalibur.Cdc.Postgres
- For SQL Server, CDC must be enabled on the database and on each target table; for Postgres, logical replication must be enabled (wal_level = logical)
- Familiarity with event sourcing concepts and the outbox pattern
Overview
Two Processing Patterns
Excalibur CDC provides two ways to process database changes. Choose the one that fits your scenario:
| | Auto-Mapped | Manual Handler |
|---|---|---|
| Register tables | TrackTable() with mapper, or BindTrackedTables() + code mappings | .CaptureInstances(), BindTrackedTables(), or TrackTable() without mapper |
| Map changes | Framework creates typed events via ICdcEventMapper<T> | You write an IDataChangeHandler and process raw DataChangeEvent |
| Dispatch | Framework dispatches via IDispatcher automatically | You decide what to do (dispatch, index, cache, etc.) |
| Best for | Domain event pipelines, CQRS projections | Search indexing, cache invalidation, custom integrations |
| Config-driven | Yes — tables from appsettings.json via BindTrackedTables | Yes — tables from appsettings.json via BindTrackedTables |
Both patterns use the same CDC processor under the hood. The difference is who handles the changes: the framework (auto-mapped) or your code (manual handler).
All three table registration methods — TrackTable(), BindTrackedTables(), and .CaptureInstances() — feed into the same capture instance list that the processor polls. BindTrackedTables() works with either pattern: tables from config can be handled by auto-mapping (if event mappers are configured in code) or by your own IDataChangeHandler implementations.
You can use both patterns in the same processor. For example, auto-map dbo.Orders with TrackTable() while processing dbo.AuditLog via a manual IDataChangeHandler registered through BindTrackedTables() or .CaptureInstances().
Quick Start
Enable CDC on Database
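-- Run these statements in the target database (for example, OrdersDb)
-- Enable CDC on the database
EXEC sys.sp_cdc_enable_db;
-- Enable CDC on the table
-- (@supports_net_changes = 1 requires a primary key or unique index on the table)
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo',
@source_name = 'Orders',
@role_name = NULL,
@supports_net_changes = 1;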
Auto-Mapped Quick Start (Recommended)
Use TrackTable with ICdcEventMapper<T> to have the framework create typed events and dispatch them automatically:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionString(connectionString)
.DatabaseName("OrdersDb");
})
.TrackTable("dbo.Orders", table =>
{
table.MapInsert<OrderCreatedEvent, OrderCreatedMapper>()
.MapUpdate<OrderUpdatedEvent, OrderUpdatedMapper>()
.MapDelete<OrderDeletedEvent, OrderDeletedMapper>();
})
.EnableBackgroundProcessing();
});
The processor derives which CDC capture instances to poll from the tracked tables. "dbo.Orders" is automatically normalized to the SQL Server capture instance dbo_Orders. If your capture instance has a custom name (e.g., dbo_Orders_v2), set it explicitly:
.TrackTable("dbo.Orders", table =>
{
table.CaptureInstance("dbo_Orders_v2") // Override the default
.MapInsert<OrderCreatedEvent, OrderCreatedMapper>();
})
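The default-plus-override rule can be illustrated with a small standalone sketch. ResolveCaptureInstance is a hypothetical helper, not an Excalibur API; it assumes a schema-qualified "schema.table" input and applies SQL Server's default capture instance naming (schema_table) unless an explicit name is supplied. It uses top-level statements so it can run as a single-file .NET 10 program.

```csharp
using System;

// Hypothetical helper: resolve which capture instance to poll for a tracked table.
static string ResolveCaptureInstance(string tableName, string? explicitCaptureInstance = null)
{
    // An explicit override (e.g. "dbo_Orders_v2") always wins.
    if (!string.IsNullOrWhiteSpace(explicitCaptureInstance))
    {
        return explicitCaptureInstance;
    }

    // SQL Server's default capture instance name is <schema>_<table>.
    var parts = tableName.Split('.');
    if (parts.Length != 2)
    {
        throw new ArgumentException("Expected a schema-qualified name such as \"dbo.Orders\".", nameof(tableName));
    }

    return $"{parts[0]}_{parts[1]}";
}

Console.WriteLine(ResolveCaptureInstance("dbo.Orders"));                  // dbo_Orders
Console.WriteLine(ResolveCaptureInstance("dbo.Orders", "dbo_Orders_v2")); // dbo_Orders_v2
```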
Manual Handler Quick Start
Use .CaptureInstances() when you want full control over change processing via your own IDataChangeHandler:
// 1. Register the processor with capture instances
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionString(connectionString)
.DatabaseName("OrdersDb")
.CaptureInstances("dbo_Orders", "dbo_Customers");
})
.EnableBackgroundProcessing();
});
// 2. Register your handlers
services.AddDataChangeHandlersFromAssembly(typeof(Program).Assembly);
// 3. Implement IDataChangeHandler
public class OrderCdcHandler : IDataChangeHandler
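{
// ISearchIndex is your own (illustrative) search abstraction, injected via DI
private readonly ISearchIndex _searchIndex;
public OrderCdcHandler(ISearchIndex searchIndex) => _searchIndex = searchIndex;
public string[] TableNames => ["dbo_Orders"];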
public async Task HandleAsync(DataChangeEvent changeEvent, CancellationToken ct)
{
// You control what happens with the change
var orderId = changeEvent.GetNewValue<Guid>("OrderId");
await _searchIndex.IndexAsync(orderId, changeEvent, ct);
}
}
Table Tracking with Event Mapping
TrackTable registers a table for auto-mapped processing. The framework creates typed domain events from CDC column data and dispatches them via IDispatcher.
Two sub-patterns are available:
- ICdcEventMapper<T> (recommended): provide a mapper class that creates typed events from column data. The framework handles dispatch.
- Metadata-only: register event type metadata without a mapper. You still need a manual IDataChangeHandler to process changes.
Auto-Mapping with ICdcEventMapper (Recommended)
Define a mapper that converts CDC column data to your domain event:
// 1. Define the event
public record OrderCreatedEvent(int OrderId, string CustomerId, decimal Total) : IDispatchMessage;
// 2. Implement ICdcEventMapper<TEvent>
internal sealed class OrderCreatedEventMapper : ICdcEventMapper<OrderCreatedEvent>
{
public OrderCreatedEvent Map(IReadOnlyList<CdcDataChange> changes, CdcChangeType changeType)
{
return new OrderCreatedEvent(
OrderId: changes.GetValue<int>("OrderId"),
CustomerId: changes.GetValue<string>("CustomerId"),
Total: changes.GetValue<decimal>("Total"));
}
}
// 3. Register with the 2-type-param overload
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionString(connectionString)
.SchemaName("Cdc")
.BatchSize(100);
})
.TrackTable("dbo.Orders", table =>
{
table.MapInsert<OrderCreatedEvent, OrderCreatedEventMapper>()
.MapUpdate<OrderUpdatedEvent, OrderUpdatedEventMapper>()
.MapDelete<OrderDeletedEvent, OrderDeletedEventMapper>();
})
.EnableBackgroundProcessing();
});
When a CDC change is detected, the framework:
- Resolves the ICdcEventMapper<TEvent> from DI
- Calls mapper.Map(changes, changeType) to create a typed event
- Dispatches the event via IDispatcher.DispatchAsync() if it implements IDispatchMessage
Use MapAll<TEvent, TMapper>() when a single event type handles all change types:
.TrackTable("dbo.Orders", table =>
{
table.MapAll<OrderChangedEvent, OrderChangedEventMapper>();
})
CdcDataChangeExtensions
Helper methods for extracting typed column values in mapper implementations:
| Method | Description |
|---|---|
| changes.GetValue<T>(columnName) | Get new value; throws CdcMappingException if missing |
| changes.GetOldValue<T>(columnName) | Get old value (before change); throws if missing |
| changes.TryGetValue<T>(columnName, out value) | Safe lookup; returns false if missing |
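As a rough model of the contract in the table, the sketch below implements a strict GetValue and a safe TryGetValue over plain (Column, NewValue) tuples. Both functions and the tuple shape are hypothetical stand-ins for the real CdcDataChange extensions, and the sketch throws KeyNotFoundException where the framework is documented to throw CdcMappingException.

```csharp
using System;
using System.Collections.Generic;

// Safe lookup: returns false if the column is missing or its value is not a T.
static bool TryGetValue<T>(IReadOnlyList<(string Column, object? NewValue)> changes, string columnName, out T? value)
{
    foreach (var (column, newValue) in changes)
    {
        if (column == columnName && newValue is T typed)
        {
            value = typed;
            return true;
        }
    }

    value = default;
    return false;
}

// Strict lookup: throws when the column is missing (the real extension throws CdcMappingException).
static T GetValue<T>(IReadOnlyList<(string Column, object? NewValue)> changes, string columnName) =>
    TryGetValue<T>(changes, columnName, out var value)
        ? value!
        : throw new KeyNotFoundException($"Column '{columnName}' is missing or is not a {typeof(T).Name}.");

var row = new List<(string Column, object? NewValue)> { ("OrderId", 42), ("Total", 19.99m) };
Console.WriteLine(GetValue<int>(row, "OrderId"));                 // 42
Console.WriteLine(TryGetValue<string>(row, "CustomerId", out _)); // False
```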
Metadata-Only Overloads
The single-type-param overloads (MapInsert<TEvent>(), MapAll<TEvent>()) register event type metadata but do not auto-map or dispatch. Use these when you plan to process changes via a manual IDataChangeHandler:
// Registers metadata only -- requires a manual IDataChangeHandler for "dbo.Orders"
.TrackTable("dbo.Orders", table =>
{
table.MapInsert<OrderCreatedEvent>()
.MapUpdate<OrderUpdatedEvent>()
.MapDelete<OrderDeletedEvent>();
})
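Entity-Inferred Table Names
Pass an entity type, as in TrackTable<Order>(), instead of a table name string; the table name is inferred from the type: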
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable<Order>(table =>
table.MapAll<OrderChangedEvent, OrderChangedEventMapper>())
.TrackTable<Customer>(table =>
table.MapAll<CustomerChangedEvent, CustomerChangedEventMapper>())
.EnableBackgroundProcessing();
});
Manual Handler Pattern
When you need full control over change processing — for search indexing, cache invalidation, or custom integrations — implement IDataChangeHandler directly. Tables can come from .CaptureInstances() on the SQL builder, BindTrackedTables() from config, or TrackTable() without event mappers:
using Excalibur.Cdc.SqlServer;
public class OrderCdcHandler : IDataChangeHandler
{
private readonly ISearchIndex _searchIndex;
private readonly ICache _cache;
public OrderCdcHandler(ISearchIndex searchIndex, ICache cache)
{
_searchIndex = searchIndex;
_cache = cache;
}
// Specify which tables this handler processes
public string[] TableNames => ["dbo.Orders"];
public async Task HandleAsync(DataChangeEvent changeEvent, CancellationToken cancellationToken)
{
switch (changeEvent.ChangeType)
{
case DataChangeType.Insert:
await HandleInsertAsync(changeEvent, cancellationToken);
break;
case DataChangeType.Update:
await HandleUpdateAsync(changeEvent, cancellationToken);
break;
case DataChangeType.Delete:
await HandleDeleteAsync(changeEvent, cancellationToken);
break;
}
}
private async Task HandleInsertAsync(DataChangeEvent changeEvent, CancellationToken ct)
{
// Use built-in extension methods on DataChangeEvent
var orderId = changeEvent.GetNewValue<Guid>("OrderId");
var customerId = changeEvent.GetNewValue<string>("CustomerId");
var totalAmount = changeEvent.GetNewValue<decimal>("TotalAmount");
await _searchIndex.IndexAsync(new OrderDocument
{
Id = orderId,
CustomerId = customerId,
Amount = totalAmount
}, ct);
await _cache.SetAsync($"order:{orderId}", changeEvent, ct);
}
private async Task HandleUpdateAsync(DataChangeEvent changeEvent, CancellationToken ct)
{
var orderId = changeEvent.GetNewValue<Guid>("OrderId");
await _searchIndex.UpdateAsync(orderId, changeEvent, ct);
await _cache.InvalidateAsync($"order:{orderId}", ct);
}
private async Task HandleDeleteAsync(DataChangeEvent changeEvent, CancellationToken ct)
{
// For deletes, use GetOldValue since NewValue is null
var orderId = changeEvent.GetOldValue<Guid>("OrderId");
await _searchIndex.DeleteAsync(orderId, ct);
await _cache.InvalidateAsync($"order:{orderId}", ct);
}
}
### Registering Data Change Handlers
Register `IDataChangeHandler` implementations using assembly scanning or explicit registration:
```csharp
// Assembly scanning -- discovers all IDataChangeHandler implementations
// ⚠️ Annotated with [RequiresUnreferencedCode]: uses reflection, so it is not trimming- or Native AOT-safe
services.AddDataChangeHandlersFromAssembly(typeof(OrderCdcHandler).Assembly);
// With custom lifetime (default is Singleton)
services.AddDataChangeHandlersFromAssembly(
typeof(OrderCdcHandler).Assembly,
ServiceLifetime.Transient);
```
Assembly scanning finds all concrete classes implementing IDataChangeHandler and registers them with TryAdd semantics. Each handler is registered both as its concrete type and as IDataChangeHandler for enumerable resolution.
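Conceptually, the scan is a reflection pass over the assembly, which is also why it carries [RequiresUnreferencedCode]: a trimmer cannot tell which types will be looked up at runtime. The sketch below is a hypothetical illustration of the discovery step only (it does not register anything with DI). FindImplementations is not a framework API, and the usage line scans the core library for IComparer implementations purely as a runnable stand-in for IDataChangeHandler.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Find every concrete, closed class in an assembly that implements serviceType.
static IReadOnlyList<Type> FindImplementations(Assembly assembly, Type serviceType)
{
    Type?[] types;
    try
    {
        types = assembly.GetTypes();
    }
    catch (ReflectionTypeLoadException ex)
    {
        // Keep the types that did load instead of failing the whole scan.
        types = ex.Types;
    }

    return types
        .OfType<Type>() // drops null entries left by a partial load
        .Where(t => t is { IsClass: true, IsAbstract: false, ContainsGenericParameters: false })
        .Where(t => serviceType.IsAssignableFrom(t))
        .ToList();
}

var comparers = FindImplementations(typeof(object).Assembly, typeof(System.Collections.IComparer));
Console.WriteLine($"Found {comparers.Count} concrete IComparer implementations.");
```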
DataChangeEvent Structure
The DataChangeEvent class provides complete information about each database change:
public class DataChangeEvent
{
// Log sequence number for ordering
public byte[] Lsn { get; init; }
// Sequence value within the transaction
public byte[] SeqVal { get; init; }
// When the transaction was committed
public DateTime CommitTime { get; init; }
// The table that changed
public string TableName { get; init; }
// Insert, Update, or Delete
public DataChangeType ChangeType { get; init; }
// Column-level changes
public IList<DataChange> Changes { get; init; }
}
public class DataChange
{
public string ColumnName { get; init; }
public object? OldValue { get; init; } // null for inserts
public object? NewValue { get; init; } // null for deletes
public Type? DataType { get; init; }
}
public enum DataChangeType
{
Unknown = 0,
Insert = 1,
Update = 2,
Delete = 3
}
The framework provides extension methods for extracting typed values from DataChangeEvent:
// Get the new value (for inserts and updates)
var customerId = changeEvent.GetNewValue<string>("CustomerId");
// Get the old value (for updates and deletes)
var previousStatus = changeEvent.GetOldValue<string>("Status");
// With default value if column not found
var amount = changeEvent.GetNewValue<decimal>("Amount", defaultValue: 0m);
These methods handle type conversion and nullable types automatically.
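The conversion behavior could look roughly like the following. ConvertColumnValue is a hypothetical helper, not the framework's implementation; it assumes NULL and missing values should yield the caller's default, unwraps Nullable<T> targets, and uses Convert.ChangeType for conversions such as int to long.

```csharp
using System;
using System.Globalization;

static T? ConvertColumnValue<T>(object? raw, T? defaultValue = default)
{
    // Missing column or database NULL: use the caller's default.
    if (raw is null || raw is DBNull)
    {
        return defaultValue;
    }

    // Already the right type (this also covers non-IConvertible types such as Guid).
    if (raw is T typed)
    {
        return typed;
    }

    // For Nullable<U>, convert to U; a boxed U unboxes cleanly to U?.
    var targetType = Nullable.GetUnderlyingType(typeof(T)) ?? typeof(T);
    return (T)Convert.ChangeType(raw, targetType, CultureInfo.InvariantCulture);
}

Console.WriteLine(ConvertColumnValue<long>(42));                   // 42 (int converted to long)
Console.WriteLine(ConvertColumnValue<decimal>(null, 0m));          // 0
Console.WriteLine(ConvertColumnValue<int?>(DBNull.Value) is null); // True
```

In this sketch, types that do not implement IConvertible (such as Guid) only succeed when the raw value already has that type, so matching event property types to column types avoids surprises.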
Working with Changes
public async Task HandleAsync(DataChangeEvent changeEvent, CancellationToken cancellationToken)
{
// Check if a specific column changed
var statusChange = changeEvent.Changes
.FirstOrDefault(c => c.ColumnName == "Status");
if (statusChange is not null &&
changeEvent.ChangeType == DataChangeType.Update &&
!Equals(statusChange.OldValue, statusChange.NewValue))
{
await PublishStatusChangedEvent(
changeEvent.TableName,
statusChange.OldValue?.ToString(),
statusChange.NewValue?.ToString(),
cancellationToken);
}
// Get all column values as a dictionary
var newValues = changeEvent.Changes
.Where(c => c.NewValue is not null)
.ToDictionary(c => c.ColumnName, c => c.NewValue);
var oldValues = changeEvent.Changes
.Where(c => c.OldValue is not null)
.ToDictionary(c => c.ColumnName, c => c.OldValue);
}
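Because each column entry carries both OldValue and NewValue, the single-column "did Status change?" check generalizes to a helper that lists every changed column. ChangedColumns and the (Column, OldValue, NewValue) tuples are hypothetical stand-ins for DataChange, used only to keep the sketch self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Columns whose old and new values differ; object.Equals handles nulls and boxed values.
static IReadOnlyList<string> ChangedColumns(IEnumerable<(string Column, object? OldValue, object? NewValue)> changes) =>
    changes
        .Where(c => !Equals(c.OldValue, c.NewValue))
        .Select(c => c.Column)
        .ToList();

var update = new[]
{
    ("OrderId", (object?)42, (object?)42),
    ("Status", (object?)"Pending", (object?)"Shipped"),
    ("Notes", (object?)null, (object?)"gift wrap"),
};

Console.WriteLine(string.Join(", ", ChangedColumns(update))); // Status, Notes
```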
Anti-Corruption Pattern Example
This section shows a recommended pattern for implementing an anti-corruption layer using IDataChangeHandler. This is not a built-in framework component — you implement this yourself using the CDC handler infrastructure.
Protect downstream systems from database schema changes by transforming raw CDC events into domain events:
public class OrderAntiCorruptionHandler : IDataChangeHandler
{
private readonly IDispatcher _dispatcher;
public OrderAntiCorruptionHandler(IDispatcher dispatcher)
{
_dispatcher = dispatcher;
}
public string[] TableNames => ["dbo.Orders"];
public async Task HandleAsync(DataChangeEvent change, CancellationToken ct)
{
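// Declare the target type: the arms return different event types (or null),
// so 'var' would not compile here
IDispatchMessage? domainEvent = change.ChangeType switch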
{
DataChangeType.Insert => CreateOrderCreatedEvent(change),
DataChangeType.Update => CreateOrderUpdatedEvent(change),
DataChangeType.Delete => CreateOrderDeletedEvent(change),
_ => null
};
if (domainEvent is not null)
{
await _dispatcher.DispatchAsync(domainEvent, ct);
}
}
private OrderCreatedEvent CreateOrderCreatedEvent(DataChangeEvent change)
{
// Use built-in extension methods on DataChangeEvent
return new OrderCreatedEvent(
OrderId: change.GetNewValue<Guid>("order_id"),
CustomerId: change.GetNewValue<string>("customer_id"),
TotalAmount: change.GetNewValue<decimal>("total_amt"),
Currency: MapCurrency(change.GetNewValue<int>("currency_code")),
CreatedAt: change.GetNewValue<DateTime>("created_date")
);
}
private OrderUpdatedEvent? CreateOrderUpdatedEvent(DataChangeEvent change)
{
var statusChange = change.Changes.FirstOrDefault(c => c.ColumnName == "status");
if (statusChange is null || Equals(statusChange.OldValue, statusChange.NewValue))
return null;
return new OrderUpdatedEvent(
OrderId: change.GetNewValue<Guid>("order_id"),
OldStatus: statusChange.OldValue?.ToString() ?? "",
NewStatus: statusChange.NewValue?.ToString() ?? ""
);
}
private OrderDeletedEvent CreateOrderDeletedEvent(DataChangeEvent change)
{
// For deletes, use GetOldValue since NewValue is null
return new OrderDeletedEvent(
OrderId: change.GetOldValue<Guid>("order_id")
);
}
private static string MapCurrency(int code) => code switch
{
1 => "USD",
2 => "EUR",
3 => "GBP",
_ => "USD"
};
}
SQL Server Builder Reference
The ISqlServerCdcBuilder interface provides fluent configuration for SQL Server CDC:
| Method | Description | Default |
|---|---|---|
| SchemaName(string) | Schema for CDC state tables | "Cdc" |
| StateTableName(string) | Table name for processing state | "CdcProcessingState" |
| PollingInterval(TimeSpan) | How often to poll for changes | 5 seconds |
| BatchSize(int) | Changes per processing batch | 100 |
| CommandTimeout(TimeSpan) | Database command timeout | 30 seconds |
| DatabaseName(string) | Database name; auto-registers IDatabaseOptions | -- |
| DatabaseConnectionIdentifier(string) | Identifier for CDC source connection | cdc-{DatabaseName} |
| StateConnectionIdentifier(string) | Identifier for state store connection | state-{DatabaseName} |
| CaptureInstances(params string[]) | CDC capture instances to poll (for the manual IDataChangeHandler pattern; tables registered via TrackTable/BindTrackedTables are derived automatically) | -- |
| StopOnMissingTableHandler(bool) | Stop processing when no handler is registered for a table | true |
| ConnectionString(string) | SQL Server source connection string | -- |
| ConnectionStringName(string) | Resolve connection from IConfiguration.GetConnectionString() | -- |
| ConnectionFactory(Func<IServiceProvider, Func<SqlConnection>>) | DI-integrated source connection factory | -- |
| WithStateStore(Action<ICdcStateStoreBuilder>) | Configure separate state store connection and schema | Source connection |
| StateConnectionFactory(Func<IServiceProvider, Func<SqlConnection>>) | DI-integrated state connection factory | Source connection |
| BindConfiguration(string) | Bind source options from IConfiguration section | -- |
When you call DatabaseName(), the builder automatically registers an IDatabaseOptions factory whose connection identifiers default to cdc-{DatabaseName} and state-{DatabaseName}. The factory derives CaptureInstances at runtime from all registered sources (TrackTable(), BindTrackedTables(), and .CaptureInstances()), so config-driven tables are included automatically. Set DatabaseConnectionIdentifier() or StateConnectionIdentifier() only if you need custom values. A manually registered IDatabaseOptions takes precedence.
Connection Factory
For custom connection management (e.g., pooling or dynamic connection strings):
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionFactory(sp =>
{
var config = sp.GetRequiredService<IConfiguration>();
var connStr = config.GetConnectionString("CdcDatabase")!;
return () => new SqlConnection(connStr);
})
.SchemaName("audit")
.BatchSize(200)
.DatabaseName("AuditDb");
})
.TrackTable("dbo.AuditLog", table => table.MapAll<AuditLogChangedEvent>())
.EnableBackgroundProcessing();
});
Postgres Builder Reference
The IPostgresCdcBuilder interface provides fluent configuration for Postgres CDC:
| Method | Description | Default |
|---|---|---|
| SchemaName(string) | Schema for CDC state tables | "excalibur" |
| StateTableName(string) | Table name for CDC processing state | "cdc_state" |
| PollingInterval(TimeSpan) | How often to poll for changes | 1 second |
| BatchSize(int) | Changes per processing batch | 1000 |
| Timeout(TimeSpan) | Replication operation timeout | 30 seconds |
| ProcessorId(string) | Identifier for this CDC processor instance | Machine name |
| ReplicationSlotName(string) | Postgres logical replication slot name | "excalibur_cdc_slot" |
| PublicationName(string) | Postgres publication name | "excalibur_cdc_publication" |
| UseBinaryProtocol(bool) | Use binary protocol for logical replication | false |
| AutoCreateSlot(bool) | Auto-create replication slot if missing | false |
| ConnectionString(string) | Postgres source connection string | -- |
| ConnectionStringName(string) | Resolve connection from IConfiguration.GetConnectionString() | -- |
| ConnectionFactory(Func<IServiceProvider, Func<NpgsqlConnection>>) | DI-integrated source connection factory | -- |
| BindConfiguration(string) | Bind source options from IConfiguration section | -- |
| WithStateStore(Action<ICdcStateStoreBuilder>) | Configure separate state store connection and schema | Source connection |
| StateConnectionFactory(Func<IServiceProvider, Func<NpgsqlConnection>>) | DI-integrated state connection factory | Source connection |
Postgres Example
services.AddCdcProcessor(cdc =>
{
cdc.UsePostgres(pg =>
{
pg.ConnectionString(connectionString)
.ReplicationSlotName("orders_cdc_slot")
.PublicationName("orders_publication")
.AutoCreateSlot()
.PollingInterval(TimeSpan.FromSeconds(1))
.BatchSize(500);
})
.TrackTable("public.orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
Separate State Store Connection
By default, CDC uses the same connection for reading changes and persisting checkpoints. In production, you may want to separate these concerns — for example, when your CDC source is a read-replica that should not carry checkpoint write load, or when the state store lives on a different tier.
The WithStateStore method follows the Azure Cosmos DB change feed processor pattern, where lease (checkpoint) storage is configured separately from the monitored source.
Connection String
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionString(sourceConnectionString)
.WithStateStore(state =>
{
state.ConnectionString(stateConnectionString);
});
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
Connection String with State Store Configuration
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionString(sourceConnectionString)
.WithStateStore(state =>
{
state.ConnectionString(stateConnectionString)
.SchemaName("dbo")
.TableName("CdcCheckpoints");
});
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
Named Connection Strings
Resolve connections from IConfiguration.GetConnectionString() at DI resolution time:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionStringName("CdcSource")
.WithStateStore(state =>
{
state.ConnectionStringName("CdcState");
});
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
Configuration Binding
Bind all options from appsettings.json:
{
"Cdc": {
"SqlServer": {
"ConnectionString": "Server=.;Database=OrdersDb;...",
"SchemaName": "Cdc",
"PollingInterval": "00:00:05",
"BatchSize": 100
}
}
}
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.BindConfiguration("Cdc:SqlServer")
.DatabaseName("OrdersDb");
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
DI-Integrated Factory
For advanced scenarios (managed identity, dynamic connection strings):
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionFactory(sp =>
{
var config = sp.GetRequiredService<IConfiguration>();
return () => new SqlConnection(config.GetConnectionString("CdcSource")!);
})
.DatabaseName("OrdersDb")
.WithStateStore(state =>
{
state.ConnectionStringName("CdcState")
.SchemaName("cdc")
.TableName("ProcessingState");
});
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
PostgreSQL
The same pattern works with IPostgresCdcBuilder:
services.AddCdcProcessor(cdc =>
{
cdc.UsePostgres(pg =>
{
pg.ConnectionString(sourceConnectionString)
.WithStateStore(state =>
{
state.ConnectionString(stateConnectionString)
.SchemaName("excalibur")
.TableName("cdc_state");
});
})
.TrackTable("public.orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
ICdcStateStoreBuilder Reference
The Action<ICdcStateStoreBuilder> callback configures state store persistence:
| Method | Description |
|---|---|
| ConnectionString(string) | Set the state store connection string directly |
| ConnectionStringName(string) | Resolve connection from IConfiguration.GetConnectionString() |
| SchemaName(string) | Database schema for the checkpoint table |
| TableName(string) | Table name for checkpoint persistence |
| BindConfiguration(string) | Bind state store options from an IConfiguration section |
When WithStateStore is omitted, the source connection is used for state persistence — existing code continues to work without changes.
Configuration-Driven Setup
Use BindConfiguration on the provider builder to bind CDC source options from appsettings.json:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sourceConnectionString, sql =>
{
sql.BindConfiguration("Cdc:SqlServer");
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
{
"Cdc": {
"SqlServer": {
"SchemaName": "Cdc",
"StateTableName": "CdcProcessingState",
"PollingInterval": "00:00:05",
"BatchSize": 100
}
}
}
BindConfiguration uses OptionsBuilder<T>.BindConfiguration() with ValidateDataAnnotations and ValidateOnStart for fail-fast startup validation.
Config-Driven Table Binding
Instead of (or in addition to) registering tables in code, you can declare tracked tables in appsettings.json and bind them with BindTrackedTables. This is the recommended approach for per-environment configuration — different environments can track different tables or use different capture instances without code changes.
{
"Cdc": {
"Tables": [
{ "TableName": "dbo.Orders", "CaptureInstance": "dbo_Orders_v2" },
{ "TableName": "dbo.Customers" }
]
}
}
| Property | Required | Description |
|---|---|---|
| TableName | Yes | Fully qualified table name (e.g., "dbo.Orders"). Used for handler routing and as the default capture instance name. |
| CaptureInstance | No | Explicit SQL Server capture instance name. When omitted, TableName is normalized (e.g., dbo.Orders becomes dbo_Orders). Use this when your capture instance has a custom name like dbo_Orders_v2. |
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql
.ConnectionString(connectionString)
.DatabaseName("OrdersDb"))
.BindTrackedTables("Cdc:Tables")
.EnableBackgroundProcessing();
});
The processor derives which CDC capture instances to poll from these config entries. In the example above, the processor polls dbo_Orders_v2 (explicit) and dbo_Customers (derived from TableName).
Config-bound tables work with both processing patterns:
- Auto-mapped: Combine with TrackTable() in code to provide event mappers for config-bound tables. Code-registered tables take precedence over config duplicates.
- Manual handler: Register IDataChangeHandler implementations whose TableNames match the config entries. No event mappers are needed.
Config-bound tables merge additively with code-registered tables. Duplicate table names (case-insensitive) are skipped — code-registered tables always take precedence. Event mappings cannot be expressed in configuration and remain code-only via TrackTable().
Use appsettings.{Environment}.json to vary tracked tables by environment:
// appsettings.Development.json
{ "Cdc": { "Tables": [{ "TableName": "dbo.Orders" }] } }
// appsettings.Production.json
{ "Cdc": { "Tables": [
{ "TableName": "dbo.Orders", "CaptureInstance": "dbo_Orders_v3" },
{ "TableName": "dbo.Customers" },
{ "TableName": "dbo.Payments" }
] } }
Combining Code and Config
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
// Code-registered table with event mappings (takes precedence)
.TrackTable("dbo.Orders", table =>
{
table.MapInsert<OrderCreatedEvent, OrderCreatedMapper>()
.MapUpdate<OrderUpdatedEvent, OrderUpdatedMapper>();
})
// Config-bound tables (additively merged, duplicates skipped)
.BindTrackedTables("Cdc:Tables")
.EnableBackgroundProcessing();
});
Handler Auto-Discovery
Use TrackTablesFromHandlers() to automatically discover tracked tables from registered handler implementations without listing each table explicitly:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTablesFromHandlers()
.EnableBackgroundProcessing();
});
// Register your handlers (or use assembly scanning)
services.AddDataChangeHandlersFromAssembly(typeof(OrderCdcHandler).Assembly);
At startup, the framework resolves all ICdcTableProvider services from DI and registers their declared TableNames as tracked tables. Provider-specific handler interfaces such as IDataChangeHandler (SQL Server) extend ICdcTableProvider, so registered handlers are discovered automatically.
ICdcTableProvider Interface
Any type implementing ICdcTableProvider participates in auto-discovery:
public interface ICdcTableProvider
{
string[] TableNames { get; }
}
IDataChangeHandler extends ICdcTableProvider, so existing SQL Server handlers are discovered without changes.
Precedence Rules
When combining all three table registration methods, the precedence order is:
- Code-registered tables (TrackTable()): highest priority
- Config-bound tables (BindTrackedTables())
- Handler-discovered tables (TrackTablesFromHandlers()): lowest priority
Duplicates by table name (case-insensitive) are skipped at each level.
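The precedence rules amount to a first-registration-wins merge with a case-insensitive name comparison. MergeTrackedTables is a hypothetical sketch over table names only; the real registrations also carry capture instances and event mappings, which this sketch does not model.

```csharp
using System;
using System.Collections.Generic;

// Sources are passed highest priority first; the first registration of a name wins.
static IReadOnlyList<string> MergeTrackedTables(params IEnumerable<string>[] sourcesByPriority)
{
    var seen = new HashSet<string>(StringComparer.OrdinalIgnoreCase);
    var merged = new List<string>();
    foreach (var source in sourcesByPriority)
    {
        foreach (var table in source)
        {
            // HashSet.Add returns false for a case-insensitive duplicate, which is skipped.
            if (seen.Add(table))
            {
                merged.Add(table);
            }
        }
    }
    return merged;
}

var fromCode = new[] { "dbo.Orders" };
var fromConfig = new[] { "DBO.ORDERS", "dbo.Customers" };
var fromHandlers = new[] { "dbo.Customers", "dbo.AuditLog" };

Console.WriteLine(string.Join(", ", MergeTrackedTables(fromCode, fromConfig, fromHandlers)));
// dbo.Orders, dbo.Customers, dbo.AuditLog
```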
Full Example
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionStringName("CdcSource")
.DatabaseName("OrdersDb");
})
// Explicit table with event mappings
.TrackTable("dbo.Orders", table =>
{
table.MapInsert<OrderCreatedEvent, OrderCreatedMapper>()
.MapUpdate<OrderUpdatedEvent, OrderUpdatedMapper>()
.MapDelete<OrderDeletedEvent, OrderDeletedMapper>();
})
// Additional tables from config
.BindTrackedTables("Cdc:Tables")
// Remaining tables from registered handlers
.TrackTablesFromHandlers()
.EnableBackgroundProcessing()
// Bind processing options from config
.BindProcessingConfiguration("Cdc:Processing");
});
Checkpointing
State Store Configuration
Configure the CDC state store via the fluent builder:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionString(connectionString)
.SchemaName("Cdc")
.StateTableName("CdcProcessingState");
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
ICdcStateStore Interface
The framework provides ICdcStateStore for checkpoint management. The state store tracks processing positions per database and capture instance:
public class CdcManager
{
private readonly ICdcStateStore _stateStore;
public CdcManager(ICdcStateStore stateStore)
{
_stateStore = stateStore;
}
public async Task<IEnumerable<CdcProcessingState>> GetPositionsAsync(
string connectionId,
string databaseName,
CancellationToken ct)
{
return await _stateStore.GetLastProcessedPositionAsync(
connectionId,
databaseName,
ct);
}
public async Task UpdatePositionAsync(
string connectionId,
string databaseName,
string tableName,
byte[] lsn,
byte[]? seqVal,
DateTime? commitTime,
CancellationToken ct)
{
await _stateStore.UpdateLastProcessedPositionAsync(
connectionId,
databaseName,
tableName,
lsn,
seqVal,
commitTime,
ct);
}
}
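When updating positions by hand, it helps to know that SQL Server LSNs are binary(10) values ordered by byte-wise comparison, so a checkpoint should only move forward. CompareLsn and ShouldAdvance are hypothetical helpers, not part of ICdcStateStore, and the sample LSNs are made-up values; ordering by SeqVal within a single LSN is not modeled.

```csharp
using System;

// Byte-wise (lexicographic) comparison: negative, zero, or positive.
static int CompareLsn(byte[] left, byte[] right) => left.AsSpan().SequenceCompareTo(right);

// Advance only when there is no checkpoint yet or the candidate is strictly newer.
static bool ShouldAdvance(byte[]? stored, byte[] candidate) =>
    stored is null || CompareLsn(candidate, stored) > 0;

// Made-up 10-byte LSNs for illustration.
var storedLsn = Convert.FromHexString("0000002A000001A80003");
var olderLsn = Convert.FromHexString("0000002A000001A00001");
var newerLsn = Convert.FromHexString("0000002B000000100002");

Console.WriteLine(ShouldAdvance(storedLsn, newerLsn)); // True
Console.WriteLine(ShouldAdvance(storedLsn, olderLsn)); // False
```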
Error Handling
Handler-Level Error Handling
Handle errors within your IDataChangeHandler implementation:
public class ResilientOrderCdcHandler : IDataChangeHandler
{
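private readonly ILogger<ResilientOrderCdcHandler> _logger;
private readonly IDeadLetterQueue _deadLetter;
public ResilientOrderCdcHandler(ILogger<ResilientOrderCdcHandler> logger, IDeadLetterQueue deadLetter)
{
_logger = logger;
_deadLetter = deadLetter;
}
public string[] TableNames => ["dbo.Orders"];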
public async Task HandleAsync(DataChangeEvent changeEvent, CancellationToken ct)
{
try
{
await ProcessChangeAsync(changeEvent, ct);
}
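// Let cancellation (for example, host shutdown) propagate instead of dead-lettering it
catch (Exception ex) when (ex is not OperationCanceledException)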
{
_logger.LogError(ex, "Failed to process CDC event for {Table} at LSN {Lsn}",
changeEvent.TableName,
BitConverter.ToString(changeEvent.Lsn));
// Send to dead letter for manual investigation
await _deadLetter.EnqueueAsync(
changeEvent,
DeadLetterReason.UnhandledException,
ex,
new Dictionary<string, string>
{
["TableName"] = changeEvent.TableName,
["Lsn"] = BitConverter.ToString(changeEvent.Lsn),
["ChangeType"] = changeEvent.ChangeType.ToString()
},
ct);
// Optionally re-throw to stop processing
// throw;
}
}
private Task ProcessChangeAsync(DataChangeEvent changeEvent, CancellationToken ct)
{
// Processing logic
return Task.CompletedTask;
}
}
Stale Position Recovery
Handle stale LSN positions when CDC retention expires, a database is restored from backup, or an invalid LSN range triggers SQL Error 313. The framework detects these scenarios automatically and invokes the configured recovery strategy:
using Excalibur.Cdc;
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.WithRecovery(recovery =>
{
// Choose a recovery strategy
recovery.Strategy(StalePositionRecoveryStrategy.FallbackToEarliest)
.MaxAttempts(5)
.AttemptDelay(TimeSpan.FromSeconds(30));
})
.EnableBackgroundProcessing();
});
Recovery Strategies
| Strategy | Description |
|---|---|
| Throw | Throw an exception (default, requires manual intervention) |
| FallbackToEarliest | Reset to earliest available position (may reprocess events) |
| FallbackToLatest | Skip to latest position (may lose unprocessed events) |
| InvokeCallback | Call custom handler for advanced recovery logic |
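The trade-offs in the table come down to which end of the valid LSN window processing restarts from. The sketch below assumes the stored checkpoint is compared against the minimum and maximum LSNs still available (in SQL Server, what sys.fn_cdc_get_min_lsn and sys.fn_cdc_get_max_lsn return). IsStale and ResolveRestartPosition are hypothetical; strategies are passed as strings only to keep the sketch self-contained, and InvokeCallback is not modeled because it hands the decision to your OnPositionReset handler.

```csharp
using System;

// True when the stored checkpoint is older than the oldest LSN still retained.
static bool IsStale(byte[] storedLsn, byte[] minValidLsn) =>
    storedLsn.AsSpan().SequenceCompareTo(minValidLsn) < 0;

// Map a strategy name to the position processing should resume from.
static byte[] ResolveRestartPosition(string strategy, byte[] minValidLsn, byte[] maxValidLsn) =>
    strategy switch
    {
        "FallbackToEarliest" => minValidLsn, // may reprocess events
        "FallbackToLatest" => maxValidLsn,   // may skip unprocessed events
        "Throw" => throw new InvalidOperationException("Stale CDC position; manual intervention required."),
        _ => throw new ArgumentOutOfRangeException(nameof(strategy), strategy, "Strategy not modeled in this sketch."),
    };

// Made-up 10-byte LSNs for illustration.
var minValid = Convert.FromHexString("00000030000000100001");
var maxValid = Convert.FromHexString("00000035000000200001");
var checkpoint = Convert.FromHexString("0000002A000001A80003");

if (IsStale(checkpoint, minValid))
{
    var restart = ResolveRestartPosition("FallbackToEarliest", minValid, maxValid);
    Console.WriteLine(Convert.ToHexString(restart)); // 00000030000000100001
}
```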
Reason Codes
The ReasonCode on CdcPositionResetEventArgs tells you why the position is stale:
| Reason Code | Description |
|---|---|
| CdcCleanup | CDC cleanup job purged records older than the retention threshold |
| BackupRestore | Database restored from backup with different CDC history |
| CdcReenabled | CDC was disabled and re-enabled, invalidating previous positions |
| LsnOutOfRange | LSN falls outside the valid min/max range (SQL Error 22037/22029) |
| TvfInsufficientArguments | CDC TVF received an invalid LSN range (SQL Error 313) |
| CaptureInstanceDropped | The capture instance no longer exists in the database |
| Unknown | Cause could not be determined from the SQL error |
SQL Server sometimes raises error 313 ("An insufficient number of arguments were supplied") instead of the more specific 22037/22029 errors when the LSN falls outside the valid CDC window. The framework recognizes this error and treats it the same as LsnOutOfRange — the position is stale and recovery is triggered automatically.
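A hypothetical sketch of that classification step follows, keyed on the SQL error numbers named in the table. ClassifyStaleError is illustrative and returns reason codes as strings; the other reason codes (CdcCleanup, BackupRestore, CdcReenabled, CaptureInstanceDropped) depend on checks beyond the error number and are not modeled.

```csharp
using System;

// Hypothetical: map a SQL Server error number to a stale-position reason code.
static string ClassifyStaleError(int sqlErrorNumber) => sqlErrorNumber switch
{
    22037 or 22029 => "LsnOutOfRange",
    // Error 313 is reported under its own code but recovered the same way as LsnOutOfRange.
    313 => "TvfInsufficientArguments",
    _ => "Unknown",
};

Console.WriteLine(ClassifyStaleError(313));   // TvfInsufficientArguments
Console.WriteLine(ClassifyStaleError(22037)); // LsnOutOfRange
```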
Custom Recovery Callback
For complex recovery scenarios, use InvokeCallback with a custom handler:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.WithRecovery(recovery =>
{
recovery.Strategy(StalePositionRecoveryStrategy.InvokeCallback)
.OnPositionReset(async (args, ct) =>
{
_logger.LogWarning(
"Stale CDC position for {CaptureInstance}. Reason: {Reason}",
args.CaptureInstance,
args.ReasonCode);
// Handle based on reason code
// StalePositionReasonCodes: CdcCleanup, BackupRestore,
// CdcReenabled, LsnOutOfRange, CaptureInstanceDropped,
// TvfInsufficientArguments, Unknown
});
})
.EnableBackgroundProcessing();
});
Idempotency Filtering
CDC uses at-least-once delivery — events may be replayed after a crash, restart, or stale position reset. If your handlers are not naturally idempotent, enable an idempotency filter to deduplicate events before they reach your handler.
When registered, the CDC processor checks each event's (tableName, LSN, seqVal) composite key before invoking the handler. Events that have already been processed are skipped automatically. This is an opt-in feature — when no filter is registered, all events are processed without deduplication.
In-Memory Filter (Single Instance)
For single-instance deployments where CDC events are processed by one consumer:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.UseInMemoryIdempotencyFilter()
.EnableBackgroundProcessing();
});
The in-memory filter tracks keys in a ConcurrentDictionary capped at 10,000 entries. Once the cap is reached, new events are still processed but are no longer tracked for deduplication (skip-when-full pattern), which keeps memory usage bounded.
The in-memory filter does not survive process restarts — it is purely in-memory. For durable deduplication across restarts or multi-instance deployments, use the SQL Server filter.
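A minimal sketch of the skip-when-full technique follows, assuming the composite key of table name, LSN, and sequence value described above. CdcEventKey and BoundedIdempotencyFilter are hypothetical names; the framework's filter may be structured differently.

```csharp
using System;
using System.Collections.Concurrent;

// Composite key (table, LSN, sequence value). LSN and seqval are binary(10) in
// SQL Server; converting them to hex strings gives value equality, which raw
// byte[] fields in a record would not (arrays compare by reference).
public readonly record struct CdcEventKey(string TableName, string LsnHex, string SeqValHex)
{
    public static CdcEventKey From(string tableName, byte[] lsn, byte[] seqVal) =>
        new(tableName, Convert.ToHexString(lsn), Convert.ToHexString(seqVal));
}

public sealed class BoundedIdempotencyFilter
{
    private readonly ConcurrentDictionary<CdcEventKey, byte> _seen = new();
    private readonly int _capacity;

    public BoundedIdempotencyFilter(int capacity = 10_000) => _capacity = capacity;

    // Checked before invoking the handler: true means "already processed, skip it".
    public bool IsDuplicate(CdcEventKey key) => _seen.ContainsKey(key);

    // Called after the handler succeeds. When the filter is full, the key is simply
    // not recorded (skip-when-full): the event was processed, just not tracked.
    public void MarkProcessed(CdcEventKey key)
    {
        if (_seen.Count < _capacity)
        {
            _seen.TryAdd(key, 0);
        }
    }
}
```

Splitting the check (IsDuplicate) from the write (MarkProcessed) means a handler that throws leaves its key unrecorded, so the replayed event is not suppressed. The Count check and TryAdd are not atomic, so under heavy concurrency the dictionary can briefly exceed the cap by a few entries.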
SQL Server Filter (Multi-Instance)
For multi-instance deployments where multiple CDC consumers may process the same events:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.UseSqlServerIdempotencyFilter()
.EnableBackgroundProcessing();
});
The SQL Server filter persists processed event records in a [Cdc].[CdcProcessedEvents] table with a clustered composite primary key on (TableName, Lsn, SeqVal). Duplicate inserts are handled gracefully via primary key violation detection — concurrent instances processing the same event will not error.
Customizing Options
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.UseSqlServerIdempotencyFilter(opts =>
{
opts.SchemaName = "MySchema"; // Default: "Cdc"
opts.TableName = "MyProcessedEvents"; // Default: "CdcProcessedEvents"
opts.RetentionPeriod = TimeSpan.FromHours(48); // Default: 24 hours
opts.CleanupBatchSize = 5000; // Default: 1000
})
.EnableBackgroundProcessing();
});
| Option | Default | Description |
|---|---|---|
| SchemaName | "Cdc" | Schema for the processed events table |
| TableName | "CdcProcessedEvents" | Table name for tracking processed events |
| RetentionPeriod | 24 hours | Records older than this are eligible for cleanup |
| CleanupBatchSize | 1000 | Max records deleted per cleanup batch (prevents long transactions) |
Options are validated at startup via IValidateOptions<T> and ValidateOnStart(). Schema and table names are validated against SQL identifier rules (alphanumeric and underscores only), and retention period and batch size must be positive.
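The validation rules above amount to a few checks. The sketch below restates them as a standalone function over a hypothetical options class (IdempotencyFilterOptionsSketch). It is not the framework's validator, which runs behind IValidateOptions<T> and ValidateOnStart() so that a bad value stops the host at startup.

```csharp
using System;
using System.Collections.Generic;
using System.Text.RegularExpressions;

#nullable enable

// Hypothetical stand-in for the filter's options; defaults follow the table above.
public sealed class IdempotencyFilterOptionsSketch
{
    public string SchemaName { get; set; } = "Cdc";
    public string TableName { get; set; } = "CdcProcessedEvents";
    public TimeSpan RetentionPeriod { get; set; } = TimeSpan.FromHours(24);
    public int CleanupBatchSize { get; set; } = 1000;
}

public static class IdempotencyFilterOptionsValidator
{
    // ASCII letters, digits, and underscores only. \z (not $) so that a trailing
    // newline cannot slip past the check.
    private static readonly Regex Identifier = new(@"^[A-Za-z0-9_]+\z", RegexOptions.CultureInvariant);

    // Returns every problem found; an empty list means the options are valid.
    public static IReadOnlyList<string> Validate(IdempotencyFilterOptionsSketch options)
    {
        var errors = new List<string>();

        if (!IsIdentifier(options.SchemaName))
            errors.Add($"SchemaName '{options.SchemaName}' must contain only letters, digits, and underscores.");

        if (!IsIdentifier(options.TableName))
            errors.Add($"TableName '{options.TableName}' must contain only letters, digits, and underscores.");

        if (options.RetentionPeriod <= TimeSpan.Zero)
            errors.Add("RetentionPeriod must be positive.");

        if (options.CleanupBatchSize <= 0)
            errors.Add("CleanupBatchSize must be positive.");

        return errors;
    }

    private static bool IsIdentifier(string? value) =>
        !string.IsNullOrEmpty(value) && Identifier.IsMatch(value);
}
```

Restricting names to this character set is what makes it safe to splice them into bracketed DDL and DML such as [Schema].[Table].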
DDL Migration
Create the processed events table before starting the CDC processor:
-- Create schema if it doesn't exist
IF NOT EXISTS (SELECT 1 FROM sys.schemas WHERE name = 'Cdc')
EXEC('CREATE SCHEMA [Cdc]');
-- Create processed events table
CREATE TABLE [Cdc].[CdcProcessedEvents] (
TableName NVARCHAR(256) NOT NULL,
Lsn VARBINARY(10) NOT NULL,
SeqVal VARBINARY(10) NOT NULL,
ProcessedAt DATETIME2 NOT NULL DEFAULT SYSUTCDATETIME(),
CONSTRAINT PK_CdcProcessedEvents
PRIMARY KEY CLUSTERED (TableName, Lsn, SeqVal)
);
Retention and Cleanup
Old records are cleaned up periodically based on the configured RetentionPeriod. Cleanup uses batched DELETE TOP (@batchSize) to prevent long-running transactions from blocking CDC processing. The cleanup is invoked internally by the CDC processor.
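The batched cleanup loop can be sketched as follows. The loop issues one bounded delete per round until a round removes fewer rows than the batch size. BatchedCleanup.Run is a hypothetical helper. The deleteBatch delegate stands in for a DELETE TOP (@batchSize) statement against the processed events table, and the exact SQL the framework issues may differ.

```csharp
using System;

public static class BatchedCleanup
{
    // Each deleteBatch call represents one short transaction, for example:
    //   DELETE TOP (@batchSize) FROM [Cdc].[CdcProcessedEvents] WHERE ProcessedAt < @cutoff
    // and returns the number of rows it removed.
    public static int Run(
        Func<int, DateTime, int> deleteBatch,
        int batchSize,
        TimeSpan retention,
        DateTime utcNow)
    {
        if (batchSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(batchSize));
        }

        var cutoff = utcNow - retention;
        var total = 0;
        int deleted;

        do
        {
            deleted = deleteBatch(batchSize, cutoff);
            total += deleted;
        }
        // A short batch means nothing older than the cutoff is left.
        while (deleted == batchSize);

        return total;
    }
}
```

Keeping each delete small releases locks between rounds, so concurrent inserts from CDC processing are not blocked behind one long transaction.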
Choosing a Filter
| | In-Memory | SQL Server |
|---|---|---|
| Durability | Lost on restart | Survives restarts |
| Multi-instance | No (single consumer only) | Yes (concurrent consumers) |
| Performance | Fastest (dictionary lookup) | Fast (clustered PK point lookup) |
| Capacity | Bounded at 10,000 entries | Limited only by disk |
| Maintenance | None | Retention cleanup (automatic) |
| Best for | Dev/test, single-instance prod | Multi-instance production |
UseInMemoryIdempotencyFilter() uses TryAddSingleton, so it is a no-op if a filter is already registered. UseSqlServerIdempotencyFilter() uses AddSingleton, and because the last registration wins at resolution time, it overrides any previously registered filter. Calling both is therefore safe: the SQL Server filter takes effect regardless of call order.
Resilience
Transient Fault Handling
When Excalibur.Data.SqlServer (or another provider that registers IDataAccessPolicyFactory) is present, the CDC processor automatically wraps all database operations in a two-layer resilience policy:
- Retry — 3 attempts with exponential backoff and jitter for transient SQL failures (timeouts, deadlocks, connection resets, Azure SQL throttling)
- Circuit breaker — Opens after sustained failure (50% failure rate over 60 seconds, minimum 5 requests) and stays open for 30 seconds
This covers change detection queries, checkpoint updates, and event handler execution — all DB operations in the CDC pipeline are protected.
// Resilience is automatic — IDataAccessPolicyFactory is injected
// into CdcProcessor, CdcChangeDetector, and CdcChangeApplier.
// No additional configuration is required beyond registering the
// SQL Server data access provider:
builder.Services.AddExcaliburDataSqlServer(options =>
{
options.ConnectionString = connectionString;
});
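For intuition about the circuit breaker thresholds, here is a simplified, single-threaded sketch of a failure-ratio breaker. It uses the numbers above: 50% failures, at least 5 calls within 60 seconds, and a 30-second break. FailureRatioBreaker is a hypothetical class, not the policy the framework builds. A production breaker also has a half-open state that admits a single trial call, and this sketch skips that by simply closing after the break.

```csharp
using System;
using System.Collections.Generic;

public sealed class FailureRatioBreaker
{
    private const int MinimumThroughput = 5;
    private const double FailureRatio = 0.5;
    private static readonly TimeSpan SamplingDuration = TimeSpan.FromSeconds(60);
    private static readonly TimeSpan BreakDuration = TimeSpan.FromSeconds(30);

    private readonly Queue<(DateTime At, bool Failed)> _window = new();
    private DateTime? _openedAt;

    // Callers check this before a database call; true means "fail fast, don't call".
    public bool IsOpen(DateTime now)
    {
        if (_openedAt is { } opened && now - opened >= BreakDuration)
        {
            _openedAt = null;   // break elapsed: let traffic through again
            _window.Clear();    // and start with a fresh sampling window
        }

        return _openedAt is not null;
    }

    // Record the outcome of each call that was allowed through.
    public void Record(bool failed, DateTime now)
    {
        _window.Enqueue((now, failed));

        // Drop samples that fell out of the sampling window.
        while (_window.Count > 0 && now - _window.Peek().At > SamplingDuration)
        {
            _window.Dequeue();
        }

        // Too few calls to judge: never trip on a handful of samples.
        if (_window.Count < MinimumThroughput)
        {
            return;
        }

        var failures = 0;
        foreach (var sample in _window)
        {
            if (sample.Failed) failures++;
        }

        if ((double)failures / _window.Count >= FailureRatio)
        {
            _openedAt = now;
        }
    }
}
```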
Database Restore Survivability
The CDC processor is designed to handle database unavailability during restores and data replacement from backup:
- Checkpoint ordering — Checkpoints advance only after successful event processing. If the database becomes unavailable mid-batch, the checkpoint stays at the last successfully processed position
- Stale position recovery — When a restored database has different CDC LSN ranges, the configurable recovery strategy (see Stale Position Recovery) handles the mismatch automatically
- Guarded operations — Checkpoint updates and state store writes are wrapped in try-catch with logging, preventing the processing loop from crashing during transient DB unavailability
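The checkpoint-ordering rule can be illustrated with a small loop that advances the checkpoint only after each handler call returns. CheckpointedBatchRunner and Change are hypothetical types with positions simplified to long. The real processor tracks LSNs in its state store and logs failures rather than silently stopping.

```csharp
using System;
using System.Collections.Generic;

// Illustrative change record: a position plus a payload.
public sealed record Change(long Position, string Payload);

public sealed class CheckpointedBatchRunner
{
    public CheckpointedBatchRunner(long startCheckpoint) => Checkpoint = startCheckpoint;

    // Last position whose handler completed successfully.
    public long Checkpoint { get; private set; }

    // Processes changes in order. If a handler throws (for example because the
    // database went away mid-batch), the loop stops and the checkpoint stays at
    // the last good position, so the failed change is retried on the next cycle.
    public int RunBatch(IReadOnlyList<Change> batch, Action<Change> handler)
    {
        var processed = 0;

        foreach (var change in batch)
        {
            if (change.Position <= Checkpoint)
            {
                continue; // already handled in an earlier cycle
            }

            try
            {
                handler(change);
            }
            catch (Exception)
            {
                break; // real code would log here; the checkpoint is not advanced
            }

            Checkpoint = change.Position; // advance only after success
            processed++;
        }

        return processed;
    }
}
```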
Monitoring
Health Checks
```csharp
using Microsoft.Extensions.Diagnostics.HealthChecks;

// The built-in CdcHealthCheck is internal and registered via AddCdcHealthCheck().
// For custom lag monitoring, create your own health check:
services.AddHealthChecks()
    .AddCdcHealthCheck()                       // built-in health check
    .AddCheck<CdcLagHealthCheck>("cdc-lag");   // custom lag monitor

public class CdcLagHealthCheck : IHealthCheck
{
    private readonly ICdcStateStore _stateStore;
    private readonly string _connectionId;
    private readonly string _databaseName;
    private readonly TimeSpan _maxLag = TimeSpan.FromMinutes(5);

    public CdcLagHealthCheck(ICdcStateStore stateStore)
    {
        _stateStore = stateStore;
        _connectionId = "default";      // Configure based on your setup
        _databaseName = "MyDatabase";
    }

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        var positions = await _stateStore.GetLastProcessedPositionAsync(
            _connectionId,
            _databaseName,
            cancellationToken);

        foreach (var position in positions)
        {
            // Skip capture instances that have not recorded a commit time yet
            if (position.LastCommitTime.HasValue)
            {
                // Assumes LastCommitTime is stored in UTC
                var lag = DateTime.UtcNow - position.LastCommitTime.Value;
                if (lag > _maxLag)
                {
                    return HealthCheckResult.Degraded(
                        $"CDC lag for {position.CaptureInstance}: {lag}");
                }
            }
        }

        return HealthCheckResult.Healthy();
    }
}
```
Metrics
services.AddOpenTelemetry()
.WithMetrics(metrics =>
{
metrics.AddMeter("Excalibur.Data.Cdc");
// Emits:
// - excalibur.cdc.events.processed
// - excalibur.cdc.events.failed
// - excalibur.cdc.batch.duration
// - excalibur.cdc.batch.size
});
Database Maintenance
Retention Configuration
-- Set CDC retention (default 3 days)
EXEC sys.sp_cdc_change_job
@job_type = 'cleanup',
@retention = 4320; -- minutes (3 days)
Monitor CDC
-- Check CDC tables
SELECT *
FROM sys.tables
WHERE is_tracked_by_cdc = 1;
-- Check capture instances
SELECT *
FROM cdc.change_tables;
-- Check CDC job status
EXEC sys.sp_cdc_help_jobs;
Running CDC Processing
CDC requires an active processing loop. Choose the approach that fits your hosting model:
Option 1: Built-in Background Service (Recommended)
Call EnableBackgroundProcessing() on the CDC builder to register a CdcProcessingHostedService that polls for changes automatically:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
The hosted service:
- Polls for CDC changes at a configurable interval (default 5 seconds)
- Applies exponential backoff on errors (up to 5× the polling interval), resetting on success
- Catches and logs exceptions without crashing the host
- Supports graceful drain on shutdown (default 30-second timeout)
- Reports structured log events via LoggerMessage source generation
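One way to read "exponential backoff up to 5× the polling interval" is the delay calculation below. It assumes the delay doubles with each consecutive failure, starting from the first one. The framework's exact growth curve is not documented here, so treat PollingBackoff.NextDelay as a hypothetical illustration of the capped, reset-on-success behavior.

```csharp
using System;

public static class PollingBackoff
{
    // Delay before the next poll: the base interval after a success, doubling with
    // each consecutive failure but never exceeding maxMultiplier x the interval.
    public static TimeSpan NextDelay(TimeSpan pollingInterval, int consecutiveFailures, int maxMultiplier = 5)
    {
        if (consecutiveFailures <= 0)
        {
            return pollingInterval; // a successful cycle resets the backoff
        }

        // Clamp the exponent so the shift cannot overflow during long outages.
        long factor = Math.Min(1L << Math.Min(consecutiveFailures, 30), maxMultiplier);
        return TimeSpan.FromTicks(pollingInterval.Ticks * factor);
    }
}
```

With the default 5-second interval this yields 10 s after one failure, 20 s after two, and then stays at the 25 s cap.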
Configuration
Configure processing behavior with CdcProcessingOptions:
services.Configure<CdcProcessingOptions>(options =>
{
options.PollingInterval = TimeSpan.FromSeconds(10); // Default: 5 seconds
options.Enabled = true; // Default: true
options.DrainTimeoutSeconds = 60; // Default: 30
options.UnhealthyThreshold = 5; // Default: 5
});
| Option | Default | Description |
|---|---|---|
| PollingInterval | 5 seconds | Interval between processing cycles |
| Enabled | true | Set to false to disable without removing registration |
| DrainTimeoutSeconds | 30 | Seconds to wait for in-flight processing on shutdown |
| UnhealthyThreshold | 5 | Consecutive failures before health check reports unhealthy |
Configuration Binding (Recommended)
Use BindProcessingConfiguration to bind processing options from appsettings.json instead of hardcoding values:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing()
.BindProcessingConfiguration("Cdc:Processing");
});
{
"Cdc": {
"Processing": {
"Enabled": false,
"PollingInterval": "00:00:10",
"DrainTimeoutSeconds": 30,
"UnhealthyThreshold": 5
}
}
}
This uses OptionsBuilder<CdcProcessingOptions>.BindConfiguration() with ValidateDataAnnotations and ValidateOnStart for fail-fast startup validation.
You can also configure processing inline via the SQL Server builder:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionString(connectionString)
.PollingInterval(TimeSpan.FromSeconds(10))
.BatchSize(200);
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
Option 2: Quartz Job
Use CdcJob from the Excalibur.Jobs package for cron-scheduled CDC processing:
// Install: dotnet add package Excalibur.Jobs
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>());
// Don't call EnableBackgroundProcessing() — Quartz handles scheduling
});
// Schedule via Quartz.NET — CdcJob supports:
// - Multiple database configurations
// - Built-in health checks
// - Cron-based scheduling
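The page does not show the Quartz registration itself. A hedged sketch using Quartz.NET's Microsoft DI integration (the Quartz.Extensions.Hosting package) might look like the following. It assumes CdcJob can be constructed by Quartz's DI job factory and needs no job data, which you should confirm against the Excalibur.Jobs documentation. The job key, trigger identity, and 30-second cron expression are placeholders.

```csharp
using Quartz;

// CdcJob comes from Excalibur.Jobs; its namespace is not shown on this page.
services.AddQuartz(q =>
{
    var jobKey = new JobKey("cdc-processing");

    // Register the job; Quartz resolves it through its DI job factory.
    q.AddJob<CdcJob>(job => job.WithIdentity(jobKey));

    // Quartz cron expressions start with a seconds field: this fires every 30 seconds.
    q.AddTrigger(trigger => trigger
        .ForJob(jobKey)
        .WithIdentity("cdc-processing-trigger")
        .WithCronSchedule("0/30 * * * * ?"));
});

// Runs the scheduler as a hosted service and lets a running job finish on shutdown.
services.AddQuartzHostedService(options => options.WaitForJobsToComplete = true);
```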
Option 3: Manual/Serverless
For serverless environments, omit EnableBackgroundProcessing() and call ISqlServerCdcProcessor directly from an Azure Function, AWS Lambda, or other trigger:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>());
// Don't call EnableBackgroundProcessing() — you'll trigger manually
});
```csharp
using Microsoft.Azure.Functions.Worker;

public class CdcProcessorFunction
{
    private readonly ISqlServerCdcProcessor _processor;
    private readonly IDispatcher _dispatcher;

    public CdcProcessorFunction(ISqlServerCdcProcessor processor, IDispatcher dispatcher)
    {
        _processor = processor;
        _dispatcher = dispatcher;
    }

    // NCRONTAB with a seconds field: runs every 10 seconds
    [Function("ProcessCdc")]
    public async Task Run(
        [TimerTrigger("*/10 * * * * *")] TimerInfo timer,
        CancellationToken cancellationToken)
    {
        // ICdcProcessor<T>.ProcessBatchAsync requires an event handler delegate
        var processedCount = await _processor.ProcessBatchAsync(
            async (changeEvent, ct) =>
            {
                // Handle each change event: dispatch to handlers, update projections, etc.
                await _dispatcher.DispatchAsync(
                    new DataChangeNotification(changeEvent), ct);
            },
            cancellationToken); // honor host shutdown instead of CancellationToken.None
    }
}
```
All CDC providers implement a two-tier interface hierarchy:
- ICdcProcessor<TEvent>: poll-based batch processing (SqlServer, InMemory)
- ICdcStreamProcessor<TEvent, TPosition>: streaming with position tracking (Postgres, MongoDB, CosmosDB, DynamoDB, Firestore)

Each provider also exposes a marker interface (for example, ISqlServerCdcProcessor or IPostgresCdcProcessor). Inject the provider-specific marker interface rather than the generic interface to get compile-time type safety in your DI registrations.
Connection Management
CDC services are registered as singletons. To avoid holding long-lived database connections open, CDC uses a connection factory pattern (Func<SqlConnection>) instead of injecting raw connection objects. Each operation creates a fresh connection from the factory, allowing ADO.NET connection pooling to manage the lifecycle.
Connection String Approach (Simple)
When you pass a connection string directly, the framework creates a factory internally:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
Connection Factory Approach (Recommended for Production)
Use the factory overload when you need custom connection management, DI-resolved connection strings, or managed identity authentication:
// Factory with DI access
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(
sp => () => new SqlConnection(
sp.GetRequiredService<IConfiguration>().GetConnectionString("Cdc")),
sql =>
{
sql.SchemaName("cdc")
.BatchSize(200);
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
The factory signature is Func<IServiceProvider, Func<SqlConnection>>:
- The outer function receives IServiceProvider for resolving DI services
- The inner function creates a new SqlConnection each time it is called
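The two-level factory can be hard to picture, so here is a self-contained sketch of the idea with stand-in types. FakeConnection replaces SqlConnection, SettingsProvider is a minimal IServiceProvider, and SingletonCdcWorker plays the role of a singleton CDC service. All of these are hypothetical. In this sketch the outer function runs once when the worker is built and the inner function runs once per operation. When the framework actually invokes the outer function is an implementation detail not stated here.

```csharp
using System;

#nullable enable

// Stand-in for SqlConnection so the sketch runs without a database.
public sealed class FakeConnection : IDisposable
{
    public static int Opened;
    public static int Disposed;

    public FakeConnection(string connectionString)
    {
        ConnectionString = connectionString;
        Opened++;
    }

    public string ConnectionString { get; }

    public void Dispose() => Disposed++;
}

// Hypothetical settings object resolved from DI.
public sealed record CdcSettings(string ConnectionString);

// Minimal IServiceProvider that only knows about CdcSettings.
public sealed class SettingsProvider : IServiceProvider
{
    private readonly CdcSettings _settings;

    public SettingsProvider(CdcSettings settings) => _settings = settings;

    public object? GetService(Type serviceType) =>
        serviceType == typeof(CdcSettings) ? _settings : null;
}

public sealed class SingletonCdcWorker
{
    private readonly Func<FakeConnection> _connectionFactory;

    // Outer function: called once here, with access to the service provider.
    public SingletonCdcWorker(IServiceProvider services, Func<IServiceProvider, Func<FakeConnection>> factory) =>
        _connectionFactory = factory(services);

    // Inner function: called per operation, and the connection is disposed right away.
    public string RunOperation()
    {
        using var connection = _connectionFactory();
        return connection.ConnectionString;
    }
}
```

Because each operation disposes its own connection, the singleton never pins a physical connection. With a real SqlConnection, Dispose returns the underlying connection to the ADO.NET pool.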
This same pattern applies to PostgreSQL:
services.AddCdcProcessor(cdc =>
{
cdc.UsePostgres(
sp => () => new NpgsqlConnection(
sp.GetRequiredService<IConfiguration>().GetConnectionString("Cdc")),
pg =>
{
pg.PublicationName("my_publication");
})
.TrackTable("public.orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
Why Connection Factories Matter
| Concern | Direct Connection | Connection Factory |
|---|---|---|
| Connection pooling | Bypassed (single long-lived connection) | Properly leveraged (short-lived connections) |
| Connection recovery | Requires manual reconnection logic | Fresh connection per operation |
| DI integration | Connection string hardcoded at registration | Resolved from IServiceProvider at runtime |
| Managed identity | Difficult (token refresh on held connection) | Natural (fresh connection with current token) |
CDC processors, outbox processors, and inbox stores are all registered as singletons. Never inject a raw SqlConnection or IDbConnection into these services. Always use the factory pattern to create connections on demand.
Best Practices
| Practice | Recommendation |
|---|---|
| Table configuration | Use TrackTable() fluent builder with MapInsert/Update/Delete<T>() |
| Event mapping | Use MapAll<T>() for simple scenarios, separate events for fine-grained control |
| Error handling | Implement dead letter queue in IDataChangeHandler implementations |
| Connection management | Use Func<SqlConnection> factory overload for production (see Connection Management) |
| Checkpointing | Configure state store schema via UseSqlServer(sql => sql.SchemaName(...)) |
| Anti-corruption | Transform database columns to domain events using mapping functions |
| Recovery | Configure WithRecovery() with FallbackToEarliest for idempotent handlers |
| Idempotency | Use UseInMemoryIdempotencyFilter() for single-instance, UseSqlServerIdempotencyFilter() for multi-instance |
| Hosting | Use EnableBackgroundProcessing() for most cases, Quartz job for cron schedules |
Providers
Excalibur CDC uses ICdcBuilder as its core abstraction with provider-specific builder interfaces for each database. All providers follow the same pattern: AddCdcProcessor + UseXxx(options => { ... }).
Core Registration
using Microsoft.Extensions.DependencyInjection;
// SQL providers
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionString(connectionString)
.WithStateStore(stateConnectionString); // Optional: separate state store
});
});
// Cloud-native providers (same builder pattern)
services.AddCdcProcessor(cdc =>
{
cdc.UseCosmosDb(cosmos =>
{
cosmos.ConnectionString(connectionString)
.DatabaseName("mydb")
.ContainerName("orders");
});
});
SQL Server
Uses SQL Server's native CDC feature with polling-based change capture.
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql =>
{
sql.ConnectionString(connectionString);
// Configure tracked tables, polling interval, etc.
});
});
// Or with a connection factory
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(
sp => () => new SqlConnection(connectionString),
sql =>
{
// Configure CDC options
});
});
SQL Server CDC tracks row-level changes via change tables. The processor polls cdc.fn_cdc_get_all_changes_* functions for inserts, updates, and deletes.
PostgreSQL
Uses PostgreSQL logical replication for real-time change streaming.
services.AddCdcProcessor(cdc =>
{
cdc.UsePostgres(pg =>
{
pg.ConnectionString(connectionString)
.ReplicationSlotName("my_slot")
.PublicationName("my_pub");
});
});
PostgreSQL CDC uses logical replication slots and publications. Changes are streamed via the pgoutput plugin in real time.
MongoDB
Uses MongoDB Change Streams for real-time change notification.
dotnet add package Excalibur.Cdc.MongoDB
services.AddCdcProcessor(cdc =>
{
cdc.UseMongoDB(mongo =>
{
mongo.ConnectionString(connectionString)
.DatabaseName("MyApp")
.CollectionNames("orders", "customers")
.ProcessorId("order-processor")
.BatchSize(100)
.ReconnectInterval(TimeSpan.FromSeconds(5));
})
.TrackTable("orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
// With separate state store (different MongoDB cluster)
services.AddCdcProcessor(cdc =>
{
cdc.UseMongoDB(mongo =>
{
mongo.ConnectionString(connectionString)
.DatabaseName("MyApp")
.WithStateStore("mongodb://state-cluster:27017", state =>
{
state.SchemaName("cdc") // Maps to DatabaseName
.TableName("checkpoints"); // Maps to CollectionName
});
})
.TrackTable("orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
MongoDB Change Streams use the oplog to push change events. The processor receives insert, update, replace, and delete notifications in real time.
Azure Cosmos DB
Uses the Cosmos DB Change Feed for continuous change processing.
dotnet add package Excalibur.Cdc.CosmosDb
services.AddCdcProcessor(cdc =>
{
cdc.UseCosmosDb(cosmos =>
{
cosmos.ConnectionString(connectionString)
.DatabaseName("MyApp")
.ContainerName("orders")
.ProcessorName("order-processor");
})
.TrackTable("orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
The Cosmos DB Change Feed provides ordered change notifications per logical partition. WithStateStore follows Microsoft's ChangeFeedProcessorBuilder.WithLeaseContainer() pattern — the state store (lease container) can be in a different database or account.
Amazon DynamoDB
Uses DynamoDB Streams for change capture.
dotnet add package Excalibur.Cdc.DynamoDb
services.AddCdcProcessor(cdc =>
{
cdc.UseDynamoDb(dynamo =>
{
dynamo.TableName("Orders")
.ProcessorName("order-processor")
.MaxBatchSize(100)
.PollInterval(TimeSpan.FromSeconds(5));
})
.TrackTable("Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
// With separate state store (different AWS region/account)
services.AddCdcProcessor(cdc =>
{
cdc.UseDynamoDb(dynamo =>
{
dynamo.TableName("Orders")
.WithStateStore(
sp => new AmazonDynamoDBClient(stateRegionEndpoint),
state =>
{
state.TableName("cdc-checkpoints"); // Maps to DynamoDB table name
});
})
.TrackTable("Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
DynamoDB uses AWS SDK credential resolution (environment variables, IAM roles, profiles) instead of connection strings. WithStateStore accepts only factory overloads (Func<IServiceProvider, IAmazonDynamoDB>), not connection strings.
DynamoDB Streams captures item-level changes. The processor reads from the stream and checkpoints progress to a separate DynamoDB table.
Google Firestore
Uses Firestore real-time listeners for change detection.
dotnet add package Excalibur.Cdc.Firestore
services.AddCdcProcessor(cdc =>
{
cdc.UseFirestore(firestore =>
{
firestore.CollectionPath("orders")
.ProcessorName("order-processor")
.MaxBatchSize(100)
.PollInterval(TimeSpan.FromSeconds(5));
})
.TrackTable("orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
// With separate state store (different GCP project)
services.AddCdcProcessor(cdc =>
{
cdc.UseFirestore(firestore =>
{
firestore.CollectionPath("orders")
.WithStateStore("state-project-id", state =>
{
state.TableName("cdc-checkpoints"); // Maps to Firestore collection name
});
})
.TrackTable("orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
Firestore uses snapshot listeners on collection references. Changes are pushed in real time with document-level granularity. WithStateStore accepts a GCP project ID (creates a separate FirestoreDb) or a factory (Func<IServiceProvider, FirestoreDb>).
In-Memory (Testing)
services.AddCdcProcessor(cdc =>
{
cdc.UseInMemory(mem =>
{
// Configure in-memory CDC for testing
});
});
Background Processing
Call EnableBackgroundProcessing() on the CDC builder to register a CdcProcessingHostedService that polls for changes automatically. This works with any provider that registers an ICdcBackgroundProcessor implementation.
SQL Server / PostgreSQL:
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(sql => sql.ConnectionString(connectionString))
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
// PostgreSQL follows the same pattern
services.AddCdcProcessor(cdc =>
{
cdc.UsePostgres(pg => pg.ConnectionString(connectionString))
.TrackTable("public.orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});
Cloud-native providers (MongoDB, Cosmos DB, DynamoDB, Firestore) use their own change stream / change feed mechanisms and manage their own background processing lifecycle.
Provider Comparison
| Provider | Mechanism | Latency | State Store |
|---|---|---|---|
| SQL Server | Polling (change tables) | Seconds | Built-in |
| PostgreSQL | Logical replication | Real-time | Built-in |
| MongoDB | Change Streams | Real-time | MongoDB / In-memory |
| Cosmos DB | Change Feed | Near real-time | Cosmos DB / In-memory |
| DynamoDB | DynamoDB Streams | Near real-time | DynamoDB / In-memory |
| Firestore | Snapshot listeners | Real-time | Firestore / In-memory |
Limitations
| Limitation | Provider | Workaround |
|---|---|---|
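| CDC not available in Express or Web editions (Standard edition supports it from SQL Server 2016 SP1) | SQL Server | Use Enterprise, Developer, or Standard (2016 SP1+) edition, or a PostgreSQL or cloud-native provider |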
| Schema changes require capture instance recreation | SQL Server | Plan schema migrations carefully |
| Replica set required for Change Streams | MongoDB | Use replica set or Atlas |
| Lease container required | Cosmos DB | Provision dedicated lease container |
| Large tables | All | Consider partitioning or incremental backfill |
Next Steps
- Outbox Pattern - Reliable message publishing
- Inbox Pattern - Idempotent processing
- Event Sourcing - Event-based architecture
See Also
- Projections -- Build read models from CDC change events or event-sourced streams
- Outbox Pattern -- Pair CDC with transactional outbox for reliable change event publishing
- SQL Server Data Provider -- SQL Server connection and configuration for CDC state stores
- CDC Troubleshooting -- Diagnose and resolve common CDC processing issues