"""G7/U3 — Three-tier fallback chain wiring tests. Verifies Main → Recovery (ReflexionEngine) → Emergency (EmergencyRules) at chat REST path. Mocks ReActEngine + ReflexionEngine + LLMGateway. """ from __future__ import annotations from unittest.mock import AsyncMock, MagicMock import pytest from agentkit.core.exceptions import ( LLMProviderError, LoopDetectedError, TaskCancelledError, TaskTimeoutError, ) from agentkit.core.react import ReActResult from agentkit.server._fallback_chain import execute_with_fallback_chain def _make_react_result(status: str = "success", output: str = "ok") -> ReActResult: return ReActResult( output=output, trajectory=[], total_steps=1, total_tokens=10, status=status, ) def _make_react_engine(result=None, raises=None): """Build a fake ReActEngine with .execute returning result or raising.""" engine = MagicMock() engine.reset = MagicMock() if raises is not None: engine.execute = AsyncMock(side_effect=raises) else: engine.execute = AsyncMock(return_value=result or _make_react_result()) return engine def _make_llm_gateway(): gw = MagicMock() gw.chat = AsyncMock(return_value=MagicMock(content="recovered")) return gw def _make_reflexion_result(status: str = "success", output: str = "recovered"): """Synthesize a ReflexionResult-like object.""" return MagicMock( status=status, output=output, trajectory=[], total_steps=1, total_tokens=5, ) @pytest.fixture def patched_reflexion(monkeypatch): """Patch ReflexionEngine used inside the chain to a controllable mock.""" from agentkit.server import _fallback_chain instances: list[MagicMock] = [] class _MockReflexion: def __init__(self, llm_gateway, max_reflections=1, **kwargs): self._llm_gateway = llm_gateway self._max_reflections = max_reflections self.execute = AsyncMock(return_value=_make_reflexion_result()) instances.append(self) monkeypatch.setattr(_fallback_chain, "ReflexionEngine", _MockReflexion) return instances # ─── Tier 1: Main ───────────────────────────────────────────────────────── class TestMainTier: @pytest.mark.asyncio async def test_main_success_no_recovery_no_emergency(self): engine = _make_react_engine(result=_make_react_result(status="success", output="hello")) result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, ) assert result.status == "success" assert result.output == "hello" assert result.error_struct is None @pytest.mark.asyncio async def test_main_unknown_status_treated_as_success(self): """Unknown status (not in soft_failure set) is treated as success-like.""" engine = _make_react_engine(result=_make_react_result(status="partial", output="x")) result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, ) assert result.status == "success" # ─── Tier 2: Recovery ────────────────────────────────────────────────────── class TestRecoveryTier: @pytest.mark.asyncio async def test_main_timeout_triggers_recovery_success(self, patched_reflexion): engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, ) assert result.status == "recovered" assert result.output == "recovered" # ReflexionEngine was instantiated and called assert len(patched_reflexion) == 1 patched_reflexion[0].execute.assert_awaited_once() @pytest.mark.asyncio async def test_main_loop_detected_triggers_recovery(self, patched_reflexion): engine = _make_react_engine(raises=LoopDetectedError(tool_name="search", repetitions=5)) result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, ) assert result.status == "recovered" @pytest.mark.asyncio async def test_main_llm_provider_error_triggers_recovery(self, patched_reflexion): engine = _make_react_engine(raises=LLMProviderError(provider="openai", reason="503")) result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, ) assert result.status == "recovered" @pytest.mark.asyncio async def test_main_soft_failure_status_triggers_recovery(self, patched_reflexion): """Soft failure (empty_fallback) without exception still triggers Recovery.""" engine = _make_react_engine(result=_make_react_result(status="empty_fallback", output="")) result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, ) assert result.status == "recovered" @pytest.mark.asyncio async def test_recovery_disabled_skips_to_emergency(self): engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, fallback_chain_config={"recovery": {"enabled": False}}, ) assert result.status == "emergency" assert result.error_struct["error_code"] == "timeout" @pytest.mark.asyncio async def test_recovery_failure_falls_through_to_emergency(self, patched_reflexion): """Recovery raises → Emergency tier fires with original exception.""" engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) # Make ReflexionEngine.execute raise patched_reflexion_instance = MagicMock() patched_reflexion_instance.execute = AsyncMock( side_effect=RuntimeError("reflexion crashed") ) # Override the patched class to use our instance from agentkit.server import _fallback_chain original_cls = _fallback_chain.ReflexionEngine class _MockReflexionWithExc: def __init__(self, **kwargs): self.execute = patched_reflexion_instance.execute _fallback_chain.ReflexionEngine = _MockReflexionWithExc try: result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, ) finally: _fallback_chain.ReflexionEngine = original_cls assert result.status == "emergency" assert result.error_struct["error_code"] == "timeout" @pytest.mark.asyncio async def test_recovery_unsuccessful_status_falls_through(self, patched_reflexion): """Recovery returns non-success status → Emergency fires.""" engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) # Make ReflexionEngine return unsuccessful result with empty output from agentkit.server import _fallback_chain class _MockReflexionNoOutput: def __init__(self, **kwargs): self.execute = AsyncMock(return_value=MagicMock(status="failed", output=None)) original_cls = _fallback_chain.ReflexionEngine _fallback_chain.ReflexionEngine = _MockReflexionNoOutput try: result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, ) finally: _fallback_chain.ReflexionEngine = original_cls assert result.status == "emergency" assert result.error_struct["error_code"] == "timeout" # ─── Tier 3: Emergency ──────────────────────────────────────────────────── class TestEmergencyTier: @pytest.mark.asyncio async def test_emergency_timeout_error_code(self, patched_reflexion): # Make recovery fail (empty result) so Emergency fires from agentkit.server import _fallback_chain class _MockReflexionEmpty: def __init__(self, **kwargs): self.execute = AsyncMock(return_value=MagicMock(status="failed", output=None)) original_cls = _fallback_chain.ReflexionEngine _fallback_chain.ReflexionEngine = _MockReflexionEmpty try: engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, ) finally: _fallback_chain.ReflexionEngine = original_cls assert result.status == "emergency" assert result.error_struct["error_code"] == "timeout" assert result.error_struct["retryable"] is True assert "建议" in result.output @pytest.mark.asyncio async def test_emergency_loop_detected_error_code(self): engine = _make_react_engine(raises=LoopDetectedError(tool_name="search", repetitions=5)) # Recovery disabled so Emergency fires directly result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, fallback_chain_config={"recovery": {"enabled": False}}, ) assert result.status == "emergency" assert result.error_struct["error_code"] == "loop_detected" @pytest.mark.asyncio async def test_emergency_llm_failure_error_code(self): engine = _make_react_engine(raises=LLMProviderError(provider="openai", reason="500")) result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, fallback_chain_config={"recovery": {"enabled": False}}, ) assert result.status == "emergency" assert result.error_struct["error_code"] == "llm_failure" @pytest.mark.asyncio async def test_emergency_internal_error_for_generic_exception(self): engine = _make_react_engine(raises=RuntimeError("unexpected")) result = await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, fallback_chain_config={"recovery": {"enabled": False}}, ) assert result.status == "emergency" assert result.error_struct["error_code"] == "internal_error" assert result.error_struct["retryable"] is False @pytest.mark.asyncio async def test_task_cancelled_propagates_not_routed_to_emergency(self): """TaskCancelledError must propagate, not be classified by Emergency.""" engine = _make_react_engine(raises=TaskCancelledError(task_id="t1")) with pytest.raises(TaskCancelledError): await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, fallback_chain_config={"recovery": {"enabled": False}}, ) @pytest.mark.asyncio async def test_emergency_disabled_reraises_original(self): engine = _make_react_engine(raises=TaskTimeoutError(task_id="t1", timeout_seconds=10)) with pytest.raises(TaskTimeoutError): await execute_with_fallback_chain( react_engine=engine, llm_gateway=_make_llm_gateway(), messages=[{"role": "user", "content": "hi"}], tools=[], model="default", agent_name="a", system_prompt=None, fallback_chain_config={ "recovery": {"enabled": False}, "emergency": {"enabled": False}, }, ) # ─── Config wiring ──────────────────────────────────────────────────────── class TestServerConfigFallbackChain: def test_fallback_chain_section_read_from_dict(self): from agentkit.server.config import ServerConfig config = ServerConfig.from_dict( { "fallback_chain": { "enabled": True, "recovery": {"enabled": False, "max_retries": 3}, "emergency": {"enabled": True}, } } ) assert config.fallback_chain["enabled"] is True assert config.fallback_chain["recovery"] == {"enabled": False, "max_retries": 3} assert config.fallback_chain["emergency"] == {"enabled": True} def test_fallback_chain_defaults_empty_when_absent(self): from agentkit.server.config import ServerConfig config = ServerConfig.from_dict({}) assert config.fallback_chain == {}