Skip to main content

Handlers

Dispatch provides two types of handlers: action handlers for request/response patterns and event handlers for pub-sub notifications.

Before You Start

  • .NET 8.0+ (or .NET 9/10 for latest features)
  • Install the required packages:
    dotnet add package Excalibur.Dispatch
    dotnet add package Excalibur.Dispatch.Abstractions
  • Familiarity with getting started and dependency injection

Action Handlers

Action handlers process actions (commands and queries) dispatched through the pipeline.

Commands (No Return Value)

Use IActionHandler<TAction> for commands that don't return data:

using Excalibur.Dispatch.Abstractions.Delivery;

public record CreateOrderAction(string CustomerId, List<string> Items) : IDispatchAction;

public class CreateOrderHandler : IActionHandler<CreateOrderAction>
{
private readonly IOrderRepository _repository;

public CreateOrderHandler(IOrderRepository repository)
{
_repository = repository;
}

public async Task HandleAsync(
CreateOrderAction action,
CancellationToken cancellationToken)
{
var order = new Order
{
Id = Guid.NewGuid(),
CustomerId = action.CustomerId,
Items = action.Items,
Status = OrderStatus.Created
};

await _repository.SaveAsync(order, cancellationToken);
}
}

Queries (With Return Value)

Use IActionHandler<TAction, TResult> for queries that return data:

using Excalibur.Dispatch.Abstractions.Delivery;

public record GetOrderAction(Guid OrderId) : IDispatchAction<Order>;

public class GetOrderHandler : IActionHandler<GetOrderAction, Order>
{
private readonly IOrderRepository _repository;

public GetOrderHandler(IOrderRepository repository)
{
_repository = repository;
}

public async Task<Order> HandleAsync(
GetOrderAction action,
CancellationToken cancellationToken)
{
return await _repository.GetByIdAsync(action.OrderId, cancellationToken);
}
}

Event Handlers

Event handlers subscribe to domain events for pub-sub messaging. Multiple handlers can process the same event.

using Excalibur.Dispatch.Abstractions;
using Excalibur.Dispatch.Abstractions.Delivery;

public record OrderCreatedEvent(Guid OrderId, string CustomerId, DateTime CreatedAt)
: IDispatchEvent;

public class SendOrderConfirmationHandler : IEventHandler<OrderCreatedEvent>
{
private readonly IEmailService _emailService;

public SendOrderConfirmationHandler(IEmailService emailService)
{
_emailService = emailService;
}

public async Task HandleAsync(
OrderCreatedEvent @event,
CancellationToken cancellationToken)
{
await _emailService.SendOrderConfirmationAsync(
@event.OrderId,
@event.CustomerId,
cancellationToken);
}
}

public class UpdateInventoryHandler : IEventHandler<OrderCreatedEvent>
{
private readonly IInventoryService _inventoryService;

public UpdateInventoryHandler(IInventoryService inventoryService)
{
_inventoryService = inventoryService;
}

public async Task HandleAsync(
OrderCreatedEvent @event,
CancellationToken cancellationToken)
{
await _inventoryService.ReserveItemsAsync(
@event.OrderId,
cancellationToken);
}
}

Handler Registration

Register handlers during service configuration:

// Auto-discover all handlers in an assembly (recommended)
builder.Services.AddDispatch(dispatch =>
{
dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);
});

// Or register from multiple assemblies
builder.Services.AddDispatch(dispatch =>
{
dispatch.AddHandlersFromAssembly(typeof(OrderHandler).Assembly);
dispatch.AddHandlersFromAssembly(typeof(InventoryHandler).Assembly);
});

Dispatching Messages

Dispatching Actions

Dispatch manages message context automatically - no explicit context needed:

public class OrderService
{
private readonly IDispatcher _dispatcher;

public OrderService(IDispatcher dispatcher)
{
_dispatcher = dispatcher;
}

public async Task<Order> GetOrderAsync(Guid orderId, CancellationToken ct)
{
var action = new GetOrderAction(orderId);

// Context-less dispatch - Dispatch creates context automatically
var result = await _dispatcher.DispatchAsync<GetOrderAction, Order>(action, ct);

if (result.IsSuccess)
return result.ReturnValue;

throw new OrderNotFoundException(orderId);
}

public async Task CreateOrderAsync(string customerId, List<string> items, CancellationToken ct)
{
var action = new CreateOrderAction(customerId, items);

// Simple dispatch without explicit context
var result = await _dispatcher.DispatchAsync(action, ct);

if (!result.IsSuccess)
throw new OrderCreationException(result.ErrorMessage);
}
}

Publishing Events

public class OrderService
{
private readonly IDispatcher _dispatcher;

public async Task CompleteOrderAsync(Guid orderId, CancellationToken ct)
{
// ... order completion logic ...

// Dispatch event to all handlers (context managed automatically)
var @event = new OrderCompletedEvent(orderId, DateTime.UtcNow);
await _dispatcher.DispatchAsync(@event, ct);
}
}

Context Propagation

When dispatching messages from within a handler, use DispatchChildAsync to maintain proper message lineage for distributed tracing and debugging.

Top-Level vs Nested Dispatch

public class OrderController : ControllerBase
{
private readonly IDispatcher _dispatcher;

[HttpPost]
public async Task<IActionResult> CreateOrder(CreateOrderRequest request, CancellationToken ct)
{
// Top-level dispatch: context created automatically
var result = await _dispatcher.DispatchAsync(
new CreateOrderAction(request.CustomerId, request.Items), ct);

return result.IsSuccess ? Ok() : BadRequest(result.ErrorMessage);
}
}

public class CreateOrderHandler : IActionHandler<CreateOrderAction>
{
private readonly IDispatcher _dispatcher;
private readonly IOrderRepository _repository;

public CreateOrderHandler(IDispatcher dispatcher, IOrderRepository repository)
{
_dispatcher = dispatcher;
_repository = repository;
}

public async Task HandleAsync(CreateOrderAction action, CancellationToken ct)
{
var order = new Order { Id = Guid.NewGuid(), CustomerId = action.CustomerId };
await _repository.SaveAsync(order, ct);

// Nested dispatch: use DispatchChildAsync for proper context chaining
await _dispatcher.DispatchChildAsync(
new ValidateInventoryAction(order.Id, action.Items), ct);
}
}

What Gets Propagated

DispatchChildAsync creates a child context that:

PropertyBehavior
CorrelationIdCopied from parent (maintains distributed trace)
TenantIdCopied from parent (multi-tenant isolation)
UserIdCopied from parent (audit trail)
SessionIdCopied from parent (message grouping)
WorkflowIdCopied from parent (saga orchestration)
TraceParentCopied from parent (OpenTelemetry integration)
SourceCopied from parent (origin tracking)
CausationIdSet to parent's MessageId (causal chain)
MessageIdNew unique ID generated

When to Use Each Method

MethodUse When
DispatchAsyncTop-level dispatch (controllers, background services, external triggers)
DispatchChildAsyncNested dispatch from within a handler
// From a controller or service (top-level)
await _dispatcher.DispatchAsync(action, cancellationToken);

// From within a handler (nested) - establishes causal chain
await _dispatcher.DispatchChildAsync(childAction, cancellationToken);

Causal Chain Example

When DispatchChildAsync is used, the message chain becomes traceable:

CreateOrderAction (MessageId: "msg-001")
└── ValidateInventoryAction (MessageId: "msg-002", CausationId: "msg-001")
└── ReserveStockAction (MessageId: "msg-003", CausationId: "msg-002")

All three messages share the same CorrelationId, making it easy to trace the entire business transaction in logs and monitoring tools.

Streaming Handlers

Dispatch provides specialized handlers for processing large documents and data streams efficiently using IAsyncEnumerable<T>. These handlers enable memory-efficient processing without loading entire datasets into memory.

Document-to-Stream Handler

Use IStreamingDocumentHandler<TDocument, TOutput> when a single document produces multiple outputs:

using Excalibur.Dispatch.Abstractions.Delivery;
using System.Runtime.CompilerServices;

public record CsvDocument(string Content) : IDispatchDocument;

public class CsvRowHandler : IStreamingDocumentHandler<CsvDocument, DataRow>
{
public async IAsyncEnumerable<DataRow> HandleAsync(
CsvDocument document,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
foreach (var line in document.Content.Split('\n'))
{
cancellationToken.ThrowIfCancellationRequested();
yield return ParseRow(line);
}
}

private DataRow ParseRow(string line) => new DataRow(line.Split(','));
}

Use cases:

  • CSV/JSON parsing into records
  • Document splitting into pages
  • Entity extraction from text
  • Report row generation

Stream Consumer Handler

Use IStreamConsumerHandler<TDocument> to consume an incoming stream of documents:

using Excalibur.Dispatch.Abstractions.Delivery;

public class BatchImportHandler : IStreamConsumerHandler<DataRow>
{
private readonly IDatabase _database;

public BatchImportHandler(IDatabase database) => _database = database;

public async Task HandleAsync(
IAsyncEnumerable<DataRow> documents,
CancellationToken cancellationToken)
{
var batch = new List<DataRow>();
await foreach (var row in documents.WithCancellation(cancellationToken))
{
batch.Add(row);
if (batch.Count >= 1000)
{
await _database.BulkInsertAsync(batch, cancellationToken);
batch.Clear();
}
}
if (batch.Count > 0)
{
await _database.BulkInsertAsync(batch, cancellationToken);
}
}
}

Use cases:

  • Batch imports with buffering
  • ETL sinks writing to storage
  • Message queue consumers
  • Aggregation pipelines

Stream Transform Handler

Use IStreamTransformHandler<TInput, TOutput> for stream-to-stream transformations:

using Excalibur.Dispatch.Abstractions.Delivery;
using System.Runtime.CompilerServices;

public class EnrichmentHandler : IStreamTransformHandler<CustomerRecord, EnrichedCustomer>
{
private readonly IExternalService _service;

public EnrichmentHandler(IExternalService service) => _service = service;

public async IAsyncEnumerable<EnrichedCustomer> HandleAsync(
IAsyncEnumerable<CustomerRecord> input,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var record in input.WithCancellation(cancellationToken))
{
var enriched = await _service.EnrichAsync(record, cancellationToken);
yield return enriched;
}
}
}

Use cases:

  • Data enrichment from external sources
  • Format conversion
  • Filtering and aggregation
  • Batching and flattening

Progress Document Handler

Use IProgressDocumentHandler<TDocument> for long-running operations with progress reporting:

using Excalibur.Dispatch.Abstractions.Delivery;

public class PdfExportHandler : IProgressDocumentHandler<ExportDocument>
{
public async Task HandleAsync(
ExportDocument document,
IProgress<DocumentProgress> progress,
CancellationToken cancellationToken)
{
var pages = document.GetPages();
var total = pages.Count;

for (int i = 0; i < total; i++)
{
cancellationToken.ThrowIfCancellationRequested();
await ProcessPageAsync(pages[i], cancellationToken);

progress.Report(DocumentProgress.FromItems(
itemsProcessed: i + 1,
totalItems: total,
currentPhase: $"Processing page {i + 1} of {total}"));
}

progress.Report(DocumentProgress.Completed(total, "Export complete"));
}
}

Use cases:

  • Large file processing
  • Multi-step transformations
  • Report generation
  • Data migrations

Streaming Handler Summary

InterfaceInputOutputPurpose
IStreamingDocumentHandler<TDoc, TOut>Single documentIAsyncEnumerable<TOut>Document to stream
IStreamConsumerHandler<TDoc>IAsyncEnumerable<TDoc>TaskConsume stream
IStreamTransformHandler<TIn, TOut>IAsyncEnumerable<TIn>IAsyncEnumerable<TOut>Transform stream
IProgressDocumentHandler<TDoc>Document + IProgress<T>TaskProgress reporting

Handler Interfaces Summary

Dispatch provides two tiers of handler interfaces:

These handlers return your business types directly. The framework automatically wraps results in IMessageResult:

InterfacePurposeReturnFramework Wraps To
IActionHandler<TAction>Commands without return valueTaskIMessageResult
IActionHandler<TAction, TResult>Queries with return valueTask<TResult>IMessageResult<TResult>
IEventHandler<TEvent>Pub-sub event subscribersTaskIMessageResult
IDocumentHandler<TDocument>Document processingTaskIMessageResult

Advanced Handler (Infrastructure/Power Users)

This handler gives you direct control over IMessageResult:

InterfacePurposeReturn
IDispatchHandler<TMessage>Full control over resultTask<IMessageResult>

Use IDispatchHandler when you need to:

  • Return MessageResult.SuccessFromCache() with CacheHit = true
  • Set ValidationResult or AuthorizationResult on success
  • Return failure without throwing an exception
  • Implement batch processing (IBatchableHandler)

Streaming Handlers

InterfaceInputOutputPurpose
IStreamingDocumentHandler<TDoc, TOut>Single documentIAsyncEnumerable<TOut>Document to stream
IStreamConsumerHandler<TDoc>IAsyncEnumerable<TDoc>TaskConsume stream
IStreamTransformHandler<TIn, TOut>IAsyncEnumerable<TIn>IAsyncEnumerable<TOut>Transform stream
IProgressDocumentHandler<TDoc>Document + IProgress<T>TaskProgress reporting

Message Interfaces Summary

InterfacePurpose
IDispatchActionMarker for commands (no return)
IDispatchAction<TResult>Marker for queries (with return)
IDispatchEventEvents for pub-sub dispatch
IDomainEventDomain events with event sourcing metadata (extends IDispatchEvent)

Best Practices

Keep Handlers Focused

Each handler should do one thing well:

// Good: Single responsibility
public class CreateOrderHandler : IActionHandler<CreateOrderAction>
{
public async Task HandleAsync(CreateOrderAction action, CancellationToken ct)
{
// Only creates the order
}
}

// Good: Separate handler for side effects
public class SendOrderConfirmationHandler : IEventHandler<OrderCreatedEvent>
{
public async Task HandleAsync(OrderCreatedEvent @event, CancellationToken ct)
{
// Only sends confirmation
}
}

Use Constructor Injection

Handlers support full dependency injection:

public class ComplexHandler : IActionHandler<ComplexAction>
{
private readonly IRepository _repository;
private readonly IValidator _validator;
private readonly ILogger<ComplexHandler> _logger;

public ComplexHandler(
IRepository repository,
IValidator validator,
ILogger<ComplexHandler> logger)
{
_repository = repository;
_validator = validator;
_logger = logger;
}

public async Task HandleAsync(ComplexAction action, CancellationToken ct)
{
_logger.LogInformation("Processing {ActionType}", action.GetType().Name);
// ...
}
}

Handle Cancellation

Always respect the cancellation token:

public async Task HandleAsync(LongRunningAction action, CancellationToken ct)
{
foreach (var item in action.Items)
{
ct.ThrowIfCancellationRequested();
await ProcessItemAsync(item, ct);
}
}

See Also

  • Pipeline - Add middleware for cross-cutting concerns
  • Middleware - Built-in and custom middleware components
  • Event Sourcing - Build event-sourced aggregates with handlers