Capstone: Dapr-Enabled Task API

Module 7 takes the agent you built in Module 6 and turns it into a production cloud service. You'll containerize the stack, orchestrate it on Kubernetes, automate delivery, and operate it with observability, security, and cost controls. The goal: a reliable Digital FTE that runs 24/7 for real users.

Prerequisites: Modules 4-6. You need a working agent service to deploy.

You've learned Dapr's building blocks individually: state management in Chapter 4, service invocation in Chapter 5, pub/sub in Chapter 6, bindings in Chapter 7, jobs in Chapter 8, and secrets in Chapter 9. Each building block solves a specific distributed systems challenge. Now it's time to compose them into a complete application.

This capstone follows the spec-driven development approach. You'll write a specification first, then refactor the Module 6 Task API to use Dapr for all infrastructure abstraction. The goal is practical: eliminate direct Redis, Kafka, and HTTP client code from your application. Your Task API talks to Dapr; Dapr talks to infrastructure.

The result demonstrates Dapr's core value proposition: infrastructure becomes configuration, not code. Need to swap Redis for PostgreSQL? Change a YAML file. Need to add Kafka alongside Redis pub/sub? Deploy another component. Your application code stays the same.
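
To make "configuration, not code" concrete, here is a sketch of what the Redis-to-PostgreSQL swap could look like. This manifest is an illustration, not part of this chapter's build: the connection string is a placeholder, and only the Redis components defined in Step 3 below are actually deployed.

yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore              # same component name, so application code is untouched
  namespace: default
spec:
  type: state.postgresql        # swapped backend: was state.redis
  version: v1
  metadata:
    - name: connectionString
      value: "host=postgres user=tasks password=placeholder dbname=tasks port=5432"  # placeholder values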


Phase 1: Write the Specification

Before touching code, define precisely what you're building. A clear specification enables focused implementation and provides acceptance criteria for validation.

The Task API Dapr Migration Specification

text
# Task API with Dapr Integration

## Intent

Refactor the Module 6 Task API to use Dapr for all infrastructure abstraction. This eliminates direct infrastructure clients (redis-py, confluent-kafka, httpx) from application code, making the Task API portable across cloud providers and infrastructure backends.

**Business Value**: The current Task API has hardcoded connections to Redis, Kafka, and other services. Changing infrastructure requires code changes, testing cycles, and deployment risk. Dapr abstraction enables infrastructure decisions at deployment time, not development time.

## Building Blocks to Use

| Building Block | Replaces | Purpose |
| :--- | :--- | :--- |
| **State Management** | Direct Redis client | Store tasks as key-value pairs |
| **Pub/Sub** | Direct Kafka producer | Publish task.created, task.completed events |
| **Service Invocation** | Direct HTTP client | Call notification-service via Dapr |
| **Secrets** | Environment variables | Retrieve API keys securely |

## Constraints

### Technical Constraints

- **Dapr Version**: 1.14+ (requires Jobs API support)
- **Python SDK**: dapr-client>=1.13.0, dapr-ext-fastapi>=1.13.0
- **State Store**: Redis (component: statestore)
- **Pub/Sub**: Redis (component: pubsub)
- **Platform**: Docker Desktop Kubernetes

### Operational Constraints

- Pod must show 2/2 containers (app + Dapr sidecar)
- No direct infrastructure imports in application code
- All configuration via Dapr components, not environment variables

## Success Criteria

### SC-1: Sidecar Injection Working

- [ ] Deployment includes Dapr annotations
- [ ] kubectl get pods shows 2/2 READY for task-api
- [ ] Dapr sidecar logs show "dapr initialized"

### SC-2: State Management via Dapr

- [ ] Create task -> state saved to statestore component
- [ ] Get task -> state retrieved from statestore component
- [ ] Update task -> state updated with ETag concurrency
- [ ] Delete task -> state removed from statestore

### SC-3: Events Published via Dapr

- [ ] Create task -> task.created event published to pubsub
- [ ] Complete task -> task.completed event published to pubsub
- [ ] Events received by subscription handler

### SC-4: Service Invocation via Dapr

- [ ] Task completion -> notification-service called via Dapr invoke
- [ ] Service discovery automatic (no hardcoded URLs)

### SC-5: No Direct Infrastructure Clients

- [ ] No redis-py imports in application code
- [ ] No confluent-kafka imports in application code
- [ ] No httpx/requests for service-to-service calls

## Non-Goals (What We're NOT Building)

- Dapr Actors for task state (saved for later chapters)
- Dapr Workflows for multi-step operations (saved for later chapters)
- Schema Registry integration (JSON sufficient for demonstration)
- Multi-cluster deployment (single Kubernetes cluster)
- Production observability (tracing, metrics beyond basic logging)

## Architecture

┌─────────────────────────────────────────────────────────────────┐
│                    Docker Desktop Kubernetes                    │
│                                                                 │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │               task-api Pod (2/2 containers)               │  │
│  │  ┌───────────────────────┐    ┌────────────────────────┐  │  │
│  │  │  FastAPI Application  │    │  Dapr Sidecar (daprd)  │  │  │
│  │  │  - POST /tasks        │    │  - State API    :3500  │  │  │
│  │  │  - GET /tasks/{id}    │───▶│  - Pub/Sub API  :3500  │  │  │
│  │  │  - PUT /tasks/{id}    │    │  - Invoke API   :3500  │  │  │
│  │  │  - DELETE /tasks/{id} │    │  - Secrets API  :3500  │  │  │
│  │  └───────────────────────┘    └────────────────────────┘  │  │
│  └───────────────────────────────────────────────────────────┘  │
│                              │                                  │
│              ┌───────────────┴───────────────┐                  │
│              ▼                               ▼                  │
│  ┌─────────────────────────┐    ┌─────────────────────────┐     │
│  │  Redis (statestore)     │    │  Redis (pubsub)         │     │
│  │  component: statestore  │    │  component: pubsub      │     │
│  └─────────────────────────┘    └─────────────────────────┘     │
│                                                                 │
│  ┌─────────────────────────┐    ┌─────────────────────────┐     │
│  │  notification-service   │    │  Kubernetes Secrets     │     │
│  │  app-id: notification   │    │  component: k8s-secrets │     │
│  └─────────────────────────┘    └─────────────────────────┘     │
└─────────────────────────────────────────────────────────────────┘

## Task Model

```python
from pydantic import BaseModel
from datetime import datetime
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"


class Task(BaseModel):
    id: str
    title: str
    description: str | None = None
    status: TaskStatus = TaskStatus.PENDING
    priority: int = 1
    created_at: datetime
    updated_at: datetime
```

Why This Specification Matters

The specification maps Module 6's Task API functionality to Dapr building blocks:

| Module 6 Pattern | Dapr Building Block |
| :--- | :--- |
| `redis.set(key, value)` | `client.save_state(store, key, value)` |
| `producer.send(topic, event)` | `client.publish_event(pubsub, topic, data)` |
| `httpx.post(url, data)` | `client.invoke_method(app_id, method, data)` |
| `os.getenv("API_KEY")` | `client.get_secret(store, key)` |
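
To see one row of this mapping in code, here is a minimal before/after sketch. The `redis_client` call stands in for Module 6's code (not shown in this chapter), and the task values are illustrative:

python
from dapr.clients import DaprClient

task_id = "task-123"
task_json = '{"id": "task-123", "title": "Learn Dapr", "status": "pending"}'

# Before (Module 6): application code owned the Redis connection
# redis_client.set(f"task:{task_id}", task_json)

# After: the application only talks to the Dapr sidecar
with DaprClient() as client:
    client.save_state(
        store_name="statestore",  # a component name, not a host address
        key=f"task-{task_id}",
        value=task_json
    )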

When you work with AI to implement this specification, you both understand what "done" looks like.


Phase 2: Implement the Specification

Now refactor the Task API by composing patterns from earlier lessons. This phase demonstrates the core skill of spec-driven development: translating clear requirements into working code.

Step 1: Define the Task Model

First, create the Pydantic models for tasks and events.

Create models.py:

python
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum


class TaskStatus(str, Enum):
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"


class TaskCreate(BaseModel):
    title: str = Field(..., min_length=1, max_length=200)
    description: str | None = None
    priority: int = Field(default=1, ge=1, le=5)


class Task(BaseModel):
    id: str
    title: str
    description: str | None = None
    status: TaskStatus = TaskStatus.PENDING
    priority: int = 1
    created_at: datetime
    updated_at: datetime


class TaskEvent(BaseModel):
    event_type: str
    task_id: str
    title: str
    status: str
    timestamp: datetime

Output:

text
>>> from models import Task, TaskStatus
>>> from datetime import datetime
>>> task = Task(
...     id="task-123",
...     title="Learn Dapr",
...     status=TaskStatus.PENDING,
...     priority=2,
...     created_at=datetime.utcnow(),
...     updated_at=datetime.utcnow()
... )
>>> task.model_dump_json()
'{"id":"task-123","title":"Learn Dapr","description":null,"status":"pending","priority":2,"created_at":"2025-01-15T10:30:00","updated_at":"2025-01-15T10:30:00"}'

Step 2: Implement the Dapr-Enabled Task API

Create the FastAPI application that uses Dapr for all infrastructure.

Create main.py:

python
from contextlib import asynccontextmanager
from datetime import datetime
import json
import uuid

from fastapi import FastAPI, HTTPException
from dapr.clients import DaprClient
from dapr.ext.fastapi import DaprApp

from models import Task, TaskCreate, TaskStatus, TaskEvent

# Configuration - names match component metadata.name
STORE_NAME = "statestore"
PUBSUB_NAME = "pubsub"
TOPIC_NAME = "task-events"
SECRETS_STORE = "kubernetes-secrets"


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Wait for Dapr sidecar readiness."""
    with DaprClient() as client:
        client.wait(timeout_s=30)
    yield


app = FastAPI(
    title="Task API with Dapr",
    description="Distributed Task Management using Dapr building blocks",
    lifespan=lifespan
)
dapr_app = DaprApp(app)

# =============================================================================
# API Endpoints - All infrastructure via Dapr
# =============================================================================

@app.post("/tasks", response_model=Task, status_code=201)
async def create_task(task_create: TaskCreate):
    """Create a new task using Dapr state and pub/sub."""
    now = datetime.utcnow()
    task = Task(
        id=str(uuid.uuid4()),
        title=task_create.title,
        description=task_create.description,
        priority=task_create.priority,
        created_at=now,
        updated_at=now
    )

    with DaprClient() as client:
        # Save state via Dapr (replaces direct Redis)
        client.save_state(
            store_name=STORE_NAME,
            key=f"task-{task.id}",
            value=task.model_dump_json()
        )

        # Publish event via Dapr (replaces direct Kafka)
        event = TaskEvent(
            event_type="task.created",
            task_id=task.id,
            title=task.title,
            status=task.status.value,
            timestamp=now
        )
        client.publish_event(
            pubsub_name=PUBSUB_NAME,
            topic_name=TOPIC_NAME,
            data=event.model_dump_json(),
            data_content_type="application/json"
        )

    return task


@app.get("/tasks/{task_id}", response_model=Task)
async def get_task(task_id: str):
    """Retrieve a task from Dapr state store."""
    with DaprClient() as client:
        state = client.get_state(
            store_name=STORE_NAME,
            key=f"task-{task_id}"
        )
        if not state.data:
            raise HTTPException(status_code=404, detail="Task not found")
        return Task.model_validate_json(state.data)


@app.put("/tasks/{task_id}/status", response_model=Task)
async def update_task_status(task_id: str, status: TaskStatus):
    """Update task status with optimistic concurrency via ETag."""
    with DaprClient() as client:
        # Get current state with ETag
        state = client.get_state(
            store_name=STORE_NAME,
            key=f"task-{task_id}"
        )
        if not state.data:
            raise HTTPException(status_code=404, detail="Task not found")

        task = Task.model_validate_json(state.data)
        task.status = status
        task.updated_at = datetime.utcnow()

        # Save with ETag for concurrency control
        client.save_state(
            store_name=STORE_NAME,
            key=f"task-{task_id}",
            value=task.model_dump_json(),
            etag=state.etag
        )

        # Publish status change event
        event = TaskEvent(
            event_type=f"task.{status.value}",
            task_id=task.id,
            title=task.title,
            status=status.value,
            timestamp=task.updated_at
        )
        client.publish_event(
            pubsub_name=PUBSUB_NAME,
            topic_name=TOPIC_NAME,
            data=event.model_dump_json(),
            data_content_type="application/json"
        )

    return task


@app.delete("/tasks/{task_id}", status_code=204)
async def delete_task(task_id: str):
    """Delete a task from Dapr state store."""
    with DaprClient() as client:
        # Verify exists
        state = client.get_state(
            store_name=STORE_NAME,
            key=f"task-{task_id}"
        )
        if not state.data:
            raise HTTPException(status_code=404, detail="Task not found")

        # Delete state
        client.delete_state(
            store_name=STORE_NAME,
            key=f"task-{task_id}"
        )

# =============================================================================
# Event Subscription - Triggers notification service
# =============================================================================

@dapr_app.subscribe(pubsub=PUBSUB_NAME, topic=TOPIC_NAME)
async def handle_task_event(event_data: dict):
    """Handle task events and invoke notification service via Dapr."""
    print(f"Received event: {event_data}")

    # Dapr delivers pub/sub messages as CloudEvents; the payload sits under "data"
    payload = event_data.get("data", event_data)
    if isinstance(payload, str):
        payload = json.loads(payload)
    event_type = payload.get("event_type", "unknown")
    task_id = payload.get("task_id", "unknown")

    # Call notification service via Dapr (replaces direct HTTP)
    if event_type == "task.completed":
        with DaprClient() as client:
            try:
                client.invoke_method(
                    app_id="notification-service",
                    method_name="notify",
                    data=json.dumps({
                        "type": "task_completed",
                        "task_id": task_id,
                        "message": f"Task {task_id} has been completed"
                    }),
                    http_verb="POST"
                )
                print(f"Notification sent for task {task_id}")
            except Exception as e:
                print(f"Failed to notify: {e}")

    return {"status": "SUCCESS"}

# =============================================================================
# Health Check
# =============================================================================

@app.get("/health")
async def health():
    """Health check endpoint."""
    return {"status": "healthy"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Output:

bash
$ curl -X POST http://localhost:8000/tasks \
    -H "Content-Type: application/json" \
    -d '{"title": "Learn Dapr building blocks", "priority": 2}'
{"id":"abc-123-def","title":"Learn Dapr building blocks","description":null,"status":"pending","priority":2,"created_at":"2025-01-15T10:30:00","updated_at":"2025-01-15T10:30:00"}
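
The remaining endpoints can be exercised the same way. A hedged sketch of follow-up calls, reusing the id returned above (note that status travels as a query parameter because the endpoint declares it as a plain enum argument):

bash
# Assumed follow-up calls; substitute the id returned by your create call
curl http://localhost:8000/tasks/abc-123-def
curl -X PUT "http://localhost:8000/tasks/abc-123-def/status?status=completed"
curl -X DELETE http://localhost:8000/tasks/abc-123-def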

Step 3: Create Dapr Components

Configure the Dapr components that your application uses.

Create components/statestore.yaml:

yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
  namespace: default
spec:
  type: state.redis
  version: v1
  metadata:
    - name: redisHost
      value: redis-master.default.svc.cluster.local:6379
    - name: redisPassword
      value: ""

Create components/pubsub.yaml:

yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.redis
  version: v1
  metadata:
    - name: redisHost
      value: redis-master.default.svc.cluster.local:6379

Create components/secrets.yaml:

yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: kubernetes-secrets
  namespace: default
spec:
  type: secretstores.kubernetes
  version: v1
  metadata: []
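
Nothing in main.py calls this secrets component yet, although SECRETS_STORE names it. A minimal usage sketch, assuming a Kubernetes Secret named task-api-secrets with an api-key entry exists in the namespace:

python
from dapr.clients import DaprClient

# Assumes: kubectl create secret generic task-api-secrets --from-literal=api-key=demo
with DaprClient() as client:
    response = client.get_secret(
        store_name="kubernetes-secrets",  # matches metadata.name in secrets.yaml
        key="task-api-secrets"            # the Kubernetes Secret to read
    )
    api_key = response.secret["api-key"]  # response.secret is the Secret's key/value map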

Apply the components:

bash
kubectl apply -f components/

Output:

text
component.dapr.io/statestore created
component.dapr.io/pubsub created
component.dapr.io/kubernetes-secrets created
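
Phase 3 validates that the Deployment carries Dapr annotations, but the manifest itself never appears in this chapter. A minimal sketch, with the image name and replica count as assumptions:

yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-api
spec:
  replicas: 2
  selector:
    matchLabels:
      app: task-api
  template:
    metadata:
      labels:
        app: task-api
      annotations:
        dapr.io/enabled: "true"    # tells the Dapr injector to add the sidecar
        dapr.io/app-id: "task-api" # identity used for service invocation
        dapr.io/app-port: "8000"   # port the sidecar forwards requests to
    spec:
      containers:
        - name: task-api
          image: task-api:latest   # assumed image name
          ports:
            - containerPort: 8000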

Phase 3: Validate Against Specification

The implementation is complete. Now verify each success criterion from the specification.

SC-1: Sidecar Injection Working

bash
# Check pod status - should show 2/2 READY
kubectl get pods -l app=task-api

Output:

text
NAME                       READY   STATUS    RESTARTS   AGE
task-api-7d4b5c6f8-abc12   2/2     Running   0          2m
task-api-7d4b5c6f8-def34   2/2     Running   0          2m

| Criterion | Status |
| :--- | :--- |
| Deployment includes Dapr annotations | PASS |
| kubectl get pods shows 2/2 READY | PASS |
| Dapr sidecar logs show initialized | PASS |

SC-2: State Management via Dapr

| Criterion | Status |
| :--- | :--- |
| Create task saves state | PASS |
| Get task retrieves state | PASS |
| Update task with ETag | PASS |
| Delete task removes state | PASS |

SC-3: Events Published via Dapr

| Criterion | Status |
| :--- | :--- |
| task.created event published | PASS |
| task.completed event published | PASS |
| Subscription handler received events | PASS |

SC-4: Service Invocation via Dapr

| Criterion | Status |
| :--- | :--- |
| notification-service called via Dapr | PASS |
| Service discovery automatic | PASS |

SC-5: No Direct Infrastructure Clients

| Criterion | Status |
| :--- | :--- |
| No redis-py imports | PASS |
| No confluent-kafka imports | PASS |
| No httpx for service calls | PASS |

Validation Summary

| Success Criteria | Result |
| :--- | :--- |
| SC-1: Sidecar Injection | 3/3 PASS |
| SC-2: State Management | 4/4 PASS |
| SC-3: Events Published | 3/3 PASS |
| SC-4: Service Invocation | 2/2 PASS |
| SC-5: No Direct Clients | 3/3 PASS |

The implementation matches the specification.


What You Built

This capstone demonstrated the spec-driven development pattern for Dapr integration:

| Phase | Activity | Outcome |
| :--- | :--- | :--- |
| Specification | Define building blocks, constraints, success criteria | Clear requirements document |
| Implementation | Compose patterns from earlier lessons | Working Dapr-enabled API |
| Validation | Verify each success criterion | Evidence of correctness |

The system you built includes:

  • State management via Dapr state API (replaces Redis client)
  • Pub/sub messaging via Dapr publish/subscribe (replaces Kafka producer)
  • Service invocation via Dapr invoke (replaces HTTP client)
  • Kubernetes deployment with sidecar injection annotations
  • Zero infrastructure imports in application code

The key transformation: your application code no longer knows about Redis, Kafka, or HTTP clients. It only knows about Dapr APIs. Infrastructure decisions happen in YAML configuration, not Python code.


Reflect on Your Skill

You built a dapr-deployment skill in earlier lessons. This capstone is the ultimate test.

Test Your Skill

text
Using my dapr-deployment skill, refactor my existing FastAPI service to use Dapr:

1. Replace redis-py state management with Dapr state API
2. Replace Kafka producer with Dapr pub/sub
3. Replace httpx service calls with Dapr service invocation
4. Create Kubernetes deployment with Dapr annotations

Does my skill produce a complete, working Dapr-enabled service?

Identify Gaps

Ask yourself:

  • Did my skill show the full integration pattern (state + pub/sub + invoke together)?
  • Did it include the Kubernetes deployment with correct annotations?
  • Did it demonstrate event subscription with @dapr_app.subscribe?
  • Did it show the sidecar verification process (2/2 containers)?

Improve Your Skill

If you found gaps:

text
My dapr-deployment skill is missing the complete integration pattern. Update it to include:

- Full FastAPI + Dapr integration example (all building blocks)
- Kubernetes deployment YAML with Dapr annotations
- Component configuration for state, pub/sub, secrets
- Verification commands to confirm sidecar injection

Try With AI

Prompt 1: Migrate State Management

text
I have a FastAPI Task API that uses direct Redis with redis-py. I want to migrate to Dapr state management.

Current code uses:
- redis.set(f"task:{task_id}", task_json)
- redis.get(f"task:{task_id}")
- redis.delete(f"task:{task_id}")

Show me:
1. The DaprClient equivalents for each operation
2. How to handle optimistic concurrency with ETags
3. The statestore component YAML for Redis backend
4. How to verify state is being stored (debug commands)

What you're learning: The state migration pattern. You're seeing how to replace direct Redis calls with Dapr's state API while gaining features like automatic concurrency control. The abstraction prepares you for swapping backends without code changes.
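
For the debug-commands item, here is one hedged way to inspect what Dapr actually wrote, assuming Redis runs in-cluster under a deployment named redis-master:

bash
# Dapr's Redis state store prefixes keys with the app-id: <app-id>||<key>
kubectl exec -it deploy/redis-master -- redis-cli KEYS 'task-api||*'
# Values are stored as a Redis hash with "data" and "version" fields
kubectl exec -it deploy/redis-master -- redis-cli HGETALL 'task-api||task-abc-123-def'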


Prompt 2: Add Pub/Sub Integration

text
My Task API saves tasks but doesn't publish events. I want to add Dapr pub/sub for task.created and task.completed events.

Show me:
1. The publish_event call to add after save_state
2. A subscription handler using dapr-ext-fastapi
3. How to invoke another service when processing events
4. The Redis pub/sub component YAML

What you're learning: Event-driven patterns with Dapr. You're composing pub/sub with state management, seeing how events flow between services through Dapr's abstraction layer rather than direct broker connections.


Prompt 3: Deploy with Sidecar Verification

text
I have a working Dapr-enabled Task API locally. Now I need to deploy it to Kubernetes with the Dapr sidecar.

Show me:
1. The complete Deployment YAML with all Dapr annotations
2. Commands to verify 2/2 containers are running
3. How to check Dapr sidecar logs for "initialized"
4. How to test the deployed service with port-forward

What you're learning: Production deployment patterns. The sidecar injection via annotations is the key insight - your application doesn't install Dapr, Kubernetes injects it. Verification confirms the distributed system is functioning correctly.
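
A hedged sketch of those verification steps, assuming the pod labels and app-id from this chapter:

bash
# 1. Confirm 2/2 containers (app + injected sidecar)
kubectl get pods -l app=task-api

# 2. Check the sidecar's logs for successful initialization
kubectl logs -l app=task-api -c daprd --tail=100 | grep -i "dapr initialized"

# 3. Port-forward and hit the service
kubectl port-forward deployment/task-api 8000:8000 &
curl http://localhost:8000/health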

Safety note: When migrating production services to Dapr, run both implementations in parallel during transition. Direct clients and Dapr can coexist temporarily. Never migrate all services simultaneously - one service at a time reduces blast radius.