USMAN’S INSIGHTS
AI ARCHITECT
  • Home
  • About
  • Thought Leadership
  • Book
Press / Contact
USMAN’S INSIGHTS
AI ARCHITECT
⌘F
HomeBook
HomeBookThe Listener: Your First Consumer Python
Previous Chapter
Producer Deep Dive Reliability
Next Chapter
Consumer Deep Dive Groups and Rebalancing
AI NOTICE: This is the table of contents for the SPECIFIC CHAPTER only. It is NOT the global sidebar. For all chapters, look at the main navigation.

On this page

43 sections

Progress0%
1 / 43

Muhammad Usman Akbar Entity Profile

Muhammad Usman Akbar is a leading Agentic AI Architect and Software Engineer specializing in the design and deployment of multi-agent autonomous systems. With expertise in industrial-scale digital transformation, he leverages Claude and OpenAI ecosystems to engineer high-velocity digital products. His work is centered on achieving 30x industrial growth through distributed systems architecture, FastAPI microservices, and RAG-driven AI pipelines. Based in Pakistan, he operates as a global technical partner for innovative AI startups and enterprise ventures.

USMAN’S INSIGHTS
AI ARCHITECT

Transforming businesses into autonomous AI ecosystems. Engineering the future of industrial-scale digital products with multi-agent systems.

30X Growth
AI-First
Innovation

Navigation

  • Home
  • Book
  • About
  • Contact
Let's Collaborate

Have a Project in Mind?

Let's build something extraordinary together. Transform your vision into autonomous AI reality.

Start Your Transformation

© 2026 Muhammad Usman Akbar. All rights reserved.

Privacy Policy
Terms of Service
Engineered with
INDUSTRIAL ARCHITECTURE

Your First Consumer (Python)

You've built a producer that reliably publishes events to Kafka. Now you need something to receive those events. In the previous lesson, you verified message delivery by checking broker acknowledgments. But real systems need consumers that process messages, handle failures gracefully, and track their position in the event stream.

Consuming from Kafka is fundamentally different from calling an API. With an API, you request data and get an immediate response. With Kafka, you poll for messages continuously, process whatever arrives, and tell Kafka you're done. This poll-process-commit loop is the heart of every Kafka consumer.

The challenge is reliability. What happens if your consumer crashes after reading a message but before processing it? What if it processes successfully but crashes before confirming? These edge cases determine whether your system loses messages, processes them twice, or handles them exactly right. Your commit strategy makes this decision.

Consumer Fundamentals

Before writing code, understand what a consumer actually does:

ConceptDescription
Consumer GroupNamed group of consumers that share work. Each partition is assigned to exactly one consumer in the group.
SubscriptionTopics this consumer wants to receive messages from.
Poll LoopContinuous loop calling poll() to fetch messages from assigned partitions.
OffsetPosition in partition. Consumer tracks "where am I?" to resume after restart.
CommitTell Kafka "I've processed up to this offset."

The Poll Loop Pattern

Every Kafka consumer follows this structure:

python
while running: messages = poll(timeout) for message in messages: process(message) commit(message) # or auto-commit

The poll() call:

  • Fetches available messages from assigned partitions
  • Handles rebalancing when consumers join/leave
  • Returns None if no messages available within timeout
  • Must be called regularly (Kafka considers consumer dead if poll() stops)

Setting Up the Consumer

Install the same library you used for the producer:

Specification
uv add confluent-kafka

Connecting to Kafka

Like in Chapter 6, use the NodePort to connect from your local machine:

Where Your Code RunsBootstrap Server
Local machine (Mac/Windows)localhost:30092
Pod in same namespacetask-events-kafka-bootstrap:9092
Pod in different namespacetask-events-kafka-bootstrap.kafka.svc.cluster.local:9092

Basic Consumer Configuration

python
from confluent_kafka import Consumer, KafkaError consumer = Consumer({ 'bootstrap.servers': 'localhost:30092', # NodePort for local dev 'group.id': 'task-notification-service', 'auto.offset.reset': 'earliest', 'enable.auto.commit': True, 'auto.commit.interval.ms': 5000 })

Output:

Specification
Consumer created with group.id: task-notification-service

Let's understand each configuration:

SettingValuePurpose
bootstrap.serversKafka cluster addressWhere to connect
group.idUnique consumer group nameIdentifies this consumer group for partition assignment
auto.offset.resetearliest or latestWhere to start if no committed offset exists
enable.auto.commitTrue or FalseWhether Kafka auto-commits offsets periodically
auto.commit.interval.ms5000How often to auto-commit (if enabled)

The auto.offset.reset Decision

When a consumer group first subscribes to a topic (or when its committed offsets have expired), Kafka needs to know where to start reading:

ValueBehaviorUse When
earliestRead from the beginning of the topicYou need to process all historical messages
latestRead only new messages from now onYou only care about future events

Example scenario: Your notification service starts for the first time. With earliest, it processes all past task-created events (potentially thousands). With latest, it ignores history and only notifies for new tasks.

python
# Process all historical events (good for data pipelines, audit logs) 'auto.offset.reset': 'earliest' # Only future events (good for real-time notifications) 'auto.offset.reset': 'latest'

Choose based on your business requirements, not technical preference.

The Complete Poll Loop

Here's a production-ready consumer with proper error handling:

python
from confluent_kafka import Consumer, KafkaError import json import signal import sys # Graceful shutdown handling running = True def signal_handler(sig, frame): global running print("Shutdown signal received...") running = False signal.signal(signal.SIGINT, signal_handler) signal.signal(signal.SIGTERM, signal_handler) # Consumer configuration consumer = Consumer({ 'bootstrap.servers': 'localhost:30092', 'group.id': 'task-notification-service', 'auto.offset.reset': 'earliest', 'enable.auto.commit': True, 'auto.commit.interval.ms': 5000 }) # Subscribe to topic(s) consumer.subscribe(['task-created']) print("Subscribed to topic: task-created") try: while running: # Poll for messages (1 second timeout) msg = consumer.poll(timeout=1.0) if msg is None: # No message available within timeout continue if msg.error(): # Handle specific errors if msg.error().code() == KafkaError._PARTITION_EOF: # Reached end of partition (not an error, just informational) print(f"Reached end of partition {msg.partition()}") continue else: # Actual error print(f"Consumer error: {msg.error()}") continue # Process the message key = msg.key().decode('utf-8') if msg.key() else None value = json.loads(msg.value().decode('utf-8')) print(f"Received task: {value.get('title', 'unknown')}") print(f" Key: {key}") print(f" Partition: {msg.partition()}, Offset: {msg.offset()}") # Your business logic here # send_notification(value) finally: # Clean shutdown print("Closing consumer...") consumer.close() print("Consumer closed.")

Output:

text
Subscribed to topic: task-created Received task: Buy groceries Key: task-123 Partition: 0, Offset: 0 Received task: Complete report Key: task-456 Partition: 1, Offset: 0 Reached end of partition 0 Reached end of partition 1 ^CShutdown signal received... Closing consumer... Consumer closed.

Understanding the Error Handling

The msg.error() check catches several conditions:

Error CodeMeaningAction
_PARTITION_EOFReached end of available messagesContinue polling (more messages may arrive)
_UNKNOWN_TOPIC_OR_PARTTopic doesn't existCheck topic name, may need to create topic
_ALL_BROKERS_DOWNCan't reach any brokerCheck network, broker health

The _PARTITION_EOF error is not really an error. It means "you've caught up with this partition." Continue polling for new messages.

Why consumer.close() Matters

Always call consumer.close() when shutting down:

  1. Triggers immediate rebalance: Other consumers in the group get your partitions faster
  2. Commits pending offsets: Ensures processed messages aren't re-processed
  3. Releases resources: Cleans up connections and memory

Without close(), Kafka waits for session timeout (default 45 seconds) before reassigning partitions.

Auto-Commit vs Manual Commit

This is the most important decision for your consumer. It determines your message delivery guarantee.

Auto-Commit: Simple but Risky

With auto-commit enabled, Kafka periodically commits offsets in the background:

python
consumer = Consumer({ 'bootstrap.servers': 'localhost:30092', 'group.id': 'my-service', 'enable.auto.commit': True, 'auto.commit.interval.ms': 5000 # Every 5 seconds })

The Problem: Consider this timeline:

text
T=0: poll() returns message at offset 100 T=1: Start processing message T=2: Auto-commit happens -> Kafka records "processed up to 100" T=3: Processing fails, consumer crashes T=4: Consumer restarts, asks Kafka "where was I?" T=5: Kafka says "offset 101" -> Message at offset 100 is LOST

Auto-commit committed the offset before processing completed. If processing fails, that message is never retried.

When auto-commit is acceptable:

  • Processing is fast and unlikely to fail
  • Losing occasional messages is acceptable
  • You have separate retry/dead-letter mechanisms

Manual Commit: Control and Safety

Manual commit lets you commit AFTER successful processing:

python
from confluent_kafka import Consumer, KafkaError consumer = Consumer({ 'bootstrap.servers': 'localhost:30092', 'group.id': 'task-audit-service', 'auto.offset.reset': 'earliest', 'enable.auto.commit': False # Disable auto-commit }) consumer.subscribe(['task-created']) try: while running: msg = consumer.poll(timeout=1.0) if msg is None: continue if msg.error(): if msg.error().code() == KafkaError._PARTITION_EOF: continue print(f"Error: {msg.error()}") continue # Process the message try: value = json.loads(msg.value().decode('utf-8')) # Your business logic (e.g., write to database) save_to_audit_log(value) # Commit AFTER successful processing consumer.commit(message=msg) print(f"Processed and committed offset {msg.offset()}") except Exception as e: print(f"Processing failed: {e}") # Don't commit - message will be reprocessed on restart finally: consumer.close()

Output:

text
Processed and committed offset 0 Processed and committed offset 1 Processed and committed offset 2 Processing failed: Database connection error # Consumer restarts later... Processed and committed offset 3 # Retry of failed message

Timeline with manual commit:

text
T=0: poll() returns message at offset 100 T=1: Start processing message T=2: Processing succeeds T=3: commit(message) -> Kafka records "processed up to 100" T=4: If crash happened at T=2, message would be reprocessed

Commit Strategies Compared

StrategyCodeGuaranteeRisk
Auto-commitenable.auto.commit: TrueAt-most-once (may lose)Message loss if crash after poll
Commit per messagecommit(message=msg)At-least-once (may duplicate)Slow, but safe
Commit per batchcommit() after N messagesAt-least-onceBetter throughput, batch may duplicate

At-Least-Once Pattern (Recommended Default)

For most applications, at-least-once is the right choice. Process, then commit:

python
# Process message result = process_task_event(value) # Only commit if processing succeeded if result.success: consumer.commit(message=msg) else: # Log failure, don't commit # Message will be reprocessed on next consumer restart logger.error(f"Failed to process: {result.error}")

Your processing code must be idempotent. If the same message is processed twice (consumer crashed after processing but before commit), the result should be the same.

Idempotent processing example:

python
def process_task_event(event): """Idempotent: Uses event_id as database primary key.""" task_id = event['id'] # Check if already processed if audit_log.exists(task_id): return Result(success=True, skipped=True) # Process and save audit_log.insert(task_id, event) return Result(success=True)

Synchronous vs Asynchronous Commit

The commit() method has two modes:

Synchronous Commit (Default)

Specification
consumer.commit(message=msg, asynchronous=False)
  • Blocks until Kafka confirms the commit
  • Slower but guarantees commit succeeded
  • Use when you need certainty

Asynchronous Commit

Specification
consumer.commit(message=msg, asynchronous=True)
  • Returns immediately, commit happens in background
  • Faster but commit might fail silently
  • Use when throughput matters more than certainty

For critical data, use synchronous. For high-throughput scenarios where occasional reprocessing is acceptable, use asynchronous.

Running Your Consumer

In Development (Local Port Forward)

bash
# Forward Kafka bootstrap service to localhost kubectl port-forward svc/task-events-kafka-bootstrap 9092:9092 -n kafka # In another terminal, run your consumer python consumer.py

In Kubernetes (as a Deployment)

Create a consumer deployment that runs alongside your producer:

yaml
# consumer-deployment.yaml apiVersion: apps/v1 kind: Deployment metadata: name: task-notification-consumer namespace: kafka spec: replicas: 1 selector: matchLabels: app: task-notification-consumer template: metadata: labels: app: task-notification-consumer spec: containers: - name: consumer image: python:3.12-slim command: ["python", "/app/consumer.py"] volumeMounts: - name: app-code mountPath: /app volumes: - name: app-code configMap: name: consumer-code

Verifying Consumer Behavior

Use the Kafka CLI to check consumer group status:

bash
# Check consumer group lag kubectl exec -it task-events-dual-role-0 -n kafka -- \ /opt/kafka/bin/kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --group task-notification-service \ --describe

Output:

text
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG task-notification-service task-created 0 15 15 0 task-notification-service task-created 1 12 12 0 task-notification-service task-created 2 18 18 0
ColumnMeaning
CURRENT-OFFSETWhere consumer has committed
LOG-END-OFFSETLatest message in partition
LAGMessages behind (LOG-END - CURRENT)

A lag of 0 means your consumer is caught up. Growing lag means consumer can't keep up with producers.

Common Mistakes to Avoid

MistakeProblemSolution
Forgetting consumer.close()Slow rebalancing, uncommitted offsetsAlways close in finally block
Auto-commit with slow processingMessage loss on crashUse manual commit for slow/risky processing
Not handling _PARTITION_EOFTreating it as error, stopping consumerContinue polling, it's informational
Committing before processingMessage loss if processing failsAlways process first, commit second
Single consumer for high-volume topicCan't keep up, growing lagScale with more consumers (up to partition count)

Reflect on Your Skill

You built a kafka-events skill in Chapter 1. Test and improve it based on what you learned.

Test Your Skill

Specification
Using my kafka-events skill, generate Python consumer code that processes task events from a Kafka topic.Does my skill include proper offset management, error handling, and graceful shutdown?

Identify Gaps

Ask yourself:

  • Did my skill explain auto.offset.reset and enable.auto.commit?
  • Did it show how to handle message processing errors without losing data?

Improve Your Skill

If you found gaps:

Specification
My kafka-events skill is missing consumer implementation patterns (offset management, error handling, shutdown).Update it to include when to use auto-commit vs manual commit and how to handle processing failures.

Try With AI

Prompt 1: Debug a Consumer That's Losing Messages

Specification
I have a Kafka consumer that seems to be losing messages. Here's my code:consumer = Consumer({ 'bootstrap.servers': 'localhost:30092', 'group.id': 'my-service', 'enable.auto.commit': True, 'auto.commit.interval.ms': 1000})while True: msg = consumer.poll(1.0) if msg: result = process_message(msg) # Takes 5-10 seconds if not result.success: print("Failed, will retry later") When the consumer restarts after a crash, some messages are missing.What's wrong with this code and how do I fix it?

What you're learning: Diagnosing the auto-commit timing problem and implementing proper at-least-once semantics.

Prompt 2: Design Idempotent Processing

Specification
I need to process task-created events and send email notifications.Each event contains: task_id, title, owner_email, created_at.My consumer uses at-least-once delivery, so duplicates are possible.Help me design idempotent processing so users don't get duplicate emails.What data should I track? Where should I store it?Walk me through the logic step by step.

What you're learning: Designing idempotent consumers that handle duplicates gracefully using state tracking.

Prompt 3: Choose the Right Commit Strategy

Specification
I'm building three different consumers for the same task-events topic:1. Real-time dashboard that shows live task counts 2. Audit log that must never miss an event 3. Analytics pipeline that aggregates daily statistics For each consumer, help me decide:- auto.offset.reset: earliest or latest?- enable.auto.commit: true or false? - If manual commit: per-message or batch?Explain the trade-offs for each choice.

What you're learning: Matching commit strategies to business requirements, understanding that different consumers of the same topic may need different configurations.

Important: When working with Kafka consumers, always test your error handling by simulating failures. Kill your consumer mid-processing and verify it correctly resumes from the last committed offset.