
Data Flow 08: LLM Provider Interaction

Engineering documentation series - Data flows in the AI Assistant system


Overview

This document describes the data flows for LLM API interactions, including:

  • Provider abstraction layer
  • Request building and message formatting
  • Retry logic with exponential backoff
  • Circuit breaker integration
  • Response parsing and error handling

Related Documents

Document                    Description
──────────────────────────  ──────────────────────────────────────
Architecture-Core-Engine    Tick loop and ReAct execution
Data-Flow-02-ReAct-Loop     How LLM calls fit into tick execution
Architecture-Tool-System    Tool schemas sent to LLM

1. Provider Architecture

Supported Providers

Provider     API Format               Tool Support   Token Counting
───────────  ───────────────────────  ─────────────  ──────────────────
openai       OpenAI Chat Completion   Native         tiktoken
anthropic    Anthropic Messages       Native         tiktoken estimate
openrouter   OpenAI-compatible        Native         tiktoken
ollama       OpenAI-compatible        Native         heuristic
local        Custom endpoint          Varies         heuristic

Class Hierarchy

┌─────────────────────────────────────────────────────────────────────────────┐
│ LLMProvider (Abstract Base)                                                 │
│ llm/providers.py                                                            │
│ ─────────────────────────────────────────────────────────────────────────── │
│ Methods:                                                                    │
│   - build_request(messages, tools, tool_choice, **kwargs)                   │
│   - build_headers(auth_token)                                               │
│   - parse_response(response_json, headers)                                  │
│   - parse_rate_limit_headers(headers)                                       │
│   - parse_error_response(error_json)                                        │
│   - count_tokens(text)                                                      │
│   - format_tool_result(tool_call_id, result)                                │
│                                                                             │
│ Properties:                                                                 │
│   - name: str                                                               │
│   - model: str                                                              │
│   - supports_tools: bool                                                    │
│   - get_api_url(): str                                                      │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
          ┌────────────────────────┼────────────────────────┐
          ▼                        ▼                        ▼
    OpenAIProvider          AnthropicProvider        OllamaProvider
    (OpenRouterProvider subclasses this)

Factory Function

# llm/providers.py:get_provider()
def get_provider(name: str, api_url: str = None, model: str = None, **kwargs) -> LLMProvider:
    """Factory for creating provider instances."""
    providers = {
        "openai": OpenAIProvider,
        "anthropic": AnthropicProvider,
        "openrouter": OpenRouterProvider,  # Subclass of OpenAIProvider
        "ollama": OllamaProvider,
        "local": LocalProvider,
    }
    return providers[name](api_url, model, **kwargs)
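
Example usage of the factory (the model string below is illustrative, not a required setting):

provider = get_provider("openrouter", model="anthropic/claude-3-opus")
assert provider.supports_tools          # OpenRouter exposes native tool calling
url = provider.get_api_url()            # provider-specific default chat completions endpoint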

2. Request Flow

Entry Point

assistant_script.py:at_tick()
  └─> _call_llm(messages)
        └─> llm_interaction.py:call_llm(script, messages)
              └─> UnifiedLLMClient.chat_completion(messages, tools, ...)

Client Initialization

┌─────────────────────────────────────────────────────────────────────────────┐
│ UnifiedLLMClient.__init__()                                                 │
│ llm/client.py lines 109-149                                                 │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ CLIENT CONFIGURATION                                                        │
│ ─────────────────────────────────────────────────────────────────────────── │
│ From script.db:                                                             │
│   llm_provider: "openai" | "anthropic" | "openrouter" | "ollama" | "local"  │
│   llm_api_url: Optional custom endpoint                                     │
│   llm_auth_token: API key                                                   │
│   llm_model: Model name (e.g., "gpt-4", "claude-3-opus")                    │
│   llm_app_name: OpenRouter X-Title header                                   │
│   llm_site_url: OpenRouter HTTP-Referer header                              │
│                                                                             │
│ Optional parameters:                                                        │
│   llm_temperature: Sampling temperature                                     │
│   llm_top_p: Nucleus sampling                                               │
│   llm_max_tokens: Response length limit                                     │
│   llm_reasoning_effort: For o1-style models                                 │
│   llm_extra_params: Arbitrary additional parameters                         │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ TWISTED HTTP SETUP                                                          │
│ ─────────────────────────────────────────────────────────────────────────── │
│ HTTPConnectionPool: Reuses connections across requests                      │
│ Agent: Makes async HTTP requests                                            │
│ QuietHTTP11ClientFactory: Suppresses noisy logs                             │
└─────────────────────────────────────────────────────────────────────────────┘
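
The pool/agent wiring amounts to standard Twisted usage; a minimal sketch (QuietHTTP11ClientFactory is the project's own factory subclass and is omitted here):

from twisted.internet import reactor
from twisted.web.client import Agent, HTTPConnectionPool

pool = HTTPConnectionPool(reactor, persistent=True)   # reuse connections across requests
agent = Agent(reactor, pool=pool)                     # issues asynchronous HTTP requests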

Request Building

┌─────────────────────────────────────────────────────────────────────────────┐
│ chat_completion(messages, tools, tool_choice, **kwargs)                     │
│ llm/client.py lines 203-243                                                 │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ PROVIDER BUILDS REQUEST                                                     │
│ ─────────────────────────────────────────────────────────────────────────── │
│ request_body = provider.build_request(messages, tools, tool_choice, **kw)   │
│                                                                             │
│ OpenAI format:                                                              │
│ {                                                                           │
│   "model": "gpt-4",                                                         │
│   "messages": [                                                             │
│     {"role": "system", "content": "..."},                                   │
│     {"role": "user", "content": "..."},                                     │
│     {"role": "assistant", "content": "...", "tool_calls": [...]},           │
│     {"role": "tool", "tool_call_id": "...", "content": "..."}               │
│   ],                                                                        │
│   "tools": [                                                                │
│     {"type": "function", "function": {"name": "...", "parameters": {...}}}  │
│   ],                                                                        │
│   "tool_choice": "auto",                                                    │
│   "temperature": 0.7                                                        │
│ }                                                                           │
└─────────────────────────────────────────────────────────────────────────────┘
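
A minimal sketch of how an OpenAI-style provider might assemble that body. The class and parameter handling here are illustrative; the real OpenAIProvider.build_request likely covers more options (reasoning_effort, llm_extra_params, etc.):

class OpenAIStyleProvider:
    """Illustrative subset of an OpenAI-format provider."""

    def __init__(self, model: str):
        self.model = model

    def build_request(self, messages, tools=None, tool_choice="auto", **kwargs):
        body = {
            "model": self.model,
            "messages": messages,       # already in OpenAI chat format
        }
        if tools:
            body["tools"] = tools       # [{"type": "function", "function": {...}}]
            body["tool_choice"] = tool_choice
        # Forward optional sampling parameters only when configured.
        for key in ("temperature", "top_p", "max_tokens"):
            if kwargs.get(key) is not None:
                body[key] = kwargs[key]
        return body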

3. Retry Logic

Configuration

import random
from dataclasses import dataclass

@dataclass
class RetryConfig:
    max_attempts: int = 3
    base_delay: float = 1.0      # seconds
    max_delay: float = 30.0      # seconds
    exponential_base: float = 2.0
    retryable_status_codes: tuple = (429, 500, 502, 503, 504)

    def get_delay(self, attempt: int) -> float:
        """Calculate delay with exponential backoff + jitter."""
        delay = min(self.base_delay * (self.exponential_base ** attempt), self.max_delay)
        jitter = random.uniform(0, delay * 0.1)
        return delay + jitter

    def is_retryable(self, status_code: int) -> bool:
        return status_code in self.retryable_status_codes

Retry Flow

┌─────────────────────────────────────────────────────────────────────────────┐
│ _request_with_retry(request_body)                                           │
│ llm/client.py lines 245-315                                                 │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ CIRCUIT BREAKER CHECK                                                       │
│ ─────────────────────────────────────────────────────────────────────────── │
│ if circuit_breaker and not circuit_breaker.is_available:                    │
│   raise CircuitOpenError(name, retry_after)                                 │
│                                                                             │
│ Circuit breaker prevents cascading failures when service is unhealthy       │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ RETRY LOOP                                                                  │
│ ─────────────────────────────────────────────────────────────────────────── │
│ for attempt in range(max_attempts):  # default: 3                           │
│                                                                             │
│   try:                                                                      │
│     result = yield _make_request(request_body)                              │
│     status_code, response_bytes, headers = result                           │
│                                                                             │
│     if not is_retryable(status_code):                                       │
│       # Success (200) or non-retryable error (400, 401, 403)                │
│       if status_code == 200:                                                │
│         circuit_breaker.record_success()                                    │
│       return result                                                         │
│                                                                             │
│     # Retryable error (429, 500, 502, 503, 504)                             │
│     circuit_breaker.record_failure(Exception(f"HTTP {status_code}"))        │
│     last_error = f"HTTP {status_code}"                                      │
│                                                                             │
│   except Exception as e:                                                    │
│     circuit_breaker.record_failure(e)                                       │
│     last_error = str(e)                                                     │
│                                                                             │
│   # Wait before retry (except last attempt)                                 │
│   if attempt < max_attempts - 1:                                            │
│     delay = retry_config.get_delay(attempt)                                 │
│     yield task.deferLater(reactor, delay, lambda: None)                     │
│                                                                             │
│ # All retries exhausted                                                     │
│ log_err(f"LLM request failed after {max_attempts} attempts")                │
│ return (500, f"Request failed: {last_error}".encode(), {})                  │
└─────────────────────────────────────────────────────────────────────────────┘

Exponential Backoff Example

Attempt   Base Delay   Calculated Delay   With Jitter (approx.)
────────  ───────────  ─────────────────  ─────────────────────
0         1.0s         1.0s               1.0-1.1s
1         1.0s         2.0s               2.0-2.2s
2         1.0s         4.0s               4.0-4.4s
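
These values follow directly from RetryConfig.get_delay defined above; a quick check:

cfg = RetryConfig()
for attempt in range(cfg.max_attempts):
    base = min(cfg.base_delay * cfg.exponential_base ** attempt, cfg.max_delay)
    print(f"attempt {attempt}: base {base}s, with jitter {cfg.get_delay(attempt):.2f}s")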

4. HTTP Request Execution

┌─────────────────────────────────────────────────────────────────────────────┐
│ _make_request(request_body)                                                 │
│ llm/client.py lines 317-343                                                 │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ BUILD HEADERS                                                               │
│ ─────────────────────────────────────────────────────────────────────────── │
│ headers = provider.build_headers(auth_token)                                │
│                                                                             │
│ OpenAI headers:                                                             │
│   Authorization: Bearer sk-...                                              │
│   Content-Type: application/json                                            │
│                                                                             │
│ OpenRouter headers:                                                         │
│   Authorization: Bearer sk-...                                              │
│   Content-Type: application/json                                            │
│   HTTP-Referer: https://example.com (site_url)                              │
│   X-Title: My App (app_name)                                                │
│                                                                             │
│ Anthropic headers:                                                          │
│   x-api-key: sk-...                                                         │
│   anthropic-version: 2023-06-01                                             │
│   Content-Type: application/json                                            │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ TWISTED AGENT REQUEST                                                       │
│ ─────────────────────────────────────────────────────────────────────────── │
│ d = agent.request(                                                          │
│   b"POST",                                                                  │
│   provider.get_api_url().encode("utf-8"),                                   │
│   headers=Headers(headers),                                                 │
│   bodyProducer=StringProducer(json.dumps(request_body))                     │
│ )                                                                           │
│                                                                             │
│ response = yield d                                                          │
│ result = yield _read_response(response)                                     │
│ return result  # (status_code, body_bytes, headers_dict)                    │
└─────────────────────────────────────────────────────────────────────────────┘
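
Agent.request needs an IBodyProducer for the POST body. The project's StringProducer presumably follows the standard Twisted pattern; a minimal sketch (names mirror the box above, details may differ):

from zope.interface import implementer
from twisted.internet import defer
from twisted.web.iweb import IBodyProducer

@implementer(IBodyProducer)
class StringProducer:
    """Writes a pre-serialized request body to the HTTP transport."""

    def __init__(self, body: str):
        self.body = body.encode("utf-8")
        self.length = len(self.body)

    def startProducing(self, consumer):
        consumer.write(self.body)
        return defer.succeed(None)

    def pauseProducing(self):
        pass

    def stopProducing(self):
        pass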

5. Response Parsing

┌─────────────────────────────────────────────────────────────────────────────┐
│ _parse_response(status_code, response_bytes, headers)                       │
│ llm/client.py lines 369-422                                                 │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ RATE LIMIT TRACKING                                                         │
│ ─────────────────────────────────────────────────────────────────────────── │
│ rate_limit_info = provider.parse_rate_limit_headers(headers)                │
│                                                                             │
│ OpenAI headers parsed:                                                      │
│   x-ratelimit-limit-requests                                                │
│   x-ratelimit-remaining-requests                                            │
│   x-ratelimit-limit-tokens                                                  │
│   x-ratelimit-remaining-tokens                                              │
│   x-ratelimit-reset-requests                                                │
│   x-ratelimit-reset-tokens                                                  │
│                                                                             │
│ If approaching limit (remaining < 10% of limit):                            │
│   log_warn("Approaching rate limit: tokens=X, requests=Y")                  │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ ERROR HANDLING (status != 200)                                              │
│ ─────────────────────────────────────────────────────────────────────────── │
│ error_msg = provider.parse_error_response(error_json)                       │
│                                                                             │
│ Common errors:                                                              │
│   400: Invalid request (bad parameters)                                     │
│   401: Authentication failed                                                │
│   403: Permission denied                                                    │
│   429: Rate limited                                                         │
│   500+: Server errors (retryable)                                           │
│                                                                             │
│ return LLMResponse(                                                         │
│   content=f"Error: {error_msg}",                                            │
│   finish_reason="error",                                                    │
│   provider=provider.name,                                                   │
│   rate_limit_info=rate_limit_info                                           │
│ )                                                                           │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼ (status == 200)
┌─────────────────────────────────────────────────────────────────────────────┐
│ SUCCESS PARSING                                                             │
│ ─────────────────────────────────────────────────────────────────────────── │
│ response_json = json.loads(response_bytes)                                  │
│ return provider.parse_response(response_json, headers)                      │
│                                                                             │
│ Returns LLMResponse with:                                                   │
│   - content: Text response (reasoning)                                      │
│   - tool_calls: List[ToolCall] (if any)                                     │
│   - finish_reason: "stop", "tool_calls", "length", etc.                     │
│   - usage: {prompt_tokens, completion_tokens, total_tokens}                 │
│   - provider: Provider name                                                 │
│   - model: Model used                                                       │
│   - rate_limit_info: RateLimitInfo (if available)                           │
└─────────────────────────────────────────────────────────────────────────────┘
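
A hedged sketch of success parsing for the OpenAI response shape, using the LLMResponse and ToolCall dataclasses from llm/responses.py (shown in the next section). The actual OpenAIProvider.parse_response also attaches rate_limit_info and may differ in detail:

import json

class OpenAIStyleProvider:
    # ... continuing the illustrative provider from section 2 ...
    name = "openai"
    model = "gpt-4"

    def parse_response(self, response_json: dict, headers: dict) -> "LLMResponse":
        choice = response_json["choices"][0]
        message = choice["message"]
        tool_calls = [
            ToolCall(
                id=tc["id"],
                name=tc["function"]["name"],
                # OpenAI returns arguments as a JSON string; parse them here.
                arguments=json.loads(tc["function"].get("arguments") or "{}"),
            )
            for tc in (message.get("tool_calls") or [])
        ]
        return LLMResponse(
            content=message.get("content"),
            tool_calls=tool_calls,
            finish_reason=choice.get("finish_reason", "stop"),
            usage=response_json.get("usage"),
            provider=self.name,
            model=response_json.get("model", self.model),
        )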

6. LLMResponse Structure

# From llm/responses.py (RateLimitInfo is defined in the same module)
from __future__ import annotations

from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class LLMResponse:
    content: Optional[str] = None       # Text content
    tool_calls: List[ToolCall] = field(default_factory=list)
    finish_reason: str = "stop"         # "stop", "tool_calls", "length", "error"
    usage: Optional[dict] = None        # Token usage stats
    provider: str = ""                  # Provider name
    model: str = ""                     # Model name
    rate_limit_info: Optional[RateLimitInfo] = None

    @property
    def has_tool_calls(self) -> bool:
        return len(self.tool_calls) > 0

@dataclass
class ToolCall:
    id: str                  # Unique ID for tool result matching
    name: str                # Tool name
    arguments: dict          # Parsed arguments
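
Example of a caller consuming the response inside a Twisted inlineCallbacks generator. execute_tool and log_info are hypothetical stand-ins for the project's dispatch and logging helpers:

from twisted.internet import defer

@defer.inlineCallbacks
def handle_turn(client, provider, messages, tool_schemas):
    response = yield client.chat_completion(messages, tools=tool_schemas)
    if response.has_tool_calls:
        call = response.tool_calls[0]
        result = execute_tool(call.name, call.arguments)   # hypothetical tool dispatcher
        messages.append(provider.format_tool_result(call.id, result))
    else:
        log_info(response.content)                         # text-only / reasoning turn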

7. Tool Call Parsing

┌─────────────────────────────────────────────────────────────────────────────┐
│ parse_tool_call(script, response)                                           │
│ llm_interaction.py lines 383-432                                            │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
          ┌────────────────────────┴────────────────────────┐
          ▼                                                 ▼
┌─────────────────────────────────┐   ┌─────────────────────────────────────┐
│ NATIVE TOOL CALLS               │   │ JSON FALLBACK                       │
│ (LLMResponse with tool_calls)   │   │ (Text response with embedded JSON)  │
│ ─────────────────────────────── │   │ ─────────────────────────────────── │
│ if response.has_tool_calls:     │   │ Used for non-tool-capable providers │
│   tc = response.tool_calls[0]   │   │                                     │
│   return {                      │   │ Expected format:                    │
│     "tool": tc.name,            │   │ {"tool": "name",                    │
│     "parameters": tc.arguments, │   │  "parameters": {...},               │
│     "reasoning": response.cont, │   │  "reasoning": "..."}                │
│     "tool_call_id": tc.id       │   │                                     │
│   }                             │   │ Attempts JSON extraction if not     │
│                                 │   │ pure JSON (handles wrapper text)    │
└─────────────────────────────────┘   └─────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ TEXT-ONLY RESPONSE (no tool call)                                           │
│ ─────────────────────────────────────────────────────────────────────────── │
│ if response.content and no tool_calls:                                      │
│   return {                                                                  │
│     "tool": "noop",        # LLM chose to think without acting              │
│     "parameters": {},                                                       │
│     "reasoning": response.content,                                          │
│     "tool_call_id": None                                                    │
│   }                                                                         │
└─────────────────────────────────────────────────────────────────────────────┘
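
For the JSON fallback branch, a sketch of the extraction logic described above. The helper name is illustrative; llm_interaction.py may handle more edge cases:

import json
import re


def extract_tool_json(text: str):
    """Return the parsed tool-call dict, or None if no valid JSON is found."""
    # Try the whole response first (pure JSON case).
    try:
        parsed = json.loads(text)
        if isinstance(parsed, dict) and "tool" in parsed:
            return parsed
    except ValueError:
        pass
    # Otherwise look for the first {...} block embedded in wrapper text.
    match = re.search(r"\{.*\}", text, re.DOTALL)
    if match:
        try:
            parsed = json.loads(match.group(0))
            if isinstance(parsed, dict) and "tool" in parsed:
                return parsed
        except ValueError:
            pass
    return None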

8. Circuit Breaker Integration

┌─────────────────────────────────────────────────────────────────────────────┐
│ CircuitBreaker                                                              │
│ resilience.py                                                               │
│ ─────────────────────────────────────────────────────────────────────────── │
│ States:                                                                     │
│   CLOSED: Normal operation, requests pass through                           │
│   OPEN: Too many failures, requests blocked                                 │
│   HALF_OPEN: Testing if service recovered                                   │
│                                                                             │
│ Thresholds:                                                                 │
│   failure_threshold: 5      # Failures to open circuit                      │
│   recovery_timeout: 30s     # Time before testing recovery                  │
│   success_threshold: 2      # Successes to close circuit                    │
└─────────────────────────────────────────────────────────────────────────────┘
                                   │
                                   ▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ INTEGRATION POINTS                                                          │
│ ─────────────────────────────────────────────────────────────────────────── │
│ 1. Before request: Check if circuit is open                                 │
│    if not circuit_breaker.is_available:                                     │
│      raise CircuitOpenError(...)                                            │
│                                                                             │
│ 2. On success (HTTP 200):                                                   │
│    circuit_breaker.record_success()                                         │
│                                                                             │
│ 3. On failure (retryable errors, exceptions):                               │
│    circuit_breaker.record_failure(exception)                                │
│                                                                             │
│ Benefits:                                                                   │
│   - Prevents cascading failures                                             │
│   - Saves API quota during outages                                          │
│   - Provides fast failure when service is known-bad                         │
└─────────────────────────────────────────────────────────────────────────────┘
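
A minimal circuit-breaker sketch implementing the three states and thresholds above. The real resilience.py implementation may track more metadata, but the is_available / record_success / record_failure surface matches the integration points shown:

import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30.0, success_threshold=2):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = CircuitState.CLOSED
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    @property
    def is_available(self) -> bool:
        """False while OPEN and the recovery timeout has not elapsed."""
        if self.state is CircuitState.OPEN:
            if time.monotonic() - self._opened_at >= self.recovery_timeout:
                # Allow a trial request through to probe the service.
                self.state = CircuitState.HALF_OPEN
                self._successes = 0
                return True
            return False
        return True

    def record_success(self):
        if self.state is CircuitState.HALF_OPEN:
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = CircuitState.CLOSED
                self._failures = 0
        else:
            self._failures = 0

    def record_failure(self, error: Exception):
        self._failures += 1
        if self.state is CircuitState.HALF_OPEN or self._failures >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self._opened_at = time.monotonic()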

9. Token Counting

┌─────────────────────────────────────────────────────────────────────────────┐
│ Token Counting Methods                                                      │
│ ─────────────────────────────────────────────────────────────────────────── │
│                                                                             │
│ OpenAI/OpenRouter: tiktoken (accurate)                                      │
│   import tiktoken                                                           │
│   enc = tiktoken.encoding_for_model("gpt-4")                                │
│   count = len(enc.encode(text))                                             │
│                                                                             │
│ Anthropic: tiktoken estimate (close enough)                                 │
│   # Claude uses similar tokenization to GPT-4                               │
│                                                                             │
│ Ollama/Local: Heuristic fallback                                            │
│   # ~4 characters per token                                                 │
│   count = len(text) // 4                                                    │
│                                                                             │
│ Used for:                                                                   │
│   - Context window management                                               │
│   - Token advisory system (60%/80% thresholds)                              │
│   - Emergency compaction triggers                                           │
└─────────────────────────────────────────────────────────────────────────────┘
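
A sketch of the counting strategy in one helper, assuming tiktoken when it applies and the ~4-characters-per-token heuristic otherwise. The exact function in utils/token_counter.py may differ:

def count_tokens(text: str, model: str = "gpt-4", provider: str = "openai") -> int:
    if provider in ("openai", "openrouter", "anthropic"):
        try:
            import tiktoken
            try:
                enc = tiktoken.encoding_for_model(model)
            except KeyError:
                # Unknown model names (e.g. Claude) fall back to a GPT-4-era encoding.
                enc = tiktoken.get_encoding("cl100k_base")
            return len(enc.encode(text))
        except ImportError:
            pass  # tiktoken not installed; fall through to the heuristic
    # Ollama/local (or missing tiktoken): roughly 4 characters per token.
    return max(1, len(text) // 4)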

10. Key Files

File                     Purpose
───────────────────────  ───────────────────────────────────────────────────────────────
llm/client.py            UnifiedLLMClient, retry logic, HTTP execution
llm/providers.py         LLMProvider ABC, OpenAI/Anthropic/Ollama implementations
llm/responses.py         LLMResponse, ToolCall, RateLimitInfo, RetryConfig dataclasses
llm/__init__.py          Public exports
llm_interaction.py       High-level wrappers: call_llm, build_llm_messages, parse_tool_call
resilience.py            CircuitBreaker implementation
utils/token_counter.py   Centralized token counting with tiktoken

Document created: 2025-12-06