# U2 Architecture Design: LLM Cache Integration > Status: APPROVED — Design follows U1 architecture, minimal integration surface > Date: 2026-06-14 > Unit: U2 of P0 Production Hardening Plan --- ## 1. Design Goals 1. **Transparent injection**: Cache check happens inside `LLMGateway.chat()` without changing the public API 2. **Usage tracking on cache hits**: Cached requests record 0 cost to maintain usage query integrity 3. **Opt-in by default**: Cache disabled unless explicitly configured 4. **Stream exclusion**: `chat_stream()` is NOT cached in this iteration --- ## 2. Integration Point Analysis ### Current `LLMGateway.chat()` flow (gateway.py:34-121): ``` 1. _resolve_model_alias(model) → resolved_model 2. Check providers exist 3. Start telemetry span 4. _get_models_to_try(resolved_model) → models_to_try 5. For each model: a. _resolve_model(model_name) → (provider, actual_model) b. Build LLMRequest c. provider.chat(req) → response d. Break on success 6. Calculate cost 7. Record usage 8. Record telemetry 9. Return response ``` ### Cache insertion points: **Cache CHECK** (before step 5): After `LLMRequest` is constructed, before provider call. - Reason: All request normalization (alias resolution, model fallback list) has completed. - The `resolved_model` and `actual_model` are known, so the cache key is deterministic. **Cache WRITE** (after step 5d): After successful response, before usage tracking. - Reason: Response is validated (no exception thrown). Usage tracking needs to happen regardless of cache hit/miss. **Cache HIT usage tracking** (step 6-7): On cache hit, record usage with cost=0. --- ## 3. Modified Flow ```python async def chat(self, messages, model, agent_name="", task_type="", tools=None, tool_choice="auto", **kwargs): resolved_model = self._resolve_model_alias(model) # ... provider check, telemetry span setup ... # ── Cache check (NEW) ── cache_key = None query_embedding = None if self._cache is not None: from agentkit.llm.cache_key import generate_cache_key cache_key = generate_cache_key( model=resolved_model, messages=messages, temperature=kwargs.get("temperature", 0.7), tools=tools, tool_choice=tool_choice, max_tokens=kwargs.get("max_tokens", 2000), ) result = await self._cache.get(cache_key) if result.hit: # Record usage with 0 cost latency_ms = (time.monotonic() - start) * 1000 self._usage_tracker.record( agent_name=agent_name, model=result.response.model, usage=result.response.usage, cost=0.0, latency_ms=latency_ms, ) return result.response # Semantic match (only for temperature == 0) temperature = kwargs.get("temperature", 0.7) if temperature == 0 and self._embedder is not None: try: last_user_msg = next( (m["content"] for m in reversed(messages) if m.get("role") == "user"), "", ) if last_user_msg: query_embedding = await self._embedder.embed(last_user_msg) result = await self._cache.semantic_search(query_embedding) if result.hit: latency_ms = (time.monotonic() - start) * 1000 self._usage_tracker.record( agent_name=agent_name, model=result.response.model, usage=result.response.usage, cost=0.0, latency_ms=latency_ms, ) return result.response except Exception as e: logger.warning(f"Semantic cache search failed: {e}") # ── Normal provider call ── for model_name in models_to_try: # ... existing fallback loop ... # ── Cache write (NEW) ── if self._cache is not None and cache_key is not None: try: await self._cache.put(cache_key, response, query_embedding) except Exception as e: logger.warning(f"Cache write failed: {e}") # ... existing usage tracking, telemetry ... return response ``` --- ## 4. CacheConfig Design ```python @dataclass class CacheConfig: """LLM Cache configuration.""" enabled: bool = False backend: str = "auto" # "auto" | "redis" | "memory" redis_url: str = "redis://localhost:6379" exact_ttl: int = 3600 semantic_ttl: int = 86400 similarity_threshold: float = 0.92 max_entries: int = 10000 # Embedding config for semantic cache embedding_provider: str = "openai" # "openai" | "xinference" | "local" embedding_model: str = "bge-m3" # model name at provider embedding_base_url: str | None = None embedding_api_key: str | None = None ``` **Nesting**: `CacheConfig` is nested under `LLMConfig.cache`. ```python @dataclass class LLMConfig: providers: dict[str, ProviderConfig] = field(default_factory=dict) model_aliases: dict[str, str] = field(default_factory=dict) fallbacks: dict[str, list[str]] = field(default_factory=dict) cache: CacheConfig | None = None # NEW ``` --- ## 5. LLMGateway Constructor Change ```python class LLMGateway: def __init__(self, config: LLMConfig | None = None): self._providers: dict[str, LLMProvider] = {} self._usage_tracker = UsageTracker() self._config = config or LLMConfig() # Cache (NEW) self._cache: LLMCache | None = None self._embedder: Embedder | None = None if self._config.cache and self._config.cache.enabled: from agentkit.llm.cache import create_llm_cache self._cache = create_llm_cache( backend=self._config.cache.backend, redis_url=self._config.cache.redis_url, max_entries=self._config.cache.max_entries, exact_ttl=self._config.cache.exact_ttl, semantic_ttl=self._config.cache.semantic_ttl, similarity_threshold=self._config.cache.similarity_threshold, ) # Embedder for semantic cache self._embedder = self._create_embedder(self._config.cache) ``` **Design Decision**: Cache and embedder are created in `__init__`, not lazily. This ensures configuration errors are caught at startup, not at first request. --- ## 6. Embedder Factory Method ```python def _create_embedder(self, cache_config: CacheConfig) -> Embedder | None: """Create embedder for semantic cache based on config.""" try: if cache_config.embedding_provider == "openai": from agentkit.memory.embedder import OpenAIEmbedder return OpenAIEmbedder( api_key=cache_config.embedding_api_key, model=cache_config.embedding_model, base_url=cache_config.embedding_base_url, ) elif cache_config.embedding_provider in ("xinference", "local"): # Xinference/TEI uses OpenAI-compatible API from agentkit.memory.embedder import OpenAIEmbedder return OpenAIEmbedder( api_key=cache_config.embedding_api_key or "not-needed", model=cache_config.embedding_model, base_url=cache_config.embedding_base_url or "http://localhost:9997/v1", ) except Exception as e: logger.warning(f"Failed to create embedder for semantic cache: {e}") return None ``` **Design Decision**: Use `OpenAIEmbedder` for all providers since Xinference and TEI expose OpenAI-compatible `/embeddings` endpoints. No need for a separate XinferenceEmbedder class. --- ## 7. Stream Handling `chat_stream()` is NOT cached in this iteration. Document as known limitation. **Reasoning**: - Streaming requires collecting all chunks before caching, adding latency - Chunk collection adds complexity (error handling mid-stream, partial responses) - Most cacheable requests (temperature=0, simple queries) don't need streaming - Streaming is typically used for long-form generation where caching is less beneficial --- ## 8. Edge Cases | Edge Case | Behavior | |-----------|----------| | Cache disabled (default) | No cache check, no performance impact | | Cache enabled, first request | Cache miss, provider called, response cached | | Cache hit with tool_calls | Return cached response including tool_calls | | Embedder fails during semantic search | Log warning, return miss, proceed to provider | | Cache write fails | Log warning, response still returned to caller | | Fallback model used | Cache key uses `resolved_model`, not `actual_model` — same query hits cache regardless of which fallback responded | **Fallback model cache key issue**: When model A fails and fallback model B responds, the cache key is based on `resolved_model` (the alias), not `actual_model` (B). This means a subsequent request for the same alias will get a cache hit even if model A is back online. This is **correct behavior** — the user asked for the alias, not a specific model. However, if the user explicitly specifies model B (not an alias), the cache key will be different. This is also correct — different model = different cache entry. --- ## 9. Test Strategy ### Integration Tests (`tests/unit/test_gateway_cache.py`) 1. **test_cache_disabled**: Requests pass through to provider normally 2. **test_cache_enabled_first_request**: Cache miss, provider called, response cached 3. **test_cache_enabled_second_request**: Cache hit, provider NOT called 4. **test_cache_hit_usage_tracking**: Usage record has 0 cost, correct token counts 5. **test_cache_miss_fallback**: Primary model fails, fallback response cached 6. **test_config_from_dict**: `LLMConfig.from_dict({"cache": {"enabled": True}})` works 7. **test_semantic_cache_hit**: temperature=0, semantically similar query hits cache 8. **test_semantic_cache_skipped_for_nonzero_temp**: temperature>0 skips semantic search --- ## 10. Argumentation Summary | Design Choice | Alternatives Considered | Why This Choice | |--------------|------------------------|----------------| | Cache check after LLMRequest construction | Before construction | Request normalization must complete first; key depends on resolved model | | Cache write before usage tracking | After usage tracking | Response must be cached before tracking so cache-hit tracking uses same response | | OpenAIEmbedder for all providers | Separate XinferenceEmbedder | Xinference/TEI use OpenAI-compatible API; no need for separate class | | No stream caching | Collect chunks then cache | Adds latency and complexity; most cacheable requests don't need streaming | | Cache key uses resolved_model alias | Uses actual_model | User requests alias, not specific model; cache should be model-agnostic within alias |