Skip to main content

RabbitMQ Transport

RabbitMQ transport for flexible message routing, work queues, and traditional pub/sub patterns.

Before You Start

  • .NET 8.0+ (or .NET 9/10 for latest features)
  • A running RabbitMQ server (or Docker: docker run -p 5672:5672 -p 15672:15672 rabbitmq:management)
  • Familiarity with transport concepts and choosing a transport

Installation

dotnet add package Excalibur.Dispatch.Transport.RabbitMQ

Quick Start

services.AddDispatch(dispatch =>
{
dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);
dispatch.UseRabbitMQ(rmq =>
{
rmq.ConnectionString("amqp://guest:guest@localhost:5672/")
.ConfigureExchange(exchange => exchange.Name("dispatch.events").Type(RabbitMqExchangeType.Topic))
.ConfigureCloudEvents(ce => ce.EnablePublisherConfirms = true);
});
});

Standalone Registration

services.AddDispatch(dispatch =>
{
dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);
});

services.AddRabbitMQTransport(rmq =>
{
rmq.ConnectionString("amqp://guest:guest@localhost:5672/")
.ConfigureExchange(exchange => exchange.Name("dispatch.events").Type(RabbitMqExchangeType.Topic))
.ConfigureCloudEvents(ce => ce.EnablePublisherConfirms = true);
});

RabbitMQ registers a keyed IMessageBus named rabbitmq:

var bus = serviceProvider.GetRequiredKeyedService<IMessageBus>("rabbitmq");

Configuration

Fluent Builder Configuration

Configure RabbitMQ transport using the fluent builder:

services.AddRabbitMQTransport(rmq =>
{
rmq.ConnectionString("amqp://user:pass@rabbitmq:5672/vhost")
.ConfigureExchange(exchange =>
{
exchange.Name("dispatch.events")
.Type(RabbitMqExchangeType.Topic)
.Durable(true)
.AutoDelete(false);
})
.ConfigureQueue(queue =>
{
queue.Name("order-handlers")
.Durable(true)
.PrefetchCount(100);
})
.ConfigureBinding(binding =>
{
binding.Exchange("dispatch.events")
.Queue("order-handlers")
.RoutingKey("orders.*");
})
.ConfigureCloudEvents(ce =>
{
ce.ExchangeType = RabbitMqExchangeType.Topic;
ce.Persistence = RabbitMqPersistence.Persistent;
ce.RoutingStrategy = RabbitMqRoutingStrategy.EventType;
});
});

Broker Options

Configure low-level broker behavior via RabbitMqOptions:

services.Configure<RabbitMqOptions>(options =>
{
options.ConnectionString = "amqp://user:pass@rabbitmq:5672/vhost";
options.Exchange = "dispatch.events";
options.RoutingKey = "orders.#";
options.QueueName = "orders-processing";

// Queue behavior
options.QueueDurable = true;
options.QueueExclusive = false;
options.QueueAutoDelete = false;
options.QueueArguments["x-message-ttl"] = 86400000; // 24 hours

// Consumer behavior
options.PrefetchCount = 100;
options.PrefetchGlobal = false;
options.AutoAck = false;
options.RequeueOnReject = false;
options.MaxBatchSize = 50;
options.MaxBatchWaitMs = 500;
options.ConsumerTag = "order-service";

// Dead letter exchange (non-CloudEvents)
options.EnableDeadLetterExchange = true;
options.DeadLetterExchange = "dispatch.dlx";
options.DeadLetterRoutingKey = "failed";

// Connection resilience
options.AutomaticRecoveryEnabled = true;
options.ConnectionTimeoutSeconds = 30;
options.NetworkRecoveryIntervalSeconds = 10;
});

CloudEvents Options

Use RabbitMqCloudEventOptions for CloudEvents-specific features:

services.UseCloudEventsForRabbitMq(options =>
{
options.ExchangeType = RabbitMqExchangeType.Topic;
options.RoutingStrategy = RabbitMqRoutingStrategy.EventType;
options.Persistence = RabbitMqPersistence.Persistent;

// Quorum queues + delivery guarantees
options.UseQuorumQueues = true;
options.EnablePublisherConfirms = true;
options.MandatoryPublishing = true;

// CloudEvents dead-letter + retry
options.EnableDeadLetterExchange = true;
options.DeadLetterExchange = "cloudevents.dlx";
options.MaxRetryAttempts = 3;
options.RetryDelay = TimeSpan.FromSeconds(30);
});

Consumer Handling Options

Configure advanced consumer behavior with RabbitMqConsumerOptions via CloudEvents options:

services.UseCloudEventsForRabbitMq(options =>
{
// Acknowledgment mode
options.Consumer.AckMode = AckMode.Manual; // Auto, Manual, or Batch

// Retry policy for failed messages
options.Consumer.RetryPolicy = RetryPolicy.Exponential(
maxRetries: 3,
initialDelay: TimeSpan.FromSeconds(1),
maxDelay: TimeSpan.FromMinutes(5));

// Dead letter exchange for failed messages
options.Consumer.DeadLetterExchange = "dlx.exchange";
options.Consumer.DeadLetterRoutingKey = "failed";
});

AckMode Options

ModeDescriptionUse Case
AutoAutomatic acknowledgment on receiveNon-critical, fire-and-forget
ManualExplicit ack after processing (default)Guaranteed delivery
BatchGrouped acknowledgmentsHigh throughput scenarios

RetryPolicy Factory Methods

// No retry - fail immediately
RetryPolicy.None()

// Fixed delay between retries
RetryPolicy.Fixed(maxRetries: 3, delay: TimeSpan.FromSeconds(5))

// Exponential backoff with jitter
RetryPolicy.Exponential(
maxRetries: 5,
initialDelay: TimeSpan.FromSeconds(1),
maxDelay: TimeSpan.FromMinutes(5))

Publisher Confirms

Enable publisher confirms for guaranteed delivery:

services.UseCloudEventsForRabbitMq(options =>
{
options.Publisher.EnableConfirms = true;
options.Publisher.ConfirmTimeout = TimeSpan.FromSeconds(5);
options.Publisher.MandatoryPublishing = true;
});

Acknowledgment Behavior (Legacy)

For non-CloudEvents usage, configure via RabbitMqOptions:

services.Configure<RabbitMqOptions>(options =>
{
options.AutoAck = false; // Manual ack after successful processing
options.RequeueOnReject = false; // Reject goes to DLQ if enabled
});

Health Checks

When using transport adapters, register aggregate health checks (for message bus-only usage, implement a custom check around the RabbitMQ client):

services.AddHealthChecks()
.AddTransportHealthChecks();

Observability

services.AddOpenTelemetry()
.WithTracing(tracing =>
{
tracing.AddSource("Excalibur.Dispatch.Observability");
// Traces: publish, consume, ack, reject
})
.WithMetrics(metrics =>
{
metrics.AddDispatchMetrics();
// Metrics: message rates, consumer lag
});

Production Checklist

  • Use durable queues and exchanges
  • Enable publisher confirms for critical messages
  • Configure dead letter exchange and retry policy
  • Set prefetch count based on handler throughput
  • Enable automatic recovery for transient network failures
  • Use TLS (amqps://) in production

Next Steps

See Also