Kafka Schema Registry

The Kafka transport integrates with Confluent Schema Registry through the UseConfluentSchemaRegistry() fluent builder. The integration enables schema validation, schema evolution, and interoperability with other consumers in the Confluent ecosystem.

Before You Start

  • Kafka transport configured (see Kafka Transport)
  • A running Schema Registry instance (local or Confluent Cloud)

Quick Start

services.AddKafkaTransport("events", kafka =>
{
    kafka.BootstrapServers("localhost:9092")
        .UseConfluentSchemaRegistry(registry =>
        {
            registry.SchemaRegistryUrl("http://localhost:8081")
                .SubjectNameStrategy(SubjectNameStrategy.TopicName)
                .CompatibilityMode(CompatibilityMode.Backward)
                .AutoRegisterSchemas(true)
                .CacheSchemas(true);
        })
        .MapTopic<OrderCreated>("orders-topic");
});

Builder API

IConfluentSchemaRegistryBuilder

| Method | Default | Description |
| --- | --- | --- |
| SchemaRegistryUrl() | Required | Schema Registry URL |
| SchemaRegistryUrls() | -- | Multiple URLs for high availability |
| BasicAuth() | None | HTTP Basic authentication |
| ConfigureSsl() | None | SSL/TLS configuration |
| SubjectNameStrategy() | TopicName | Subject naming strategy |
| CompatibilityMode() | Backward | Schema compatibility mode |
| AutoRegisterSchemas() | true | Auto-register schemas on first use |
| ValidateBeforeRegister() | true | Validate schemas locally first |
| CacheSchemas() | true | Enable local schema caching |
| CacheCapacity() | 1000 | Maximum cached schemas |
| RequestTimeout() | 30 seconds | HTTP request timeout |

ISchemaRegistrySslBuilder

registry.ConfigureSsl(ssl =>
{
    ssl.EnableCertificateVerification(true)
        .CaCertificateLocation("/path/to/ca.crt")
        .ClientCertificateLocation("/path/to/client.crt")
        .ClientKeyLocation("/path/to/client.key")
        .ClientKeyPassword("secret");
});

Subject Naming Strategies

| Strategy | Subject Format | Use Case |
| --- | --- | --- |
| TopicName | {topic}-value | Single schema per topic (default Confluent behavior) |
| RecordName | {namespace}.{type} | Multiple schemas per topic, type-based lookup |
| TopicRecordName | {topic}-{namespace}.{type} | Maximum flexibility, different schemas per topic+type |
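
The subject formats in the table above can be sketched as plain string construction. This is a minimal illustration, not the transport's implementation: the SubjectNaming enum and SubjectNames helper are hypothetical names, and using the .NET full type name as the {namespace}.{type} part is an assumption.

```csharp
using System;

// Hypothetical mirror of the three built-in strategies, for illustration only.
public enum SubjectNaming { TopicName, RecordName, TopicRecordName }

public static class SubjectNames
{
    // Builds the subject a value schema would be registered under.
    public static string For(SubjectNaming strategy, string topic, Type messageType)
    {
        // Assumption: {namespace}.{type} corresponds to the .NET full type name.
        var record = messageType.FullName ?? messageType.Name;

        return strategy switch
        {
            SubjectNaming.TopicName => $"{topic}-value",
            SubjectNaming.RecordName => record,
            SubjectNaming.TopicRecordName => $"{topic}-{record}",
            _ => throw new ArgumentOutOfRangeException(nameof(strategy))
        };
    }
}
```

Under these assumptions, only TopicName ties the subject to the topic alone, which is why it allows a single schema per topic, while the other two include the record name and so can distinguish several message types on the same topic.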

For custom strategies, implement ISubjectNameStrategy:

registry.SubjectNameStrategy<MyCustomStrategy>();

Compatibility Modes

| Mode | Direction | Description |
| --- | --- | --- |
| None | -- | No compatibility checking |
| Backward | Consumer | New consumers can read old producer data |
| Forward | Producer | Old consumers can read new producer data |
| Full | Both | Consumers and producers can be upgraded independently |
| *Transitive | All versions | Transitive variant of a mode: checks against all registered versions rather than only the latest |
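
To make the Backward and Forward directions concrete, here is a small sketch using System.Text.Json only. The OrderCreatedV1 and OrderCreatedV2 types and the CompatibilityDemo helper are hypothetical. The registry itself checks schemas, not .NET types, so this only illustrates the kind of change (adding an optional field with a default) that usually satisfies both directions.

```csharp
using System.Text.Json;

// Version 1 of a hypothetical event contract.
public class OrderCreatedV1
{
    public string OrderId { get; set; } = "";
}

// Version 2 adds an optional field with a default value.
public class OrderCreatedV2
{
    public string OrderId { get; set; } = "";
    public string Currency { get; set; } = "USD";
}

public static class CompatibilityDemo
{
    // Backward: a consumer on the new version reads data written with the old one.
    // The missing Currency property keeps its initializer value.
    public static OrderCreatedV2 ReadOldDataWithNewConsumer() =>
        JsonSerializer.Deserialize<OrderCreatedV2>("{\"OrderId\":\"A1\"}")!;

    // Forward: a consumer on the old version reads data written with the new one.
    // System.Text.Json skips the unknown Currency property by default.
    public static OrderCreatedV1 ReadNewDataWithOldConsumer() =>
        JsonSerializer.Deserialize<OrderCreatedV1>(
            "{\"OrderId\":\"A1\",\"Currency\":\"EUR\"}")!;
}
```

A change such as removing OrderId or changing its type would break at least one of these directions, which is the kind of change a compatibility check is meant to reject at registration time.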

Configuration Examples

High Availability

registry.SchemaRegistryUrls(
    "http://registry1.example.com:8081",
    "http://registry2.example.com:8081",
    "http://registry3.example.com:8081"
);

Production with Authentication

registry.SchemaRegistryUrl("https://registry.example.com:8085")
    .BasicAuth("api-key", "api-secret")
    .ConfigureSsl(ssl => ssl.EnableCertificateVerification(true)
        .CaCertificateLocation("/path/to/ca.crt"))
    .AutoRegisterSchemas(false) // Disable in production
    .CompatibilityMode(CompatibilityMode.Full);

Multi-Event Topics with RecordName Strategy

services.AddKafkaTransport("events", kafka =>
{
    kafka.BootstrapServers("localhost:9092")
        .UseConfluentSchemaRegistry(registry =>
        {
            registry.SchemaRegistryUrl("http://localhost:8081")
                .SubjectNameStrategy(SubjectNameStrategy.RecordName)
                .CompatibilityMode(CompatibilityMode.Backward);
        })
        .MapTopic<OrderCreated>("domain-events")
        .MapTopic<OrderShipped>("domain-events")
        .MapTopic<OrderCancelled>("domain-events");
});

Wire Format

Messages produced with Schema Registry configured are prefixed with a 5-byte Confluent header:

| Bytes | Content | Description |
| --- | --- | --- |
| 0 | Magic byte | Always 0x00 |
| 1-4 | Schema ID | Big-endian int32 from Schema Registry |
| 5+ | Payload | Serialized message (JSON, Avro, Protobuf) |

The transport automatically detects and handles this format:

  • Producer: Prepends schema ID header when Schema Registry is configured
  • Consumer: Detects magic byte and extracts schema ID for deserialization; falls back to raw JSON if no header found
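
The header layout and the consumer-side detection described above can be sketched as follows. This is an illustrative helper under stated assumptions, not the transport's internal code: ConfluentWireFormat, Frame, and TryReadHeader are hypothetical names.

```csharp
using System;
using System.Buffers.Binary;

// Hypothetical helper showing the 5-byte Confluent header; not part of the transport's API.
public static class ConfluentWireFormat
{
    public const byte MagicByte = 0x00;
    public const int HeaderLength = 5;

    // Producer side: prepend the magic byte and the big-endian schema ID to the payload.
    public static byte[] Frame(int schemaId, ReadOnlySpan<byte> payload)
    {
        var framed = new byte[HeaderLength + payload.Length];
        framed[0] = MagicByte;
        BinaryPrimitives.WriteInt32BigEndian(framed.AsSpan(1, 4), schemaId);
        payload.CopyTo(framed.AsSpan(HeaderLength));
        return framed;
    }

    // Consumer side: return true and the schema ID when the header is present.
    // A false result means the caller should treat the message as raw JSON.
    public static bool TryReadHeader(ReadOnlySpan<byte> message, out int schemaId)
    {
        schemaId = 0;
        if (message.Length < HeaderLength || message[0] != MagicByte)
        {
            return false;
        }

        schemaId = BinaryPrimitives.ReadInt32BigEndian(message.Slice(1, 4));
        return true;
    }
}
```

When TryReadHeader succeeds, the payload starts at offset HeaderLength. Checking the first byte is a workable discriminator because a JSON document never begins with a 0x00 byte, so headerless JSON messages take the fallback path.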

DI Registration

When UseConfluentSchemaRegistry() is configured, these services are registered automatically:

| Service | Implementation | Lifetime |
| --- | --- | --- |
| ConfluentSchemaRegistryOptions | Configuration | Singleton |
| ConfluentSchemaRegistryClient | Underlying Confluent client | Singleton |
| ISchemaRegistryClient | CachingSchemaRegistryClient decorator | Singleton |

Standalone registration (without the transport builder):

services.AddConfluentSchemaRegistry(opts =>
{
    opts.Url = "http://localhost:8081";
    opts.MaxCachedSchemas = 1000;
    opts.Schema.AutoRegisterSchemas = true;
});

Error Handling

| Error | Event ID | Behavior |
| --- | --- | --- |
| Schema registration failure | 22216 | SchemaRegistryException thrown |
| Schema retrieval error | 22214 | SchemaRegistryException thrown |
| Compatibility check failure | 22219 | SchemaRegistryException thrown |
| Network timeout | 22404 | Retry via Polly (if configured) |
| Invalid wire format | 22403 | Fall back to raw JSON |
| Type resolution failure | 22403 | SchemaRegistryException thrown |

Backward Compatibility

Applications that do not configure Schema Registry are unaffected:

// No UseConfluentSchemaRegistry() = standard JSON format, no schema headers
services.AddKafkaTransport("events", kafka =>
{
    kafka.BootstrapServers("localhost:9092")
        .MapTopic<OrderCreated>("orders-topic");
});

See Also