"""Golden trajectory characterization tests for ReActEngine. Locks in the current behavior of execute() and execute_stream() with fixed mock LLM responses. These tests must pass BEFORE and AFTER the U1 refactor (_execute_loop unification). Per plan KTD6: characterization-first. Scenarios covered (per plan U1 Test scenarios): - Happy path: single tool call -> final answer (execute + execute_stream) - Happy path streaming equivalence: execute vs execute_stream same output - Multi-step loop: 3 tool calls then final answer - Empty tools: LLM returns text directly - Max steps: loop reaches max_steps -> status='partial' - Tool failure: tool raises exception -> error in observation, loop continues - LLM failure: gateway raises exception -> propagate - Phase violation: tool blocked by phase policy -> phase_violation event - Cancellation: CancellationToken cancelled -> TaskCancelledError - Compression triggered: long conversation triggers compressor.compress() - Golden trajectory snapshot: fixed mock -> event type sequence """ from unittest.mock import AsyncMock, MagicMock import pytest from agentkit.core.react import ReActEvent, ReActResult, ReActStep from agentkit.llm.gateway import LLMGateway from agentkit.llm.protocol import LLMResponse, StreamChunk, TokenUsage, ToolCall from agentkit.tools.base import Tool # ── Helpers ────────────────────────────────────────── class FakeTool(Tool): """Minimal Tool implementation for trajectory tests.""" def __init__( self, name: str = "fake_tool", description: str = "A fake tool for testing", result: dict | None = None, should_fail: bool = False, ): super().__init__(name=name, description=description) self._result = result or {"status": "ok"} self._should_fail = should_fail self.call_count = 0 async def execute(self, **kwargs) -> dict: self.call_count += 1 if self._should_fail: raise RuntimeError(f"Tool '{self.name}' execution failed") return self._result def make_response( content: str = "", tool_calls: list[ToolCall] | None = None, prompt_tokens: int = 10, completion_tokens: int = 20, ) -> LLMResponse: """Quick LLMResponse builder for non-streaming gateway mocks.""" return LLMResponse( content=content, model="test-model", usage=TokenUsage( prompt_tokens=prompt_tokens, completion_tokens=completion_tokens, ), tool_calls=tool_calls or [], ) def make_mock_gateway(responses: list[LLMResponse]) -> MagicMock: """Mock LLMGateway whose chat() returns responses in order.""" gateway = MagicMock(spec=LLMGateway) gateway.chat = AsyncMock(side_effect=responses) return gateway def make_mock_stream_gateway(chunks_list: list[list[StreamChunk]]) -> MagicMock: """Mock LLMGateway whose chat_stream() yields chunks in order. Each call to chat_stream consumes one inner list from chunks_list. """ gateway = MagicMock(spec=LLMGateway) async def _stream(**kwargs): for chunks in chunks_list: for chunk in chunks: yield chunk # Remove after use so a second call would raise StopIteration chunks_list.pop(0) gateway.chat_stream = _stream return gateway def _tc(name: str, args: dict | None = None, tc_id: str = "tc_1") -> ToolCall: """Quick ToolCall builder.""" return ToolCall(id=tc_id, name=name, arguments=args or {}) def _step_summary(step: ReActStep) -> str: """Compact ReActStep summary for snapshot comparison.""" return f"{step.action}@{step.step}:{step.tool_name or ''}" def _stream_tool_call_chunk( name: str, args: dict | None = None, tc_id: str = "tc_1", prompt_tokens: int = 10, completion_tokens: int = 20, ) -> StreamChunk: """Single StreamChunk carrying a tool_call (simulates function-calling stream).""" return StreamChunk( content="", model="test-model", tool_calls=[ToolCall(id=tc_id, name=name, arguments=args or {})], usage=TokenUsage(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens), is_final=True, ) def _stream_content_chunk( content: str, prompt_tokens: int = 10, completion_tokens: int = 20, ) -> StreamChunk: """Single StreamChunk carrying final text content.""" return StreamChunk( content=content, model="test-model", usage=TokenUsage(prompt_tokens=prompt_tokens, completion_tokens=completion_tokens), is_final=True, ) # ── Happy path: single tool call ────────────────────────────────────────── class TestGoldenHappyPath: """Single tool call -> final answer. Locks in execute() result shape.""" async def test_execute_single_tool_call_trajectory(self): from agentkit.core.react import ReActEngine tool = FakeTool(name="calculator", result={"value": 42}) gateway = make_mock_gateway( [ make_response(tool_calls=[_tc("calculator", {"expr": "6*7"})]), make_response(content="The result is 42"), ] ) engine = ReActEngine(llm_gateway=gateway) result = await engine.execute( messages=[{"role": "user", "content": "Calculate 6*7"}], tools=[tool], ) # Golden trajectory snapshot — locking current shape assert result.status == "success" assert result.output == "The result is 42" assert result.total_steps == 2 assert result.total_tokens == 60 # (10+20) * 2 assert [_step_summary(s) for s in result.trajectory] == [ "tool_call@1:calculator", "final_answer@2:", ] assert result.trajectory[0].result == {"value": 42} assert result.trajectory[1].content == "The result is 42" async def test_execute_stream_single_tool_call_event_types(self): """execute_stream event type sequence for single tool call. Locks current event types. After U1 refactor, an additional 'final_result' event may appear at the end (not asserted here). """ from agentkit.core.react import ReActEngine tool = FakeTool(name="calculator", result={"value": 42}) gateway = make_mock_stream_gateway( [ [_stream_tool_call_chunk("calculator", {"expr": "6*7"})], [_stream_content_chunk("The result is 42")], ] ) engine = ReActEngine(llm_gateway=gateway) events = [] async for event in engine.execute_stream( messages=[{"role": "user", "content": "Calculate 6*7"}], tools=[tool], ): events.append(event) event_types = [e.event_type for e in events] # Golden sequence: thinking -> tool_call -> tool_result -> thinking -> final_answer assert "thinking" in event_types assert "tool_call" in event_types assert "tool_result" in event_types assert "final_answer" in event_types # tool_result must come after tool_call assert event_types.index("tool_result") > event_types.index("tool_call") # final_answer must come after tool_result assert event_types.index("final_answer") > event_types.index("tool_result") # Verify tool_call event data tool_call_event = next(e for e in events if e.event_type == "tool_call") assert tool_call_event.data["tool_name"] == "calculator" assert tool_call_event.data["arguments"] == {"expr": "6*7"} # Verify tool_result event data tool_result_event = next(e for e in events if e.event_type == "tool_result") assert tool_result_event.data["tool_name"] == "calculator" assert tool_result_event.data["result"] == {"value": 42} # Verify final_answer event data final_event = next(e for e in events if e.event_type == "final_answer") assert final_event.data["output"] == "The result is 42" assert final_event.data["total_steps"] == 2 assert final_event.data["total_tokens"] == 60 # ── Streaming equivalence ───────────────────────────────────────── class TestStreamingEquivalence: """execute() and execute_stream() produce equivalent results for same input. After U1 refactor, both delegate to the same _execute_loop, so equivalence is structural. Before refactor, this test characterizes the current drift (e.g., compress_tool_result called by execute but not execute_stream). """ async def test_execute_and_stream_same_output(self): from agentkit.core.react import ReActEngine tool = FakeTool(name="search", result={"results": ["data"]}) gateway_exec = make_mock_gateway( [ make_response(tool_calls=[_tc("search", {"q": "test"})]), make_response(content="Found data"), ] ) engine_exec = ReActEngine(llm_gateway=gateway_exec) result = await engine_exec.execute( messages=[{"role": "user", "content": "Search"}], tools=[tool], ) # execute_stream path with equivalent stream chunks tool2 = FakeTool(name="search", result={"results": ["data"]}) gateway_stream = make_mock_stream_gateway( [ [_stream_tool_call_chunk("search", {"q": "test"})], [_stream_content_chunk("Found data")], ] ) engine_stream = ReActEngine(llm_gateway=gateway_stream) events = [] async for event in engine_stream.execute_stream( messages=[{"role": "user", "content": "Search"}], tools=[tool2], ): events.append(event) final_answer_events = [e for e in events if e.event_type == "final_answer"] assert len(final_answer_events) == 1 stream_final = final_answer_events[0].data # Equivalence on the user-visible fields assert result.output == stream_final["output"] assert result.total_steps == stream_final["total_steps"] assert result.total_tokens == stream_final["total_tokens"] # ── Multi-step loop ───────────────────────────────────────── class TestGoldenMultiStep: """3 tool calls then final answer.""" async def test_execute_three_step_trajectory(self): from agentkit.core.react import ReActEngine search = FakeTool(name="search", result={"results": ["a"]}) calc = FakeTool(name="calculator", result={"value": 100}) gateway = make_mock_gateway( [ make_response(tool_calls=[_tc("search", {"query": "Python"})]), make_response(tool_calls=[_tc("calculator", {"expr": "10*10"})]), make_response(content="Based on search and calculation, the answer is 100"), ] ) engine = ReActEngine(llm_gateway=gateway) result = await engine.execute( messages=[{"role": "user", "content": "Search and calculate"}], tools=[search, calc], ) assert [_step_summary(s) for s in result.trajectory] == [ "tool_call@1:search", "tool_call@2:calculator", "final_answer@3:", ] assert result.total_steps == 3 assert search.call_count == 1 assert calc.call_count == 1 # ── Empty tools ───────────────────────────────────────── class TestGoldenEmptyTools: """No tools -> LLM returns text directly.""" async def test_execute_no_tools_direct_answer(self): from agentkit.core.react import ReActEngine gateway = make_mock_gateway([make_response(content="Direct answer")]) engine = ReActEngine(llm_gateway=gateway) result = await engine.execute( messages=[{"role": "user", "content": "Hello"}], tools=None, ) assert result.output == "Direct answer" assert result.total_steps == 1 assert result.status == "success" assert [_step_summary(s) for s in result.trajectory] == ["final_answer@1:"] # ── Max steps ───────────────────────────────────────── class TestGoldenMaxSteps: """Loop reaches max_steps -> status='partial'.""" async def test_execute_max_steps_partial_status(self): from agentkit.core.react import ReActEngine tool = FakeTool(name="search", result={"results": []}) # Each step uses a different query to avoid loop detection responses = [ make_response( content="Thinking...", tool_calls=[_tc("search", {"query": f"attempt_{i}"}, tc_id=f"tc_{i}")], ) for i in range(20) ] gateway = make_mock_gateway(responses) engine = ReActEngine(llm_gateway=gateway, max_steps=3) result = await engine.execute( messages=[{"role": "user", "content": "Keep searching"}], tools=[tool], ) assert result.total_steps == 3 assert result.status == "partial" # All 3 steps are tool_calls (no final_answer) assert all(s.action == "tool_call" for s in result.trajectory) # ── Tool failure ───────────────────────────────────────── class TestGoldenToolFailure: """Tool raises exception -> error in observation, loop continues.""" async def test_execute_tool_failure_continues(self): from agentkit.core.react import ReActEngine failing = FakeTool(name="broken", should_fail=True) gateway = make_mock_gateway( [ make_response(tool_calls=[_tc("broken", {})]), make_response(content="Recovered from tool failure"), ] ) engine = ReActEngine(llm_gateway=gateway) result = await engine.execute( messages=[{"role": "user", "content": "Use broken tool"}], tools=[failing], ) assert result.trajectory[0].action == "tool_call" assert "failed" in str(result.trajectory[0].result).lower() assert result.trajectory[1].action == "final_answer" assert result.output == "Recovered from tool failure" assert result.total_steps == 2 # ── LLM failure ───────────────────────────────────────── class TestGoldenLLMFailure: """LLM gateway raises exception -> propagate to caller.""" async def test_execute_llm_failure_propagates(self): from agentkit.core.react import ReActEngine gateway = MagicMock(spec=LLMGateway) gateway.chat = AsyncMock(side_effect=RuntimeError("LLM down")) engine = ReActEngine(llm_gateway=gateway) with pytest.raises(RuntimeError, match="LLM down"): await engine.execute(messages=[{"role": "user", "content": "Hi"}]) # ── Phase violation ───────────────────────────────────────── class TestGoldenPhaseViolation: """Tool blocked by phase policy -> phase_violation event in stream.""" async def test_stream_phase_violation_event(self): from agentkit.core.phase import default_policy from agentkit.core.react import ReActEngine async def _stream(**kwargs): yield _stream_tool_call_chunk("write_file", {"path": "/x"}) yield _stream_content_chunk("done") gateway = MagicMock(spec=LLMGateway) gateway.chat_stream = _stream engine = ReActEngine( llm_gateway=gateway, phase_policy=default_policy(), max_steps=2, ) # write_file is blocked in PLANNING; _find_tool won't be reached engine._find_tool = lambda name, tools: None events: list[ReActEvent] = [] async for event in engine.execute_stream( messages=[{"role": "user", "content": "test"}], tools=[], ): events.append(event) violation_events = [e for e in events if e.event_type == "phase_violation"] assert len(violation_events) >= 1 v = violation_events[0].data assert v["tool"] == "write_file" assert v["current_phase"] == "planning" assert v["error"] == "phase_violation" # ── Cancellation ───────────────────────────────────────── class TestGoldenCancellation: """CancellationToken cancelled -> TaskCancelledError.""" async def test_execute_cancelled_before_start(self): from agentkit.core.exceptions import TaskCancelledError from agentkit.core.protocol import CancellationToken from agentkit.core.react import ReActEngine gateway = make_mock_gateway([make_response(content="hi")]) engine = ReActEngine(llm_gateway=gateway) token = CancellationToken() token.cancel() with pytest.raises(TaskCancelledError): await engine.execute( messages=[{"role": "user", "content": "Hi"}], cancellation_token=token, ) # ── Compression triggered ───────────────────────────────────────── class TestGoldenCompression: """Long conversation triggers compressor.compress().""" async def test_execute_compression_triggered(self): from agentkit.core.compressor import CompressionStrategy from agentkit.core.react import ReActEngine compressor = MagicMock(spec=CompressionStrategy) # passthrough — return messages unchanged compressor.compress = AsyncMock(side_effect=lambda msgs: msgs) compressor.is_available = MagicMock(return_value=True) compressor.should_compress = MagicMock(return_value=True) gateway = make_mock_gateway( [ make_response(tool_calls=[_tc("search", {"q": "test"})]), make_response(content="Done"), ] ) engine = ReActEngine(llm_gateway=gateway) mock_tool = MagicMock() mock_tool.name = "search" mock_tool.safe_execute = AsyncMock(return_value="result") long_content = "x" * 40000 await engine.execute( messages=[{"role": "user", "content": long_content}], tools=[mock_tool], compressor=compressor, ) # compress should be called (initial + incremental) assert compressor.compress.call_count >= 1 async def test_execute_tool_result_compressed(self): """execute() path calls compress_tool_result via _build_tool_result_message. This is the behavior the U1 refactor must preserve (and which execute_stream currently lacks — see test_execute_stream_with_compressor in test_react_compression.py). """ from agentkit.core.compressor import CompressionStrategy from agentkit.core.react import ReActEngine compressor = MagicMock(spec=CompressionStrategy) compressor.compress = AsyncMock(side_effect=lambda msgs: msgs) compressor.compress_tool_result = AsyncMock(return_value="COMPRESSED") compressor.is_available = MagicMock(return_value=True) compressor.should_compress = MagicMock(return_value=False) gateway = make_mock_gateway( [ make_response(tool_calls=[_tc("search", {"q": "test"})]), make_response(content="Done"), ] ) engine = ReActEngine(llm_gateway=gateway) mock_tool = MagicMock() mock_tool.name = "search" mock_tool.safe_execute = AsyncMock(return_value="original result") await engine.execute( messages=[{"role": "user", "content": "Search"}], tools=[mock_tool], compressor=compressor, ) # execute() path MUST call compress_tool_result — this is the # behavior that test_execute_stream_with_compressor expects # execute_stream to also have after U1 unification. compressor.compress_tool_result.assert_called_once_with("search", "original result") # ── Golden trajectory snapshot (full event sequence) ──────────────────── class TestGoldenTrajectorySnapshot: """Full event sequence snapshot for execute_stream. Locks the EXACT event type sequence for a fixed 2-step flow. Any change indicates a behavior change. """ async def test_stream_two_step_event_sequence(self): from agentkit.core.react import ReActEngine tool = FakeTool(name="search", result={"results": ["data"]}) gateway = make_mock_stream_gateway( [ [_stream_tool_call_chunk("search", {"q": "test"})], [_stream_content_chunk("Final answer")], ] ) engine = ReActEngine(llm_gateway=gateway) events: list[ReActEvent] = [] async for event in engine.execute_stream( messages=[{"role": "user", "content": "Search"}], tools=[tool], ): events.append(event) event_types = [e.event_type for e in events] # Snapshot (pre-refactor): thinking, tool_call, tool_result, thinking, final_answer # Post-refactor may append 'final_result' at the end (not asserted here). # Verify the relative ordering of key events is preserved. assert event_types[0] == "thinking" assert "tool_call" in event_types assert "tool_result" in event_types assert event_types.index("tool_result") > event_types.index("tool_call") assert "final_answer" in event_types assert event_types.index("final_answer") > event_types.index("tool_result") # Verify step numbers: tool events on step 1, final on step 2 tool_call_event = next(e for e in events if e.event_type == "tool_call") assert tool_call_event.step == 1 final_event = next(e for e in events if e.event_type == "final_answer") assert final_event.step == 2 async def test_execute_returns_react_result(self): """execute() returns a ReActResult (not events). Locks the type contract.""" from agentkit.core.react import ReActEngine gateway = make_mock_gateway([make_response(content="Done")]) engine = ReActEngine(llm_gateway=gateway) result = await engine.execute(messages=[{"role": "user", "content": "Hi"}]) assert isinstance(result, ReActResult) assert result.output == "Done" assert result.status == "success" assert isinstance(result.trajectory, list) assert all(isinstance(s, ReActStep) for s in result.trajectory)