Event Store Setup
This guide covers configuring event stores and registering aggregate repositories.
Before You Start
- .NET 8.0+ (or .NET 9/10 for latest features)
- Install the required packages:
dotnet add package Excalibur.EventSourcing
dotnet add package Excalibur.EventSourcing.SqlServer # or your provider - Familiarity with event sourcing concepts and dependency injection
Basic Setup
// Configure event sourcing with repositories
services.AddExcaliburEventSourcing(builder =>
{
builder.AddRepository<OrderAggregate, Guid>(id => new OrderAggregate());
builder.UseIntervalSnapshots(100);
});
// Add the event store provider
services.AddSqlServerEventStore(connectionString);
Event Store Providers
SQL Server
dotnet add package Excalibur.EventSourcing.SqlServer
// Option 1: Simple registration with connection string
services.AddSqlServerEventStore(connectionString);
// Option 2: Full event sourcing setup with all stores
services.AddSqlServerEventSourcing(options =>
{
options.ConnectionString = connectionString;
options.RegisterHealthChecks = true;
});
// This registers:
// - IEventStore (SqlServerEventStore)
// - ISnapshotStore (SqlServerSnapshotStore)
// - IEventSourcedOutboxStore (SqlServerEventSourcedOutboxStore)
Postgres
dotnet add package Excalibur.Data.Postgres
// With connection string
services.AddPostgresEventStore(connectionString);
// Or with options
services.AddPostgresEventStore(connectionString, options =>
{
options.SchemaName = "events";
});
In-Memory (Testing)
dotnet add package Excalibur.EventSourcing.InMemory
// In-memory event store for tests only
services.AddInMemoryEventStore();
Repository Registration
Basic Registration
Register repositories for your aggregates:
services.AddExcaliburEventSourcing(builder =>
{
builder.AddRepository<OrderAggregate, Guid>();
builder.AddRepository<CustomerAggregate, Guid>();
builder.AddRepository<InventoryAggregate, string>();
});
Custom Factory
When your aggregate requires custom construction:
services.AddExcaliburEventSourcing(builder =>
{
builder.AddRepository<OrderAggregate, Guid>(
key => new OrderAggregate(key, tenantId));
});
String-Keyed Aggregates
For aggregates using string identifiers:
services.AddExcaliburEventSourcing(builder =>
{
builder.AddRepository<LegacyOrderAggregate>(
key => new LegacyOrderAggregate(key));
});
Event Serialization
Default (System.Text.Json)
Events are serialized using the configured serializer:
// Default JSON serialization
services.AddJsonSerialization();
// Or with options
services.AddJsonSerialization(options =>
{
options.SerializerOptions.PropertyNamingPolicy = JsonNamingPolicy.CamelCase;
});
Custom Serializer
services.AddExcaliburEventSourcing(builder =>
{
builder.UseEventSerializer<MessagePackEventSerializer>();
});
Upcasting (Event Versioning)
Handle breaking changes in event schemas using message upcasters:
services.AddExcaliburEventSourcing(builder =>
{
builder.AddUpcastingPipeline(upcasting =>
{
// Register individual upcaster
upcasting.RegisterUpcaster<OrderCreatedV1, OrderCreated>(
new OrderCreatedV1ToV2Upcaster());
// Or scan assembly for all upcasters
upcasting.ScanAssembly(typeof(Program).Assembly);
// Enable auto-upcasting during replay
upcasting.EnableAutoUpcastOnReplay();
});
});
// Define upcaster
public class OrderCreatedV1ToV2Upcaster : IMessageUpcaster<OrderCreatedV1, OrderCreated>
{
public OrderCreated Upcast(OrderCreatedV1 source)
{
return new OrderCreated(source.AggregateId, source.Version)
{
OrderId = source.OrderId,
CustomerId = source.CustomerName, // Map renamed field
CreatedAt = source.Timestamp
};
}
}
Database Schema
SQL Server Schema
The SQL Server provider creates these tables:
-- Events table
CREATE TABLE [EventSourcing].[Events] (
[SequenceNumber] BIGINT IDENTITY(1,1) PRIMARY KEY,
[StreamId] NVARCHAR(256) NOT NULL,
[Version] INT NOT NULL,
[EventType] NVARCHAR(512) NOT NULL,
[Data] NVARCHAR(MAX) NOT NULL,
[Metadata] NVARCHAR(MAX) NULL,
[Timestamp] DATETIMEOFFSET NOT NULL,
CONSTRAINT [UQ_Events_StreamVersion] UNIQUE ([StreamId], [Version])
);
-- Snapshots table
CREATE TABLE [EventSourcing].[Snapshots] (
[StreamId] NVARCHAR(256) PRIMARY KEY,
[Version] INT NOT NULL,
[Data] NVARCHAR(MAX) NOT NULL,
[Timestamp] DATETIMEOFFSET NOT NULL
);
Migrations
Configuration Options
| Option | Default | Description |
|---|---|---|
ConnectionString | null | SQL Server connection string (required unless using factory) |
EventStoreSchema | dbo | Database schema for the events table |
EventStoreTable | Events | Name of events table |
SnapshotStoreSchema | dbo | Database schema for the snapshots table |
SnapshotStoreTable | Snapshots | Name of snapshots table |
OutboxSchema | dbo | Database schema for the outbox table |
OutboxTable | EventSourcedOutbox | Name of outbox table |
RegisterHealthChecks | true | Whether to register health checks |
Multiple Event Stores
For multi-tenant or sharded scenarios:
// Register named event stores using keyed services
services.AddKeyedSingleton<IEventStore>("tenant-a",
(sp, _) => new SqlServerEventStore(
tenantAConnection,
sp.GetRequiredService<ILogger<SqlServerEventStore>>()));
services.AddKeyedSingleton<IEventStore>("tenant-b",
(sp, _) => new SqlServerEventStore(
tenantBConnection,
sp.GetRequiredService<ILogger<SqlServerEventStore>>()));
// Resolve by tenant
var eventStore = services.GetRequiredKeyedService<IEventStore>(tenantId);
Observability
Enable OpenTelemetry tracing:
services.AddOpenTelemetry()
.WithTracing(tracing =>
{
tracing.AddEventSourcingInstrumentation();
});
This adds spans for:
EventStore.AppendEventStore.LoadSnapshot.SaveSnapshot.LoadRepository.SaveRepository.Load
Best Practices
| Practice | Reason |
|---|---|
| Use strongly-typed IDs | Type safety, prevents mixing aggregate types |
| Configure snapshots | Prevents unbounded event replay |
| Enable migrations | Automatic schema updates |
| Add health checks | Monitor event store availability |
| Use connection pooling | Performance in high-throughput scenarios |
Troubleshooting
"Stream not found"
The aggregate doesn't exist. This is normal for new aggregates — create via factory method.
"Concurrency conflict"
Another process modified the aggregate. Reload and retry:
try
{
await repository.SaveAsync(aggregate, ct);
}
catch (ConcurrencyException)
{
// Reload and retry
var fresh = await repository.GetByIdAsync(aggregate.Id, ct);
// Re-apply changes
await repository.SaveAsync(fresh, ct);
}
Slow event replay
Configure snapshots to limit replay:
es.UseIntervalSnapshots(100); // Snapshot every 100 events
See Also
- Event Store — Core event store concepts and API reference
- Event Sourcing Overview — Introduction to event sourcing patterns in Excalibur
- Snapshot Setup — Configure snapshot strategies to optimize event replay performance
- Aggregates — Aggregate root design and event application patterns