
Pipeline

The Dispatch pipeline processes messages through a chain of middleware components. Each middleware can inspect, modify, or short-circuit message processing.

Before You Start

  • .NET 8.0+ (or .NET 9/10 for latest features)
  • Install the required package:
    dotnet add package Excalibur.Dispatch
  • Familiarity with handlers and dependency injection

Pipeline Architecture

Messages flow through middleware in a nested pattern:

flowchart LR
    subgraph Pipeline["Request → Response"]
        direction LR
        R[Request] --> M1[Rate Limiting]
        M1 --> M2[Validation]
        M2 --> M3[Authorization]
        M3 --> H[Handler]
        H --> M3r[Authorization]
        M3r --> M2r[Validation]
        M2r --> M1r[Rate Limiting]
        M1r --> Res[Response]
    end

Each middleware can:

  • Execute logic before calling next()
  • Execute logic after next() returns
  • Modify the message or context
  • Short-circuit by returning without calling next()
  • Handle exceptions from downstream components
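
The nesting described above can be sketched without any Dispatch types. The block below is a minimal illustration, not the library's implementation: the `Next` and `Middleware` delegates, the `NestedPipelineSketch` class, and the use of plain strings for messages and results are all assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-ins for illustration only; these are not Dispatch types.
public delegate ValueTask<string> Next(string message);
public delegate ValueTask<string> Middleware(string message, Next next);

public static class NestedPipelineSketch
{
    // Wraps the handler so the first middleware in the list is outermost.
    public static Next Build(IReadOnlyList<Middleware> middleware, Next handler)
    {
        Next next = handler;
        for (int i = middleware.Count - 1; i >= 0; i--)
        {
            Middleware current = middleware[i];
            Next inner = next;
            next = message => current(message, inner);
        }
        return next;
    }

    // Records entry and exit so the nesting order is visible in the trace.
    public static Middleware Tracing(string name, List<string> trace) =>
        async (message, next) =>
        {
            trace.Add($"{name}: before");
            string result = await next(message);
            trace.Add($"{name}: after");
            return result;
        };

    // Runs two tracing middleware around a handler and returns the trace.
    public static async Task<List<string>> DemoAsync()
    {
        var trace = new List<string>();
        Next pipeline = Build(
            new[] { Tracing("RateLimiting", trace), Tracing("Validation", trace) },
            message =>
            {
                trace.Add("Handler");
                return new ValueTask<string>("ok");
            });

        await pipeline("hello");
        return trace;
    }
}
```

Calling `NestedPipelineSketch.DemoAsync()` yields the trace `RateLimiting: before`, `Validation: before`, `Handler`, `Validation: after`, `RateLimiting: after`: the outermost middleware is the first to see the request and the last to see the response.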

Pipeline Stages

Each middleware runs in a defined stage. Middleware with a lower stage value runs first on the way in (request) and last on the way out (response):

| Stage | Value | Purpose |
| --- | --- | --- |
| Start | 0 | Pipeline entry point |
| RateLimiting | 50 | Throughput control |
| PreProcessing | 100 | Tracing and context setup |
| Instrumentation | 150 | Performance metrics |
| Authentication | 175 | Identity verification |
| Logging | 190 | Audit trails and diagnostics |
| Validation | 200 | Input validation |
| Serialization | 250 | Message serialization |
| Authorization | 300 | Permission checks |
| Cache | 400 | Cache lookup and storage |
| Optimization | 450 | Batching and bulk operations |
| Routing | 500 | Handler resolution |
| Processing | 600 | Core handler execution |
| PostProcessing | 700 | Cleanup and result transformation |
| Error | 800 | Exception handling |
| ErrorHandling | 801 | Exception handling (alias) |
| End | 1000 | Pipeline exit |
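
To make the ordering rule concrete, here is a small sketch that sorts a few registrations by stage value. It assumes a hypothetical `StageSketch` enum that mirrors four rows of the table; it is not the library's `DispatchMiddlewareStage`, and the tie-breaking behavior shown is a property of this sketch only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical mirror of a few rows from the table above; not the library's enum.
public enum StageSketch
{
    RateLimiting = 50,
    Logging = 190,
    Validation = 200,
    Authorization = 300,
}

public static class StageOrderingSketch
{
    // Request order: ascending stage value. OrderBy is a stable sort, so
    // entries sharing a stage keep their registration order in this sketch.
    public static List<string> RequestOrder(
        IEnumerable<(string Name, StageSketch Stage)> registered) =>
        registered.OrderBy(m => (int)m.Stage).Select(m => m.Name).ToList();

    // Response order is the request order reversed, because each middleware
    // resumes only after the one it wraps has returned.
    public static List<string> ResponseOrder(
        IEnumerable<(string Name, StageSketch Stage)> registered)
    {
        List<string> order = RequestOrder(registered);
        order.Reverse();
        return order;
    }
}
```

Registering `Authorization`, `RateLimiting`, and `Validation` in that order still gives the request order `RateLimiting`, `Validation`, `Authorization`, which matches the flow in the diagram above.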

Quick Start

Register Middleware

builder.Services.AddDispatch(dispatch =>
{
    dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);

    // Configure middleware pipeline
    dispatch.ConfigurePipeline("Default", pipeline =>
    {
        pipeline.Use<LoggingMiddleware>();
        pipeline.Use<ValidationMiddleware>();
        pipeline.Use<AuthorizationMiddleware>();
    });
});

Create Custom Middleware

using System.Diagnostics;               // Stopwatch
using Microsoft.Extensions.Logging;     // ILogger<T>

public class TimingMiddleware : IDispatchMiddleware
{
    private readonly ILogger<TimingMiddleware> _logger;

    public TimingMiddleware(ILogger<TimingMiddleware> logger)
    {
        _logger = logger;
    }

    public DispatchMiddlewareStage? Stage => DispatchMiddlewareStage.Logging;

    public async ValueTask<IMessageResult> InvokeAsync(
        IDispatchMessage message,
        IMessageContext context,
        DispatchRequestDelegate next,
        CancellationToken ct)
    {
        var sw = Stopwatch.StartNew();

        // Call next middleware
        var result = await next(message, context, ct);

        sw.Stop();
        _logger.LogInformation(
            "{MessageType} completed in {ElapsedMs}ms",
            message.GetType().Name,
            sw.ElapsedMilliseconds);

        return result;
    }
}

Message Kind Filtering

Limit which messages your middleware processes:

public class ValidationMiddleware : IDispatchMiddleware
{
    public DispatchMiddlewareStage? Stage => DispatchMiddlewareStage.Validation;

    // Only validate commands and queries, not events
    public MessageKinds ApplicableMessageKinds => MessageKinds.Action;

    public async ValueTask<IMessageResult> InvokeAsync(
        IDispatchMessage message,
        IMessageContext context,
        DispatchRequestDelegate next,
        CancellationToken ct)
    {
        // Validation logic...
        return await next(message, context, ct);
    }
}
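
One way to picture kind filtering is as a bitwise check against a flags enum. The sketch below assumes a hypothetical `KindsSketch` enum and `KindFilterSketch.Applies` helper; the real `MessageKinds` members, their values, and how Dispatch evaluates `ApplicableMessageKinds` may differ.

```csharp
using System;

// Hypothetical flags enum for illustration; the real MessageKinds
// members and values may differ.
[Flags]
public enum KindsSketch
{
    None = 0,
    Action = 1,
    Event = 2,
    All = Action | Event,
}

public static class KindFilterSketch
{
    // A middleware applies when its declared kinds include the message's kind.
    public static bool Applies(KindsSketch applicable, KindsSketch messageKind) =>
        (applicable & messageKind) != 0;
}
```

Under these assumptions, a middleware declaring `KindsSketch.Action` is skipped for an event message, while one declaring `KindsSketch.All` runs for both.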

Short-Circuiting

Return early without calling next() to skip downstream processing:

public class CachingMiddleware : IDispatchMiddleware
{
    // _cache and BuildCacheKey are not defined in this snippet. Assume an injected
    // cache abstraction exposing GetAsync<T>/SetAsync and a helper that derives a
    // stable key from the message.

    public DispatchMiddlewareStage? Stage => DispatchMiddlewareStage.Cache;

    public async ValueTask<IMessageResult> InvokeAsync(
        IDispatchMessage message,
        IMessageContext context,
        DispatchRequestDelegate next,
        CancellationToken ct)
    {
        var cacheKey = BuildCacheKey(message);
        var cached = await _cache.GetAsync<IMessageResult>(cacheKey);

        if (cached is not null)
        {
            return cached; // Short-circuit: don't call next
        }

        // Cache miss: run the rest of the pipeline
        var result = await next(message, context, ct);

        // Store only successful results
        if (result.Succeeded)
        {
            await _cache.SetAsync(cacheKey, result, TimeSpan.FromMinutes(5));
        }

        return result;
    }
}
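
The effect of short-circuiting is easiest to see by counting how often the downstream handler runs. This is a dependency-free sketch under stated assumptions: `ShortCircuitSketch` is a hypothetical class, strings stand in for Dispatch's message and result types, and an in-memory dictionary replaces the cache.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a caching middleware; strings replace Dispatch's
// message and result types so the sketch has no external dependencies.
public sealed class ShortCircuitSketch
{
    private readonly Dictionary<string, string> _cache = new();

    public async ValueTask<string> InvokeAsync(
        string message,
        Func<string, ValueTask<string>> next)
    {
        if (_cache.TryGetValue(message, out var cached))
        {
            return cached; // Short-circuit: next is never invoked.
        }

        string result = await next(message);
        _cache[message] = result;
        return result;
    }

    // Sends the same message twice and reports how often the handler ran.
    public static async Task<int> CountHandlerCallsAsync()
    {
        int handlerCalls = 0;
        var middleware = new ShortCircuitSketch();
        Func<string, ValueTask<string>> handler = message =>
        {
            handlerCalls++;
            return new ValueTask<string>($"handled {message}");
        };

        await middleware.InvokeAsync("GetOrder:42", handler);
        await middleware.InvokeAsync("GetOrder:42", handler);
        return handlerCalls;
    }
}
```

`ShortCircuitSketch.CountHandlerCallsAsync()` returns 1: the second call is answered from the cache, so the handler and anything else downstream never runs.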

Conditional Middleware

Apply middleware based on runtime conditions:

builder.Services.AddDispatch(dispatch =>
{
    dispatch.ConfigurePipeline("Default", pipeline =>
    {
        // Only in development
        pipeline.UseWhen<DebugMiddleware>(
            sp => sp.GetRequiredService<IHostEnvironment>().IsDevelopment());

        // Only when audit feature is enabled
        pipeline.UseWhen<AuditMiddleware>(
            sp => sp.GetRequiredService<IConfiguration>()
                .GetValue<bool>("Features:AuditEnabled"));

        // Based on configuration
        pipeline.UseWhen<FeatureMiddleware>(
            sp => sp.GetRequiredService<IConfiguration>()
                .GetValue<bool>("Features:NewFeature"));
    });
});

Automatic Default Pipeline

When you register middleware using UseMiddleware<T>() without explicitly calling ConfigurePipeline(), Dispatch automatically creates a "Default" pipeline:

// This automatically creates a default pipeline
builder.Services.AddDispatch(dispatch =>
{
    dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);
    dispatch.UseMiddleware<LoggingMiddleware>();
    dispatch.UseMiddleware<ValidationMiddleware>();
});

Middleware components are then ordered by their Stage values.

In This Section

| Topic | Description |
| --- | --- |
| Pipeline Profiles | Reusable middleware configurations for different scenarios |

See Also

  • Middleware - Built-in and custom middleware components
  • Handlers - Action and event handlers that the pipeline wraps
  • Performance - Optimize pipeline throughput