Change Data Capture (CDC)

CDC captures row-level changes from your database and publishes them as events, enabling real-time data synchronization without modifying application code.

Before You Start

  • .NET 8.0+ (or .NET 9/10 for latest features)
  • Install the required packages:
    dotnet add package Excalibur.Data.SqlServer  # or Excalibur.Data.Postgres
  • SQL Server CDC must be enabled on the database and target tables
  • Familiarity with event sourcing concepts and the outbox pattern

Overview

flowchart LR
    subgraph Database
        T[Table] --> CT[Change Tracking]
        CT --> CDC[CDC Tables]
    end

    subgraph Excalibur
        CDC --> P[CDC Processor]
        P --> H[IDataChangeHandler]
    end

    subgraph Consumers
        H --> S1[Search Index]
        H --> S2[Cache]
        H --> S3[Analytics]
    end

Quick Start

Enable CDC on Database

-- Enable CDC on database
EXEC sys.sp_cdc_enable_db;

-- Enable CDC on table
EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'Orders',
    @role_name = NULL,
    @supports_net_changes = 1; -- requires a primary key or unique index on the table

Configure CDC Processor

using Excalibur.Cdc;

// Register CDC processor with fluent builder
services.AddCdcProcessor(cdc =>
{
    cdc.UseSqlServer(connectionString, sql =>
        {
            sql.SchemaName("Cdc")
               .StateTableName("CdcProcessingState")
               .PollingInterval(TimeSpan.FromSeconds(5))
               .BatchSize(100);
        })
        .TrackTable("dbo.Orders", table =>
        {
            table.MapInsert<OrderCreatedEvent>()
                 .MapUpdate<OrderUpdatedEvent>()
                 .MapDelete<OrderDeletedEvent>();
        })
        .EnableBackgroundProcessing();
});

Table Tracking with Event Mapping

The recommended approach is to use the fluent builder API to configure table tracking and event mapping:

Basic Table Tracking

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString)
.TrackTable("dbo.Orders", table =>
{
// Map all change types to a single event
table.MapAll<OrderChangedEvent>();
})
.EnableBackgroundProcessing();
});

Separate Events per Change Type

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString)
.TrackTable("dbo.Orders", table =>
{
table.MapInsert<OrderCreatedEvent>()
.MapUpdate<OrderUpdatedEvent>()
.MapDelete<OrderDeletedEvent>();
})
.TrackTable("dbo.Customers", table =>
{
table.MapAll<CustomerChangedEvent>();
})
.EnableBackgroundProcessing();
});

Entity-Inferred Table Names

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString)
.TrackTable<Order>(table => table.MapAll<OrderChangedEvent>())
.TrackTable<Customer>(table => table.MapAll<CustomerChangedEvent>())
.EnableBackgroundProcessing();
});

Low-Level Handler Interface

For advanced scenarios requiring custom processing logic, implement IDataChangeHandler:

using Excalibur.Data.SqlServer.Cdc;

public class OrderCdcHandler : IDataChangeHandler
{
private readonly ISearchIndex _searchIndex;
private readonly ICache _cache;

public OrderCdcHandler(ISearchIndex searchIndex, ICache cache)
{
_searchIndex = searchIndex;
_cache = cache;
}

// Specify which tables this handler processes
public string[] TableNames => ["dbo.Orders"];

public async Task HandleAsync(DataChangeEvent changeEvent, CancellationToken cancellationToken)
{
switch (changeEvent.ChangeType)
{
case DataChangeType.Insert:
await HandleInsertAsync(changeEvent, cancellationToken);
break;
case DataChangeType.Update:
await HandleUpdateAsync(changeEvent, cancellationToken);
break;
case DataChangeType.Delete:
await HandleDeleteAsync(changeEvent, cancellationToken);
break;
}
}

private async Task HandleInsertAsync(DataChangeEvent changeEvent, CancellationToken ct)
{
// Use built-in extension methods on DataChangeEvent
var orderId = changeEvent.GetNewValue<Guid>("OrderId");
var customerId = changeEvent.GetNewValue<string>("CustomerId");
var totalAmount = changeEvent.GetNewValue<decimal>("TotalAmount");

await _searchIndex.IndexAsync(new OrderDocument
{
Id = orderId,
CustomerId = customerId,
Amount = totalAmount
}, ct);

await _cache.SetAsync($"order:{orderId}", changeEvent, ct);
}

private async Task HandleUpdateAsync(DataChangeEvent changeEvent, CancellationToken ct)
{
var orderId = changeEvent.GetNewValue<Guid>("OrderId");
await _searchIndex.UpdateAsync(orderId, changeEvent, ct);
await _cache.InvalidateAsync($"order:{orderId}", ct);
}

private async Task HandleDeleteAsync(DataChangeEvent changeEvent, CancellationToken ct)
{
// For deletes, use GetOldValue since NewValue is null
var orderId = changeEvent.GetOldValue<Guid>("OrderId");
await _searchIndex.DeleteAsync(orderId, ct);
await _cache.InvalidateAsync($"order:{orderId}", ct);
}
}

DataChangeEvent Structure

The DataChangeEvent class provides complete information about each database change:

```csharp
public class DataChangeEvent
{
// Log sequence number for ordering
public byte[] Lsn { get; init; }

// Sequence value within the transaction
public byte[] SeqVal { get; init; }

// When the transaction was committed
public DateTime CommitTime { get; init; }

// The table that changed
public string TableName { get; init; }

// Insert, Update, or Delete
public DataChangeType ChangeType { get; init; }

// Column-level changes
public IList<DataChange> Changes { get; init; }
}

public class DataChange
{
public string ColumnName { get; init; }
public object? OldValue { get; init; } // null for inserts
public object? NewValue { get; init; } // null for deletes
public Type? DataType { get; init; }
}

public enum DataChangeType
{
Unknown = 0,
Insert = 1,
Update = 2,
Delete = 3
}
```

Built-in Extension Methods

The framework provides extension methods for extracting typed values from DataChangeEvent:

// Get the new value (for inserts and updates)
var customerId = changeEvent.GetNewValue<string>("CustomerId");

// Get the old value (for updates and deletes)
var previousStatus = changeEvent.GetOldValue<string>("Status");

// With default value if column not found
var amount = changeEvent.GetNewValue<decimal>("Amount", defaultValue: 0m);

These methods handle type conversion and nullable types automatically.

Working with Changes

public async Task HandleAsync(DataChangeEvent changeEvent, CancellationToken cancellationToken)
{
// Check if a specific column changed
var statusChange = changeEvent.Changes
.FirstOrDefault(c => c.ColumnName == "Status");

if (statusChange is not null &&
changeEvent.ChangeType == DataChangeType.Update &&
!Equals(statusChange.OldValue, statusChange.NewValue))
{
await PublishStatusChangedEvent(
changeEvent.TableName,
statusChange.OldValue?.ToString(),
statusChange.NewValue?.ToString(),
cancellationToken);
}

// Get all column values as a dictionary
var newValues = changeEvent.Changes
.Where(c => c.NewValue is not null)
.ToDictionary(c => c.ColumnName, c => c.NewValue);

var oldValues = changeEvent.Changes
.Where(c => c.OldValue is not null)
.ToDictionary(c => c.ColumnName, c => c.OldValue);
}

Anti-Corruption Pattern Example

User-Implemented Pattern

This section shows a recommended pattern for implementing an anti-corruption layer using IDataChangeHandler. This is not a built-in framework component — you implement this yourself using the CDC handler infrastructure.

Protect downstream systems from database schema changes by transforming raw CDC events into domain events:

public class OrderAntiCorruptionHandler : IDataChangeHandler
{
    private readonly IDispatcher _dispatcher;

    public OrderAntiCorruptionHandler(IDispatcher dispatcher)
    {
        _dispatcher = dispatcher;
    }

    public string[] TableNames => ["dbo.Orders"];

    public async Task HandleAsync(DataChangeEvent change, CancellationToken ct)
    {
        // An explicit target type is needed here: the arms return unrelated event
        // types, so `var` cannot infer one. IDomainEvent is an assumed shared
        // interface; substitute whatever base type your events implement.
        IDomainEvent? domainEvent = change.ChangeType switch
        {
            DataChangeType.Insert => CreateOrderCreatedEvent(change),
            DataChangeType.Update => CreateOrderUpdatedEvent(change),
            DataChangeType.Delete => CreateOrderDeletedEvent(change),
            _ => null
        };

        if (domainEvent is not null)
        {
            await _dispatcher.DispatchAsync(domainEvent, ct);
        }
    }

    private OrderCreatedEvent CreateOrderCreatedEvent(DataChangeEvent change)
    {
        // Use built-in extension methods on DataChangeEvent
        return new OrderCreatedEvent(
            OrderId: change.GetNewValue<Guid>("order_id"),
            CustomerId: change.GetNewValue<string>("customer_id"),
            TotalAmount: change.GetNewValue<decimal>("total_amt"),
            Currency: MapCurrency(change.GetNewValue<int>("currency_code")),
            CreatedAt: change.GetNewValue<DateTime>("created_date")
        );
    }

    private OrderUpdatedEvent? CreateOrderUpdatedEvent(DataChangeEvent change)
    {
        // Only status transitions are published; other column updates are ignored
        var statusChange = change.Changes.FirstOrDefault(c => c.ColumnName == "status");
        if (statusChange is null || Equals(statusChange.OldValue, statusChange.NewValue))
            return null;

        return new OrderUpdatedEvent(
            OrderId: change.GetNewValue<Guid>("order_id"),
            OldStatus: statusChange.OldValue?.ToString() ?? "",
            NewStatus: statusChange.NewValue?.ToString() ?? ""
        );
    }

    private OrderDeletedEvent CreateOrderDeletedEvent(DataChangeEvent change)
    {
        // For deletes, use GetOldValue since NewValue is null
        return new OrderDeletedEvent(
            OrderId: change.GetOldValue<Guid>("order_id")
        );
    }

    private static string MapCurrency(int code) => code switch
    {
        1 => "USD",
        2 => "EUR",
        3 => "GBP",
        _ => "USD"
    };
}

Checkpointing

State Store Configuration

Configure the CDC state store via the fluent builder:

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString, sql =>
{
sql.SchemaName("Cdc")
.StateTableName("CdcProcessingState");
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});

ICdcStateStore Interface

The framework provides ICdcStateStore for checkpoint management. The state store tracks processing positions per database and capture instance:

public class CdcManager
{
private readonly ICdcStateStore _stateStore;

public CdcManager(ICdcStateStore stateStore)
{
_stateStore = stateStore;
}

public async Task<IEnumerable<CdcProcessingState>> GetPositionsAsync(
string connectionId,
string databaseName,
CancellationToken ct)
{
return await _stateStore.GetLastProcessedPositionAsync(
connectionId,
databaseName,
ct);
}

public async Task UpdatePositionAsync(
string connectionId,
string databaseName,
string tableName,
byte[] lsn,
byte[]? seqVal,
DateTime? commitTime,
CancellationToken ct)
{
await _stateStore.UpdateLastProcessedPositionAsync(
connectionId,
databaseName,
tableName,
lsn,
seqVal,
commitTime,
ct);
}
}
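
A checkpoint is only useful if positions can be compared. The sketch below shows one way to decide whether a change lies after the last saved position, ordering first by LSN and then by sequence value. CdcPosition, Compare, and IsAfter are hypothetical names for illustration and are not part of the Excalibur API. The sketch assumes both values are the fixed-length binary(10) values SQL Server produces, so an unsigned byte-by-byte comparison matches their ordering.

```csharp
using System;

// Hypothetical helper for illustration; not part of the Excalibur API.
public static class CdcPosition
{
    // Unsigned, byte-by-byte comparison: negative, zero, or positive.
    public static int Compare(ReadOnlySpan<byte> left, ReadOnlySpan<byte> right)
        => left.SequenceCompareTo(right);

    // True when (lsn, seqVal) is strictly after the checkpoint position.
    public static bool IsAfter(
        byte[] lsn,
        byte[]? seqVal,
        byte[] checkpointLsn,
        byte[]? checkpointSeqVal)
    {
        var byLsn = Compare(lsn, checkpointLsn);
        if (byLsn != 0)
        {
            return byLsn > 0;
        }

        // Same transaction: fall back to the sequence value (missing values sort first).
        return Compare(
            seqVal ?? Array.Empty<byte>(),
            checkpointSeqVal ?? Array.Empty<byte>()) > 0;
    }
}
```

A handler that may see redelivered changes could call CdcPosition.IsAfter with the event's Lsn and SeqVal and skip anything that is not after the stored checkpoint.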

Error Handling

Handler-Level Error Handling

Handle errors within your IDataChangeHandler implementation:

public class ResilientOrderCdcHandler : IDataChangeHandler
{
    private readonly ILogger<ResilientOrderCdcHandler> _logger;
    private readonly IDeadLetterQueue _deadLetter;

    public ResilientOrderCdcHandler(
        ILogger<ResilientOrderCdcHandler> logger,
        IDeadLetterQueue deadLetter)
    {
        _logger = logger;
        _deadLetter = deadLetter;
    }

    public string[] TableNames => ["dbo.Orders"];

    public async Task HandleAsync(DataChangeEvent changeEvent, CancellationToken ct)
    {
        try
        {
            await ProcessChangeAsync(changeEvent, ct);
        }
        // Let cancellation propagate; a shutdown is not a poison message
        catch (Exception ex) when (ex is not OperationCanceledException)
        {
            _logger.LogError(ex, "Failed to process CDC event for {Table} at LSN {Lsn}",
                changeEvent.TableName,
                BitConverter.ToString(changeEvent.Lsn));

            // Send to dead letter for manual investigation
            await _deadLetter.EnqueueAsync(
                changeEvent,
                DeadLetterReason.UnhandledException,
                ex,
                new Dictionary<string, string>
                {
                    ["TableName"] = changeEvent.TableName,
                    ["Lsn"] = BitConverter.ToString(changeEvent.Lsn),
                    ["ChangeType"] = changeEvent.ChangeType.ToString()
                },
                ct);

            // Optionally re-throw to stop processing
            // throw;
        }
    }

    private Task ProcessChangeAsync(DataChangeEvent changeEvent, CancellationToken ct)
    {
        // Processing logic
        return Task.CompletedTask;
    }
}

Stale Position Recovery

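When the CDC cleanup job removes change rows older than the retention period, a saved LSN position can point at data that no longer exists. Configure how the processor recovers from such stale positions with the fluent builder: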

using Excalibur.Cdc;

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString)
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.WithRecovery(recovery =>
{
// Choose a recovery strategy
recovery.Strategy(StalePositionRecoveryStrategy.FallbackToEarliest)
.MaxAttempts(5)
.AttemptDelay(TimeSpan.FromSeconds(30));
})
.EnableBackgroundProcessing();
});

Recovery Strategies

| Strategy | Description |
| --- | --- |
| Throw | Throw an exception (default, requires manual intervention) |
| FallbackToEarliest | Reset to earliest available position (may reprocess events) |
| FallbackToLatest | Skip to latest position (may lose unprocessed events) |
| InvokeCallback | Call custom handler for advanced recovery logic |
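
To make the trade-offs between these strategies concrete, the sketch below models how each one could choose the position to resume from once the saved LSN is older than the earliest LSN still retained for a capture instance (on SQL Server, the value returned by sys.fn_cdc_get_min_lsn). The RecoveryStrategy enum mirrors the strategy names listed above, but the enum, the StaleRecovery class, and its method signatures are assumptions made for illustration, not the framework's implementation.

```csharp
using System;

// Illustrative stand-ins; the real types live in the Excalibur CDC packages.
public enum RecoveryStrategy { Throw, FallbackToEarliest, FallbackToLatest, InvokeCallback }

public static class StaleRecovery
{
    // A saved position is stale when it sorts before the earliest retained LSN.
    public static bool IsStale(byte[] savedLsn, byte[] minAvailableLsn)
        => ((ReadOnlySpan<byte>)savedLsn).SequenceCompareTo(minAvailableLsn) < 0;

    // Returns the LSN to resume from, or throws when no automatic recovery applies.
    public static byte[] ResolveStartLsn(
        RecoveryStrategy strategy,
        byte[] savedLsn,
        byte[] minAvailableLsn,
        byte[] maxAvailableLsn,
        Func<byte[], byte[]>? callback = null)
    {
        if (!IsStale(savedLsn, minAvailableLsn))
        {
            return savedLsn; // position is still valid, nothing to recover
        }

        return strategy switch
        {
            // Replays everything still retained; handlers must be idempotent.
            RecoveryStrategy.FallbackToEarliest => minAvailableLsn,
            // Skips ahead; changes between the saved and latest position are lost.
            RecoveryStrategy.FallbackToLatest => maxAvailableLsn,
            // Delegates the decision to caller-supplied logic.
            RecoveryStrategy.InvokeCallback when callback is not null => callback(savedLsn),
            _ => throw new InvalidOperationException(
                "Saved CDC position is stale; manual intervention required.")
        };
    }
}
```

The replay behaviour of FallbackToEarliest is why it pairs best with idempotent handlers, while FallbackToLatest suits consumers that can tolerate a gap or rebuild state by other means.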

Custom Recovery Callback

For complex recovery scenarios, use InvokeCallback with a custom handler:

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString)
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.WithRecovery(recovery =>
{
recovery.Strategy(StalePositionRecoveryStrategy.InvokeCallback)
.OnPositionReset(async (args, ct) =>
{
_logger.LogWarning(
"Stale CDC position for {CaptureInstance}. Reason: {Reason}",
args.CaptureInstance,
args.ReasonCode);

// Handle based on reason code
// StalePositionReasonCodes: CdcCleanup, BackupRestore,
// CdcReenabled, LsnOutOfRange, CaptureInstanceDropped, Unknown
});
})
.EnableBackgroundProcessing();
});

Monitoring

Health Checks

services.AddHealthChecks()
.AddCheck<CdcHealthCheck>("cdc");

public class CdcHealthCheck : IHealthCheck
{
private readonly ICdcStateStore _stateStore;
private readonly string _connectionId;
private readonly string _databaseName;
private readonly TimeSpan _maxLag = TimeSpan.FromMinutes(5);

public CdcHealthCheck(ICdcStateStore stateStore)
{
_stateStore = stateStore;
_connectionId = "default"; // Configure based on your setup
_databaseName = "MyDatabase";
}

public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken ct)
{
var positions = await _stateStore.GetLastProcessedPositionAsync(
_connectionId,
_databaseName,
ct);

foreach (var position in positions)
{
if (position.LastCommitTime.HasValue)
{
var lag = DateTime.UtcNow - position.LastCommitTime.Value;
if (lag > _maxLag)
{
return HealthCheckResult.Degraded(
$"CDC lag for {position.CaptureInstance}: {lag}");
}
}
}

return HealthCheckResult.Healthy();
}
}

Metrics

services.AddOpenTelemetry()
.WithMetrics(metrics =>
{
metrics.AddMeter("Excalibur.Data.Cdc");
// Emits:
// - excalibur.cdc.events.processed
// - excalibur.cdc.events.failed
// - excalibur.cdc.batch.duration
// - excalibur.cdc.batch.size
});
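
If OpenTelemetry is not wired up yet, the same meter can be observed in-process with the .NET MeterListener API. The sketch below sums counter measurements per instrument for the meter name registered above. CdcMetricsProbe is a hypothetical helper name, and the sketch assumes the instruments report long values, which this guide does not state; instruments of another numeric type would need their own callback.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical diagnostic helper; not part of the Excalibur API.
public sealed class CdcMetricsProbe : IDisposable
{
    private readonly MeterListener _listener = new();
    private readonly ConcurrentDictionary<string, long> _totals = new();

    public CdcMetricsProbe(string meterName = "Excalibur.Data.Cdc")
    {
        // Subscribe only to instruments published by the named meter.
        _listener.InstrumentPublished = (instrument, listener) =>
        {
            if (instrument.Meter.Name == meterName)
            {
                listener.EnableMeasurementEvents(instrument);
            }
        };

        // Assumption: measurements are long-valued.
        _listener.SetMeasurementEventCallback<long>(OnMeasurement);
        _listener.Start();
    }

    // Running sum of all measurements seen for the given instrument name.
    public long Total(string instrumentName)
        => _totals.TryGetValue(instrumentName, out var total) ? total : 0;

    private void OnMeasurement(
        Instrument instrument,
        long value,
        ReadOnlySpan<KeyValuePair<string, object?>> tags,
        object? state)
        => _totals.AddOrUpdate(instrument.Name, value, (_, current) => current + value);

    public void Dispose() => _listener.Dispose();
}
```

Creating a probe at startup and reading Total("excalibur.cdc.events.processed") from a diagnostics endpoint or a test gives a quick check that events are flowing before a full metrics pipeline exists.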

Database Maintenance

Retention Configuration

-- Set CDC retention (default 3 days)
EXEC sys.sp_cdc_change_job
@job_type = 'cleanup',
@retention = 4320; -- minutes (3 days)

Monitor CDC

-- Check CDC tables
SELECT *
FROM sys.tables
WHERE is_tracked_by_cdc = 1;

-- Check capture instances
SELECT *
FROM cdc.change_tables;

-- Check CDC job status
EXEC sys.sp_cdc_help_jobs;

Running CDC Processing

CDC requires an active processing loop. Choose the approach that fits your hosting model:

Option 1: Background Hosted Service

Call EnableBackgroundProcessing() on the CDC builder to register a CdcProcessingHostedService that polls for changes automatically:

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString)
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});

The hosted service:

  • Polls for CDC changes at a configurable interval (default 5 seconds)
  • Catches and logs exceptions without crashing the host
  • Supports graceful drain on shutdown (default 30-second timeout)
  • Reports structured log events via LoggerMessage source generation
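
The first two behaviours in that list can be pictured as a simple loop. The sketch below is illustrative only and is not the CdcProcessingHostedService implementation, which this guide does not show. PollingLoopSketch and its parameters are hypothetical names. It runs a batch delegate once per interval, reports failures through a callback instead of letting them escape, and stops when the token is cancelled; the drain timeout is not modelled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Illustrative only; not the framework's hosted service.
public static class PollingLoopSketch
{
    // Runs processBatch once per interval until the token is cancelled.
    // Returns the number of cycles that completed without throwing.
    public static async Task<int> RunAsync(
        Func<CancellationToken, Task> processBatch,
        TimeSpan interval,
        CancellationToken stopping,
        Action<Exception>? onError = null)
    {
        var completed = 0;
        using var timer = new PeriodicTimer(interval);

        while (!stopping.IsCancellationRequested)
        {
            try
            {
                await processBatch(stopping);
                completed++;
            }
            catch (OperationCanceledException) when (stopping.IsCancellationRequested)
            {
                break; // shutdown requested mid-batch
            }
            catch (Exception ex)
            {
                onError?.Invoke(ex); // report and keep polling instead of crashing the host
            }

            try
            {
                if (!await timer.WaitForNextTickAsync(stopping))
                {
                    break; // timer was disposed
                }
            }
            catch (OperationCanceledException)
            {
                break; // shutdown requested while idle
            }
        }

        return completed;
    }
}
```

Catching exceptions inside the loop is what keeps a single bad batch from ending polling altogether; the failed changes are picked up again on the next cycle as long as the checkpoint was not advanced.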

Configuration

Configure processing behavior with CdcProcessingOptions:

services.Configure<CdcProcessingOptions>(options =>
{
options.PollingInterval = TimeSpan.FromSeconds(10); // Default: 5 seconds
options.Enabled = true; // Default: true
options.DrainTimeoutSeconds = 60; // Default: 30
});

| Option | Default | Description |
| --- | --- | --- |
| PollingInterval | 5 seconds | Interval between processing cycles |
| Enabled | true | Set to false to disable without removing registration |
| DrainTimeoutSeconds | 30 | Seconds to wait for in-flight processing on shutdown |

You can also configure processing inline via the SQL Server builder:

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString, sql =>
{
sql.PollingInterval(TimeSpan.FromSeconds(10))
.BatchSize(200);
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});

Option 2: Quartz Job

Use CdcJob from the Excalibur.Jobs package for cron-scheduled CDC processing:

// Install: dotnet add package Excalibur.Jobs

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString)
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>());
// Don't call EnableBackgroundProcessing() — Quartz handles scheduling
});

// Schedule via Quartz.NET — CdcJob supports:
// - Multiple database configurations
// - Built-in health checks
// - Cron-based scheduling

Option 3: Manual/Serverless

For serverless environments, omit EnableBackgroundProcessing() and call ICdcProcessor directly from an Azure Function, AWS Lambda, or other trigger:

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString)
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>());
// Don't call EnableBackgroundProcessing() — you'll trigger manually
});

public class CdcProcessorFunction
{
private readonly ICdcProcessor _processor;
private readonly IDispatcher _dispatcher;

public CdcProcessorFunction(ICdcProcessor processor, IDispatcher dispatcher)
{
_processor = processor;
_dispatcher = dispatcher;
}

[Function("ProcessCdc")]
public async Task Run([TimerTrigger("*/10 * * * * *")] TimerInfo timer)
{
// ICdcProcessor.ProcessCdcChangesAsync requires an event handler delegate
var processedCount = await _processor.ProcessCdcChangesAsync(
async (changeEvent, ct) =>
{
// Handle each change event - dispatch to handlers, update projections, etc.
await _dispatcher.DispatchAsync(
new DataChangeNotification(changeEvent), ct);
},
CancellationToken.None);
}
}

Connection Management

CDC services are registered as singletons. To avoid holding long-lived database connections open, CDC uses a connection factory pattern (Func<SqlConnection>) instead of injecting raw connection objects. Each operation creates a fresh connection from the factory, allowing ADO.NET connection pooling to manage the lifecycle.

Connection String Approach (Simple)

When you pass a connection string directly, the framework creates a factory internally:

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString)
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});

Connection Factory Approach

Use the factory overload when you need custom connection management, DI-resolved connection strings, or managed identity authentication:

// Factory with DI access
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(
sp => () => new SqlConnection(
sp.GetRequiredService<IConfiguration>().GetConnectionString("Cdc")),
sql =>
{
sql.SchemaName("cdc")
.BatchSize(200);
})
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});

The factory signature is Func<IServiceProvider, Func<SqlConnection>>:

  • The outer function receives IServiceProvider for resolving DI services
  • The inner function creates a new SqlConnection each time it is called

This same pattern applies to PostgreSQL:

services.AddCdcProcessor(cdc =>
{
cdc.UsePostgres(
sp => () => new NpgsqlConnection(
sp.GetRequiredService<IConfiguration>().GetConnectionString("Cdc")),
pg =>
{
pg.PublicationName("my_publication");
})
.TrackTable("public.orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});

Why Connection Factories Matter

| Concern | Direct Connection | Connection Factory |
| --- | --- | --- |
| Connection pooling | Bypassed (single long-lived connection) | Properly leveraged (short-lived connections) |
| Connection recovery | Requires manual reconnection logic | Fresh connection per operation |
| DI integration | Connection string hardcoded at registration | Resolved from IServiceProvider at runtime |
| Managed identity | Difficult (token refresh on held connection) | Natural (fresh connection with current token) |

Avoid holding connections in singletons

CDC processors, outbox processors, and inbox stores are all registered as singletons. Never inject a raw SqlConnection or IDbConnection into these services. Always use the factory pattern to create connections on demand.

Best Practices

| Practice | Recommendation |
| --- | --- |
| Table configuration | Use TrackTable() fluent builder with MapInsert/Update/Delete<T>() |
| Event mapping | Use MapAll<T>() for simple scenarios, separate events for fine-grained control |
| Error handling | Implement dead letter queue in IDataChangeHandler implementations |
| Connection management | Use Func<SqlConnection> factory overload for production (see Connection Management) |
| Checkpointing | Configure state store schema via UseSqlServer(sql => sql.SchemaName(...)) |
| Anti-corruption | Transform database columns to domain events using mapping functions |
| Recovery | Configure WithRecovery() with FallbackToEarliest for idempotent handlers |
| Hosting | Use EnableBackgroundProcessing() for most cases, Quartz job for cron schedules |

Providers

Excalibur CDC uses ICdcBuilder as its core abstraction with provider-specific builders for each database. Some providers use the builder pattern (AddCdcProcessor + UseSqlServer/UsePostgres), while cloud-native providers register directly via IServiceCollection.

Core Registration

using Microsoft.Extensions.DependencyInjection;

// Builder pattern (SQL providers)
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString, sql =>
{
// Configure SQL Server CDC
});
});

// Direct registration (cloud-native providers)
services.AddCosmosDbCdc(options =>
{
// Configure Cosmos DB change feed
});

SQL Server

Uses SQL Server's native CDC feature with polling-based change capture.

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString, sql =>
{
// Configure tracked tables, polling interval, etc.
});
});

// Or with a connection factory
services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(
sp => () => new SqlConnection(connectionString),
sql =>
{
// Configure CDC options
});
});

SQL Server CDC tracks row-level changes via change tables. The processor polls cdc.fn_cdc_get_all_changes_* functions for inserts, updates, and deletes.

PostgreSQL

Uses PostgreSQL logical replication for real-time change streaming.

services.AddCdcProcessor(cdc =>
{
cdc.UsePostgres(connectionString, pg =>
{
// Configure publication, replication slot, etc.
});
});

// Or with a connection factory
services.AddCdcProcessor(cdc =>
{
cdc.UsePostgres(
sp => () => new NpgsqlConnection(connectionString),
pg =>
{
// Configure CDC options
});
});

PostgreSQL CDC uses logical replication slots and publications. Changes are streamed via the pgoutput plugin in real time.

MongoDB

Uses MongoDB Change Streams for real-time change notification.

dotnet add package Excalibur.Data.MongoDB

// With options callback
services.AddMongoDbCdc(options =>
{
options.ConnectionString = "mongodb://localhost:27017";
options.DatabaseName = "MyApp";
});

// With connection string and processor ID
services.AddMongoDbCdc(
connectionString: "mongodb://localhost:27017",
processorId: "order-processor",
configure: options =>
{
options.DatabaseName = "MyApp";
});

State Store:

// Persistent state store
services.AddMongoDbCdcStateStore(
connectionString: "mongodb://localhost:27017",
databaseName: "cdc",
collectionName: "state");

// In-memory state store (development)
services.AddInMemoryMongoDbCdcStateStore();

MongoDB Change Streams use the oplog to push change events. The processor receives insert, update, replace, and delete notifications in real time.

Azure Cosmos DB

Uses the Cosmos DB Change Feed for continuous change processing.

dotnet add package Excalibur.Data.CosmosDb

// With options callback
services.AddCosmosDbCdc(options =>
{
options.ConnectionString = "AccountEndpoint=...;AccountKey=...";
options.DatabaseName = "MyApp";
options.ContainerName = "orders";
options.LeaseContainerName = "leases";
});

// From configuration
services.AddCosmosDbCdc(configuration);
services.AddCosmosDbCdc(configuration, sectionName: "CosmosDbCdc");

State Store:

// Cosmos DB state store
services.AddCosmosDbCdcStateStore(options =>
{
options.ConnectionString = "AccountEndpoint=...;AccountKey=...";
});

// From configuration
services.AddCosmosDbCdcStateStore(configuration);

// In-memory state store (development)
services.AddInMemoryCosmosDbCdcStateStore();

The Cosmos DB Change Feed provides ordered change notifications per logical partition. The lease container coordinates multiple processors for scale-out.

Amazon DynamoDB

Uses DynamoDB Streams for change capture.

dotnet add package Excalibur.Data.DynamoDb

services.AddDynamoDbCdc(options =>
{
options.TableName = "Orders";
options.StreamViewType = DynamoDbStreamViewType.NewAndOldImages;
});

Available DynamoDbStreamViewType values:

| Value | Description |
| --- | --- |
| KeysOnly | Only the key attributes of the modified item |
| NewImage | The entire item as it appears after modification |
| OldImage | The entire item as it appeared before modification |
| NewAndOldImages | Both the new and old item images (recommended for CDC) |

State Store:

// DynamoDB state store
services.AddDynamoDbCdcStateStore(
tableName: "cdc-state",
configureOptions: options =>
{
// Configure state store options
});

// In-memory state store (development)
services.AddInMemoryDynamoDbCdcStateStore();

DynamoDB Streams captures item-level changes with a configurable stream view type (keys only, new image, old image, or both new and old images).

Google Firestore

Uses Firestore real-time listeners for change detection.

dotnet add package Excalibur.Data.Firestore

services.AddFirestoreCdc(options =>
{
options.ProjectId = "my-gcp-project";
options.CollectionPath = "orders";
});

State Store:

// Firestore state store
services.AddFirestoreCdcStateStore(
collectionName: "cdc-state",
configureOptions: options =>
{
// Configure state store options
});

// Default state store
services.AddFirestoreCdcStateStore();

// In-memory state store (development)
services.AddInMemoryFirestoreCdcStateStore();

Firestore uses snapshot listeners on collection references. Changes are pushed in real time with document-level granularity.

In-Memory (Testing)

services.AddCdcProcessor(cdc =>
{
cdc.UseInMemory(mem =>
{
// Configure in-memory CDC for testing
});
});

Background Processing

Call EnableBackgroundProcessing() on the CDC builder to register a CdcProcessingHostedService that polls for changes automatically. This works with any provider that registers an ICdcBackgroundProcessor implementation.

SQL Server / PostgreSQL:

services.AddCdcProcessor(cdc =>
{
cdc.UseSqlServer(connectionString)
.TrackTable("dbo.Orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});

// PostgreSQL follows the same pattern
services.AddCdcProcessor(cdc =>
{
cdc.UsePostgres(connectionString)
.TrackTable("public.orders", t => t.MapAll<OrderChangedEvent>())
.EnableBackgroundProcessing();
});

Cloud-native providers (MongoDB, Cosmos DB, DynamoDB, Firestore) use their own change stream / change feed mechanisms and manage their own background processing lifecycle.

Provider Comparison

| Provider | Mechanism | Latency | State Store |
| --- | --- | --- | --- |
| SQL Server | Polling (change tables) | Seconds | Built-in |
| PostgreSQL | Logical replication | Real-time | Built-in |
| MongoDB | Change Streams | Real-time | MongoDB / In-memory |
| Cosmos DB | Change Feed | Near real-time | Cosmos DB / In-memory |
| DynamoDB | DynamoDB Streams | Near real-time | DynamoDB / In-memory |
| Firestore | Snapshot listeners | Real-time | Firestore / In-memory |

Limitations

| Limitation | Provider | Workaround |
| --- | --- | --- |
| Not available in Express or Web editions (Standard requires SQL Server 2016 SP1 or later) | SQL Server | Use a supported edition, PostgreSQL, or a cloud-native provider |
| Schema changes require capture instance recreation | SQL Server | Plan schema migrations carefully |
| Replica set required for Change Streams | MongoDB | Use replica set or Atlas |
| Lease container required | Cosmos DB | Provision dedicated lease container |
| Large tables | All | Consider partitioning or incremental backfill |
