USMAN’S INSIGHTS
AI ARCHITECT
Reactive Intelligence: Event-Driven Actors
© 2026 Muhammad Usman Akbar. All rights reserved.


Event-Driven Actors

Your TaskActor stores state. It processes method calls. But it sits idle, waiting for someone to invoke it. In real systems, actors don't just wait—they react. A new task arrives in a queue, and the actor springs to life. A cron schedule fires, and the actor performs cleanup. A webhook triggers, and the actor processes external data.

This is the difference between a responsive actor and a reactive one. Responsive actors wait for requests. Reactive actors respond to events from the world around them.

In Module 7.5, you learned about timers and reminders—internal scheduling within an actor. Now you'll connect actors to the broader event ecosystem: Dapr pub/sub for internal service events, and Dapr bindings for external triggers like cron schedules and webhooks.


The Actor Event Architecture

Actors and events work together in a specific pattern. The actor handles state and logic. Events trigger and communicate between actors and services.

```text
                       Event Sources
                            │
      ┌─────────────────────┼─────────────────────┐
      │                     │                     │
┌─────▼─────┐       ┌───────▼───────┐      ┌──────▼──────┐
│  Pub/Sub  │       │   Bindings    │      │   Service   │
│  Topics   │       │ (cron, http)  │      │ Invocation  │
└─────┬─────┘       └───────┬───────┘      └──────┬──────┘
      │                     │                     │
      └─────────────────────┼─────────────────────┘
                            │
                     ┌──────▼──────┐
                     │   FastAPI   │
                     │   Handlers  │
                     └──────┬──────┘
                            │
                     ┌──────▼──────┐
                     │  ActorProxy │
                     │   Routing   │
                     └──────┬──────┘
                            │
      ┌─────────────────────┼─────────────────────┐
      │                     │                     │
┌─────▼─────┐        ┌──────▼──────┐       ┌──────▼──────┐
│ TaskActor │        │  TaskActor  │       │  TaskActor  │
│ task-001  │        │  task-002   │       │  task-003   │
└───────────┘        └─────────────┘       └─────────────┘
```

The critical insight: Dapr delivers events to your FastAPI application, not directly to actors. You create handlers that receive events and route them to the appropriate actor instance using ActorProxy.


Actors Responding to Pub/Sub Events

When a pub/sub message arrives, Dapr calls a subscription endpoint on your FastAPI application. Your handler extracts the actor ID from the message and invokes the correct actor.

The Subscription Handler Pattern

Subscription YAML:

```yaml
# components/subscription.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: task-events-subscription
spec:
  topic: task-events
  pubsubname: pubsub
  routes:
    default: /TaskActor/ProcessEvent
scopes:
  - task-actor-service
```

FastAPI Subscription Handler:

```python
from fastapi import FastAPI
from dapr.ext.fastapi import DaprApp
from dapr.actor import ActorProxy, ActorId

app = FastAPI()
dapr_app = DaprApp(app)

@dapr_app.subscribe(pubsub='pubsub', topic='task-events',
                    route='/TaskActor/ProcessEvent')
async def handle_task_event(event_data: dict):
    """Receive task events and route to the appropriate TaskActor."""
    print(f"Received task event: {event_data}")

    # Extract actor ID from event payload
    data = event_data.get('data', {})
    task_id = data.get('task_id')
    if not task_id:
        print("No task_id in event, skipping")
        return {"status": "SKIPPED"}

    # Route to the correct actor instance
    proxy = ActorProxy.create(
        'TaskActor', ActorId(task_id), TaskActorInterface
    )

    # Invoke actor method
    await proxy.ProcessEvent(data)
    return {"status": "SUCCESS"}
```

Output:

```text
>>> # Event published to task-events topic
>>> # Dapr calls your subscription handler
Received task event: {'data': {'task_id': 'task-123', 'event_type': 'task.assigned', 'assignee': 'alice'}}
>>> # Handler routes to TaskActor instance
>>> # TaskActor task-123 processes the event
TaskActor task-123: Processing event task.assigned
```

Why This Pattern?

You might wonder: "Why can't Dapr deliver messages directly to actors?"

The answer lies in actor semantics:

  1. Actors are addressed by ID. Pub/sub topics don't know about actor IDs.
  2. Actors have typed methods. Events need routing logic to determine which method to call.
  3. Events might create actors. Your handler can activate actors on-demand when events arrive.

The subscription handler is the bridge—it maps event semantics (topic, payload) to actor semantics (ID, method).
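That bridge can be sketched as a plain function, independent of Dapr. This is an illustrative mapping, not SDK API: the `task_id` field and `ProcessEvent` method name follow the examples in this chapter, and a real handler would feed the result into `ActorProxy.create`.

```python
def route_event(event_envelope: dict):
    """Map event semantics (topic payload) to actor semantics
    (actor type, actor ID, method name).

    Returns None for events that carry no routable actor ID —
    the handler should skip those rather than fail.
    """
    data = event_envelope.get("data", {})
    task_id = data.get("task_id")
    if not task_id:
        return None
    # All task events go to TaskActor.ProcessEvent in this sketch;
    # a richer mapping could dispatch on event_type.
    return ("TaskActor", task_id, "ProcessEvent")
```

Keeping the mapping as a pure function like this makes the routing decision unit-testable without a running Dapr sidecar.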


Actors Publishing Events

Actors can also publish events outbound. This enables actors to communicate without knowing about each other.

Publishing from an Actor Method

```python
from dapr.actor import Actor, ActorInterface, actormethod
from dapr.clients import DaprClient
import json

class TaskActorInterface(ActorInterface):
    @actormethod(name="CompleteTask")
    async def complete_task(self) -> dict:
        ...

class TaskActor(Actor, TaskActorInterface):
    def __init__(self, ctx, actor_id):
        super().__init__(ctx, actor_id)

    async def complete_task(self) -> dict:
        """Mark task complete and publish event."""
        actor_id = self.id.id

        # Update internal state
        state = await self._state_manager.get_state("task_data")
        state["status"] = "completed"
        await self._state_manager.set_state("task_data", state)

        # Publish event for other services/actors
        with DaprClient() as client:
            client.publish_event(
                pubsub_name='pubsub',
                topic_name='task-events',
                data=json.dumps({
                    'task_id': actor_id,
                    'event_type': 'task.completed',
                    'completed_at': state.get('completed_at')
                }),
                data_content_type='application/json'
            )

        print(f"TaskActor {actor_id}: Published task.completed event")
        return {"status": "completed", "task_id": actor_id}
```

Output:

```text
>>> # Client invokes actor method
await proxy.CompleteTask()
>>> # Actor updates state and publishes event
TaskActor task-123: Published task.completed event
>>> # Event flows to subscribers (other actors, services, etc.)
>>> # NotificationActor, AnalyticsService, etc. receive the event
```

Event Publishing Best Practices

When publishing events from actors, follow these patterns:

```python
async def _publish_event(self, event_type: str, payload: dict):
    """Reusable event publishing helper."""
    event_data = {
        "actor_id": self.id.id,
        "actor_type": "TaskActor",
        "event_type": event_type,
        "timestamp": datetime.utcnow().isoformat(),
        **payload
    }
    with DaprClient() as client:
        client.publish_event(
            pubsub_name='pubsub',
            topic_name='task-events',
            data=json.dumps(event_data),
            data_content_type='application/json'
        )
```

Include enough context in events for subscribers to route correctly:

| Field | Purpose |
| --- | --- |
| `actor_id` | Identifies which actor instance published |
| `actor_type` | Allows routing to the same actor type |
| `event_type` | Determines handler logic |
| `timestamp` | Enables ordering and deduplication |
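The envelope above can be built and checked with two small helpers. This is a sketch under the field names in the table — `build_event` and `is_routable` are hypothetical names, not part of the Dapr SDK:

```python
from datetime import datetime, timezone

# The four routing fields from the table above
REQUIRED_FIELDS = ("actor_id", "actor_type", "event_type", "timestamp")

def build_event(actor_id: str, actor_type: str,
                event_type: str, payload: dict) -> dict:
    """Assemble an event envelope carrying the routing fields,
    merged with the caller's payload."""
    return {
        "actor_id": actor_id,
        "actor_type": actor_type,
        "event_type": event_type,
        "timestamp": datetime.now(timezone.utc).isoformat(),
        **payload,
    }

def is_routable(event: dict) -> bool:
    """Subscribers can only route events whose envelope is complete."""
    return all(event.get(field) for field in REQUIRED_FIELDS)
```

Validating with `is_routable` on the subscriber side is the counterpart of building a complete envelope on the publisher side.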

Input Bindings Triggering Actors

Input bindings connect actors to external event sources. A cron schedule fires, and an actor wakes up. A webhook arrives, and an actor processes it.

Cron Binding to Actor

Binding Component:

```yaml
# components/scheduler-cron.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: scheduler-cron
  namespace: default
spec:
  type: bindings.cron
  version: v1
  metadata:
    - name: schedule
      value: "*/5 * * * *"  # Every 5 minutes
    - name: direction
      value: "input"
```

FastAPI Handler:

```python
@app.post("/scheduler-cron")
async def handle_scheduler_cron():
    """Triggered by cron, invokes SchedulerActor."""
    print("Cron triggered, invoking SchedulerActor")

    # Use a well-known actor ID for singleton scheduler
    proxy = ActorProxy.create(
        'SchedulerActor', ActorId('global-scheduler'), SchedulerActorInterface
    )

    # Trigger scheduled work
    result = await proxy.ProcessScheduledTasks()
    return {"status": "OK", "result": result}
```

SchedulerActor Implementation:

```python
class SchedulerActorInterface(ActorInterface):
    @actormethod(name="ProcessScheduledTasks")
    async def process_scheduled_tasks(self) -> dict:
        ...

class SchedulerActor(Actor, SchedulerActorInterface):
    async def process_scheduled_tasks(self) -> dict:
        """Find and process tasks with passed deadlines."""
        print("SchedulerActor: Checking for scheduled tasks")

        # Get list of tasks to process
        task_ids = await self._get_due_tasks()

        # Delegate to individual TaskActors
        for task_id in task_ids:
            proxy = ActorProxy.create(
                'TaskActor', ActorId(task_id), TaskActorInterface
            )
            await proxy.CheckDeadline()

        return {"processed": len(task_ids)}
```

Output:

```text
>>> # Every 5 minutes, Dapr calls /scheduler-cron
Cron triggered, invoking SchedulerActor
>>> # SchedulerActor checks for due tasks
SchedulerActor: Checking for scheduled tasks
>>> # Delegates to individual TaskActors
TaskActor task-001: Checking deadline
TaskActor task-007: Checking deadline
```

Webhook Binding to Actor

External systems can trigger actors through HTTP bindings:

Binding Component:

```yaml
# components/webhook-binding.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: external-webhook
  namespace: default
spec:
  type: bindings.http
  version: v1
  metadata:
    - name: direction
      value: "input"
```

FastAPI Handler:

```python
from fastapi import Request

@app.post("/external-webhook")
async def handle_external_webhook(request: Request):
    """Receive webhook and route to appropriate actor."""
    body = await request.json()
    print(f"Webhook received: {body}")

    # Extract routing information from webhook payload
    resource_type = body.get('resource_type')
    resource_id = body.get('resource_id')

    if resource_type == 'task':
        proxy = ActorProxy.create(
            'TaskActor', ActorId(resource_id), TaskActorInterface
        )
        await proxy.ProcessWebhook(body)

    return {"status": "OK"}
```

Output:

```text
>>> # External system sends webhook
POST /external-webhook
{"resource_type": "task", "resource_id": "task-456", "action": "external_update"}
>>> # Handler routes to TaskActor
Webhook received: {'resource_type': 'task', 'resource_id': 'task-456', 'action': 'external_update'}
>>> # TaskActor processes webhook data
TaskActor task-456: Processing webhook external_update
```

Output Bindings from Actors

Actors can invoke external systems using output bindings—sending HTTP requests, writing to storage, or triggering notifications.

HTTP Output Binding from Actor

Binding Component:

```yaml
# components/notification-http.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: notification-http
  namespace: default
spec:
  type: bindings.http
  version: v1
  metadata:
    - name: url
      value: "https://hooks.slack.com/services/xxx/yyy/zzz"
    - name: direction
      value: "output"
```

Actor Using Output Binding:

```python
class TaskActor(Actor, TaskActorInterface):
    async def send_notification(self, message: str) -> None:
        """Send notification via output binding."""
        actor_id = self.id.id
        payload = json.dumps({
            "text": f"Task {actor_id}: {message}",
            "channel": "#task-notifications"
        })
        with DaprClient() as client:
            # DaprClient is synchronous — invoke_binding is not awaited
            client.invoke_binding(
                binding_name='notification-http',
                operation='post',
                data=payload
            )
        print(f"TaskActor {actor_id}: Notification sent")

    async def complete_task(self) -> dict:
        """Complete task and notify via output binding."""
        # Update state
        state = await self._state_manager.get_state("task_data")
        state["status"] = "completed"
        await self._state_manager.set_state("task_data", state)

        # Send external notification
        await self.send_notification("Task completed!")

        # Also publish internal event
        await self._publish_event("task.completed", state)

        return {"status": "completed"}
```

Output:

```text
>>> # Task completion triggers both internal and external notifications
await proxy.CompleteTask()
>>> # Output binding sends to Slack
TaskActor task-123: Notification sent
>>> # Pub/sub notifies internal services
TaskActor task-123: Published task.completed event
```

Complete Event-Driven Actor Example

Here's a complete example integrating all patterns—an event-driven task management system:

actors.py:

```python
from dapr.actor import Actor, ActorInterface, actormethod, ActorProxy, ActorId
from dapr.clients import DaprClient
from datetime import datetime
import json

class TaskActorInterface(ActorInterface):
    @actormethod(name="ProcessEvent")
    async def process_event(self, event_data: dict) -> None:
        ...

    @actormethod(name="CompleteTask")
    async def complete_task(self) -> dict:
        ...

    @actormethod(name="CheckDeadline")
    async def check_deadline(self) -> None:
        ...

class TaskActor(Actor, TaskActorInterface):
    def __init__(self, ctx, actor_id):
        super().__init__(ctx, actor_id)

    async def _on_activate(self) -> None:
        """Initialize state on activation."""
        found, _ = await self._state_manager.try_get_state("task_data")
        if not found:
            await self._state_manager.set_state("task_data", {
                "status": "pending",
                "created_at": datetime.utcnow().isoformat()
            })
        print(f"TaskActor {self.id.id} activated")

    async def process_event(self, event_data: dict) -> None:
        """Process incoming events."""
        event_type = event_data.get('event_type', 'unknown')
        print(f"TaskActor {self.id.id}: Processing {event_type}")

        state = await self._state_manager.get_state("task_data")

        if event_type == 'task.assigned':
            state['assignee'] = event_data.get('assignee')
            state['status'] = 'assigned'
        elif event_type == 'task.updated':
            state.update(event_data.get('updates', {}))

        state['last_event'] = event_type
        state['last_event_at'] = datetime.utcnow().isoformat()
        await self._state_manager.set_state("task_data", state)

    async def complete_task(self) -> dict:
        """Mark task complete, notify externally and internally."""
        actor_id = self.id.id

        state = await self._state_manager.get_state("task_data")
        state["status"] = "completed"
        state["completed_at"] = datetime.utcnow().isoformat()
        await self._state_manager.set_state("task_data", state)

        with DaprClient() as client:
            # External notification via output binding
            # (DaprClient is synchronous — no await)
            client.invoke_binding(
                binding_name='notification-http',
                operation='post',
                data=json.dumps({
                    "text": f"Task {actor_id} completed by "
                            f"{state.get('assignee', 'unknown')}"
                })
            )

            # Internal event via pub/sub
            client.publish_event(
                pubsub_name='pubsub',
                topic_name='task-events',
                data=json.dumps({
                    'task_id': actor_id,
                    'event_type': 'task.completed',
                    'completed_at': state['completed_at']
                }),
                data_content_type='application/json'
            )

        print(f"TaskActor {actor_id}: Completed and notified")
        return {"status": "completed", "task_id": actor_id}

    async def check_deadline(self) -> None:
        """Check if task is past deadline, mark overdue if so."""
        state = await self._state_manager.get_state("task_data")
        deadline = state.get('deadline')

        if deadline and datetime.fromisoformat(deadline) < datetime.utcnow():
            state['status'] = 'overdue'
            await self._state_manager.set_state("task_data", state)

            # Publish overdue event
            with DaprClient() as client:
                client.publish_event(
                    pubsub_name='pubsub',
                    topic_name='task-events',
                    data=json.dumps({
                        'task_id': self.id.id,
                        'event_type': 'task.overdue'
                    }),
                    data_content_type='application/json'
                )
            print(f"TaskActor {self.id.id}: Marked overdue")
```

main.py:

```python
from fastapi import FastAPI, Request
from dapr.ext.fastapi import DaprActor, DaprApp
from dapr.actor import ActorProxy, ActorId

from actors import TaskActor, TaskActorInterface

app = FastAPI()
dapr_app = DaprApp(app)
actor_extension = DaprActor(app)

@app.on_event("startup")
async def startup():
    await actor_extension.register_actor(TaskActor)
    print("Registered TaskActor")

# Pub/Sub: Route task events to actors
@dapr_app.subscribe(pubsub='pubsub', topic='task-events',
                    route='/TaskActor/ProcessEvent')
async def handle_task_event(event_data: dict):
    """Route pub/sub events to TaskActor instances."""
    data = event_data.get('data', {})
    task_id = data.get('task_id')
    if not task_id:
        return {"status": "SKIPPED"}

    proxy = ActorProxy.create('TaskActor', ActorId(task_id), TaskActorInterface)
    await proxy.ProcessEvent(data)
    return {"status": "SUCCESS"}

# Input Binding: Cron triggers scheduler
@app.post("/scheduler-cron")
async def handle_scheduler_cron():
    """Cron triggers deadline checks across active tasks."""
    print("Scheduler cron triggered")

    # In production, you'd query a list of active task IDs
    active_tasks = ["task-001", "task-002", "task-003"]
    for task_id in active_tasks:
        proxy = ActorProxy.create('TaskActor', ActorId(task_id), TaskActorInterface)
        await proxy.CheckDeadline()

    return {"status": "OK", "checked": len(active_tasks)}

# Input Binding: External webhook
@app.post("/external-webhook")
async def handle_webhook(request: Request):
    """External systems trigger actor updates."""
    body = await request.json()
    task_id = body.get('task_id')
    if task_id:
        proxy = ActorProxy.create('TaskActor', ActorId(task_id), TaskActorInterface)
        await proxy.ProcessEvent({
            'event_type': 'external.update',
            **body
        })
    return {"status": "OK"}

# Health check
@app.get("/health")
async def health():
    return {"status": "OK"}
```

Output:

```text
>>> # Pub/sub event arrives
Received task event: {'task_id': 'task-123', 'event_type': 'task.assigned', 'assignee': 'alice'}
TaskActor task-123 activated
TaskActor task-123: Processing task.assigned
>>> # Cron triggers deadline checks
Scheduler cron triggered
TaskActor task-001: Checking deadline
TaskActor task-002: Checking deadline
TaskActor task-002: Marked overdue
>>> # External webhook arrives
Webhook: {'task_id': 'task-456', 'source': 'calendar-api'}
TaskActor task-456: Processing external.update
>>> # Task completion triggers both pub/sub and output binding
TaskActor task-123: Completed and notified
```

When to Use Each Pattern

| Pattern | Use When | Example |
| --- | --- | --- |
| Pub/Sub to Actor | Internal services notify actors | Order Service publishes `order.created`, TaskActor processes |
| Actor Publishes | Actor state changes need broadcasting | TaskActor publishes `task.completed` for analytics |
| Cron to Actor | Scheduled work across actors | Daily cleanup, deadline checks |
| Webhook to Actor | External systems trigger actors | Calendar API notifies task updates |
| Actor to Output Binding | Actors notify external systems | Send Slack message when task overdue |

Reflect on Your Skill

You built a dapr-deployment skill earlier in this chapter. Test and improve it based on event-driven actor patterns.

Test Your Skill

```text
Using my dapr-deployment skill, create a subscription handler that routes
pub/sub events to TaskActor instances. Show me the subscription YAML and
the FastAPI handler.
```

Does your skill understand the routing pattern—events to handlers to ActorProxy to actors?

Identify Gaps

Ask yourself:

  • Does my skill explain that Dapr delivers events to FastAPI, not directly to actors?
  • Can it show both pub/sub subscription handlers and binding handlers?
  • Does it include the ActorProxy pattern for routing?

Improve Your Skill

If you found gaps:

```markdown
My dapr-deployment skill needs event-driven actor patterns. Add these concepts:

1. Pub/sub events route through FastAPI handlers to ActorProxy
2. Binding triggers (cron, webhook) invoke actors via handlers
3. Actors publish events using DaprClient.publish_event
4. Actors invoke external systems via output bindings
```

Try With AI

Apply event-driven actor patterns to your domain.

Setup: Open Claude Code or your preferred AI assistant in your Dapr project directory.


Prompt 1: Pub/Sub to Actor Routing

```text
Create a Dapr subscription that routes task events to TaskActor instances.
Show me:
1. The subscription YAML with correct routing
2. The FastAPI handler that extracts task_id and creates ActorProxy
3. The actor method that processes the event
4. How to test this with curl

I'm using Dapr 1.14 with dapr-ext-fastapi.
```

What you're learning: The subscription handler is the bridge between topic-based pub/sub and ID-based actors. You extract the actor ID from the event payload and route to the correct instance. This pattern enables actors to react to events without knowing about pub/sub directly.


Prompt 2: Cron Binding to Actor

```text
Create a cron binding that triggers a SchedulerActor every 5 minutes.
Show me:
1. The cron binding component YAML
2. The FastAPI handler at the matching endpoint
3. How the handler invokes SchedulerActor
4. The SchedulerActor implementation that delegates to TaskActors

The scheduler should check deadlines across all active tasks.
```

What you're learning: Cron bindings replace actor timers when you need application-level scheduling rather than per-actor scheduling. The handler pattern is the same—binding delivers to FastAPI, handler routes to actor via proxy.


Prompt 3: Actor Publishing and Output Bindings

```text
I have a TaskActor that needs to:
1. Publish a task.completed event via pub/sub when completed
2. Send a Slack notification via HTTP output binding
3. Both actions should happen in the complete_task method

Show me:
1. The output binding component YAML
2. The actor method that does both
3. How to structure the event payload for pub/sub
4. Error handling if the external notification fails

Include the full actor code.
```

What you're learning: Actors can be both event consumers and producers. Using DaprClient within actor methods lets you publish events (internal communication) and invoke bindings (external communication). This creates truly reactive actors that participate in broader event flows.


Safety Note: When processing events from pub/sub or bindings, always validate the payload structure before routing to actors. Malformed events should be logged and dropped (return SKIPPED), not crash your handler. Consider implementing a dead-letter pattern for events that fail processing.
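A minimal sketch of that discipline, with an in-memory list standing in for a real dead-letter topic. The `handle_event` name and the injected `process` callable are illustrative assumptions — in the real handler, `process` would be the `ActorProxy` call:

```python
import logging

logger = logging.getLogger("subscriptions")
dead_letters: list = []  # stand-in for a real dead-letter topic

def handle_event(event_data: dict, process) -> dict:
    """Validate the payload before routing: drop malformed events,
    dead-letter events whose processing raises."""
    data = event_data.get("data")
    if not isinstance(data, dict) or not data.get("task_id"):
        # Malformed events are logged and dropped, never raised
        logger.warning("Malformed event, skipping: %r", event_data)
        return {"status": "SKIPPED"}
    try:
        process(data)  # in the real handler: await proxy.ProcessEvent(data)
        return {"status": "SUCCESS"}
    except Exception:
        # Failed events go to the dead-letter store for later inspection
        logger.exception("Event processing failed, dead-lettering")
        dead_letters.append(data)
        return {"status": "DEAD_LETTERED"}
```

Separating validation ("skip") from processing failure ("dead-letter") keeps poison messages from blocking the subscription while preserving failed events for replay.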