USMAN’S INSIGHTS
AI ARCHITECT
© 2026 Muhammad Usman Akbar. All rights reserved.


Authoring Workflows

You understand workflow architecture from the previous lessons: durable execution through event sourcing, replay-based recovery, and the determinism rules that make it all work. Now it's time to write actual workflow code.

Your Task API needs to process tasks through multiple stages: validation, assignment, and notification. Each stage might fail. The process might take hours if it requires human approval. Your pod might restart mid-processing. With traditional code, you'd lose state. With a Dapr workflow, you pick up exactly where you left off.

The dapr-ext-workflow Python SDK gives you the building blocks: workflow functions that orchestrate, activity functions that execute units of work, and a runtime that manages durability. By the end of this lesson, you'll have a working workflow that survives restarts and chains activities together seamlessly.


Installing the Workflow SDK

Start by adding the workflow extension to your project:

bash
pip install dapr-ext-workflow

Output:

text
Successfully installed dapr-ext-workflow-x.x.x durabletask-x.x.x

This installs dapr-ext-workflow and its dependency durabletask, which provides the underlying durable execution framework.


Activities: Your Units of Work

Activities are where real work happens. They call APIs, access databases, generate timestamps, and read environment variables: all the non-deterministic operations that workflows cannot perform directly.

An activity is a simple function decorated with @wfr.activity:

python
from dapr.ext.workflow import WorkflowRuntime, WorkflowActivityContext
from datetime import datetime

wfr = WorkflowRuntime()

@wfr.activity(name="validate_task")
def validate_task(ctx: WorkflowActivityContext, task: dict) -> dict:
    """Validate that task has required fields."""
    print(f"Validating task: {task['task_id']}")

    # Activities CAN use non-deterministic operations
    validated_at = datetime.utcnow().isoformat()

    if not task.get("title"):
        return {"valid": False, "reason": "Missing title", "validated_at": validated_at}
    if not task.get("assignee"):
        return {"valid": False, "reason": "Missing assignee", "validated_at": validated_at}

    return {"valid": True, "validated_at": validated_at}

Key points about activities:

| Aspect | Detail |
| --- | --- |
| Decorator | @wfr.activity(name="activity_name") registers the activity |
| Context | WorkflowActivityContext provides workflow and task IDs |
| Inputs | The second parameter receives data passed from the workflow |
| Outputs | The return value goes back to the workflow |
| Determinism | Activities CAN be non-deterministic (timestamps, random, API calls) |
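The determinism row is the one that trips people up, so here is a small stdlib-only sketch (no Dapr involved; the function names are illustrative). A decision that depends only on its input reaches the same branch on every replay, which is exactly what the workflow engine relies on; a decision that reads random state may diverge between the original run and the replay.

```python
import random

# Illustrative only (no Dapr): why workflow logic must be deterministic
# while activities may not be.

def deterministic_decision(task: dict) -> str:
    # Depends only on its input: safe inside a workflow function,
    # because a replay reaches exactly the same branch.
    return "assign" if task.get("title") and task.get("assignee") else "reject"

def nondeterministic_decision(task: dict) -> str:
    # Depends on random state: unsafe in a workflow, fine in an activity.
    return "assign" if random.random() > 0.5 else "reject"

task = {"title": "Review PR", "assignee": "alice@example.com"}

# A "replay" of the deterministic function always agrees with the first run.
first_run = deterministic_decision(task)
replayed = deterministic_decision(task)
assert first_run == replayed
print(first_run)  # assign
```

This is why timestamps, random values, and API calls live in activities: the engine caches their results in history, so the workflow sees the same values on replay even though the operations themselves are non-deterministic.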

Let's add two more activities to complete our task processing pipeline:

python
@wfr.activity(name="assign_task")
def assign_task(ctx: WorkflowActivityContext, task: dict) -> dict:
    """Assign task to the specified user."""
    print(f"Assigning task {task['task_id']} to {task['assignee']}")

    # Simulate assignment (would call a user service in production)
    assigned_at = datetime.utcnow().isoformat()

    return {
        "assigned": True,
        "assignee": task["assignee"],
        "assigned_at": assigned_at
    }

@wfr.activity(name="send_notification")
def send_notification(ctx: WorkflowActivityContext, notification: dict) -> dict:
    """Send notification about task assignment."""
    print(f"Sending notification to {notification['recipient']}: {notification['message']}")

    # Would call notification service in production
    sent_at = datetime.utcnow().isoformat()

    return {"sent": True, "sent_at": sent_at}

Output when activities run:

text
Validating task: task-123
Assigning task task-123 to alice@example.com
Sending notification to alice@example.com: You have been assigned task: Review PR

Notice that activities print output, use datetime.utcnow(), and could call external services. This is exactly what workflows CANNOT do directly. Activities are the escape hatch for real-world operations.


Workflows: Your Orchestrators

A workflow function orchestrates activities. It defines the sequence, handles branching logic, and manages data flow. The magic happens with yield: each yield ctx.call_activity(...) is a durability checkpoint.

python
from dapr.ext.workflow import DaprWorkflowContext
from dataclasses import dataclass

@dataclass
class TaskInput:
    task_id: str
    title: str
    assignee: str

@dataclass
class TaskResult:
    task_id: str
    status: str
    message: str

@wfr.workflow(name="task_processing_workflow")
def task_processing_workflow(ctx: DaprWorkflowContext, task_input: TaskInput):
    """Orchestrate task processing: validate -> assign -> notify."""
    # Prepare task data
    task = {
        "task_id": task_input.task_id,
        "title": task_input.title,
        "assignee": task_input.assignee
    }

    # Step 1: Validate the task
    validation = yield ctx.call_activity(validate_task, input=task)
    if not validation["valid"]:
        return TaskResult(
            task_id=task_input.task_id,
            status="rejected",
            message=f"Validation failed: {validation['reason']}"
        )

    # Step 2: Assign the task
    assignment = yield ctx.call_activity(assign_task, input=task)

    # Step 3: Send notification
    notification = {
        "recipient": task_input.assignee,
        "message": f"You have been assigned task: {task_input.title}"
    }
    notify_result = yield ctx.call_activity(send_notification, input=notification)

    return TaskResult(
        task_id=task_input.task_id,
        status="completed",
        message=f"Task assigned to {assignment['assignee']} at {assignment['assigned_at']}"
    )

Understanding the yield Keyword

The yield keyword here is Python's standard generator mechanism, but Dapr gives it an additional role: each yield is a durability checkpoint:

text
Workflow starts
│
▼
yield ctx.call_activity(validate_task, ...)
│
├─► Engine records: "activity validate_task called"
├─► Activity executes
└─► Engine records: "activity validate_task completed, result={...}"
│
▼
yield ctx.call_activity(assign_task, ...)
│
├─► Engine records: "activity assign_task called"
└─► ... POD CRASHES HERE ...

... Pod restarts ...

Workflow replays:
│
▼
yield ctx.call_activity(validate_task, ...)
└─► Engine sees: "already completed" → returns cached result (no re-execution)
│
▼
yield ctx.call_activity(assign_task, ...)
└─► Engine sees: "was called but didn't complete" → re-executes activity

Each yield creates a checkpoint. If the workflow restarts, the engine replays from history: completed activities return cached results instantly, incomplete activities re-execute.
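You can imitate this replay behavior with a plain Python generator, no Dapr required. The toy "engine" below is a deliberate simplification of what the durable task engine does: it keeps completed results in a history list and, on replay, feeds cached results back into the generator instead of re-running the work.

```python
# Toy replay engine (illustrative, no Dapr): completed steps are served
# from history; incomplete steps re-execute. Dapr applies the same idea
# with durably persisted history.

def toy_workflow(ctx):
    validation = yield "validate_task"   # checkpoint 1
    assignment = yield "assign_task"     # checkpoint 2
    return f"completed with {validation} and {assignment}"

def run(history, log):
    """Drive the generator, replaying cached results where available."""
    gen = toy_workflow(None)
    step = 0
    try:
        activity = gen.send(None)  # prime the generator to the first yield
        while True:
            if step < len(history):
                result = history[step]            # already done: cached result
                log.append(f"replayed {activity}")
            else:
                result = f"{activity}-result"     # "execute" the activity
                history.append(result)
                log.append(f"executed {activity}")
            step += 1
            activity = gen.send(result)           # resume at the next yield
    except StopIteration as finished:
        return finished.value                     # the workflow's return value

# Simulate a pod crash after validate_task completed but before assign_task.
history = ["validate_task-result"]
log = []
outcome = run(history, log)
print(log)      # ['replayed validate_task', 'executed assign_task']
print(outcome)  # completed with validate_task-result and assign_task-result
```

Note how the workflow function body runs from the top on every replay, yet validate_task is never executed twice: its cached result is injected at the first yield.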


Setting Up the WorkflowRuntime

Activities and workflows must be registered with a WorkflowRuntime before they can execute. Here's a complete FastAPI application with proper lifecycle management:

python
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dapr.ext.workflow import WorkflowRuntime, DaprWorkflowClient

# Create the runtime instance
wfr = WorkflowRuntime()

# ... (activities and workflow defined above using @wfr.activity and @wfr.workflow)

@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage workflow runtime lifecycle."""
    # Startup: Start the workflow runtime
    wfr.start()
    print("Workflow runtime started")
    yield
    # Shutdown: Stop the workflow runtime
    wfr.shutdown()
    print("Workflow runtime stopped")

app = FastAPI(lifespan=lifespan)

Output on startup:

text
Workflow runtime started
INFO: Uvicorn running on http://0.0.0.0:8000

Output on shutdown:

text
Workflow runtime stopped
INFO: Shutting down

Why Lifespan Matters

The workflow runtime needs to:

  1. Connect to the Dapr sidecar on startup (gRPC, default port 50001)
  2. Register workflows and activities so Dapr knows what code to execute
  3. Clean up connections on shutdown

Using FastAPI's lifespan context manager ensures the runtime starts before your app accepts requests and shuts down gracefully when the app terminates.
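The lifecycle pattern itself can be exercised without a sidecar. In this sketch, FakeRuntime is a hypothetical stand-in for WorkflowRuntime so the start-serve-shutdown ordering is observable with the standard library alone:

```python
import asyncio
from contextlib import asynccontextmanager

# FakeRuntime is a hypothetical stand-in for WorkflowRuntime, so the
# lifecycle pattern can run without a Dapr sidecar.
class FakeRuntime:
    def __init__(self):
        self.running = False
    def start(self):
        self.running = True    # real runtime: connect to sidecar, register code
    def shutdown(self):
        self.running = False   # real runtime: clean up connections

events = []

@asynccontextmanager
async def lifespan(runtime):
    runtime.start()            # before the app accepts requests
    events.append("started")
    try:
        yield
    finally:
        runtime.shutdown()     # on graceful termination
        events.append("stopped")

async def main():
    runtime = FakeRuntime()
    async with lifespan(runtime):
        assert runtime.running    # requests would be served here
    assert not runtime.running

asyncio.run(main())
print(events)  # ['started', 'stopped']
```

The try/finally ensures shutdown runs even if the serving phase raises, which is the same guarantee FastAPI's lifespan context gives the real runtime.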


Starting Workflows with DaprWorkflowClient

The DaprWorkflowClient schedules new workflow instances and queries their status:

python
from dapr.ext.workflow import DaprWorkflowClient

class StartTaskRequest(BaseModel):
    task_id: str
    title: str
    assignee: str

class WorkflowResponse(BaseModel):
    instance_id: str
    status: str

@app.post("/tasks/process", response_model=WorkflowResponse)
async def start_task_processing(request: StartTaskRequest):
    """Start a new task processing workflow."""
    # Create workflow client
    client = DaprWorkflowClient()

    # Prepare input
    task_input = TaskInput(
        task_id=request.task_id,
        title=request.title,
        assignee=request.assignee
    )

    # Schedule the workflow
    instance_id = client.schedule_new_workflow(
        workflow=task_processing_workflow,
        input=task_input,
        instance_id=f"task-{request.task_id}"  # Optional: custom instance ID
    )

    return WorkflowResponse(
        instance_id=instance_id,
        status="scheduled"
    )

Test it with curl:

bash
curl -X POST http://localhost:8000/tasks/process \
  -H "Content-Type: application/json" \
  -d '{"task_id": "123", "title": "Review PR", "assignee": "alice@example.com"}'

Output:

json
{
  "instance_id": "task-123",
  "status": "scheduled"
}

Workflow logs:

text
Validating task: 123
Assigning task 123 to alice@example.com
Sending notification to alice@example.com: You have been assigned task: Review PR

Querying Workflow Status

Add an endpoint to check workflow progress:

python
@app.get("/tasks/{instance_id}/status")
async def get_task_status(instance_id: str):
    """Get the status of a task processing workflow."""
    client = DaprWorkflowClient()
    state = client.get_workflow_state(instance_id, fetch_payloads=True)

    if not state:
        raise HTTPException(status_code=404, detail="Workflow not found")

    return {
        "instance_id": instance_id,
        "status": state.runtime_status.name,
        "output": state.serialized_output if state.runtime_status.name == "COMPLETED" else None
    }

Complete Working Example

Here's the full application combining everything:

python
from contextlib import asynccontextmanager
from dataclasses import dataclass
from datetime import datetime
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dapr.ext.workflow import (
    WorkflowRuntime,
    DaprWorkflowContext,
    WorkflowActivityContext,
    DaprWorkflowClient
)

# =============================================================================
# Data Classes
# =============================================================================

@dataclass
class TaskInput:
    task_id: str
    title: str
    assignee: str

@dataclass
class TaskResult:
    task_id: str
    status: str
    message: str

# =============================================================================
# Workflow Runtime
# =============================================================================

wfr = WorkflowRuntime()

# =============================================================================
# Activities
# =============================================================================

@wfr.activity(name="validate_task")
def validate_task(ctx: WorkflowActivityContext, task: dict) -> dict:
    """Validate task has required fields."""
    print(f"[Activity] Validating task: {task['task_id']}")
    validated_at = datetime.utcnow().isoformat()
    if not task.get("title"):
        return {"valid": False, "reason": "Missing title", "validated_at": validated_at}
    if not task.get("assignee"):
        return {"valid": False, "reason": "Missing assignee", "validated_at": validated_at}
    return {"valid": True, "validated_at": validated_at}

@wfr.activity(name="assign_task")
def assign_task(ctx: WorkflowActivityContext, task: dict) -> dict:
    """Assign task to user."""
    print(f"[Activity] Assigning task {task['task_id']} to {task['assignee']}")
    assigned_at = datetime.utcnow().isoformat()
    return {
        "assigned": True,
        "assignee": task["assignee"],
        "assigned_at": assigned_at
    }

@wfr.activity(name="send_notification")
def send_notification(ctx: WorkflowActivityContext, notification: dict) -> dict:
    """Send notification to user."""
    print(f"[Activity] Notifying {notification['recipient']}: {notification['message']}")
    sent_at = datetime.utcnow().isoformat()
    return {"sent": True, "sent_at": sent_at}

# =============================================================================
# Workflow
# =============================================================================

@wfr.workflow(name="task_processing_workflow")
def task_processing_workflow(ctx: DaprWorkflowContext, task_input: TaskInput):
    """Orchestrate task processing through validation, assignment, and notification."""
    task = {
        "task_id": task_input.task_id,
        "title": task_input.title,
        "assignee": task_input.assignee
    }

    # Step 1: Validate
    validation = yield ctx.call_activity(validate_task, input=task)
    if not validation["valid"]:
        return TaskResult(task_input.task_id, "rejected", validation["reason"])

    # Step 2: Assign
    assignment = yield ctx.call_activity(assign_task, input=task)

    # Step 3: Notify
    notification = {
        "recipient": task_input.assignee,
        "message": f"You have been assigned: {task_input.title}"
    }
    yield ctx.call_activity(send_notification, input=notification)

    return TaskResult(
        task_input.task_id,
        "completed",
        f"Assigned to {assignment['assignee']} at {assignment['assigned_at']}"
    )

# =============================================================================
# FastAPI Application
# =============================================================================

@asynccontextmanager
async def lifespan(app: FastAPI):
    wfr.start()
    print("Workflow runtime started")
    yield
    wfr.shutdown()
    print("Workflow runtime stopped")

app = FastAPI(title="Task Workflow Service", lifespan=lifespan)

class StartTaskRequest(BaseModel):
    task_id: str
    title: str
    assignee: str

@app.post("/tasks/process")
async def start_task_processing(request: StartTaskRequest):
    """Start task processing workflow."""
    client = DaprWorkflowClient()
    instance_id = client.schedule_new_workflow(
        workflow=task_processing_workflow,
        input=TaskInput(request.task_id, request.title, request.assignee),
        instance_id=f"task-{request.task_id}"
    )
    return {"instance_id": instance_id, "status": "scheduled"}

@app.get("/tasks/{instance_id}/status")
async def get_task_status(instance_id: str):
    """Get workflow status."""
    client = DaprWorkflowClient()
    state = client.get_workflow_state(instance_id, fetch_payloads=True)
    if not state:
        raise HTTPException(status_code=404, detail="Workflow not found")
    return {
        "instance_id": instance_id,
        "status": state.runtime_status.name,
        "output": state.serialized_output if state.runtime_status.name == "COMPLETED" else None
    }

Run with Dapr:

bash
dapr run --app-id task-workflow --app-port 8000 --dapr-grpc-port 50001 -- uvicorn main:app --host 0.0.0.0 --port 8000

Reflect on Your Skill

You extended your dapr-deployment skill with actor and workflow patterns in Module 7.0. Does it now cover workflow authoring?

Test Your Skill

text
Using my dapr-deployment skill, generate a workflow that processes customer orders with three activities: validate_order, reserve_inventory, and process_payment. Include the WorkflowRuntime setup and a FastAPI endpoint to start the workflow.

If your skill generates proper @wfr.activity and @wfr.workflow decorators, correct yield statements, and lifespan integration, it is working correctly.


Try With AI

Prompt 1: Understand the yield Mechanism

text
Explain what happens at each yield statement in a Dapr workflow. I have this code:

validation = yield ctx.call_activity(validate_task, input=task)
assignment = yield ctx.call_activity(assign_task, input=task)

Walk me through:
1. What the workflow engine records when each yield is reached
2. What happens if my pod crashes between the two yield statements
3. How replay uses the recorded history to skip completed activities

Prompt 2: Design a Multi-Activity Workflow

text
Help me design a workflow for processing job applications. The workflow should:
1. Validate the application (check required fields)
2. Screen for keywords (skills matching)
3. Score the candidate (0-100 based on criteria)
4. Route based on score (high: interview, medium: review, low: reject)
5. Send appropriate notification

Show me the activities, workflow function, and data passing between steps.

Prompt 3: Debug a Workflow Issue

text
My workflow isn't progressing past the first activity. Here's what I see in logs:

[Activity] Validating task: 123
... nothing after this ...

My workflow code:

@wfr.workflow(name="task_workflow")
def task_workflow(ctx: DaprWorkflowContext, task: dict):
    validation = yield ctx.call_activity(validate_task, input=task)
    print(f"Validation result: {validation}")  # This never prints

Help me debug:
1. What could cause the workflow to stop after the first activity?
2. How do I check if the activity returned an error?
3. What logs should I look at in the Dapr sidecar?

Safety Note: When authoring workflows, remember that workflow code must be deterministic. Never use datetime.now(), random, or direct API calls inside the workflow function itself. Always put non-deterministic operations inside activities.