RabbitMQ Transport
RabbitMQ transport for flexible message routing, work queues, and traditional pub/sub patterns.
Before You Start
- .NET 8.0+ (or .NET 9/10 for latest features)
- A running RabbitMQ server (or Docker: docker run -p 5672:5672 -p 15672:15672 rabbitmq:management)
- Familiarity with transport concepts and choosing a transport
Installation
dotnet add package Excalibur.Dispatch.Transport.RabbitMQ
For the fastest setup, use the Excalibur.Dispatch.RabbitMQ experience metapackage. It bundles the RabbitMQ transport with Polly resilience and OpenTelemetry observability, and a single AddDispatchRabbitMQ call wires up all three:
dotnet add package Excalibur.Dispatch.RabbitMQ
services.AddDispatchRabbitMQ(rmq =>
{
rmq.ConnectionString("amqp://guest:guest@localhost:5672/");
});
AddDispatchRabbitMQ calls AddDispatch internally and configures UseRabbitMQ, UseResilience, and UseObservability. Pass an optional second parameter (Action<IDispatchBuilder>) for additional pipeline configuration. See Package Guide for details.
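The connection strings on this page are AMQP URIs. As a rough illustration of what each part carries, the sketch below splits one with System.Uri from the base class library. It only demonstrates the URI layout; how the transport itself parses the string is not covered here, and the user:pass and vhost values are the placeholders used elsewhere on this page.

```csharp
using System;

// Illustration only: break an AMQP URI into its parts with System.Uri.
var uri = new Uri("amqp://user:pass@rabbitmq:5672/vhost");

Console.WriteLine(uri.Scheme);       // amqp (amqps when TLS is used)
Console.WriteLine(uri.UserInfo);     // user:pass
Console.WriteLine(uri.Host);         // rabbitmq
Console.WriteLine(uri.Port);         // 5672
Console.WriteLine(uri.AbsolutePath); // /vhost (the virtual host segment)
```

Keeping credentials out of source control (for example, by reading the whole URI from configuration) avoids committing the UserInfo part.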
Quick Start
Using the Dispatch Builder (Recommended)
services.AddDispatch(dispatch =>
{
dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);
dispatch.UseRabbitMQ(rmq =>
{
rmq.ConnectionString("amqp://guest:guest@localhost:5672/")
.ConfigureExchange(exchange => exchange.Name("dispatch.events").Type(RabbitMQExchangeType.Topic))
.ConfigureCloudEvents(ce => ce.EnablePublisherConfirms = true);
});
});
Standalone Registration
services.AddDispatch(dispatch =>
{
dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);
});
services.AddRabbitMQTransport(rmq =>
{
rmq.ConnectionString("amqp://guest:guest@localhost:5672/")
.ConfigureExchange(exchange => exchange.Name("dispatch.events").Type(RabbitMQExchangeType.Topic))
.ConfigureCloudEvents(ce => ce.EnablePublisherConfirms = true);
});
The RabbitMQ transport registers a keyed IMessageBus under the key rabbitmq:
var bus = serviceProvider.GetRequiredKeyedService<IMessageBus>("rabbitmq");
Configuration
Fluent Builder Configuration
For most applications, the Quick Start above is all you need. The fluent builder below is for advanced scenarios (custom exchanges, queue bindings, CloudEvents routing).
Configure RabbitMQ transport using the fluent builder:
services.AddRabbitMQTransport(rmq =>
{
rmq.ConnectionString("amqp://user:pass@rabbitmq:5672/vhost")
.ConfigureExchange(exchange =>
{
exchange.Name("dispatch.events")
.Type(RabbitMQExchangeType.Topic)
.Durable(true)
.AutoDelete(false);
})
.ConfigureQueue(queue =>
{
queue.Name("order-handlers")
.Durable(true)
.PrefetchCount(100);
})
.ConfigureBinding(binding =>
{
binding.Exchange("dispatch.events")
.Queue("order-handlers")
.RoutingKey("orders.*");
})
.ConfigureCloudEvents(ce =>
{
ce.ExchangeType = RabbitMQExchangeType.Topic;
ce.Persistence = RabbitMqPersistence.Persistent;
ce.RoutingStrategy = RabbitMqRoutingStrategy.EventType;
});
});
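The binding above uses the routing key pattern orders.*. On a topic exchange, * matches exactly one dot-separated word and # matches zero or more words, which is why the choice between orders.* and orders.# matters. The sketch below imitates that matching in plain C#. TopicMatches and Match are hypothetical helpers written for this illustration; they are not part of Excalibur.Dispatch or the RabbitMQ client, and the broker performs the real matching itself.

```csharp
using System;

// Hypothetical helper: mimics how a topic exchange compares a binding
// pattern with a message's routing key.
static bool TopicMatches(string pattern, string routingKey)
{
    return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
}

static bool Match(string[] p, int pi, string[] k, int ki)
{
    // Pattern exhausted: it matches only if the key is exhausted too.
    if (pi == p.Length) return ki == k.Length;

    if (p[pi] == "#")
    {
        // '#' may absorb zero or more words; try every possible split.
        for (int next = ki; next <= k.Length; next++)
        {
            if (Match(p, pi + 1, k, next)) return true;
        }
        return false;
    }

    // '*' and literal words both need exactly one word from the key.
    if (ki == k.Length) return false;
    return (p[pi] == "*" || p[pi] == k[ki]) && Match(p, pi + 1, k, ki + 1);
}

Console.WriteLine(TopicMatches("orders.*", "orders.created"));    // True
Console.WriteLine(TopicMatches("orders.*", "orders.created.eu")); // False
Console.WriteLine(TopicMatches("orders.#", "orders.created.eu")); // True
Console.WriteLine(TopicMatches("orders.#", "orders"));            // True
```

With orders.*, a message published as orders.created.eu would not reach the order-handlers queue; binding with orders.# would accept it.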
Broker Options
Use RabbitMqOptions when you need fine-grained control over queue arguments, consumer behavior, or dead letter exchanges. The fluent builder above covers most scenarios.
Configure low-level broker behavior via RabbitMqOptions:
services.Configure<RabbitMqOptions>(options =>
{
options.ConnectionString = "amqp://user:pass@rabbitmq:5672/vhost";
options.Exchange = "dispatch.events";
options.RoutingKey = "orders.#";
options.QueueName = "orders-processing";
// Queue behavior
options.QueueDurable = true;
options.QueueExclusive = false;
options.QueueAutoDelete = false;
options.QueueArguments["x-message-ttl"] = 86400000; // 24 hours
// Consumer behavior
options.PrefetchCount = 100;
options.PrefetchGlobal = false;
options.AutoAck = false;
options.RequeueOnReject = false;
options.MaxBatchSize = 50;
options.MaxBatchWaitMs = 500;
options.ConsumerTag = "order-service";
// Dead letter exchange (non-CloudEvents)
options.EnableDeadLetterExchange = true;
options.DeadLetterExchange = "dispatch.dlx";
options.DeadLetterRoutingKey = "failed";
// Connection resilience
options.AutomaticRecoveryEnabled = true;
options.ConnectionTimeoutSeconds = 30;
options.NetworkRecoveryIntervalSeconds = 10;
});
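The x-message-ttl queue argument is expressed in milliseconds, so the literal 86400000 above is 24 hours. One way to avoid hand-computed constants is to derive the value from a TimeSpan, as in this small sketch. It uses only the base class library; assigning the result to QueueArguments is left out because the argument dictionary's exact type is not shown on this page.

```csharp
using System;

// Derive the TTL in milliseconds from a readable duration.
TimeSpan ttl = TimeSpan.FromHours(24);

// checked() makes an overflow fail loudly for very long durations.
int ttlMs = checked((int)ttl.TotalMilliseconds);

Console.WriteLine(ttlMs); // 86400000
```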
CloudEvents Options
Use RabbitMqCloudEventOptions for CloudEvents-specific features:
services.UseCloudEventsForRabbitMq(options =>
{
options.ExchangeType = RabbitMQExchangeType.Topic;
options.RoutingStrategy = RabbitMqRoutingStrategy.EventType;
options.Persistence = RabbitMqPersistence.Persistent;
// Quorum queues + delivery guarantees
options.UseQuorumQueues = true;
options.EnablePublisherConfirms = true;
options.MandatoryPublishing = true;
// CloudEvents dead-letter + retry
options.EnableDeadLetterExchange = true;
options.DeadLetterExchange = "cloudevents.dlx";
options.MaxRetryAttempts = 3;
options.RetryDelay = TimeSpan.FromSeconds(30);
});
Consumer Handling Options
Configure advanced consumer behavior with RabbitMqConsumerOptions via CloudEvents options:
services.UseCloudEventsForRabbitMq(options =>
{
// Acknowledgment mode
options.Consumer.AckMode = AckMode.Manual; // Auto, Manual, or Batch
// Retry policy for failed messages
options.Consumer.RabbitMqRetryOptions = RabbitMqRetryOptions.Exponential(
maxRetries: 3,
initialDelay: TimeSpan.FromSeconds(1),
maxDelay: TimeSpan.FromMinutes(5));
// Dead letter exchange for failed messages
options.Consumer.DeadLetterExchange = "dlx.exchange";
options.Consumer.DeadLetterRoutingKey = "failed";
});
AckMode Options
| Mode | Description | Use Case |
|---|---|---|
| Auto | Automatic acknowledgment on receive | Non-critical, fire-and-forget |
| Manual | Explicit ack after processing (default) | Guaranteed delivery |
| Batch | Grouped acknowledgments | High throughput scenarios |
RabbitMqRetryOptions Factory Methods
// No retry - fail immediately
RabbitMqRetryOptions.None()
// Fixed delay between retries
RabbitMqRetryOptions.Fixed(maxRetries: 3, delay: TimeSpan.FromSeconds(5))
// Exponential backoff with jitter
RabbitMqRetryOptions.Exponential(
maxRetries: 5,
initialDelay: TimeSpan.FromSeconds(1),
maxDelay: TimeSpan.FromMinutes(5))
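To give a feel for what "exponential backoff with jitter" means for the parameters above, the sketch below computes a delay schedule in plain C#. It is an assumption-laden illustration, not the library's implementation: BackoffCeiling and JitteredDelay are hypothetical helpers, the doubling factor and the "full jitter" strategy (a uniform random delay between zero and the capped value) are choices made for this example, and the formula RabbitMqRetryOptions.Exponential actually uses may differ.

```csharp
using System;

// Hypothetical helper: upper bound for a 1-based attempt number.
// Attempt 1 uses initialDelay, each later attempt doubles it, capped at maxDelay.
static TimeSpan BackoffCeiling(int attempt, TimeSpan initialDelay, TimeSpan maxDelay)
{
    double ms = initialDelay.TotalMilliseconds * Math.Pow(2, attempt - 1);
    return TimeSpan.FromMilliseconds(Math.Min(ms, maxDelay.TotalMilliseconds));
}

// Hypothetical helper: "full jitter" picks a random delay in [0, ceiling).
static TimeSpan JitteredDelay(int attempt, TimeSpan initialDelay, TimeSpan maxDelay, Random random)
{
    double capMs = BackoffCeiling(attempt, initialDelay, maxDelay).TotalMilliseconds;
    return TimeSpan.FromMilliseconds(random.NextDouble() * capMs);
}

var rng = new Random();
for (int attempt = 1; attempt <= 5; attempt++)
{
    TimeSpan cap = BackoffCeiling(attempt, TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(5));
    TimeSpan wait = JitteredDelay(attempt, TimeSpan.FromSeconds(1), TimeSpan.FromMinutes(5), rng);
    Console.WriteLine($"attempt {attempt}: ceiling {cap.TotalSeconds}s, jittered {wait.TotalSeconds:F2}s");
}
```

Jitter spreads retries from many consumers over time so they do not all hit the broker at the same instant, and maxDelay keeps late attempts from waiting unboundedly long.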
Publisher Confirms
Enable publisher confirms so the broker acknowledges each published message and lost publishes are detected:
services.UseCloudEventsForRabbitMq(options =>
{
options.Publisher.EnableConfirms = true;
options.Publisher.ConfirmTimeout = TimeSpan.FromSeconds(5);
options.Publisher.MandatoryPublishing = true;
});
Acknowledgment Behavior (Legacy)
For non-CloudEvents usage, configure via RabbitMqOptions:
services.Configure<RabbitMqOptions>(options =>
{
options.AutoAck = false; // Manual ack after successful processing
options.RequeueOnReject = false; // Rejected messages are not requeued; they go to the dead letter exchange if one is enabled
});
Health Checks
For message bus-only usage, implement a custom check around the RabbitMQ client. When using transport adapters, register the aggregate health checks:
services.AddHealthChecks()
.AddTransportHealthChecks();
Observability
services.AddOpenTelemetry()
.WithTracing(tracing =>
{
tracing.AddSource("Excalibur.Dispatch");
// Traces: publish, consume, ack, reject
})
.WithMetrics(metrics =>
{
metrics.AddDispatchMetrics();
// Metrics: message rates, consumer lag
});
Production Checklist
- Use durable queues and exchanges
- Enable publisher confirms for critical messages
- Configure dead letter exchange and retry policy
- Set prefetch count based on handler throughput
- Enable automatic recovery for transient network failures
- Use TLS (amqps://) in production
Next Steps
- Kafka Transport -- High-throughput streaming
- Multi-Transport Routing -- Combine RabbitMQ with other transports
See Also
- Choosing a Transport -- Compare RabbitMQ against other transports to find the best fit
- Message Mapping -- Configure how message types map to exchanges and queues
- Dead Letter Handling -- Strategies for managing failed messages with DLX
- Multi-Transport Routing -- Route different message types across RabbitMQ and other transports