GlobalStreamProjectionHost

GlobalStreamProjectionHost<TState> is a background service that continuously reads from the global event stream and applies events to a custom projection. Use it when you need full control over how events are projected — beyond what AddProjection<T>().Async() provides.

Before You Start

  • .NET 10.0
  • Install the required packages:
    dotnet add package Excalibur.EventSourcing
  • Familiarity with projections and event stores

When to Use GlobalStreamProjectionHost

Scenario | Use
Standard per-aggregate read models | AddProjection<T>().Inline() or .Async()
Cross-aggregate custom state machines | GlobalStreamProjectionHost
Global metrics / statistics | GlobalStreamProjectionHost
CDC-like stream tailing with custom logic | GlobalStreamProjectionHost
Simple background catch-up processing | EnableProjectionProcessing() (uses AsyncProjectionProcessingHost)

GlobalStreamProjectionHost vs AsyncProjectionProcessingHost

Aspect | GlobalStreamProjectionHost | AsyncProjectionProcessingHost
Scope | Single custom projection (your IGlobalStreamProjection<TState>) | All registered .Async() projections
State type | Your custom TState class | Per-projection stores via IProjectionStore<T>
Registration | Manual (register as a hosted service) | Automatic via EnableProjectionProcessing()
Flexibility | Full control over event handling logic | Convention-based When<T> handlers
Use case | Global aggregations, custom state machines | Standard async read models

Architecture

Core Interface

Implement IGlobalStreamProjection<TState> to define how events update your state:

public interface IGlobalStreamProjection<TState>
    where TState : class
{
    Task ApplyAsync(IDomainEvent domainEvent, TState state, CancellationToken cancellationToken);
}

Getting Started

1. Define Your State

public class SystemMetricsState
{
    public long TotalEventsProcessed { get; set; }
    public long TotalOrders { get; set; }
    public decimal TotalRevenue { get; set; }
    public DateTimeOffset LastEventTimestamp { get; set; }
}

2. Implement the Projection

public class SystemMetricsProjection : IGlobalStreamProjection<SystemMetricsState>
{
    private readonly ILogger<SystemMetricsProjection> _logger;

    public SystemMetricsProjection(ILogger<SystemMetricsProjection> logger)
        => _logger = logger;

    public Task ApplyAsync(
        IDomainEvent domainEvent,
        SystemMetricsState state,
        CancellationToken cancellationToken)
    {
        state.TotalEventsProcessed++;
        state.LastEventTimestamp = domainEvent.OccurredAt;

        switch (domainEvent)
        {
            case OrderCreated e:
                state.TotalOrders++;
                state.TotalRevenue += e.Amount;
                break;
        }

        return Task.CompletedTask;
    }
}

3. Register the Host

services.AddSingleton<IGlobalStreamProjection<SystemMetricsState>, SystemMetricsProjection>();
services.AddHostedService<GlobalStreamProjectionHost<SystemMetricsState>>();
services.Configure<GlobalStreamProjectionOptions>(opts =>
{
    opts.ProjectionName = "SystemMetrics"; // Unique name for checkpoint tracking
    opts.BatchSize = 500;
    opts.IdlePollingInterval = TimeSpan.FromSeconds(2);
    opts.CheckpointInterval = 100;
});

Unique ProjectionName

Each GlobalStreamProjectionHost instance must have a unique ProjectionName. If two hosts share a name, they overwrite each other's checkpoint positions. The default is "AsyncProjectionProcessingHost", so always override it.
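
To see why a shared name is a problem, here is a minimal sketch that models checkpoints as a dictionary keyed by projection name. The class name `CheckpointCollisionSketch`, the name "SharedName", and the positions are hypothetical; the real ISubscriptionCheckpointStore may store positions differently.

```csharp
using System.Collections.Generic;

// Hypothetical illustration; not an Excalibur type.
public static class CheckpointCollisionSketch
{
    public static long Demonstrate()
    {
        // Stand-in for a checkpoint store: one position per projection name.
        var checkpoints = new Dictionary<string, long>();

        // Two hosts at different positions both save under the same name.
        checkpoints["SharedName"] = 5000; // saved by host A
        checkpoints["SharedName"] = 120;  // saved by host B, replacing A's position

        // Whichever host restarts next resumes from the last value written.
        return checkpoints["SharedName"];
    }
}
```

In this sketch host A would resume from position 120 after a restart, even though it had already processed 5,000 events.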

Configuration Options

Option | Default | Description
ProjectionName | "AsyncProjectionProcessingHost" | Unique checkpoint identifier; always override
BatchSize | 500 | Maximum events read per poll
IdlePollingInterval | 1 second | Delay between polls when no events are found
CheckpointInterval | 100 | Events processed between checkpoint saves

Lifecycle

The host follows this loop:

  1. Startup — Restores last checkpoint position from ISubscriptionCheckpointStore
  2. Poll — Reads up to BatchSize events from the global stream via IGlobalStreamQuery
  3. Apply — Deserializes each event and calls ApplyAsync on your projection
  4. Checkpoint — After CheckpointInterval events, persists the current position
  5. Idle — If no events found, waits IdlePollingInterval before polling again
  6. Shutdown — Persists final checkpoint position on graceful stop
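
The loop above can be sketched with in-memory stand-ins. This is a simplified illustration under stated assumptions, not the library's implementation: `InMemoryCheckpointStore` and `LifecycleSketch` are hypothetical names, events are plain strings, and the sketch exits when the stream is drained instead of waiting IdlePollingInterval. The real host reads through IGlobalStreamQuery and ISubscriptionCheckpointStore, whose signatures are not shown on this page.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a checkpoint store; not an Excalibur type.
public sealed class InMemoryCheckpointStore
{
    private readonly Dictionary<string, long> _positions = new();

    public int SaveCount { get; private set; }

    public long Load(string projectionName) =>
        _positions.TryGetValue(projectionName, out var position) ? position : 0;

    public void Save(string projectionName, long position)
    {
        _positions[projectionName] = position;
        SaveCount++;
    }
}

public static class LifecycleSketch
{
    // Runs poll/apply/checkpoint until the stream is drained or cancellation
    // is requested, then returns the final position.
    public static async Task<long> RunUntilCaughtUpAsync(
        IReadOnlyList<string> globalStream,
        Func<string, Task> apply,
        InMemoryCheckpointStore checkpoints,
        string projectionName,
        int batchSize,
        int checkpointInterval,
        CancellationToken cancellationToken)
    {
        // 1. Startup: restore the last checkpointed position.
        long position = checkpoints.Load(projectionName);
        int sinceCheckpoint = 0;

        while (!cancellationToken.IsCancellationRequested)
        {
            // 2. Poll: read up to batchSize events after the current position.
            var batch = globalStream.Skip((int)position).Take(batchSize).ToList();

            // 5. Idle: a real host would wait and poll again; the sketch stops.
            if (batch.Count == 0)
            {
                break;
            }

            foreach (var evt in batch)
            {
                // 3. Apply: hand the event to the projection.
                await apply(evt);
                position++;

                // 4. Checkpoint: persist after every checkpointInterval events.
                if (++sinceCheckpoint >= checkpointInterval)
                {
                    checkpoints.Save(projectionName, position);
                    sinceCheckpoint = 0;
                }
            }
        }

        // 6. Shutdown: persist the final position.
        checkpoints.Save(projectionName, position);
        return position;
    }
}
```

With 250 events, a batch size of 100, and a checkpoint interval of 100, this sketch saves at positions 100 and 200 and once more at 250 on exit.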

Error Handling

  • Per-event errors are logged and skipped — processing continues with the next event
  • Batch-level errors are logged, then the host waits IdlePollingInterval before retrying
  • Cancellation triggers graceful shutdown with final checkpoint persistence
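
The per-event behavior in the first bullet can be illustrated with a small sketch. `ErrorHandlingSketch` is a hypothetical helper, not part of the library, and it collects error messages in a list where a real host would log through ILogger. Batch-level retry and cancellation are left out.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration; not an Excalibur type.
public static class ErrorHandlingSketch
{
    // Applies each event in order. A failing event is recorded and skipped,
    // so the events after it are still processed.
    public static List<string> ApplyBatch<TEvent>(
        IEnumerable<TEvent> batch,
        Action<TEvent> apply)
    {
        var errors = new List<string>();

        foreach (var evt in batch)
        {
            try
            {
                apply(evt);
            }
            catch (Exception ex)
            {
                // Record the failure and move on to the next event.
                errors.Add($"Skipped {evt}: {ex.Message}");
            }
        }

        return errors;
    }
}
```

For a batch of three events where the handler throws on the second, the first and third are applied and the returned list holds one entry. In this sketch a skipped event never reaches the state.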

Cursor Map (Multi-Stream Tracking)

When ICursorMapStore is registered in DI, the host tracks per-stream positions in addition to the global checkpoint. This enables scenarios like:

  • Detecting which aggregates have been processed
  • Building per-aggregate cursor maps for selective replay
  • Coordinating with other services that need stream-level granularity

// Optional: register a cursor map store for per-stream tracking
services.AddSingleton<ICursorMapStore, SqlServerCursorMapStore>();

The cursor map is saved alongside the checkpoint — positions accumulate between checkpoint saves to minimize I/O.
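
One way to picture this accumulate-then-save behavior is the sketch below. `CursorMapSketch` and its members are hypothetical; the ICursorMapStore contract is not shown on this page, so the sketch uses plain dictionaries for both the pending and the saved positions.

```csharp
using System.Collections.Generic;

// Hypothetical illustration; not an Excalibur type.
public sealed class CursorMapSketch
{
    private readonly Dictionary<string, long> _pending = new();
    private readonly Dictionary<string, long> _saved = new();

    public int FlushCount { get; private set; }

    public IReadOnlyDictionary<string, long> Saved => _saved;

    // Called per applied event: keep only the latest position seen per stream.
    public void Track(string streamId, long streamPosition) =>
        _pending[streamId] = streamPosition;

    // Called when the checkpoint is saved: write all accumulated positions at once.
    public void Flush()
    {
        foreach (var (streamId, position) in _pending)
        {
            _saved[streamId] = position;
        }

        _pending.Clear();
        FlushCount++;
    }
}
```

Tracking several events for the same stream between two flushes results in a single write for that stream, which is the I/O saving the paragraph above describes.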

Observability

When registered in DI, the host integrates with:

  • ProjectionObservability: records error counts per projection
  • ProjectionHealthState: reports the current position as an async lag metric

These are automatically available when you call WithProjectionHealthChecks() on the event sourcing builder.

Testing

using Microsoft.Extensions.Logging.Abstractions;
using Shouldly;
using Xunit;

public class SystemMetricsProjectionTests
{
    [Fact]
    public async Task AppliesOrderCreatedToMetrics()
    {
        // No host is needed: the projection is called directly with a fresh state.
        var projection = new SystemMetricsProjection(NullLogger<SystemMetricsProjection>.Instance);
        var state = new SystemMetricsState();

        var @event = new OrderCreated("order-1", 1) { Amount = 99.95m };
        await projection.ApplyAsync(@event, state, CancellationToken.None);

        state.TotalOrders.ShouldBe(1);
        state.TotalRevenue.ShouldBe(99.95m);
        state.TotalEventsProcessed.ShouldBe(1);
    }
}

See Also

  • Projections — Standard projection system (inline, async, ephemeral)
  • Materialized Views — Schedule-driven views with IMaterializedViewProcessor
  • Event Store — Core event persistence and global stream queries