Page:
Architecture Event Sourcing
Pages
Architecture Commands and API
Architecture Context System
Architecture Core Engine
Architecture Event Sourcing
Architecture Generative Reflection
Architecture Helpers
Architecture Journal System
Architecture LLM Interaction
Architecture LLM Providers
Architecture Logging
Architecture Memory and Sleep
Architecture Overview
Architecture Persona Protection
Architecture Prompt System
Architecture RAG Implementation
Architecture Resilience System
Architecture Safety System
Architecture Self Management
Architecture Sub Agent Delegation
Architecture Task Assessment
Architecture Token Management
Architecture Tool System
Configuration Reference
Context and Memory Flow Analysis
Data Flow 01 Context Compaction
Data Flow 02 ReAct Loop
Data Flow 03 Memory Consolidation
Data Flow 04 Message Classification
Data Flow 05 Entity Profile System
Data Flow 06 Tool Execution
Data Flow 07 Sleep Mode Transitions
Data Flow 08 LLM Provider Interaction
Data Flow 09 Self Management Operations
Home
LLM Decision Patterns
Research Foundations
User Guide 00 Index
User Guide 01 Getting Started
User Guide 02 Configuration and Customization
User Guide 03 Advanced Capabilities
User Guide 04 Troubleshooting
No results
1
Architecture Event Sourcing
blightbow edited this page 2025-12-08 04:15:26 +00:00
Architecture: Event Sourcing
Infrastructure - Audit Trail and Time-Travel Debugging
Overview
The event sourcing system captures all assistant state changes as immutable events:
- AssistantAggregate - Domain model with event-decorated methods
- AssistantApplication - Application layer for persistence and queries
- Archive system - Export to JSON and database cleanup
Built on the eventsourcing library with Django persistence via eventsourcing-django.
1. AssistantAggregate
Domain model capturing all state changes as events (eventsourcing/domain.py).
Events
| Event | Trigger | Data Captured |
|---|---|---|
AssistantCreated |
create() |
assistant_key, config |
ToolExecuted |
execute_tool() |
tool_name, parameters, reasoning, success, error |
GoalCreated |
create_goal() |
goal_id, description, priority, parent_id |
GoalUpdated |
update_goal() |
goal_id, status, priority, metadata |
GoalCompleted |
complete_goal() |
goal_id, outcome |
ReflectionRecorded |
record_reflection() |
content, tags, context |
EmergencyStopTriggered |
trigger_emergency_stop() |
reason, consecutive_errors |
EmergencyStopCleared |
clear_emergency_stop() |
cleared_by |
TickCompleted |
complete_tick() |
tick_number, duration_ms, action_taken |
ConfigurationChanged |
change_configuration() |
key, old_value, new_value |
AssistantArchived |
archive() |
reason, archive_path, final_stats |
Aggregate State
class AssistantAggregate(Aggregate):
def __init__(self):
self.assistant_key: str = ""
self.created_at: Optional[datetime] = None
# Tool tracking
self.tool_count: int = 0
self.last_tool: Optional[str] = None
self.tool_stats: Dict[str, int] = {} # tool_name → count
# Goal tracking
self.goals: Dict[str, Dict] = {} # goal_id → goal_data
self.completed_goals: int = 0
# Reflection tracking
self.reflection_count: int = 0
# Emergency stop state
self.is_stopped: bool = False
self.stop_reason: Optional[str] = None
# Tick tracking
self.tick_count: int = 0
# Archive state (tombstone)
self.is_archived: bool = False
Event-Decorated Methods
@event("ToolExecuted")
def execute_tool(
self,
tool_name: str,
parameters: Dict[str, Any],
reasoning: str,
success: bool = True,
error: Optional[str] = None,
execution_time_ms: Optional[float] = None,
):
# State mutations happen here
self.tool_count += 1
self.last_tool = tool_name
self.tool_stats[tool_name] = self.tool_stats.get(tool_name, 0) + 1
Each @event method:
- Captures method call as immutable event
- Stores event in event store
- Applies state mutations to aggregate
2. AssistantApplication
Application layer for interacting with aggregates (eventsourcing/application.py).
Getting the Application
from evennia.contrib.base_systems.ai.eventsourcing import get_application
app = get_application() # Singleton instance
Assistant Lifecycle
# Create new assistant
assistant_id = app.create_assistant("my_assistant", config={...})
# Get current state
assistant = app.get_assistant(assistant_id)
# Get by key
assistant = app.get_assistant_by_key("my_assistant")
# Get or create
assistant_id, created = app.get_or_create_assistant("my_assistant")
# Register existing (after server restart)
app.register_existing_assistant("my_assistant", aggregate_id)
Recording Events
# Tool execution
app.execute_tool(
assistant_id,
tool_name="inspect_location",
parameters={"target": "room"},
reasoning="Checking surroundings",
success=True,
execution_time_ms=150.0
)
# Goal lifecycle
app.create_goal(assistant_id, "goal_1", "Build tavern", priority=1)
app.update_goal(assistant_id, "goal_1", status="blocked")
app.complete_goal(assistant_id, "goal_1", outcome="Completed successfully")
# Emergency stop
app.trigger_emergency_stop(assistant_id, "Too many errors", consecutive_errors=5)
app.clear_emergency_stop(assistant_id, cleared_by="admin")
# Tick completion
app.complete_tick(
assistant_id,
tick_number=42,
duration_ms=250.0,
action_taken="say",
token_usage={"prompt": 1000, "completion": 50}
)
Time-Travel (Version Access)
# Get current state (latest version)
current = app.get_assistant(assistant_id)
print(f"Current version: {current.version}")
# Replay to earlier version
earlier = app.get_assistant_at_version(assistant_id, version=5)
print(f"Tool count at v5: {earlier.tool_count}")
Statistics
stats = app.get_assistant_stats(assistant_id)
# {
# "assistant_key": "my_assistant",
# "created_at": "2025-12-06T10:00:00+00:00",
# "tool_count": 150,
# "tool_stats": {"say": 50, "inspect_location": 30, ...},
# "active_goals": 3,
# "completed_goals": 12,
# "reflection_count": 5,
# "tick_count": 500,
# "is_stopped": False,
# "version": 175,
# }
3. Archive System
Export events to JSON and cleanup database (eventsourcing/archive.py).
Archive Workflow
# Step 1: Export events to JSON file
archive_path = app.archive_assistant(
assistant_id,
archive_dir="/path/to/archives",
reason="manual_clear"
)
# → Records AssistantArchived tombstone event
# → Returns path to JSON file
# Step 2: Delete events from database
deleted_count = app.delete_assistant_events(assistant_id)
# → Removes key-to-id mapping
# → Deletes all events from DB
Archive File Format
{
"assistant_key": "my_assistant",
"aggregate_id": "550e8400-e29b-41d4-a716-446655440000",
"exported_at": "2025-12-06T14:30:00+00:00",
"event_count": 175,
"events": [
{
"topic": "AssistantCreated",
"timestamp": "2025-12-01T10:00:00+00:00",
"data": {"assistant_key": "my_assistant", "config": {...}}
},
{
"topic": "ToolExecuted",
"timestamp": "2025-12-01T10:01:00+00:00",
"data": {"tool_name": "say", "parameters": {...}, ...}
},
...
]
}
4. Integration with AssistantScript
Events are recorded via record_eventsourcing_event() helper:
from evennia.contrib.base_systems.ai.helpers import record_eventsourcing_event
# In tool execution
record_eventsourcing_event(
script,
event="execute_tool",
method="execute_tool",
tool_name=tool.name,
parameters=params,
reasoning=reasoning,
success=result.get("success"),
execution_time_ms=duration_ms
)
# The helper:
# 1. Gets aggregate_id from script.db.aggregate_id
# 2. Calls app.execute_tool(...) if aggregate exists
# 3. Handles errors gracefully
Enabling Event Sourcing
# During assistant initialization
from evennia.contrib.base_systems.ai.eventsourcing import get_application
app = get_application()
aggregate_id, created = app.get_or_create_assistant(assistant_key)
script.db.aggregate_id = str(aggregate_id)
5. Use Cases
Audit Trail
Every action is recorded with timestamp and context:
# Query: What did the assistant do in the last hour?
# → Replay events from event store with time filter
Time-Travel Debugging
# Assistant behaving strangely? Check state at any point:
v10 = app.get_assistant_at_version(assistant_id, version=10)
v50 = app.get_assistant_at_version(assistant_id, version=50)
# Compare goal states
print(f"Goals at v10: {v10.goals}")
print(f"Goals at v50: {v50.goals}")
Metrics Computation
# Compute from event stream:
# - Tool usage patterns (tool_stats)
# - Goal completion rate
# - Average tick duration
# - Error frequency
Key Files
| File | Lines | Purpose |
|---|---|---|
eventsourcing/domain.py |
16-71 | AssistantAggregate state |
eventsourcing/domain.py |
76-121 | Tool execution event |
eventsourcing/domain.py |
126-198 | Goal events |
eventsourcing/domain.py |
203-220 | Reflection event |
eventsourcing/domain.py |
225-251 | Emergency stop events |
eventsourcing/domain.py |
256-276 | Tick completion event |
eventsourcing/domain.py |
307-331 | Archive tombstone event |
eventsourcing/application.py |
20-32 | get_application() singleton |
eventsourcing/application.py |
34-166 | Lifecycle methods |
eventsourcing/application.py |
184-341 | Event recording methods |
eventsourcing/application.py |
346-373 | Statistics query |
eventsourcing/application.py |
378-450 | Archive and delete |
eventsourcing/archive.py |
— | JSON export and DB cleanup |
See also: Architecture-Core-Engine | Architecture-Self-Management | Architecture-Tool-System