USMAN’S INSIGHTS
AI ARCHITECT
The Data Contract: Message Schemas and Schema Registry
Muhammad Usman Akbar Entity Profile

Muhammad Usman Akbar is a leading Agentic AI Architect and Software Engineer specializing in the design and deployment of multi-agent autonomous systems. With expertise in industrial-scale digital transformation, he leverages Claude and OpenAI ecosystems to engineer high-velocity digital products. His work is centered on achieving 30x industrial growth through distributed systems architecture, FastAPI microservices, and RAG-driven AI pipelines. Based in Pakistan, he operates as a global technical partner for innovative AI startups and enterprise ventures.

© 2026 Muhammad Usman Akbar. All rights reserved.


Message Schemas: Avro and Schema Registry

Your Task API is publishing events to Kafka. Today, your task.created event looks like this:

Specification
{"id": "task-123", "title": "Buy groceries", "created_at": "2025-01-15T10:00:00Z"}

Next month, the product team wants to add task priority. You add the field:

Specification
{"id": "task-123", "title": "Buy groceries", "created_at": "2025-01-15T10:00:00Z", "priority": 1}

But the notification service consuming these events wasn't updated. When it receives a message with the new priority field, it crashes. Or worse, it silently ignores the field and loses data. You've just experienced schema drift---the silent killer of event-driven systems.

In production, you'll have dozens of services reading and writing events. Without schema enforcement, any producer can add, remove, or rename fields at will. Consumers break in unpredictable ways. Debugging becomes a forensic investigation: "Which version of the event was this consumer built for?"

This chapter introduces Apache Avro for binary schema-based serialization and Schema Registry for centralized schema management. By the end, you'll design event schemas that evolve safely, enforce contracts between producers and consumers, and prevent the integration failures that plague untyped messaging systems.

Why Schemas Matter: The Contract Problem

In a typical Kafka deployment without schemas, producers and consumers communicate through implicit contracts:

text
Producer (v1)                     Consumer (v1)
├── Sends: {"id", "title"}    →   ├── Expects: {"id", "title"}
└── No enforcement                └── No validation

This works until someone changes something:

text
Producer (v2)                       Consumer (v1)
├── Sends: {"task_id", "name"}  →   ├── Still expects: {"id", "title"}
└── Renamed fields                  └── KeyError: 'id'

The core problem: JSON doesn't enforce structure. Any producer can send anything, and you won't discover the mismatch until runtime---often in production.
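The failure in the diagram can be reproduced with a minimal sketch that uses only the standard library. The function name `handle_task_created` is hypothetical, and the two payloads simply mirror the v1 and v2 producers shown above.

```python
import json


def handle_task_created(raw: bytes) -> str:
    """Consumer v1: assumes every event carries 'id' and 'title'."""
    event = json.loads(raw)
    return f"Task {event['id']}: {event['title']}"


# Producer v1 payload, and a v2 payload where both fields were renamed
v1_message = json.dumps({"id": "task-123", "title": "Buy groceries"}).encode()
v2_message = json.dumps({"task_id": "task-123", "name": "Buy groceries"}).encode()

print(handle_task_created(v1_message))

try:
    handle_task_created(v2_message)
except KeyError as exc:
    # Nothing rejected the v2 payload on the way in; the consumer only
    # finds out when it touches the missing key at runtime.
    print(f"Consumer failed on missing field: {exc}")
```

Both payloads are valid JSON, so neither the producer nor the broker has any reason to object. The mismatch surfaces only inside the consumer.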

What Schemas Provide

| Capability | Without Schema | With Avro + Schema Registry |
| --- | --- | --- |
| Contract enforcement | None | Validation at serialization and schema registration time |
| Documentation | Implicit in code | Explicit in schema definition |
| Evolution rules | Hope and pray | Backward/forward compatibility |
| Message size | JSON verbosity | Binary encoding (often 50-70% smaller) |
| Type safety | None | Enforced types (int, string, etc.) |
| Versioning | Manual tracking | Automatic with schema IDs |

Apache Avro Fundamentals

Apache Avro is a data serialization system that provides:

  • Schema-based serialization: Data is always encoded with a schema
  • Binary encoding: Compact messages without field names in payload
  • Schema evolution: Add/remove fields with compatibility rules
  • Language-agnostic: Works with Python, Java, Go, and more
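To make the "no field names in payload" point concrete, here is a hand-rolled sketch of how Avro's binary encoding lays out a record of three strings. It follows the encoding rules in the Avro specification (zigzag varints for lengths, fields written in schema order), but it is an illustration only; the helper names are hypothetical, and real code would use an Avro library rather than this.

```python
import json


def encode_long(n: int) -> bytes:
    """Avro int/long: zigzag-map the value, then emit base-128 varint bytes."""
    z = (n << 1) ^ (n >> 63)  # zigzag: small magnitudes become small unsigned ints
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)  # low 7 bits with the continuation bit set
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_string(s: str) -> bytes:
    """Avro string: byte length as a long, followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return encode_long(len(data)) + data


def encode_task_created(event: dict) -> bytes:
    """Record: field values concatenated in schema order, with no field names."""
    return b"".join(encode_string(event[name]) for name in ("id", "title", "created_at"))


event = {"id": "task-123", "title": "Buy groceries", "created_at": "2025-01-15T10:00:00Z"}
avro_bytes = encode_task_created(event)
json_bytes = json.dumps(event).encode("utf-8")
print(f"Avro body: {len(avro_bytes)} bytes, JSON text: {len(json_bytes)} bytes")
```

The Avro body carries only a one-byte length prefix per string plus the string contents, because the reader gets field names and order from the schema. The saving depends on the data: events dominated by long string values shrink less than events with many short or numeric fields.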

Avro Schema Syntax

An Avro schema is JSON that defines your data structure:

json
{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.taskapi.events",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "created_at", "type": "string"}
  ]
}

Key components:

  • type: Always "record" for structured data
  • name: The record name (used in subject naming by the record-name strategies)
  • namespace: Package-like qualifier for uniqueness
  • fields: Array of field definitions with names and types

Avro Field Types

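| Type | Avro Syntax | Example Value |
| --- | --- | --- |
| String | "string" | "task-123" |
| Integer | "int" | 42 |
| Long | "long" | 1705312800000 |
| Boolean | "boolean" | true |
| Float | "float" | 3.14 |
| Double | "double" | 3.14159265359 |
| Bytes | "bytes" | Binary data |
| Null | "null" | null |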

Optional Fields with Union Types

To make a field optional, use a union type with null:

json
{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.taskapi.events",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "created_at", "type": "string"},
    {"name": "priority", "type": ["null", "int"], "default": null}
  ]
}

The ["null", "int"] union means the field can be either null or an integer. The default: null makes it optional---old messages without priority will deserialize with priority = null.
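On the wire, a union value is written as the zero-based index of the chosen branch followed by that branch's value. The sketch below encodes the `priority` field by hand to show this; `encode_optional_int` is a hypothetical helper written for illustration, not part of any Avro library.

```python
def encode_long(n: int) -> bytes:
    """Avro int/long: zigzag-map the value, then emit base-128 varint bytes."""
    z = (n << 1) ^ (n >> 63)
    out = bytearray()
    while z > 0x7F:
        out.append((z & 0x7F) | 0x80)
        z >>= 7
    out.append(z)
    return bytes(out)


def encode_optional_int(value):
    """Encode a ["null", "int"] union: branch index first, then the value."""
    if value is None:
        return encode_long(0)  # branch 0 is "null", which has no body
    return encode_long(1) + encode_long(value)  # branch 1 is "int"


print(encode_optional_int(None).hex())
print(encode_optional_int(1).hex())
```

An unset priority costs a single byte. Listing "null" first is also the safe ordering for the default: older Avro releases require a union field's default to match the first branch of the union.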

Complex Schema Example

Here's a complete schema for Task API events:

json
{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.taskapi.events",
  "doc": "Event published when a new task is created",
  "fields": [
    {
      "name": "event_id",
      "type": "string",
      "doc": "Unique identifier for this event instance"
    },
    {
      "name": "event_type",
      "type": "string",
      "doc": "Event type identifier"
    },
    {
      "name": "occurred_at",
      "type": "string",
      "doc": "ISO-8601 timestamp when event occurred"
    },
    {
      "name": "task_id",
      "type": "string",
      "doc": "Unique identifier for the task"
    },
    {
      "name": "title",
      "type": "string",
      "doc": "Task title"
    },
    {
      "name": "owner_id",
      "type": "string",
      "doc": "User ID of task owner"
    },
    {
      "name": "priority",
      "type": ["null", "int"],
      "default": null,
      "doc": "Task priority (1=highest, 5=lowest). Optional."
    },
    {
      "name": "due_date",
      "type": ["null", "string"],
      "default": null,
      "doc": "ISO-8601 date when task is due. Optional."
    }
  ]
}

Schema Registry: Centralized Schema Management

Confluent Schema Registry provides:

  • Central schema storage: Single source of truth for all schemas
  • Schema versioning: Track all versions of each schema
  • Compatibility enforcement: Block incompatible schema changes
  • Schema ID in messages: Messages include schema ID, not full schema

How Schema Registry Works

When a producer sends a message:

text
1. Producer → Schema Registry: "Register this schema for topic 'task-created'"
2. Schema Registry → Producer: "Schema ID is 42"
3. Producer → Kafka: [Magic byte][Schema ID: 42][Avro-encoded data]
4. Consumer → Schema Registry: "Give me schema for ID 42"
5. Schema Registry → Consumer: "Here's the schema"
6. Consumer: Deserializes using schema

The message payload starts with 5 bytes of metadata:

text
[0x00][Schema ID: 4 bytes][Avro binary data]
  ↑          ↑                    ↑
Magic    Registry ID        Your actual data
byte     (e.g., 42)
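The 5-byte header can be built and parsed with the standard `struct` module. This is a sketch of the framing only, assuming the schema ID is a big-endian unsigned 32-bit integer as in the Confluent wire format; `frame` and `unframe` are hypothetical names, and in practice the serializer and deserializer classes do this for you.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0


def frame(schema_id: int, avro_payload: bytes) -> bytes:
    """Prefix an Avro body with the magic byte and a 4-byte big-endian schema ID."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload


def unframe(message: bytes) -> Tuple[int, bytes]:
    """Split a framed message into (schema_id, avro_payload)."""
    if len(message) < 5:
        raise ValueError("message is shorter than the 5-byte header")
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[5:]


framed = frame(42, b"\x10task-123")
print(framed[:5].hex())
print(unframe(framed))
```

A consumer that sees a different first byte is most likely reading a message that was not produced through a Schema Registry serializer, which is why the check raises instead of guessing.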

Installing Dependencies

Add the required packages to your project:

Specification
uv add "confluent-kafka[avro]"

Or with pip:

Specification
pip install "confluent-kafka[avro]"

Deploying Schema Registry

Important: Strimzi doesn't include Schema Registry. You need to deploy it separately. We'll use Apicurio Registry, which is Confluent Schema Registry-compatible and works well on Kubernetes.

Deploy Apicurio Registry

Create schema-registry.yaml:

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: schema-registry
  namespace: kafka
spec:
  replicas: 1
  selector:
    matchLabels:
      app: schema-registry
  template:
    metadata:
      labels:
        app: schema-registry
    spec:
      containers:
        - name: apicurio
          image: apicurio/apicurio-registry:3.0.6
          ports:
            - containerPort: 8080
          env:
            - name: APICURIO_STORAGE_KIND
              value: kafkasql
            - name: APICURIO_KAFKASQL_BOOTSTRAP_SERVERS
              value: task-events-kafka-bootstrap:9092
          resources:
            requests:
              memory: 256Mi
              cpu: 100m
            limits:
              memory: 512Mi
              cpu: 500m
---
apiVersion: v1
kind: Service
metadata:
  name: schema-registry
  namespace: kafka
spec:
  type: NodePort
  ports:
    - port: 8081
      targetPort: 8080
      nodePort: 30081
  selector:
    app: schema-registry

Apply the configuration:

Specification
kubectl apply -f schema-registry.yaml

Output:

Specification
deployment.apps/schema-registry created
service/schema-registry created

Wait for the pod to be ready:

Specification
kubectl wait --for=condition=ready pod -l app=schema-registry -n kafka --timeout=120s

Connection Reference

| Service | Local URL (Mac/Windows) | K8s Internal URL |
| --- | --- | --- |
| Kafka | localhost:30092 | task-events-kafka-bootstrap:9092 |
| Schema Registry | http://localhost:30081 | http://schema-registry:8081 |

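For local development, we use the NodePort URLs. For code running inside Kubernetes, use the internal URLs. If a Confluent client cannot find the registry API at the base URL, note that Apicurio typically serves its Confluent-compatible endpoints under a path prefix such as /apis/ccompat/v7; check the Apicurio documentation for your version and append the prefix to the URL if needed.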

Integrating Schema Registry with Python

Setting Up the Schema Registry Client

python
import os

from confluent_kafka.schema_registry import SchemaRegistryClient

# Environment-aware configuration
SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')

# Connect to Schema Registry
sr_client = SchemaRegistryClient({
    'url': SCHEMA_REGISTRY_URL
})

# Get schema for a subject
schema = sr_client.get_latest_version('task-created-value')
print(f"Schema ID: {schema.schema_id}")
print(f"Schema: {schema.schema.schema_str}")

Output:

text
Schema ID: 42
Schema: {"type":"record","name":"TaskCreated","namespace":"com.taskapi.events"...}

Producer with Avro Serialization

python
import os

from confluent_kafka import Producer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Environment-aware configuration
KAFKA_BOOTSTRAP = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:30092')
SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')

# Schema Registry client
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})

# Avro schema
task_schema = """{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.taskapi.events",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "occurred_at", "type": "string"},
    {"name": "priority", "type": ["null", "int"], "default": null}
  ]
}"""

# Create serializer
avro_serializer = AvroSerializer(
    schema_registry_client=sr_client,
    schema_str=task_schema,
    to_dict=lambda obj, ctx: obj  # Object is already a dict
)

# Producer configuration
producer = Producer({
    'bootstrap.servers': KAFKA_BOOTSTRAP,
    'acks': 'all',
    'enable.idempotence': True
})


def delivery_report(err, msg):
    if err:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()} [{msg.partition()}] @ {msg.offset()}')


# Create and send event
task_event = {
    'event_id': 'evt-001',
    'task_id': 'task-123',
    'title': 'Buy groceries',
    'occurred_at': '2025-01-15T10:00:00Z',
    'priority': 1
}

producer.produce(
    topic='task-created',
    key='task-123',
    value=avro_serializer(
        task_event,
        SerializationContext('task-created', MessageField.VALUE)
    ),
    callback=delivery_report
)
producer.flush()

Output:

Specification
Delivered to task-created [0] @ 57

Consumer with Avro Deserialization

python
import os

from confluent_kafka import Consumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import SerializationContext, MessageField

# Environment-aware configuration
KAFKA_BOOTSTRAP = os.environ.get('KAFKA_BOOTSTRAP_SERVERS', 'localhost:30092')
SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')

# Schema Registry client
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})

# Create deserializer (schema fetched automatically from registry)
avro_deserializer = AvroDeserializer(
    schema_registry_client=sr_client,
    from_dict=lambda obj, ctx: obj  # Return as dict
)

# Consumer configuration
consumer = Consumer({
    'bootstrap.servers': KAFKA_BOOTSTRAP,
    'group.id': 'notification-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False
})
consumer.subscribe(['task-created'])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f'Error: {msg.error()}')
            continue

        # Deserialize Avro message
        task_event = avro_deserializer(
            msg.value(),
            SerializationContext('task-created', MessageField.VALUE)
        )

        # The field is always present after deserialization; an unset
        # priority arrives as None, so check the value rather than the key.
        priority = task_event.get('priority')

        print(f"Received: {task_event}")
        print(f"  Task ID: {task_event['task_id']}")
        print(f"  Title: {task_event['title']}")
        print(f"  Priority: {priority if priority is not None else 'Not set'}")

        consumer.commit(message=msg)
finally:
    consumer.close()

Output:

text
Received: {'event_id': 'evt-001', 'task_id': 'task-123', 'title': 'Buy groceries', 'occurred_at': '2025-01-15T10:00:00Z', 'priority': 1}
  Task ID: task-123
  Title: Buy groceries
  Priority: 1

Schema Evolution: Changing Schemas Safely

The real power of Schema Registry is compatibility enforcement. You can evolve schemas over time without breaking consumers.

Compatibility Modes

| Mode | Rule | Use Case |
| --- | --- | --- |
| BACKWARD (default) | New schema can read old data | Upgrading consumers first |
| FORWARD | Old schema can read new data | Upgrading producers first |
| FULL | Both backward and forward | Maximum flexibility |
| NONE | No compatibility check | Development only |

Backward Compatibility: The Default

With BACKWARD compatibility, consumers using the new schema can read data written with the old schema.

Safe changes (backward compatible):

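| Change | Requirement | Why It Works |
| --- | --- | --- |
| Add field | Must have default value | Old messages get default |
| Remove field | None under BACKWARD (the removed field needs a default only for FORWARD or FULL) | New consumer ignores it |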

Unsafe changes (breaks compatibility):

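| Change | Problem |
| --- | --- |
| Add required field | Old messages can't satisfy requirement |
| Remove field that has no default | Passes BACKWARD but breaks FORWARD and FULL: old consumers still expect the field and have no default to fall back on |
| Change field type | Type mismatch on deserialization (Avro's built-in promotions, such as int to long, are the exception) |
| Rename field | Treated as remove + add |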
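The core of the backward rule fits in a few lines. The checker below is a deliberately simplified sketch, not Schema Registry's actual algorithm: it only looks at top-level fields, and it flags every type change, including the promotions Avro would accept. The function name `backward_problems` is hypothetical.

```python
def backward_problems(old_schema: dict, new_schema: dict) -> list:
    """Simplified BACKWARD check: can a reader on new_schema decode old_schema data?"""
    old_fields = {f["name"]: f for f in old_schema["fields"]}
    problems = []
    for field in new_schema["fields"]:
        old = old_fields.get(field["name"])
        if old is None:
            # Field is new: old data has no value for it, so a default is required
            if "default" not in field:
                problems.append(f"added field '{field['name']}' has no default")
        elif old["type"] != field["type"]:
            # Simplification: real Avro permits some promotions (e.g. int to long)
            problems.append(f"field '{field['name']}' changed type")
    return problems


v1 = {"type": "record", "name": "TaskCreated", "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
]}
v2_optional = {"type": "record", "name": "TaskCreated", "fields": v1["fields"] + [
    {"name": "priority", "type": ["null", "int"], "default": None},
]}
v2_required = {"type": "record", "name": "TaskCreated", "fields": v1["fields"] + [
    {"name": "priority", "type": "int"},
]}

print(backward_problems(v1, v2_optional))
print(backward_problems(v1, v2_required))
```

Swapping the arguments gives the forward direction: `backward_problems(new, old)` asks whether a reader still on the old schema can decode data written with the new one.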

Example: Adding a Field Safely

Original schema (v1):

json
{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"}
  ]
}

New schema (v2) - Adding priority:

json
{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "priority", "type": ["null", "int"], "default": null}
  ]
}

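This change is backward compatible, and it happens to be forward compatible as well:

  1. New consumers can read old messages (priority defaults to null). This is the backward direction.
  2. Old consumers can read new messages (they ignore the field they don't know about). This is the forward direction, so the change also satisfies FULL.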
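Both directions can be traced with a simplified stand-in for Avro's schema resolution, working on already-decoded dicts. The function `read_with_schema` is hypothetical and covers only the two rules used here: fill in defaults for fields the writer did not send, and drop fields the reader does not declare.

```python
def read_with_schema(written: dict, reader_schema: dict) -> dict:
    """Project a decoded record onto the reader's schema (simplified resolution)."""
    result = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in written:
            result[name] = written[name]
        elif "default" in field:
            result[name] = field["default"]  # writer never sent it: use the default
        else:
            raise ValueError(f"no value or default for field '{name}'")
    return result  # fields unknown to the reader are simply not copied


v1_schema = {"fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
]}
v2_schema = {"fields": v1_schema["fields"] + [
    {"name": "priority", "type": ["null", "int"], "default": None},
]}

old_message = {"task_id": "task-123", "title": "Buy groceries"}
new_message = {"task_id": "task-456", "title": "Pay rent", "priority": 2}

# New consumer (v2) reading an old message: priority falls back to None
print(read_with_schema(old_message, v2_schema))

# Old consumer (v1) reading a new message: priority is dropped
print(read_with_schema(new_message, v1_schema))
```

If `priority` were declared as a plain `int` with no default, the first call would raise, which is the situation the next example walks through.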

Example: Breaking Compatibility

Original schema:

json
{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"}
  ]
}

Incompatible change - Adding required field:

json
{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "priority", "type": "int"}
  ]
}

When you try to register this schema:

python
import os

# The Python client represents schemas with the generic Schema class
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})

new_schema = Schema("""{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "priority", "type": "int"}
  ]
}""", schema_type='AVRO')

# This will fail!
sr_client.register_schema('task-created-value', new_schema)

Output (error):

text
Schema Registry Error: Schema being registered is incompatible with an earlier schema

Schema Registry blocks the incompatible change, preventing production breakage.

Checking Compatibility Before Registration

Always check compatibility before deploying schema changes:

python
import os

# The Python client represents schemas with the generic Schema class
from confluent_kafka.schema_registry import Schema, SchemaRegistryClient

SCHEMA_REGISTRY_URL = os.environ.get('SCHEMA_REGISTRY_URL', 'http://localhost:30081')
sr_client = SchemaRegistryClient({'url': SCHEMA_REGISTRY_URL})

proposed_schema = Schema("""{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "priority", "type": ["null", "int"], "default": null}
  ]
}""", schema_type='AVRO')

# Check if compatible before registering
is_compatible = sr_client.test_compatibility(
    subject_name='task-created-value',
    schema=proposed_schema
)

if is_compatible:
    schema_id = sr_client.register_schema('task-created-value', proposed_schema)
    print(f"Registered with ID: {schema_id}")
else:
    print("Schema is NOT compatible - review changes")

Output:

text
Registered with ID: 43

Designing Schemas Through Collaboration

You've learned the mechanics of Avro schemas. Now let's design a real schema for the Task API.

Your starting point:

"I need to design event schemas for my Task API. I want to publish task lifecycle events."

Identifying requirements:

Consider what information each event needs:

  • Event metadata: event_id, event_type, occurred_at
  • Correlation: correlation_id for tracing across services
  • Entity data: task_id, title, owner_id
  • Optional data: priority, due_date, tags

Initial design attempt:

json
{
  "type": "record",
  "name": "TaskEvent",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "priority", "type": "int"}
  ]
}

Evaluating the design:

This schema has problems:

  1. No event metadata (how do you trace events across services?)
  2. Priority is required (old producers can't send without it)
  3. No versioning strategy (what happens when you add fields?)

Refining based on production requirements:

A better design separates event metadata from entity data:

json
{
  "type": "record",
  "name": "TaskCreated",
  "namespace": "com.taskapi.events",
  "fields": [
    {"name": "event_id", "type": "string", "doc": "Unique event identifier"},
    {"name": "event_type", "type": "string", "doc": "Always 'task.created'"},
    {"name": "occurred_at", "type": "string", "doc": "ISO-8601 timestamp"},
    {
      "name": "correlation_id",
      "type": ["null", "string"],
      "default": null,
      "doc": "Request correlation ID for tracing"
    },
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"},
    {"name": "owner_id", "type": "string"},
    {
      "name": "priority",
      "type": ["null", "int"],
      "default": null,
      "doc": "1=highest, 5=lowest"
    },
    {
      "name": "due_date",
      "type": ["null", "string"],
      "default": null,
      "doc": "ISO-8601 date"
    }
  ]
}

What emerged from refinement:

  • Event metadata enables distributed tracing
  • Optional fields with defaults enable safe evolution
  • Documentation in schema serves as contract
  • Namespace prevents naming collisions
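On the producer side, a small factory keeps the metadata fields consistent across call sites. The sketch below builds a dict that matches the refined schema; `build_task_created` is a hypothetical helper, and generating `event_id` with `uuid4` is an assumption, since the chapter does not prescribe an ID format.

```python
import uuid
from datetime import datetime, timezone
from typing import Optional


def build_task_created(
    task_id: str,
    title: str,
    owner_id: str,
    priority: Optional[int] = None,
    due_date: Optional[str] = None,
    correlation_id: Optional[str] = None,
) -> dict:
    """Return a dict with exactly the fields of the refined TaskCreated schema."""
    return {
        # Event metadata
        "event_id": str(uuid.uuid4()),  # assumption: random UUID as event ID
        "event_type": "task.created",
        "occurred_at": datetime.now(timezone.utc).isoformat(),  # ISO-8601, UTC offset form
        "correlation_id": correlation_id,
        # Entity data
        "task_id": task_id,
        "title": title,
        "owner_id": owner_id,
        # Optional data: None maps to the "null" branch of each union
        "priority": priority,
        "due_date": due_date,
    }


event = build_task_created("task-123", "Buy groceries", owner_id="user-7", priority=1)
print(event["event_type"], event["task_id"], event["priority"])
```

Required fields are positional arguments with no default, so forgetting one fails at the call site rather than at serialization time.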

Subject Naming Strategies

Schema Registry organizes schemas by subject. The default naming strategy is:

Specification
<topic>-<key|value>

For topic task-created:

  • Key schema subject: task-created-key
  • Value schema subject: task-created-value

Alternative Strategies

| Strategy | Subject Name | Use Case |
| --- | --- | --- |
| TopicNameStrategy (default) | task-created-value | One schema per topic |
| RecordNameStrategy | com.taskapi.events.TaskCreated | Share schema across topics |
| TopicRecordNameStrategy | task-created-com.taskapi.events.TaskCreated | Schema per topic+type |
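The three strategies are just different ways of deriving a subject string from the topic and the record's full name. The functions below are plain illustrations of the naming rules in the table; they are hypothetical stand-ins, not the client library's own strategy callables.

```python
def topic_name_strategy(topic: str, record_fullname: str, is_key: bool = False) -> str:
    """Subject depends only on the topic and whether this is the key or the value."""
    return f"{topic}-{'key' if is_key else 'value'}"


def record_name_strategy(topic: str, record_fullname: str, is_key: bool = False) -> str:
    """Subject is the fully qualified record name, regardless of topic."""
    return record_fullname


def topic_record_name_strategy(topic: str, record_fullname: str, is_key: bool = False) -> str:
    """Subject combines the topic and the fully qualified record name."""
    return f"{topic}-{record_fullname}"


fullname = "com.taskapi.events.TaskCreated"
for strategy in (topic_name_strategy, record_name_strategy, topic_record_name_strategy):
    print(f"{strategy.__name__}: {strategy('task-created', fullname)}")
```

Because compatibility is checked per subject, the choice of strategy also decides which schemas are compared against each other when a new version is registered.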

Configure in producer:

python
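from confluent_kafka.schema_registry import record_subject_name_strategy
from confluent_kafka.schema_registry.avro import AvroSerializer

# sr_client and task_schema are defined as in the producer example above.
# The Python client expects a strategy callable here, not a string name.
serializer = AvroSerializer(
    schema_registry_client=sr_client,
    schema_str=task_schema,
    conf={'subject.name.strategy': record_subject_name_strategy}
)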

Common Patterns and Anti-Patterns

Pattern: Envelope with Metadata

Wrap all events in a standard envelope:

json
{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "event_id", "type": "string"},
    {"name": "event_type", "type": "string"},
    {"name": "occurred_at", "type": "string"},
    {"name": "correlation_id", "type": ["null", "string"], "default": null},
    {"name": "causation_id", "type": ["null", "string"], "default": null},
    {"name": "data", "type": {...}}
  ]
}

Anti-Pattern: Overusing Union Types

Don't use unions to represent "any type":

Specification
// BAD: Too flexible, defeats schema purpose
{"name": "metadata", "type": ["null", "string", "int", "boolean", "map"]}

If you need this flexibility, you've lost the schema's contract value.

Anti-Pattern: Deeply Nested Optional Objects

Specification
// BAD: Hard to evolve, null checks everywhere
{
  "name": "task",
  "type": ["null", {
    "type": "record",
    "name": "Task",
    "fields": [
      {"name": "owner", "type": ["null", {
        "type": "record",
        "name": "Owner",
        "fields": [...]
      }]}
    ]
  }]
}

Flatten when possible, or use separate events for different entity states.


Reflect on Your Skill

You built a kafka-events skill in Chapter 1. Test and improve it based on what you learned.

Test Your Skill

Specification
Using my kafka-events skill, design an Avro schema for task lifecycle events with backward compatibility.
Does my skill show proper schema evolution patterns (optional fields with defaults, logical types)?

Identify Gaps

Ask yourself:

  • Did my skill explain schema compatibility types (backward, forward, full)?
  • Did it show how to use Schema Registry and handle schema evolution?

Improve Your Skill

If you found gaps:

Specification
My kafka-events skill is missing schema design patterns (Avro schemas, Schema Registry, compatibility rules).
Update it to include when to use Avro vs JSON and how to evolve schemas without breaking consumers.

Try With AI

Apply schema design and evolution to your Task API events.

Setup: Open Claude Code or your preferred AI assistant in your project directory.


Prompt 1: Design an Event Schema

Specification
I need to design an Avro schema for Task Completed events in my Task API.
The event should include:
- Standard event metadata (event_id, event_type, occurred_at)
- Task identification (task_id, title)
- Completion details (completed_by user_id, completed_at timestamp)
- Optional: completion_notes, duration_minutes

Design the schema with proper types, documentation, and consider future evolution.
What fields should be required vs optional with defaults?

What you're learning: Schema design decisions---which fields are essential to the event's meaning (required) versus context that might not always be available (optional with defaults).


Prompt 2: Plan a Schema Evolution

Specification
My current Task Created schema has these fields:
- event_id (string, required)
- task_id (string, required)
- title (string, required)
- created_at (string, required)
- priority (int, optional with default null)

I need to add:
1. owner_id (required for all new tasks)
2. tags (optional array of strings)
3. estimated_minutes (optional integer)

Which of these changes are backward compatible?
How should I implement each change?
Show me the evolved schema.

What you're learning: Compatibility analysis---understanding which changes are safe and how to work around the restrictions when you need to add required fields to existing schemas.


Prompt 3: Debug a Compatibility Error

Specification
I'm trying to register this schema change and getting a compatibility error:

Current schema:
{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "title", "type": "string"}
  ]
}

New schema:
{
  "type": "record",
  "name": "TaskCreated",
  "fields": [
    {"name": "task_id", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "priority", "type": "int"}
  ]
}

Error: Schema being registered is incompatible with an earlier schema

What exactly makes this incompatible? How do I fix it while achieving my goal
of renaming 'title' to 'name' and adding 'priority'?

What you're learning: Compatibility debugging---understanding that renaming a field is effectively a delete+add operation, and how to handle migrations that require breaking changes (versioning strategies, new topics, dual-writes).


Safety Note: Schema changes affect all producers and consumers. Always test compatibility in a staging environment before production, and coordinate deployment order based on your compatibility mode (BACKWARD = upgrade consumers first).