USMAN’S INSIGHTS
AI ARCHITECT
  • Home
  • About
  • Thought Leadership
  • Book
Press / Contact
USMAN’S INSIGHTS
AI ARCHITECT
⌘F
HomeBook
HomeBookThe Connector: Kafka Connect and Data Pipelines
Previous Chapter
Reliability Configuration
Next Chapter
Change Data Capture with Debezium
AI NOTICE: This is the table of contents for the SPECIFIC CHAPTER only. It is NOT the global sidebar. For all chapters, look at the main navigation.

On this page

27 sections

Progress0%
1 / 27

Muhammad Usman Akbar Entity Profile

Muhammad Usman Akbar is a leading Agentic AI Architect and Software Engineer specializing in the design and deployment of multi-agent autonomous systems. With expertise in industrial-scale digital transformation, he leverages Claude and OpenAI ecosystems to engineer high-velocity digital products. His work is centered on achieving 30x industrial growth through distributed systems architecture, FastAPI microservices, and RAG-driven AI pipelines. Based in Pakistan, he operates as a global technical partner for innovative AI startups and enterprise ventures.

USMAN’S INSIGHTS
AI ARCHITECT

Transforming businesses into autonomous AI ecosystems. Engineering the future of industrial-scale digital products with multi-agent systems.

30X Growth
AI-First
Innovation

Navigation

  • Home
  • Book
  • About
  • Contact
Let's Collaborate

Have a Project in Mind?

Let's build something extraordinary together. Transform your vision into autonomous AI reality.

Start Your Transformation

© 2026 Muhammad Usman Akbar. All rights reserved.

Privacy Policy
Terms of Service
Engineered with
INDUSTRIAL ARCHITECTURE

Kafka Connect: Building Data Pipelines

You've built producers and consumers in Python. They work, but now you're asked to integrate Kafka with a PostgreSQL database, an Elasticsearch cluster, and an S3 bucket. Writing custom producers and consumers for each integration means maintaining three separate codebases, handling retries, managing offsets, and implementing fault tolerance—for each one.

This is exactly what Kafka Connect solves. It's a framework for building standardized, scalable data pipelines without writing custom code. Over 200 pre-built connectors exist for databases, message queues, cloud storage, and search engines. Instead of implementing integration logic, you deploy a connector with configuration.

In this chapter, you'll deploy Kafka Connect on Kubernetes using Strimzi and configure connectors using declarative YAML—the same GitOps pattern you've used throughout this book. The pattern you learn here lets you build data pipelines that would take weeks to code manually, deployed in minutes.

Understanding Kafka Connect Architecture

Kafka Connect consists of workers that execute connectors and tasks. This separation lets Connect distribute work across multiple nodes for scalability and fault tolerance.

text
┌──────────────────────────────────────────────────────────────────────────┐ │ Kafka Connect Cluster │ ├──────────────────────────────────────────────────────────────────────────┤ │ │ │ ┌─────────────────────┐ ┌─────────────────────┐ │ │ │ Worker 1 │ │ Worker 2 │ │ │ │ ┌───────────────┐ │ │ ┌───────────────┐ │ │ │ │ │ Connector A │ │ │ │ Connector A │ │ │ │ │ │ Task 0 │ │ │ │ Task 1 │ │ │ │ │ └───────────────┘ │ │ └───────────────┘ │ │ │ │ ┌───────────────┐ │ │ ┌───────────────┐ │ │ │ │ │ Connector B │ │ │ │ Connector C │ │ │ │ │ │ Task 0 │ │ │ │ Task 0 │ │ │ │ │ └───────────────┘ │ │ └───────────────┘ │ │ │ └─────────────────────┘ └─────────────────────┘ │ │ │ └──────────────────────────────────────────────────────────────────────────┘ │ ▼ ┌───────────────┐ │ Kafka │ │ Cluster │ └───────────────┘

Key components:

ComponentRoleScaling
WorkerJVM process that hosts connector tasksAdd workers for more capacity
ConnectorLogical job definition (what to integrate)One connector can have many tasks
TaskUnit of work doing actual data movementConnector decides task count based on parallelism needs
ConverterTransforms data between Kafka format and connector formatConfigured per connector

Source vs Sink Connectors

Connectors come in two types based on data flow direction:

text
Source Connectors (Data INTO Kafka) │ ┌──────────────┐ ▼ ┌──────────────┐ │ PostgreSQL │ ──────────────────▶ │ │ └──────────────┘ │ │ │ Kafka │ ┌──────────────┐ │ Topics │ │ S3 │ ──────────────────▶ │ │ └──────────────┘ │ │ └──────────────┘ │ ┌──────────────────┼──────────────────┐ ▼ ▼ ▼ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ │Elasticsearch │ │ Redis │ │ S3 │ └──────────────┘ └──────────────┘ └──────────────┘ Sink Connectors (Data OUT OF Kafka)

Source connectors produce data into Kafka topics:

Use CaseSource ConnectorData Flow
Database CDCDebezium PostgreSQLDB changes → Kafka
File ingestionFile SourceFiles → Kafka
JDBC pollingJDBC SourceSQL query results → Kafka
HTTP endpointsHTTP SourceAPI responses → Kafka

Sink connectors consume from Kafka topics and write to external systems:

Use CaseSink ConnectorData Flow
Search indexingElasticsearch SinkKafka → Elasticsearch
Data lakeS3 SinkKafka → S3 buckets
Database syncJDBC SinkKafka → Database tables
CachingRedis SinkKafka → Redis

Deploying Kafka Connect with Strimzi

Strimzi provides two CRDs for Kafka Connect:

  • KafkaConnect: Deploys a Kafka Connect cluster
  • KafkaConnector: Configures individual connectors

Step 1: Create KafkaConnect Resource

The KafkaConnect CRD deploys Connect workers and optionally builds a custom image with connector plugins:

yaml
# kafka-connect.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnect metadata: name: task-connect namespace: kafka annotations: strimzi.io/use-connector-resources: "true" # Enable KafkaConnector CRD spec: version: 4.1.1 replicas: 2 # Two workers for fault tolerance bootstrapServers: task-events-kafka-bootstrap:9092 # Build connector image with plugins build: output: type: docker image: localhost:5000/task-connect:latest # Local registry or your registry plugins: - name: file-source artifacts: - type: maven group: org.apache.kafka artifact: connect-file version: 4.1.1 - name: debezium-postgres artifacts: - type: maven group: io.debezium artifact: debezium-connector-postgres version: 2.7.0.Final config: # Worker configuration group.id: task-connect-cluster offset.storage.topic: task-connect-offsets config.storage.topic: task-connect-configs status.storage.topic: task-connect-status # Replication for internal topics offset.storage.replication.factor: 1 config.storage.replication.factor: 1 status.storage.replication.factor: 1 # Converters (how data is serialized) key.converter: org.apache.kafka.connect.storage.StringConverter value.converter: org.apache.kafka.connect.json.JsonConverter value.converter.schemas.enable: false resources: requests: memory: 512Mi cpu: 250m limits: memory: 1Gi cpu: 500m

Key configuration explained:

SettingPurpose
strimzi.io/use-connector-resourcesEnables managing connectors via KafkaConnector CRDs instead of REST API
build.pluginsDownloads connector JARs from Maven and builds them into the Connect image
offset.storage.topicKafka topic storing connector offset positions
config.storage.topicKafka topic storing connector configurations
status.storage.topicKafka topic storing connector status
key.converter / value.converterHow messages are serialized (JSON, Avro, String)

Step 2: Apply and Verify Deployment

bash
# Apply KafkaConnect resource kubectl apply -f kafka-connect.yaml -n kafka # Watch for Connect pods to become ready (takes 2-5 minutes for image build) kubectl get pods -n kafka -l strimzi.io/cluster=task-connect -w

Output:

text
NAME READY STATUS RESTARTS AGE task-connect-connect-0 1/1 Running 0 3m task-connect-connect-1 1/1 Running 0 3m task-connect-build-1-build 0/1 Completed 0 5m

The build pod compiles the connector image, then the Connect workers start.

Step 3: Configure a Source Connector

With Kafka Connect running, deploy a connector using KafkaConnector CRD. This example uses the File Source connector (useful for testing):

yaml
# file-source-connector.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: file-source namespace: kafka labels: strimzi.io/cluster: task-connect # Links to KafkaConnect resource spec: class: org.apache.kafka.connect.file.FileStreamSourceConnector tasksMax: 1 config: file: /opt/kafka/data/input.txt topic: file-events
bash
# Apply the connector kubectl apply -f file-source-connector.yaml -n kafka # Check connector status kubectl get kafkaconnectors -n kafka

Output:

text
NAME CLUSTER CONNECTOR CLASS MAX TASKS READY file-source task-connect org.apache.kafka.connect.file.File Stream Source Connector 1 True

Step 4: Configure a Sink Connector

Sink connectors write Kafka data to external systems. This example writes to a file (for demonstration):

yaml
# file-sink-connector.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: file-sink namespace: kafka labels: strimzi.io/cluster: task-connect spec: class: org.apache.kafka.connect.file.FileStreamSinkConnector tasksMax: 1 config: file: /opt/kafka/data/output.txt topics: file-events

Now data flows: input.txt → file-events topic → output.txt

Practical Example: Database to Elasticsearch Pipeline

A realistic pipeline might sync database changes to Elasticsearch for search. Here's how you'd configure it:

yaml
# postgres-source.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: postgres-tasks-source namespace: kafka labels: strimzi.io/cluster: task-connect spec: class: io.debezium.connector.postgresql.PostgresConnector tasksMax: 1 config: database.hostname: postgres.default.svc.cluster.local database.port: "5432" database.user: ${file:/opt/kafka/external-secrets/postgres:username} database.password: ${file:/opt/kafka/external-secrets/postgres:password} database.dbname: taskdb database.server.name: taskdb table.include.list: public.tasks topic.prefix: cdc plugin.name: pgoutput --- # elasticsearch-sink.yaml apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaConnector metadata: name: elasticsearch-tasks-sink namespace: kafka labels: strimzi.io/cluster: task-connect spec: class: io.confluent.connect.elasticsearch.ElasticsearchSinkConnector tasksMax: 2 config: connection.url: http://elasticsearch.default.svc.cluster.local:9200 topics: cdc.public.tasks type.name: _doc key.ignore: "false" schema.ignore: "true"

Data flow:

  1. User creates task in PostgreSQL
  2. Debezium reads PostgreSQL WAL (transaction log)
  3. Change event published to cdc.public.tasks topic
  4. Elasticsearch sink writes to tasks index
  5. Task searchable in Elasticsearch within seconds

When to Use Kafka Connect vs Custom Code

Not every integration needs Kafka Connect. Here's a decision framework:

FactorUse Kafka ConnectUse Custom Producer/Consumer
Standard systemDatabases, S3, ElasticsearchCustom APIs, proprietary systems
Connector exists200+ pre-built connectorsNo connector available
Configuration-onlyYAML/JSON config sufficientComplex transformation logic
Operations teamPrefers declarative configPrefers code review process
MaintenanceConnector vendor maintainsYou maintain
Performance tuningStandard throughput needsExtreme optimization required
Domain logicSimple field mappingBusiness rules, validation

Choose Kafka Connect when:

  • A connector exists for your source/sink
  • Configuration can express your requirements
  • You want ecosystem support and community updates

Choose custom code when:

  • No connector exists for your system
  • Complex business logic during data movement
  • You need fine-grained control over batching, retries, error handling
  • Performance requirements exceed connector capabilities

Monitoring Connector Status

Check connector and task status:

bash
# Connector-level status kubectl get kafkaconnectors -n kafka # Detailed status including tasks kubectl get kafkaconnector file-source -n kafka -o yaml

Output (status section):

yaml
status: conditions: - lastTransitionTime: "2025-01-15T10:30:00Z" status: "True" type: Ready connectorStatus: connector: state: RUNNING worker_id: task-connect-connect-0.task-connect-connect.kafka.svc:8083 name: file-source tasks: - id: 0 state: RUNNING worker_id: task-connect-connect-0.task-connect-connect.kafka.svc:8083 type: source

Common connector states:

StateMeaningAction
RUNNINGHealthyNone
PAUSEDTemporarily stoppedCheck if intentional
FAILEDTask crashedCheck logs, fix config
UNASSIGNEDNo worker picked up taskCheck worker health

View connector logs:

Specification
kubectl logs task-connect-connect-0 -n kafka | grep -i "file-source"

Reflect on Your Skill

You built a kafka-events skill in Chapter 1. Test and improve it based on what you learned.

Test Your Skill

Specification
Using my kafka-events skill, design a Kafka Connect pipeline that captures PostgreSQL changes and sinks them to Elasticsearch.Does my skill generate Kafka Connect and Kafka Connector CRDs? Does it explain source vs sink connectors?

Identify Gaps

Ask yourself:

  • Did my skill explain Kafka Connect architecture (workers, tasks, connectors)?
  • Did it cover when to use Kafka Connect versus custom producers/consumers?

Improve Your Skill

If you found gaps:

Specification
My kafka-events skill is missing Kafka Connect patterns (source/sink connectors, Kafka Connect/Kafka Connector CRDs).Update it to include when to use pre-built connectors versus writing custom integration code.

Try With AI

Setup: You need to build a data pipeline integrating multiple systems with Kafka.

Prompt 1: Design a pipeline architecture

Specification
I need to build a pipeline that:1. Captures changes from a PostgreSQL "orders" table 2. Enriches data by joining with a "customers" topic 3. Writes to both Elasticsearch (for search) and S3 (for analytics) Help me design this with Kafka Connect. What connectors do I need?What Kafka topics should exist? Where does the enrichment happen?

What you're learning: AI helps you think through pipeline topology. It will likely point out that enrichment (joining streams) happens in Kafka Streams or ksqlDB, not in Connect itself—Connect just moves data, transformation is a separate concern.

Prompt 2: Troubleshoot a connector failure

Specification
My Kafka Connector shows this status:status: connector Status: connector: state: FAILED trace: "org.apache.kafka.connect.errors.Connect Exception: Unable to connect to database: Connection refused" tasks: []What are the possible causes and how do I debug this step by step?Include kubectl commands I should run.

What you're learning: AI walks through systematic debugging—network connectivity, credentials, service discovery in Kubernetes, and how to test database access from within the Connect pod.

Prompt 3: Evaluate for your use case

Specification
I'm building an integration between our Task API and a third-partyCRM system. The CRM has a REST API but no Kafka connector exists.I need to: - Push new tasks to CRM when created - Pull CRM contact updates back to our system Should I write custom producers/consumers or try to make Kafka Connect work?What are the tradeoffs? Be specific about maintenance, complexity, and operations.

What you're learning: AI helps you apply the Connect vs custom code decision framework to your specific constraints, likely recommending custom code for the REST API integration while suggesting Connect patterns you might adopt in your custom implementation.

Safety note: When testing connectors, especially source connectors reading from databases, use read-only credentials or a replica database. A misconfigured connector can create load on production databases.