USMAN’S INSIGHTS
AI ARCHITECT
  • Home
  • About
  • Thought Leadership
  • Book
Press / Contact
USMAN’S INSIGHTS
AI ARCHITECT
⌘F
HomeBook
HomeBookThe Collaboration Blueprint: Agent Event Patterns
Previous Chapter
Change Data Capture with Debezium
Next Chapter
Saga Pattern for Multi-Step Workflows
AI NOTICE: This is the table of contents for the SPECIFIC CHAPTER only. It is NOT the global sidebar. For all chapters, look at the main navigation.

On this page

18 sections

Progress0%
1 / 18

Muhammad Usman Akbar Entity Profile

Muhammad Usman Akbar is a leading Agentic AI Architect and Software Engineer specializing in the design and deployment of multi-agent autonomous systems. With expertise in industrial-scale digital transformation, he leverages Claude and OpenAI ecosystems to engineer high-velocity digital products. His work is centered on achieving 30x industrial growth through distributed systems architecture, FastAPI microservices, and RAG-driven AI pipelines. Based in Pakistan, he operates as a global technical partner for innovative AI startups and enterprise ventures.

USMAN’S INSIGHTS
AI ARCHITECT

Transforming businesses into autonomous AI ecosystems. Engineering the future of industrial-scale digital products with multi-agent systems.

30X Growth
AI-First
Innovation

Navigation

  • Home
  • Book
  • About
  • Contact
Let's Collaborate

Have a Project in Mind?

Let's build something extraordinary together. Transform your vision into autonomous AI reality.

Start Your Transformation

© 2026 Muhammad Usman Akbar. All rights reserved.

Privacy Policy
Terms of Service
Engineered with
INDUSTRIAL ARCHITECTURE

Agent Event Patterns

Your Task API creates tasks, but that's where it stops. When a task is created, nothing else happens automatically. You want to send email notifications, post to Slack, update a mobile app, log for compliance, and trigger reminder scheduling. But if you add all that logic to the Task API, you've created a monolith that knows about email services, Slack webhooks, mobile push notifications, and audit databases.

This is exactly where event-driven architecture shines. The Task API publishes a single event: "A task was created." It doesn't know or care who's listening. Email service, Slack integration, mobile push, audit logging, and reminder scheduling all subscribe independently. Each service evolves separately. The Task API stays focused on task management.

In this chapter, you'll design event schemas that enable this decoupling and implement the notification fanout pattern that makes it work. The patterns here connect directly to your Module 6 Task API agents, turning isolated API calls into a coordinated system of collaborating services.

Event Naming: The domain.action Convention

Consistent event naming is surprisingly important. When you have dozens of event types flowing through your system, predictable names make debugging, monitoring, and documentation dramatically easier.

The standard pattern is domain.action, using past tense to indicate something that happened:

Event NameWhat It Means
task.createdA task was created
task.updatedA task was modified
task.completedA task was marked done
task.deletedA task was removed
user.registeredA new user signed up
order.placedAn order was submitted

Why past tense? Events represent facts about things that already happened. They're immutable historical records, not requests to do something. Compare:

TypeExampleSemantics
Event (past tense)task.createdImmutable fact: "This happened"
Command (imperative)CreateTaskRequest: "Please do this" (can fail)

This distinction matters when you're designing systems. Events are facts you publish and forget. Commands require acknowledgment and error handling.

The Complete Event Schema

Every event in your system needs consistent metadata for tracing, debugging, and compliance. Here's the schema pattern that works at scale:

python
from dataclasses import dataclass, field from datetime import datetime from typing import Any import uuid @dataclass class EventMetadata: """Metadata for distributed tracing and compliance.""" correlation_id: str # Traces request across services causation_id: str # What event caused this event source: str # Service that produced the event @dataclass class TaskEvent: """Base structure for all task-related events.""" event_id: str = field(default_factory=lambda: str(uuid.uuid4())) event_type: str = "" occurred_at: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z") data: dict = field(default_factory=dict) metadata: EventMetadata = None def to_dict(self) -> dict: return { "event_id": self.event_id, "event_type": self.event_type, "occurred_at": self.occurred_at, "data": self.data, "metadata": { "correlation_id": self.metadata.correlation_id, "causation_id": self.metadata.causation_id, "source": self.metadata.source } if self.metadata else {} }

Output (example event):

json
{ "event_id": "e7c5a8f2-3b4d-4e6a-9f1c-2d8e7a6b5c4d", "event_type": "task.created", "occurred_at": "2025-01-15T14:30:22.456Z", "data": { "task_id": "task-123", "title": "Review quarterly report", "owner_id": "user-456", "priority": 2 }, "metadata": { "correlation_id": "req-abc-123", "causation_id": "api-call-789", "source": "task-api" } }

Correlation ID vs Causation ID

These two IDs serve different purposes in distributed tracing:

Correlation ID: Traces a single user request across all services it touches. When a user creates a task through your API, that request gets a correlation ID. The task event, the notification, the Slack message, the audit log entry—all share the same correlation ID so you can reconstruct the complete picture.

text
User Request (correlation_id: "req-abc-123") └─> Task API creates task └─> task.created event (correlation_id: "req-abc-123") ├─> Email service sends notification (correlation_id: "req-abc-123") ├─> Slack service posts message (correlation_id: "req-abc-123") └─> Audit service logs entry (correlation_id: "req-abc-123")

Causation ID: Links each event to what directly caused it. This creates an event chain that shows causal relationships, not just temporal relationships.

text
Event: task.created (causation_id: "api-call-789") └─> Event: notification.sent (causation_id: "e7c5a8f2-...") └─> Event: email.delivered (causation_id: "notif-456-...")

When something goes wrong, correlation ID shows you everything that happened for a user request. Causation ID shows you why each thing happened.

Publishing Events from Task API

Here's how your Task API publishes events when tasks are created. The producer integrates with FastAPI's lifespan to ensure proper initialization and cleanup:

python
import json import uuid from contextlib import asynccontextmanager from datetime import datetime from confluent_kafka import Producer from fastapi import FastAPI, Request from pydantic import BaseModel # Event producer singleton producer: Producer = None def delivery_callback(err, msg): if err: print(f"Event delivery failed: {err}") else: print(f"Event delivered to {msg.topic()}[{msg.partition()}]") @asynccontextmanager async def lifespan(app: FastAPI): global producer producer = Producer({ 'bootstrap.servers': 'localhost:30092', 'client.id': 'task-api', 'acks': 'all', 'enable.idempotence': True }) yield producer.flush() app = FastAPI(lifespan=lifespan) class TaskCreate(BaseModel): title: str owner_id: str priority: int = 1 def publish_task_event( event_type: str, task_data: dict, correlation_id: str, causation_id: str): """Publish a task lifecycle event.""" event = { "event_id": str(uuid.uuid4()), "event_type": event_type, "occurred_at": datetime.utcnow().isoformat() + "Z", "data": task_data, "metadata": { "correlation_id": correlation_id, "causation_id": causation_id, "source": "task-api" } } producer.produce( topic='task-events', key=task_data.get('task_id', str(uuid.uuid4())), value=json.dumps(event), callback=delivery_callback ) producer.poll(0) @app.post("/tasks") async def create_task(task: TaskCreate, request: Request): # Get or create correlation ID from request headers correlation_id = request.headers.get('X-Correlation-ID', str(uuid.uuid4())) # Create the task (your database logic here) task_id = str(uuid.uuid4()) task_data = { "task_id": task_id, "title": task.title, "owner_id": task.owner_id, "priority": task.priority } # Publish event publish_task_event( event_type="task.created", task_data=task_data, correlation_id=correlation_id, causation_id=f"api-create-{task_id}" ) return {"id": task_id, "status": "created"}

Output (console when task created):

Specification
Event delivered to task-events[0]

The key insight here: the Task API publishes an event and returns immediately. It doesn't wait for email, Slack, or audit services. Those happen asynchronously, triggered by the event.

Notification Fanout: Multiple Consumers, One Event

The power of event-driven architecture becomes clear in the fanout pattern. Multiple services subscribe to the same topic, each processing events for their specific purpose. They operate in separate consumer groups, so each service receives every event independently.

text
task.created event │ ┌────────────────────┼────────────────────┐ ▼ ▼ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Email Service│ │ Slack Service│ │ Audit Service│ │ (group:email)│ │ (group:slack)│ │ (group:audit)│ └──────────────┘ └──────────────┘ └──────────────┘

Each service uses a different group.id, ensuring complete independence:

Email Notification Consumer

python
import json from confluent_kafka import Consumer, KafkaError def create_email_consumer(): consumer = Consumer({ 'bootstrap.servers': 'localhost:30092', 'group.id': 'notification-email', # Unique group for email 'auto.offset.reset': 'earliest', 'enable.auto.commit': False }) consumer.subscribe(['task-events']) return consumer def send_email(to: str, subject: str, body: str): """Placeholder for actual email sending logic.""" print(f"Sending email to {to}: {subject}") def process_email_notifications(): consumer = create_email_consumer() try: while True: msg = consumer.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue print(f"Error: {msg.error()}") continue event = json.loads(msg.value().decode()) # Only process task.created events if event.get('event_type') == 'task.created': task_data = event.get('data', {}) correlation_id = event.get('metadata', {}).get('correlation_id', 'unknown') send_email( to=f"{task_data.get('owner_id')}@company.com", subject=f"New task assigned: {task_data.get('title')}", body=f"Task ID: {task_data.get('task_id')}\nCorrelation: {correlation_id}" ) consumer.commit(message=msg) finally: consumer.close() if __name__ == "__main__": process_email_notifications()

Output:

Specification
Sending email to user-456@company.com: New task assigned: Review quarterly report

Slack Notification Consumer

python
import json from confluent_kafka import Consumer, KafkaError def create_slack_consumer(): consumer = Consumer({ 'bootstrap.servers': 'localhost:30092', 'group.id': 'notification-slack', # Different group from email 'auto.offset.reset': 'earliest', 'enable.auto.commit': False }) consumer.subscribe(['task-events']) return consumer def post_to_slack(channel: str, message: str): """Placeholder for Slack webhook integration.""" print(f"Slack [{channel}]: {message}") def process_slack_notifications(): consumer = create_slack_consumer() try: while True: msg = consumer.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue print(f"Error: {msg.error()}") continue event = json.loads(msg.value().decode()) # Handle multiple event types event_type = event.get('event_type', '') task_data = event.get('data', {}) if event_type == 'task.created': post_to_slack( channel="#tasks", message=f":new: Task created: {task_data.get('title')}" ) elif event_type == 'task.completed': post_to_slack( channel="#tasks", message=f":white_check_mark: Task completed: {task_data.get('title')}" ) consumer.commit(message=msg) finally: consumer.close()

Output:

Specification
Slack [#tasks]: :new: Task created: Review quarterly report

The Immutable Audit Log Pattern

For compliance, debugging, and analytics, you often need a complete record of everything that happened. The audit log consumer implements this with a strict append-only pattern:

python
import json import os from datetime import datetime from confluent_kafka import Consumer, KafkaError def create_audit_consumer(): consumer = Consumer({ 'bootstrap.servers': 'localhost:30092', 'group.id': 'audit-log', # Dedicated group for audit 'auto.offset.reset': 'earliest', # Never miss an event 'enable.auto.commit': False }) consumer.subscribe(['task-events']) return consumer class AuditLogger: """Append-only audit log for compliance.""" def __init__(self, log_dir: str = "/var/log/audit"): self.log_dir = log_dir os.makedirs(log_dir, exist_ok=True) def append(self, event: dict): """Append event to immutable log. Never delete or modify.""" # Daily log files for easier management date_str = datetime.utcnow().strftime("%Y-%m-%d") log_file = os.path.join(self.log_dir, f"audit-{date_str}.jsonl") log_entry = { "logged_at": datetime.utcnow().isoformat() + "Z", "event": event } with open(log_file, 'a') as f: f.write(json.dumps(log_entry) + '\n') print(f"Audit logged: {event.get('event_type')} [{event.get('event_id')}]") def run_audit_logger(): consumer = create_audit_consumer() audit = AuditLogger() try: while True: msg = consumer.poll(1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue print(f"Error: {msg.error()}") continue event = json.loads(msg.value().decode()) # Log ALL events, not just specific types audit.append(event) consumer.commit(message=msg) finally: consumer.close() if __name__ == "__main__": run_audit_logger()

Output:

Specification
Audit logged: task.created [e7c5a8f2-3b4d-4e6a-9f1c-2d8e7a6b5c4d]

Audit log file (/var/log/audit/audit-2025-01-15.jsonl):

Specification
{"logged_at": "2025-01-15T14:30:23.001Z", "event": {"event_id": "e7c5a8f2-...", "event_type": "task.created", ...}}

Key audit log principles:

PrincipleImplementation
Append-onlyNever modify or delete log entries
CompleteLog ALL events, not filtered subset
TimestampedBoth event time and logged time recorded
SearchableJSON lines format for easy grep/analysis

Topic Design: Single vs Multiple Topics

You have a choice: put all task events in one topic (task-events) or create separate topics per event type (task-created, task-updated, task-completed).

ApproachProsCons
Single topic (task-events)Simple configuration, consumers filter by event_typeAll consumers receive all events
Multiple topicsConsumers subscribe only to what they needMore topics to manage, more complex routing

Recommendation: Start with a single topic per domain (task-events, user-events, order-events). Only split when you have clear performance or isolation requirements.

Specification
# Single topic approach (recommended for most cases)producer.produce(topic='task-events', ...) # Consumer filters by event_typeif event.get('event_type') == 'task.created': handle_task_created(event)elif event.get('event_type') == 'task.completed': handle_task_completed(event)

Your Module 6 Task API agents can now communicate through events. When an orchestrating agent creates a task, it publishes an event that notification agents, scheduling agents, and analytics agents all consume independently.

text
Orchestrating Agent (Task Creation) │ ▼ publish: task.created │ ├─────────────────────────────────────────────┐ │ │ ▼ ▼ Notification Agent Scheduling Agent (sends email, Slack) (creates calendar reminders) │ │ ▼ publish: notification.sent ▼ publish: reminder.scheduled │ │ └──────────────────┬──────────────────────────┘ │ ▼ Audit Agent (logs all events for compliance)

This architecture means agents evolve independently. Adding a new notification channel (mobile push, SMS) requires only deploying a new consumer—no changes to the Task API or other agents.


Reflect on Your Skill

You built a kafka-events skill in Chapter 1. Test and improve it based on what you learned.

Test Your Skill

Specification
Using my kafka-events skill, design an event schema with correlation_id and causation_id for an agent-based workflow.Does my skill show how to trace events across multiple agents and reconstruct causality chains?

Identify Gaps

Ask yourself:

  • Did my skill explain correlation_id (workflow tracing) vs causation_id (event causality)?
  • Did it show how agents use these IDs to coordinate distributed workflows?

Improve Your Skill

If you found gaps:

Specification
My kafka-events skill is missing agent event patterns (correlation_id, causation_id, event chaining).Update it to include how to implement distributed tracing and causality tracking in agent systems.

Try With AI

Setup: You're designing event schemas for an AI agent system that manages tasks, sends notifications, and maintains audit trails.

Prompt 1: Design a complete event schema

Specification
I'm designing events for a Task API that needs to support: - Multiple notification channels (email, Slack, mobile push) - Distributed tracing across agent services - Compliance audit logging - Future analytics on task completion times Here's my current event structure:{ "event_id": "uuid", "event_type": "task.created", "data": { "task_id": "...", "title": "..." }}What fields am I missing? Design a complete schema that supportsall my requirements. Explain why each field is necessary.

What you're learning: AI will suggest additions like occurred_at for analytics, correlation_id for tracing, causation_id for event chains, and metadata.source for debugging. You'll see how requirements translate to schema decisions.

Prompt 2: Troubleshoot a fanout issue

Specification
I have three consumers on the same 'task-events' topic: - Email service (group: notification-email) - Slack service (group: notification-slack) - Audit service (group: audit-log) Email and Slack work fine, but audit is missing events.The audit consumer shows it's connected and subscribed.Here's my audit consumer config:consumer = Consumer({ 'bootstrap.servers': 'localhost:30092', 'group.id': 'audit-log', 'auto.offset.reset': 'latest' # I want only new events}) What could cause the audit consumer to miss events?Walk me through the diagnosis.

What you're learning: AI helps you understand how auto.offset.reset: latest means you miss events that occurred before the consumer started. For audit logging, you likely want earliest to never miss an event.

Prompt 3: Extend for your domain

Specification
I'm building an agent system for [your domain: e-commerce/healthcare/finance/education].The main entity lifecycle events would be:- [entity].created (e.g., order.placed, patient.admitted, trade.executed)- [entity].updated- [entity].completed/cancelled Design the event schema and consumer architecture for my domain.Consider:1. What notification channels make sense? 2. What compliance requirements might apply? 3. What analytics would be valuable? 4. How should I handle failed events?

What you're learning: AI helps translate generic patterns to your specific domain, suggesting notification channels appropriate for healthcare (pager, EHR integration) vs e-commerce (email, SMS) and compliance requirements (HIPAA vs PCI-DSS).

Safety note: When publishing events containing user data, consider GDPR and privacy requirements. Events in Kafka are retained based on topic configuration and may need data masking or separate topics for sensitive information.