You've traveled through 18 lessons of actors and workflows. You understand turn-based concurrency, durable execution, reminders that survive restarts, and saga patterns that compensate on failure. Now it's time to combine everything into a complete system.
This capstone isn't just an exercise. It's a Digital FTE blueprint—the kind of stateful agent system you could package and sell to clients who need intelligent task management with conversation capabilities.
By the end of this lesson, you'll have a working system where:
This is the culmination of Module 7. Everything you've learned converges here.
Before writing code, let's understand what we're building:
┌────────────────────────────────────────────────────────────────────────┐
│ Stateful Task Agent System │
├────────────────────────────────────────────────────────────────────────┤
│ │
│ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │
│ │ TaskActor │ │ ChatActor │ │ Workflow │ │
│ │ (per task) │ │ (per user) │ │ Runtime │ │
│ ├─────────────────┤ ├─────────────────┤ ├─────────────────┤ │
│ │ - status │ │ - history[] │ │ validate_task │ │
│ │ - assignee │ │ - context │ │ assign_task │ │
│ │ - deadline │ │ - preferences │ │ notify_assignee │ │
│ │ - reminder │ │ │ │ (saga compen.) │ │
│ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘ │
│ │ │ │ │
│ └───────────────────────┼────────────────────────┘ │
│ │ │
│ ┌───────────────────────┴────────────────────────┐ │
│ │ Dapr Building Blocks │ │
│ ├─────────────────────────────────────────────────┤ │
│ │ State Store (Redis) │ Pub/Sub │ Placement │ │
│ └─────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────┘| Component | Responsibility | Building Block |
|---|---|---|
| TaskActor | Stateful task entity with deadline reminders | Actors + Reminders |
| ChatActor | Conversation state per user with event publishing | Actors + Pub/Sub |
| TaskProcessingWorkflow | Orchestrate task lifecycle with rollback | Workflows + Saga |
| Redis | Persist actor state and workflow history | State Store |
Before implementation, write the specification. This is Layer 4—spec-driven development:
# TaskAgent System Specification
## Intent
Build a stateful task management system that combines:
- Per-task entity state (TaskActor) with deadline reminders
- Per-user conversation context (ChatActor) for AI agent integration
- Durable task processing workflow with saga compensation
## Success Criteria
1. TaskActor persists state across deactivation/reactivation cycles
2. Deadline reminder fires and updates task status to "overdue"
3. ChatActor maintains conversation history per user
4. TaskProcessingWorkflow survives pod restart and resumes correctly
5. Saga compensation rolls back assignment on notification failure
6. System deploys to Docker Desktop Kubernetes with Dapr sidecars
## Non-Goals
- Production-grade security (covered in Module 7 security lessons)
- Multi-tenant isolation (extension exercise)
- Cross-app workflows (extension exercise)
## Architecture Constraints
- Use Redis for state store (actorStateStore: "true")
- Workflow activities call actors through ActorProxy
- Single namespace (default) for capstone simplicityThe TaskActor manages individual task state with deadline reminders.
Create task_actor_service/actors/task_actor.py:
from datetime import datetime, timedelta
from dapr.actor import Actor, ActorInterface, actormethod
from dapr.actor.runtime.context import ActorRuntimeContext
class TaskActorInterface(ActorInterface):
@actormethod(name="GetTask")
async def get_task(self) -> dict | None: ...
@actormethod(name="CreateTask")
async def create_task(self, task_data: dict) -> dict: ...
@actormethod(name="UpdateStatus")
async def update_status(self, status: str) -> dict: ...
@actormethod(name="AssignTask")
async def assign_task(self, assignee: str) -> dict: ...
@actormethod(name="SetDeadlineReminder")
async def set_deadline_reminder(self, deadline_seconds: int) -> None: ...
class TaskActor(Actor, TaskActorInterface):
"""Stateful task entity with deadline reminder support."""
def __init__(self, ctx: ActorRuntimeContext, actor_id: str):
super().__init__(ctx, actor_id)
self._state_key = f"task-{actor_id}"
async def _on_activate(self) -> None:
"""Called when actor is activated."""
print(f"TaskActor {self.id.id} activated")
found, _ = await self._state_manager.try_get_state(self._state_key)
if not found:
# Initialize empty state on first activation
await self._state_manager.set_state(self._state_key, {
"task_id": self.id.id,
"status": "uninitialized",
"created_at": None,
"updated_at": None
})
async def _get_current_status(self) -> str:
"""Get current status for comparison."""
found, state = await self._state_manager.try_get_state(self._state_key)
return state.get("status", "unknown") if found else "not_found"
async def get_task(self) -> dict | None:
"""Get current task state."""
found, state = await self._state_manager.try_get_state(self._state_key)
if not found or state.get("status") == "uninitialized":
return None
return state
async def create_task(self, task_data: dict) -> dict:
"""Initialize task with provided data."""
now = datetime.utcnow().isoformat()
state = {
"task_id": self.id.id,
"title": task_data.get("title", "Untitled"),
"description": task_data.get("description", ""),
"status": "pending",
"assignee": None,
"deadline": task_data.get("deadline"),
"created_at": now,
"updated_at": now
}
await self._state_manager.set_state(self._state_key, state)
print(f"TaskActor {self.id.id}: Created task '{state['title']}'")
return state
async def update_status(self, status: str) -> dict:
"""Update task status with turn-based concurrency guarantee."""
state = await self._state_manager.get_state(self._state_key)
state["status"] = status
state["updated_at"] = datetime.utcnow().isoformat()
await self._state_manager.set_state(self._state_key, state)
print(f"TaskActor {self.id.id}: Status updated to '{status}'")
return state
async def assign_task(self, assignee: str) -> dict:
"""Assign task to user."""
state = await self._state_manager.get_state(self._state_key)
state["assignee"] = assignee
state["status"] = "assigned"
state["updated_at"] = datetime.utcnow().isoformat()
await self._state_manager.set_state(self._state_key, state)
print(f"TaskActor {self.id.id}: Assigned to '{assignee}'")
return state
async def set_deadline_reminder(self, deadline_seconds: int) -> None:
"""Register deadline reminder that survives restarts."""
await self.register_reminder(
reminder_name="deadline_reminder",
state=b'{"action": "mark_overdue"}',
due_time=timedelta(seconds=deadline_seconds),
period=timedelta(seconds=0) # One-time reminder
)
print(f"TaskActor {self.id.id}: Deadline reminder set for {deadline_seconds}s")
async def receive_reminder(
self,
name: str,
state: bytes,
due_time: timedelta,
period: timedelta
) -> None:
"""Callback when reminder fires - marks task overdue."""
if name == "deadline_reminder":
print(f"TaskActor {self.id.id}: Deadline reminder fired!")
await self.update_status("overdue")Output:
TaskActor task-001 activated
TaskActor task-001: Created task 'Review PR #42'
TaskActor task-001: Assigned to 'alice@example.com'
TaskActor task-001: Deadline reminder set for 30s
# ... 30 seconds later ...
TaskActor task-001: Deadline reminder fired!
TaskActor task-001: Status updated to 'overdue'The ChatActor maintains conversation history per user.
Create task_actor_service/actors/chat_actor.py:
from datetime import datetime
from dapr.actor import Actor, ActorInterface, actormethod
from dapr.actor.runtime.context import ActorRuntimeContext
from dapr.clients import DaprClient
import json
class ChatActorInterface(ActorInterface):
@actormethod(name="ProcessMessage")
async def process_message(self, message: dict) -> dict: ...
@actormethod(name="GetHistory")
async def get_history(self) -> list[dict]: ...
@actormethod(name="ClearHistory")
async def clear_history(self) -> None: ...
class ChatActor(Actor, ChatActorInterface):
"""Per-user conversation context for AI agent integration."""
MAX_HISTORY = 10 # Keep last 10 exchanges
def __init__(self, ctx: ActorRuntimeContext, actor_id: str):
super().__init__(ctx, actor_id)
self._history_key = f"chat-history-{actor_id}"
async def _on_activate(self) -> None:
"""Initialize empty history on first activation."""
print(f"ChatActor {self.id.id} activated")
found, _ = await self._state_manager.try_get_state(self._history_key)
if not found:
await self._state_manager.set_state(self._history_key, [])
async def process_message(self, message: dict) -> dict:
"""Process user message and generate response."""
history = await self._state_manager.get_state(self._history_key)
# Add user message to history
user_entry = {
"role": "user",
"content": message.get("content", ""),
"timestamp": datetime.utcnow().isoformat()
}
history.append(user_entry)
# Generate response (in production, call AI service)
response_content = self._generate_response(message.get("content", ""), history)
assistant_entry = {
"role": "assistant",
"content": response_content,
"timestamp": datetime.utcnow().isoformat()
}
history.append(assistant_entry)
# Trim to max history
if len(history) > self.MAX_HISTORY * 2:
history = history[-(self.MAX_HISTORY * 2):]
await self._state_manager.set_state(self._history_key, history)
# Publish conversation event
await self._publish_conversation_event(user_entry, assistant_entry)
print(f"ChatActor {self.id.id}: Processed message, history size: {len(history)}")
return assistant_entry
async def get_history(self) -> list[dict]:
"""Return conversation history."""
return await self._state_manager.get_state(self._history_key)
async def clear_history(self) -> None:
"""Clear conversation history."""
await self._state_manager.set_state(self._history_key, [])
print(f"ChatActor {self.id.id}: History cleared")
def _generate_response(self, content: str, history: list) -> str:
"""Generate response based on message and context."""
if "task" in content.lower():
return f"I can help with tasks. You have {len(history)//2} messages in our conversation."
elif "help" in content.lower():
return "I'm your task management assistant. Ask me about creating, assigning, or tracking tasks."
else:
return f"Understood. I'm tracking our conversation (message #{len(history)//2 + 1})."
async def _publish_conversation_event(self, user_msg: dict, assistant_msg: dict) -> None:
"""Publish conversation event for downstream processing."""
with DaprClient() as client:
client.publish_event(
pubsub_name="pubsub",
topic_name="conversation-events",
data=json.dumps({
"event_type": "conversation.updated",
"user_id": self.id.id,
"user_message": user_msg,
"assistant_message": assistant_msg,
"timestamp": datetime.utcnow().isoformat()
}),
data_content_type="application/json"
)Output:
ChatActor user-alice activated
ChatActor user-alice: Processed message, history size: 2
ChatActor user-alice: Processed message, history size: 4
Event published: {"event_type": "conversation.updated", "user_id": "user-alice", ...}The workflow orchestrates task processing with compensation.
Create task_workflow_service/workflows/task_workflow.py:
import dapr.ext.workflow as wf
from dataclasses import dataclass
from datetime import timedelta
@dataclass
class TaskProcessingInput:
task_id: str
title: str
description: str
assignee: str
deadline_seconds: int = 3600 # Default 1 hour
@dataclass
class TaskProcessingResult:
task_id: str
status: str
message: str
def task_processing_workflow(
ctx: wf.DaprWorkflowContext,
input_data: TaskProcessingInput) -> TaskProcessingResult:
"""
Orchestrate task lifecycle with saga compensation.
Steps:
1. Validate task data
2. Create task in TaskActor
3. Assign task to user
4. Set deadline reminder
5. Notify assignee
On failure at step 5: Rollback assignment (compensation)
"""
compensations = []
try:
# Step 1: Validate task data
validation = yield ctx.call_activity(
validate_task_activity,
input={
"task_id": input_data.task_id,
"title": input_data.title,
"description": input_data.description
}
)
if not validation["is_valid"]:
return TaskProcessingResult(
task_id=input_data.task_id,
status="rejected",
message=f"Validation failed: {validation['issues']}"
)
# Step 2: Create task in TaskActor
create_result = yield ctx.call_activity(
create_task_activity,
input={
"task_id": input_data.task_id,
"title": input_data.title,
"description": input_data.description,
"deadline_seconds": input_data.deadline_seconds
}
)
# Step 3: Assign task to user
assign_result = yield ctx.call_activity(
assign_task_activity,
input={
"task_id": input_data.task_id,
"assignee": input_data.assignee
}
)
# Register compensation: unassign if notification fails
compensations.append(("unassign_task_activity", {
"task_id": input_data.task_id
}))
# Step 4: Set deadline reminder
yield ctx.call_activity(
set_reminder_activity,
input={
"task_id": input_data.task_id,
"deadline_seconds": input_data.deadline_seconds
}
)
# Step 5: Notify assignee (might fail - triggers saga compensation)
yield ctx.call_activity(
notify_assignee_activity,
input={
"task_id": input_data.task_id,
"assignee": input_data.assignee,
"title": input_data.title
},
retry_policy=wf.RetryPolicy(
max_attempts=3,
initial_interval=timedelta(seconds=1),
backoff_coefficient=2.0
)
)
return TaskProcessingResult(
task_id=input_data.task_id,
status="completed",
message=f"Task assigned to {input_data.assignee} with deadline reminder"
)
except Exception as e:
# Saga compensation: rollback in reverse order
for comp_name, comp_data in reversed(compensations):
try:
yield ctx.call_activity(comp_name, input=comp_data)
except Exception as comp_error:
print(f"Compensation failed: {comp_error}")
return TaskProcessingResult(
task_id=input_data.task_id,
status="failed",
message=f"Processing failed, compensation applied: {str(e)}"
)Create task_workflow_service/activities/task_activities.py:
from dapr.actor import ActorProxy, ActorId
from datetime import datetime
def validate_task_activity(ctx, data: dict) -> dict:
"""Validate task data meets requirements."""
issues = []
if len(data.get("title", "")) < 5:
issues.append("Title must be at least 5 characters")
if len(data.get("description", "")) < 10:
issues.append("Description must be at least 10 characters")
is_valid = len(issues) == 0
print(f"Activity validate_task: task={data['task_id']}, valid={is_valid}")
return {
"is_valid": is_valid,
"issues": issues,
"validated_at": datetime.utcnow().isoformat()
}
async def create_task_activity(ctx, data: dict) -> dict:
"""Create task in TaskActor."""
from task_actor_service.actors.task_actor import TaskActorInterface
proxy = ActorProxy.create(
"TaskActor",
ActorId(data["task_id"]),
TaskActorInterface
)
result = await proxy.CreateTask({
"title": data["title"],
"description": data["description"],
"deadline": data.get("deadline_seconds")
})
print(f"Activity create_task: task={data['task_id']} created")
return result
async def assign_task_activity(ctx, data: dict) -> dict:
"""Assign task to user via TaskActor."""
from task_actor_service.actors.task_actor import TaskActorInterface
proxy = ActorProxy.create(
"TaskActor",
ActorId(data["task_id"]),
TaskActorInterface
)
result = await proxy.AssignTask(data["assignee"])
print(f"Activity assign_task: task={data['task_id']} -> {data['assignee']}")
return result
async def set_reminder_activity(ctx, data: dict) -> dict:
"""Set deadline reminder on TaskActor."""
from task_actor_service.actors.task_actor import TaskActorInterface
proxy = ActorProxy.create(
"TaskActor",
ActorId(data["task_id"]),
TaskActorInterface
)
await proxy.SetDeadlineReminder(data["deadline_seconds"])
print(f"Activity set_reminder: task={data['task_id']}, {data['deadline_seconds']}s")
return {"reminder_set": True}
def notify_assignee_activity(ctx, data: dict) -> dict:
"""Send notification to assignee."""
import random
if random.random() < 0.2: # 20% failure rate for demonstration
raise Exception(f"Notification service unavailable for {data['assignee']}")
print(f"Activity notify_assignee: Notified {data['assignee']} about '{data['title']}'")
return {
"notified": True,
"assignee": data["assignee"],
"notified_at": datetime.utcnow().isoformat()
}
async def unassign_task_activity(ctx, data: dict) -> dict:
"""Compensation: Remove assignment from task."""
from task_actor_service.actors.task_actor import TaskActorInterface
proxy = ActorProxy.create(
"TaskActor",
ActorId(data["task_id"]),
TaskActorInterface
)
result = await proxy.UpdateStatus("pending") # Reset to pending
print(f"Activity unassign_task (COMPENSATION): task={data['task_id']} reset to pending")
return resultOutput (successful flow):
Workflow started: task-processing-task-001
Activity validate_task: task=task-001, valid=True
Activity create_task: task=task-001 created
Activity assign_task: task=task-001 -> alice@example.com
Activity set_reminder: task=task-001, 3600s
Activity notify_assignee: Notified alice@example.com about 'Review PR #42'
Workflow completed: {status: "completed", message: "Task assigned to alice@example.com..."}Output (failure with compensation):
Workflow started: task-processing-task-002
Activity validate_task: task=task-002, valid=True
Activity create_task: task=task-002 created
Activity assign_task: task=task-002 -> bob@example.com
Activity set_reminder: task=task-002, 3600s
Activity notify_assignee: FAILED - Notification service unavailable
Retry 1 of 3...
Activity notify_assignee: FAILED - Notification service unavailable
Retry 2 of 3...
Activity notify_assignee: FAILED - Notification service unavailable
Executing compensation...
Activity unassign_task (COMPENSATION): task=task-002 reset to pending
Workflow completed: {status: "failed", message: "Processing failed, compensation applied..."}Wire actors and workflows into a FastAPI application.
Create task_actor_service/main.py:
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from dapr.ext.fastapi import DaprActor, DaprApp
from dapr.actor import ActorProxy, ActorId
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient
from pydantic import BaseModel
from actors.task_actor import TaskActor, TaskActorInterface
from actors.chat_actor import ChatActor, ChatActorInterface
# Note: In production, workflows and activities would be registered here
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Startup and shutdown lifecycle."""
await dapr_actor.register_actor(TaskActor)
await dapr_actor.register_actor(ChatActor)
# wfr.start()
print("Task Agent System started")
yield
# wfr.shutdown()
print("Task Agent System stopped")
app = FastAPI(title="Task Agent System", lifespan=lifespan)
dapr_actor = DaprActor(app)
dapr_app = DaprApp(app)
# Task API endpoints
@app.post("/tasks/process")
async def process_task(task: TaskCreate):
"""Start task processing workflow."""
client = DaprWorkflowClient()
instance_id = f"task-workflow-{task.task_id}"
client.schedule_new_workflow(
workflow=task_processing_workflow,
input=TaskProcessingInput(
task_id=task.task_id,
title=task.title,
description=task.description,
assignee=task.assignee,
deadline_seconds=task.deadline_seconds
),
instance_id=instance_id
)
return {"workflow_id": instance_id, "status": "started"}
@app.get("/tasks/{task_id}")
async def get_task(task_id: str):
"""Get task state from TaskActor."""
proxy = ActorProxy.create("TaskActor", ActorId(task_id), TaskActorInterface)
task = await proxy.GetTask()
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task# k8s/components/statestore.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: statestore
namespace: default
spec:
type: state.redis
version: v1
metadata:
- name: redisHost
value: redis-master.default.svc.cluster.local:6379
- name: redisPassword
value: ""
- name: actorStateStore
value: "true" # Required for actors# k8s/task-agent-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: task-agent-system
namespace: default
spec:
replicas: 2
selector:
matchLabels:
app: task-agent-system
template:
metadata:
labels:
app: task-agent-system
annotations:
dapr.io/enabled: "true"
dapr.io/app-id: "task-agent-system"
dapr.io/app-port: "8000"
dapr.io/enable-actors: "true"
spec:
containers:
- name: task-agent
image: task-agent-system:latest
ports:
- containerPort: 8000# Start task processing
curl -X POST http://localhost:8000/tasks/process \
-H "Content-Type: application/json" \
-d '{
"task_id": "task-001",
"title": "Review PR #42",
"description": "Review the authentication refactoring pull request",
"assignee": "alice@example.com",
"deadline_seconds": 60
}'Output:
{"workflow_id": "task-workflow-task-001", "status": "started"}Before deploying this system to production, verify:
| Category | Requirement | Status |
|---|---|---|
| State | Actor state persists across pod restarts | Verify with Redis persistence |
| Durability | Workflow survives node failure | Test with pod deletion |
| Compensation | Saga rollback executes on failure | Simulate notification failures |
| Observability | Traces visible in Zipkin/Jaeger | Configure tracing component |
| Security | mTLS enabled between sidecars | Verify Dapr Sentry is running |
| Resources | Sidecar limits configured | Check dapr.io annotations |
You've built a complete Digital FTE blueprint. This is the moment to assess what your dapr-deployment skill has learned throughout Module 7. Access your skill's growth by exploring these capstone scenarios.
I've built a TaskActor + TaskProcessingWorkflow system for task management.
Now I want to add a ProjectActor that manages a collection of tasks.
Help me design:
1. ProjectActor that tracks multiple TaskActor IDs
2. A workflow that creates a project with 3 initial tasks
3. An activity that calls TaskActor for each task creationMy TaskProcessingWorkflow is failing intermittently in production. The logs show:
- Workflow starts successfully
- Activities 1-3 complete
- Activity 4 (notify_assignee) fails with timeoutMy capstone system needs to handle 10,000 concurrent tasks across 100 users.
Each user has their own ChatActor, and tasks are distributed across TaskActors.
Help me analyze the impact on the Placement service and actor activation rates.This capstone combines multiple Dapr building blocks. When deploying to production: verify Redis has persistence enabled (actor state must survive Redis restarts), configure appropriate actor idle timeouts, and implement proper circuit breakers for external notification services to prevent saga compensation storms.