blightbow edited this page 2025-12-08 04:15:26 +00:00

Architecture: Event Sourcing

Infrastructure - Audit Trail and Time-Travel Debugging


Overview

The event sourcing system captures all assistant state changes as immutable events:

  • AssistantAggregate - Domain model with event-decorated methods
  • AssistantApplication - Application layer for persistence and queries
  • Archive system - Export to JSON and database cleanup

Built on the eventsourcing library with Django persistence via eventsourcing-django.


1. AssistantAggregate

Domain model capturing all state changes as events (eventsourcing/domain.py).

Events

Event                    Trigger                    Data Captured
AssistantCreated         create()                   assistant_key, config
ToolExecuted             execute_tool()             tool_name, parameters, reasoning, success, error
GoalCreated              create_goal()              goal_id, description, priority, parent_id
GoalUpdated              update_goal()              goal_id, status, priority, metadata
GoalCompleted            complete_goal()            goal_id, outcome
ReflectionRecorded       record_reflection()        content, tags, context
EmergencyStopTriggered   trigger_emergency_stop()   reason, consecutive_errors
EmergencyStopCleared     clear_emergency_stop()     cleared_by
TickCompleted            complete_tick()            tick_number, duration_ms, action_taken
ConfigurationChanged     change_configuration()     key, old_value, new_value
AssistantArchived        archive()                  reason, archive_path, final_stats

Aggregate State

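from datetime import datetime
from typing import Dict, Optional

from eventsourcing.domain import Aggregate  # base class from the eventsourcing library


class AssistantAggregate(Aggregate):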
    def __init__(self):
        self.assistant_key: str = ""
        self.created_at: Optional[datetime] = None

        # Tool tracking
        self.tool_count: int = 0
        self.last_tool: Optional[str] = None
        self.tool_stats: Dict[str, int] = {}  # tool_name → count

        # Goal tracking
        self.goals: Dict[str, Dict] = {}      # goal_id → goal_data
        self.completed_goals: int = 0

        # Reflection tracking
        self.reflection_count: int = 0

        # Emergency stop state
        self.is_stopped: bool = False
        self.stop_reason: Optional[str] = None

        # Tick tracking
        self.tick_count: int = 0

        # Archive state (tombstone)
        self.is_archived: bool = False

Event-Decorated Methods

@event("ToolExecuted")
def execute_tool(
    self,
    tool_name: str,
    parameters: Dict[str, Any],
    reasoning: str,
    success: bool = True,
    error: Optional[str] = None,
    execution_time_ms: Optional[float] = None,
):
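    # State mutations happen here. The call's arguments (including success,
    # error, and execution_time_ms) are recorded on the ToolExecuted event.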
    self.tool_count += 1
    self.last_tool = tool_name
    self.tool_stats[tool_name] = self.tool_stats.get(tool_name, 0) + 1

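Each call to an @event method:

  1. Captures the method call and its arguments as an immutable event
  2. Applies the state mutations in the method body to the aggregate
  3. Queues the event, which is written to the event store when the application saves the aggregate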
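
To make the mechanics of an @event method concrete, here is a minimal, library-free sketch. It is not the eventsourcing library's implementation: the Event class, the simplified event() decorator, ToyAggregate, and the pending_events list are hypothetical stand-ins, and the decorator only handles keyword arguments. In this sketch the event is queued on the aggregate rather than written to a store, on the assumption that an application layer persists it later.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class Event:
    """Immutable record of one method call (hypothetical stand-in)."""
    topic: str
    timestamp: datetime
    data: Dict[str, Any]


def event(topic: str) -> Callable:
    """Simplified stand-in for an @event decorator (keyword arguments only)."""
    def decorator(method: Callable) -> Callable:
        def wrapper(self, **kwargs: Any) -> None:
            # Capture the call and its keyword arguments as an immutable event.
            recorded = Event(topic, datetime.now(timezone.utc), dict(kwargs))
            # Apply the state mutations by running the method body.
            method(self, **kwargs)
            # Queue the event until something persists it.
            self.pending_events.append(recorded)
        return wrapper
    return decorator


class ToyAggregate:
    def __init__(self) -> None:
        self.tool_count = 0
        self.tool_stats: Dict[str, int] = {}
        self.pending_events: List[Event] = []

    @event("ToolExecuted")
    def execute_tool(self, tool_name: str, success: bool = True) -> None:
        self.tool_count += 1
        self.tool_stats[tool_name] = self.tool_stats.get(tool_name, 0) + 1


agg = ToyAggregate()
agg.execute_tool(tool_name="say")
agg.execute_tool(tool_name="say", success=False)
```

After the two calls, the aggregate's counters reflect both executions and pending_events holds one event per call, each carrying the arguments that were passed.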

2. AssistantApplication

Application layer for interacting with aggregates (eventsourcing/application.py).

Getting the Application

from evennia.contrib.base_systems.ai.eventsourcing import get_application

app = get_application()  # Singleton instance

Assistant Lifecycle

# Create new assistant
assistant_id = app.create_assistant("my_assistant", config={...})

# Get current state
assistant = app.get_assistant(assistant_id)

# Get by key
assistant = app.get_assistant_by_key("my_assistant")

# Get or create
assistant_id, created = app.get_or_create_assistant("my_assistant")

# Register existing (after server restart)
app.register_existing_assistant("my_assistant", aggregate_id)
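
The need to re-register an assistant after a restart suggests the key-to-id mapping lives in memory. The sketch below illustrates that idea under this assumption only; KeyRegistry and its methods are hypothetical names and do not reflect how AssistantApplication actually stores the mapping.

```python
import uuid
from typing import Dict, Tuple
from uuid import UUID


class KeyRegistry:
    """Hypothetical in-memory mapping of assistant keys to aggregate IDs."""

    def __init__(self) -> None:
        self._ids: Dict[str, UUID] = {}

    def get_or_create(self, key: str) -> Tuple[UUID, bool]:
        # Return the existing ID if the key is known, else mint a new one.
        if key in self._ids:
            return self._ids[key], False
        new_id = uuid.uuid4()
        self._ids[key] = new_id
        return new_id, True

    def register_existing(self, key: str, aggregate_id: UUID) -> None:
        # Re-attach a key to an ID that was persisted elsewhere.
        self._ids[key] = aggregate_id


registry = KeyRegistry()
first_id, created = registry.get_or_create("my_assistant")
same_id, created_again = registry.get_or_create("my_assistant")

# Simulate a restart: a fresh registry knows nothing until re-registered.
restarted = KeyRegistry()
restarted.register_existing("my_assistant", first_id)
restored_id, created_after_restart = restarted.get_or_create("my_assistant")
```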

Recording Events

# Tool execution
app.execute_tool(
    assistant_id,
    tool_name="inspect_location",
    parameters={"target": "room"},
    reasoning="Checking surroundings",
    success=True,
    execution_time_ms=150.0
)

# Goal lifecycle
app.create_goal(assistant_id, "goal_1", "Build tavern", priority=1)
app.update_goal(assistant_id, "goal_1", status="blocked")
app.complete_goal(assistant_id, "goal_1", outcome="Completed successfully")

# Emergency stop
app.trigger_emergency_stop(assistant_id, "Too many errors", consecutive_errors=5)
app.clear_emergency_stop(assistant_id, cleared_by="admin")

# Tick completion
app.complete_tick(
    assistant_id,
    tick_number=42,
    duration_ms=250.0,
    action_taken="say",
    token_usage={"prompt": 1000, "completion": 50}
)

Time-Travel (Version Access)

# Get current state (latest version)
current = app.get_assistant(assistant_id)
print(f"Current version: {current.version}")

# Replay to earlier version
earlier = app.get_assistant_at_version(assistant_id, version=5)
print(f"Tool count at v5: {earlier.tool_count}")
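
Conceptually, version access amounts to replaying only the first N events. The following sketch shows that idea with plain tuples and a dict; it assumes one version per event, and the replay() function and its state keys are illustrative, not part of AssistantApplication.

```python
from typing import Any, Dict, List, Tuple

# Each entry is (topic, data); list position + 1 is the event's version.
EventRecord = Tuple[str, Dict[str, Any]]


def replay(events: List[EventRecord], version: int) -> Dict[str, Any]:
    """Rebuild a plain-dict state from the first `version` events."""
    state: Dict[str, Any] = {"version": 0, "tool_count": 0, "tick_count": 0}
    for topic, _data in events[:version]:
        if topic == "ToolExecuted":
            state["tool_count"] += 1
        elif topic == "TickCompleted":
            state["tick_count"] += 1
        state["version"] += 1
    return state


history: List[EventRecord] = [
    ("AssistantCreated", {"assistant_key": "my_assistant"}),
    ("ToolExecuted", {"tool_name": "say"}),
    ("TickCompleted", {"tick_number": 1}),
    ("ToolExecuted", {"tool_name": "inspect_location"}),
]

earlier = replay(history, version=2)
latest = replay(history, version=len(history))
```

Because state is always derived from the events, an earlier version is simply a shorter replay; nothing has to be snapshotted in advance.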

Statistics

stats = app.get_assistant_stats(assistant_id)
# {
#     "assistant_key": "my_assistant",
#     "created_at": "2025-12-06T10:00:00+00:00",
#     "tool_count": 150,
#     "tool_stats": {"say": 50, "inspect_location": 30, ...},
#     "active_goals": 3,
#     "completed_goals": 12,
#     "reflection_count": 5,
#     "tick_count": 500,
#     "is_stopped": False,
#     "version": 175,
# }

3. Archive System

Export events to JSON and clean up the database (eventsourcing/archive.py).

Archive Workflow

# Step 1: Export events to JSON file
archive_path = app.archive_assistant(
    assistant_id,
    archive_dir="/path/to/archives",
    reason="manual_clear"
)
# → Records AssistantArchived tombstone event
# → Returns path to JSON file

# Step 2: Delete events from database
deleted_count = app.delete_assistant_events(assistant_id)
# → Removes key-to-id mapping
# → Deletes all events from DB

Archive File Format

{
    "assistant_key": "my_assistant",
    "aggregate_id": "550e8400-e29b-41d4-a716-446655440000",
    "exported_at": "2025-12-06T14:30:00+00:00",
    "event_count": 175,
    "events": [
        {
            "topic": "AssistantCreated",
            "timestamp": "2025-12-01T10:00:00+00:00",
            "data": {"assistant_key": "my_assistant", "config": {...}}
        },
        {
            "topic": "ToolExecuted",
            "timestamp": "2025-12-01T10:01:00+00:00",
            "data": {"tool_name": "say", "parameters": {...}, ...}
        },
        ...
    ]
}
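
An archive in this shape can be inspected with the standard library alone. The sketch below parses a small sample that follows the documented fields and summarizes it; summarize_archive() is a hypothetical helper, and the sample data is made up for illustration.

```python
import json
from collections import Counter
from typing import Any, Dict


def summarize_archive(archive: Dict[str, Any]) -> Dict[str, Any]:
    """Count events per topic and check event_count against the event list."""
    topics = Counter(entry["topic"] for entry in archive["events"])
    return {
        "assistant_key": archive["assistant_key"],
        "count_matches": archive["event_count"] == len(archive["events"]),
        "topics": dict(topics),
    }


sample = json.loads("""
{
    "assistant_key": "my_assistant",
    "aggregate_id": "550e8400-e29b-41d4-a716-446655440000",
    "exported_at": "2025-12-06T14:30:00+00:00",
    "event_count": 3,
    "events": [
        {"topic": "AssistantCreated", "timestamp": "2025-12-01T10:00:00+00:00",
         "data": {"assistant_key": "my_assistant", "config": {}}},
        {"topic": "ToolExecuted", "timestamp": "2025-12-01T10:01:00+00:00",
         "data": {"tool_name": "say", "parameters": {}}},
        {"topic": "ToolExecuted", "timestamp": "2025-12-01T10:02:00+00:00",
         "data": {"tool_name": "say", "parameters": {}}}
    ]
}
""")

summary = summarize_archive(sample)
```

For a real archive, the sample string would be replaced by reading the file returned from archive_assistant() and passing its contents to json.loads().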

4. Integration with AssistantScript

Events are recorded via the record_eventsourcing_event() helper:

from evennia.contrib.base_systems.ai.helpers import record_eventsourcing_event

# In tool execution
record_eventsourcing_event(
    script,
    event="execute_tool",
    method="execute_tool",
    tool_name=tool.name,
    parameters=params,
    reasoning=reasoning,
    success=result.get("success"),
    execution_time_ms=duration_ms
)

# The helper:
# 1. Gets aggregate_id from script.db.aggregate_id
# 2. Calls app.execute_tool(...) if aggregate exists
# 3. Handles errors gracefully
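
The three helper steps can be sketched as follows. This is a guess at the general shape, not the actual helper: record_event_sketch(), FakeApplication, and the explicit app parameter are hypothetical, and the real function may resolve the application and report errors differently.

```python
import logging
from types import SimpleNamespace
from typing import Any, List, Tuple

logger = logging.getLogger(__name__)


class FakeApplication:
    """Hypothetical stand-in that just remembers what was recorded."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, dict]] = []

    def execute_tool(self, aggregate_id: str, **kwargs: Any) -> None:
        self.calls.append((aggregate_id, kwargs))


def record_event_sketch(app: Any, script: Any, method: str, **kwargs: Any) -> bool:
    """Record an event if the script has an aggregate; never raise."""
    # Look up the aggregate ID stored on the script.
    aggregate_id = getattr(script.db, "aggregate_id", None)
    if not aggregate_id:
        return False  # event sourcing is not enabled for this script
    try:
        # Dispatch to the application method with the same name.
        getattr(app, method)(aggregate_id, **kwargs)
    except Exception:
        # Log and swallow errors so recording never breaks the caller.
        logger.exception("Failed to record %s", method)
        return False
    return True


app = FakeApplication()
enabled = SimpleNamespace(db=SimpleNamespace(aggregate_id="abc-123"))
disabled = SimpleNamespace(db=SimpleNamespace())

ok = record_event_sketch(app, enabled, "execute_tool", tool_name="say")
skipped = record_event_sketch(app, disabled, "execute_tool", tool_name="say")
failed = record_event_sketch(app, enabled, "no_such_method")
```

Returning a boolean instead of raising keeps event recording as a side channel: a failure in the audit trail does not interrupt the tool execution that triggered it.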

Enabling Event Sourcing

# During assistant initialization
from evennia.contrib.base_systems.ai.eventsourcing import get_application

app = get_application()
aggregate_id, created = app.get_or_create_assistant(assistant_key)
script.db.aggregate_id = str(aggregate_id)

5. Use Cases

Audit Trail

Every action is recorded with a timestamp and context:

# Query: What did the assistant do in the last hour?
# → Replay events from event store with time filter
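
As an illustration of a time-filtered query, the sketch below filters event records shaped like the archive entries (topic, timestamp, data). The events_since() function and the sample events are hypothetical; how the live event store exposes timestamps is not covered here.

```python
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List


def events_since(events: List[Dict[str, Any]], cutoff: datetime) -> List[Dict[str, Any]]:
    """Return events whose ISO 8601 timestamp is at or after `cutoff`."""
    return [
        entry for entry in events
        if datetime.fromisoformat(entry["timestamp"]) >= cutoff
    ]


events = [
    {"topic": "ToolExecuted", "timestamp": "2025-12-01T09:30:00+00:00",
     "data": {"tool_name": "say"}},
    {"topic": "ToolExecuted", "timestamp": "2025-12-01T10:15:00+00:00",
     "data": {"tool_name": "inspect_location"}},
    {"topic": "TickCompleted", "timestamp": "2025-12-01T10:45:00+00:00",
     "data": {"tick_number": 42}},
]

# A fixed "now" keeps the example deterministic.
now = datetime(2025, 12, 1, 11, 0, tzinfo=timezone.utc)
recent = events_since(events, now - timedelta(hours=1))
recent_topics = [entry["topic"] for entry in recent]
```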

Time-Travel Debugging

# Assistant behaving strangely? Check state at any point:
v10 = app.get_assistant_at_version(assistant_id, version=10)
v50 = app.get_assistant_at_version(assistant_id, version=50)

# Compare goal states
print(f"Goals at v10: {v10.goals}")
print(f"Goals at v50: {v50.goals}")

Metrics Computation

# Compute from event stream:
# - Tool usage patterns (tool_stats)
# - Goal completion rate
# - Average tick duration
# - Error frequency
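
A minimal sketch of computing these metrics from a list of event records follows. It assumes records shaped like the archive entries and the field names from the events table; compute_metrics() and the sample stream are hypothetical.

```python
from collections import Counter
from typing import Any, Dict, List


def compute_metrics(events: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Derive usage, completion, timing, and error metrics from events."""
    tools = [e["data"] for e in events if e["topic"] == "ToolExecuted"]
    ticks = [e["data"]["duration_ms"] for e in events if e["topic"] == "TickCompleted"]
    created = sum(1 for e in events if e["topic"] == "GoalCreated")
    completed = sum(1 for e in events if e["topic"] == "GoalCompleted")
    failures = sum(1 for t in tools if not t.get("success", True))
    return {
        "tool_usage": dict(Counter(t["tool_name"] for t in tools)),
        "goal_completion_rate": completed / created if created else 0.0,
        "avg_tick_ms": sum(ticks) / len(ticks) if ticks else 0.0,
        "error_rate": failures / len(tools) if tools else 0.0,
    }


stream = [
    {"topic": "GoalCreated", "data": {"goal_id": "goal_1"}},
    {"topic": "GoalCreated", "data": {"goal_id": "goal_2"}},
    {"topic": "ToolExecuted", "data": {"tool_name": "say", "success": True}},
    {"topic": "ToolExecuted", "data": {"tool_name": "say", "success": False}},
    {"topic": "ToolExecuted", "data": {"tool_name": "inspect_location", "success": True}},
    {"topic": "ToolExecuted", "data": {"tool_name": "inspect_location", "success": True}},
    {"topic": "TickCompleted", "data": {"tick_number": 1, "duration_ms": 200.0}},
    {"topic": "TickCompleted", "data": {"tick_number": 2, "duration_ms": 300.0}},
    {"topic": "GoalCompleted", "data": {"goal_id": "goal_1"}},
]

metrics = compute_metrics(stream)
```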

Key Files

File                           Lines     Purpose
eventsourcing/domain.py        16-71     AssistantAggregate state
eventsourcing/domain.py        76-121    Tool execution event
eventsourcing/domain.py        126-198   Goal events
eventsourcing/domain.py        203-220   Reflection event
eventsourcing/domain.py        225-251   Emergency stop events
eventsourcing/domain.py        256-276   Tick completion event
eventsourcing/domain.py        307-331   Archive tombstone event
eventsourcing/application.py   20-32     get_application() singleton
eventsourcing/application.py   34-166    Lifecycle methods
eventsourcing/application.py   184-341   Event recording methods
eventsourcing/application.py   346-373   Statistics query
eventsourcing/application.py   378-450   Archive and delete
eventsourcing/archive.py                 JSON export and DB cleanup

See also: Architecture-Core-Engine | Architecture-Self-Management | Architecture-Tool-System