Transports

Dispatch supports multiple message transports for distributed messaging. You can use a single transport or route messages to different transports based on rules.

Before You Start

  • .NET 8.0+ (or .NET 9/10 for latest features)
  • Install the core package plus your transport:
    dotnet add package Excalibur.Dispatch
    dotnet add package Excalibur.Dispatch.Transport.Kafka # or RabbitMQ, AzureServiceBus, etc.
  • Familiarity with handlers and pipeline concepts

Not sure which transport to use?

Start with the Choosing a Transport guide for a decision matrix and trade-off comparison across all five production transports.

Supported Transports

| Transport | Package | Use Case |
|---|---|---|
| In-Memory | Built-in | Testing, development |
| Cron Timer | Built-in | Scheduled jobs, background tasks |
| Kafka | Excalibur.Dispatch.Transport.Kafka | High-throughput event streaming |
| RabbitMQ | Excalibur.Dispatch.Transport.RabbitMQ | Traditional messaging, routing patterns |
| Azure Service Bus | Excalibur.Dispatch.Transport.AzureServiceBus | Azure-native messaging |
| AWS SQS | Excalibur.Dispatch.Transport.AwsSqs | AWS-native messaging |
| Google Pub/Sub | Excalibur.Dispatch.Transport.GooglePubSub | GCP-native messaging |

Quick Start

Single Transport

Configure transports through the AddDispatch() builder using Use{Transport}() methods:

// Install the transport package
// dotnet add package Excalibur.Dispatch.Transport.RabbitMQ

services.AddDispatch(dispatch =>
{
    dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);

    // Configure transport through the builder (recommended)
    dispatch.UseRabbitMQ(rmq => rmq.HostName("localhost"));
});

All five transports follow the same Use prefix pattern:

services.AddDispatch(dispatch =>
{
    dispatch.UseKafka(kafka => kafka.BootstrapServers("localhost:9092"));
    dispatch.UseRabbitMQ(rmq => rmq.HostName("localhost"));
    dispatch.UseAzureServiceBus(asb => asb.ConnectionString("..."));
    dispatch.UseAwsSqs(sqs => sqs.Region("us-east-1"));
    dispatch.UseGooglePubSub(pubsub => pubsub.ProjectId("my-project"));
});

Multi-Transport Routing

Route different message types to different transports:

services.AddDispatch(dispatch =>
{
    dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);

    // Register transports through the builder
    dispatch.UseKafka(kafka => kafka.BootstrapServers("localhost:9092"));
    dispatch.UseRabbitMQ(rmq => rmq.HostName("localhost"));

    dispatch.UseRouting(routing =>
    {
        routing.Transport
            // High-volume events to Kafka
            .Route<OrderCreatedEvent>().To("kafka")
            // Payment events to RabbitMQ
            .Route<PaymentProcessedEvent>().To("rabbitmq")
            // Default transport
            .Default("rabbitmq");
    });
});
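
Conceptually, routing rules like these amount to a lookup from message type to transport name, with a fallback for unrouted types. The sketch below is not Dispatch's implementation: RouteTable, its members, and InventoryAdjustedEvent are hypothetical names introduced only to illustrate the resolution order.

```csharp
using System;
using System.Collections.Generic;

// Two routed types plus a default, mirroring the routing rules in the text.
var routes = new RouteTable("rabbitmq")
    .Route<OrderCreatedEvent>("kafka")
    .Route<PaymentProcessedEvent>("rabbitmq");

Console.WriteLine(routes.Resolve(typeof(OrderCreatedEvent)));      // kafka
Console.WriteLine(routes.Resolve(typeof(PaymentProcessedEvent)));  // rabbitmq
Console.WriteLine(routes.Resolve(typeof(InventoryAdjustedEvent))); // rabbitmq (default)

// Placeholder message types for the example.
public sealed record OrderCreatedEvent;
public sealed record PaymentProcessedEvent;
public sealed record InventoryAdjustedEvent;

// Hypothetical stand-in: maps a message type to a transport name, with a default.
public sealed class RouteTable
{
    private readonly Dictionary<Type, string> _routes = new();
    private readonly string _defaultTransport;

    public RouteTable(string defaultTransport) => _defaultTransport = defaultTransport;

    public RouteTable Route<TMessage>(string transport)
    {
        _routes[typeof(TMessage)] = transport;
        return this;
    }

    // Falls back to the default transport when no explicit route exists.
    public string Resolve(Type messageType) =>
        _routes.TryGetValue(messageType, out var transport) ? transport : _defaultTransport;
}
```

Any message type without an explicit route resolves to the default, which is why a default transport is worth declaring even when most types are routed explicitly.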

Transport Interfaces

Dispatch provides minimal transport interfaces inspired by Microsoft.Extensions.AI (IChatClient), Azure.Messaging.ServiceBus (ServiceBusClient), and HttpClientFactory (DelegatingHandler).

ITransportSender / ITransportReceiver

Each transport implements two minimal interfaces, each with three methods plus GetService() for raw SDK access:

public interface ITransportSender : IAsyncDisposable
{
    string Destination { get; }
    Task<SendResult> SendAsync(TransportMessage message, CancellationToken cancellationToken);
    Task<BatchSendResult> SendBatchAsync(IReadOnlyList<TransportMessage> messages, CancellationToken cancellationToken);
    Task FlushAsync(CancellationToken cancellationToken);
    object? GetService(Type serviceType) => null;
}

public interface ITransportReceiver : IAsyncDisposable
{
    string Source { get; }
    Task<IReadOnlyList<TransportReceivedMessage>> ReceiveAsync(int maxMessages, CancellationToken cancellationToken);
    Task AcknowledgeAsync(TransportReceivedMessage message, CancellationToken cancellationToken);
    Task RejectAsync(TransportReceivedMessage message, string? reason, bool requeue, CancellationToken cancellationToken);
    object? GetService(Type serviceType) => null;
}
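
A pull-based consumer typically loops over ReceiveAsync and settles each message with AcknowledgeAsync or RejectAsync. The following is a minimal sketch of that loop, not code from the library: IPullReceiver and ReceivedMessage are trimmed stand-ins for ITransportReceiver and TransportReceivedMessage, and FakeReceiver and PullLoop are hypothetical helpers so the example runs without a broker.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Drain a fake source: "ok" bodies are acknowledged, anything else is rejected.
var receiver = new FakeReceiver("ok", "bad", "ok");
await PullLoop.DrainAsync(receiver, maxMessages: 10, CancellationToken.None);
Console.WriteLine($"acked={receiver.Acked} rejected={receiver.Rejected}"); // acked=2 rejected=1

// Trimmed stand-ins for illustration only; the real types carry more members.
public sealed record ReceivedMessage(string Body);

public interface IPullReceiver
{
    Task<IReadOnlyList<ReceivedMessage>> ReceiveAsync(int maxMessages, CancellationToken cancellationToken);
    Task AcknowledgeAsync(ReceivedMessage message, CancellationToken cancellationToken);
    Task RejectAsync(ReceivedMessage message, string? reason, bool requeue, CancellationToken cancellationToken);
}

public static class PullLoop
{
    // Receives batches until the source returns an empty batch, settling each message.
    public static async Task DrainAsync(IPullReceiver receiver, int maxMessages, CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var batch = await receiver.ReceiveAsync(maxMessages, ct);
            if (batch.Count == 0) break;

            foreach (var message in batch)
            {
                if (message.Body == "ok")
                    await receiver.AcknowledgeAsync(message, ct);
                else
                    await receiver.RejectAsync(message, "unprocessable body", requeue: false, ct);
            }
        }
    }
}

// In-memory fake so the sketch runs without a broker.
public sealed class FakeReceiver : IPullReceiver
{
    private readonly Queue<ReceivedMessage> _pending = new();
    public int Acked { get; private set; }
    public int Rejected { get; private set; }

    public FakeReceiver(params string[] bodies)
    {
        foreach (var body in bodies) _pending.Enqueue(new ReceivedMessage(body));
    }

    public Task<IReadOnlyList<ReceivedMessage>> ReceiveAsync(int maxMessages, CancellationToken cancellationToken)
    {
        var batch = new List<ReceivedMessage>();
        while (batch.Count < maxMessages && _pending.Count > 0) batch.Add(_pending.Dequeue());
        return Task.FromResult<IReadOnlyList<ReceivedMessage>>(batch);
    }

    public Task AcknowledgeAsync(ReceivedMessage message, CancellationToken cancellationToken)
    {
        Acked++;
        return Task.CompletedTask;
    }

    public Task RejectAsync(ReceivedMessage message, string? reason, bool requeue, CancellationToken cancellationToken)
    {
        Rejected++;
        return Task.CompletedTask;
    }
}
```

A real consumer would usually keep polling on an empty batch rather than exit; the early break here only keeps the example finite.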

ITransportSubscriber (Push-Based)

For transports with native push semantics (Kafka consumer groups, RabbitMQ BasicConsume, Azure Event Hubs, Google Pub/Sub streaming pull), ITransportSubscriber provides a push-based alternative to the pull-based ITransportReceiver:

public interface ITransportSubscriber : IAsyncDisposable
{
    string Source { get; }
    Task SubscribeAsync(
        Func<TransportReceivedMessage, CancellationToken, Task<MessageAction>> handler,
        CancellationToken cancellationToken);
    object? GetService(Type serviceType) => null;
}

public enum MessageAction { Acknowledge, Reject, Requeue }

The handler callback returns a MessageAction telling the transport what to do with the message. DelegatingTransportSubscriber provides the decorator base class.

| Interface | Pattern | Use When |
|---|---|---|
| ITransportReceiver | Pull | You control the polling loop, batch receive |
| ITransportSubscriber | Push | Transport drives delivery, handler reacts |
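
In the push model, the handler's only job is to process one message and report an outcome. The sketch below shows a handler with the callback shape used by SubscribeAsync. It is illustrative only: ReceivedMessage is a trimmed stand-in for TransportReceivedMessage, the Handlers class is hypothetical, and the rules for choosing an action (empty body, a "retry-me" marker) are invented for the example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Invoke the handler the way a subscriber would, once per delivered message.
foreach (var body in new[] { "{\"id\":1}", "", "retry-me" })
{
    var action = await Handlers.HandleAsync(new ReceivedMessage(body), CancellationToken.None);
    Console.WriteLine($"'{body}' -> {action}");
}

// Mirrors the MessageAction enum from the interface definition.
public enum MessageAction { Acknowledge, Reject, Requeue }

// Trimmed stand-in for TransportReceivedMessage (illustration only).
public sealed record ReceivedMessage(string Body);

public static class Handlers
{
    // Matches the shape Func<message, CancellationToken, Task<MessageAction>>.
    public static Task<MessageAction> HandleAsync(ReceivedMessage message, CancellationToken ct)
    {
        if (string.IsNullOrEmpty(message.Body))
            return Task.FromResult(MessageAction.Reject);   // malformed: retrying will not help

        if (message.Body == "retry-me")
            return Task.FromResult(MessageAction.Requeue);  // transient failure: ask for redelivery

        return Task.FromResult(MessageAction.Acknowledge);  // processed successfully
    }
}
```

Because the handler returns a value instead of calling settlement methods itself, the same handler can run unchanged on any transport that implements the subscriber interface.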

All 5 transports implement ITransportSubscriber:

| Transport | Push Model | SDK API |
|---|---|---|
| Azure Service Bus | Native push | ServiceBusProcessor events |
| RabbitMQ | Native push | AsyncEventingBasicConsumer + BasicConsumeAsync |
| Google Pub/Sub | Native push | SubscriberClient.StartAsync() streaming pull |
| Kafka | Continuous poll | IConsumer<string,byte[]>.Consume() in loop |
| AWS SQS | Long poll | ReceiveMessageAsync with 20s wait |

MessageAction settlement maps to transport-native operations:

| MessageAction | Azure SB | RabbitMQ | Kafka | AWS SQS | Google Pub/Sub |
|---|---|---|---|---|---|
| Acknowledge | Complete | BasicAck | Commit offset | Delete | Ack |
| Reject | Dead-letter | Nack (no requeue) | Commit (DLQ via decorator) | Delete (DLQ via redrive) | Nack |
| Requeue | Abandon | Nack (requeue) | Seek back | Visibility timeout = 0 | Nack |

GetService exposes the underlying subscriber client:

| Transport | Subscriber Returns |
|---|---|
| Azure Service Bus | ServiceBusProcessor |
| RabbitMQ | IChannel |
| Kafka | IConsumer<string, byte[]> |
| AWS SQS | IAmazonSQS |
| Google Pub/Sub | SubscriberClient |

TransportMessage (Slim)

TransportMessage is a slim message type (9 properties). Transport-specific hints flow via the Properties dictionary with well-known keys:

var message = new TransportMessage
{
    Body = Encoding.UTF8.GetBytes(payload),
    ContentType = "application/json",
    MessageType = "OrderCreated",
    CorrelationId = correlationId,
};

// Transport hints via Properties dictionary
message.Properties[TransportTelemetryConstants.PropertyKeys.OrderingKey] = orderId;
message.Properties[TransportTelemetryConstants.PropertyKeys.PartitionKey] = customerId;

Decorator Pattern

Cross-cutting concerns (telemetry, ordering, deduplication, scheduling, CloudEvents, DLQ routing) are composable decorators built on DelegatingTransportSender / DelegatingTransportReceiver:

var sender = new TransportSenderBuilder(nativeSender)
    .Use(inner => new TelemetryTransportSender(inner, meter, activitySource, "Kafka"))
    .Use(inner => new OrderingTransportSender(inner, msg => msg.Subject))
    .Build();

| Decorator | Direction | Purpose |
|---|---|---|
| TelemetryTransportSender | Send | OpenTelemetry metrics + traces |
| TelemetryTransportReceiver | Receive | OpenTelemetry metrics + traces |
| OrderingTransportSender | Send | Set ordering key from message |
| DeduplicationTransportSender | Send | Set deduplication ID |
| SchedulingTransportSender | Send | Scheduled delivery time |
| CloudEventsTransportSender | Send | CloudEvents envelope |
| CloudEventsTransportReceiver | Receive | CloudEvents unwrapping |
| DeadLetterTransportReceiver | Receive | Route failures to DLQ |
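
The delegating-decorator idea behind this builder can be shown in isolation. The sketch below is a generic illustration, not the library's code: ISender, DelegatingSender, NamedSender, NativeSender, and SenderBuilder are hypothetical stand-ins, and the composition order (first Use call becomes the outermost wrapper) is an assumption of this sketch that may differ from how TransportSenderBuilder orders its decorators.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Record the order in which each layer sees the message.
var log = new List<string>();
ISender sender = new SenderBuilder(new NativeSender(log))
    .Use(inner => new NamedSender(inner, log, "telemetry"))
    .Use(inner => new NamedSender(inner, log, "ordering"))
    .Build();

await sender.SendAsync("hello");
Console.WriteLine(string.Join(" > ", log)); // telemetry > ordering > native

public interface ISender
{
    Task SendAsync(string message);
}

// Innermost sender: stands in for the transport-native implementation.
public sealed class NativeSender : ISender
{
    private readonly List<string> _log;
    public NativeSender(List<string> log) => _log = log;

    public Task SendAsync(string message)
    {
        _log.Add("native");
        return Task.CompletedTask;
    }
}

// Decorator base: forwards to the inner sender unless a subclass overrides.
public abstract class DelegatingSender : ISender
{
    protected DelegatingSender(ISender inner) => Inner = inner;
    protected ISender Inner { get; }
    public virtual Task SendAsync(string message) => Inner.SendAsync(message);
}

// Example decorator: notes its name, then delegates.
public sealed class NamedSender : DelegatingSender
{
    private readonly List<string> _log;
    private readonly string _name;

    public NamedSender(ISender inner, List<string> log, string name) : base(inner)
    {
        _log = log;
        _name = name;
    }

    public override Task SendAsync(string message)
    {
        _log.Add(_name);
        return base.SendAsync(message);
    }
}

public sealed class SenderBuilder
{
    private readonly ISender _innermost;
    private readonly List<Func<ISender, ISender>> _factories = new();

    public SenderBuilder(ISender innermost) => _innermost = innermost;

    public SenderBuilder Use(Func<ISender, ISender> factory)
    {
        _factories.Add(factory);
        return this;
    }

    // Assumption of this sketch: factories are applied in reverse,
    // so the first Use call ends up as the outermost decorator.
    public ISender Build()
    {
        var current = _innermost;
        for (var i = _factories.Count - 1; i >= 0; i--) current = _factories[i](current);
        return current;
    }
}
```

Each decorator only needs to know about the interface it wraps, which is what lets concerns such as telemetry and ordering be added or removed independently.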

GetService() — Raw SDK Access

Access the underlying transport SDK client for advanced scenarios:

// Kafka: get the native IProducer
var producer = sender.GetService(typeof(IProducer<string, byte[]>))
    as IProducer<string, byte[]>;

// Azure Service Bus: get the native ServiceBusSender
var sbSender = sender.GetService(typeof(ServiceBusSender))
    as ServiceBusSender;

| Transport | Sender Returns | Receiver Returns | Subscriber Returns |
|---|---|---|---|
| Kafka | IProducer<string, byte[]> | IConsumer<string, byte[]> | IConsumer<string, byte[]> |
| RabbitMQ | IChannel | IChannel | IChannel |
| Azure Service Bus | ServiceBusSender | ServiceBusReceiver | ServiceBusProcessor |
| AWS SQS | IAmazonSQS | IAmazonSQS | IAmazonSQS |
| Google Pub/Sub | PublisherServiceApiClient | SubscriberServiceApiClient | SubscriberClient |

Transport Selection Guide

| Requirement | Recommended Transport |
|---|---|
| High throughput (>100k msg/sec) | Kafka |
| Complex routing patterns | RabbitMQ |
| Azure-native integration | Azure Service Bus |
| AWS-native integration | AWS SQS |
| GCP-native integration | Google Pub/Sub |
| Scheduled jobs / cron tasks | Cron Timer |
| Local development/testing | In-Memory |

Common Configuration

Connection Resilience

Configure resilience per transport via the options classes:

services.Configure<RabbitMqOptions>(options =>
{
    options.AutomaticRecoveryEnabled = true;
    options.NetworkRecoveryIntervalSeconds = 10;
});

Health Checks

Register health checks for monitoring transport adapters:

services.AddHealthChecks()
    .AddTransportHealthChecks();

app.MapHealthChecks("/health");

Observability

All transports emit OpenTelemetry traces and metrics via the TelemetryTransportSender / TelemetryTransportReceiver decorators:

services.AddOpenTelemetry()
    .WithTracing(builder =>
    {
        builder.AddSource("Excalibur.Dispatch.Observability");
        // Transport-specific traces: Excalibur.Dispatch.Transport.{Name}
        builder.AddSource("Excalibur.Dispatch.Transport.Kafka");
        builder.AddSource("Excalibur.Dispatch.Transport.RabbitMQ");
    })
    .WithMetrics(builder =>
    {
        builder.AddDispatchMetrics();
    });

Standard transport metric names follow the dispatch.transport.* convention:

| Metric | Type | Description |
|---|---|---|
| dispatch.transport.messages.sent | Counter | Messages sent successfully |
| dispatch.transport.messages.send_failed | Counter | Send failures |
| dispatch.transport.messages.received | Counter | Messages received |
| dispatch.transport.messages.acknowledged | Counter | Messages acknowledged |
| dispatch.transport.messages.rejected | Counter | Messages rejected |
| dispatch.transport.send.duration | Histogram | Send operation duration (ms) |
| dispatch.transport.receive.duration | Histogram | Receive operation duration (ms) |
| dispatch.transport.batch.size | Histogram | Batch operation message count |
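
To see how instruments with these names behave, the sketch below records a counter and a histogram through System.Diagnostics.Metrics and observes them with a MeterListener. It does not use Dispatch itself: the meter name "Example.Transport" and the "transport" tag are assumptions made for this example, and the meter names the library actually registers may differ.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics.Metrics;

// Hypothetical meter name; only the instrument names follow the convention in the text.
using var meter = new Meter("Example.Transport");
var sent = meter.CreateCounter<long>("dispatch.transport.messages.sent");
var duration = meter.CreateHistogram<double>("dispatch.transport.send.duration", unit: "ms");

// Subscribe to every instrument published by the example meter.
using var listener = new MeterListener();
listener.InstrumentPublished = (instrument, l) =>
{
    if (instrument.Meter.Name == "Example.Transport")
        l.EnableMeasurementEvents(instrument);
};
listener.SetMeasurementEventCallback<long>((instrument, value, tags, state) =>
    Console.WriteLine($"{instrument.Name} += {value}"));
listener.SetMeasurementEventCallback<double>((instrument, value, tags, state) =>
    Console.WriteLine($"{instrument.Name} recorded {value}"));
listener.Start();

// Record one successful send and its duration, tagged with a hypothetical transport name.
sent.Add(1, new KeyValuePair<string, object?>("transport", "kafka"));
duration.Record(12.5, new KeyValuePair<string, object?>("transport", "kafka"));
```

In an application you would normally let the OpenTelemetry SDK collect these measurements rather than attach a MeterListener by hand; the listener here only makes the recorded values visible on the console.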

Message Serialization

By default, messages are serialized using MemoryPack. You can configure different serializers:

// Use System.Text.Json for cross-language compatibility
services.AddJsonSerialization();

// Or MessagePack for compact binary format
services.AddMessagePackSerialization();

| Serializer | Package | Best For |
|---|---|---|
| MemoryPack (default) | Built-in | .NET-only, maximum performance |
| System.Text.Json | Built-in | Cross-language, debugging |
| MessagePack | Excalibur.Dispatch.Serialization.MessagePack | Cross-language, compact |
| Protobuf | Excalibur.Dispatch.Serialization.Protobuf | Schema-based, cross-language |

Dead Letter Queue Support

Each transport can implement IDeadLetterQueueManager from Excalibur.Dispatch.Transport.Abstractions for transport-native dead letter handling:

| Transport | DLQ Support | Mechanism | Registration |
|---|---|---|---|
| Google Pub/Sub | Yes | Subscription-based | Built-in |
| AWS SQS | Yes | Queue-based (native redrive) | Built-in |
| Kafka | Yes | Topic-based ({topic}.dead-letter) | services.AddKafkaDeadLetterQueue() |
| Azure Service Bus | Yes | Native $DeadLetterQueue subqueue | services.AddServiceBusDeadLetterQueue() |
| RabbitMQ | Yes | Dead letter exchange (DLX) | services.AddRabbitMqDeadLetterQueue() |

IDeadLetterQueueManager Interface

All transport DLQ implementations share the same base interface:

public interface IDeadLetterQueueManager
{
    Task<string> MoveToDeadLetterAsync(
        TransportMessage message,
        string reason,
        Exception? exception,
        CancellationToken cancellationToken);

    Task<IReadOnlyList<DeadLetterMessage>> GetDeadLetterMessagesAsync(
        int maxMessages,
        CancellationToken cancellationToken);

    Task<ReprocessResult> ReprocessDeadLetterMessagesAsync(
        IEnumerable<DeadLetterMessage> messages,
        ReprocessOptions options,
        CancellationToken cancellationToken);

    Task<DeadLetterStatistics> GetStatisticsAsync(
        CancellationToken cancellationToken);

    Task<int> PurgeDeadLetterQueueAsync(
        CancellationToken cancellationToken);
}

Kafka DLQ Example

services.AddKafkaTransport("events", kafka => { /* ... */ });

services.AddKafkaDeadLetterQueue(dlq =>
{
    dlq.TopicSuffix = ".dead-letter"; // Default
    dlq.ConsumerGroupId = "dlq-processor"; // Default
    dlq.MaxDeliveryAttempts = 5; // Default
    dlq.MessageRetentionPeriod = TimeSpan.FromDays(14);
    dlq.AutoCreateTopics = true;
});

AWS SQS DLQ

AWS SQS DLQ support is built into the DlqProcessor class, which implements both IDlqManager (SQS-specific) and IDeadLetterQueueManager (transport-agnostic). Configure through DlqOptions:

services.Configure<DlqOptions>(options =>
{
    options.DeadLetterQueueUrl = new Uri("https://sqs.us-east-1.amazonaws.com/...");
});

Azure Service Bus DLQ

Azure Service Bus uses the native $DeadLetterQueue subqueue. The manager accesses it via ServiceBusReceiverOptions { SubQueue = SubQueue.DeadLetter } — no admin API package required.

services.AddServiceBusDeadLetterQueue(dlq =>
{
    dlq.EntityPath = "orders"; // Queue or subscription path
    dlq.MaxBatchSize = 100; // Batch size for purge/retrieve (default: 100)
    dlq.ReceiveWaitTime = TimeSpan.FromSeconds(5); // Wait time (default: 5s)
    dlq.StatisticsPeekCount = 1000; // Max messages to peek for stats (default: 1000)
    dlq.IncludeStackTrace = true; // Include stack traces (default: true)
});

RabbitMQ DLQ

RabbitMQ uses dead letter exchanges (DLX). Messages are published to the DLX exchange with dlq_reason and dlq_original_source headers. Peek semantics use BasicGet(autoAck: false) + Nack(requeue: true).

services.AddRabbitMqDeadLetterQueue(dlq =>
{
    dlq.Exchange = "dead-letters"; // DLX exchange name (default: "dead-letters")
    dlq.QueueName = "dead-letter-queue"; // DLQ queue name (default: "dead-letter-queue")
    dlq.RoutingKey = "#"; // Routing key (default: "#")
    dlq.IncludeStackTrace = true; // Include stack traces (default: true)
    dlq.MaxBatchSize = 100; // Batch size for stats (default: 100)
});

For more details, see Dead Letter Handling.

Poison Message Handling

services.AddDispatch(dispatch =>
{
    dispatch.AddPoisonMessageHandling(options =>
    {
        options.MaxRetryAttempts = 5;
        options.EnableAlerting = true;
        options.AlertThreshold = 10;
    });
});

See Also

  • Patterns - Outbox, inbox, and dead-letter patterns for reliable messaging
  • Middleware - Transport-aware middleware components
  • Deployment - Deploy transport-backed applications