USMAN’S INSIGHTS
AI ARCHITECT
Muhammad Usman Akbar Entity Profile

Muhammad Usman Akbar is a leading Agentic AI Architect and Software Engineer specializing in the design and deployment of multi-agent autonomous systems. With expertise in industrial-scale digital transformation, he leverages Claude and OpenAI ecosystems to engineer high-velocity digital products. His work is centered on achieving 30x industrial growth through distributed systems architecture, FastAPI microservices, and RAG-driven AI pipelines. Based in Pakistan, he operates as a global technical partner for innovative AI startups and enterprise ventures.

© 2026 Muhammad Usman Akbar. All rights reserved.


Actor Communication Patterns

Your ChatAgent works. It stores conversation history, manages state across activations, and publishes events. But now you need something more sophisticated: a multi-agent system where specialized actors collaborate on complex tasks.

Picture an AI customer support system. When a customer sends a message, the ChatAgent shouldn't do everything itself. It should delegate: route the message to a ResponseAgent that generates intelligent replies, while a MemoryAgent tracks conversation context across sessions. Each agent has a single responsibility, isolated state, and clear boundaries.

This is the actor coordination problem. How do actors talk to each other? How do you build hierarchies where parent actors manage child workers? How do peers collaborate without creating tangled dependencies?

The answer is ActorProxy: Dapr's mechanism for actor-to-actor communication.


ActorProxy: The Actor Communication Gateway

When one actor needs to invoke methods on another actor, it doesn't call methods directly. Direct method calls would break actor isolation and turn-based concurrency guarantees. Instead, actors communicate through ActorProxy, which routes requests through the Dapr sidecar.

python
from dapr.actor import ActorProxy, ActorId

# Create a proxy to invoke another actor
proxy = ActorProxy.create(
    actor_type="ResponseAgent",              # The type of actor to invoke
    actor_id=ActorId("response-user1"),      # The specific instance
    actor_interface=ResponseAgentInterface,  # The interface defining available methods
)

# Invoke a method on the target actor
response = await proxy.ProcessMessage({"content": "Hello!"})

Output:

json
{"role": "assistant", "content": "Memory: user: Hello!. Got your message: Hello! at 2025-01-15T10:30:45+00:00"}

The ActorProxy.create() call doesn't actually create or activate the target actor. It creates a local proxy object that knows how to route requests. The target actor activates on first method invocation, following virtual actor semantics.
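To see why bypassing the runtime matters, here is a minimal sketch of turn-based concurrency in plain asyncio. It does not use Dapr: `ToyActor`, `call_direct`, and `call_via_runtime` are hypothetical names, and the per-actor lock is only an assumption standing in for the turn queue the sidecar enforces.

```python
import asyncio


class ToyActor:
    """Hypothetical stand-in for an actor; not a Dapr class."""

    def __init__(self) -> None:
        self._turn = asyncio.Lock()  # one turn at a time per actor instance
        self.count = 0               # state that every call updates
        self.max_concurrent = 0      # highest number of overlapping calls seen
        self._active = 0

    async def _increment(self) -> None:
        self._active += 1
        self.max_concurrent = max(self.max_concurrent, self._active)
        current = self.count
        await asyncio.sleep(0)       # yield, as a state-store round trip would
        self.count = current + 1     # write back a possibly stale value
        self._active -= 1

    async def call_direct(self) -> None:
        # Bypasses the turn lock, so concurrent callers interleave.
        await self._increment()

    async def call_via_runtime(self) -> None:
        # Mimics routed calls: each one waits for the actor's turn.
        async with self._turn:
            await self._increment()


async def run(method_name: str, calls: int = 10) -> ToyActor:
    actor = ToyActor()
    await asyncio.gather(*(getattr(actor, method_name)() for _ in range(calls)))
    return actor


direct = asyncio.run(run("call_direct"))
serialized = asyncio.run(run("call_via_runtime"))
print("direct:    ", direct.count, "updates kept,", direct.max_concurrent, "overlapping")
print("serialized:", serialized.count, "updates kept,", serialized.max_concurrent, "overlapping")
```

In the direct case the calls overlap and some increments are lost. In the serialized case all ten increments survive because only one call runs at a time, which is the guarantee that routing through the sidecar is meant to preserve.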

Method Naming Convention

When defining actor methods in Python, use snake_case for the implementation but PascalCase for the @actormethod decorator name:

python
from dapr.actor import ActorInterface, actormethod


class ResponseAgentInterface(ActorInterface):
    @actormethod(name="ProcessMessage")  # PascalCase in decorator
    async def process_message(self, user_input: dict) -> dict | None:  # snake_case implementation
        pass

When invoking via proxy, use the PascalCase name from the decorator:

python
# Correct: Use PascalCase method name
response = await proxy.ProcessMessage(message_dict)

# This won't work:
# response = await proxy.process_message(message_dict)
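The naming rule is easier to remember once you see why the proxy only knows the decorator name. The following is a toy sketch, not the Dapr SDK's implementation: `toy_actormethod`, `GreeterInterface`, and `ToyProxy` are hypothetical names invented for illustration.

```python
import asyncio


def toy_actormethod(name: str):
    """Hypothetical decorator: tags a function with its wire name."""
    def decorator(func):
        func.wire_name = name
        return func
    return decorator


class GreeterInterface:
    @toy_actormethod(name="SayHello")        # name callers must use
    async def say_hello(self, who: str) -> str:  # Python-side implementation
        return f"Hello, {who}!"


class ToyProxy:
    """Hypothetical proxy that exposes only the decorator names."""

    def __init__(self, target) -> None:
        self._target = target
        self._by_wire_name = {}
        # Index the target's methods by the name given to the decorator.
        for attr in vars(type(target)).values():
            wire_name = getattr(attr, "wire_name", None)
            if wire_name is not None:
                self._by_wire_name[wire_name] = attr

    def __getattr__(self, name: str):
        # Reached only when normal attribute lookup fails.
        try:
            func = self._by_wire_name[name]
        except KeyError:
            raise AttributeError(f"no actor method named {name!r}") from None
        return lambda *args, **kwargs: func(self._target, *args, **kwargs)


proxy = ToyProxy(GreeterInterface())
greeting = asyncio.run(proxy.SayHello("Ada"))
print(greeting)  # Hello, Ada!

try:
    proxy.say_hello
except AttributeError as err:
    print("lookup failed:", err)
```

The snake_case name never enters the lookup table, so asking the proxy for it fails before any request is sent.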

Parent-Child Actor Pattern

The parent-child pattern establishes a hierarchy where one actor manages and coordinates others. The parent actor creates child actor IDs, delegates tasks to them, and aggregates results.

Consider a ChatAgent that delegates response generation to a ResponseAgent:

python
import logging

from dapr.actor import Actor, ActorId, ActorInterface, ActorProxy, actormethod


class ChatAgentInterface(ActorInterface):
    @actormethod(name="ProcessMessage")
    async def process_message(self, user_input: dict) -> dict | None:
        pass

    @actormethod(name="GetConversationHistory")
    async def get_conversation_history(self) -> list[dict] | None:
        pass


class ChatAgent(Actor, ChatAgentInterface):
    """Parent actor that manages conversation flow by delegating to child actors."""

    def __init__(self, ctx, actor_id):
        super().__init__(ctx, actor_id)
        self._history_key = f"history-{actor_id.id}"
        self._actor_id = actor_id

    async def _on_activate(self) -> None:
        """Initialize conversation history on activation."""
        logging.info(f"Activating ChatAgent for {self._history_key}")
        try:
            history = await self._state_manager.get_state(self._history_key)
            if history is None:
                await self._state_manager.set_state(self._history_key, [])
        except Exception as e:
            # get_state raises when the key has never been written
            logging.warning(f"Initializing fresh state: {e}")
            await self._state_manager.set_state(self._history_key, [])

    async def process_message(self, user_input: dict) -> dict:
        """Delegate message processing to ResponseAgent child."""
        logging.info(f"ChatAgent processing: {user_input}")

        # Load and update history
        history = await self._state_manager.get_state(self._history_key)
        current_history = history if isinstance(history, list) else []
        current_history.append(user_input)

        # Create child actor proxy with predictable ID
        # (ResponseAgentInterface is defined with the child actor, below)
        response_actor_id = ActorId(f"response-{self._actor_id.id}")
        response_proxy = ActorProxy.create(
            "ResponseAgent", response_actor_id, ResponseAgentInterface
        )

        # Delegate to child actor
        response = await response_proxy.ProcessMessage(user_input)
        current_history.append(response)

        # Save updated history
        await self._state_manager.set_state(self._history_key, current_history)
        logging.info("ChatAgent delegated successfully")
        return response

    async def get_conversation_history(self) -> list[dict]:
        """Retrieve full conversation history."""
        history = await self._state_manager.get_state(self._history_key)
        return history if isinstance(history, list) else []

Output (when invoking ChatAgent):

text
INFO: Activating ChatAgent for history-user1
INFO: ChatAgent processing: {'role': 'user', 'content': 'Hello!'}
INFO: ChatAgent delegated successfully
{'role': 'assistant', 'content': 'Memory: Message count: 1. Got your message: Hello! at 2025-01-15T10:30:45+00:00'}

The Child Actor

The ResponseAgent handles the actual response generation:

python
import logging
from datetime import datetime, UTC  # datetime.UTC requires Python 3.11+

from dapr.actor import Actor, ActorInterface, actormethod


class ResponseAgentInterface(ActorInterface):
    @actormethod(name="ProcessMessage")
    async def process_message(self, user_input: dict) -> dict | None:
        pass

    @actormethod(name="GetMessageCount")
    async def get_message_count(self) -> int | None:
        pass


class ResponseAgent(Actor, ResponseAgentInterface):
    """Child actor that generates responses and tracks message count."""

    def __init__(self, ctx, actor_id):
        super().__init__(ctx, actor_id)
        self._count_key = f"response-count-{actor_id.id}"
        self._actor_id = actor_id

    async def _on_activate(self) -> None:
        """Initialize message counter on activation."""
        logging.info(f"Activating ResponseAgent for {self._count_key}")
        try:
            count = await self._state_manager.get_state(self._count_key)
            if count is None:
                await self._state_manager.set_state(self._count_key, 0)
        except Exception:
            # get_state raises when the key has never been written
            await self._state_manager.set_state(self._count_key, 0)

    async def process_message(self, user_input: dict) -> dict:
        """Generate response with timestamp and message count."""
        logging.info(f"ResponseAgent processing: {user_input}")

        # Increment and persist message count
        count = await self._state_manager.get_state(self._count_key)
        count = count if isinstance(count, int) else 0
        count += 1
        await self._state_manager.set_state(self._count_key, count)

        # Generate timestamped response
        timestamp = datetime.now(UTC).isoformat()
        response_content = (
            f"Memory: Message count: {count}. "
            f"Got your message: {user_input['content']} at {timestamp}"
        )
        response = {"role": "assistant", "content": response_content}

        logging.info(f"ResponseAgent generated response #{count}")
        return response

    async def get_message_count(self) -> int:
        """Return total messages processed."""
        count = await self._state_manager.get_state(self._count_key)
        return count if isinstance(count, int) else 0

Output (when ResponseAgent activates):

text
INFO: Activating ResponseAgent for response-count-response-user1
INFO: ResponseAgent processing: {'role': 'user', 'content': 'Hello!'}
INFO: ResponseAgent generated response #1

Actor ID Conventions

Notice the predictable ID pattern: when ChatAgent with ID user1 needs to delegate, it creates a ResponseAgent with ID response-user1. This convention enables:

  • Predictable routing: Any actor can compute the ID of related actors
  • State isolation: Each user gets dedicated actor instances
  • Debugging clarity: Actor IDs reveal relationships in logs
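
| Parent ID      | Child Type    | Child ID Convention   |
|----------------|---------------|-----------------------|
| user1          | ResponseAgent | response-user1        |
| response-user1 | MemoryAgent   | memory-response-user1 |
| order-123      | PaymentActor  | payment-order-123     |
| task-456       | WorkerActor   | worker-task-456       |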
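The convention in the table can be captured in two small helpers so that every actor derives related IDs the same way. This is a minimal sketch; `child_actor_id` and `parent_id_of` are hypothetical helper names, not part of the Dapr SDK.

```python
def child_actor_id(prefix: str, parent_id: str) -> str:
    """Build a child ID such as 'response-user1' from a prefix and a parent ID."""
    if not prefix or not parent_id:
        raise ValueError("prefix and parent_id must be non-empty")
    return f"{prefix}-{parent_id}"


def parent_id_of(prefix: str, child_id: str) -> str:
    """Recover the parent ID from a child ID built with child_actor_id."""
    expected = f"{prefix}-"
    if not child_id.startswith(expected):
        raise ValueError(f"{child_id!r} does not start with {expected!r}")
    return child_id[len(expected):]


response_id = child_actor_id("response", "alice")
memory_id = child_actor_id("memory", response_id)  # chained through the ResponseAgent
print(response_id)                                      # response-alice
print(memory_id)                                        # memory-response-alice
print(parent_id_of("payment", "payment-order-123"))     # order-123
```

Inside an actor you would wrap the result in `ActorId(...)` before passing it to `ActorProxy.create()`. Keeping the string logic in one place helps avoid two actors computing slightly different IDs for the same relationship.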

Peer-to-Peer Actor Communication

Not all actor relationships are hierarchical. Sometimes actors communicate as equals, each maintaining their own domain while collaborating on shared workflows.

Consider a ResponseAgent that queries a MemoryAgent for conversation context:

python
import logging

from dapr.actor import Actor, ActorInterface, actormethod


class MemoryAgentInterface(ActorInterface):
    @actormethod(name="UpdateMemory")
    async def update_memory(self, message: dict) -> None:
        pass

    @actormethod(name="GetMemory")
    async def get_memory(self) -> list[dict] | None:
        pass


class MemoryAgent(Actor, MemoryAgentInterface):
    """Peer actor that maintains conversation memory for context-aware responses."""

    def __init__(self, ctx, actor_id):
        super().__init__(ctx, actor_id)
        self._memory_key = f"memory-{actor_id.id}"

    async def _on_activate(self) -> None:
        """Initialize empty memory on activation."""
        logging.info(f"Activating MemoryAgent for {self._memory_key}")
        try:
            memory = await self._state_manager.get_state(self._memory_key)
            if memory is None:
                await self._state_manager.set_state(self._memory_key, [])
        except Exception:
            # get_state raises when the key has never been written
            await self._state_manager.set_state(self._memory_key, [])

    async def update_memory(self, message: dict) -> None:
        """Add messages to memory, keeping last 6 entries."""
        user_message = message.get("user_message")
        response_message = message.get("response_message")
        logging.info(f"MemoryAgent updating: user={user_message}")

        memory = await self._state_manager.get_state(self._memory_key)
        current_memory = memory if isinstance(memory, list) else []

        if user_message:
            current_memory.append(user_message)
        if response_message:
            current_memory.append(response_message)

        # Keep only last 6 messages for context window
        if len(current_memory) > 6:
            current_memory = current_memory[-6:]

        await self._state_manager.set_state(self._memory_key, current_memory)
        logging.info(f"MemoryAgent stored {len(current_memory)} messages")

    async def get_memory(self) -> list[dict]:
        """Retrieve conversation memory."""
        memory = await self._state_manager.get_state(self._memory_key)
        current_memory = memory if isinstance(memory, list) else []
        logging.info(f"MemoryAgent retrieved {len(current_memory)} messages")
        return current_memory

Now ResponseAgent can query MemoryAgent as a peer:

python
# Updated ResponseAgent.process_message (a method of the ResponseAgent class)
async def process_message(self, user_input: dict) -> dict:
    """Generate response enriched with memory context from peer actor."""
    logging.info(f"ResponseAgent processing: {user_input}")

    # Increment message count
    count = await self._state_manager.get_state(self._count_key)
    count = (count if isinstance(count, int) else 0) + 1
    await self._state_manager.set_state(self._count_key, count)

    # Query peer MemoryAgent for context
    memory_actor_id = ActorId(f"memory-{self._actor_id.id}")
    memory_proxy = ActorProxy.create(
        "MemoryAgent", memory_actor_id, MemoryAgentInterface
    )
    memory = await memory_proxy.GetMemory()

    # Build context from memory
    if memory:
        memory_context = "; ".join([
            f"{m['role']}: {m['content']}" for m in memory
        ])
    else:
        memory_context = f"Message count: {count}"

    # Generate response with context
    timestamp = datetime.now(UTC).isoformat()
    response_content = (
        f"Memory: {memory_context}. "
        f"Got your message: {user_input['content']} at {timestamp}"
    )

    logging.info(f"ResponseAgent used memory context: {memory_context[:50]}...")
    return {"role": "assistant", "content": response_content}

Output (when ResponseAgent queries MemoryAgent):

text
INFO: ResponseAgent processing: {'role': 'user', 'content': 'What did I say earlier?'}
INFO: MemoryAgent retrieved 4 messages
INFO: ResponseAgent used memory context: user: Hello!; assistant: Got your...

Complete Multi-Agent System

Here's a complete example showing parent-child and peer-to-peer patterns working together:

python
import json
import logging
from datetime import datetime, UTC  # datetime.UTC requires Python 3.11+

from fastapi import FastAPI
from pydantic import BaseModel
from dapr.ext.fastapi import DaprActor
from dapr.actor import Actor, ActorInterface, ActorProxy, ActorId, actormethod
from dapr.clients import DaprClient

logging.basicConfig(level=logging.INFO)

app = FastAPI(title="MultiAgentService")
actor = DaprActor(app)


class Message(BaseModel):
    role: str
    content: str


# ============ INTERFACES ============

class ChatAgentInterface(ActorInterface):
    @actormethod(name="ProcessMessage")
    async def process_message(self, user_input: dict) -> dict | None:
        pass

    @actormethod(name="GetConversationHistory")
    async def get_conversation_history(self) -> list[dict] | None:
        pass


class ResponseAgentInterface(ActorInterface):
    @actormethod(name="ProcessMessage")
    async def process_message(self, user_input: dict) -> dict | None:
        pass


class MemoryAgentInterface(ActorInterface):
    @actormethod(name="UpdateMemory")
    async def update_memory(self, message: dict) -> None:
        pass

    @actormethod(name="GetMemory")
    async def get_memory(self) -> list[dict] | None:
        pass


# ============ IMPLEMENTATIONS ============

class ChatAgent(Actor, ChatAgentInterface):
    """Parent: Orchestrates conversation by delegating to child actors."""

    def __init__(self, ctx, actor_id):
        super().__init__(ctx, actor_id)
        self._history_key = f"history-{actor_id.id}"
        self._actor_id = actor_id

    async def _on_activate(self) -> None:
        logging.info(f"Activating ChatAgent for {self._history_key}")
        # try_get_state returns a (found, value) tuple instead of raising
        has_state = await self._state_manager.try_get_state(self._history_key)
        if not has_state[0]:
            await self._state_manager.set_state(self._history_key, [])

    async def process_message(self, user_input: dict) -> dict:
        # Update history
        history = await self._state_manager.get_state(self._history_key)
        current_history = history if isinstance(history, list) else []
        current_history.append(user_input)

        # Delegate to ResponseAgent (parent-child)
        response_proxy = ActorProxy.create(
            "ResponseAgent",
            ActorId(f"response-{self._actor_id.id}"),
            ResponseAgentInterface
        )
        response = await response_proxy.ProcessMessage(user_input)

        current_history.append(response)
        await self._state_manager.set_state(self._history_key, current_history)

        # Update MemoryAgent via pub/sub (triggers event-driven update)
        await self._publish_conversation_event(user_input, response)
        return response

    async def _publish_conversation_event(self, user_input: dict, response: dict) -> None:
        event_data = {
            "actor_id": self._actor_id.id,
            "input": user_input,
            "output": response
        }
        with DaprClient() as client:
            client.publish_event(
                pubsub_name="daca-pubsub",
                topic_name="user-chat",
                data=json.dumps(event_data)
            )

    async def get_conversation_history(self) -> list[dict]:
        history = await self._state_manager.get_state(self._history_key)
        return history if isinstance(history, list) else []


class ResponseAgent(Actor, ResponseAgentInterface):
    """Child: Generates responses using memory from peer MemoryAgent."""

    def __init__(self, ctx, actor_id):
        super().__init__(ctx, actor_id)
        self._count_key = f"count-{actor_id.id}"
        self._actor_id = actor_id

    async def _on_activate(self) -> None:
        logging.info(f"Activating ResponseAgent for {self._count_key}")
        has_state = await self._state_manager.try_get_state(self._count_key)
        if not has_state[0]:
            await self._state_manager.set_state(self._count_key, 0)

    async def process_message(self, user_input: dict) -> dict:
        # Increment count
        count = await self._state_manager.get_state(self._count_key)
        count = (count if isinstance(count, int) else 0) + 1
        await self._state_manager.set_state(self._count_key, count)

        # Query peer MemoryAgent for context
        memory_proxy = ActorProxy.create(
            "MemoryAgent",
            ActorId(f"memory-{self._actor_id.id}"),
            MemoryAgentInterface
        )
        memory = await memory_proxy.GetMemory()

        memory_context = (
            "; ".join([f"{m['role']}: {m['content']}" for m in memory])
            if memory else f"Message count: {count}"
        )

        timestamp = datetime.now(UTC).isoformat()
        return {
            "role": "assistant",
            "content": f"Memory: {memory_context}. Got: {user_input['content']} at {timestamp}"
        }


class MemoryAgent(Actor, MemoryAgentInterface):
    """Peer: Maintains conversation memory for ResponseAgent."""

    def __init__(self, ctx, actor_id):
        super().__init__(ctx, actor_id)
        self._memory_key = f"memory-{actor_id.id}"

    async def _on_activate(self) -> None:
        logging.info(f"Activating MemoryAgent for {self._memory_key}")
        has_state = await self._state_manager.try_get_state(self._memory_key)
        if not has_state[0]:
            await self._state_manager.set_state(self._memory_key, [])

    async def update_memory(self, message: dict) -> None:
        memory = await self._state_manager.get_state(self._memory_key)
        current_memory = memory if isinstance(memory, list) else []

        if message.get("user_message"):
            current_memory.append(message["user_message"])
        if message.get("response_message"):
            current_memory.append(message["response_message"])

        # Keep last 6 for context window (the slice is a no-op on shorter lists)
        current_memory = current_memory[-6:]
        await self._state_manager.set_state(self._memory_key, current_memory)

    async def get_memory(self) -> list[dict]:
        memory = await self._state_manager.get_state(self._memory_key)
        return memory if isinstance(memory, list) else []


# ============ REGISTRATION ============

@app.on_event("startup")
async def startup():
    await actor.register_actor(ChatAgent)
    await actor.register_actor(ResponseAgent)
    await actor.register_actor(MemoryAgent)
    logging.info("Registered: ChatAgent, ResponseAgent, MemoryAgent")


# ============ ENDPOINTS ============

@app.post("/chat/{actor_id}")
async def chat(actor_id: str, data: Message):
    """Send message to ChatAgent, which delegates to ResponseAgent."""
    proxy = ActorProxy.create("ChatAgent", ActorId(actor_id), ChatAgentInterface)
    response = await proxy.ProcessMessage(data.model_dump())
    return {"response": response}


@app.get("/chat/{actor_id}/history")
async def get_history(actor_id: str):
    """Get conversation history from ChatAgent."""
    proxy = ActorProxy.create("ChatAgent", ActorId(actor_id), ChatAgentInterface)
    history = await proxy.GetConversationHistory()
    return {"history": history}


@app.get("/memory/{actor_id}")
async def get_memory(actor_id: str):
    """Get memory from MemoryAgent."""
    # Same ID the ResponseAgent computes: memory-<response-actor-id>
    proxy = ActorProxy.create(
        "MemoryAgent", ActorId(f"memory-response-{actor_id}"), MemoryAgentInterface
    )
    memory = await proxy.GetMemory()
    return {"memory": memory}


# Dapr delivers events to this route only if a subscription (declarative or
# programmatic) maps topic "user-chat" on "daca-pubsub" to /subscribe.
@app.post("/subscribe")
async def subscribe(data: dict):
    """Handle pub/sub events to update MemoryAgent."""
    event_data = data.get("data", {})
    if isinstance(event_data, str):
        event_data = json.loads(event_data)

    user_id = event_data.get("actor_id", "unknown")
    memory_proxy = ActorProxy.create(
        "MemoryAgent",
        ActorId(f"memory-response-{user_id}"),
        MemoryAgentInterface
    )
    await memory_proxy.UpdateMemory({
        "user_message": event_data.get("input"),
        "response_message": event_data.get("output")
    })
    return {"status": "processed"}

Output (complete flow):

text
INFO: Registered: ChatAgent, ResponseAgent, MemoryAgent

POST /chat/alice {"role": "user", "content": "Hello!"}
INFO: Activating ChatAgent for history-alice
INFO: Activating ResponseAgent for count-response-alice
INFO: Activating MemoryAgent for memory-memory-response-alice
{"response": {"role": "assistant", "content": "Memory: Message count: 1. Got: Hello! at 2025-01-15T10:30:45+00:00"}}

POST /chat/alice {"role": "user", "content": "Remember me?"}
{"response": {"role": "assistant", "content": "Memory: user: Hello!; assistant: Memory: Message count: 1... Got: Remember me? at 2025-01-15T10:31:12+00:00"}}
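The `/subscribe` handler has to cope with the event's `data` field arriving either as a JSON object or as a JSON string, because the publisher sends `json.dumps(...)`. The sketch below pulls that parsing into a pure function that can be tested without a sidecar. `parse_chat_event` is a hypothetical helper, and rejecting events that lack an `actor_id` (instead of falling back to "unknown") is an assumption of this sketch, not the chapter's behavior.

```python
import json
from typing import Any


def parse_chat_event(envelope: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Return (memory_actor_id, update_payload) for a pub/sub event envelope."""
    payload = envelope.get("data", {})
    if isinstance(payload, str):
        payload = json.loads(payload)  # publisher sent a JSON-encoded string
    if not isinstance(payload, dict) or "actor_id" not in payload:
        raise ValueError("event has no actor_id")

    # Same ID convention the ResponseAgent uses for its peer MemoryAgent
    memory_actor_id = f"memory-response-{payload['actor_id']}"
    update = {
        "user_message": payload.get("input"),
        "response_message": payload.get("output"),
    }
    return memory_actor_id, update


event = {
    "actor_id": "alice",
    "input": {"role": "user", "content": "Hello!"},
    "output": {"role": "assistant", "content": "Hi there"},
}

as_object = parse_chat_event({"data": event})
as_string = parse_chat_event({"data": json.dumps(event)})
print(as_object[0])            # memory-response-alice
print(as_object == as_string)  # True
```

Failing loudly on a missing `actor_id` avoids writing unrelated conversations into a shared `memory-response-unknown` actor, at the cost of having to handle the error in the endpoint.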

Virtual Actor Semantics: What Happens When You Call a Non-Existent Actor?

A common question: "What if the target actor doesn't exist when I call it?"

The answer reveals a key insight about Dapr's virtual actor model: actors don't need to exist before you call them. When you invoke an actor that hasn't been activated:

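  1. The caller's Dapr sidecar looks up the actor in the placement table, which the Placement service distributes to every sidecar.
  2. The actor ID is hashed to one of the hosts registered for that actor type, and the request is routed there.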
  3. The actor activates (runs _on_activate).
  4. The method executes.
  5. Response returns to the caller.

This means your ChatAgent can delegate to a ResponseAgent that has never been called before. The first method invocation creates and activates it automatically.

python
# ResponseAgent with ID "response-alice" doesn't exist yet
proxy = ActorProxy.create("ResponseAgent", ActorId("response-alice"), ResponseAgentInterface)

# This call activates the actor, then executes ProcessMessage
response = await proxy.ProcessMessage({"content": "Hello"})

# ResponseAgent "response-alice" now exists with initialized state
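Activation on first call can be mimicked in a few lines of plain Python. The sketch below is a toy in-memory runtime, not Dapr: `ToyRuntime` and `ToyResponseAgent` are hypothetical names, and placement across hosts, idle deactivation, and persisted state are all left out.

```python
import asyncio


class ToyResponseAgent:
    """Hypothetical actor with an activation hook and one method."""

    def __init__(self, actor_id: str) -> None:
        self.actor_id = actor_id
        self.count = None  # initialized by on_activate, not the constructor

    async def on_activate(self) -> None:
        self.count = 0

    async def process_message(self, message: dict) -> dict:
        self.count += 1
        return {"role": "assistant", "content": f"#{self.count}: {message['content']}"}


class ToyRuntime:
    """Hypothetical runtime that activates actors lazily, keyed by (type, id)."""

    def __init__(self, factories: dict) -> None:
        self._factories = factories
        self._active: dict = {}
        self.activations: list = []  # record of every activation, in order

    async def invoke(self, actor_type: str, actor_id: str, method: str, *args):
        key = (actor_type, actor_id)
        actor = self._active.get(key)
        if actor is None:
            # First call for this ID: construct, activate, then remember it.
            actor = self._factories[actor_type](actor_id)
            await actor.on_activate()
            self._active[key] = actor
            self.activations.append(key)
        return await getattr(actor, method)(*args)


async def main():
    runtime = ToyRuntime({"ResponseAgent": ToyResponseAgent})
    first = await runtime.invoke(
        "ResponseAgent", "response-alice", "process_message", {"content": "Hello"}
    )
    second = await runtime.invoke(
        "ResponseAgent", "response-alice", "process_message", {"content": "Again"}
    )
    return runtime.activations, first, second


activations, first, second = asyncio.run(main())
print(activations)        # [('ResponseAgent', 'response-alice')]
print(first["content"])   # #1: Hello
print(second["content"])  # #2: Again
```

The caller never constructs the actor. The first invocation triggers activation, and the second reuses the same instance, which is why the counter continues from 1 to 2.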

Communication Flow Visualization

text
User Request: POST /chat/alice {"role": "user", "content": "Hello!"}
        |
        v
+---------------+
|   ChatAgent   |  (Parent)
|   ID: alice   |
+---------------+
        |
        | 1. ActorProxy.create("ResponseAgent", "response-alice")
        | 2. await proxy.ProcessMessage(...)
        v
+---------------+
| ResponseAgent |  (Child)
| ID: response- |
|     alice     |
+---------------+
        |
        | 3. ActorProxy.create("MemoryAgent", "memory-response-alice")
        | 4. await proxy.GetMemory()
        v
+---------------+
|  MemoryAgent  |  (Peer)
|  ID: memory-  |
| response-alice|
+---------------+
        |
        | 5. Return memory context
        v
ResponseAgent generates response with context
        |
        | 6. Return response to ChatAgent
        v
ChatAgent stores history, publishes event
        |
        | 7. Return response to user
        v
{"response": {"role": "assistant", "content": "..."}}

Reflect on Your Skill

You built a dapr-deployment skill earlier. Does it understand actor communication patterns?

Test Your Skill

"Using my dapr-deployment skill, generate code for two actors that communicate: a TaskManagerActor that delegates to WorkerActors using ActorProxy."

Identify Gaps

  • Did my skill use PascalCase method names in proxy calls?
  • Did it explain that target actors activate on first invocation?
  • Did it show both parent-child and peer-to-peer patterns?

Improve Your Skill

If you found gaps:

markdown
Update my dapr-deployment skill to include actor communication patterns:
- ActorProxy.create() syntax with type, ActorId, and interface
- Parent-child delegation pattern with ID conventions
- Peer-to-peer actor queries
- Virtual actor activation semantics (actors created on first call)

Try With AI

Prompt 1: Design Parent-Child Hierarchy

"Help me design a parent-child actor hierarchy for a task processing system. I need a TaskManagerActor that delegates to workers, tracking completion via predictable IDs like 'worker-{task_id}'."

Prompt 2: Implement Peer-to-Peer Communication

"I have an OrderActor and an InventoryActor. When OrderActor processes an order, it should query InventoryActor to check stock availability before completing. Show me the ActorProxy calls."

Prompt 3: Debug Actor Communication

"My ChatAgent calls ResponseAgent, but I get a 'Method not found' error. Here is my interface and my proxy call. What is wrong with the naming convention?"

Safety Note

Actor-to-actor communication adds latency (each proxy call goes through the Dapr sidecar) and complexity (more actors to debug). Start with simple hierarchies before building deep actor chains. For synchronous request-response patterns, consider whether actors are the right abstraction or whether direct service invocation would be simpler.