diff --git a/agentkit.yaml b/agentkit.yaml index ed77b24..9588b70 100644 --- a/agentkit.yaml +++ b/agentkit.yaml @@ -40,6 +40,17 @@ llm: # not `git checkout .` which would wipe unrelated changes). rollback: default_timeout: 30.0 +# G7/U3: Three-tier fallback chain at chat REST send_message. +# main → Recovery (ReflexionEngine retry) → Emergency (rule-based classifier). +# Wired only at chat REST path (KTD5); CLI / ReWOO / Reflexion internal +# ReAct calls bypass the chain (no recursive loop). +fallback_chain: + enabled: true + recovery: + enabled: true + max_retries: 1 # ReflexionEngine max_reflections override + emergency: + enabled: true session: {backend: memory} bus: {backend: memory} task_store: {backend: memory} diff --git a/src/agentkit/server/_fallback_chain.py b/src/agentkit/server/_fallback_chain.py new file mode 100644 index 0000000..48d080f --- /dev/null +++ b/src/agentkit/server/_fallback_chain.py @@ -0,0 +1,199 @@ +"""G7/U3 — Three-tier fallback chain (main → Recovery → Emergency). + +Wired at chat.py REST send_message endpoint. Composes U2's EmergencyRules +with existing ReflexionEngine for the Recovery layer. + +Scope (KTD5): Only the chat REST path is wrapped. CLI / ReWOO / Reflexion +internal ReAct calls are NOT wrapped (would create recursive loop). +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any + +from agentkit.core.exceptions import ( + LLMProviderError, + LoopDetectedError, + TaskCancelledError, + TaskTimeoutError, +) +from agentkit.core.fallback import EmergencyError, EmergencyRules +from agentkit.core.react import ReActEngine, ReActResult +from agentkit.core.reflexion import ReflexionEngine +from agentkit.llm.gateway import LLMGateway + +logger = logging.getLogger(__name__) + + +# ReActResult.status values that indicate soft failure → trigger Recovery. +# "success" is the only clean-pass; everything else is fallback-worthy. +_SOFT_FAILURE_STATUSES = frozenset({"empty_fallback", "verify_failed", "timeout"}) + + +@dataclass +class ChatExecutionResult: + """Wrapper produced by execute_with_fallback_chain. + + Carries a ReActResult-like ``output`` field plus an optional + ``error_struct`` (set only when Emergency tier fires). The chat + handler reads ``.output`` for the assistant reply and ``.error_struct`` + for the optional structured error payload. + """ + + output: str + status: str # "success" | "recovered" | "emergency" + error_struct: dict[str, Any] | None = None + trajectory: list[Any] = field(default_factory=list) + total_steps: int = 0 + total_tokens: int = 0 + fallback_strategy: str | None = None + + +def _react_to_chat_result(react: ReActResult) -> ChatExecutionResult: + return ChatExecutionResult( + output=react.output, + status="success", + trajectory=react.trajectory, + total_steps=react.total_steps, + total_tokens=react.total_tokens, + fallback_strategy=react.fallback_strategy, + ) + + +def _reflexion_to_chat_result(reflexion_result: Any) -> ChatExecutionResult: + """Best-effort conversion from ReflexionResult to ChatExecutionResult.""" + output = getattr(reflexion_result, "output", None) or getattr( + reflexion_result, "final_answer", "" + ) + return ChatExecutionResult( + output=output or "", + status="recovered", + trajectory=getattr(reflexion_result, "trajectory", []) or [], + total_steps=getattr(reflexion_result, "total_steps", 0), + total_tokens=getattr(reflexion_result, "total_tokens", 0), + fallback_strategy="reflexion_recovery", + ) + + +def _to_emergency(exc: Exception, config: dict | None) -> ChatExecutionResult: + emergency: EmergencyError = EmergencyRules.classify(exc, config) + return ChatExecutionResult( + output=emergency.to_error_message(), + status="emergency", + error_struct=emergency.to_dict(), + fallback_strategy="emergency", + ) + + +async def execute_with_fallback_chain( + *, + react_engine: ReActEngine, + llm_gateway: LLMGateway, + messages: list[dict[str, str]], + tools: list[Any] | None, + model: str, + agent_name: str, + system_prompt: str | None, + fallback_chain_config: dict | None = None, +) -> ChatExecutionResult: + """Three-tier fallback chain: Main → Recovery (ReflexionEngine) → Emergency. + + KTD5: only this entry point wraps the chain. ReflexionEngine's internal + ReAct call bypasses the chain (no recursive loop possible). + + Returns ChatExecutionResult with status: + - "success": main agent succeeded + - "recovered": main failed, ReflexionEngine recovery succeeded + - "emergency": main failed, recovery failed/exhausted, Emergency layer fired + """ + config = fallback_chain_config or {} + recovery_cfg = config.get("recovery", {}) if isinstance(config, dict) else {} + emergency_cfg = config.get("emergency", {}) if isinstance(config, dict) else {} + recovery_enabled = recovery_cfg.get("enabled", True) if isinstance(recovery_cfg, dict) else True + emergency_enabled = ( + emergency_cfg.get("enabled", True) if isinstance(emergency_cfg, dict) else True + ) + max_reflections = recovery_cfg.get("max_retries", 1) if isinstance(recovery_cfg, dict) else 1 + + # ── Tier 1: Main ────────────────────────────────────────────── + main_exc: Exception | None = None + try: + result = await react_engine.execute( + messages=messages, + tools=tools, + model=model, + agent_name=agent_name, + system_prompt=system_prompt, + ) + if result.status == "success": + return _react_to_chat_result(result) + # Soft failure (empty_fallback / verify_failed / timeout) → trigger Recovery + if result.status in _SOFT_FAILURE_STATUSES: + main_exc = AgentSoftFailureError( + f"main agent status={result.status}: {result.output[:200]}" + ) + else: + # Unknown status — treat as success-like (don't trigger recovery) + return _react_to_chat_result(result) + except TaskCancelledError: + # KTD3: TaskCancelledError propagates as-is, NOT routed to Emergency. + raise + except (TaskTimeoutError, LoopDetectedError, LLMProviderError) as exc: + main_exc = exc + except Exception as exc: # noqa: BLE001 - last-resort catch for Emergency routing + main_exc = exc + + # ── Tier 2: Recovery (ReflexionEngine) ──────────────────────── + if recovery_enabled and main_exc is not None: + try: + reflexion = ReflexionEngine( + llm_gateway=llm_gateway, + max_reflections=max_reflections, + ) + recovery_result = await reflexion.execute( + messages=messages, + tools=tools, + model=model, + agent_name=agent_name, + system_prompt=system_prompt, + ) + # Recovery succeeds if Reflexion reports success or produces output. + recovery_status = getattr(recovery_result, "status", "") + if recovery_status == "success" or getattr(recovery_result, "output", None): + return _reflexion_to_chat_result(recovery_result) + logger.warning( + f"Recovery layer did not succeed (status={recovery_status}), " + f"falling through to Emergency" + ) + except TaskCancelledError: + raise + except Exception as recovery_exc: # noqa: BLE001 + logger.warning(f"Recovery layer raised: {recovery_exc}; falling through to Emergency") + + # ── Tier 3: Emergency ───────────────────────────────────────── + if not emergency_enabled: + # Re-raise original exception if Emergency disabled. + if main_exc is not None: + raise main_exc + # No exception but no success either — synthesise an emergency-style result. + return ChatExecutionResult( + output="Agent 未返回有效结果且 Emergency 层已禁用。", + status="emergency", + fallback_strategy="emergency_disabled", + ) + + # main_exc may be None if main returned soft-failure status without raising. + # Synthesize a generic exception for Emergency classification. + exc_for_emergency = main_exc or AgentSoftFailureError("soft failure without exception") + return _to_emergency(exc_for_emergency, config) + + +class AgentSoftFailureError(Exception): + """Internal marker — main agent returned a soft-failure status without raising. + + Used to feed the Emergency classifier when main status was e.g. + ``empty_fallback`` (no exception raised, but result not usable). + Classified as ``internal_error`` by EmergencyRules (generic fallback). + """ diff --git a/src/agentkit/server/config.py b/src/agentkit/server/config.py index 59ce483..fcfe979 100644 --- a/src/agentkit/server/config.py +++ b/src/agentkit/server/config.py @@ -120,6 +120,7 @@ class ServerConfig: streaming: dict[str, Any] | None = None, verification: dict[str, Any] | None = None, rollback: dict[str, Any] | None = None, + fallback_chain: dict[str, Any] | None = None, on_change: Callable[["ServerConfig"], None] | None = None, ): self.host = host @@ -157,6 +158,9 @@ class ServerConfig: # G9/U4: rollback.default_timeout 控制 RollbackExecutor subprocess 超时 # PlanPhase.rollback_command 未设置时此配置无效 (KTD6 opt-in) self.rollback = rollback or {} + # G7/U3: fallback_chain.{recovery,emergency}.{enabled,max_retries} + # controls three-tier chain at chat.py REST send_message (KTD5). + self.fallback_chain = fallback_chain or {} self.on_change = on_change # Config watching state @@ -246,6 +250,8 @@ class ServerConfig: verification_data = data.get("verification", {}) # G9/U4: rollback 配置 (从 YAML 读取,opt-in) rollback_data = data.get("rollback", {}) + # G7/U3: fallback_chain 配置 (从 YAML 读取) + fallback_chain_data = data.get("fallback_chain", {}) return cls( host=server.get("host", "0.0.0.0"), @@ -278,6 +284,7 @@ class ServerConfig: streaming=streaming_data, verification=verification_data, rollback=rollback_data, + fallback_chain=fallback_chain_data, ) @staticmethod diff --git a/src/agentkit/server/routes/chat.py b/src/agentkit/server/routes/chat.py index d0bc489..6737740 100644 --- a/src/agentkit/server/routes/chat.py +++ b/src/agentkit/server/routes/chat.py @@ -27,6 +27,7 @@ from pydantic import BaseModel from agentkit.chat.skill_routing import ExecutionMode from agentkit.core.protocol import CancellationToken from agentkit.core.react import ReActEngine +from agentkit.server._fallback_chain import execute_with_fallback_chain from agentkit.session.manager import SessionManager from agentkit.session.models import MessageRole, SessionStatus @@ -610,7 +611,15 @@ async def send_message(session_id: str, request: SendMessageRequest, req: Reques system_prompt = getattr(agent, "_system_prompt", None) or ( agent.get_system_prompt() if hasattr(agent, "get_system_prompt") else None ) - result = await react_engine.execute( + # G7/U3: Three-tier fallback chain (main → Recovery → Emergency). + # Wired only here (KTD5); CLI / ReWOO / Reflexion internal ReAct bypass. + server_config = getattr(req.app.state, "server_config", None) + fallback_chain_cfg = ( + getattr(server_config, "fallback_chain", None) if server_config else None + ) + chat_result = await execute_with_fallback_chain( + react_engine=react_engine, + llm_gateway=req.app.state.llm_gateway, messages=chat_messages, tools=tools, model=agent.get_model() @@ -618,16 +627,26 @@ async def send_message(session_id: str, request: SendMessageRequest, req: Reques else getattr(agent, "_llm_model", "default"), agent_name=agent.name, system_prompt=system_prompt, + fallback_chain_config=fallback_chain_cfg, ) # Append assistant reply assistant_msg = await sm.append_message( session_id=session_id, role=MessageRole.ASSISTANT, - content=result.output if hasattr(result, "output") else str(result), + content=chat_result.output, agent_name=agent.name, ) - return _message_to_response(assistant_msg) + response = _message_to_response(assistant_msg) + # Attach structured error payload when Emergency tier fired. + if chat_result.error_struct is not None: + response_dict = ( + response.model_dump() if hasattr(response, "model_dump") else dict(response) + ) + response_dict["error_struct"] = chat_result.error_struct + response_dict["fallback_status"] = chat_result.status + return response_dict + return response except Exception as e: logger.error(f"Chat execution error for session {session_id}: {e}") diff --git a/tests/unit/test_fallback_chain.py b/tests/unit/test_fallback_chain.py new file mode 100644 index 0000000..0ada39d --- /dev/null +++ b/tests/unit/test_fallback_chain.py @@ -0,0 +1,404 @@ +"""G7/U3 — Three-tier fallback chain wiring tests. + +Verifies Main → Recovery (ReflexionEngine) → Emergency (EmergencyRules) +at chat REST path. Mocks ReActEngine + ReflexionEngine + LLMGateway. +""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from agentkit.core.exceptions import ( + LLMProviderError, + LoopDetectedError, + TaskCancelledError, + TaskTimeoutError, +) +from agentkit.core.react import ReActResult +from agentkit.server._fallback_chain import execute_with_fallback_chain + + +def _make_react_result(status: str = "success", output: str = "ok") -> ReActResult: + return ReActResult( + output=output, + trajectory=[], + total_steps=1, + total_tokens=10, + status=status, + ) + + +def _make_react_engine(result=None, raises=None): + """Build a fake ReActEngine with .execute returning result or raising.""" + engine = MagicMock() + engine.reset = MagicMock() + if raises is not None: + engine.execute = AsyncMock(side_effect=raises) + else: + engine.execute = AsyncMock(return_value=result or _make_react_result()) + return engine + + +def _make_llm_gateway(): + gw = MagicMock() + gw.chat = AsyncMock(return_value=MagicMock(content="recovered")) + return gw + + +def _make_reflexion_result(status: str = "success", output: str = "recovered"): + """Synthesize a ReflexionResult-like object.""" + return MagicMock( + status=status, + output=output, + trajectory=[], + total_steps=1, + total_tokens=5, + ) + + +@pytest.fixture +def patched_reflexion(monkeypatch): + """Patch ReflexionEngine used inside the chain to a controllable mock.""" + from agentkit.server import _fallback_chain + + instances: list[MagicMock] = [] + + class _MockReflexion: + def __init__(self, llm_gateway, max_reflections=1, **kwargs): + self._llm_gateway = llm_gateway + self._max_reflections = max_reflections + self.execute = AsyncMock(return_value=_make_reflexion_result()) + instances.append(self) + + monkeypatch.setattr(_fallback_chain, "ReflexionEngine", _MockReflexion) + return instances + + +# ─── Tier 1: Main ───────────────────────────────────────────────────────── + + +class TestMainTier: + @pytest.mark.asyncio + async def test_main_success_no_recovery_no_emergency(self): + engine = _make_react_engine(result=_make_react_result(status="success", output="hello")) + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + ) + assert result.status == "success" + assert result.output == "hello" + assert result.error_struct is None + + @pytest.mark.asyncio + async def test_main_unknown_status_treated_as_success(self): + """Unknown status (not in soft_failure set) is treated as success-like.""" + engine = _make_react_engine(result=_make_react_result(status="partial", output="x")) + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + ) + assert result.status == "success" + + +# ─── Tier 2: Recovery ────────────────────────────────────────────────────── + + +class TestRecoveryTier: + @pytest.mark.asyncio + async def test_main_timeout_triggers_recovery_success(self, patched_reflexion): + engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + ) + assert result.status == "recovered" + assert result.output == "recovered" + # ReflexionEngine was instantiated and called + assert len(patched_reflexion) == 1 + patched_reflexion[0].execute.assert_awaited_once() + + @pytest.mark.asyncio + async def test_main_loop_detected_triggers_recovery(self, patched_reflexion): + engine = _make_react_engine(raises=LoopDetectedError(tool_name="search", repetitions=5)) + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + ) + assert result.status == "recovered" + + @pytest.mark.asyncio + async def test_main_llm_provider_error_triggers_recovery(self, patched_reflexion): + engine = _make_react_engine(raises=LLMProviderError(provider="openai", reason="503")) + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + ) + assert result.status == "recovered" + + @pytest.mark.asyncio + async def test_main_soft_failure_status_triggers_recovery(self, patched_reflexion): + """Soft failure (empty_fallback) without exception still triggers Recovery.""" + engine = _make_react_engine(result=_make_react_result(status="empty_fallback", output="")) + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + ) + assert result.status == "recovered" + + @pytest.mark.asyncio + async def test_recovery_disabled_skips_to_emergency(self): + engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + fallback_chain_config={"recovery": {"enabled": False}}, + ) + assert result.status == "emergency" + assert result.error_struct["error_code"] == "timeout" + + @pytest.mark.asyncio + async def test_recovery_failure_falls_through_to_emergency(self, patched_reflexion): + """Recovery raises → Emergency tier fires with original exception.""" + engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) + # Make ReflexionEngine.execute raise + patched_reflexion_instance = MagicMock() + patched_reflexion_instance.execute = AsyncMock( + side_effect=RuntimeError("reflexion crashed") + ) + # Override the patched class to use our instance + from agentkit.server import _fallback_chain + + original_cls = _fallback_chain.ReflexionEngine + + class _MockReflexionWithExc: + def __init__(self, **kwargs): + self.execute = patched_reflexion_instance.execute + + _fallback_chain.ReflexionEngine = _MockReflexionWithExc + try: + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + ) + finally: + _fallback_chain.ReflexionEngine = original_cls + + assert result.status == "emergency" + assert result.error_struct["error_code"] == "timeout" + + @pytest.mark.asyncio + async def test_recovery_unsuccessful_status_falls_through(self, patched_reflexion): + """Recovery returns non-success status → Emergency fires.""" + engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) + # Make ReflexionEngine return unsuccessful result with empty output + from agentkit.server import _fallback_chain + + class _MockReflexionNoOutput: + def __init__(self, **kwargs): + self.execute = AsyncMock(return_value=MagicMock(status="failed", output=None)) + + original_cls = _fallback_chain.ReflexionEngine + _fallback_chain.ReflexionEngine = _MockReflexionNoOutput + try: + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + ) + finally: + _fallback_chain.ReflexionEngine = original_cls + + assert result.status == "emergency" + assert result.error_struct["error_code"] == "timeout" + + +# ─── Tier 3: Emergency ──────────────────────────────────────────────────── + + +class TestEmergencyTier: + @pytest.mark.asyncio + async def test_emergency_timeout_error_code(self, patched_reflexion): + # Make recovery fail (empty result) so Emergency fires + from agentkit.server import _fallback_chain + + class _MockReflexionEmpty: + def __init__(self, **kwargs): + self.execute = AsyncMock(return_value=MagicMock(status="failed", output=None)) + + original_cls = _fallback_chain.ReflexionEngine + _fallback_chain.ReflexionEngine = _MockReflexionEmpty + try: + engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + ) + finally: + _fallback_chain.ReflexionEngine = original_cls + + assert result.status == "emergency" + assert result.error_struct["error_code"] == "timeout" + assert result.error_struct["retryable"] is True + assert "建议" in result.output + + @pytest.mark.asyncio + async def test_emergency_loop_detected_error_code(self): + engine = _make_react_engine(raises=LoopDetectedError(tool_name="search", repetitions=5)) + # Recovery disabled so Emergency fires directly + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + fallback_chain_config={"recovery": {"enabled": False}}, + ) + assert result.status == "emergency" + assert result.error_struct["error_code"] == "loop_detected" + + @pytest.mark.asyncio + async def test_emergency_llm_failure_error_code(self): + engine = _make_react_engine(raises=LLMProviderError(provider="openai", reason="500")) + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + fallback_chain_config={"recovery": {"enabled": False}}, + ) + assert result.status == "emergency" + assert result.error_struct["error_code"] == "llm_failure" + + @pytest.mark.asyncio + async def test_emergency_internal_error_for_generic_exception(self): + engine = _make_react_engine(raises=RuntimeError("unexpected")) + result = await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + fallback_chain_config={"recovery": {"enabled": False}}, + ) + assert result.status == "emergency" + assert result.error_struct["error_code"] == "internal_error" + assert result.error_struct["retryable"] is False + + @pytest.mark.asyncio + async def test_task_cancelled_propagates_not_routed_to_emergency(self): + """TaskCancelledError must propagate, not be classified by Emergency.""" + engine = _make_react_engine(raises=TaskCancelledError(task_id="t1")) + with pytest.raises(TaskCancelledError): + await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + fallback_chain_config={"recovery": {"enabled": False}}, + ) + + @pytest.mark.asyncio + async def test_emergency_disabled_reraises_original(self): + engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) + with pytest.raises(TaskTimeoutError): + await execute_with_fallback_chain( + react_engine=engine, + llm_gateway=_make_llm_gateway(), + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="default", + agent_name="a", + system_prompt=None, + fallback_chain_config={ + "recovery": {"enabled": False}, + "emergency": {"enabled": False}, + }, + ) + + +# ─── Config wiring ──────────────────────────────────────────────────────── + + +class TestServerConfigFallbackChain: + def test_fallback_chain_section_read_from_dict(self): + from agentkit.server.config import ServerConfig + + config = ServerConfig.from_dict( + { + "fallback_chain": { + "enabled": True, + "recovery": {"enabled": False, "max_retries": 3}, + "emergency": {"enabled": True}, + } + } + ) + assert config.fallback_chain["enabled"] is True + assert config.fallback_chain["recovery"] == {"enabled": False, "max_retries": 3} + assert config.fallback_chain["emergency"] == {"enabled": True} + + def test_fallback_chain_defaults_empty_when_absent(self): + from agentkit.server.config import ServerConfig + + config = ServerConfig.from_dict({}) + assert config.fallback_chain == {}