USMAN’S INSIGHTS
AI ARCHITECT
© 2026 Muhammad Usman Akbar. All rights reserved.

Privacy Policy
Terms of Service
Engineered with
INDUSTRIAL ARCHITECTURE

Capstone: Event-Driven Agent Notifications

You've spent this chapter building event-driven system components piece by piece: producers with reliability guarantees, consumers with proper offset management, Avro schemas with Schema Registry, FastAPI integration patterns, and agent event designs. Now it's time to compose these skills into a complete system.

This capstone follows the spec-driven development approach you've seen throughout this book. You'll write a specification first, then implement it by composing the patterns you've learned. This mirrors how production AI agents are built: specify intent clearly, then orchestrate implementation using accumulated skills.

The goal is practical: add event-driven notifications to the Task API from Part 6. When a task is created, updated, or completed, the system should publish events that trigger notifications and create an immutable audit trail. By the end, you'll have a working event-driven notification system that demonstrates the patterns professional teams use in production.
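Before any Kafka code, the shape of that flow can be sketched with a plain in-memory stand-in. Everything in this sketch (the `subscribe` and `publish` helpers, the handler names) is illustrative only; the real system replaces it with Kafka producers and consumer groups.

```python
# In-memory sketch of the target flow: events fan out to notification
# handlers while every event lands in an append-only audit trail.
# All names here are illustrative stand-ins for Kafka, not real APIs.
from collections import defaultdict

subscribers = defaultdict(list)  # event_type -> list of handler callables
audit_trail = []                 # append-only audit log (the audit service's role)

def subscribe(event_type, handler):
    subscribers[event_type].append(handler)

def publish(event):
    audit_trail.append(event)    # every event is audited, regardless of type
    for handler in subscribers[event["event_type"]]:
        handler(event)           # notification consumers react to selected types

notifications = []
subscribe("task.created", lambda e: notifications.append(f"New task: {e['title']}"))
subscribe("task.completed", lambda e: notifications.append(f"Done: {e['title']}"))

publish({"event_type": "task.created", "title": "Write spec"})
publish({"event_type": "task.updated", "title": "Write spec"})   # audited, no notification
publish({"event_type": "task.completed", "title": "Write spec"})

print(notifications)     # ['New task: Write spec', 'Done: Write spec']
print(len(audit_trail))  # 3
```

The asymmetry is the point: the notification path handles only two event types, while the audit path records all three, which is exactly the consumer-group split the specification formalizes.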

Phase 1: Write the Specification

Before writing any code, define precisely what you're building. A clear specification enables focused implementation and provides acceptance criteria for validation.

Specification Structure

Every capstone specification answers these questions:

| Section | Question Answered |
| --- | --- |
| Intent | What problem does this solve? What's the business value? |
| Constraints | What boundaries must the implementation respect? |
| Success Criteria | How do we know it's working correctly? |
| Non-Goals | What are we explicitly NOT building? |
| Architecture | How do components interact? |

The Event-Driven Notifications Specification

````markdown
# Event-Driven Notifications Specification

## Intent

Add event-driven capabilities to the Task API that enable:

1. Decoupled notification delivery when task state changes
2. Immutable audit logging of all task lifecycle events
3. Foundation for future event consumers (reminders, analytics, integrations)

**Business Value**: The current Task API uses synchronous calls for
notifications. If the notification service is slow or unavailable, task
creation blocks. Event-driven architecture decouples these concerns.

## Constraints

### Technical Constraints

- **Kafka Cluster**: Use existing Strimzi-managed Kafka (from Chapter 4)
- **Python Client**: Use confluent-kafka-python (from Chapters 6-8)
- **Delivery Guarantee**: At-least-once for notifications (duplicates acceptable)
- **Audit Guarantee**: At-least-once with consumer deduplication
- **Schema**: JSON initially (Schema Registry optional extension)

## Success Criteria

### SC-1: Task Events Published

- [ ] When a task is created, a `task.created` event is published to the `task-events` topic
- [ ] When a task is updated, a `task.updated` event is published
- [ ] When a task is completed, a `task.completed` event is published
- [ ] Each event includes: event_id, event_type, occurred_at, task data, metadata

### SC-2: Notification Service Consumes Events

- [ ] Notification service runs as a separate consumer group (`notification-service`)
- [ ] Service receives task.created events and logs notification delivery
- [ ] Service receives task.completed events and logs completion notification
- [ ] Service commits offsets only after successful processing

### SC-3: Audit Service Consumes Events

- [ ] Audit service runs as a separate consumer group (`audit-service`)
- [ ] Service receives ALL task events (created, updated, completed)
- [ ] Service appends each event to an immutable log file
- [ ] Service deduplicates by event_id to handle redeliveries

### SC-4: End-to-End Flow Verified

- [ ] Creating a task triggers both consumers
- [ ] Consumer lag stays below 100
- [ ] Consumers resume from the correct offset after restart

## Non-Goals

- Real notification delivery (email, push): consumers only log deliveries
- Avro and Schema Registry integration (optional extension)
- Dead letter queue handling (optional extension)

## Architecture

┌─────────────────┐        ┌─────────────────────────┐
│    Task API     │        │     Kafka Cluster       │
│    (FastAPI)    │──────> │  topic: task-events     │
│                 │        │  partitions: 1          │
└─────────────────┘        └───────────┬─────────────┘
                                       │
                       ┌───────────────┴───────────────┐
                       │                               │
                       v                               v
            ┌───────────────────┐          ┌───────────────────┐
            │ Notification Svc  │          │   Audit Service   │
            │ group: notify-svc │          │ group: audit-svc  │
            │ events: created,  │          │ events: ALL       │
            │         completed │          │ output: log file  │
            └───────────────────┘          └───────────────────┘

## Event Schema

```json
{
  "event_id": "uuid",
  "event_type": "task.created | task.updated | task.completed",
  "occurred_at": "ISO-8601 timestamp",
  "data": {
    "task_id": "uuid",
    "title": "string",
    "status": "pending | in_progress | completed",
    "owner_id": "uuid (optional)"
  },
  "metadata": {
    "correlation_id": "uuid (request trace)",
    "source": "task-api"
  }
}
```
````

Why This Specification Matters

Notice what the specification provides:

  1. Clear intent: Not "add Kafka to Task API" but "enable decoupled notifications with audit trail"

  2. Measurable criteria: Each success criterion is a checkbox that can be verified

  3. Explicit boundaries: Non-goals prevent scope creep

  4. Visual architecture: The diagram clarifies component relationships

When you work with AI to implement this specification, you have a shared understanding of what "done" looks like.

Phase 2: Implement the Specification

Now implement the specification by composing patterns from earlier chapters. This phase demonstrates the core skill of spec-driven development: translating clear requirements into working code.

Step 1: Create Event Schema and Publisher

First, implement the event schema and producer integration for the Task API.

Create events/schemas.py:

```python
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from typing import Optional
from uuid import uuid4
import json


@dataclass
class TaskData:
    """Task information included in events."""
    task_id: str
    title: str
    status: str
    owner_id: Optional[str] = None


@dataclass
class EventMetadata:
    """Metadata for event tracing and debugging."""
    correlation_id: str
    source: str = "task-api"


@dataclass
class TaskEvent:
    """Base event schema for task lifecycle events."""
    event_type: str
    data: TaskData
    metadata: EventMetadata
    event_id: str = field(default_factory=lambda: str(uuid4()))
    occurred_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_json(self) -> str:
        """Serialize event to JSON string."""
        return json.dumps(asdict(self))

    @classmethod
    def task_created(
        cls,
        task_id: str,
        title: str,
        correlation_id: str,
        owner_id: Optional[str] = None
    ) -> "TaskEvent":
        """Factory for task.created events."""
        return cls(
            event_type="task.created",
            data=TaskData(
                task_id=task_id,
                title=title,
                status="pending",
                owner_id=owner_id
            ),
            metadata=EventMetadata(correlation_id=correlation_id)
        )

    @classmethod
    def task_updated(
        cls,
        task_id: str,
        title: str,
        status: str,
        correlation_id: str,
        owner_id: Optional[str] = None
    ) -> "TaskEvent":
        """Factory for task.updated events."""
        return cls(
            event_type="task.updated",
            data=TaskData(
                task_id=task_id,
                title=title,
                status=status,
                owner_id=owner_id
            ),
            metadata=EventMetadata(correlation_id=correlation_id)
        )

    @classmethod
    def task_completed(
        cls,
        task_id: str,
        title: str,
        correlation_id: str,
        owner_id: Optional[str] = None
    ) -> "TaskEvent":
        """Factory for task.completed events."""
        return cls(
            event_type="task.completed",
            data=TaskData(
                task_id=task_id,
                title=title,
                status="completed",
                owner_id=owner_id
            ),
            metadata=EventMetadata(correlation_id=correlation_id)
        )
```

Output:

```text
>>> from events.schemas import TaskEvent
>>> event = TaskEvent.task_created("task-123", "Buy groceries", "req-456")
>>> print(event.to_json())
{"event_type": "task.created", "data": {"task_id": "task-123", "title": "Buy groceries", "status": "pending", "owner_id": null}, "metadata": {"correlation_id": "req-456", "source": "task-api"}, "event_id": "a1b2c3d4-...", "occurred_at": "2025-01-15T10:30:00.000000+00:00"}
```

Step 2: Implement Event Publisher

Create a reliable publisher that integrates with FastAPI's lifespan.

Create events/publisher.py:

```python
from confluent_kafka import Producer
from typing import Optional
import logging

from .schemas import TaskEvent

logger = logging.getLogger(__name__)


class EventPublisher:
    """Reliable event publisher for Task API events."""

    def __init__(self, bootstrap_servers: str):
        self.producer = Producer({
            'bootstrap.servers': bootstrap_servers,
            'client.id': 'task-api-publisher',
            'acks': 'all',
            'enable.idempotence': True,
            'max.in.flight.requests.per.connection': 5,
            'retries': 2147483647,
            'delivery.timeout.ms': 30000
        })
        self.topic = 'task-events'

    def _delivery_callback(self, err, msg):
        """Handle delivery result."""
        if err is not None:
            logger.error(
                f"Event delivery failed: {err}, "
                f"key={msg.key()}, topic={msg.topic()}"
            )
        else:
            logger.info(
                f"Event delivered: topic={msg.topic()}, "
                f"partition={msg.partition()}, offset={msg.offset()}"
            )

    def publish(self, event: TaskEvent) -> None:
        """Publish event to Kafka topic."""
        self.producer.produce(
            topic=self.topic,
            key=event.data.task_id,
            value=event.to_json(),
            callback=self._delivery_callback
        )
        # Process callbacks without blocking
        self.producer.poll(0)

    def flush(self) -> None:
        """Ensure all pending events are delivered."""
        remaining = self.producer.flush(timeout=10)
        if remaining > 0:
            logger.warning(f"{remaining} events not delivered on flush")


# Global publisher instance (initialized in lifespan)
_publisher: Optional[EventPublisher] = None


def get_publisher() -> EventPublisher:
    """Get the global publisher instance."""
    if _publisher is None:
        raise RuntimeError("Publisher not initialized. Check lifespan setup.")
    return _publisher


def init_publisher(bootstrap_servers: str) -> EventPublisher:
    """Initialize the global publisher."""
    global _publisher
    _publisher = EventPublisher(bootstrap_servers)
    return _publisher


def shutdown_publisher() -> None:
    """Shutdown the global publisher."""
    global _publisher
    if _publisher is not None:
        _publisher.flush()
        _publisher = None
```

Output:

```text
>>> from events.publisher import init_publisher, get_publisher
>>> from events.schemas import TaskEvent
>>> publisher = init_publisher("localhost:30092")
>>> event = TaskEvent.task_created("task-789", "Review PR", "req-111")
>>> publisher.publish(event)
>>> publisher.flush()
INFO:events.publisher:Event delivered: topic=task-events, partition=0, offset=42
```

Step 3: Integrate with FastAPI

Connect the publisher to FastAPI endpoints.

Update main.py:

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import Optional
from uuid import uuid4
import os

from events.publisher import init_publisher, shutdown_publisher, get_publisher
from events.schemas import TaskEvent


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Manage application lifespan for Kafka producer."""
    bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:30092")
    init_publisher(bootstrap_servers)
    yield
    shutdown_publisher()


app = FastAPI(title="Task API", lifespan=lifespan)


class TaskCreate(BaseModel):
    title: str
    owner_id: Optional[str] = None


class TaskUpdate(BaseModel):
    title: Optional[str] = None
    status: Optional[str] = None


class Task(BaseModel):
    id: str
    title: str
    status: str
    owner_id: Optional[str] = None


# In-memory task storage (replace with database in production)
tasks: dict[str, Task] = {}


@app.post("/tasks", response_model=Task)
async def create_task(task_in: TaskCreate):
    """Create a new task and publish task.created event."""
    correlation_id = str(uuid4())
    task_id = str(uuid4())
    task = Task(
        id=task_id,
        title=task_in.title,
        status="pending",
        owner_id=task_in.owner_id
    )
    tasks[task_id] = task

    # Publish event
    event = TaskEvent.task_created(
        task_id=task_id,
        title=task.title,
        correlation_id=correlation_id,
        owner_id=task.owner_id
    )
    get_publisher().publish(event)

    return task


@app.patch("/tasks/{task_id}", response_model=Task)
async def update_task(task_id: str, task_in: TaskUpdate):
    """Update a task and publish task.updated event."""
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")

    correlation_id = str(uuid4())
    task = tasks[task_id]
    if task_in.title is not None:
        task.title = task_in.title
    if task_in.status is not None:
        task.status = task_in.status

    # Publish event
    event = TaskEvent.task_updated(
        task_id=task_id,
        title=task.title,
        status=task.status,
        correlation_id=correlation_id,
        owner_id=task.owner_id
    )
    get_publisher().publish(event)

    return task


@app.post("/tasks/{task_id}/complete", response_model=Task)
async def complete_task(task_id: str):
    """Complete a task and publish task.completed event."""
    if task_id not in tasks:
        raise HTTPException(status_code=404, detail="Task not found")

    correlation_id = str(uuid4())
    task = tasks[task_id]
    task.status = "completed"

    # Publish event
    event = TaskEvent.task_completed(
        task_id=task_id,
        title=task.title,
        correlation_id=correlation_id,
        owner_id=task.owner_id
    )
    get_publisher().publish(event)

    return task
```

Output:

```bash
$ curl -X POST http://localhost:8000/tasks \
    -H "Content-Type: application/json" \
    -d '{"title": "Write capstone lesson"}'
{"id":"abc123","title":"Write capstone lesson","status":"pending","owner_id":null}

# In the Task API logs:
INFO:events.publisher:Event delivered: topic=task-events, partition=0, offset=43
```

Step 4: Implement Notification Service

Create a consumer that processes task events for notifications.

Create services/notification_service.py:

```python
from confluent_kafka import Consumer, KafkaError
import json
import logging
import signal
import sys

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class NotificationService:
    """Consume task events and deliver notifications."""

    def __init__(self, bootstrap_servers: str):
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'notification-service',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False  # Manual commit after processing
        })
        self.running = True
        self.subscribed_events = {'task.created', 'task.completed'}

    def process_event(self, event: dict) -> bool:
        """Process a single event. Returns True if successful."""
        event_type = event.get('event_type')

        if event_type not in self.subscribed_events:
            # Skip events we don't handle
            return True

        task_data = event.get('data', {})
        task_id = task_data.get('task_id')
        title = task_data.get('title')

        if event_type == 'task.created':
            logger.info(
                f"NOTIFICATION: New task created - '{title}' (ID: {task_id})"
            )
            # In production: send email, push notification, etc.
        elif event_type == 'task.completed':
            logger.info(
                f"NOTIFICATION: Task completed - '{title}' (ID: {task_id})"
            )
            # In production: send completion notification

        return True

    def run(self):
        """Main consumer loop."""
        self.consumer.subscribe(['task-events'])
        logger.info("Notification service started, waiting for events...")

        while self.running:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                logger.error(f"Consumer error: {msg.error()}")
                continue

            try:
                event = json.loads(msg.value().decode('utf-8'))
                if self.process_event(event):
                    # Commit only after successful processing
                    self.consumer.commit(message=msg)
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON in message: {e}")
                # Commit to skip malformed message
                self.consumer.commit(message=msg)

    def shutdown(self):
        """Graceful shutdown."""
        logger.info("Shutting down notification service...")
        self.running = False
        self.consumer.close()


def main():
    import os
    bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:30092")
    service = NotificationService(bootstrap_servers)

    # Handle graceful shutdown
    def signal_handler(sig, frame):
        service.shutdown()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    service.run()


if __name__ == "__main__":
    main()
```

Output:

```bash
$ python services/notification_service.py
INFO:__main__:Notification service started, waiting for events...
INFO:__main__:NOTIFICATION: New task created - 'Write capstone lesson' (ID: abc123)
INFO:__main__:NOTIFICATION: Task completed - 'Write capstone lesson' (ID: abc123)
```

Step 5: Implement Audit Service

Create a consumer that logs all events to an immutable audit trail.

Create services/audit_service.py:

```python
from confluent_kafka import Consumer, KafkaError
import json
import logging
import signal
import sys
from pathlib import Path
from datetime import datetime, timezone

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class AuditService:
    """Consume ALL task events and write to immutable audit log."""

    def __init__(self, bootstrap_servers: str, audit_log_path: str = "audit.log"):
        self.consumer = Consumer({
            'bootstrap.servers': bootstrap_servers,
            'group.id': 'audit-service',
            'auto.offset.reset': 'earliest',
            'enable.auto.commit': False
        })
        self.running = True
        self.audit_log_path = Path(audit_log_path)
        self.seen_events: set[str] = set()
        self._load_seen_events()

    def _load_seen_events(self):
        """Load previously seen event IDs for deduplication."""
        if self.audit_log_path.exists():
            with open(self.audit_log_path, 'r') as f:
                for line in f:
                    try:
                        entry = json.loads(line)
                        self.seen_events.add(entry.get('event_id'))
                    except json.JSONDecodeError:
                        continue
            logger.info(f"Loaded {len(self.seen_events)} previously seen events")

    def process_event(self, event: dict) -> bool:
        """Process event and append to audit log. Returns True if successful."""
        event_id = event.get('event_id')

        # Deduplicate: skip if we've seen this event
        if event_id in self.seen_events:
            logger.debug(f"Skipping duplicate event: {event_id}")
            return True

        # Create audit entry (timezone-aware timestamp; datetime.utcnow() is deprecated)
        audit_entry = {
            'event_id': event_id,
            'event_type': event.get('event_type'),
            'occurred_at': event.get('occurred_at'),
            'data': event.get('data'),
            'metadata': event.get('metadata'),
            'audited_at': datetime.now(timezone.utc).isoformat()
        }

        # Append to audit log (immutable append-only)
        with open(self.audit_log_path, 'a') as f:
            f.write(json.dumps(audit_entry) + '\n')

        self.seen_events.add(event_id)
        logger.info(
            f"AUDIT: {event.get('event_type')} - "
            f"task_id={event.get('data', {}).get('task_id')}"
        )
        return True

    def run(self):
        """Main consumer loop."""
        self.consumer.subscribe(['task-events'])
        logger.info("Audit service started, logging all events...")

        while self.running:
            msg = self.consumer.poll(timeout=1.0)
            if msg is None:
                continue
            if msg.error():
                if msg.error().code() == KafkaError._PARTITION_EOF:
                    continue
                logger.error(f"Consumer error: {msg.error()}")
                continue

            try:
                event = json.loads(msg.value().decode('utf-8'))
                if self.process_event(event):
                    self.consumer.commit(message=msg)
            except json.JSONDecodeError as e:
                logger.error(f"Invalid JSON in message: {e}")
                self.consumer.commit(message=msg)

    def shutdown(self):
        """Graceful shutdown."""
        logger.info("Shutting down audit service...")
        self.running = False
        self.consumer.close()


def main():
    import os
    bootstrap_servers = os.getenv("KAFKA_BOOTSTRAP_SERVERS", "localhost:30092")
    audit_log_path = os.getenv("AUDIT_LOG_PATH", "audit.log")
    service = AuditService(bootstrap_servers, audit_log_path)

    def signal_handler(sig, frame):
        service.shutdown()
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    service.run()


if __name__ == "__main__":
    main()
```

Phase 3: Validate Against Specification

The implementation is complete. Now verify each success criterion from the specification.

Validation Checklist

SC-1: Task Events Published

```bash
# Start a Kafka console consumer to observe events
kubectl exec -it task-events-kafka-0 -n kafka -- \
  bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:30092 \
  --topic task-events \
  --from-beginning

# In another terminal, create a task via the API
curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Validate capstone"}'

# Console consumer shows:
{"event_type":"task.created","data":{"task_id":"xyz789","title":"Validate capstone",...}
```
| Criterion | Status |
| --- | --- |
| task.created event published | PASS |
| task.updated event published | PASS |
| task.completed event published | PASS |
| Event schema includes all required fields | PASS |

SC-2: Notification Service Consumes Events

```bash
# Start the notification service
python services/notification_service.py

# Create and complete a task
curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Test notification"}'
curl -X POST http://localhost:8000/tasks/xyz789/complete

# Notification service logs:
INFO: NOTIFICATION: New task created - 'Test notification' (ID: xyz789)
INFO: NOTIFICATION: Task completed - 'Test notification' (ID: xyz789)
```
| Criterion | Status |
| --- | --- |
| Runs as separate consumer group | PASS |
| Receives task.created events | PASS |
| Receives task.completed events | PASS |
| Commits after successful processing | PASS |

SC-3: Audit Service Consumes Events

```bash
# Start the audit service
python services/audit_service.py

# Create, update, and complete a task
curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "Test audit"}'
curl -X PATCH http://localhost:8000/tasks/abc123 \
  -H "Content-Type: application/json" \
  -d '{"status": "in_progress"}'
curl -X POST http://localhost:8000/tasks/abc123/complete

# Verify the audit log contains all three events
wc -l < audit.log
# Output: 3

# Restart the audit service and verify deduplication
python services/audit_service.py
# Logs: "Loaded 3 previously seen events"
# No duplicate entries in audit.log
```
| Criterion | Status |
| --- | --- |
| Runs as separate consumer group | PASS |
| Receives ALL task events | PASS |
| Appends to immutable log file | PASS |
| Deduplicates by event_id | PASS |

SC-4: End-to-End Flow Verified

```bash
# Run all services simultaneously

# Terminal 1: Task API
uvicorn main:app --reload

# Terminal 2: Notification Service
python services/notification_service.py

# Terminal 3: Audit Service
python services/audit_service.py

# Terminal 4: Test the flow
curl -X POST http://localhost:8000/tasks \
  -H "Content-Type: application/json" \
  -d '{"title": "End-to-end test"}'

# Verify:
# - Notification service logged the creation
# - Audit service logged the creation
# - audit.log has the entry

# Check consumer lag
kubectl exec task-events-kafka-0 -n kafka -- \
  bin/kafka-consumer-groups.sh \
  --bootstrap-server localhost:30092 \
  --describe --group notification-service
```
| Criterion | Status |
| --- | --- |
| Create task triggers both consumers | PASS |
| Consumer lag below 100 | PASS |
| Resume from correct offset after restart | PASS |

Validation Summary

All success criteria from the specification have been verified:

| Success Criteria | Result |
| --- | --- |
| SC-1: Task Events Published | 4/4 PASS |
| SC-2: Notification Service | 4/4 PASS |
| SC-3: Audit Service | 4/4 PASS |
| SC-4: End-to-End Flow | 3/3 PASS |

The implementation matches the specification.

What You Built

This capstone demonstrated the spec-driven development pattern for event-driven systems:

| Phase | Activity | Outcome |
| --- | --- | --- |
| Specification | Define intent, constraints, success criteria | Clear requirements document |
| Implementation | Compose patterns from previous chapters | Working code matching spec |
| Validation | Verify each success criterion | Evidence of correctness |

The system you built includes:

  • Event schema with factory methods for type-safe event creation
  • Reliable publisher with idempotent producer and proper lifespan management
  • FastAPI integration publishing events from API endpoints
  • Notification service consuming specific event types with manual offset commits
  • Audit service with deduplication for immutable event logging

These patterns compose into production event-driven architectures. The same structure scales to dozens of consumers processing millions of events.

Try With AI

Use AI to extend and refine your capstone implementation.

Prompt 1: Add Schema Registry Integration

```text
I have a working event-driven notification system with JSON events.
I want to add Avro schemas with Schema Registry for type safety.

Here's my current TaskEvent schema (Python dataclass):
[paste TaskEvent class]

Help me:
1. Create an Avro schema that matches this structure
2. Modify the publisher to use AvroSerializer
3. Modify the consumers to use AvroDeserializer
4. Handle schema evolution (what if I add a new field later?)

Show the code changes needed for each component.
```

What you're learning: Schema Registry integration adds type safety and evolution management. This is the pattern production systems use to prevent schema drift between producers and consumers.
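As a starting point for this prompt, one plausible Avro schema for `TaskEvent` might look like the following. This is a hand-written sketch, not output from the chapter or Schema Registry; the `namespace` and record names are assumptions.

```python
import json

# Hand-written sketch of an Avro schema mirroring the TaskEvent dataclass.
# The namespace and record names are assumptions, not from the chapter.
TASK_EVENT_SCHEMA = json.dumps({
    "type": "record",
    "name": "TaskEvent",
    "namespace": "com.taskapi.events",
    "fields": [
        {"name": "event_id", "type": "string"},
        {"name": "event_type", "type": "string"},
        {"name": "occurred_at", "type": "string"},
        {"name": "data", "type": {
            "type": "record",
            "name": "TaskData",
            "fields": [
                {"name": "task_id", "type": "string"},
                {"name": "title", "type": "string"},
                {"name": "status", "type": "string"},
                # Optional field: union with null, null default
                {"name": "owner_id", "type": ["null", "string"], "default": None},
            ],
        }},
        {"name": "metadata", "type": {
            "type": "record",
            "name": "EventMetadata",
            "fields": [
                {"name": "correlation_id", "type": "string"},
                {"name": "source", "type": "string", "default": "task-api"},
            ],
        }},
    ],
})

schema = json.loads(TASK_EVENT_SCHEMA)
print([f["name"] for f in schema["fields"]])
```

Note how `owner_id` becomes a `["null", "string"]` union with a null default: fields added later must carry defaults like this for backward-compatible evolution, which is what question 4 in the prompt probes.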


Prompt 2: Add Dead Letter Queue

```text
My notification service might fail to process some events
(e.g., invalid task data, external service timeout).

Help me implement a dead letter queue pattern:
1. Catch processing failures in the consumer
2. Publish failed events to a 'task-events-dlq' topic
3. Include original event, error message, and retry count
4. Create a simple DLQ processor that logs failed events

Show me the modified NotificationService and the DLQ processor.
```

What you're learning: Production consumers need failure handling. Dead letter queues capture problematic events for investigation without blocking the main consumer.
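One shape the DLQ message could take is an envelope wrapping the failed event with error context. This is a sketch: the `task-events-dlq` topic name and the original-event/error/retry-count contents come from the prompt above, while the field names and helper function are hypothetical.

```python
import json
from datetime import datetime, timezone

# Sketch of a DLQ envelope builder. Field names are illustrative; only
# the topic name and the envelope contents (original event, error,
# retry count) come from the prompt above.
def build_dlq_envelope(original_event: dict, error: Exception, retry_count: int) -> dict:
    return {
        "original_event": original_event,   # preserved verbatim for replay
        "error": str(error),                # why processing failed
        "retry_count": retry_count,         # how many attempts were made
        "failed_at": datetime.now(timezone.utc).isoformat(),
        "target_topic": "task-events-dlq",
    }

envelope = build_dlq_envelope(
    {"event_type": "task.created", "data": {"task_id": "abc123"}},
    ValueError("missing title"),
    retry_count=3,
)
# The consumer would serialize this and produce it to the DLQ topic:
payload = json.dumps(envelope)
print(envelope["error"])  # missing title
```

Because the original event is preserved verbatim, a DLQ processor (or a human) can inspect the failure and replay the event once the underlying issue is fixed.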


Prompt 3: Improve the Specification

```text
Review my event-driven notifications specification and suggest improvements:
[paste the specification from Phase 1]

Consider:
1. Are the success criteria specific enough to automate testing?
2. What edge cases are missing (network failures, duplicate events)?
3. Should the architecture include error handling components?
4. What monitoring requirements should be added?

Propose an enhanced specification with these improvements.
```

What you're learning: Specifications evolve through iteration. Reviewing your spec after implementation reveals gaps and improvements for future systems.


Safety Note: When testing event-driven systems, always verify consumer offsets are committed correctly before stopping services. Uncommitted offsets can cause duplicate processing on restart. Use kafka-consumer-groups.sh --describe to check consumer group state before any planned maintenance.