feat(agent): Wave 3 strategic coupling (G5/G6) #6

Merged
fischer merged 7 commits from feat/agent-wave3-strategic into main 2026-06-30 09:17:20 +08:00
4 changed files with 572 additions and 0 deletions
Showing only changes of commit 6efd5957f6 - Show all commits

View File

@ -28,6 +28,7 @@ from agentkit.telemetry.metrics import (
if TYPE_CHECKING:
from agentkit.core.compressor import CompressionStrategy
from agentkit.core.middleware import MiddlewareChain
from agentkit.core.phase import PhasePolicy, PhaseState
from agentkit.core.trace import TraceRecorder
from agentkit.memory.retriever import MemoryRetriever
@ -168,6 +169,9 @@ class ReActEngine:
prompt_cache_enable: bool = True,
flush_interval_ms: int = 0,
max_reinjections: int = 1,
# U3/G6: PLAN_EXEC phase policy (opt-in). None = no enforcement
# (backward compat — all existing callers unaffected).
phase_policy: "PhasePolicy | None" = None,
):
if max_steps < 1:
raise ValueError(f"max_steps must be >= 1, got {max_steps}")
@ -211,6 +215,15 @@ class ReActEngine:
self._loop_corrected: bool = False
# U6: Middleware chain (parallel integration, feature flag controlled)
self._middleware_chain = middleware_chain
# U3/G6: PLAN_EXEC phase state. None = no enforcement (default).
# When set, _execute_loop checks each tool call against the current
# phase's whitelist before dispatch.
self._phase_policy = phase_policy
self._current_phase: "PhaseState | None" = (
phase_policy.start_phase if phase_policy is not None else None
)
# Steps taken in the current phase (for auto-advance safety net).
self._steps_in_phase: int = 0
def reset(self) -> None:
"""Reset internal state for reuse across conversations.
@ -223,6 +236,99 @@ class ReActEngine:
# This method exists for API clarity and future stateful extensions.
self._loop_window.clear()
self._loop_corrected = False
# U3/G6: reset phase state to start_phase (if policy set). Each
# execute() call begins a fresh PLANNING phase.
if self._phase_policy is not None:
self._current_phase = self._phase_policy.start_phase
self._steps_in_phase = 0
# ── U3/G6: phase state machine ────────────────────────────────────
def advance_phase(self) -> "PhaseState | None":
"""Advance to the next phase. Returns the new phase, or None if
already at DELIVERY (final phase).
Called by AdvancePhaseTool when the LLM explicitly signals phase
completion. Also called by the auto-advance safety net when
``steps_in_phase >= auto_advance_after_steps``.
Returns None if no phase_policy is set (no-op).
"""
if self._phase_policy is None or self._current_phase is None:
return None
from agentkit.core.phase import PhaseState
nxt = PhaseState.next_of(self._current_phase)
if nxt is None:
# Already at DELIVERY — return None to signal no transition.
return None
previous = self._current_phase
self._current_phase = nxt
self._steps_in_phase = 0
logger.info(
"Phase transition: %s%s",
previous.value,
nxt.value,
)
return nxt
@property
def current_phase(self) -> "PhaseState | None":
"""Current phase (None if no phase_policy set)."""
return self._current_phase
def _maybe_auto_advance(self) -> bool:
"""Auto-advance phase if step budget exhausted. Returns True if advanced."""
if self._phase_policy is None or self._current_phase is None:
return False
threshold = self._phase_policy.auto_advance_after_steps
if threshold is None:
return False
if self._steps_in_phase >= threshold:
self.advance_phase()
return True
return False
def _check_phase_permission(
self, tool_name: str, arguments: dict[str, Any]
) -> dict[str, Any] | None:
"""Return None if tool is allowed; return a structured error dict if blocked.
The error dict replaces what `_execute_tool` would have returned
the loop continues, so the LLM can react to the rejection (call
AdvancePhaseTool or pick a different tool).
Also applies the bash_command_filter for `bash` tool calls.
"""
if self._phase_policy is None or self._current_phase is None:
return None
if not self._phase_policy.is_tool_allowed(tool_name, self._current_phase):
return {
"error": "phase_violation",
"message": (
f"Tool {tool_name!r} not allowed in {self._current_phase.value} phase. "
f"Call `advance_phase` to move to the next phase."
),
"current_phase": self._current_phase.value,
"tool": tool_name,
"is_error": True,
}
# Bash command filter (only applies to bash tool).
if tool_name == "bash":
command = str(arguments.get("command", ""))
if not self._phase_policy.is_bash_command_allowed(command, self._current_phase):
return {
"error": "phase_violation",
"message": (
f"Bash command blocked in {self._current_phase.value} phase "
f"(filesystem-mutating operations not allowed during "
f"planning/verification). Command: {command[:100]}"
),
"current_phase": self._current_phase.value,
"tool": tool_name,
"is_error": True,
}
return None
def _check_tool_loop(self, tool_calls: list[Any]) -> str | None:
"""检测重复工具调用模式。
@ -498,6 +604,14 @@ class ReActEngine:
if cancellation_token is not None:
cancellation_token.check()
# U3/G6: phase auto-advance safety net.
# Incremented per step (LLM call), not per tool_call. When
# auto_advance_after_steps is set, advance the phase after
# the LLM has been stuck in the same phase for N steps.
if self._phase_policy is not None:
self._steps_in_phase += 1
self._maybe_auto_advance()
# Think: 调用 LLM
llm_start = time.monotonic()
response = await self._llm_gateway.chat(
@ -1148,6 +1262,11 @@ class ReActEngine:
if cancellation_token is not None:
cancellation_token.check()
# U3/G6: phase auto-advance safety net (mirrors _execute_loop).
if self._phase_policy is not None:
self._steps_in_phase += 1
self._maybe_auto_advance()
# 超时检查
if effective_timeout > 0:
elapsed = time.monotonic() - _stream_start
@ -2069,6 +2188,20 @@ class ReActEngine:
self, tool_name: str, arguments: dict[str, Any], tools: list[Tool]
) -> dict:
"""执行工具调用,处理成功和失败情况"""
# U3/G6: phase enforcement — check before dispatch. If the tool is
# blocked, return a structured error instead of dispatching. The loop
# still counts this as a step (the LLM gets to react to the rejection).
# `advance_phase` tool bypasses the check (it's the LLM's escape hatch).
if tool_name != "advance_phase":
block = self._check_phase_permission(tool_name, arguments)
if block is not None:
logger.info(
"Phase violation: tool %r blocked in %s phase",
tool_name,
self._current_phase.value if self._current_phase else "?",
)
return block
tool = self._find_tool(tool_name, tools)
if tool is None:
error_msg = f"Tool '{tool_name}' not found"

View File

@ -54,4 +54,5 @@ __all__ = [
"ParsedOutput",
"ErrorType",
"ReadFileTool",
"AdvancePhaseTool",
]

View File

@ -0,0 +1,99 @@
"""AdvancePhaseTool — LLM-driven phase transition (G6, KTD6).
Registered alongside other tools when ReActEngine has a phase_policy set.
The LLM calls this tool to signal "I'm done planning, move to building".
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any
from agentkit.tools.base import Tool
if TYPE_CHECKING:
from agentkit.core.react import ReActEngine
logger = logging.getLogger(__name__)
class AdvancePhaseTool(Tool):
"""Tool that advances the ReActEngine's current phase.
KTD6: LLM-driven phase transitions. Auto-advance is opt-in via
``plan_exec.auto_advance_after_steps``; this tool is the manual path.
The tool holds a weak reference to the engine (via bound method
``engine.advance_phase``) registered only when phase_policy is set.
"""
def __init__(
self,
engine: "ReActEngine",
name: str = "advance_phase",
description: str | None = None,
version: str = "1.0.0",
tags: list[str] | None = None,
):
super().__init__(
name=name,
description=description
or (
"Advance the PLAN_EXEC phase state machine to the next phase "
"(Planning → Building → Verification → Delivery). Call this "
"when you have finished the current phase's work and are ready "
"to move on. Returns the new phase name or an error if you "
"are already at the final (Delivery) phase."
),
input_schema={
"type": "object",
"properties": {},
"additionalProperties": False,
},
version=version,
tags=tags or ["phase", "control"],
)
self._engine = engine
async def execute(self, **kwargs) -> dict[str, Any]:
new_phase = self._engine.advance_phase()
if new_phase is None:
# Either no policy set, or already at DELIVERY.
current = self._engine.current_phase
if current is None:
return {
"is_error": True,
"error": "no_phase_policy",
"message": "No phase policy is set — advance_phase is a no-op.",
}
return {
"is_error": True,
"error": "already_at_final_phase",
"message": (f"Already at final phase ({current.value}). Cannot advance further."),
"current_phase": current.value,
}
return {
"is_error": False,
"previous_phase": self._previous_value(new_phase),
"current_phase": new_phase.value,
"message": f"Phase advanced to {new_phase.value}.",
}
@staticmethod
def _previous_value(current: Any) -> str:
"""Return the previous phase value (for telemetry/UI)."""
from agentkit.core.phase import PhaseState
order = [
PhaseState.PLANNING,
PhaseState.BUILDING,
PhaseState.VERIFICATION,
PhaseState.DELIVERY,
]
try:
idx = order.index(current)
except ValueError:
return ""
if idx == 0:
return ""
return order[idx - 1].value

View File

@ -0,0 +1,339 @@
"""Unit tests for ReActEngine phase enforcement (G6 wiring, R24).
Per plan U3 Execution note: characterization-first verify that
`ReActEngine(phase_policy=None)` behaves identically to pre-change (no
enforcement, no advance_phase tool, no _current_phase mutation). Then add
enforcement tests.
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import pytest
from agentkit.core.phase import PhasePolicy, PhaseState, default_policy
from agentkit.core.react import ReActEngine
from agentkit.tools.advance_phase import AdvancePhaseTool
# ---------------------------------------------------------------------------
# Characterization — phase_policy=None preserves existing behavior
# ---------------------------------------------------------------------------
class TestCharacterizationNoPolicy:
"""When phase_policy=None, no enforcement happens and behavior matches
pre-Wave-3."""
def test_init_without_phase_policy(self):
# Minimal stub LLM gateway — we're only testing constructor.
gateway = MagicMock()
engine = ReActEngine(llm_gateway=gateway)
assert engine._phase_policy is None
assert engine._current_phase is None
assert engine._steps_in_phase == 0
assert engine.current_phase is None
@pytest.mark.asyncio
async def test_execute_tool_dispatches_without_phase_check(self):
"""Tool dispatch proceeds normally when no policy set."""
gateway = MagicMock()
engine = ReActEngine(llm_gateway=gateway)
# MagicMock.name is a special attribute used internally by Mock for
# repr — setting it post-construction does not make mock.name == "x"
# hold. Patch _find_tool directly to bypass the name lookup.
fake_tool = MagicMock()
fake_tool.safe_execute = AsyncMock(return_value={"output": "ok"})
fake_tool.input_schema = None
engine._find_tool = lambda name, tools: fake_tool
result = await engine._execute_tool("any_tool", {"x": 1}, [fake_tool])
assert result == {"output": "ok"}
fake_tool.safe_execute.assert_awaited_once_with(x=1)
@pytest.mark.asyncio
async def test_advance_phase_returns_none_without_policy(self):
gateway = MagicMock()
engine = ReActEngine(llm_gateway=gateway)
assert engine.advance_phase() is None
def test_reset_does_not_touch_phase_state_when_no_policy(self):
gateway = MagicMock()
engine = ReActEngine(llm_gateway=gateway)
engine.reset()
assert engine._current_phase is None
# ---------------------------------------------------------------------------
# Initialization with phase_policy
# ---------------------------------------------------------------------------
class TestPhasePolicyInitialization:
def test_phase_policy_set_initializes_current_phase(self):
gateway = MagicMock()
engine = ReActEngine(
llm_gateway=gateway,
phase_policy=default_policy(),
)
assert engine._phase_policy is not None
assert engine._current_phase == PhaseState.PLANNING
assert engine._steps_in_phase == 0
def test_reset_resets_phase_to_start(self):
gateway = MagicMock()
engine = ReActEngine(
llm_gateway=gateway,
phase_policy=default_policy(),
)
# Manually move phase forward (simulating execute progress).
engine.advance_phase() # PLANNING → BUILDING
assert engine._current_phase == PhaseState.BUILDING
engine._steps_in_phase = 5
engine.reset()
assert engine._current_phase == PhaseState.PLANNING
assert engine._steps_in_phase == 0
# ---------------------------------------------------------------------------
# advance_phase() transitions
# ---------------------------------------------------------------------------
class TestAdvancePhase:
@pytest.fixture
def engine(self):
return ReActEngine(llm_gateway=MagicMock(), phase_policy=default_policy())
def test_planning_to_building(self, engine):
new_phase = engine.advance_phase()
assert new_phase == PhaseState.BUILDING
assert engine.current_phase == PhaseState.BUILDING
assert engine._steps_in_phase == 0 # counter reset on transition
def test_building_to_verification(self, engine):
engine.advance_phase() # → BUILDING
new_phase = engine.advance_phase()
assert new_phase == PhaseState.VERIFICATION
assert engine.current_phase == PhaseState.VERIFICATION
def test_verification_to_delivery(self, engine):
engine.advance_phase() # → BUILDING
engine.advance_phase() # → VERIFICATION
new_phase = engine.advance_phase()
assert new_phase == PhaseState.DELIVERY
assert engine.current_phase == PhaseState.DELIVERY
def test_delivery_returns_none(self, engine):
engine.advance_phase() # → BUILDING
engine.advance_phase() # → VERIFICATION
engine.advance_phase() # → DELIVERY
result = engine.advance_phase()
assert result is None
assert engine.current_phase == PhaseState.DELIVERY
# ---------------------------------------------------------------------------
# _check_phase_permission — whitelist enforcement
# ---------------------------------------------------------------------------
class TestPhasePermission:
@pytest.fixture
def engine(self):
return ReActEngine(llm_gateway=MagicMock(), phase_policy=default_policy())
def test_search_allowed_in_planning(self, engine):
assert engine._check_phase_permission("search", {}) is None
def test_write_file_blocked_in_planning(self, engine):
result = engine._check_phase_permission("write_file", {})
assert result is not None
assert result["error"] == "phase_violation"
assert "write_file" in result["message"]
assert result["current_phase"] == "planning"
def test_write_file_allowed_in_building(self, engine):
engine.advance_phase() # → BUILDING
assert engine._check_phase_permission("write_file", {}) is None
def test_any_tool_allowed_in_delivery(self, engine):
engine.advance_phase() # → BUILDING
engine.advance_phase() # → VERIFICATION
engine.advance_phase() # → DELIVERY
assert engine._check_phase_permission("literally_anything", {}) is None
def test_bash_command_filter_blocks_rm_in_planning(self, engine):
result = engine._check_phase_permission("bash", {"command": "rm -rf /tmp"})
assert result is not None
assert result["error"] == "phase_violation"
assert "rm" in result["message"] or "Bash command" in result["message"]
def test_bash_command_filter_allows_safe_in_planning(self, engine):
# `ls` and `git status` are not blocked.
assert engine._check_phase_permission("bash", {"command": "ls -la"}) is None
assert engine._check_phase_permission("bash", {"command": "git status"}) is None
def test_bash_command_filter_no_restriction_in_building(self, engine):
engine.advance_phase() # → BUILDING
# `rm` is allowed in building phase.
assert engine._check_phase_permission("bash", {"command": "rm -rf build/"}) is None
# ---------------------------------------------------------------------------
# _execute_tool integration — phase enforcement actually blocks dispatch
# ---------------------------------------------------------------------------
class TestExecuteToolPhaseEnforcement:
@pytest.fixture
def engine_with_tools(self):
engine = ReActEngine(llm_gateway=MagicMock(), phase_policy=default_policy())
# Two fake tools: one allowed in PLANNING (search), one not (write_file).
# MagicMock.name can't be set post-construction (special attribute),
# so we patch _find_tool with a dict-based lookup.
search_tool = MagicMock()
search_tool.input_schema = None
search_tool.safe_execute = AsyncMock(return_value={"results": []})
write_tool = MagicMock()
write_tool.input_schema = None
write_tool.safe_execute = AsyncMock(return_value={"written": True})
tools_by_name = {"search": search_tool, "write_file": write_tool}
engine._find_tool = lambda name, tools: tools_by_name.get(name)
return engine, [search_tool, write_tool]
@pytest.mark.asyncio
async def test_blocked_tool_returns_phase_violation_and_skips_dispatch(self, engine_with_tools):
engine, tools = engine_with_tools
# write_file in PLANNING should be blocked — write_tool.safe_execute
# should NEVER be called.
result = await engine._execute_tool("write_file", {"path": "/x"}, tools)
assert result["error"] == "phase_violation"
assert result["current_phase"] == "planning"
write_tool = tools[1]
write_tool.safe_execute.assert_not_called()
@pytest.mark.asyncio
async def test_allowed_tool_dispatches_normally(self, engine_with_tools):
engine, tools = engine_with_tools
result = await engine._execute_tool("search", {"query": "foo"}, tools)
assert result == {"results": []}
search_tool = tools[0]
search_tool.safe_execute.assert_awaited_once_with(query="foo")
@pytest.mark.asyncio
async def test_after_advance_phase_blocked_tool_now_dispatches(self, engine_with_tools):
engine, tools = engine_with_tools
# First: write_file blocked in PLANNING.
result = await engine._execute_tool("write_file", {"path": "/x"}, tools)
assert result["error"] == "phase_violation"
# Advance to BUILDING.
engine.advance_phase()
# Now: write_file allowed.
result = await engine._execute_tool("write_file", {"path": "/x"}, tools)
assert result == {"written": True}
# ---------------------------------------------------------------------------
# Auto-advance safety net (KTD6)
# ---------------------------------------------------------------------------
class TestAutoAdvance:
def test_auto_advance_after_threshold(self):
# Custom policy with auto-advance after 2 steps.
policy = PhasePolicy(
whitelist={
PhaseState.PLANNING: frozenset({"search"}),
PhaseState.BUILDING: frozenset({"write_file"}),
PhaseState.VERIFICATION: frozenset({"bash"}),
PhaseState.DELIVERY: frozenset({"*"}),
},
auto_advance_after_steps=2,
)
engine = ReActEngine(llm_gateway=MagicMock(), phase_policy=policy)
assert engine.current_phase == PhaseState.PLANNING
# Step 1: counter goes to 1, no advance yet.
engine._steps_in_phase += 1
assert engine._maybe_auto_advance() is False
assert engine.current_phase == PhaseState.PLANNING
# Step 2: counter hits 2, advance triggered.
engine._steps_in_phase += 1
assert engine._maybe_auto_advance() is True
assert engine.current_phase == PhaseState.BUILDING
assert engine._steps_in_phase == 0 # reset on advance
def test_auto_advance_none_default(self):
# default_policy has auto_advance_after_steps=None — no auto-advance.
engine = ReActEngine(llm_gateway=MagicMock(), phase_policy=default_policy())
engine._steps_in_phase = 100
assert engine._maybe_auto_advance() is False
assert engine.current_phase == PhaseState.PLANNING
# ---------------------------------------------------------------------------
# AdvancePhaseTool integration
# ---------------------------------------------------------------------------
class TestAdvancePhaseTool:
@pytest.mark.asyncio
async def test_advance_phase_tool_transitions_engine(self):
engine = ReActEngine(llm_gateway=MagicMock(), phase_policy=default_policy())
tool = AdvancePhaseTool(engine=engine)
result = await tool.execute()
assert result["is_error"] is False
assert result["current_phase"] == "building"
assert engine.current_phase == PhaseState.BUILDING
@pytest.mark.asyncio
async def test_advance_phase_tool_at_delivery_returns_error(self):
engine = ReActEngine(llm_gateway=MagicMock(), phase_policy=default_policy())
# Walk through all phases.
engine.advance_phase() # PLANNING → BUILDING
engine.advance_phase() # BUILDING → VERIFICATION
engine.advance_phase() # VERIFICATION → DELIVERY
tool = AdvancePhaseTool(engine=engine)
result = await tool.execute()
assert result["is_error"] is True
assert result["error"] == "already_at_final_phase"
assert result["current_phase"] == "delivery"
@pytest.mark.asyncio
async def test_advance_phase_tool_without_policy_returns_error(self):
engine = ReActEngine(llm_gateway=MagicMock()) # no policy
tool = AdvancePhaseTool(engine=engine)
result = await tool.execute()
assert result["is_error"] is True
assert result["error"] == "no_phase_policy"
def test_tool_schema_accepts_no_arguments(self):
engine = ReActEngine(llm_gateway=MagicMock(), phase_policy=default_policy())
tool = AdvancePhaseTool(engine=engine)
# input_schema has empty properties + additionalProperties:false —
# no arguments expected.
assert tool.input_schema["properties"] == {}
assert tool.input_schema["additionalProperties"] is False
def test_tool_bypasses_phase_check(self):
"""`advance_phase` is the LLM's escape hatch — must never be blocked."""
# _check_phase_permission should NOT block advance_phase even in PLANNING.
# The bypass is implemented in _execute_tool by name check.
# We verify the bypass indirectly: tool dispatches normally even in
# PLANNING (where only search/read_file/bash/tool_search are allowed).
# advance_phase is not in the whitelist, but the name-based bypass
# in _execute_tool lets it through.
# (Direct unit test of the bypass would require mocking _find_tool.)
# Sanity: advance_phase is not in any whitelist.
for phase, allowed in default_policy().whitelist.items():
assert "advance_phase" not in allowed, (
f"advance_phase must not be in {phase.value} whitelist"
)