Custom Middleware
Create custom middleware to implement cross-cutting concerns specific to your application.
Before You Start
- .NET 8.0+ (or .NET 9/10 for latest features)
- Install the required packages:
dotnet add package Excalibur.Dispatch - Familiarity with pipeline concepts and built-in middleware
Basic Structure
public class MyMiddleware : IDispatchMiddleware
{
private readonly IMyService _service;
public MyMiddleware(IMyService service)
{
_service = service;
}
// Optional: specify pipeline stage
public DispatchMiddlewareStage? Stage => DispatchMiddlewareStage.PreProcessing;
public async ValueTask<IMessageResult> InvokeAsync(
IDispatchMessage message,
IMessageContext context,
DispatchRequestDelegate next,
CancellationToken cancellationToken)
{
// Pre-processing logic
// Call next middleware
var result = await next(message, context, cancellationToken);
// Post-processing logic
return result;
}
}
Registration
services.AddDispatch(options =>
{
options.ConfigurePipeline("Default", pipeline =>
{
pipeline.Use<MyMiddleware>();
});
});
// The middleware is automatically registered in DI
// Or register explicitly:
services.AddScoped<MyMiddleware>();
Common Patterns
Timing and Metrics
public class TimingMiddleware : IDispatchMiddleware
{
private readonly ILogger<TimingMiddleware> _logger;
private readonly IMeterFactory _meterFactory;
public DispatchMiddlewareStage? Stage => DispatchMiddlewareStage.Logging;
public async ValueTask<IMessageResult> InvokeAsync(
IDispatchMessage message,
IMessageContext context,
DispatchRequestDelegate next,
CancellationToken ct)
{
var messageType = message.GetType().Name;
var sw = Stopwatch.StartNew();
try
{
var result = await next(message, context, ct);
sw.Stop();
_logger.LogInformation(
"{MessageType} completed in {Duration}ms",
messageType, sw.ElapsedMilliseconds);
RecordMetric(messageType, sw.ElapsedMilliseconds, success: true);
return result;
}
catch (Exception ex)
{
sw.Stop();
RecordMetric(messageType, sw.ElapsedMilliseconds, success: false);
throw;
}
}
}
Context Enrichment
public class CorrelationMiddleware : IDispatchMiddleware
{
public DispatchMiddlewareStage? Stage => DispatchMiddlewareStage.PreProcessing;
public async ValueTask<IMessageResult> InvokeAsync(
IDispatchMessage message,
IMessageContext context,
DispatchRequestDelegate next,
CancellationToken ct)
{
// Ensure correlation ID exists
if (string.IsNullOrEmpty(context.CorrelationId))
{
context.CorrelationId = Guid.NewGuid().ToString();
}
// Add to activity for distributed tracing
Activity.Current?.SetTag("correlation.id", context.CorrelationId);
// Add to log scope
using (_logger.BeginScope(new Dictionary<string, object>
{
["CorrelationId"] = context.CorrelationId
}))
{
return await next(message, context, ct);
}
}
}
Tenant Resolution
public class TenantMiddleware : IDispatchMiddleware
{
private readonly ITenantResolver _resolver;
public DispatchMiddlewareStage? Stage => DispatchMiddlewareStage.PreProcessing;
public async ValueTask<IMessageResult> InvokeAsync(
IDispatchMessage message,
IMessageContext context,
DispatchRequestDelegate next,
CancellationToken ct)
{
// Resolve tenant from message or context
var tenantId = await _resolver.ResolveAsync(message, context, ct);
if (string.IsNullOrEmpty(tenantId))
{
return MessageResult.Failed(new TenantRequiredError());
}
// Store tenant in context via identity feature
var identity = context.GetOrCreateIdentityFeature();
identity.TenantId = tenantId;
// Configure tenant-specific services
using (var scope = context.RequestServices.CreateScope())
{
var tenantDb = scope.ServiceProvider.GetRequiredService<ITenantDatabase>();
tenantDb.SetTenant(tenantId);
return await next(message, context, ct);
}
}
}
Audit Logging
public class AuditMiddleware : IDispatchMiddleware
{
private readonly IAuditLogger _auditLogger;
public DispatchMiddlewareStage? Stage => DispatchMiddlewareStage.PostProcessing;
public async ValueTask<IMessageResult> InvokeAsync(
IDispatchMessage message,
IMessageContext context,
DispatchRequestDelegate next,
CancellationToken ct)
{
var startTime = DateTime.UtcNow;
var result = await next(message, context, ct);
// Only audit mutations (actions)
if (message is IDispatchAction action)
{
await _auditLogger.LogAsync(new AuditEntry
{
MessageType = message.GetType().Name,
MessageId = message.MessageId,
UserId = context.GetUserId(),
TenantId = context.GetTenantId(),
Timestamp = startTime,
Duration = DateTime.UtcNow - startTime,
Success = result.IsSuccess,
ErrorMessage = result.ErrorMessage
}, ct);
}
return result;
}
}
Rate Limiting
public class RateLimitMiddleware : IDispatchMiddleware
{
private readonly IRateLimiter _limiter;
public DispatchMiddlewareStage? Stage => DispatchMiddlewareStage.RateLimiting;
public async ValueTask<IMessageResult> InvokeAsync(
IDispatchMessage message,
IMessageContext context,
DispatchRequestDelegate next,
CancellationToken ct)
{
var clientId = context.GetItem<string>("ClientId") ?? "anonymous";
var messageType = message.GetType().Name;
var lease = await _limiter.AcquireAsync(
$"{clientId}:{messageType}",
ct);
if (!lease.IsAcquired)
{
return MessageResult.Failed(new RateLimitExceededError
{
RetryAfter = lease.RetryAfter
});
}
try
{
return await next(message, context, ct);
}
finally
{
lease.Dispose();
}
}
}
Circuit Breaker
public class CircuitBreakerMiddleware : IDispatchMiddleware
{
private readonly ICircuitBreakerPolicy _circuitBreaker;
public DispatchMiddlewareStage? Stage => DispatchMiddlewareStage.PreProcessing;
public async ValueTask<IMessageResult> InvokeAsync(
IDispatchMessage message,
IMessageContext context,
DispatchRequestDelegate next,
CancellationToken ct)
{
var circuitKey = message.GetType().Name;
if (!_circuitBreaker.IsAllowed(circuitKey))
{
return MessageResult.Failed(new CircuitOpenError
{
RecoveryTime = _circuitBreaker.GetRecoveryTime(circuitKey)
});
}
try
{
var result = await next(message, context, ct);
if (result.IsSuccess)
_circuitBreaker.RecordSuccess(circuitKey);
else
_circuitBreaker.RecordFailure(circuitKey);
return result;
}
catch (Exception)
{
_circuitBreaker.RecordFailure(circuitKey);
throw;
}
}
}
Accessing Request Data
From Message
public async ValueTask<IMessageResult> InvokeAsync(
IDispatchMessage message,
IMessageContext context,
DispatchRequestDelegate next,
CancellationToken ct)
{
// Type check
if (message is CreateOrderAction order)
{
// Access message properties
var customerId = order.CustomerId;
}
// Check message kind
if (message.Kind == MessageKinds.Event)
{
// Handle event-specific logic
}
}
From Context
// Read core properties (direct on interface)
var correlationId = context.CorrelationId;
// Read cross-cutting concerns via feature extensions
var userId = context.GetUserId();
var tenantId = context.GetTenantId();
// Read custom items from dictionary
var tenant = context.GetItem<Tenant>("CurrentTenant");
// Write custom values to Items dictionary
context.SetItem("ProcessingStarted", DateTime.UtcNow);
context.SetItem("RequestSource", "API");
// Access scoped services
var db = context.RequestServices.GetRequiredService<IDbConnection>();
Error Handling
Returning Errors
public async ValueTask<IMessageResult> InvokeAsync(...)
{
if (!IsValid(message))
{
return MessageResult.Failed(
MessageProblemDetails.ValidationError("Field: Error message"));
}
if (!IsAuthorized(context))
{
return MessageResult.Failed("Access denied");
}
return await next(message, context, ct);
}
Wrapping Exceptions
public async ValueTask<IMessageResult> InvokeAsync(...)
{
try
{
return await next(message, context, ct);
}
catch (ExternalServiceException ex)
{
_logger.LogError(ex, "External service failed");
return MessageResult.Failed(new ExternalServiceError(ex.Message));
}
}
Testing Middleware
public class TimingMiddlewareTests
{
[Fact]
public async Task Logs_Duration_For_Successful_Request()
{
// Arrange
var logger = new FakeLogger<TimingMiddleware>();
var middleware = new TimingMiddleware(logger);
var message = new TestAction();
var context = new MessageContext();
var next = A.Fake<DispatchRequestDelegate>();
A.CallTo(() => next(message, context, A<CancellationToken>._))
.Returns(MessageResult.Success());
// Act
await middleware.InvokeAsync(message, context, next, CancellationToken.None);
// Assert
logger.Logs.Should().ContainSingle(log =>
log.Message.Contains("completed in") &&
log.Level == LogLevel.Information);
}
}
Gotchas and Common Mistakes
Middleware runs for every dispatch -- including nested calls
If you use DispatchChildAsync inside a handler, the child dispatch goes through the full pipeline again. This means your middleware will execute twice (once for the parent, once for the child). Design middleware to be re-entrant and avoid side effects that should only happen once per top-level request.
The mistake: A logging middleware that writes an audit entry on every InvokeAsync -- nested dispatches create duplicate audit records.
Correct approach: Check context.CausationId to distinguish top-level dispatches from child dispatches, or use the DispatchMiddlewareStage to control when middleware fires.
Always call next() unless short-circuiting
Forgetting to call next(message, context, cancellationToken) silently swallows the rest of the pipeline and the handler never executes:
public async ValueTask<IMessageResult> InvokeAsync(
IDispatchMessage message,
IMessageContext context,
DispatchRequestDelegate next,
CancellationToken ct)
{
// Pre-processing...
// Wrong: forgot to call next -- handler never runs!
// return MessageResult.Success();
// Correct: always call next unless intentionally short-circuiting
return await next(message, context, ct);
}
Only skip next() when you intentionally want to short-circuit (e.g., returning a validation error or a cached result).
Don't modify the message -- use context instead
IDispatchMessage is the caller's data. Mutating it can cause subtle bugs especially when the same message instance is reused. Store request-scoped data in IMessageContext.Items instead:
// Wrong: modifying the message
// ((MyAction)message).EnrichedField = "value";
// Correct: use context items
context.SetItem("EnrichedField", "value");
Best Practices
| Practice | Recommendation |
|---|---|
| Dependencies | Use constructor injection |
| Stage | Always specify a stage for ordering |
| Async | Use async/await properly |
| Exceptions | Catch and convert to results |
| Context | Don't modify message, use context |
| Disposal | Implement IAsyncDisposable if needed |
Next Steps
- Built-in Middleware — Reference implementations
- Validation — Input validation patterns
See Also
- Built-in Middleware - Reference implementations for logging, validation, authorization, caching, and more
- Middleware Overview - Introduction to middleware concepts, stages, and registration
- Pipeline Profiles - Configure named pipeline profiles with different middleware stacks