"""Reflexion Engine 单元测试""" import asyncio import json from unittest.mock import AsyncMock, MagicMock, patch import pytest from agentkit.core.exceptions import TaskCancelledError, TaskTimeoutError from agentkit.core.protocol import CancellationToken from agentkit.core.react import ReActEngine, ReActResult, ReActStep from agentkit.core.reflexion import ReflexionEngine, ReflexionReflection, ReflexionResult from agentkit.llm.gateway import LLMGateway from agentkit.llm.protocol import LLMResponse, TokenUsage, ToolCall from agentkit.tools.base import Tool # ── Test Helpers ────────────────────────────────────────── class FakeTool(Tool): """用于测试的 Fake Tool""" def __init__( self, name: str = "fake_tool", description: str = "A fake tool for testing", result: dict | None = None, should_fail: bool = False, ): super().__init__(name=name, description=description) self._result = result or {"status": "ok"} self._should_fail = should_fail async def execute(self, **kwargs) -> dict: if self._should_fail: raise RuntimeError(f"Tool '{self.name}' execution failed") return self._result def make_mock_gateway(responses: list[LLMResponse]) -> MagicMock: """创建一个 mock LLMGateway,按顺序返回给定响应""" gateway = MagicMock(spec=LLMGateway) gateway.chat = AsyncMock(side_effect=responses) return gateway def make_response( content: str = "", tool_calls: list[ToolCall] | None = None, prompt_tokens: int = 10, completion_tokens: int = 20, ) -> LLMResponse: """快速构造 LLMResponse""" return LLMResponse( content=content, model="test-model", usage=TokenUsage( prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, ), tool_calls=tool_calls or [], ) def make_react_result( output: str = "test output", total_steps: int = 1, total_tokens: int = 30, status: str = "success", ) -> ReActResult: """快速构造 ReActResult""" return ReActResult( output=output, trajectory=[ReActStep(step=1, action="final_answer", content=output, tokens=total_tokens)], total_steps=total_steps, total_tokens=total_tokens, status=status, ) # ── Test Classes ────────────────────────────────────────── class TestReflexionFirstExecutionPasses: """首次执行即通过质量阈值,无需重试""" async def test_no_retry_when_score_above_threshold(self): gateway = make_mock_gateway([ # ReAct call make_response(content="The answer is 42"), # Evaluation call make_response(content='```json\n{"score": 0.9, "reasoning": "Excellent"}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) result = await engine.execute( messages=[{"role": "user", "content": "What is the answer?"}], ) assert isinstance(result, ReflexionResult) assert result.output == "The answer is 42" assert result.evaluation_score == 0.9 assert result.reflection_count == 0 assert len(result.reflections) == 0 assert result.status == "success" async def test_score_exactly_at_threshold(self): gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='```json\n{"score": 0.7, "reasoning": "OK"}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], ) assert result.evaluation_score == 0.7 assert result.reflection_count == 0 class TestReflexionLowScoreTriggersReflection: """评估分数低于阈值时触发反思和重试""" async def test_reflection_and_retry_on_low_score(self): gateway = make_mock_gateway([ # 1st ReAct call make_response(content="Initial poor answer"), # 1st Evaluation call - low score make_response(content='```json\n{"score": 0.3, "reasoning": "Incomplete"}\n```'), # 1st Reflection call make_response(content="You need to be more specific and provide detailed analysis."), # 2nd ReAct call make_response(content="Improved detailed answer"), # 2nd Evaluation call - high score make_response(content='```json\n{"score": 0.85, "reasoning": "Good improvement"}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) result = await engine.execute( messages=[{"role": "user", "content": "Analyze this"}], ) assert result.output == "Improved detailed answer" assert result.evaluation_score == 0.85 assert result.reflection_count == 1 assert len(result.reflections) == 1 assert result.reflections[0].score_before == 0.3 assert result.reflections[0].retry_number == 1 assert "specific" in result.reflections[0].reflection_text.lower() or "detailed" in result.reflections[0].reflection_text.lower() class TestReflexionRetryImprovesScore: """重试后分数提升,返回最终结果""" async def test_multiple_retries_improve_score(self): gateway = make_mock_gateway([ # Attempt 1 make_response(content="Bad answer"), make_response(content='```json\n{"score": 0.2}\n```'), make_response(content="Need more depth"), # Attempt 2 make_response(content="Better answer"), make_response(content='```json\n{"score": 0.5}\n```'), make_response(content="Still needs improvement"), # Attempt 3 make_response(content="Great answer"), make_response(content='```json\n{"score": 0.9}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7, max_reflections=3) result = await engine.execute( messages=[{"role": "user", "content": "Complex task"}], ) assert result.output == "Great answer" assert result.evaluation_score == 0.9 assert result.reflection_count == 2 assert len(result.reflections) == 2 assert result.reflections[0].retry_number == 1 assert result.reflections[1].retry_number == 2 class TestReflexionMaxReflectionsReached: """达到最大反思次数后返回最佳结果""" async def test_returns_best_result_when_max_reflections_reached(self): gateway = make_mock_gateway([ # Attempt 1 make_response(content="Poor answer"), make_response(content='```json\n{"score": 0.3}\n```'), make_response(content="Try harder"), # Attempt 2 make_response(content="Slightly better answer"), make_response(content='```json\n{"score": 0.5}\n```'), make_response(content="Still not good enough"), # Attempt 3 (max) make_response(content="Another answer"), make_response(content='```json\n{"score": 0.6}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7, max_reflections=3) result = await engine.execute( messages=[{"role": "user", "content": "Hard task"}], ) # Should return the best result (score 0.6 from last attempt) assert result.evaluation_score == 0.6 assert result.reflection_count == 2 assert result.output == "Another answer" class TestReflexionEvaluationFailure: """评估 LLM 调用失败时回退到中性分数""" async def test_evaluation_failure_falls_back_to_neutral_score(self): """评估失败时使用 0.5 中性分数,低于阈值则触发反思和重试""" call_count = 0 async def chat_side_effect(**kwargs): nonlocal call_count call_count += 1 if call_count == 1: # ReAct call return make_response(content="Some answer") elif call_count == 2: # Evaluation call - fails raise RuntimeError("LLM unavailable") elif call_count == 3: # Reflection call (0.5 < 0.7 triggers reflection) return make_response(content="Try to be more detailed") elif call_count == 4: # 2nd ReAct call return make_response(content="Better answer") else: # 2nd Evaluation call - succeeds return make_response(content='```json\n{"score": 0.9}\n```') gateway = MagicMock(spec=LLMGateway) gateway.chat = AsyncMock(side_effect=chat_side_effect) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], ) # Evaluation failure should be handled gracefully # Neutral score 0.5 < 0.7 triggers reflection and retry assert isinstance(result, ReflexionResult) assert result.output == "Better answer" assert result.evaluation_score == 0.9 assert result.reflection_count == 1 async def test_evaluation_failure_returns_neutral_score(self): """验证评估失败时确实使用了 0.5 中性分数""" call_count = 0 async def chat_side_effect(**kwargs): nonlocal call_count call_count += 1 if call_count == 1: return make_response(content="Answer") elif call_count == 2: raise RuntimeError("Evaluation failed") elif call_count == 3: return make_response(content="Reflection text") elif call_count == 4: return make_response(content="Better answer") else: return make_response(content='```json\n{"score": 0.9}\n```') gateway = MagicMock(spec=LLMGateway) gateway.chat = AsyncMock(side_effect=chat_side_effect) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], ) # Should have triggered reflection (0.5 < 0.7) and retried assert result.reflection_count >= 1 class TestReflexionReflectionFailure: """反思 LLM 调用失败时返回当前结果""" async def test_reflection_failure_returns_current_result(self): call_count = 0 async def chat_side_effect(**kwargs): nonlocal call_count call_count += 1 if call_count == 1: # ReAct call return make_response(content="Initial answer") elif call_count == 2: # Evaluation call - low score return make_response(content='```json\n{"score": 0.3}\n```') elif call_count == 3: # Reflection call - fails raise RuntimeError("Reflection LLM unavailable") else: return make_response(content="Should not reach here") gateway = MagicMock(spec=LLMGateway) gateway.chat = AsyncMock(side_effect=chat_side_effect) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], ) # Should return current result without crashing assert isinstance(result, ReflexionResult) assert result.output == "Initial answer" assert result.evaluation_score == 0.3 assert result.reflection_count == 0 # Reflection failed, not recorded class TestReflexionCancellationToken: """取消令牌测试""" async def test_cancelled_before_execution(self): gateway = make_mock_gateway([ make_response(content="Answer"), ]) engine = ReflexionEngine(llm_gateway=gateway) token = CancellationToken() token.cancel() with pytest.raises(TaskCancelledError): await engine.execute( messages=[{"role": "user", "content": "Task"}], cancellation_token=token, ) async def test_cancelled_mid_execution(self): call_count = 0 async def chat_side_effect(**kwargs): nonlocal call_count call_count += 1 if call_count >= 2: # Simulate cancel after first ReAct + evaluation pass return make_response(content="Answer") gateway = MagicMock(spec=LLMGateway) gateway.chat = AsyncMock(side_effect=chat_side_effect) engine = ReflexionEngine(llm_gateway=gateway) token = CancellationToken() # Pre-cancel to test the check at the beginning of the loop token.cancel() with pytest.raises(TaskCancelledError): await engine.execute( messages=[{"role": "user", "content": "Task"}], cancellation_token=token, ) async def test_uncancelled_token_works_normally(self): gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='```json\n{"score": 0.9}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway) token = CancellationToken() result = await engine.execute( messages=[{"role": "user", "content": "Task"}], cancellation_token=token, ) assert result.output == "Answer" assert result.evaluation_score == 0.9 class TestReflexionInterfaceCompatibility: """接口兼容性测试""" async def test_same_parameter_signature_as_react(self): """ReflexionEngine.execute() 接受与 ReActEngine 相同的参数""" gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='```json\n{"score": 0.8}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway) # Should accept all the same parameters as ReActEngine result = await engine.execute( messages=[{"role": "user", "content": "Task"}], tools=None, model="gpt-4", agent_name="test_agent", task_type="analysis", system_prompt="You are helpful", trace_recorder=None, memory_retriever=None, task_id="task-123", compressor=None, retrieval_config=None, cancellation_token=None, timeout_seconds=300, ) assert isinstance(result, ReflexionResult) async def test_reflexion_result_has_react_result_fields(self): """ReflexionResult 包含 ReActResult 的所有字段""" gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='```json\n{"score": 0.85}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], ) # ReActResult fields assert hasattr(result, "output") assert hasattr(result, "trajectory") assert hasattr(result, "total_steps") assert hasattr(result, "total_tokens") assert hasattr(result, "status") # ReflexionResult additional fields assert hasattr(result, "evaluation_score") assert hasattr(result, "reflection_count") assert hasattr(result, "reflections") async def test_reflexion_composes_react_engine(self): """ReflexionEngine 组合(而非继承)ReActEngine""" gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='```json\n{"score": 0.9}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway) # Should have a _react_engine attribute (composition) assert hasattr(engine, "_react_engine") assert isinstance(engine._react_engine, ReActEngine) # Should NOT be a subclass of ReActEngine assert not isinstance(engine, ReActEngine) async def test_reflexion_result_trajectory_uses_react_step(self): """ReflexionResult.trajectory 使用 ReActStep 类型""" gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='```json\n{"score": 0.9}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], ) assert all(isinstance(step, ReActStep) for step in result.trajectory) class TestReflexionLayeredModels: """分层模型测试""" async def test_default_models_same_as_input(self): """默认情况下 evaluate_model 和 reflect_model 与 act_model 相同""" gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='```json\n{"score": 0.9}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], model="gpt-4", ) # Verify evaluation call used the same model # The 2nd call should be the evaluation call eval_call = gateway.chat.call_args_list[1] assert eval_call.kwargs.get("model") == "gpt-4" async def test_separate_evaluate_model(self): """使用独立的 evaluate_model""" gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='```json\n{"score": 0.9}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], model="gpt-3.5", evaluate_model="gpt-4", ) # Evaluation call should use gpt-4 eval_call = gateway.chat.call_args_list[1] assert eval_call.kwargs.get("model") == "gpt-4" async def test_separate_reflect_model(self): """使用独立的 reflect_model""" gateway = make_mock_gateway([ make_response(content="Poor answer"), make_response(content='```json\n{"score": 0.3}\n```'), make_response(content="Reflection text"), make_response(content="Better answer"), make_response(content='```json\n{"score": 0.9}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], model="gpt-3.5", evaluate_model="gpt-4", reflect_model="claude-3", ) # Reflection call (3rd call) should use claude-3 reflect_call = gateway.chat.call_args_list[2] assert reflect_call.kwargs.get("model") == "claude-3" class TestReflexionConstructorValidation: """构造函数参数验证""" def test_invalid_max_steps(self): gateway = MagicMock(spec=LLMGateway) with pytest.raises(ValueError, match="max_steps"): ReflexionEngine(llm_gateway=gateway, max_steps=0) def test_invalid_max_reflections(self): gateway = MagicMock(spec=LLMGateway) with pytest.raises(ValueError, match="max_reflections"): ReflexionEngine(llm_gateway=gateway, max_reflections=0) def test_invalid_quality_threshold(self): gateway = MagicMock(spec=LLMGateway) with pytest.raises(ValueError, match="quality_threshold"): ReflexionEngine(llm_gateway=gateway, quality_threshold=1.5) def test_valid_construction(self): gateway = MagicMock(spec=LLMGateway) engine = ReflexionEngine( llm_gateway=gateway, max_steps=5, max_reflections=2, quality_threshold=0.8, default_timeout=60.0, ) assert engine._max_steps == 5 assert engine._max_reflections == 2 assert engine._quality_threshold == 0.8 assert engine._default_timeout == 60.0 class TestReflexionTimeout: """超时测试""" async def test_timeout_raises_task_timeout_error(self): async def slow_chat(**kwargs): await asyncio.sleep(0.5) return make_response(content="slow") gateway = MagicMock(spec=LLMGateway) gateway.chat = AsyncMock(side_effect=slow_chat) engine = ReflexionEngine(llm_gateway=gateway) with pytest.raises(TaskTimeoutError): await engine.execute( messages=[{"role": "user", "content": "Task"}], timeout_seconds=0.3, ) class TestReflexionEvaluationParsing: """评估分数解析测试""" async def test_parse_score_from_json_code_block(self): gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='```json\n{"score": 0.85, "reasoning": "Good"}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], ) assert result.evaluation_score == 0.85 async def test_parse_score_from_plain_json(self): gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='{"score": 0.75, "reasoning": "OK"}'), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], ) assert result.evaluation_score == 0.75 async def test_parse_score_from_text(self): gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='The score is 0.8 based on my evaluation.'), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], ) assert result.evaluation_score == 0.8 async def test_score_clamped_to_range(self): gateway = make_mock_gateway([ make_response(content="Answer"), make_response(content='```json\n{"score": 1.5}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], ) # Score should be clamped to 1.0 assert result.evaluation_score == 1.0 class TestReflexionReflectionPrompt: """反思提示构建测试""" async def test_reflection_injected_into_system_prompt(self): """验证反思文本被注入到下一次 ReAct 的 system prompt 中""" gateway = make_mock_gateway([ make_response(content="Poor answer"), make_response(content='```json\n{"score": 0.3}\n```'), make_response(content="You need to provide more specific details."), make_response(content="Better answer with details"), make_response(content='```json\n{"score": 0.9}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], system_prompt="You are a helpful assistant", ) # The 4th call (2nd ReAct) should have the reflection in system prompt # Note: ReActEngine builds its own messages, so we check the gateway call assert result.reflection_count == 1 assert result.evaluation_score == 0.9 class TestReflexionStreaming: """流式执行测试""" async def test_execute_stream_yields_events(self): """execute_stream 产生正确的事件类型""" gateway = MagicMock(spec=LLMGateway) # Mock ReActEngine.execute_stream to yield events async def mock_react_stream(**kwargs): from agentkit.core.react import ReActEvent yield ReActEvent(event_type="thinking", step=1, data={"message": "Thinking..."}) yield ReActEvent(event_type="final_answer", step=1, data={"output": "Answer", "total_steps": 1, "total_tokens": 30}) # Mock evaluation and reflection gateway.chat = AsyncMock(side_effect=[ make_response(content='```json\n{"score": 0.9}\n```'), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) with patch.object(engine._react_engine, "execute_stream", side_effect=mock_react_stream): events = [] async for event in engine.execute_stream( messages=[{"role": "user", "content": "Task"}], ): events.append(event) event_types = [e.event_type for e in events] assert "executing" in event_types assert "evaluating" in event_types assert "evaluation_result" in event_types assert "final_answer" in event_types async def test_execute_stream_reflection_events(self): """execute_stream 在低分时产生反思和重试事件""" gateway = MagicMock(spec=LLMGateway) call_count = 0 async def mock_react_stream(**kwargs): nonlocal call_count call_count += 1 from agentkit.core.react import ReActEvent if call_count == 1: yield ReActEvent(event_type="final_answer", step=1, data={"output": "Poor answer", "total_steps": 1, "total_tokens": 30}) else: yield ReActEvent(event_type="final_answer", step=1, data={"output": "Good answer", "total_steps": 1, "total_tokens": 30}) # Evaluation: first low, then high gateway.chat = AsyncMock(side_effect=[ make_response(content='```json\n{"score": 0.3}\n```'), # 1st eval make_response(content="Need improvement"), # reflection make_response(content='```json\n{"score": 0.9}\n```'), # 2nd eval ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7) with patch.object(engine._react_engine, "execute_stream", side_effect=mock_react_stream): events = [] async for event in engine.execute_stream( messages=[{"role": "user", "content": "Task"}], ): events.append(event) event_types = [e.event_type for e in events] assert "executing" in event_types assert "evaluating" in event_types assert "evaluation_result" in event_types assert "reflecting" in event_types assert "reflection_result" in event_types assert "retrying" in event_types assert "final_answer" in event_types class TestReflexionBestResultTracking: """最佳结果追踪测试""" async def test_returns_best_result_across_attempts(self): """当后续尝试分数更低时,返回之前最佳的结果""" gateway = make_mock_gateway([ # Attempt 1: score 0.5 make_response(content="Decent answer"), make_response(content='```json\n{"score": 0.5}\n```'), make_response(content="Try to improve"), # Attempt 2: score 0.4 (worse) make_response(content="Worse answer"), make_response(content='```json\n{"score": 0.4}\n```'), make_response(content="Still trying"), # Attempt 3: score 0.45 (still worse than attempt 1) make_response(content="Another answer"), make_response(content='```json\n{"score": 0.45}\n```'), # Reflection for attempt 3 (will be consumed but loop ends) make_response(content="Final reflection"), ]) engine = ReflexionEngine(llm_gateway=gateway, quality_threshold=0.7, max_reflections=3) result = await engine.execute( messages=[{"role": "user", "content": "Task"}], ) # Best score was 0.5 from attempt 1 assert result.evaluation_score == 0.5 assert result.output == "Decent answer"