USMAN’S INSIGHTS
AI ARCHITECT
Swap Kafka for RabbitMQ With Absolutely Zero Code Changes
Previous Chapter: Service Invocation
Next Chapter: Bindings and Triggers

© 2026 Muhammad Usman Akbar. All rights reserved.

Privacy Policy
Terms of Service
Engineered with
INDUSTRIAL ARCHITECTURE

Pub/Sub Messaging

Module 7 takes the agent you built in Module 6 and turns it into a production cloud service. You'll containerize the stack, orchestrate it on Kubernetes, automate delivery, and operate it with observability, security, and cost controls. The goal: a reliable Digital FTE that runs 24/7 for real users.

Prerequisites: Modules 4-6. You need a working agent service to deploy.

In earlier chapters, you built Kafka producers and consumers directly. You learned about topics, partitions, consumer groups, and offset management. That knowledge is valuable—but it's also tightly coupled to Kafka. If your team decides to use RabbitMQ for one service or Azure Service Bus for cloud deployment, you'd rewrite your messaging code.

Dapr's pub/sub building block gives you event-driven messaging through a single API. Your application publishes to /v1.0/publish/{pubsub}/{topic} and subscribes via HTTP callbacks. The actual broker—Redis, Kafka, RabbitMQ, AWS SNS/SQS—is defined in a YAML component file. Change the YAML, keep your code.

This is the same pattern you learned with state management: infrastructure abstraction through configuration, not code changes.


The Pub/Sub API

Dapr's pub/sub exposes two operations:

| Operation | API | Your Code |
|---|---|---|
| Publish | `POST /v1.0/publish/{pubsub}/{topic}` | Call Dapr with event data |
| Subscribe | Dapr calls YOUR endpoint | Handle incoming events |

The key insight: publishing is an outbound call to Dapr, but subscribing is Dapr making inbound calls to your application. You register subscription handlers, and Dapr routes events to them.
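Because publishing is just an HTTP call to the sidecar, the request shape is easy to see explicitly. Here is a minimal sketch; the `build_publish_request` helper and the hardcoded default sidecar port 3500 are illustrative assumptions, not part of the Dapr SDK:

```python
import json

DAPR_HTTP_PORT = 3500  # default Dapr sidecar HTTP port (assumption for illustration)

def build_publish_request(pubsub: str, topic: str, payload: dict) -> tuple[str, str]:
    """Build the URL and JSON body for a raw publish call to the Dapr sidecar.

    This matches the /v1.0/publish/{pubsub}/{topic} HTTP API shown above.
    """
    url = f"http://localhost:{DAPR_HTTP_PORT}/v1.0/publish/{pubsub}/{topic}"
    return url, json.dumps(payload)

url, body = build_publish_request(
    "pubsub", "todo-events",
    {"event_type": "todo.created", "todo_id": "todo-1", "title": "Learn Dapr"},
)
print(url)  # http://localhost:3500/v1.0/publish/pubsub/todo-events
```

In practice you would let `DaprClient` make this call for you, but seeing the raw request makes it clear there is nothing broker-specific in your application's outbound traffic.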


CloudEvents: Automatic Message Wrapping

When you publish through Dapr, your message gets wrapped in CloudEvents format automatically. CloudEvents is a specification for describing event data in a common way.

What you send:

json
{
  "event_type": "todo.created",
  "todo_id": "todo-1",
  "title": "Learn Dapr"
}

What Dapr delivers to subscribers:

json
{
  "specversion": "1.0",
  "type": "com.dapr.event.sent",
  "source": "task-api",
  "id": "abc-123-def",
  "datacontenttype": "application/json",
  "data": {
    "event_type": "todo.created",
    "todo_id": "todo-1",
    "title": "Learn Dapr"
  }
}

Why CloudEvents matters:

  • Interoperability: Any system that understands CloudEvents can process your events
  • Traceability: Built-in id, source, and time fields for debugging
  • Portability: Switch brokers without worrying about message format differences

You don't need to construct CloudEvents yourself—Dapr handles the wrapping. Your subscriber receives the data field with your original payload.
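To make the envelope concrete, here is a small sketch of unwrapping a CloudEvents message by hand. The envelope values are illustrative (copied from the example above), and `dapr-ext-fastapi` normally does this extraction for you:

```python
def unwrap_cloudevent(envelope: dict) -> dict:
    """Extract the application payload from a CloudEvents envelope."""
    return envelope.get("data", {})

# Illustrative envelope, shaped like the example above
envelope = {
    "specversion": "1.0",
    "type": "com.dapr.event.sent",
    "source": "task-api",
    "id": "abc-123-def",
    "datacontenttype": "application/json",
    "data": {"event_type": "todo.created", "todo_id": "todo-1", "title": "Learn Dapr"},
}

payload = unwrap_cloudevent(envelope)
print(payload["event_type"])  # todo.created
```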


Publishing Events with DaprClient

Here's the async pattern for publishing events from your Todo API:

python
from dapr.aio.clients import DaprClient  # async client lives in dapr.aio
import json

async def publish_todo_created(todo_id: str, title: str):
    """Publish a todo.created event to the pubsub component."""
    async with DaprClient() as client:
        await client.publish_event(
            pubsub_name='pubsub',
            topic_name='todo-events',
            data=json.dumps({
                'event_type': 'todo.created',
                'todo_id': todo_id,
                'title': title
            }),
            data_content_type='application/json'
        )

Output (Dapr sidecar logs):

text
INFO[0042] Publishing message to topic todo-events on pubsub pubsub

Key parameters:

| Parameter | Purpose |
|---|---|
| `pubsub_name` | Name of the pub/sub component (matches `metadata.name` in YAML) |
| `topic_name` | Topic to publish to (created automatically if it doesn't exist) |
| `data` | Your event payload as a JSON string |
| `data_content_type` | MIME type for proper deserialization |

The Redis Pub/Sub Component

Before publishing works, you need a pub/sub component configured. Here's Redis:

yaml
# components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: redis-master.default.svc.cluster.local:6379

Apply it:

bash
kubectl apply -f components/pubsub.yaml

Output:

text
component.dapr.io/pubsub created

That's it. Your publish_event() calls now route through Redis. No connection strings in code, no Redis client imports.


Subscribing to Events: Two Approaches

Dapr supports two subscription patterns:

1. Declarative Subscriptions (Kubernetes CRD)

Define subscriptions as Kubernetes resources:

yaml
# subscriptions/todo-subscription.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: todo-subscription
  namespace: default
spec:
  pubsubname: pubsub
  topic: todo-events
  routes:
    default: /events/todo

Apply it:

bash
kubectl apply -f subscriptions/todo-subscription.yaml

Then implement the handler in your FastAPI app:

python
from fastapi import FastAPI

app = FastAPI()

@app.post("/events/todo")
async def handle_todo_event(event_data: dict):
    """Handle todo events from Dapr pub/sub."""
    print(f"Received event: {event_data}")
    # Process the event
    return {"status": "SUCCESS"}

2. Programmatic Subscriptions (dapr-ext-fastapi)

The dapr-ext-fastapi extension registers subscriptions directly in code—no CRD needed:

python
from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp

app = FastAPI()
dapr_app = DaprApp(app)

@dapr_app.subscribe(pubsub='pubsub', topic='todo-events')
async def handle_todo_event(event_data: dict):
    """Dapr routes todo-events to this handler automatically."""
    print(f"Received: {event_data}")
    return {"status": "SUCCESS"}

Output (when event arrives):

text
Received: {'event_type': 'todo.created', 'todo_id': 'todo-1', 'title': 'Learn Dapr'}

Which approach to use?

| Approach | Best For |
|---|---|
| Declarative (CRD) | GitOps workflows, separation of concerns, ops-managed subscriptions |
| Programmatic | Developer-controlled subscriptions, rapid iteration, simpler deployments |

For this book's learning context, programmatic subscriptions are clearer—the subscription lives with the code that handles it.
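Under the hood, the programmatic approach works because Dapr calls `GET /dapr/subscribe` on your app at startup and expects a JSON list of subscriptions, which `dapr-ext-fastapi` assembles from your decorators. A sketch of that manifest shape (the `subscription_manifest` helper name is illustrative, not a Dapr API):

```python
def subscription_manifest(pubsub: str, topic: str, route: str) -> list[dict]:
    """The JSON body a Dapr app returns from GET /dapr/subscribe.

    dapr-ext-fastapi builds and serves this list from @dapr_app.subscribe decorators.
    """
    return [{"pubsubname": pubsub, "topic": topic, "route": route}]

print(subscription_manifest("pubsub", "todo-events", "/events/todo"))
```

Knowing this contract is handy when debugging: if Dapr isn't delivering events, check the sidecar logs for what `/dapr/subscribe` returned.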


Complete Example: Todo Event Publisher and Subscriber

Here's a complete FastAPI service that publishes and subscribes to todo events:

python
from contextlib import asynccontextmanager
from fastapi import FastAPI
from dapr.aio.clients import DaprClient  # async client lives in dapr.aio
from dapr.ext.fastapi import DaprApp
from pydantic import BaseModel
import json
import uuid

class TodoCreate(BaseModel):
    title: str

class TodoEvent(BaseModel):
    event_type: str
    todo_id: str
    title: str

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Dapr sidecar readiness happens automatically."""
    yield

app = FastAPI(lifespan=lifespan)
dapr_app = DaprApp(app)

@app.post("/todos")
async def create_todo(todo: TodoCreate):
    """Create a todo and publish a todo.created event."""
    todo_id = str(uuid.uuid4())

    # Publish event via Dapr pub/sub
    async with DaprClient() as client:
        await client.publish_event(
            pubsub_name='pubsub',
            topic_name='todo-events',
            data=json.dumps({
                'event_type': 'todo.created',
                'todo_id': todo_id,
                'title': todo.title
            }),
            data_content_type='application/json'
        )

    return {"id": todo_id, "title": todo.title, "status": "created"}

@dapr_app.subscribe(pubsub='pubsub', topic='todo-events')
async def handle_todo_event(event_data: dict):
    """Process todo events (could trigger notifications, analytics, etc.)."""
    event_type = event_data.get('event_type', 'unknown')
    todo_id = event_data.get('todo_id', 'unknown')
    print(f"Processing {event_type} for todo {todo_id}")
    # Your event handling logic here
    return {"status": "SUCCESS"}

Testing the flow:

bash
# Create a todo (triggers event)
curl -X POST http://localhost:8000/todos \
  -H "Content-Type: application/json" \
  -d '{"title": "Learn Dapr pub/sub"}'

Output:

text
{"id": "abc-123", "title": "Learn Dapr pub/sub", "status": "created"}

Logs show the subscription handler received the event:

text
Processing todo.created for todo abc-123

Swapping Brokers: Redis to Kafka

Here's the power of Dapr's abstraction. You learned Kafka in earlier chapters. To use Kafka instead of Redis for pub/sub, change only the component YAML:

yaml
# components/kafka-pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.kafka
  version: v1
  metadata:
  - name: brokers
    value: task-events-kafka-bootstrap.kafka.svc.cluster.local:9092
  - name: consumerGroup
    value: todo-service
  - name: authType
    value: none

Apply the new component:

bash
kubectl apply -f components/kafka-pubsub.yaml

Your application code doesn't change. The same publish_event() and @dapr_app.subscribe() calls now route through Kafka instead of Redis.

When to use which broker:

| Broker | Use Case |
|---|---|
| Redis | Development, simple pub/sub, low-latency local messaging |
| Kafka | Production event streaming, durability, replay capability, high throughput |
| RabbitMQ | Complex routing, message queuing patterns |
| Cloud (SNS/SQS, Pub/Sub) | Managed infrastructure, cloud-native deployments |

The choice is now a deployment decision, not a code decision.


Subscription Response Patterns

Your subscription handler must return a status that tells Dapr how to handle the message:

python
@dapr_app.subscribe(pubsub='pubsub', topic='todo-events')
async def handle_todo_event(event_data: dict):
    try:
        # Process event (process_event and the error types are illustrative)
        process_event(event_data)
        return {"status": "SUCCESS"}  # Acknowledge, remove from queue
    except TransientError:
        return {"status": "RETRY"}    # Redelivery requested
    except PermanentError:
        return {"status": "DROP"}     # Discard, don't retry
| Status | Dapr Behavior |
|---|---|
| SUCCESS | Message acknowledged, removed from broker |
| RETRY | Message redelivered after backoff |
| DROP | Message discarded without retry |

For critical events, prefer RETRY over DROP—let the broker's dead-letter handling manage truly unprocessable messages.
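One way to keep that retry-versus-drop policy out of individual handlers is a small classifier that maps exception types to Dapr statuses. A sketch under stated assumptions: the exception classes and the `status_for` helper are illustrative, not Dapr APIs:

```python
class TransientError(Exception):
    """Temporary failure (e.g. downstream timeout) -- safe to retry."""

class PermanentError(Exception):
    """Malformed or unprocessable event -- retrying will never help."""

def status_for(exc: Exception) -> dict:
    """Map a handler exception to a Dapr subscription response."""
    if isinstance(exc, PermanentError):
        return {"status": "DROP"}   # discard without retry
    # Default to RETRY for transient and unknown failures,
    # matching the "prefer RETRY over DROP" guidance above.
    return {"status": "RETRY"}      # broker redelivers after backoff

print(status_for(TransientError()))  # {'status': 'RETRY'}
print(status_for(PermanentError()))  # {'status': 'DROP'}
```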


Reflect on Your Skill

You built a dapr-deployment skill in earlier lessons. Test and improve it based on what you learned.

Test Your Skill

text
Using my dapr-deployment skill, add pub/sub messaging to my Todo API:

1. Create a Redis pub/sub component
2. Publish todo.created events when todos are created
3. Implement a subscription handler using dapr-ext-fastapi

Does my skill show both publish_event and @dapr_app.subscribe patterns?

Identify Gaps

Ask yourself:

  • Did my skill explain the CloudEvents wrapping that Dapr applies automatically?
  • Did it show how to swap from Redis to Kafka by changing only the component YAML?
  • Did it include the subscription response patterns (SUCCESS, RETRY, DROP)?

Improve Your Skill

If you found gaps:

text
My dapr-deployment skill is missing pub/sub patterns. Update it to include:

- DaprClient.publish_event() async pattern
- dapr-ext-fastapi @dapr_app.subscribe() decorator
- Redis and Kafka component YAML examples
- Subscription response status meanings

Try With AI

Setup: You have a Todo API using direct Redis pub/sub and want to migrate to Dapr's abstraction.

Prompt 1: Add pub/sub to your Todo API

text
Add pub/sub to my Todo API: publish todo.created events using async DaprClient and create a subscription handler using dapr-ext-fastapi. My current code creates todos but doesn't publish events. Show me:

1. The publish_event call in my create endpoint
2. A subscription handler that logs received events
3. The Redis pub/sub component YAML

What you're learning: The pub/sub integration pattern. You're seeing how Dapr's publish and subscribe APIs fit into existing FastAPI code without requiring broker-specific clients. The abstraction keeps your business logic clean.


Prompt 2: Swap brokers without code changes

text
Show me how to swap from Redis pub/sub to Kafka pub/sub without changing my application code. I want to see:

1. My current Redis component
2. The Kafka component that replaces it
3. Confirmation that my publish_event and subscribe code stays identical

What you're learning: Infrastructure portability in practice. The component YAML is the only thing that changes—your application remains broker-agnostic. This is why Dapr matters for production systems that may need to evolve their infrastructure.


Prompt 3: Understand CloudEvents format

text
What's CloudEvents format? How does Dapr handle it automatically? Show me:

1. What my raw event data looks like
2. What Dapr wraps it into
3. What my subscriber actually receives

What you're learning: CloudEvents as the interoperability standard for event-driven systems. Dapr handles the envelope automatically, so you don't construct CloudEvents manually—but understanding the format helps when debugging or integrating with external systems that expect CloudEvents.

Safety note: When testing pub/sub in production environments, use separate topics for testing. Publishing to production topics during development can trigger real workflows—notifications sent, orders processed, analytics skewed.