USMAN’S INSIGHTS
AI ARCHITECT
© 2026 Muhammad Usman Akbar. All rights reserved.


Your First Producer (Python)

You have a Kafka cluster running on Docker Desktop Kubernetes. Now it's time to send your first message.

In the request-response world, you call an API and wait for a response. In event-driven systems, you publish an event and move on. But "move on" doesn't mean "forget about it." You need to know whether your message actually reached Kafka. Did it land on a partition? Which offset was assigned? Did something go wrong?

The confluent-kafka-python library handles this through a pattern that might feel unusual at first: asynchronous production with delivery callbacks. You call produce(), which returns immediately. Later, you call poll() to process callbacks that tell you what happened. This pattern maximizes throughput while still giving you visibility into delivery success or failure.

By the end of this lesson, you'll have working producer code that sends messages to your Kafka cluster and confirms each delivery.

Installing the Client Library

The confluent-kafka-python library is the official Confluent client for Python. It wraps the high-performance librdkafka C library, giving you the best performance available for Python Kafka clients.

Why not aiokafka? You might see aiokafka in tutorials—it has cleaner async/await syntax. We use confluent-kafka because:

Feature          | confluent-kafka                     | aiokafka
-----------------|-------------------------------------|---------------------------
Maintenance      | Official (Confluent maintains it)   | Community-maintained
Performance      | ~10x faster (C library underneath)  | Pure Python
Schema Registry  | Native support                      | Requires extra libraries
Adoption         | Production standard                 | Less common in production

The callback pattern takes getting used to, but it's what you'll see in real Kafka jobs, and you'll need Schema Registry support in Lesson 10.

Install it with uv:

bash
uv add confluent-kafka

Output:

text
Resolved 1 package in 0.5s
Installed 1 package in 0.3s
 + confluent-kafka==2.6.1

If you're using pip:

bash
pip install confluent-kafka

The library requires librdkafka to be available on your system. On macOS, the pip/uv installation handles this automatically. On Linux, you may need to install it separately (apt-get install librdkafka-dev on Debian/Ubuntu).

Connect to Your Kafka Cluster

Your Kafka cluster from Lesson 4 includes a NodePort listener on port 30092. This exposes Kafka directly on your localhost—no extra setup needed.

Verify the NodePort is working:

bash
kubectl get svc -n kafka | grep external

Output:

text
task-events-kafka-external-bootstrap   NodePort   10.96.x.x   <none>   9094:30092/TCP

Connection Reference:

Where Your Code Runs        | Bootstrap Server
----------------------------|----------------------------------------------------------
Local machine (Mac/Windows) | localhost:30092
Pod in same namespace       | task-events-kafka-bootstrap:9092
Pod in different namespace  | task-events-kafka-bootstrap.kafka.svc.cluster.local:9092

For this lesson, you run code locally, so use localhost:30092.
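If you move between these environments, one lightweight approach is to read the bootstrap address from an environment variable so the same code runs locally and in-cluster. This is a sketch: the `KAFKA_BOOTSTRAP` variable name is my own convention, not something Kafka or confluent-kafka defines.

```python
import os

# Fall back to the local NodePort address when the variable isn't set.
# KAFKA_BOOTSTRAP is an assumed name, not a Kafka-defined variable.
bootstrap = os.environ.get('KAFKA_BOOTSTRAP', 'localhost:30092')

config = {
    'bootstrap.servers': bootstrap,
    'client.id': 'task-api-producer',
}
print(config['bootstrap.servers'])
```

Inside a Kubernetes Deployment, you would set `KAFKA_BOOTSTRAP=task-events-kafka-bootstrap:9092` in the pod spec and leave the code untouched.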

The Minimal Producer

Let's start with the simplest producer that actually works:

python
from confluent_kafka import Producer

# Create producer with minimal configuration
producer = Producer({
    'bootstrap.servers': 'localhost:30092',
    'client.id': 'my-first-producer'
})

# Send a message
producer.produce(
    topic='task-created',
    value='Hello, Kafka!'
)

# Wait for all messages to be delivered
producer.flush()
print("Message sent!")

Output:

text
Message sent!

This works, but it's blind. You have no idea whether the message actually reached Kafka or where it landed. Let's add visibility.

Understanding the Asynchronous Model

The produce() method is non-blocking. When you call it, the message goes into an internal buffer, and the method returns immediately. The actual network transmission happens in a background thread.

This creates a problem: how do you know if delivery succeeded?

The answer is delivery callbacks. You provide a function that Kafka calls after each message is delivered (or fails). But there's a catch: callbacks don't execute automatically. You must call poll() to trigger them.

Here's the mental model:

text
produce() → Message enters buffer → Background thread sends to Kafka
                                                  ↓
                                         Kafka acknowledges
                                                  ↓
poll() → Triggers your callback with result
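The lifecycle above can be sketched as a toy model in plain Python. No Kafka is required to run it, and `ToyProducer` is a made-up class for illustration only, not part of confluent-kafka; it just mimics the "callbacks run only when you poll" behavior.

```python
from collections import deque

class ToyProducer:
    """Illustration only: mimics the produce()/poll() callback lifecycle."""

    def __init__(self):
        # In the real client, a background thread fills this as brokers acknowledge.
        self._acknowledged = deque()

    def produce(self, value, callback):
        # Non-blocking: pretend the message was buffered, sent, and acked.
        self._acknowledged.append((callback, None, value))  # (callback, err, msg)

    def poll(self, timeout=0):
        # Callbacks run HERE, on the caller's thread -- never automatically.
        served = len(self._acknowledged)
        while self._acknowledged:
            callback, err, msg = self._acknowledged.popleft()
            callback(err, msg)
        return served

results = []
producer = ToyProducer()
producer.produce('hello', lambda err, msg: results.append(msg))
print(results)   # [] -- produce() returned, but the callback hasn't run
producer.poll(0)
print(results)   # ['hello'] -- poll() triggered the callback
```

The key observation: between `produce()` and `poll()`, the callback sits queued. Forget to poll, and you never hear about deliveries at all.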

Adding Delivery Callbacks

A delivery callback receives two arguments:

  • err: An error object if delivery failed, or None if successful
  • msg: A message object with metadata about the delivered message
python
from confluent_kafka import Producer

def delivery_report(err, msg):
    """Called once for each message produced."""
    if err is not None:
        print(f'FAILED: {err}')
    else:
        print(f'SUCCESS: topic={msg.topic()} partition={msg.partition()} offset={msg.offset()}')

producer = Producer({
    'bootstrap.servers': 'localhost:30092',
    'client.id': 'task-api-producer'
})

# Send with callback
producer.produce(
    topic='task-created',
    value='{"id": "task-001", "title": "Buy groceries"}',
    callback=delivery_report
)

# Trigger callback processing
producer.poll(0)

# Ensure delivery before exit
producer.flush()

Output:

text
SUCCESS: topic=task-created partition=0 offset=0

Now you can see exactly where your message landed: topic task-created, partition 0, offset 0.

Why Message Keys Matter

So far, we've sent messages without keys. Kafka accepts this, but you lose an important guarantee.

When you provide a key, Kafka uses it to determine the partition:

python
# Messages with the same key always go to the same partition
producer.produce(
    topic='task-created',
    key='user-123',  # User ID as key
    value='{"id": "task-001", "title": "Buy groceries", "user": "user-123"}',
    callback=delivery_report
)

Why this matters:

  1. Ordering: Messages with the same key are always ordered (within a partition)
  2. Locality: All events for an entity stay together, simplifying consumer logic
  3. Scaling: Different keys can be processed in parallel across partitions

For task events, using task_id or user_id as the key ensures all events for that entity arrive in order.

python
# All events for task-001 go to the same partition, preserving order
producer.produce(topic='task-events', key='task-001', value='{"type": "created", ...}')
producer.produce(topic='task-events', key='task-001', value='{"type": "updated", ...}')
producer.produce(topic='task-events', key='task-001', value='{"type": "completed", ...}')

Complete Producer Example

Here's a production-ready producer that sends multiple messages with proper error handling:

python
from confluent_kafka import Producer
import json
from datetime import datetime, timezone

def delivery_report(err, msg):
    """Callback triggered by poll() or flush() after message delivery."""
    if err is not None:
        print(f'Delivery failed for {msg.key()}: {err}')
    else:
        print(f'Delivered: {msg.topic()} [{msg.partition()}] @ {msg.offset()}')

def create_producer():
    """Create a configured Kafka producer."""
    return Producer({
        'bootstrap.servers': 'localhost:30092',
        'client.id': 'task-api-producer',
    })

def send_task_event(producer, task_id: str, title: str):
    """Send a task created event."""
    event = {
        'id': task_id,
        'title': title,
        'created_at': datetime.now(timezone.utc).isoformat()
    }
    producer.produce(
        topic='task-created',
        key=task_id,
        value=json.dumps(event),
        callback=delivery_report
    )
    # Process any pending callbacks (non-blocking)
    producer.poll(0)

def main():
    producer = create_producer()

    # Send some task events
    tasks = [
        ('task-001', 'Buy groceries'),
        ('task-002', 'Review pull request'),
        ('task-003', 'Deploy to production'),
    ]
    for task_id, title in tasks:
        send_task_event(producer, task_id, title)
        print(f'Queued: {task_id}')

    # Wait for all messages to be delivered
    remaining = producer.flush(timeout=10)
    if remaining > 0:
        print(f'WARNING: {remaining} messages were not delivered')
    else:
        print('All messages delivered successfully')

if __name__ == '__main__':
    main()

Output:

text
Queued: task-001
Queued: task-002
Queued: task-003
Delivered: task-created [0] @ 1
Delivered: task-created [1] @ 0
Delivered: task-created [2] @ 0
All messages delivered successfully

Notice how messages landed on different partitions (0, 1, 2). Kafka distributed them based on the key hash.
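You can demonstrate the same-key, same-partition property without a cluster. Kafka's default partitioner hashes the key bytes (with murmur2) modulo the partition count; the sketch below substitutes crc32 as a stand-in hash, so the partition numbers it produces will not match Kafka's, but the mechanism is the same.

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Stand-in for Kafka's partitioner: hash the key, mod the partition count.
    # (Kafka uses murmur2, not crc32 -- this only illustrates the mechanism.)
    return zlib.crc32(key) % num_partitions

# The same key always lands on the same partition...
assert partition_for(b'task-001', 3) == partition_for(b'task-001', 3)

# ...while different keys can spread across partitions.
for key in (b'task-001', b'task-002', b'task-003'):
    print(key.decode(), '->', partition_for(key, 3))
```

One consequence worth noting: the mapping depends on the partition count, so adding partitions to a topic later changes where keys land and breaks per-key ordering across the boundary.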

Understanding poll() and flush()

These two methods are often confused. Here's the difference:

Method         | Behavior                                                                            | When to Use
---------------|-------------------------------------------------------------------------------------|-----------------------------------------------------------------
poll(timeout)  | Process callbacks for delivered messages; returns the number of events processed     | Call regularly in loops to handle callbacks without blocking
flush(timeout) | Block until all buffered messages are delivered (or timeout); processes callbacks    | Call before shutdown to ensure no messages are lost

The pattern:

python
# In a loop: poll(0) for non-blocking callback processing
for message in messages:
    producer.produce(topic, value=message, callback=callback)
    producer.poll(0)  # Non-blocking, process any ready callbacks

# Before shutdown: flush() to ensure all messages delivered
producer.flush(timeout=10)  # Block up to 10 seconds

What happens if you skip poll()?

Callbacks accumulate in memory. If you never call poll() or flush(), your callback functions never execute, and you never learn about delivery failures until the program exits.

Verifying Messages with Kafka CLI

Your producer is sending messages, but let's verify they actually arrived. Use the Kafka console consumer:

bash
# Optional: port-forward if you want to run Kafka CLI tools from your own machine
kubectl port-forward svc/task-events-kafka-bootstrap 9092:9092 -n kafka &

# Consume from the beginning (this runs inside the broker pod, where
# localhost:9092 is the broker itself, so no port-forward is required)
kubectl exec -it task-events-dual-role-0 -n kafka -- \
  /opt/kafka/bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic task-created \
  --from-beginning

Output:

text
{"id": "task-001", "title": "Buy groceries", "created_at": "2025-01-15T10:30:00Z"}
{"id": "task-002", "title": "Review pull request", "created_at": "2025-01-15T10:30:01Z"}
{"id": "task-003", "title": "Deploy to production", "created_at": "2025-01-15T10:30:02Z"}

Your messages are in Kafka, persisted and ready for any consumer to read.

Common Producer Errors

Error                      | Cause                              | Fix
---------------------------|------------------------------------|------------------------------------------------------------------
NoBrokersAvailable         | Can't connect to bootstrap servers | Verify the port-forward is running; check the bootstrap.servers address
UNKNOWN_TOPIC_OR_PARTITION | Topic doesn't exist                | Create the topic first using a KafkaTopic CRD, or enable auto.create.topics.enable
MSG_SIZE_TOO_LARGE         | Message exceeds max.message.bytes  | Increase the broker limit or reduce the message size
Callback never called      | Forgot to call poll()/flush()      | Add poll(0) after produce(), flush() before exit
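One failure the table doesn't cover: `produce()` itself raises `BufferError` when the client's local queue fills faster than messages can be sent. A common response is to `poll()` (which lets the background thread drain the queue and runs pending callbacks) and retry. The sketch below shows that pattern; `produce_with_backpressure` is a hypothetical helper name, not part of the confluent-kafka API.

```python
def produce_with_backpressure(producer, topic, key, value, callback, attempts=3):
    """Retry produce() when the local buffer is full, serving callbacks between tries.

    Sketch only: handles BufferError backpressure; other failures still
    surface through the delivery callback.
    """
    for _ in range(attempts):
        try:
            producer.produce(topic, key=key, value=value, callback=callback)
            return True
        except BufferError:
            # Queue full: block briefly so the background thread can drain it,
            # running any ready delivery callbacks along the way.
            producer.poll(1)
    return False
```

If the helper still fails after its retries, the caller can back off further or return an error to the client rather than silently dropping the event.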

Reflect on Your Skill

You built a kafka-events skill in Lesson 0. Test and improve it based on what you learned.

Test Your Skill

text
Using my kafka-events skill, generate Python producer code that sends
task.created events to a Kafka topic. Does my skill include proper error
handling, serialization, and callback patterns?

Identify Gaps

Ask yourself:

  • Did my skill show synchronous vs asynchronous producer patterns?
  • Did it include delivery callbacks and error handling?

Improve Your Skill

If you found gaps:

text
My kafka-events skill is missing producer implementation patterns
(sync vs async, callbacks, error handling). Update it to include when to
use synchronous vs asynchronous producers and how to handle delivery failures.

Try With AI

Prompt 1: Debug a Silent Producer

text
My Kafka producer runs without errors but my delivery_report callback
never prints anything. Here's my code:

producer.produce(topic='events', value='test', callback=delivery_report)
# ... more produce calls ...
print("Done sending")

What am I missing? Walk me through the produce/poll/flush lifecycle.

What you're learning: The asynchronous callback model—understanding that produce() is non-blocking and callbacks require explicit triggering.

Prompt 2: Design a Key Strategy

text
I'm building an event-driven task management system. Each task has:
- task_id (unique)
- user_id (who owns it)
- project_id (which project it belongs to)

Help me choose the right message key. I need ordering guarantees for
task lifecycle events (created → updated → completed). But I also
want to scale consumer processing. What are the trade-offs between
using task_id vs user_id vs project_id as the key?

What you're learning: Key design decisions—balancing ordering guarantees against parallelism and understanding partition assignment.

Prompt 3: Add Error Handling

text
My producer works in development but I'm worried about production.
What happens if:
1. Kafka is temporarily unreachable?
2. A message is too large?
3. The topic doesn't exist?

Help me add robust error handling to my delivery_report callback.
Show me how to log failures, potentially retry, and alert on
critical errors.

What you're learning: Production resilience patterns—moving from "it works on my machine" to handling real-world failure scenarios.

Safety note: When testing producer code, start with a development topic. Avoid producing to production topics until you've verified your error handling works correctly.