USMAN’S INSIGHTS
AI ARCHITECT
Memory as an Extension of Agency: The Chat Actor Pattern
Muhammad Usman Akbar Entity Profile

Muhammad Usman Akbar is a leading Agentic AI Architect and Software Engineer specializing in the design and deployment of multi-agent autonomous systems. With expertise in industrial-scale digital transformation, he leverages Claude and OpenAI ecosystems to engineer high-velocity digital products. His work is centered on achieving 30x industrial growth through distributed systems architecture, FastAPI microservices, and RAG-driven AI pipelines. Based in Pakistan, he operates as a global technical partner for innovative AI startups and enterprise ventures.

© 2026 Muhammad Usman Akbar. All rights reserved.

Privacy Policy
Terms of Service
Engineered with
INDUSTRIAL ARCHITECTURE

Chat Actor - Stateful Conversations

You've created a simple actor that stores greetings. Now consider what makes AI agents useful: they remember context. When a user asks a question, then follows up with "What about the deadline?", the agent understands "the deadline" refers to something from the previous exchange. Without conversation history, every interaction starts from zero.

This is the core use case for actors in AI systems. Each user session needs its own isolated state—their conversation history, preferences, and context. A traditional approach might use Redis keys like chat:user123:history, but you'd manage concurrency, serialization, and cleanup yourself. With Dapr actors, each chat session IS an actor instance. The runtime handles activation, state persistence, and concurrent access. You focus on conversation logic.

In this lesson, you'll build a ChatActor that maintains conversation history and publishes events when conversations update. By the end, you'll have a pattern for any stateful agent that needs to remember context across interactions.

The ChatActor Pattern

A chat session maps naturally to an actor:

  • One actor instance per user session — ActorId("user-alice") and ActorId("user-bob") are separate instances
  • State is conversation history — A list of messages with roles (user, assistant)
  • Methods handle interactions — process_message adds to history and returns response
  • Events notify other services — Pub/sub announces conversation updates
text
User "alice" sends message
        |
        v
+-------------------+
| ChatActor         |
| ID: "alice"       |
+-------------------+
| State:            |
| - history: [...]  |  <-- Persisted in Redis
+-------------------+
| Methods:          |
| - process_message |
| - get_history     |
+-------------------+
        |
        | publishes "ConversationUpdated"
        v
+-------------------+
| Pub/Sub Topic     |  <-- Other services react
| "user-chat"       |
+-------------------+

Why not just store history in a regular state store key? Three reasons:

| Challenge | Regular State | Actor State |
| --- | --- | --- |
| Concurrent updates | Manual locking or ETags | Turn-based concurrency (automatic) |
| Lifecycle management | Manual cleanup | Automatic garbage collection |
| Method invocation | HTTP to your service | Actor method calls routed by Dapr |
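To make the first row concrete, here is a minimal in-memory sketch of what turn-based concurrency means: calls addressed to the same ID run one at a time, while different IDs proceed independently. It does not use Dapr at all. `TurnBasedRegistry`, `append`, and `demo` are hypothetical names for illustration, and the real runtime additionally handles placement, persistence, and deactivation.

```python
import asyncio
from collections import defaultdict


class TurnBasedRegistry:
    """Toy stand-in for an actor runtime: one lock and one history per ID."""

    def __init__(self) -> None:
        self._locks: dict[str, asyncio.Lock] = defaultdict(asyncio.Lock)
        self._histories: dict[str, list[str]] = defaultdict(list)

    async def append(self, actor_id: str, text: str) -> int:
        # Only one call per actor_id holds the lock at a time (a "turn").
        async with self._locks[actor_id]:
            snapshot = list(self._histories[actor_id])  # read
            await asyncio.sleep(0)                      # yield, as real I/O would
            snapshot.append(text)                       # modify
            self._histories[actor_id] = snapshot        # write back
            return len(snapshot)

    def history(self, actor_id: str) -> list[str]:
        return list(self._histories[actor_id])


async def demo() -> dict[str, list[str]]:
    registry = TurnBasedRegistry()
    # Fire concurrent calls at two different IDs.
    await asyncio.gather(
        *(registry.append("alice", f"a{i}") for i in range(5)),
        *(registry.append("bob", f"b{i}") for i in range(3)),
    )
    return {"alice": registry.history("alice"), "bob": registry.history("bob")}
```

Without the per-ID lock, the read, yield, write sequence in `append` would let concurrent calls overwrite each other's updates. That is the bookkeeping the "Regular State" column leaves to you.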

Define the Actor Interface

Start with the interface that defines what your ChatActor can do:

python
from dapr.actor import ActorInterface, actormethod
from pydantic import BaseModel


class Message(BaseModel):
    """A single message in the conversation."""
    role: str  # "user" or "assistant"
    content: str


class ChatAgentInterface(ActorInterface):
    """Interface for the ChatAgent actor."""

    @actormethod(name="ProcessMessage")
    async def process_message(self, user_input: Message) -> Message | None:
        """Process a user message and return assistant response."""
        ...

    @actormethod(name="GetConversationHistory")
    async def get_conversation_history(self) -> list[dict] | None:
        """Retrieve the full conversation history."""
        ...

The @actormethod decorator with the name parameter is critical. The name you provide (like "ProcessMessage") is the method name used in actor invocation. Python's snake_case method name (process_message) is your local implementation, but external callers use the decorator name.
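The name mapping can be pictured with a few lines of plain Python. This is a conceptual sketch only, not how the Dapr SDK is implemented: `wire_name`, `Greeter`, and `dispatch` are hypothetical, showing a decorator that records an external name and a dispatcher that looks methods up by it.

```python
def wire_name(name: str):
    """Hypothetical decorator: tag a method with the name callers will use."""
    def decorator(func):
        func._wire_name = name
        return func
    return decorator


class Greeter:
    @wire_name("SayHello")
    def say_hello(self, who: str) -> str:
        return f"Hello, {who}"


def dispatch(obj, external_name: str, *args):
    """Find the method whose recorded wire name matches, then call it."""
    for attr in dir(obj):
        method = getattr(obj, attr)
        if callable(method) and getattr(method, "_wire_name", None) == external_name:
            return method(*args)
    raise AttributeError(f"No method exposed as {external_name!r}")
```

Here `dispatch(Greeter(), "SayHello", "alice")` returns `"Hello, alice"`, while asking for `"say_hello"` raises `AttributeError`. The same idea explains why callers invoke `ProcessMessage` rather than `process_message`.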

Implement the ChatActor

Now implement the actor with state management and pub/sub integration:

python
import json
import logging

from dapr.actor import Actor
from dapr.clients import DaprClient

# Message and ChatAgentInterface come from the interface definition above.


class ChatAgent(Actor, ChatAgentInterface):
    """Actor that maintains conversation history for a single user."""

    def __init__(self, ctx, actor_id):
        super().__init__(ctx, actor_id)
        self._history_key = f"history-{actor_id.id}"
        self._actor_id = actor_id

    async def _on_activate(self) -> None:
        """Initialize state when actor activates."""
        logging.info(f"Activating ChatAgent for {self._history_key}")
        try:
            # try_get_state returns (found, value); get_state raises KeyError
            # when the key does not exist yet, so it cannot signal "missing"
            # by returning None.
            found, _ = await self._state_manager.try_get_state(self._history_key)
            if not found:
                logging.info(f"Initializing empty history for {self._history_key}")
                await self._state_manager.set_state(self._history_key, [])
        except Exception as e:
            logging.warning(f"Activation error for {self._history_key}: {e}")
            await self._state_manager.set_state(self._history_key, [])

    async def process_message(self, user_input: Message) -> dict:
        """Process user message, update history, publish event, return response."""
        # Validate input (important for data from external callers)
        user_input = Message.model_validate(user_input)

        # Load current history
        history = await self._state_manager.get_state(self._history_key)
        current_history = history if isinstance(history, list) else []

        # Add user message to history
        current_history.append({"role": "user", "content": user_input.content})

        # Generate response (static for now - you'll add LLM later)
        response = Message(
            role="assistant",
            content=f"Received your message: {user_input.content}"
        )

        # Add assistant response to history
        current_history.append(response.model_dump())

        # Limit history to last 10 exchanges (20 messages)
        if len(current_history) > 20:
            current_history = current_history[-20:]

        # Persist updated history
        await self._state_manager.set_state(self._history_key, current_history)
        logging.info(f"Updated history for {self._history_key}: {len(current_history)} messages")

        # Publish conversation event
        await self._publish_conversation_event(user_input, response)

        # Return a plain dict so the result serializes across the actor boundary
        return response.model_dump()

    async def get_conversation_history(self) -> list[dict]:
        """Return the full conversation history."""
        history = await self._state_manager.get_state(self._history_key)
        return history if isinstance(history, list) else []

    async def _publish_conversation_event(self, user_input: Message, response: Message) -> None:
        """Publish ConversationUpdated event to pub/sub topic."""
        event_data = {
            "actor_id": self._actor_id.id,
            "actor_type": "ChatAgent",
            "event_type": "ConversationUpdated",
            "input": user_input.model_dump(),
            "output": response.model_dump()
        }
        with DaprClient() as client:
            try:
                client.publish_event(
                    pubsub_name="daca-pubsub",
                    topic_name="user-chat",
                    data=json.dumps(event_data)
                )
                logging.info(f"Published ConversationUpdated for {self._actor_id.id}")
            except Exception as e:
                # A failed publish is logged but does not fail the message call
                logging.error(f"Failed to publish event: {e}")

Key Implementation Details

| Pattern | Purpose |
| --- | --- |
| _history_key = f"history-{actor_id.id}" | State key includes actor ID for isolation |
| Message.model_validate(user_input) | Validates incoming data from ActorProxy |
| if len(current_history) > 20 | Prevents unbounded history growth |
| DaprClient() context manager | Clean resource management for pub/sub |
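The hard-coded limit of 20 messages can be pulled out into a small helper so the cap becomes configurable. This is a sketch under the assumption that one exchange is exactly one user message plus one assistant message. `trim_history` and `max_exchanges` are illustrative names, not part of the actor above.

```python
def trim_history(history: list[dict], max_exchanges: int = 10) -> list[dict]:
    """Keep only the most recent exchanges (one exchange = user + assistant)."""
    if max_exchanges < 1:
        raise ValueError("max_exchanges must be at least 1")
    max_messages = max_exchanges * 2
    # Slicing from the end returns the whole list when it is already short enough.
    return history[-max_messages:]
```

Inside `process_message`, the `if len(current_history) > 20` branch could then become `current_history = trim_history(current_history, max_exchanges=10)`.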

Register Actor and Create Endpoints

Wire up the actor with FastAPI:

python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from dapr.ext.fastapi import DaprActor
from dapr.actor import ActorProxy, ActorId
import logging
import json

logging.basicConfig(level=logging.INFO)

app = FastAPI(title="ChatAgentService", description="Chat Actor Demo")

# Add Dapr Actor extension
actor = DaprActor(app)


class Message(BaseModel):
    role: str
    content: str


# Register actor on startup
@app.on_event("startup")
async def startup():
    await actor.register_actor(ChatAgent)
    logging.info("Registered ChatAgent actor")


# REST endpoint to send a message
@app.post("/chat/{actor_id}")
async def send_message(actor_id: str, data: Message):
    """Send a message to a user's chat session."""
    if not data.content or not isinstance(data.content, str):
        raise HTTPException(status_code=400, detail="Invalid message content")
    proxy = ActorProxy.create("ChatAgent", ActorId(actor_id), ChatAgentInterface)
    response = await proxy.ProcessMessage(data.model_dump())
    return {"response": response}


# REST endpoint to get conversation history
@app.get("/chat/{actor_id}/history")
async def get_history(actor_id: str):
    """Get conversation history for a user's chat session."""
    proxy = ActorProxy.create("ChatAgent", ActorId(actor_id), ChatAgentInterface)
    history = await proxy.GetConversationHistory()
    return {"history": history}


# Subscription endpoint for conversation events
@app.post("/subscribe")
async def handle_conversation_event(data: dict):
    """Handle ConversationUpdated events from pub/sub."""
    try:
        logging.info(f"Received event: {data}")
        event_data_raw = data.get("data", "{}")
        event_data = json.loads(event_data_raw)
        user_id = event_data.get("actor_id", "unknown")
        input_msg = event_data.get("input", {}).get("content", "no message")
        output_msg = event_data.get("output", {}).get("content", "no response")
        logging.info(f"User {user_id}: '{input_msg}' -> '{output_msg}'")
        return {"status": "processed"}
    except json.JSONDecodeError as e:
        logging.error(f"Invalid event data: {e}")
        return {"status": "error", "message": "Invalid JSON"}
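The subscription handler above decodes `data` with `json.loads`, which assumes the payload arrives as a JSON string. Depending on the content type used when publishing, the `data` field of the delivered envelope may instead already be a decoded object. A slightly more tolerant parser is sketched below. `extract_event` and `summarize` are hypothetical helpers using only the standard library, and the envelope shape is assumed to match the event published by the actor.

```python
import json
from typing import Any


def extract_event(envelope: dict[str, Any]) -> dict[str, Any]:
    """Return the event payload as a dict, whether it arrived as str or dict."""
    raw = envelope.get("data", {})
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except json.JSONDecodeError:
            return {}
    return raw if isinstance(raw, dict) else {}


def summarize(envelope: dict[str, Any]) -> str:
    """Build the same log line the handler writes, with safe defaults."""
    event = extract_event(envelope)
    user_id = event.get("actor_id", "unknown")
    input_msg = (event.get("input") or {}).get("content", "no message")
    output_msg = (event.get("output") or {}).get("content", "no response")
    return f"User {user_id}: '{input_msg}' -> '{output_msg}'"
```

Keeping the parsing in plain functions like these also makes the subscriber logic testable without a running sidecar.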

Configure Pub/Sub Subscription

Create a subscription that routes conversation events to your endpoint:

yaml
# components/message-subscription.yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
  name: chat-subscription
spec:
  pubsubname: daca-pubsub
  topic: user-chat
  routes:
    default: /subscribe

Apply to your cluster:

bash
kubectl apply -f components/message-subscription.yaml

Test the ChatActor

With the service running (tilt up or kubectl apply), test the conversation flow:

bash
# Start a conversation as user "alice"
curl -X POST http://localhost:8000/chat/alice \
  -H "Content-Type: application/json" \
  -d '{"role": "user", "content": "Hello, I need help with my project"}'

Response:

json
{
  "response": {
    "role": "assistant",
    "content": "Received your message: Hello, I need help with my project"
  }
}

Check history:

bash
curl http://localhost:8000/chat/alice/history

Response:

json
{
  "history": [
    {"role": "user", "content": "Hello, I need help with my project"},
    {"role": "assistant", "content": "Received your message: Hello, I need help with my project"}
  ]
}

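Send the same requests to /chat/bob and you get a second actor whose history contains only Bob's messages. Alice and Bob have completely separate conversation histories. This is the power of actor IDs: natural isolation without explicit partitioning logic.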

Actor ID as Session Identifier

The actor ID pattern enables various session strategies:

| Strategy | Actor ID Format | Use Case |
| --- | --- | --- |
| Per-user | user-{user_id} | Long-running user assistant |
| Per-conversation | conv-{uuid} | Multiple conversations per user |
| Per-session | session-{token} | Temporary, anonymous chats |
| Per-channel | channel-{platform}-{id} | Multi-platform support |
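The formats in the table can be produced by a handful of small helpers. This is an illustrative sketch: the function names are hypothetical, and the resulting strings would be passed to `ActorId(...)` when creating a proxy.

```python
import uuid


def per_user_id(user_id: str) -> str:
    """One long-lived actor per user."""
    return f"user-{user_id}"


def per_conversation_id() -> str:
    """A fresh actor per conversation, identified by a random UUID."""
    return f"conv-{uuid.uuid4()}"


def per_session_id(token: str) -> str:
    """A temporary actor tied to an anonymous session token."""
    return f"session-{token}"


def per_channel_id(platform: str, channel_id: str) -> str:
    """One actor per platform-specific channel."""
    return f"channel-{platform}-{channel_id}"
```

For example, `per_channel_id("slack", "C42")` yields `"channel-slack-C42"`. Two conversations for the same user get different IDs from `per_conversation_id()`, so their histories stay separate.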

Event-Driven Extensions

The pub/sub integration opens powerful patterns. Other services can subscribe to user-chat events for:

| Subscriber | Reaction to ConversationUpdated |
| --- | --- |
| Analytics Service | Track conversation metrics |
| Audit Service | Log all interactions for compliance |
| Notification Service | Alert on specific keywords |
| Context Service | Build user profiles from conversations |
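As one example of such a subscriber, the core of a keyword-alerting Notification Service might look like the sketch below. It assumes the event payload has the shape published by the ChatActor (`event_type`, `input.content`). The keyword list and the `keywords_in_event` helper are hypothetical.

```python
ALERT_KEYWORDS = frozenset({"refund", "cancel", "urgent"})  # hypothetical list


def keywords_in_event(event: dict, keywords=ALERT_KEYWORDS) -> set[str]:
    """Return the alert keywords found in the user's message, if any."""
    if event.get("event_type") != "ConversationUpdated":
        return set()
    content = (event.get("input") or {}).get("content", "")
    # Lowercase each word and strip surrounding punctuation before matching.
    words = {word.strip(".,!?;:").lower() for word in content.split()}
    return words & set(keywords)
```

A subscriber endpoint would call this on each decoded event and trigger a notification whenever the returned set is non-empty.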

Reflect on Your Skill

You extended your dapr-deployment skill in L00. Does it include ChatActor patterns now?

Test Your Skill

"Using my dapr-deployment skill, generate a ChatActor that stores conversation history with a configurable limit and publishes events on each message."

Identify Gaps

  • Does my skill explain why actor IDs provide session isolation?
  • Does it include the pub/sub integration pattern from within actors?
  • Does it handle the Pydantic validation for incoming messages?

Improve Your Skill

If you found gaps:

markdown
Update my dapr-deployment skill to include:
- ChatActor pattern with conversation history
- DaprClient pub/sub from within actor methods
- Actor ID strategies for different session patterns
- History size limiting to prevent unbounded growth

Try With AI

Prompt 1: Design Your Agent's State

"I'm building an AI agent that helps users plan travel itineraries. What state should my actor store besides basic conversation history? Consider preferences, intermediate results, and history limits."

Prompt 2: Add LLM Integration

"Take this ChatActor's process_message method and help me integrate an OpenAI call. How should I send conversation history as context while keeping the response time reasonable?"

Prompt 3: Debug Actor State Issues

"My ChatActor seems to lose history between calls. Help me debug: is it a configuration issue, a key pattern problem, or a misunderstanding of the virtual actor lifecycle?"