The Atomic Unit: Transactions for Stream Processing

Your Task API publishes events to Kafka reliably---acks=all ensures durability, idempotent producers prevent duplicates on retry. But what happens when a single operation must update multiple topics atomically? When a task is created, you might need to write to task-events, audit-log, and notification-queue as a single unit of work. If the producer crashes after writing to task-events but before audit-log, you have an inconsistent state that no amount of retries can fix.

This is the exactly-once challenge for stream processing. Consider a payment processor that consumes from payment-requests, processes the payment, and produces to both payment-completed and ledger-updates. If it crashes after producing to payment-completed but before ledger-updates, restarting will re-process the same payment---potentially charging a customer twice or leaving the ledger inconsistent.

Kafka transactions solve this by making the read-process-write cycle atomic. Either all outputs are committed together, or none are. This chapter covers the transactional producer lifecycle, consumer isolation levels, and the zombie fencing mechanism that prevents duplicate processing from crashed producers.

The Problem: Partial Writes Break Consistency

Without Transactions: The Crash Window

Consider this stream processing pattern:

text
1. Consumer reads message from input-topic
2. Processor transforms message
3. Producer writes to output-topic-1
4. Producer writes to output-topic-2
5. Consumer commits offset

The crash window exists between steps 3 and 5. If the processor crashes after step 3:

text
Timeline of Partial Failure

  READ        PROCESS      WRITE-1      CRASH!
  ──────────────────────────────────────X─────────────────────────
  payment-    transform    payment-     ledger-updates  (never happens)
  requests    message      completed ✓  offset commit   (never happens)

  On restart:
    - offset not committed
    - payment-completed has the message
    - ledger-updates is missing the message
    - re-processing creates a DUPLICATE

The result: payment-completed has the message, ledger-updates doesn't, and on restart the processor will re-read and re-process---creating duplicates in payment-completed.

With Transactions: All-or-Nothing

Transactions wrap the entire read-process-write cycle:

text
Transaction Lifecycle

  BEGIN_TXN   READ   PROCESS   WRITE-1   WRITE-2   COMMIT_TXN
  ────────────────────────────────────────────────────────────
  [────────────────────── TRANSACTION ───────────────────────]

  If crash before COMMIT:
    - All writes are ABORTED
    - Offset not committed
    - On restart: clean re-processing

  If COMMIT succeeds:
    - All writes are VISIBLE atomically
    - Offset committed within transaction
    - No re-processing needed

The Transactional Producer Lifecycle

Step 1: Configure with transactional.id

Every transactional producer needs a unique, stable identifier:

python
from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:30092',
    'transactional.id': 'task-processor-1',  # REQUIRED: unique and stable
    'enable.idempotence': True  # Automatically enabled with transactional.id
})

Output:

text
>>> producer = Producer({...})
# Producer created, but transactions not yet initialized

Critical: The transactional.id must be:

  • Unique per producer instance: two producers sharing the same ID will fence each other
  • Stable across restarts: the same ID allows recovery of in-flight transactions
  • Descriptive: include the service name and partition/shard for debugging (see the sketch below)
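
For the "stable and descriptive" part, here is a minimal naming sketch. It assumes the process can read a stable shard identifier from its environment; the POD_ORDINAL variable is a hypothetical name you would wire up yourself (for example, from a StatefulSet ordinal), not a Kubernetes built-in.

python
import os

# Hypothetical: POD_ORDINAL is assumed to be injected by your deployment
# (e.g., derived from a StatefulSet ordinal); it is not a built-in variable.
shard = os.environ.get('POD_ORDINAL', '0')

# Stable across restarts of the same shard, unique across shards,
# and descriptive enough to trace back to the service.
transactional_id = f'task-processor-{shard}'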

Step 2: Initialize Transactions

Before any transactional operations, call init_transactions() exactly once:

python
# Initialize transactions (one-time setup per producer instance)
producer.init_transactions()

Output:

text
>>> producer.init_transactions()
# Registers producer with transaction coordinator
# Aborts any pending transactions from previous instance with same transactional.id

What happens during init:

  1. Producer contacts transaction coordinator (a Kafka broker)
  2. Coordinator assigns a producer epoch (version number)
  3. Any pending transactions from previous epoch are aborted
  4. Producer is ready for transactional operations
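
Because step 1 is a network round-trip to the coordinator, init_transactions() can fail transiently. A minimal retry sketch, assuming confluent-kafka-python, where failures surface as KafkaException wrapping a KafkaError whose retriable() flag marks transient errors:

python
import time

from confluent_kafka import KafkaException


def init_with_retry(producer, attempts=3, backoff_s=1.0):
    """Retry init_transactions() on retriable errors; re-raise everything else."""
    for attempt in range(attempts):
        try:
            producer.init_transactions()
            return
        except KafkaException as e:
            if e.args[0].retriable() and attempt < attempts - 1:
                time.sleep(backoff_s * (attempt + 1))  # linear backoff
                continue
            raise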

Step 3: Transaction Lifecycle

Each transaction follows a strict lifecycle:

python
from confluent_kafka import Producer, KafkaException

producer = Producer({
    'bootstrap.servers': 'localhost:30092',
    'transactional.id': 'task-processor-1'
})
producer.init_transactions()

try:
    # Begin transaction
    producer.begin_transaction()

    # All produce calls are part of this transaction
    producer.produce('task-events', key='task-123',
                     value='{"event": "created"}')
    producer.produce('audit-log', key='task-123',
                     value='{"action": "task_created"}')
    producer.produce('notification-queue', key='user-456',
                     value='{"notify": "task_assigned"}')

    # Commit makes all writes visible atomically
    producer.commit_transaction()
    print("Transaction committed successfully")

except KafkaException as e:
    # The client raises KafkaException (wrapping a KafkaError).
    # Abort on any error - all writes are discarded
    producer.abort_transaction()
    print(f"Transaction aborted: {e}")
    raise

Output (success):

text
>>> producer.begin_transaction()
>>> producer.produce('task-events', ...)
>>> producer.produce('audit-log', ...)
>>> producer.produce('notification-queue', ...)
>>> producer.commit_transaction()
Transaction committed successfully

Output (failure):

text
>>> producer.begin_transaction()
>>> producer.produce('task-events', ...)
>>> # Network error occurs
>>> producer.abort_transaction()
Transaction aborted: KafkaError{code=_TIMED_OUT,...}

The Transaction State Machine

text
Transaction State Machine

  READY ── begin_transaction() ──→ IN_TRANSACTION
  IN_TRANSACTION ── commit_transaction() ──→ COMPLETE
  IN_TRANSACTION ── abort_transaction() ──→ COMPLETE
  COMPLETE ── (can begin the next transaction) ──→ READY

  States:
    - READY: can begin a new transaction
    - IN_TRANSACTION: produce() calls are transactional
    - COMPLETE: transaction finished, can begin a new one
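
The client library enforces these transitions. A minimal sketch of what a violation looks like, assuming confluent-kafka-python (the exact error code and message depend on the client version, but an out-of-order call is rejected rather than silently accepted):

python
from confluent_kafka import KafkaException, Producer

producer = Producer({
    'bootstrap.servers': 'localhost:30092',
    'transactional.id': 'state-demo-1'
})

producer.init_transactions()      # -> READY
producer.begin_transaction()      # READY -> IN_TRANSACTION

try:
    producer.begin_transaction()  # invalid: already IN_TRANSACTION
except KafkaException as e:
    print(f"Rejected: {e}")       # expect a state error from the client

producer.commit_transaction()     # IN_TRANSACTION -> COMPLETE, then READY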

Zombie Fencing: Preventing Duplicates from Crashed Producers

The Zombie Producer Problem

What happens when a transactional producer crashes and restarts?

text
Timeline:
  1. Producer-A (epoch 1) begins transaction
  2. Producer-A produces to topic-1
  3. Producer-A crashes (network partition or process failure)
  4. Producer-A restarts, calls init_transactions() (epoch 2)
  5. Original Producer-A (epoch 1) recovers, tries to commit

Without fencing: both producers write, creating duplicates
With fencing:    epoch 1 is "fenced" and cannot commit

How Epoch-Based Fencing Works

The transactional.id maps to a monotonically increasing epoch:

text
Zombie Fencing with Epochs

  Transaction Coordinator State:
    transactional.id = "task-processor-1"
    current_epoch    = 42

  Producer-A (crashed),   epoch = 41 ──→ commit_transaction()
  Producer-A (restarted), epoch = 42 ──→ produce()

  Coordinator decision:
    epoch 41 < current_epoch 42 → REJECT: ProducerFencedException
    epoch 42 = current_epoch 42 → ACCEPT: valid producer

Fencing in Practice

When the old producer tries to commit:

python
from confluent_kafka import KafkaException

# Producer-A (epoch 41) - the "zombie"
try:
    producer_old.commit_transaction()
except KafkaException as e:
    print(f"Fenced: {e}")
    # KafkaError{code=PRODUCER_FENCED, ...}

Output:

text
Fenced: KafkaError{code=PRODUCER_FENCED,val=90,str="Producer has been fenced by a newer producer instance"}

The zombie producer receives PRODUCER_FENCED and must shut down. Only the producer with the current epoch can commit transactions.
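
In confluent-kafka-python, fencing surfaces as a fatal error on the producer. A minimal shutdown-guard sketch, assuming the fatal() and txn_requires_abort() flags on KafkaError (both available in recent client releases):

python
from confluent_kafka import KafkaException


def commit_or_die(producer):
    """Commit the transaction, treating fencing as a signal to stop."""
    try:
        producer.commit_transaction()
    except KafkaException as e:
        err = e.args[0]
        if err.fatal():
            # Fenced (or another fatal error): this instance must stop producing.
            raise SystemExit("Producer fenced; shutting down") from e
        if err.txn_requires_abort():
            producer.abort_transaction()  # discard the writes, then retry upstream
        raise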

Consumer Isolation: read_committed

The Problem with Default Isolation

By default, consumers see messages as soon as they're written---even if the transaction hasn't committed yet:

python
from confluent_kafka import Consumer

# Default consumer sees uncommitted messages
consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'notification-service',
    # isolation.level defaults to 'read_uncommitted'
})

This consumer will see messages from in-progress transactions that might be aborted. If it processes and acts on an aborted message, data consistency is broken.

Configuring read_committed

To see only committed messages, set isolation.level:

python
from confluent_kafka import Consumer, KafkaError

consumer = Consumer({
    'bootstrap.servers': 'localhost:30092',
    'group.id': 'notification-service',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': False,
    'isolation.level': 'read_committed'  # Only see committed transactions
})
consumer.subscribe(['task-events'])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        if msg.error().code() == KafkaError._PARTITION_EOF:
            continue
        raise Exception(msg.error())

    # This message is guaranteed to be from a committed transaction
    print(f"Processing committed message: {msg.value().decode()}")
    consumer.commit(message=msg)

Output:

text
# Producer begins transaction, writes message
# Consumer with read_committed: (waiting...)
# Producer commits transaction
Processing committed message: {"event": "task_created", "task_id": "123"}

Isolation Level Comparison

| Isolation Level  | Sees Uncommitted | Sees Aborted  | Latency | Use Case                     |
| ---------------- | ---------------- | ------------- | ------- | ---------------------------- |
| read_uncommitted | Yes              | Yes (briefly) | Lowest  | Monitoring, metrics          |
| read_committed   | No               | No            | Higher  | Business logic, exactly-once |

The latency trade-off: with read_committed, consumers can only read up to the last stable offset, so they must wait for open transactions to complete. A producer holding a long-running transaction delays every consumer behind it on those partitions.

Complete Example: Atomic Order Processing

Here's a complete stream processor that consumes orders, processes them, and atomically writes to multiple output topics:

python
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
import json


class AtomicOrderProcessor:
    """Process orders with exactly-once semantics across multiple topics."""

    def __init__(self, processor_id: str, bootstrap_servers: str):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'transactional.id': f'order-processor-{processor_id}',
            'enable.idempotence': True
        })
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'order-processors',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False,
            'isolation.level': 'read_committed'
        })
        # Initialize transactions once
        self.producer.init_transactions()
        self.consumer.subscribe(['incoming-orders'])

    def process_orders(self):
        """Main processing loop with transactional semantics."""
        while True:
            msg = self.consumer.poll(1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                raise Exception(msg.error())
            try:
                # Process within transaction
                self._process_order_transactionally(msg)
            except Exception as e:
                print(f"Processing failed: {e}")
                # Transaction already aborted in _process_order_transactionally.
                # A production loop would also rewind the consumer to the last
                # committed offset before retrying.

    def _process_order_transactionally(self, msg):
        """Process single order atomically."""
        try:
            self.producer.begin_transaction()

            # Parse order
            order = json.loads(msg.value().decode())
            order_id = order['order_id']
            amount = order['amount']

            # Business logic: validate, calculate, etc.
            processed_order = {
                'order_id': order_id,
                'status': 'processed',
                'amount': amount,
                'tax': amount * 0.08
            }

            # Atomic writes to multiple topics
            self.producer.produce(
                'processed-orders',
                key=order_id,
                value=json.dumps(processed_order)
            )
            self.producer.produce(
                'audit-log',
                key=order_id,
                value=json.dumps({
                    'action': 'order_processed',
                    'order_id': order_id,
                    'timestamp': '2025-01-15T10:30:00Z'
                })
            )
            self.producer.produce(
                'ledger-updates',
                key=order_id,
                value=json.dumps({
                    'order_id': order_id,
                    'debit': amount,
                    'credit': 0
                })
            )

            # Commit the consumer offset inside the same transaction so the
            # read and the writes succeed or fail together
            self.producer.send_offsets_to_transaction(
                self.consumer.position(self.consumer.assignment()),
                self.consumer.consumer_group_metadata()
            )

            # Commit transaction (makes all writes visible)
            self.producer.commit_transaction()
            print(f"Order {order_id} processed successfully")

        except KafkaException as e:
            self.producer.abort_transaction()
            print(f"Transaction aborted for order: {e}")
            raise

    def close(self):
        """Clean shutdown."""
        self.consumer.close()


# Usage
if __name__ == '__main__':
    processor = AtomicOrderProcessor(
        processor_id='1',
        bootstrap_servers='localhost:30092'
    )
    try:
        processor.process_orders()
    finally:
        processor.close()

Output (successful processing):

text
Order order-123 processed successfully
Order order-124 processed successfully
Order order-125 processed successfully

Output (transaction aborted):

text
Transaction aborted for order: KafkaError{code=_TIMED_OUT,...}
# No partial writes - all three topics remain consistent

Transaction Performance Considerations

Latency Impact

Transactions add latency at two points:

| Operation            | Additional Latency | Why                                          |
| -------------------- | ------------------ | -------------------------------------------- |
| begin_transaction()  | ~1-5ms             | Coordinator round-trip                       |
| commit_transaction() | ~10-50ms           | Write transaction markers to all partitions  |

For high-throughput systems, batch multiple operations within a single transaction:

python
# GOOD: Batch operations in one transaction
producer.begin_transaction()
for order in orders_batch:  # Process 100 orders
    producer.produce('processed-orders', ...)
producer.commit_transaction()  # One commit overhead for 100 messages

# AVOID: Transaction per message
for order in orders:
    producer.begin_transaction()
    producer.produce('processed-orders', ...)
    producer.commit_transaction()  # Commit overhead per message

Transaction Timeout

Transactions have a timeout (default 60 seconds). Long-running transactions risk timeout failures:

python
producer = Producer({
    'bootstrap.servers': 'localhost:30092',
    'transactional.id': 'processor-1',
    'transaction.timeout.ms': 60000  # 60 seconds default
})

If a transaction takes longer than transaction.timeout.ms, the coordinator aborts it automatically.
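
One way to stay under the limit is to cap how much work a single transaction covers. A minimal sketch, assuming a chunk size tuned so each transaction finishes comfortably within transaction.timeout.ms (the topic name and record fields here are illustrative):

python
def process_in_bounded_transactions(producer, records, chunk_size=100):
    """Split a large batch into transactions small enough to beat the timeout."""
    for start in range(0, len(records), chunk_size):
        chunk = records[start:start + chunk_size]
        producer.begin_transaction()
        for record in chunk:
            producer.produce('processed-orders',   # illustrative topic
                             key=record['id'],     # illustrative fields
                             value=record['payload'])
        producer.commit_transaction()  # each commit lands well under the timeout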

Exploring Transaction Design Through Collaboration

You've implemented transactional producers, but choosing when to use them requires careful analysis.

Your scenario:

Your Task API creates tasks and must notify three downstream systems:

  • Email service (notification-queue)
  • Audit system (audit-log)
  • Analytics pipeline (analytics-events)

Evaluating the trade-offs:

Consider these questions before choosing transactions:

  1. Must all writes succeed together?

    • If email fails but audit succeeds, is that acceptable?
    • If analytics misses an event, does that break anything?
  2. What's your latency budget?

    • Transactions add 10-50ms per commit
    • Can your Task API wait that long?
  3. Are your consumers idempotent?

    • If analytics can deduplicate by task_id, you might not need exactly-once

Questioning the approach:

For the Task API scenario, consider:

  • Audit-log: Probably needs transactions (compliance requirement)
  • Analytics: Might tolerate at-least-once with deduplication
  • Notifications: Duplicate emails are worse than missing ones

What emerged from this analysis:

You might use transactions for task-events + audit-log only, and publish to analytics-events separately with an idempotent producer, as sketched below. This reduces transaction scope while maintaining critical consistency.

The decision isn't "use transactions everywhere" but rather "use transactions where atomicity is required, and simpler patterns elsewhere."
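
A minimal sketch of that split, assuming two producers inside the Task API (topic names come from the scenario; the configuration values are illustrative):

python
from confluent_kafka import Producer

# Transactional producer: task-events + audit-log must commit together
txn_producer = Producer({
    'bootstrap.servers': 'localhost:30092',
    'transactional.id': 'task-api-1'
})
txn_producer.init_transactions()

# Plain idempotent producer: analytics tolerates at-least-once
analytics_producer = Producer({
    'bootstrap.servers': 'localhost:30092',
    'enable.idempotence': True
})


def publish_task_created(task_id: str, event_json: str):
    # Critical pair: all-or-nothing
    txn_producer.begin_transaction()
    txn_producer.produce('task-events', key=task_id, value=event_json)
    txn_producer.produce('audit-log', key=task_id, value=event_json)
    txn_producer.commit_transaction()

    # Best-effort: downstream deduplicates by task_id
    analytics_producer.produce('analytics-events', key=task_id, value=event_json)
    analytics_producer.flush()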


Reflect on Your Skill

You built a kafka-events skill in Chapter 1. Test and improve it based on what you learned.

Test Your Skill

Specification
Using my kafka-events skill, implement a stream processing pipeline that reads from one topic, transforms data, and writes to another topic atomically.

Does my skill show proper transactional producer/consumer configuration and read_committed isolation?

Identify Gaps

Ask yourself:

  • Did my skill explain transactional.id and how it enables exactly-once stream processing?
  • Did it show the relationship between transactions and consumer isolation levels?

Improve Your Skill

If you found gaps:

Specification
My kafka-events skill is missing transactional patterns (transactional.id, read_committed, atomic read-process-write).

Update it to include when to use Kafka transactions and how they differ from database transactions.

Try With AI

Apply what you've learned by designing transactional systems for real scenarios.

Setup: Open Claude Code or your preferred AI assistant in your Kafka project directory.


Prompt 1: Design Transaction Boundaries

Specification
I'm building an e-commerce order processing system. When an order is placed:

1. Deduct inventory from stock-levels topic
2. Create payment record in payments topic
3. Add to shipping-queue topic
4. Update customer-orders topic

Help me decide:
- Should all four writes be in one transaction?
- Or should I group them differently?
- What if payment processing takes 2-3 seconds?

Explain the trade-offs and show me the transaction structure you recommend.

What you're learning: Transaction boundary design---grouping related writes while avoiding performance bottlenecks from overly broad transactions.


Prompt 2: Debug a Zombie Fencing Issue

Specification
My stream processor keeps getting ProducerFencedException:

KafkaError{code=PRODUCER_FENCED,val=90,str="Producer has been fenced..."}

My setup:
- 3 Kubernetes pods running the same processor
- Each pod uses transactional.id = "order-processor"
- Pods restart frequently due to memory limits

What's wrong? How should I configure transactional.id for horizontally-scaled processors?

What you're learning: uniqueness requirements for transactional.id in distributed systems. Each instance needs a unique ID, typically combining the service name with its partition assignment.


Prompt 3: Implement Consume-Transform-Produce

Specification
I need to implement exactly-once stream processing:
- Consume from: raw-events (5 partitions)
- Transform: enrich with metadata
- Produce to: enriched-events AND audit-log

Show me the code pattern that:
1. Consumes and processes in a transaction
2. Commits consumer offsets within the same transaction
3. Handles partition rebalancing correctly

Use confluent-kafka-python with proper error handling.

What you're learning: The full exactly-once pattern including send_offsets_to_transaction() for atomic offset commits---the most complex but most reliable stream processing pattern.


Safety Note: Transactional producers require proper cleanup. Always call abort_transaction() on errors and ensure transactions complete within timeout limits. Orphaned transactions can block consumers with read_committed isolation.