USMAN’S INSIGHTS
AI ARCHITECT
  • Home
  • About
  • Thought Leadership
  • Book
Press / Contact
USMAN’S INSIGHTS
AI ARCHITECT
⌘F
HomeBook
HomeBookThe Transactional Distributed Blueprint: Saga Pattern
Previous Chapter
Agent Event Patterns
Next Chapter
Production Kafka with Strimzi
AI NOTICE: This is the table of contents for the SPECIFIC CHAPTER only. It is NOT the global sidebar. For all chapters, look at the main navigation.

On this page

25 sections

Progress0%
1 / 25

Muhammad Usman Akbar Entity Profile

Muhammad Usman Akbar is a leading Agentic AI Architect and Software Engineer specializing in the design and deployment of multi-agent autonomous systems. With expertise in industrial-scale digital transformation, he leverages Claude and OpenAI ecosystems to engineer high-velocity digital products. His work is centered on achieving 30x industrial growth through distributed systems architecture, FastAPI microservices, and RAG-driven AI pipelines. Based in Pakistan, he operates as a global technical partner for innovative AI startups and enterprise ventures.

USMAN’S INSIGHTS
AI ARCHITECT

Transforming businesses into autonomous AI ecosystems. Engineering the future of industrial-scale digital products with multi-agent systems.

30X Growth
AI-First
Innovation

Navigation

  • Home
  • Book
  • About
  • Contact
Let's Collaborate

Have a Project in Mind?

Let's build something extraordinary together. Transform your vision into autonomous AI reality.

Start Your Transformation

© 2026 Muhammad Usman Akbar. All rights reserved.

Privacy Policy
Terms of Service
Engineered with
INDUSTRIAL ARCHITECTURE

Saga Pattern for Multi-Step Workflows

Your Task API now publishes events when tasks are created. The notification service sends emails. The reminder service schedules follow-ups. The audit service logs everything. Each service works independently, consuming events and doing its job.

Then a user creates a high-priority task that requires team assignment. The workflow becomes: create task, assign to user, send notification, schedule reminder. Each step depends on the previous. What happens if notification fails after the user is already assigned? You can't leave the system half-done—the user expects an assignment notification. But you also can't use a distributed transaction across independent services.

This is where the saga pattern saves you. Instead of trying to make multiple services act as one atomic transaction, you embrace eventual consistency and design explicit compensation steps that undo work when things go wrong. In this chapter, you'll implement a choreography-based saga where services coordinate through events, each knowing how to reverse its own actions.

Why Distributed Transactions Don't Work

Traditional databases give you ACID transactions: either all changes commit, or none do. When you try to extend this across multiple services, you hit fundamental problems.

The two-phase commit trap:

text
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │ Task API │ │ User Service │ │ Notification │ │ │ │ │ │ Service │ └──────┬───────┘ └──────┬────────┘ └──────┬───────┘ │ │ │ │ PREPARE │ PREPARE │ PREPARE ├────────────────────┼─────────────────────┤ │ │ │ │ READY │ READY │ TIMEOUT! ├────────────────────┼─────────────────────┤ │ │ │ │ ??? (Resources locked, waiting) │ │ │

The coordinator must lock resources across all services, wait for all to be ready, then commit. If any service is slow or fails, everything blocks. In microservices with network partitions and varying latencies, this approach creates cascading failures.

What we actually need:

Distributed TransactionSaga Pattern
All or nothing (atomic)Eventually consistent
Lock resources during coordinationNo locks, independent processing
Coordinator blocks until all readyServices react asynchronously
Failure = abort everythingFailure = compensate completed steps

The saga pattern trades atomicity for availability. You accept that the system may be temporarily inconsistent, but you guarantee it will reach a consistent state—either fully complete or fully rolled back.

The Saga Pattern: Sequence with Compensations

A saga is a sequence of local transactions where each step publishes an event that triggers the next. If a step fails, the saga executes compensation events in reverse order to undo completed work.

text
┌─────────────────────────────────────────────────────────────────────┐ │ Task Assignment Saga │ ├─────────────────────────────────────────────────────────────────────┤ │ │ │ FORWARD PATH (Happy Path): │ │ │ │ ┌─────────┐ ┌─────────────┐ ┌──────────────┐ ┌──────────┐ │ │ │ Create │───▶│ Assign User │───▶│ Send │───▶│ Schedule │ │ │ │ Task │ │ │ │ Notification │ │ Reminder │ │ │ └─────────┘ └─────────────┘ └──────────────┘ └──────────┘ │ │ │ │ COMPENSATION PATH (Failure at notification): │ │ │ │ ┌─────────┐ ┌─────────────┐ │ │ │ Mark │◀───│ Unassign │◀─── Failure detected! │ │ │ Failed │ │ User │ │ │ └─────────┘ └─────────────┘ │ │ │ └─────────────────────────────────────────────────────────────────────┘

Key principle: Every forward action must have a corresponding compensation that can undo it. Not all actions are reversible the same way—you can't "unsend" an email—but you can send a correction or update status to reflect the failure.

Choreography vs Orchestration

There are two ways to coordinate a saga:

Choreography (decentralized): Each service knows the overall flow and reacts to events. No central coordinator.

Specification
task.created ──▶ User Service listens, assigns user, emits user.assigneduser.assigned ──▶ Notification Service listens, sends email, emits notification.sentnotification.sent ──▶ Reminder Service listens, schedules reminder

Orchestration (centralized): A saga coordinator tells each service what to do and tracks progress.

Specification
Saga Coordinator: 1. Tell User Service: "Assign user" 2. Wait for response 3. Tell Notification Service: "Send notification" 4. Wait for response 5. Tell Reminder Service: "Schedule reminder"
AspectChoreographyOrchestration
CoordinationImplicit in event flowExplicit coordinator
Service couplingLooser—services only know eventsTighter—coordinator knows all services
Adding servicesAdd subscriber, no code changes elsewhereUpdate coordinator logic
DebuggingHarder—trace events across servicesEasier—saga state in one place
Failure handlingEach service handles its compensationsCoordinator manages rollback

This chapter focuses on choreography because it's more aligned with Kafka's event-driven model and keeps services truly independent. Orchestration has its place for complex workflows with many conditional branches.

Implementing a Choreography Saga

Let's implement the task assignment saga with four services. Each service:

  1. Listens for relevant events
  2. Performs its local work
  3. Publishes success or failure events

Step 1: Define the Event Schema

First, establish the events that drive the saga:

python
# events.py - Event definitions for the saga from dataclasses import dataclass from datetime import datetime from typing import Optional import json @dataclass class SagaEvent: """Base class for saga events with correlation tracking.""" saga_id: str # Unique identifier for this saga instance correlation_id: str # Traces the entire request occurred_at: str # ISO timestamp def to_json(self) -> str: return json.dumps(self.__dict__) @classmethod def from_json(cls, data: str): return cls(**json.loads(data)) @dataclass class TaskCreated(SagaEvent): task_id: str title: str assignee_id: str priority: str @dataclass class UserAssigned(SagaEvent): task_id: str user_id: str assignment_id: str @dataclass class UserAssignmentFailed(SagaEvent): task_id: str user_id: str reason: str @dataclass class NotificationSent(SagaEvent): task_id: str notification_id: str channel: str # email, slack, etc. @dataclass class NotificationFailed(SagaEvent): task_id: str reason: str # Compensation events @dataclass class UserUnassigned(SagaEvent): """Compensation: Reverses a user assignment.""" task_id: str user_id: str assignment_id: str compensation_reason: str @dataclass class TaskMarkedFailed(SagaEvent): """Compensation: Marks task creation as failed in saga.""" task_id: str failed_step: str reason: str

Output:

The event classes ensure every saga event carries:

  • saga_id: Links all events in one saga instance
  • correlation_id: Traces across sagas for request debugging
  • occurred_at: Timestamp for ordering and audit

Step 2: Task Service (Saga Initiator)

The task service starts the saga by creating a task and publishing the initiating event:

python
# task_service.py - Initiates the saga from confluent_kafka import Producer from datetime import datetime import uuid import json class TaskService: def __init__(self, producer: Producer): self.producer = producer def create_task_with_assignment( self, title: str, assignee_id: str, priority: str = "normal" ) -> str: """Create a task and initiate assignment saga.""" task_id = str(uuid.uuid4()) saga_id = str(uuid.uuid4()) correlation_id = str(uuid.uuid4()) # Local transaction: Create task in database task = { "id": task_id, "title": title, "assignee_id": assignee_id, "priority": priority, "status": "pending_assignment", # Saga in progress "saga_id": saga_id } self._save_task(task) # Publish event to start saga event = { "saga_id": saga_id, "correlation_id": correlation_id, "occurred_at": datetime.utcnow().isoformat(), "task_id": task_id, "title": title, "assignee_id": assignee_id, "priority": priority } self.producer.produce( topic='task.created', key=task_id, value=json.dumps(event), callback=self._delivery_report ) self.producer.flush() print(f"Saga {saga_id} started for task {task_id}") return task_id def handle_saga_failure(self, event: dict): """Compensation: Mark task as failed when saga cannot complete.""" task_id = event['task_id'] task = self._load_task(task_id) task['status'] = 'assignment_failed' task['failure_reason'] = event.get('reason', 'Unknown failure') self._save_task(task) print(f"Task {task_id} marked as failed: {task['failure_reason']}") def _save_task(self, task: dict): # In production: Save to database print(f"Saved task: {task['id']} with status {task['status']}") def _load_task(self, task_id: str) -> dict: # In production: Load from database return {"id": task_id, "status": "pending"} def _delivery_report(self, err, msg): if err: print(f"Delivery failed: {err}") else: print(f"Event delivered to {msg.topic()}")

Output:

Specification
Saved task: abc-123 with status pending_assignment Event delivered to task.created Saga 7f8a9b2c started for task abc-123

Step 3: User Service (First Saga Step)

The user service assigns users when tasks are created, or publishes a failure event if assignment fails:

python
# user_service.py - Handles user assignment step from confluent_kafka import Consumer, Producer import json import uuid from datetime import datetime class UserAssignmentService: def __init__(self, consumer: Consumer, producer: Producer): self.consumer = consumer self.producer = producer self.consumer.subscribe(['task.created', 'notification.failed']) def run(self): """Process events in the saga.""" while True: msg = self.consumer.poll(1.0) if msg is None: continue if msg.error(): continue topic = msg.topic() event = json.loads(msg.value().decode()) if topic == 'task.created': self._handle_task_created(event) elif topic == 'notification.failed': self._handle_notification_failed(event) self.consumer.commit(message=msg) def _handle_task_created(self, event: dict): """Forward step: Assign user to task.""" task_id = event['task_id'] user_id = event['assignee_id'] saga_id = event['saga_id'] try: # Local transaction: Create assignment assignment_id = self._create_assignment(task_id, user_id) # Success: Publish user.assigned success_event = { "saga_id": saga_id, "correlation_id": event['correlation_id'], "occurred_at": datetime.utcnow().isoformat(), "task_id": task_id, "user_id": user_id, "assignment_id": assignment_id } self.producer.produce( topic='user.assigned', key=task_id, value=json.dumps(success_event) ) print(f"User {user_id} assigned to task {task_id}") except Exception as e: # Failure: Publish failure event failure_event = { "saga_id": saga_id, "correlation_id": event['correlation_id'], "occurred_at": datetime.utcnow().isoformat(), "task_id": task_id, "user_id": user_id, "reason": str(e) } self.producer.produce( topic='user.assignment.failed', key=task_id, value=json.dumps(failure_event) ) print(f"Assignment failed for task {task_id}: {e}") self.producer.poll(0) def _handle_notification_failed(self, event: dict): """Compensation: Unassign user when downstream step fails.""" task_id = event['task_id'] saga_id = event['saga_id'] # Load the assignment we made assignment = self._load_assignment_for_task(task_id) if assignment: # Reverse the assignment self._delete_assignment(assignment['id']) # Publish compensation event compensation_event = { "saga_id": saga_id, "correlation_id": event['correlation_id'], "occurred_at": datetime.utcnow().isoformat(), "task_id": task_id, "user_id": assignment['user_id'], "assignment_id": assignment['id'], "compensation_reason": f"Downstream failure: {event['reason']}" } self.producer.produce( topic='user.unassigned', key=task_id, value=json.dumps(compensation_event) ) print(f"Compensated: Unassigned user from task {task_id}") self.producer.poll(0) def _create_assignment(self, task_id: str, user_id: str) -> str: # In production: Insert into database assignment_id = str(uuid.uuid4()) print(f"Created assignment {assignment_id}") return assignment_id def _load_assignment_for_task(self, task_id: str) -> dict: # In production: Query database return {"id": "assign-123", "user_id": "user-456"} def _delete_assignment(self, assignment_id: str): # In production: Delete from database print(f"Deleted assignment {assignment_id}")

Output (success path):

Specification
Created assignment 8f9a2b3c User user-456 assigned to task abc-123Event delivered to user.assigned

Output (compensation path):

Specification
Deleted assignment assign-123Compensated: Unassigned user from task abc-123Event delivered to user.unassigned

Step 4: Notification Service (Second Saga Step)

The notification service sends notifications and can trigger compensations:

python
# notification_service.py - Handles notification step from confluent_kafka import Consumer, Producer import json from datetime import datetime import uuid class NotificationService: def __init__(self, consumer: Consumer, producer: Producer): self.consumer = consumer self.producer = producer self.consumer.subscribe(['user.assigned']) def run(self): while True: msg = self.consumer.poll(1.0) if msg is None: continue if msg.error(): continue event = json.loads(msg.value().decode()) self._handle_user_assigned(event) self.consumer.commit(message=msg) def _handle_user_assigned(self, event: dict): """Forward step: Send assignment notification.""" task_id = event['task_id'] user_id = event['user_id'] saga_id = event['saga_id'] try: # Local transaction: Send notification notification_id = self._send_notification(user_id, task_id) # Success: Publish notification.sent success_event = { "saga_id": saga_id, "correlation_id": event['correlation_id'], "occurred_at": datetime.utcnow().isoformat(), "task_id": task_id, "notification_id": notification_id, "channel": "email" } self.producer.produce( topic='notification.sent', key=task_id, value=json.dumps(success_event) ) print(f"Notification sent for task {task_id}") except Exception as e: # Failure: Trigger compensation chain failure_event = { "saga_id": saga_id, "correlation_id": event['correlation_id'], "occurred_at": datetime.utcnow().isoformat(), "task_id": task_id, "reason": str(e) } self.producer.produce( topic='notification.failed', key=task_id, value=json.dumps(failure_event) ) print(f"Notification failed for task {task_id}: {e}") self.producer.poll(0) def _send_notification(self, user_id: str, task_id: str) -> str: # In production: Call email/SMS service # Simulate occasional failure import random if random.random() < 0.1: # 10% failure rate raise Exception("Email service unavailable") notification_id = str(uuid.uuid4()) print(f"Sent email notification {notification_id}") return notification_id

Output (success):

Specification
Sent email notification 3c4d5e6f Notification sent for task abc-123

Output (failure - triggers compensation):

Specification
Notification failed for task abc-123: Email service unavailable

Tracking Saga State

In choreography, each service only sees its own events. To monitor saga progress, implement a saga state tracker:

python
# saga_tracker.py - Monitors saga progress across all events from confluent_kafka import Consumer import json from datetime import datetime, timedelta from enum import Enum class SagaState(Enum): STARTED = "started" ASSIGNED = "user_assigned" NOTIFIED = "notification_sent" COMPLETED = "completed" COMPENSATING = "compensating" FAILED = "failed" class SagaTracker: def __init__(self, consumer: Consumer): self.consumer = consumer self.sagas: dict[str, dict] = {} # Subscribe to ALL saga-related topics self.consumer.subscribe([ 'task.created', 'user.assigned', 'user.assignment.failed', 'notification.sent', 'notification.failed', 'user.unassigned', 'task.marked.failed' ]) def run(self): while True: msg = self.consumer.poll(1.0) if msg is None: self._check_timeouts() continue if msg.error(): continue event = json.loads(msg.value().decode()) topic = msg.topic() saga_id = event.get('saga_id') if saga_id: self._update_saga_state(saga_id, topic, event) self.consumer.commit(message=msg) def _update_saga_state(self, saga_id: str, topic: str, event: dict): if saga_id not in self.sagas: self.sagas[saga_id] = { 'saga_id': saga_id, 'started_at': event['occurred_at'], 'state': SagaState.STARTED, 'events': [] } saga = self.sagas[saga_id] saga['events'].append({ 'topic': topic, 'occurred_at': event['occurred_at'] }) saga['last_updated'] = event['occurred_at'] # Update state based on event type state_transitions = { 'task.created': SagaState.STARTED, 'user.assigned': SagaState.ASSIGNED, 'notification.sent': SagaState.NOTIFIED, 'notification.failed': SagaState.COMPENSATING, 'user.assignment.failed': SagaState.COMPENSATING, 'user.unassigned': SagaState.COMPENSATING, 'task.marked.failed': SagaState.FAILED } if topic in state_transitions: saga['state'] = state_transitions[topic] print(f"Saga {saga_id[:8]}... -> {saga['state'].value}") def _check_timeouts(self): """Detect sagas stuck in intermediate states.""" timeout = timedelta(minutes=5) now = datetime.utcnow() for saga_id, saga in self.sagas.items(): last_update = datetime.fromisoformat(saga['last_updated']) if now - last_update > timeout: if saga['state'] not in [SagaState.COMPLETED, SagaState.FAILED]: print(f"ALERT: Saga {saga_id[:8]}... stuck in {saga['state'].value}") def get_saga_status(self, saga_id: str) -> dict: return self.sagas.get(saga_id, {"error": "Saga not found"})

Output:

Specification
Saga 7f8a9b2c... -> started Saga 7f8a9b2c... -> user_assigned Saga 7f8a9b2c... -> notification_sent

Output (failure scenario):

Specification
Saga 7f8a9b2c... -> started Saga 7f8a9b2c... -> user_assigned Saga 7f8a9b2c... -> compensating Saga 7f8a9b2c... -> failed

Designing Compensation Events

Not all actions have symmetric rollbacks. Here's how to design compensations for common scenarios:

Forward ActionCompensationNotes
Create recordDelete recordDirect reversal
Reserve inventoryRelease reservationDirect reversal
Send emailSend correction emailCan't unsend, but can correct
Charge paymentIssue refundMay have fees, timing constraints
Update statusRevert to previous statusNeed to store previous state

Compensation design principles:

  1. Idempotency: Compensations may run multiple times if failures occur during compensation. Design them to be safe to repeat.

  2. Store what you need: If reverting requires previous state, store it before the forward action.

  3. Accept semantic compensation: You can't truly undo "sent email"—but you can send a follow-up explaining the situation.

python
# Example: Compensation that handles repeated execution def compensate_reservation(reservation_id: str, saga_id: str): """Release a reservation, safely handling double-execution.""" reservation = load_reservation(reservation_id) # Already compensated? Don't release again if reservation['status'] == 'released': print(f"Reservation {reservation_id} already released") return # Haven't compensated yet? Release it reservation['status'] = 'released' reservation['released_at'] = datetime.utcnow().isoformat() reservation['saga_id'] = saga_id save_reservation(reservation) print(f"Released reservation {reservation_id}")

Output (first call):

Specification
Released reservation rsv-789

Output (second call - idempotent):

Specification
Reservation rsv-789 already released

Handling Partial Failures

What happens if compensation itself fails? You need a fallback strategy:

python
class CompensationHandler: def __init__(self, producer: Producer, max_retries: int = 3): self.producer = producer self.max_retries = max_retries self.dead_letter_topic = 'saga.compensation.failed' def compensate_with_retry( self, compensation_fn, event: dict, retry_count: int = 0 ): """Execute compensation with retries and dead letter fallback.""" try: compensation_fn(event) print(f"Compensation successful for saga {event['saga_id'][:8]}...") except Exception as e: if retry_count < self.max_retries: print(f"Compensation failed, retry {retry_count + 1}") time.sleep(2 ** retry_count) # Exponential backoff self.compensate_with_retry( compensation_fn, event, retry_count + 1 ) else: # All retries exhausted - send to dead letter for manual review self._send_to_dead_letter(event, str(e)) def _send_to_dead_letter(self, event: dict, error: str): """Send failed compensation to dead letter topic for manual handling.""" dead_letter_event = { **event, "compensation_error": error, "requires_manual_intervention": True } self.producer.produce( topic=self.dead_letter_topic, key=event['saga_id'], value=json.dumps(dead_letter_event) ) self.producer.flush() print(f"MANUAL INTERVENTION REQUIRED: Saga {event['saga_id'][:8]}...")

Output:

Specification
Compensation failed, retry 1Compensation failed, retry 2Compensation failed, retry 3MANUAL INTERVENTION REQUIRED: Saga 7f8a9b2c...

Complete Saga Flow Diagram

Here's the complete event flow for the task assignment saga:

text
SUCCESS PATH: ═════════════════════════════════════════════════════════════════════════ task.created Task Service ────────────────────▶ User Service │ │ user.assigned ▼ Notification Service │ │ notification.sent ▼ Reminder Service │ │ reminder.scheduled ▼ SAGA COMPLETE FAILURE PATH (notification fails): ═════════════════════════════════════════════════════════════════════════ task.created Task Service ────────────────────▶ User Service │ │ user.assigned ▼ Notification Service │ │ notification.failed ┌───────────────────────────────┘ │ ▼ User Service (compensates) │ │ user.unassigned │ ▼ Task Service (compensates) │ │ task.marked.failed │ ▼ SAGA ROLLED BACK

Reflect on Your Skill

You built a kafka-events skill in Chapter 1. Test and improve it based on what you learned.

Test Your Skill

Specification
Using my kafka-events skill, implement the Saga pattern for an order fulfillment workflow with compensation events.Does my skill generate the event schemas for both forward steps and compensating transactions?

Identify Gaps

Ask yourself:

  • Did my skill explain choreography vs orchestration for Sagas?
  • Did it show how to design compensation events and handle partial failures?

Improve Your Skill

If you found gaps:

Specification
My kafka-events skill is missing Saga pattern implementation (compensation events, choreography, orchestration).Update it to include when to use Sagas and how to design reliable distributed workflows.

Try With AI

Setup: You're designing a saga for an order processing workflow in an e-commerce system.

Prompt 1: Design compensation events

Specification
I'm building an order saga with these steps:1. Reserve inventory 2. Process payment 3. Ship order 4. Send confirmation Design the compensation events for each step. Consider: - What happens if payment fails after inventory is reserved? - What if shipping fails after payment succeeds? - Which compensations are direct reversals vs semantic corrections?

What you're learning: AI helps you think through asymmetric compensations—releasing inventory is straightforward, but refunding a payment has different timing and fee implications than never charging in the first place.

Prompt 2: Handle a tricky compensation scenario

Specification
In my saga, step 3 is "send welcome email to new user."Email was sent, then step 4 (create user dashboard) failed.I can't unsend the email. What are my options for compensation?How do I design the saga to handle this gracefully?

What you're learning: AI collaborates on semantic compensation strategies—you can't reverse email, but you can adjust messaging, send follow-ups, or design the saga to make email the last step so failures earlier don't leave users confused.

Prompt 3: Apply to your domain

Specification
I'm building a [your system, e.g., "booking platform for fitness classes"].The workflow is:1. [first step, e.g., "Reserve spot in class"] 2. [second step, e.g., "Charge membership"] 3. [third step, e.g., "Send booking confirmation"]Design the Kafka topics and event schemas for this saga.Include compensation events and explain when each would trigger.

What you're learning: AI helps you map the saga pattern to your specific domain, identifying which steps have clean reversals and which need semantic compensation.

Safety note: When testing sagas, use isolated topic names (e.g., test.task.created) so compensation events don't interfere with production data. Always test the compensation path as thoroughly as the success path.