USMAN’S INSIGHTS
AI ARCHITECT

© 2026 Muhammad Usman Akbar. All rights reserved.

Capstone: Stateful Task Agent with Workflows

You've traveled through 18 lessons of actors and workflows. You understand turn-based concurrency, durable execution, reminders that survive restarts, and saga patterns that compensate on failure. Now it's time to combine everything into a complete system.

This capstone isn't just an exercise. It's a Digital FTE blueprint—the kind of stateful agent system you could package and sell to clients who need intelligent task management with conversation capabilities.

By the end of this lesson, you'll have a working system where:

  • TaskActor manages individual task state with deadline reminders
  • ChatActor maintains conversation context per user
  • TaskProcessingWorkflow orchestrates task lifecycle with saga compensation
  • All components deploy to Docker Desktop Kubernetes with Dapr sidecars

This is the culmination of Module 7. Everything you've learned converges here.

System Architecture

Before writing code, let's understand what we're building:

Specification
┌────────────────────────────────────────────────────────────────────────┐
│                       Stateful Task Agent System                       │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│   ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐    │
│   │    TaskActor    │    │    ChatActor    │    │    Workflow     │    │
│   │   (per task)    │    │   (per user)    │    │     Runtime     │    │
│   ├─────────────────┤    ├─────────────────┤    ├─────────────────┤    │
│   │ - status        │    │ - history[]     │    │ validate_task   │    │
│   │ - assignee      │    │ - context       │    │ assign_task     │    │
│   │ - deadline      │    │ - preferences   │    │ notify_assignee │    │
│   │ - reminder      │    │                 │    │ (saga compen.)  │    │
│   └────────┬────────┘    └────────┬────────┘    └────────┬────────┘    │
│            │                      │                      │             │
│            └──────────────────────┼──────────────────────┘             │
│                                   │                                    │
│          ┌────────────────────────┴────────────────────────┐           │
│          │              Dapr Building Blocks               │           │
│          ├─────────────────────────────────────────────────┤           │
│          │  State Store (Redis)  │  Pub/Sub  │  Placement  │           │
│          └─────────────────────────────────────────────────┘           │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Component Responsibilities

| Component | Responsibility | Building Block |
| --- | --- | --- |
| TaskActor | Stateful task entity with deadline reminders | Actors + Reminders |
| ChatActor | Conversation state per user with event publishing | Actors + Pub/Sub |
| TaskProcessingWorkflow | Orchestrate task lifecycle with rollback | Workflows + Saga |
| Redis | Persist actor state and workflow history | State Store |
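The responsibilities above imply a small status state machine for a task. As a rough illustration — the transition map below is my own sketch, not something Dapr enforces — the lifecycle can be written down like this:

```python
# Illustrative sketch of the task status lifecycle implied by the table.
# Status names mirror the ones TaskActor uses later in this lesson; the
# transition map itself is an assumption for illustration only.
VALID_TRANSITIONS = {
    "uninitialized": {"pending"},        # create_task
    "pending": {"assigned"},             # assign_task
    "assigned": {"overdue", "pending"},  # reminder fires / saga rollback
}


def can_transition(current: str, target: str) -> bool:
    """Return True if a task may move from `current` to `target`."""
    return target in VALID_TRANSITIONS.get(current, set())
```

Making the legal transitions explicit is useful later when the saga compensation resets an assigned task back to "pending".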

The Specification (Spec-First)

Before implementation, write the specification. This is Layer 4—spec-driven development:

markdown
# TaskAgent System Specification

## Intent
Build a stateful task management system that combines:
- Per-task entity state (TaskActor) with deadline reminders
- Per-user conversation context (ChatActor) for AI agent integration
- Durable task processing workflow with saga compensation

## Success Criteria
1. TaskActor persists state across deactivation/reactivation cycles
2. Deadline reminder fires and updates task status to "overdue"
3. ChatActor maintains conversation history per user
4. TaskProcessingWorkflow survives pod restart and resumes correctly
5. Saga compensation rolls back assignment on notification failure
6. System deploys to Docker Desktop Kubernetes with Dapr sidecars

## Non-Goals
- Production-grade security (covered in Module 7 security lessons)
- Multi-tenant isolation (extension exercise)
- Cross-app workflows (extension exercise)

## Architecture Constraints
- Use Redis for state store (actorStateStore: "true")
- Workflow activities call actors through ActorProxy
- Single namespace (default) for capstone simplicity

TaskActor Implementation

The TaskActor manages individual task state with deadline reminders.

Create task_actor_service/actors/task_actor.py:

python
from datetime import datetime, timedelta

from dapr.actor import Actor, ActorInterface, actormethod
from dapr.actor.runtime.context import ActorRuntimeContext


class TaskActorInterface(ActorInterface):
    @actormethod(name="GetTask")
    async def get_task(self) -> dict | None: ...

    @actormethod(name="CreateTask")
    async def create_task(self, task_data: dict) -> dict: ...

    @actormethod(name="UpdateStatus")
    async def update_status(self, status: str) -> dict: ...

    @actormethod(name="AssignTask")
    async def assign_task(self, assignee: str) -> dict: ...

    @actormethod(name="SetDeadlineReminder")
    async def set_deadline_reminder(self, deadline_seconds: int) -> None: ...


class TaskActor(Actor, TaskActorInterface):
    """Stateful task entity with deadline reminder support."""

    def __init__(self, ctx: ActorRuntimeContext, actor_id: str):
        super().__init__(ctx, actor_id)
        self._state_key = f"task-{actor_id}"

    async def _on_activate(self) -> None:
        """Called when actor is activated."""
        print(f"TaskActor {self.id.id} activated")
        found, _ = await self._state_manager.try_get_state(self._state_key)
        if not found:
            # Initialize empty state on first activation
            await self._state_manager.set_state(self._state_key, {
                "task_id": self.id.id,
                "status": "uninitialized",
                "created_at": None,
                "updated_at": None,
            })

    async def _get_current_status(self) -> str:
        """Get current status for comparison."""
        found, state = await self._state_manager.try_get_state(self._state_key)
        return state.get("status", "unknown") if found else "not_found"

    async def get_task(self) -> dict | None:
        """Get current task state."""
        found, state = await self._state_manager.try_get_state(self._state_key)
        if not found or state.get("status") == "uninitialized":
            return None
        return state

    async def create_task(self, task_data: dict) -> dict:
        """Initialize task with provided data."""
        now = datetime.utcnow().isoformat()
        state = {
            "task_id": self.id.id,
            "title": task_data.get("title", "Untitled"),
            "description": task_data.get("description", ""),
            "status": "pending",
            "assignee": None,
            "deadline": task_data.get("deadline"),
            "created_at": now,
            "updated_at": now,
        }
        await self._state_manager.set_state(self._state_key, state)
        print(f"TaskActor {self.id.id}: Created task '{state['title']}'")
        return state

    async def update_status(self, status: str) -> dict:
        """Update task status with turn-based concurrency guarantee."""
        state = await self._state_manager.get_state(self._state_key)
        state["status"] = status
        state["updated_at"] = datetime.utcnow().isoformat()
        await self._state_manager.set_state(self._state_key, state)
        print(f"TaskActor {self.id.id}: Status updated to '{status}'")
        return state

    async def assign_task(self, assignee: str) -> dict:
        """Assign task to user."""
        state = await self._state_manager.get_state(self._state_key)
        state["assignee"] = assignee
        state["status"] = "assigned"
        state["updated_at"] = datetime.utcnow().isoformat()
        await self._state_manager.set_state(self._state_key, state)
        print(f"TaskActor {self.id.id}: Assigned to '{assignee}'")
        return state

    async def set_deadline_reminder(self, deadline_seconds: int) -> None:
        """Register deadline reminder that survives restarts."""
        await self.register_reminder(
            reminder_name="deadline_reminder",
            state=b'{"action": "mark_overdue"}',
            due_time=timedelta(seconds=deadline_seconds),
            period=timedelta(seconds=0),  # One-time reminder
        )
        print(f"TaskActor {self.id.id}: Deadline reminder set for {deadline_seconds}s")

    async def receive_reminder(
        self, name: str, state: bytes, due_time: timedelta, period: timedelta
    ) -> None:
        """Callback when reminder fires - marks task overdue."""
        if name == "deadline_reminder":
            print(f"TaskActor {self.id.id}: Deadline reminder fired!")
            await self.update_status("overdue")

Output:

bash
TaskActor task-001 activated
TaskActor task-001: Created task 'Review PR #42'
TaskActor task-001: Assigned to 'alice@example.com'
TaskActor task-001: Deadline reminder set for 30s
# ... 30 seconds later ...
TaskActor task-001: Deadline reminder fired!
TaskActor task-001: Status updated to 'overdue'
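The timing arithmetic behind the reminder is simple but worth pinning down: registering a reminder `deadline_seconds` in the future is equivalent to firing when the remaining time reaches zero. A standalone sketch of that arithmetic (illustration only — `seconds_until_deadline` is my own helper, not part of TaskActor, which relies on Dapr reminders rather than polling):

```python
from datetime import datetime, timezone


def seconds_until_deadline(deadline_iso: str, now: datetime) -> float:
    """Seconds remaining before an ISO-8601 deadline.

    Negative means the deadline has passed and the task would be
    marked "overdue" by the reminder callback.
    """
    deadline = datetime.fromisoformat(deadline_iso)
    return (deadline - now).total_seconds()
```

For example, with `now` at 12:00:00 UTC, a deadline of 12:00:30 UTC yields 30 seconds — matching the `deadline_seconds=30` reminder in the transcript above.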

ChatActor Implementation

The ChatActor maintains conversation history per user.

Create task_actor_service/actors/chat_actor.py:

python
import json
from datetime import datetime

from dapr.actor import Actor, ActorInterface, actormethod
from dapr.actor.runtime.context import ActorRuntimeContext
from dapr.clients import DaprClient


class ChatActorInterface(ActorInterface):
    @actormethod(name="ProcessMessage")
    async def process_message(self, message: dict) -> dict: ...

    @actormethod(name="GetHistory")
    async def get_history(self) -> list[dict]: ...

    @actormethod(name="ClearHistory")
    async def clear_history(self) -> None: ...


class ChatActor(Actor, ChatActorInterface):
    """Per-user conversation context for AI agent integration."""

    MAX_HISTORY = 10  # Keep last 10 exchanges

    def __init__(self, ctx: ActorRuntimeContext, actor_id: str):
        super().__init__(ctx, actor_id)
        self._history_key = f"chat-history-{actor_id}"

    async def _on_activate(self) -> None:
        """Initialize empty history on first activation."""
        print(f"ChatActor {self.id.id} activated")
        found, _ = await self._state_manager.try_get_state(self._history_key)
        if not found:
            await self._state_manager.set_state(self._history_key, [])

    async def process_message(self, message: dict) -> dict:
        """Process user message and generate response."""
        history = await self._state_manager.get_state(self._history_key)

        # Add user message to history
        user_entry = {
            "role": "user",
            "content": message.get("content", ""),
            "timestamp": datetime.utcnow().isoformat(),
        }
        history.append(user_entry)

        # Generate response (in production, call AI service)
        response_content = self._generate_response(message.get("content", ""), history)
        assistant_entry = {
            "role": "assistant",
            "content": response_content,
            "timestamp": datetime.utcnow().isoformat(),
        }
        history.append(assistant_entry)

        # Trim to max history
        if len(history) > self.MAX_HISTORY * 2:
            history = history[-(self.MAX_HISTORY * 2):]

        await self._state_manager.set_state(self._history_key, history)

        # Publish conversation event
        await self._publish_conversation_event(user_entry, assistant_entry)

        print(f"ChatActor {self.id.id}: Processed message, history size: {len(history)}")
        return assistant_entry

    async def get_history(self) -> list[dict]:
        """Return conversation history."""
        return await self._state_manager.get_state(self._history_key)

    async def clear_history(self) -> None:
        """Clear conversation history."""
        await self._state_manager.set_state(self._history_key, [])
        print(f"ChatActor {self.id.id}: History cleared")

    def _generate_response(self, content: str, history: list) -> str:
        """Generate response based on message and context."""
        if "task" in content.lower():
            return f"I can help with tasks. You have {len(history)//2} messages in our conversation."
        elif "help" in content.lower():
            return "I'm your task management assistant. Ask me about creating, assigning, or tracking tasks."
        else:
            return f"Understood. I'm tracking our conversation (message #{len(history)//2 + 1})."

    async def _publish_conversation_event(self, user_msg: dict, assistant_msg: dict) -> None:
        """Publish conversation event for downstream processing."""
        with DaprClient() as client:
            client.publish_event(
                pubsub_name="pubsub",
                topic_name="conversation-events",
                data=json.dumps({
                    "event_type": "conversation.updated",
                    "user_id": self.id.id,
                    "user_message": user_msg,
                    "assistant_message": assistant_msg,
                    "timestamp": datetime.utcnow().isoformat(),
                }),
                data_content_type="application/json",
            )

Output:

bash
ChatActor user-alice activated
ChatActor user-alice: Processed message, history size: 2
ChatActor user-alice: Processed message, history size: 4
Event published: {"event_type": "conversation.updated", "user_id": "user-alice", ...}
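The history cap deserves a closer look, because it is what keeps per-user actor state bounded. Extracted into a standalone function (my own restatement of the trimming rule inside `process_message`, for illustration):

```python
def trim_history(history: list[dict], max_exchanges: int = 10) -> list[dict]:
    """Keep only the most recent user/assistant exchanges.

    Each exchange is two entries (user + assistant), so the retained
    window is max_exchanges * 2 entries, taken from the end of the list.
    """
    window = max_exchanges * 2
    if len(history) > window:
        return history[-window:]
    return history
```

With `MAX_HISTORY = 10`, a 30-entry history is trimmed to the last 20 entries; shorter histories pass through unchanged.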

TaskProcessingWorkflow with Saga

The workflow orchestrates task processing with compensation.

Create task_workflow_service/workflows/task_workflow.py:

python
from dataclasses import dataclass
from datetime import timedelta

import dapr.ext.workflow as wf

from task_workflow_service.activities.task_activities import (
    validate_task_activity,
    create_task_activity,
    assign_task_activity,
    set_reminder_activity,
    notify_assignee_activity,
    unassign_task_activity,
)


@dataclass
class TaskProcessingInput:
    task_id: str
    title: str
    description: str
    assignee: str
    deadline_seconds: int = 3600  # Default 1 hour


@dataclass
class TaskProcessingResult:
    task_id: str
    status: str
    message: str


def task_processing_workflow(
    ctx: wf.DaprWorkflowContext, input_data: TaskProcessingInput
) -> TaskProcessingResult:
    """
    Orchestrate task lifecycle with saga compensation.

    Steps:
    1. Validate task data
    2. Create task in TaskActor
    3. Assign task to user
    4. Set deadline reminder
    5. Notify assignee

    On failure at step 5: Rollback assignment (compensation)
    """
    compensations = []

    try:
        # Step 1: Validate task data
        validation = yield ctx.call_activity(
            validate_task_activity,
            input={
                "task_id": input_data.task_id,
                "title": input_data.title,
                "description": input_data.description,
            },
        )
        if not validation["is_valid"]:
            return TaskProcessingResult(
                task_id=input_data.task_id,
                status="rejected",
                message=f"Validation failed: {validation['issues']}",
            )

        # Step 2: Create task in TaskActor
        create_result = yield ctx.call_activity(
            create_task_activity,
            input={
                "task_id": input_data.task_id,
                "title": input_data.title,
                "description": input_data.description,
                "deadline_seconds": input_data.deadline_seconds,
            },
        )

        # Step 3: Assign task to user
        assign_result = yield ctx.call_activity(
            assign_task_activity,
            input={
                "task_id": input_data.task_id,
                "assignee": input_data.assignee,
            },
        )
        # Register compensation: unassign if a later step fails
        compensations.append((unassign_task_activity, {"task_id": input_data.task_id}))

        # Step 4: Set deadline reminder
        yield ctx.call_activity(
            set_reminder_activity,
            input={
                "task_id": input_data.task_id,
                "deadline_seconds": input_data.deadline_seconds,
            },
        )

        # Step 5: Notify assignee (might fail - triggers saga compensation)
        yield ctx.call_activity(
            notify_assignee_activity,
            input={
                "task_id": input_data.task_id,
                "assignee": input_data.assignee,
                "title": input_data.title,
            },
            retry_policy=wf.RetryPolicy(
                max_number_of_attempts=3,
                first_retry_interval=timedelta(seconds=1),
                backoff_coefficient=2.0,
            ),
        )

        return TaskProcessingResult(
            task_id=input_data.task_id,
            status="completed",
            message=f"Task assigned to {input_data.assignee} with deadline reminder",
        )

    except Exception as e:
        # Saga compensation: rollback in reverse order
        for comp_activity, comp_data in reversed(compensations):
            try:
                yield ctx.call_activity(comp_activity, input=comp_data)
            except Exception as comp_error:
                print(f"Compensation failed: {comp_error}")
        return TaskProcessingResult(
            task_id=input_data.task_id,
            status="failed",
            message=f"Processing failed, compensation applied: {str(e)}",
        )
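It helps to see what an exponential-backoff retry policy like the one on step 5 actually implies for wait times. A small sketch of the delay schedule (my own illustration of the policy's shape, not Dapr's internal scheduler):

```python
from datetime import timedelta


def retry_delays(
    max_attempts: int, first_interval: timedelta, backoff_coefficient: float
) -> list[timedelta]:
    """Delays between successive attempts under exponential backoff.

    The first retry waits `first_interval`; each later retry multiplies
    the previous wait by `backoff_coefficient`. With max_attempts total
    attempts there are max_attempts - 1 delays.
    """
    return [first_interval * (backoff_coefficient ** n) for n in range(max_attempts - 1)]
```

For 3 attempts starting at 1 second with coefficient 2.0, the notification activity waits 1s, then 2s, before the workflow gives up and compensates.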

Activity Implementations

Create task_workflow_service/activities/task_activities.py:

python
from datetime import datetime

from dapr.actor import ActorProxy, ActorId


def validate_task_activity(ctx, data: dict) -> dict:
    """Validate task data meets requirements."""
    issues = []
    if len(data.get("title", "")) < 5:
        issues.append("Title must be at least 5 characters")
    if len(data.get("description", "")) < 10:
        issues.append("Description must be at least 10 characters")
    is_valid = len(issues) == 0
    print(f"Activity validate_task: task={data['task_id']}, valid={is_valid}")
    return {
        "is_valid": is_valid,
        "issues": issues,
        "validated_at": datetime.utcnow().isoformat(),
    }


async def create_task_activity(ctx, data: dict) -> dict:
    """Create task in TaskActor."""
    from task_actor_service.actors.task_actor import TaskActorInterface

    proxy = ActorProxy.create("TaskActor", ActorId(data["task_id"]), TaskActorInterface)
    result = await proxy.CreateTask({
        "title": data["title"],
        "description": data["description"],
        "deadline": data.get("deadline_seconds"),
    })
    print(f"Activity create_task: task={data['task_id']} created")
    return result


async def assign_task_activity(ctx, data: dict) -> dict:
    """Assign task to user via TaskActor."""
    from task_actor_service.actors.task_actor import TaskActorInterface

    proxy = ActorProxy.create("TaskActor", ActorId(data["task_id"]), TaskActorInterface)
    result = await proxy.AssignTask(data["assignee"])
    print(f"Activity assign_task: task={data['task_id']} -> {data['assignee']}")
    return result


async def set_reminder_activity(ctx, data: dict) -> dict:
    """Set deadline reminder on TaskActor."""
    from task_actor_service.actors.task_actor import TaskActorInterface

    proxy = ActorProxy.create("TaskActor", ActorId(data["task_id"]), TaskActorInterface)
    await proxy.SetDeadlineReminder(data["deadline_seconds"])
    print(f"Activity set_reminder: task={data['task_id']}, {data['deadline_seconds']}s")
    return {"reminder_set": True}


def notify_assignee_activity(ctx, data: dict) -> dict:
    """Send notification to assignee."""
    import random

    if random.random() < 0.2:  # 20% failure rate for demonstration
        raise Exception(f"Notification service unavailable for {data['assignee']}")
    print(f"Activity notify_assignee: Notified {data['assignee']} about '{data['title']}'")
    return {
        "notified": True,
        "assignee": data["assignee"],
        "notified_at": datetime.utcnow().isoformat(),
    }


async def unassign_task_activity(ctx, data: dict) -> dict:
    """Compensation: Remove assignment from task."""
    from task_actor_service.actors.task_actor import TaskActorInterface

    proxy = ActorProxy.create("TaskActor", ActorId(data["task_id"]), TaskActorInterface)
    result = await proxy.UpdateStatus("pending")  # Reset to pending
    print(f"Activity unassign_task (COMPENSATION): task={data['task_id']} reset to pending")
    return result

Output (successful flow):

bash
Workflow started: task-workflow-task-001
Activity validate_task: task=task-001, valid=True
Activity create_task: task=task-001 created
Activity assign_task: task=task-001 -> alice@example.com
Activity set_reminder: task=task-001, 3600s
Activity notify_assignee: Notified alice@example.com about 'Review PR #42'
Workflow completed: {status: "completed", message: "Task assigned to alice@example.com..."}

Output (failure with compensation):

bash
Workflow started: task-workflow-task-002
Activity validate_task: task=task-002, valid=True
Activity create_task: task=task-002 created
Activity assign_task: task=task-002 -> bob@example.com
Activity set_reminder: task=task-002, 3600s
Activity notify_assignee: FAILED - Notification service unavailable
Retry 1 of 3...
Activity notify_assignee: FAILED - Notification service unavailable
Retry 2 of 3...
Activity notify_assignee: FAILED - Notification service unavailable
Executing compensation...
Activity unassign_task (COMPENSATION): task=task-002 reset to pending
Workflow completed: {status: "failed", message: "Processing failed, compensation applied..."}
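Stripped of Dapr specifics, the compensation behavior in that transcript reduces to a stack of undo actions replayed in reverse. A minimal, self-contained sketch (names and structure are mine, for illustration only):

```python
def run_with_compensation(steps, compensations_by_step):
    """Minimal saga sketch: after each completed step, push its
    compensation onto a stack; on any failure, pop and run the
    compensations in reverse (LIFO) order.

    `steps` is a list of zero-argument callables run in order;
    `compensations_by_step` maps a step index to its undo callable.
    """
    stack = []
    try:
        for i, step in enumerate(steps):
            step()
            if i in compensations_by_step:
                stack.append(compensations_by_step[i])
    except Exception:
        for comp in reversed(stack):
            comp()
        return "failed-compensated"
    return "completed"


# Demo mirroring the failure transcript: assignment succeeds,
# notification raises, so the recorded "unassign" compensation runs.
log: list[str] = []


def fail_notify():
    raise RuntimeError("notification service unavailable")


result = run_with_compensation(
    steps=[lambda: log.append("assign"), fail_notify],
    compensations_by_step={0: lambda: log.append("unassign")},
)
```

Reverse order matters: if later steps had their own compensations, undoing them before earlier ones keeps the system from ever observing a half-rolled-back state.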

FastAPI Integration

Wire actors and workflows into a FastAPI application.

Create task_actor_service/main.py:

python
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel

from dapr.actor import ActorProxy, ActorId
from dapr.ext.fastapi import DaprActor, DaprApp
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient

from actors.task_actor import TaskActor, TaskActorInterface
from actors.chat_actor import ChatActor, ChatActorInterface
# Workflow definitions (import path assumes both services are packaged
# together for the capstone)
from task_workflow_service.workflows.task_workflow import (
    task_processing_workflow,
    TaskProcessingInput,
)

# Note: In production, workflows and activities would be registered here


class TaskCreate(BaseModel):
    """Request body for /tasks/process."""
    task_id: str
    title: str
    description: str
    assignee: str
    deadline_seconds: int = 3600


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Startup and shutdown lifecycle."""
    await dapr_actor.register_actor(TaskActor)
    await dapr_actor.register_actor(ChatActor)
    # wfr.start()
    print("Task Agent System started")
    yield
    # wfr.shutdown()
    print("Task Agent System stopped")


app = FastAPI(title="Task Agent System", lifespan=lifespan)
dapr_actor = DaprActor(app)
dapr_app = DaprApp(app)


# Task API endpoints
@app.post("/tasks/process")
async def process_task(task: TaskCreate):
    """Start task processing workflow."""
    client = DaprWorkflowClient()
    instance_id = f"task-workflow-{task.task_id}"
    client.schedule_new_workflow(
        workflow=task_processing_workflow,
        input=TaskProcessingInput(
            task_id=task.task_id,
            title=task.title,
            description=task.description,
            assignee=task.assignee,
            deadline_seconds=task.deadline_seconds,
        ),
        instance_id=instance_id,
    )
    return {"workflow_id": instance_id, "status": "started"}


@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
    """Get task state from TaskActor."""
    proxy = ActorProxy.create("TaskActor", ActorId(task_id), TaskActorInterface)
    task = await proxy.GetTask()
    if not task:
        raise HTTPException(status_code=404, detail="Task not found")
    return task

Kubernetes Deployment

Dapr Components

yaml
# k8s/components/statestore.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: statestore
  namespace: default
spec:
  type: state.redis
  version: v1
  metadata:
  - name: redisHost
    value: redis-master.default.svc.cluster.local:6379
  - name: redisPassword
    value: ""
  - name: actorStateStore
    value: "true"  # Required for actors
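The ChatActor also publishes to a pub/sub component named `pubsub`, which needs its own manifest. A sketch under the same Redis assumptions as the state store (the file name is my choice; the component `name` must match `pubsub_name="pubsub"` in the actor code):

yaml
# k8s/components/pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: pubsub
  namespace: default
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: redis-master.default.svc.cluster.local:6379
  - name: redisPassword
    value: ""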

Deployment Manifest

yaml
# k8s/task-agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: task-agent-system
  namespace: default
spec:
  replicas: 2
  selector:
    matchLabels:
      app: task-agent-system
  template:
    metadata:
      labels:
        app: task-agent-system
      annotations:
        dapr.io/enabled: "true"
        dapr.io/app-id: "task-agent-system"
        dapr.io/app-port: "8000"
        dapr.io/enable-actors: "true"
    spec:
      containers:
      - name: task-agent
        image: task-agent-system:latest
        ports:
        - containerPort: 8000

Testing the Complete System

Test 1: Task Processing Workflow

bash
# Start task processing
curl -X POST http://localhost:8000/tasks/process \
  -H "Content-Type: application/json" \
  -d '{
    "task_id": "task-001",
    "title": "Review PR #42",
    "description": "Review the authentication refactoring pull request",
    "assignee": "alice@example.com",
    "deadline_seconds": 60
  }'

Output:

json
{"workflow_id": "task-workflow-task-001", "status": "started"}

Production Readiness Checklist

Before deploying this system to production, verify:

| Category | Requirement | Status |
| --- | --- | --- |
| State | Actor state persists across pod restarts | Verify with Redis persistence |
| Durability | Workflow survives node failure | Test with pod deletion |
| Compensation | Saga rollback executes on failure | Simulate notification failures |
| Observability | Traces visible in Zipkin/Jaeger | Configure tracing component |
| Security | mTLS enabled between sidecars | Verify Dapr Sentry is running |
| Resources | Sidecar limits configured | Check dapr.io annotations |

Reflect on Your Skill: Comprehensive Review

You've built a complete Digital FTE blueprint. Now assess what your dapr-deployment skill has learned throughout Module 7 by exploring these capstone scenarios.

Prompt 1: Extend the System

markdown
I've built a TaskActor + TaskProcessingWorkflow system for task management.
Now I want to add a ProjectActor that manages a collection of tasks. Help me design:
1. ProjectActor that tracks multiple TaskActor IDs
2. A workflow that creates a project with 3 initial tasks
3. An activity that calls TaskActor for each task creation

Prompt 2: Debug a Production Issue

markdown
My TaskProcessingWorkflow is failing intermittently in production. The logs show:
- Workflow starts successfully
- Activities 1-3 complete
- Activity 4 (notify_assignee) fails with timeout
Help me diagnose the likely causes and decide how to adjust the retry policy.

Prompt 3: Design for Scale

markdown
My capstone system needs to handle 10,000 concurrent tasks across 100 users. Each user has their own ChatActor, and tasks are distributed across TaskActors. Help me analyze the impact on the Placement service and actor activation rates.

Safety Note

This capstone combines multiple Dapr building blocks. When deploying to production: verify Redis has persistence enabled (actor state must survive Redis restarts), configure appropriate actor idle timeouts, and implement proper circuit breakers for external notification services to prevent saga compensation storms.
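One way to bound those compensation storms is a failure-counting circuit breaker in front of the notification call: after repeated failures the circuit opens and calls fail fast, so an extended outage stops triggering a full saga rollback per task. A deliberately minimal sketch (in production you would reach for Dapr's built-in resiliency policies rather than hand-rolling this; the class below is illustrative only):

```python
class CircuitBreaker:
    """Open the circuit after `threshold` consecutive failures.

    While open, calls fail fast with RuntimeError instead of hitting
    the flaky downstream service. A success resets the failure count.
    (Real breakers also add a half-open state with a cooldown timer.)
    """

    def __init__(self, threshold: int = 3):
        self.threshold = threshold
        self.failures = 0

    @property
    def open(self) -> bool:
        return self.failures >= self.threshold

    def call(self, fn):
        if self.open:
            raise RuntimeError("circuit open: skipping downstream call")
        try:
            result = fn()
        except Exception:
            self.failures += 1
            raise
        self.failures = 0
        return result
```

Wrapping the notification client in such a breaker means that during an outage the workflow fails immediately at step 5 and compensates once, instead of burning through the full retry schedule on every task.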