Google Pub/Sub Transport

Google Cloud Pub/Sub transport for scalable, GCP-native messaging with global availability.

Before You Start

Installation

dotnet add package Excalibur.Dispatch.Transport.GooglePubSub

Quick Start

services.AddDispatch(dispatch =>
{
    dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);
    dispatch.UseGooglePubSub(pubsub =>
    {
        pubsub.ProjectId("my-gcp-project")
            .TopicId("dispatch-events")
            .SubscriptionId("dispatch-events-sub");
    });
});

Standalone Registration

services.AddDispatch(dispatch =>
{
    dispatch.AddHandlersFromAssembly(typeof(Program).Assembly);
});

services.AddGooglePubSubTransport("dispatch-events", pubsub =>
{
    pubsub.ProjectId("my-gcp-project")
        .TopicId("dispatch-events")
        .SubscriptionId("dispatch-events-sub")
        .MapTopic<OrderCreated>("orders-topic");
});

The transport registers a keyed IMessageBus. The key is GooglePubSub, or GooglePubSub:{TopicId} when TopicId is set:

var bus = serviceProvider.GetRequiredKeyedService<IMessageBus>("GooglePubSub:dispatch-events");
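The key-naming rule above can be captured in a small helper so that call sites do not hard-code the string. This is only a sketch: GooglePubSubBusKeys is a hypothetical class written for this illustration and is not part of the library.

```csharp
#nullable enable

// Hypothetical helper (not part of Excalibur.Dispatch): builds the service key
// described above from an optional TopicId.
public static class GooglePubSubBusKeys
{
    // Returns "GooglePubSub" when no TopicId is set,
    // otherwise "GooglePubSub:{TopicId}".
    public static string For(string? topicId = null) =>
        string.IsNullOrEmpty(topicId) ? "GooglePubSub" : $"GooglePubSub:{topicId}";
}
```

With this helper, the lookup shown above would pass GooglePubSubBusKeys.For("dispatch-events") as the key instead of the literal string.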

Configuration

Pub/Sub Options

Configure core transport settings with GooglePubSubOptions:

services.Configure<GooglePubSubOptions>(options =>
{
    options.ProjectId = "my-gcp-project";
    options.TopicId = "dispatch-events";
    options.SubscriptionId = "dispatch-events-sub";

    options.MaxPullMessages = 100;
    options.AckDeadlineSeconds = 60;
    options.EnableAutoAckExtension = true;
    options.MaxConcurrentAcks = 10;
    options.MaxConcurrentMessages = 0; // 0 = use the default, Environment.ProcessorCount * 2
});
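To make the MaxConcurrentMessages default concrete, the following sketch shows how a value of 0 would resolve to a concurrency limit according to the comment above. ConcurrencyDefaults is a hypothetical helper written for illustration, not the library's implementation, and it only handles the documented case of 0.

```csharp
using System;

// Hypothetical helper (not library code): mirrors the documented rule that
// MaxConcurrentMessages = 0 means "Environment.ProcessorCount * 2".
public static class ConcurrencyDefaults
{
    public static int Resolve(int maxConcurrentMessages) =>
        maxConcurrentMessages == 0
            ? Environment.ProcessorCount * 2   // documented default
            : maxConcurrentMessages;           // explicit value wins
}
```

On an 8-core machine, for example, leaving the option at 0 would allow 16 messages to be processed concurrently under this rule.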

CloudEvents Configuration

Via Transport Builder

Configure CloudEvents settings directly on the transport builder:

services.AddGooglePubSubTransport("events", pubsub =>
{
    pubsub.ProjectId("my-gcp-project")
        .TopicId("dispatch-events")
        .SubscriptionId("dispatch-events-sub")
        .ConfigureCloudEvents(ce =>
        {
            ce.UseOrderingKeys = true;
            ce.UseExactlyOnceDelivery = true;
            ce.EnableDeduplication = true;
            ce.EnableCompression = true;
            ce.CompressionThreshold = 1024 * 1024; // 1 MB
        });
});

Standalone CloudEvents Registration

Use UseCloudEventsForPubSub for standalone CloudEvents configuration:

services.UseCloudEventsForPubSub(options =>
{
    options.UseOrderingKeys = true;
    options.UseExactlyOnceDelivery = true;
    options.DefaultTopic = "dispatch-events";
    options.DefaultSubscription = "dispatch-events-sub";
});

When UseOrderingKeys is enabled, CloudEvents use the partition key as the Pub/Sub ordering key to preserve ordering for related messages.
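The mapping from partition key to ordering key can be sketched as follows. This is an illustration only: OrderingKeys is a hypothetical helper, and it assumes the partition key travels in the partitionkey attribute defined by the CloudEvents partitioning extension, which this page does not confirm. An empty ordering key means Pub/Sub applies no ordering to the message.

```csharp
#nullable enable
using System.Collections.Generic;

// Hypothetical helper (not library code): derives a Pub/Sub ordering key
// from CloudEvent attributes.
public static class OrderingKeys
{
    // Attribute name from the CloudEvents partitioning extension (assumed here).
    public const string PartitionKeyAttribute = "partitionkey";

    // Returns the partition key when present, otherwise an empty string,
    // which Pub/Sub treats as "no ordering key".
    public static string FromCloudEventAttributes(
        IReadOnlyDictionary<string, string> attributes) =>
        attributes.TryGetValue(PartitionKeyAttribute, out var key) && !string.IsNullOrEmpty(key)
            ? key
            : string.Empty;
}
```

Events that share a partition key, such as all events for one order ID, end up with the same ordering key and are therefore delivered in publish order relative to each other.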

Message Compression

Configure compression for large messages using PubSubCompressionOptions:

services.AddGooglePubSubTransport("events", pubsub =>
{
    pubsub.ProjectId("my-gcp-project")
        .TopicId("dispatch-events")
        .ConfigureOptions(options =>
        {
            // Enable compression
            options.Compression.Enabled = true;

            // Choose algorithm, e.g. Gzip (better ratio) or Snappy (faster)
            options.Compression.Algorithm = CompressionAlgorithm.Snappy;

            // Only compress messages larger than the threshold
            options.Compression.ThresholdBytes = 1024; // 1 KB

            // Auto-detect compressed messages on receive
            options.Compression.EnableAutoDetection = true;

            // Control whether to compress already-compressed content types
            options.Compression.CompressAlreadyCompressedContent = false;

            // Add custom content types to the compressed list
            options.Compression.CompressedContentTypes.Add("application/x-custom-compressed");
        });
});

Compression Algorithm Comparison

Algorithm   Speed      Ratio    Use Case
Gzip        Slower     Better   Large payloads, bandwidth-constrained
Snappy      Faster     Good     High throughput, latency-sensitive
Brotli      Slowest    Best     Pre-compressed static content
Deflate     Moderate   Good     Balance of speed and ratio

Compression Requirements

  • Snappy requires the Snappier NuGet package (v1.2.0+)
  • Auto-detection uses magic bytes to identify Gzip/Deflate streams (Snappy/Brotli cannot be auto-detected)
  • Messages below ThresholdBytes are sent uncompressed
  • Already-compressed content types (images, videos, archives) are skipped by default

# For Snappy compression
dotnet add package Snappier
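The threshold and magic-byte rules above can be illustrated with the standard System.IO.Compression types. This is a minimal sketch of the general technique, not the transport's implementation: PayloadCompression and its method names are hypothetical, and it covers Gzip only.

```csharp
using System;
using System.IO;
using System.IO.Compression;

// Hypothetical helper (not library code) showing threshold-based Gzip
// compression and magic-byte detection.
public static class PayloadCompression
{
    // Payloads shorter than thresholdBytes are returned unchanged.
    public static byte[] CompressIfLarge(byte[] payload, int thresholdBytes)
    {
        if (payload.Length < thresholdBytes)
        {
            return payload;
        }

        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionLevel.Fastest, leaveOpen: true))
        {
            gzip.Write(payload, 0, payload.Length);
        } // disposing the GZipStream flushes the Gzip trailer into output
        return output.ToArray();
    }

    // Every Gzip stream starts with the magic bytes 0x1F 0x8B (RFC 1952).
    public static bool LooksLikeGzip(ReadOnlySpan<byte> data) =>
        data.Length >= 2 && data[0] == 0x1F && data[1] == 0x8B;

    // Decompresses only when the Gzip magic bytes are present.
    public static byte[] DecompressIfGzip(byte[] data)
    {
        if (!LooksLikeGzip(data))
        {
            return data;
        }

        using var input = new MemoryStream(data);
        using var gzip = new GZipStream(input, CompressionMode.Decompress);
        using var output = new MemoryStream();
        gzip.CopyTo(output);
        return output.ToArray();
    }
}
```

Magic-byte detection is a heuristic: an uncompressed payload that happens to begin with 0x1F 0x8B would be misread as Gzip, so a real implementation typically also records the encoding in a message attribute.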

Dead Letter Topics

Configure dead letter handling via the transport options:

services.AddGooglePubSubTransport("events", pubsub =>
{
    pubsub.ProjectId("my-gcp-project")
        .TopicId("dispatch-events")
        .EnableDeadLetter("dispatch-events-dlq");
});

Health Checks

When using transport adapters, register aggregate health checks:

services.AddHealthChecks()
    .AddTransportHealthChecks();

Observability

services.AddOpenTelemetry()
    .WithTracing(tracing =>
    {
        tracing.AddSource("Excalibur.Dispatch.Observability");
        tracing.AddGoogleCloudTraceExporter();
    })
    .WithMetrics(metrics =>
    {
        metrics.AddDispatchMetrics();
    });

Configure telemetry options via GooglePubSubOptions:

services.Configure<GooglePubSubOptions>(options =>
{
    options.EnableOpenTelemetry = true;
    options.ExportToCloudMonitoring = true;
    options.TracingSamplingRatio = 0.1; // 10% sampling
    options.EnableTracePropagation = true;
});

Production Checklist

  • Use Workload Identity or managed credentials
  • Enable UseExactlyOnceDelivery for critical streams
  • Enable UseOrderingKeys where strict ordering is required
  • Set AckDeadlineSeconds and enable EnableAutoAckExtension for long-running handlers
  • Configure dead letter topics for failed messages
  • Enable OpenTelemetry and Cloud Monitoring
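
As a rough sketch, the checklist items that map to settings shown on this page could be combined as below. It uses only calls and properties that appear earlier in this document. Calling ConfigureCloudEvents and EnableDeadLetter as separate statements on the same builder, and combining builder settings with services.Configure<GooglePubSubOptions>, are assumptions. The using directives are omitted because the page does not list the namespaces, and credentials such as Workload Identity are configured in the environment rather than in this code.

```csharp
// Sketch only: combines settings shown earlier on this page.
services.AddGooglePubSubTransport("events", pubsub =>
{
    pubsub.ProjectId("my-gcp-project")
        .TopicId("dispatch-events")
        .SubscriptionId("dispatch-events-sub");

    // Exactly-once delivery and ordering keys for critical, ordered streams
    pubsub.ConfigureCloudEvents(ce =>
    {
        ce.UseExactlyOnceDelivery = true;
        ce.UseOrderingKeys = true;
    });

    // Dead letter topic for messages that repeatedly fail
    pubsub.EnableDeadLetter("dispatch-events-dlq");
});

services.Configure<GooglePubSubOptions>(options =>
{
    // Ack deadline and automatic extension for long-running handlers
    options.AckDeadlineSeconds = 60;
    options.EnableAutoAckExtension = true;

    // Telemetry
    options.EnableOpenTelemetry = true;
    options.ExportToCloudMonitoring = true;
});
```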

Next Steps

See Also