Skip to main content

RabbitMQ Transport

RabbitMQ transport for flexible message routing, work queues, and traditional pub/sub patterns.

Before You Start

  • .NET 8.0+ (or .NET 9/10 for latest features)
  • A running RabbitMQ server (or Docker: docker run -p 5672:5672 -p 15672:15672 rabbitmq:management)
  • Familiarity with transport concepts and choosing a transport

Installation

dotnet add package Excalibur.Dispatch.Transport.RabbitMQ
One-Line Setup with Metapackage

For the fastest setup, use the Excalibur.Dispatch.RabbitMQ experience metapackage. It bundles the RabbitMQ transport with Polly resilience and OpenTelemetry observability in a single call:

dotnet add package Excalibur.Dispatch.RabbitMQ
services.AddDispatchRabbitMQ(rmq =>
{
rmq.ConnectionString("amqp://guest:guest@localhost:5672/");
});

AddDispatchRabbitMQ calls AddDispatch internally and configures UseRabbitMQ, UseResilience, and UseObservability. Pass an optional second parameter (Action<IDispatchBuilder>) for additional pipeline configuration. See Package Guide for details.

Quick Start

services.AddDispatch(dispatch =>
{
dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);
dispatch.UseRabbitMQ(rmq =>
{
rmq.ConnectionString("amqp://guest:guest@localhost:5672/")
.ConfigureExchange(exchange => exchange.Name("dispatch.events").Type(RabbitMQExchangeType.Topic))
.ConfigureCloudEvents(ce => ce.EnablePublisherConfirms = true);
});
});

Standalone Registration

services.AddDispatch(dispatch =>
{
dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);
});

services.AddRabbitMQTransport(rmq =>
{
rmq.ConnectionString("amqp://guest:guest@localhost:5672/")
.ConfigureExchange(exchange => exchange.Name("dispatch.events").Type(RabbitMQExchangeType.Topic))
.ConfigureCloudEvents(ce => ce.EnablePublisherConfirms = true);
});

RabbitMQ registers a keyed IMessageBus named rabbitmq:

var bus = serviceProvider.GetRequiredKeyedService<IMessageBus>("rabbitmq");

Configuration

Fluent Builder Configuration

Start simple

For most applications, the Quick Start above is all you need. The fluent builder below is for advanced scenarios (custom exchanges, queue bindings, CloudEvents routing).

Configure RabbitMQ transport using the fluent builder:

services.AddRabbitMQTransport(rmq =>
{
rmq.ConnectionString("amqp://user:pass@rabbitmq:5672/vhost")
.ConfigureExchange(exchange =>
{
exchange.Name("dispatch.events")
.Type(RabbitMQExchangeType.Topic)
.Durable(true)
.AutoDelete(false);
})
.ConfigureQueue(queue =>
{
queue.Name("order-handlers")
.Durable(true)
.PrefetchCount(100);
})
.ConfigureBinding(binding =>
{
binding.Exchange("dispatch.events")
.Queue("order-handlers")
.RoutingKey("orders.*");
})
.ConfigureCloudEvents(ce =>
{
ce.ExchangeType = RabbitMQExchangeType.Topic;
ce.Persistence = RabbitMqPersistence.Persistent;
ce.RoutingStrategy = RabbitMqRoutingStrategy.EventType;
});
});

Broker Options

When do I need this?

Use RabbitMqOptions when you need fine-grained control over queue arguments, consumer behavior, or dead letter exchanges. The fluent builder above covers most scenarios.

Configure low-level broker behavior via RabbitMqOptions:

services.Configure<RabbitMqOptions>(options =>
{
options.ConnectionString = "amqp://user:pass@rabbitmq:5672/vhost";
options.Exchange = "dispatch.events";
options.RoutingKey = "orders.#";
options.QueueName = "orders-processing";

// Queue behavior
options.QueueDurable = true;
options.QueueExclusive = false;
options.QueueAutoDelete = false;
options.QueueArguments["x-message-ttl"] = 86400000; // 24 hours

// Consumer behavior
options.PrefetchCount = 100;
options.PrefetchGlobal = false;
options.AutoAck = false;
options.RequeueOnReject = false;
options.MaxBatchSize = 50;
options.MaxBatchWaitMs = 500;
options.ConsumerTag = "order-service";

// Dead letter exchange (non-CloudEvents)
options.EnableDeadLetterExchange = true;
options.DeadLetterExchange = "dispatch.dlx";
options.DeadLetterRoutingKey = "failed";

// Connection resilience
options.AutomaticRecoveryEnabled = true;
options.ConnectionTimeoutSeconds = 30;
options.NetworkRecoveryIntervalSeconds = 10;
});

CloudEvents Options

Use RabbitMqCloudEventOptions for CloudEvents-specific features:

services.UseCloudEventsForRabbitMq(options =>
{
options.ExchangeType = RabbitMQExchangeType.Topic;
options.RoutingStrategy = RabbitMqRoutingStrategy.EventType;
options.Persistence = RabbitMqPersistence.Persistent;

// Quorum queues + delivery guarantees
options.UseQuorumQueues = true;
options.EnablePublisherConfirms = true;
options.MandatoryPublishing = true;

// CloudEvents dead-letter + retry
options.EnableDeadLetterExchange = true;
options.DeadLetterExchange = "cloudevents.dlx";
options.MaxRetryAttempts = 3;
options.RetryDelay = TimeSpan.FromSeconds(30);
});

Consumer Handling Options

Configure advanced consumer behavior with RabbitMqConsumerOptions via CloudEvents options:

services.UseCloudEventsForRabbitMq(options =>
{
// Acknowledgment mode
options.Consumer.AckMode = AckMode.Manual; // Auto, Manual, or Batch

// Retry policy for failed messages
options.Consumer.RabbitMqRetryOptions = RabbitMqRetryOptions.Exponential(
maxRetries: 3,
initialDelay: TimeSpan.FromSeconds(1),
maxDelay: TimeSpan.FromMinutes(5));

// Dead letter exchange for failed messages
options.Consumer.DeadLetterExchange = "dlx.exchange";
options.Consumer.DeadLetterRoutingKey = "failed";
});

AckMode Options

ModeDescriptionUse Case
AutoAutomatic acknowledgment on receiveNon-critical, fire-and-forget
ManualExplicit ack after processing (default)Guaranteed delivery
BatchGrouped acknowledgmentsHigh throughput scenarios

RabbitMqRetryOptions Factory Methods

// No retry - fail immediately
RabbitMqRetryOptions.None()

// Fixed delay between retries
RabbitMqRetryOptions.Fixed(maxRetries: 3, delay: TimeSpan.FromSeconds(5))

// Exponential backoff with jitter
RabbitMqRetryOptions.Exponential(
maxRetries: 5,
initialDelay: TimeSpan.FromSeconds(1),
maxDelay: TimeSpan.FromMinutes(5))

Publisher Confirms

Enable publisher confirms for guaranteed delivery:

services.UseCloudEventsForRabbitMq(options =>
{
options.Publisher.EnableConfirms = true;
options.Publisher.ConfirmTimeout = TimeSpan.FromSeconds(5);
options.Publisher.MandatoryPublishing = true;
});

Acknowledgment Behavior (Legacy)

For non-CloudEvents usage, configure via RabbitMqOptions:

services.Configure<RabbitMqOptions>(options =>
{
options.AutoAck = false; // Manual ack after successful processing
options.RequeueOnReject = false; // Reject goes to DLQ if enabled
});

Health Checks

When using transport adapters, register aggregate health checks (for message bus-only usage, implement a custom check around the RabbitMQ client):

services.AddHealthChecks()
.AddTransportHealthChecks();

Observability

services.AddOpenTelemetry()
.WithTracing(tracing =>
{
tracing.AddSource("Excalibur.Dispatch");
// Traces: publish, consume, ack, reject
})
.WithMetrics(metrics =>
{
metrics.AddDispatchMetrics();
// Metrics: message rates, consumer lag
});

Production Checklist

  • Use durable queues and exchanges
  • Enable publisher confirms for critical messages
  • Configure dead letter exchange and retry policy
  • Set prefetch count based on handler throughput
  • Enable automatic recovery for transient network failures
  • Use TLS (amqps://) in production

Next Steps

See Also