Event-Sourced Repositories
Repositories provide a high-level API for loading and saving event-sourced aggregates. They handle event store interactions, snapshot optimization, and concurrency control.
Before You Start
- .NET 8.0+ (or .NET 9/10 for latest features)
- Install the required packages:
dotnet add package Excalibur.EventSourcing
dotnet add package Excalibur.Domain - Familiarity with event stores and domain modeling
The Repository Interface
public interface IEventSourcedRepository<TAggregate, TKey>
where TAggregate : class, IAggregateRoot<TKey>, IAggregateSnapshotSupport
where TKey : notnull
{
// Load aggregate by ID (rehydrates from events/snapshot)
Task<TAggregate?> GetByIdAsync(TKey aggregateId, CancellationToken cancellationToken);
// Persist uncommitted events
Task SaveAsync(TAggregate aggregate, CancellationToken cancellationToken);
// Save with ETag concurrency check (null skips validation)
Task SaveAsync(TAggregate aggregate, string? expectedETag, CancellationToken cancellationToken);
// Check if aggregate exists
Task<bool> ExistsAsync(TKey aggregateId, CancellationToken cancellationToken);
// Soft-delete via tombstone event
Task DeleteAsync(TAggregate aggregate, CancellationToken cancellationToken);
// Query for multiple aggregates
Task<IReadOnlyList<TAggregate>> QueryAsync<TQuery>(TQuery query, CancellationToken cancellationToken)
where TQuery : IAggregateQuery<TAggregate>;
// Find single aggregate matching criteria
Task<TAggregate?> FindAsync<TQuery>(TQuery query, CancellationToken cancellationToken)
where TQuery : IAggregateQuery<TAggregate>;
}
Basic Usage
Loading Aggregates
public class OrderService
{
private readonly IEventSourcedRepository<Order, Guid> _repository;
public async Task<OrderDto?> GetOrderAsync(Guid orderId, CancellationToken ct)
{
var order = await _repository.GetByIdAsync(orderId, ct);
if (order is null)
return null;
return new OrderDto
{
Id = order.Id,
Status = order.Status,
Total = order.Total,
Lines = order.Lines.Select(MapLine).ToList()
};
}
}
Saving Aggregates
public async Task CreateOrderAsync(CreateOrderCommand cmd, CancellationToken ct)
{
// Create new aggregate (raises OrderCreated event)
var order = new Order(cmd.OrderId, cmd.CustomerId);
// Add lines (raises OrderLineAdded events)
foreach (var line in cmd.Lines)
{
order.AddLine(line.ProductId, line.Quantity, line.UnitPrice);
}
// Save persists all uncommitted events
await _repository.SaveAsync(order, ct);
}
Modifying Aggregates
public async Task AddLineToOrderAsync(Guid orderId, LineDto line, CancellationToken ct)
{
// Load current state
var order = await _repository.GetByIdAsync(orderId, ct)
?? throw new OrderNotFoundException(orderId);
// Make changes (raises events)
order.AddLine(line.ProductId, line.Quantity, line.UnitPrice);
// Save new events
await _repository.SaveAsync(order, ct);
}
Optimistic Concurrency
Using ETags
ETags prevent lost updates when multiple processes modify the same aggregate:
public async Task UpdateOrderAsync(Guid orderId, UpdateRequest request, CancellationToken ct)
{
var order = await _repository.GetByIdAsync(orderId, ct)
?? throw new OrderNotFoundException(orderId);
// Capture ETag before modifications
var originalETag = order.ETag;
// Make changes
order.UpdateShippingAddress(request.Address);
try
{
// Save with ETag check
await _repository.SaveAsync(order, originalETag, ct);
}
catch (ConcurrencyException)
{
// Another process modified the order
throw new ConflictException("Order was modified by another process");
}
}
Automatic Concurrency (Version-Based)
The repository can use version numbers for concurrency:
// Repository internally uses:
await eventStore.AppendAsync(
aggregateId: order.Id.ToString(),
aggregateType: "Order",
events: order.GetUncommittedEvents(),
expectedVersion: order.Version, // Fails if version mismatch
cancellationToken: ct);
Repository Registration
Using the Builder (Recommended)
services.AddExcaliburEventSourcing(builder =>
{
// With explicit factory
builder.AddRepository<Order, Guid>(id => new Order());
// Or use static Create method (aggregate must implement IAggregateRoot<TAggregate, TKey>)
builder.AddRepository<Order, Guid>(); // Uses Order.Create(id)
// String-keyed aggregates
builder.AddRepository<Customer>(id => new Customer(id));
});
Generic Registration
For simple scenarios without factories:
builder.Services.AddSingleton(typeof(IEventSourcedRepository<,>),
typeof(EventSourcedRepository<,>));
Advanced Operations
Checking Existence
public async Task<bool> CanCreateOrderAsync(Guid orderId, CancellationToken ct)
{
// Check without loading full aggregate
return !await _repository.ExistsAsync(orderId, ct);
}
Querying Aggregates
The repository supports querying via IAggregateQuery<TAggregate>:
// Define a query
public class DraftOrdersQuery : IAggregateQuery<Order>
{
public OrderStatus Status { get; init; } = OrderStatus.Draft;
}
// Usage - QueryAsync returns multiple matches
var draftOrders = await _repository.QueryAsync(new DraftOrdersQuery(), ct);
// Usage - FindAsync returns first match or null
var firstDraft = await _repository.FindAsync(new DraftOrdersQuery(), ct);
Note: Query implementation depends on the underlying event store's capabilities. Some stores may require projections for efficient querying.
Soft Delete
public async Task CancelOrderAsync(Guid orderId, string reason, CancellationToken ct)
{
var order = await _repository.GetByIdAsync(orderId, ct)
?? throw new OrderNotFoundException(orderId);
// Cancel raises OrderCancelled event
order.Cancel(reason);
await _repository.SaveAsync(order, ct);
}
// In aggregate:
public void Cancel(string reason)
{
if (Status == OrderStatus.Shipped)
throw new InvalidOperationException("Cannot cancel shipped order");
RaiseEvent(new OrderCancelled(Id, reason, DateTime.UtcNow));
}
Working with the Event Store Directly
For advanced scenarios, access the event store:
public class OrderHistoryService
{
private readonly IEventStore _eventStore;
public async Task<IReadOnlyList<EventInfo>> GetOrderHistoryAsync(
Guid orderId,
CancellationToken ct)
{
var events = await _eventStore.LoadAsync(
orderId.ToString(),
"Order",
ct);
return events.Select(e => new EventInfo
{
Version = e.Version,
EventType = e.EventType,
Timestamp = e.Timestamp,
EventData = e.EventData
}).ToList();
}
}
Repository Patterns
Unit of Work Pattern
Coordinate multiple aggregate changes:
public class OrderPlacementService
{
private readonly IEventSourcedRepository<Order, Guid> _orderRepo;
private readonly IEventSourcedRepository<Inventory, string> _inventoryRepo;
private readonly IUnitOfWork _unitOfWork;
public async Task PlaceOrderAsync(PlaceOrderCommand cmd, CancellationToken ct)
{
var order = new Order(cmd.OrderId, cmd.CustomerId);
foreach (var line in cmd.Lines)
{
// Reserve inventory
var inventory = await _inventoryRepo.GetByIdAsync(line.ProductId, ct)
?? throw new ProductNotFoundException(line.ProductId);
inventory.Reserve(line.Quantity, cmd.OrderId);
// Add line to order
order.AddLine(line.ProductId, line.Quantity, line.UnitPrice);
}
order.Submit();
// Save all changes atomically
await _unitOfWork.CommitAsync(ct);
}
}
Specification Pattern
Encapsulate query logic using IAggregateQuery<T>:
public class DraftOrdersForCustomer : IAggregateQuery<Order>
{
public string CustomerId { get; init; }
public OrderStatus Status { get; init; } = OrderStatus.Draft;
}
// Usage
var query = new DraftOrdersForCustomer { CustomerId = "CUST-123" };
var orders = await _repository.QueryAsync(query, ct);
Error Handling
Common Exceptions
try
{
await _repository.SaveAsync(order, ct);
}
catch (ConcurrencyException ex)
{
// Version mismatch - aggregate was modified
_logger.LogWarning("Concurrency conflict for order {OrderId}", order.Id);
throw new ConflictException("Order was modified");
}
catch (AggregateNotFoundException ex)
{
// Aggregate doesn't exist
throw new NotFoundException($"Order {ex.AggregateId} not found");
}
catch (EventStoreException ex)
{
// Storage error
_logger.LogError(ex, "Failed to save order {OrderId}", order.Id);
throw;
}
Retry Logic
public async Task AddLineWithRetryAsync(
Guid orderId,
LineDto line,
CancellationToken ct)
{
var maxRetries = 3;
for (int attempt = 1; attempt <= maxRetries; attempt++)
{
try
{
var order = await _repository.GetByIdAsync(orderId, ct)
?? throw new OrderNotFoundException(orderId);
order.AddLine(line.ProductId, line.Quantity, line.UnitPrice);
await _repository.SaveAsync(order, ct);
return;
}
catch (ConcurrencyException) when (attempt < maxRetries)
{
_logger.LogWarning(
"Concurrency conflict, attempt {Attempt} of {MaxRetries}",
attempt, maxRetries);
await Task.Delay(100 * attempt, ct);
}
}
}
Performance Considerations
Snapshot Integration
Repositories automatically use snapshots when available:
// Repository internally:
// 1. Try to load latest snapshot
var snapshot = await _snapshotStore.GetLatestSnapshotAsync(aggregateId);
// 2. Load events since snapshot
var events = snapshot != null
? await _eventStore.LoadAsync(aggregateId, aggregateType, snapshot.Version, ct)
: await _eventStore.LoadAsync(aggregateId, aggregateType, ct);
// 3. Apply snapshot, then remaining events
if (snapshot != null)
aggregate.LoadFromSnapshot(snapshot);
aggregate.LoadFromHistory(events);
Batch Loading
For scenarios requiring multiple aggregates:
public async Task<IReadOnlyList<Order>> GetOrdersAsync(
IEnumerable<Guid> orderIds,
CancellationToken ct)
{
var tasks = orderIds.Select(id => _repository.GetByIdAsync(id, ct));
var orders = await Task.WhenAll(tasks);
return orders.Where(o => o != null).ToList()!;
}
Next Steps
- Snapshots - Optimize loading for long-lived aggregates
- Projections - Build read models from events
- Core Concepts - Event sourcing fundamentals
See Also
- Event Store — Understand how events are persisted and loaded
- Aggregates — Build the aggregates that repositories load and save
- Repository Testing — Test patterns for event-sourced repositories
- Snapshots — How repositories use snapshots to optimize aggregate loading