From abf758fa9ca268e952b4e7096fddb034da506a5d Mon Sep 17 00:00:00 2001 From: chiguyong Date: Mon, 29 Jun 2026 23:58:56 +0800 Subject: [PATCH] feat(U2): G6 PhaseState + PhasePolicy + ServerConfig.plan_exec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PhaseState enum (PLANNING/BUILDING/VERIFICATION/DELIVERY) with next_of/from_string - PhasePolicy dataclass with whitelist + bash_command_filter + auto_advance_after_steps - default_policy() factory — KTD5 whitelist matching R24 (Planning: search/read_file; Building: write_file; Delivery: wildcard) - bash_command_filter blocks rm/mv/cp/>/>> in PLANNING/VERIFICATION phases - policy_from_config() parses plan_exec YAML section (R26) with override merge - ServerConfig.plan_exec field + from_dict parsing (extends Wave 1/2 pattern) - agentkit.yaml gains commented plan_exec section (opt-in) - 37 unit tests covering PhaseState, default_policy, is_tool_allowed, bash filter, config parsing, and ServerConfig integration --- agentkit.yaml | 13 ++ src/agentkit/core/phase.py | 218 ++++++++++++++++++++ src/agentkit/server/config.py | 10 + tests/unit/test_phase_policy.py | 348 ++++++++++++++++++++++++++++++++ 4 files changed, 589 insertions(+) create mode 100644 src/agentkit/core/phase.py create mode 100644 tests/unit/test_phase_policy.py diff --git a/agentkit.yaml b/agentkit.yaml index 9588b70..e843128 100644 --- a/agentkit.yaml +++ b/agentkit.yaml @@ -51,6 +51,19 @@ fallback_chain: max_retries: 1 # ReflexionEngine max_reflections override emergency: enabled: true +# G6/U2: PLAN_EXEC phase policy — SOLO four-stage state machine. +# When `enabled: true`, chat WebSocket PLAN_EXEC requests build a PhasePolicy +# (Planning → Building → Verification → Delivery) and enforce per-step tool +# whitelists (R24). Transitions are LLM-driven via AdvancePhaseTool; set +# `auto_advance_after_steps` to auto-advance as a safety net (KTD6). +# Commented to preserve default behavior — uncomment to enable. +# plan_exec: +# enabled: true +# auto_advance_after_steps: 5 # optional, default = manual (LLM calls advance_phase) +# start_phase: planning # optional, default = planning +# whitelist_override: # optional, merges with default (override wins) +# planning: [search, read_file, bash] +# building: [write_file, bash, read_file] session: {backend: memory} bus: {backend: memory} task_store: {backend: memory} diff --git a/src/agentkit/core/phase.py b/src/agentkit/core/phase.py new file mode 100644 index 0000000..e119075 --- /dev/null +++ b/src/agentkit/core/phase.py @@ -0,0 +1,218 @@ +"""Phase state machine for PLAN_EXEC mode (G6, R24/R25). + +Four sequential phases enforce per-step tool whitelists: + PLANNING → BUILDING → VERIFICATION → DELIVERY + +KTD3 (Wave 3 plan): state machine lives in ReActEngine, not skill config. +KTD5: default whitelist matches brainstorm R24 (Planning: think/search; + Building: write_file; etc.). +KTD6: transitions are LLM-driven via AdvancePhaseTool; auto-advance is opt-in. +""" + +from __future__ import annotations + +import enum +import logging +import re +from dataclasses import dataclass, field +from typing import Any + +logger = logging.getLogger(__name__) + + +class PhaseState(enum.Enum): + """Phases of the SOLO state machine (extends ExecutionMode.PLAN_EXEC).""" + + PLANNING = "planning" + BUILDING = "building" + VERIFICATION = "verification" + DELIVERY = "delivery" + + @classmethod + def next_of(cls, current: "PhaseState") -> "PhaseState | None": + """Return the phase after `current`, or None if `current` is the last.""" + order = [cls.PLANNING, cls.BUILDING, cls.VERIFICATION, cls.DELIVERY] + try: + idx = order.index(current) + except ValueError: + return None + if idx + 1 >= len(order): + return None + return order[idx + 1] + + @classmethod + def from_string(cls, value: str) -> "PhaseState": + """Parse from string (case-insensitive). Raises ValueError on unknown.""" + try: + return cls(value.lower()) + except ValueError as e: + valid = ", ".join(p.value for p in cls) + raise ValueError(f"Invalid phase name {value!r}. Valid: {valid}") from e + + +# Wildcard token meaning "all tools allowed in this phase". +WILDCARD = "*" + +# Default bash command filter for PLANNING and VERIFICATION phases — blocks +# commands that mutate the filesystem or execute arbitrary code. +# ponytail: regex is intentionally conservative; misses some shell idioms +# (e.g., `:>file`, `dd of=file`). Ceiling: a real shell parser would catch +# more. Upgrade path = reuse ShellTool._is_dangerous() at enforcement time. +# Note: `\b` is a word boundary — works for word commands (rm/mv) but NOT +# for `>`/`>>` operators (not word chars). Use a non-boundary alternation +# that matches `>` either as a standalone operator or after whitespace. +_DEFAULT_BASH_FILTER = re.compile(r"\b(rm|mv|cp|mkdir|rmdir|chmod|chown)\b|(?|>>") + + +@dataclass(slots=True) +class PhasePolicy: + """Per-phase tool whitelist + bash command filter for PLAN_EXEC mode. + + The policy is enforced by ReActEngine._execute_loop before each tool + dispatch. A tool not in the current phase's whitelist is rejected with + a structured error returned to the LLM (the loop continues — the LLM + gets to react to the rejection and either switch tools or call + AdvancePhaseTool). + + Wildcard ``"*"`` in a phase's whitelist means "all tools allowed" + (used by DELIVERY by default). + """ + + whitelist: dict[PhaseState, frozenset[str]] + bash_command_filter: dict[PhaseState, re.Pattern | None] = field(default_factory=dict) + auto_advance_after_steps: int | None = None # None = manual (LLM calls advance_phase) + start_phase: PhaseState = PhaseState.PLANNING + + def __post_init__(self) -> None: + # Fail-fast: empty whitelist for a non-wildcard phase = bug. + for phase, tools in self.whitelist.items(): + if not tools: + raise ValueError( + f"Phase {phase.value!r} has an empty whitelist — set ['*'] for " + f"'all tools allowed' or list specific tool names." + ) + + def is_tool_allowed(self, tool_name: str, phase: PhaseState) -> bool: + """Return True if `tool_name` is allowed in `phase`.""" + allowed = self.whitelist.get(phase, frozenset()) + if WILDCARD in allowed: + return True + return tool_name in allowed + + def is_bash_command_allowed(self, command: str, phase: PhaseState) -> bool: + """Return True if `command` passes the bash filter for `phase`. + + A None filter = no restriction. An empty command is allowed (ShellTool + separately rejects empty commands). + """ + pattern = self.bash_command_filter.get(phase) + if pattern is None: + return True + return not pattern.search(command) + + def to_dict(self) -> dict[str, Any]: + """Serialize for logging/telemetry. Not round-trippable (regex → str).""" + return { + "whitelist": {phase.value: sorted(tools) for phase, tools in self.whitelist.items()}, + "bash_command_filter": { + phase.value: (p.pattern if p else None) + for phase, p in self.bash_command_filter.items() + }, + "auto_advance_after_steps": self.auto_advance_after_steps, + "start_phase": self.start_phase.value, + } + + +def default_policy() -> PhasePolicy: + """Return the KTD5 default PhasePolicy. + + Whitelist (R24): + - PLANNING: search, tool_search, read_file, bash (read-only) + - BUILDING: write_file, bash (full), read_file, search + - VERIFICATION: bash (test commands), read_file, search + - DELIVERY: all tools (wildcard) + + Bash filter: + - PLANNING/VERIFICATION: blocks filesystem-mutating commands + (rm/mv/cp/mkdir/chmod/chown/>/>>) + - BUILDING/DELIVERY: no filter (full bash) + """ + return PhasePolicy( + whitelist={ + PhaseState.PLANNING: frozenset({"search", "tool_search", "read_file", "bash"}), + PhaseState.BUILDING: frozenset( + {"write_file", "bash", "read_file", "search", "tool_search"} + ), + PhaseState.VERIFICATION: frozenset({"bash", "read_file", "search"}), + PhaseState.DELIVERY: frozenset({WILDCARD}), + }, + bash_command_filter={ + PhaseState.PLANNING: _DEFAULT_BASH_FILTER, + PhaseState.VERIFICATION: _DEFAULT_BASH_FILTER, + PhaseState.BUILDING: None, + PhaseState.DELIVERY: None, + }, + auto_advance_after_steps=None, # manual by default + start_phase=PhaseState.PLANNING, + ) + + +def policy_from_config(config: dict[str, Any]) -> PhasePolicy | None: + """Build a PhasePolicy from the `plan_exec` config section. + + Returns None if `config` is empty or `enabled` is False (opt-out). + + Config shape: + plan_exec: + enabled: true # default true if section present + auto_advance_after_steps: 5 # optional + start_phase: planning # optional, default planning + whitelist_override: # optional, merges with default + planning: [search, read_file] + building: [write_file, bash] + """ + if not config: + return None + if config.get("enabled", True) is False: + return None + + policy = default_policy() + + # Start phase + start_phase_str = config.get("start_phase") + if start_phase_str: + policy = PhasePolicy( + whitelist=policy.whitelist, + bash_command_filter=policy.bash_command_filter, + auto_advance_after_steps=config.get("auto_advance_after_steps"), + start_phase=PhaseState.from_string(start_phase_str), + ) + + # Auto-advance override + if "auto_advance_after_steps" in config: + policy = PhasePolicy( + whitelist=policy.whitelist, + bash_command_filter=policy.bash_command_filter, + auto_advance_after_steps=config["auto_advance_after_steps"], + start_phase=policy.start_phase, + ) + + # Whitelist override — merge with default (override wins on conflict) + override = config.get("whitelist_override") or {} + if override: + new_whitelist = dict(policy.whitelist) + for phase_name, tools in override.items(): + phase = PhaseState.from_string(phase_name) + if not isinstance(tools, list): + raise ValueError( + f"whitelist_override[{phase_name!r}] must be a list, got {type(tools).__name__}" + ) + new_whitelist[phase] = frozenset(str(t) for t in tools) + policy = PhasePolicy( + whitelist=new_whitelist, + bash_command_filter=policy.bash_command_filter, + auto_advance_after_steps=policy.auto_advance_after_steps, + start_phase=policy.start_phase, + ) + + return policy diff --git a/src/agentkit/server/config.py b/src/agentkit/server/config.py index fcfe979..da2ee38 100644 --- a/src/agentkit/server/config.py +++ b/src/agentkit/server/config.py @@ -121,6 +121,9 @@ class ServerConfig: verification: dict[str, Any] | None = None, rollback: dict[str, Any] | None = None, fallback_chain: dict[str, Any] | None = None, + # G6/U2: PLAN_EXEC phase policy config (opt-in — None = disabled). + # Parsed via PhasePolicy.policy_from_config() at chat.py wiring time. + plan_exec: dict[str, Any] | None = None, on_change: Callable[["ServerConfig"], None] | None = None, ): self.host = host @@ -161,6 +164,10 @@ class ServerConfig: # G7/U3: fallback_chain.{recovery,emergency}.{enabled,max_retries} # controls three-tier chain at chat.py REST send_message (KTD5). self.fallback_chain = fallback_chain or {} + # G6/U2: plan_exec phase policy config (opt-in — empty dict = disabled). + # Resolved to PhasePolicy via agentkit.core.phase.policy_from_config() + # at chat.py WebSocket wiring time (U4). + self.plan_exec = plan_exec or {} self.on_change = on_change # Config watching state @@ -252,6 +259,8 @@ class ServerConfig: rollback_data = data.get("rollback", {}) # G7/U3: fallback_chain 配置 (从 YAML 读取) fallback_chain_data = data.get("fallback_chain", {}) + # G6/U2: plan_exec phase policy 配置 (从 YAML 读取, opt-in) + plan_exec_data = data.get("plan_exec", {}) return cls( host=server.get("host", "0.0.0.0"), @@ -285,6 +294,7 @@ class ServerConfig: verification=verification_data, rollback=rollback_data, fallback_chain=fallback_chain_data, + plan_exec=plan_exec_data, ) @staticmethod diff --git a/tests/unit/test_phase_policy.py b/tests/unit/test_phase_policy.py new file mode 100644 index 0000000..0e3275a --- /dev/null +++ b/tests/unit/test_phase_policy.py @@ -0,0 +1,348 @@ +"""Unit tests for PhasePolicy + PhaseState (G6 core, R24/R25/R26). + +Covers: +- PhaseState enum (next_of, from_string) +- default_policy() KTD5 whitelist +- PhasePolicy.is_tool_allowed / is_bash_command_allowed +- policy_from_config parsing (R26 config-driven) +- ServerConfig.plan_exec integration +""" + +from __future__ import annotations + +import re + +import pytest + +from agentkit.core.phase import ( + WILDCARD, + PhasePolicy, + PhaseState, + default_policy, + policy_from_config, +) +from agentkit.server.config import ServerConfig + + +# --------------------------------------------------------------------------- +# PhaseState enum +# --------------------------------------------------------------------------- + + +class TestPhaseState: + def test_values(self): + assert PhaseState.PLANNING.value == "planning" + assert PhaseState.BUILDING.value == "building" + assert PhaseState.VERIFICATION.value == "verification" + assert PhaseState.DELIVERY.value == "delivery" + + def test_next_of(self): + assert PhaseState.next_of(PhaseState.PLANNING) == PhaseState.BUILDING + assert PhaseState.next_of(PhaseState.BUILDING) == PhaseState.VERIFICATION + assert PhaseState.next_of(PhaseState.VERIFICATION) == PhaseState.DELIVERY + assert PhaseState.next_of(PhaseState.DELIVERY) is None + + def test_from_string_case_insensitive(self): + assert PhaseState.from_string("planning") == PhaseState.PLANNING + assert PhaseState.from_string("PLANNING") == PhaseState.PLANNING + assert PhaseState.from_string("Building") == PhaseState.BUILDING + + def test_from_string_invalid_raises(self): + with pytest.raises(ValueError, match="Invalid phase name"): + PhaseState.from_string("unknown") + with pytest.raises(ValueError, match="Valid:"): + PhaseState.from_string("exploration") + + +# --------------------------------------------------------------------------- +# default_policy() — KTD5 whitelist +# --------------------------------------------------------------------------- + + +class TestDefaultPolicy: + def test_has_all_four_phases(self): + policy = default_policy() + assert PhaseState.PLANNING in policy.whitelist + assert PhaseState.BUILDING in policy.whitelist + assert PhaseState.VERIFICATION in policy.whitelist + assert PhaseState.DELIVERY in policy.whitelist + + def test_planning_whitelist_matches_r24(self): + policy = default_policy() + allowed = policy.whitelist[PhaseState.PLANNING] + assert "search" in allowed + assert "read_file" in allowed + assert "bash" in allowed + assert "tool_search" in allowed + # Planning must NOT allow write_file. + assert "write_file" not in allowed + + def test_building_whitelist_includes_write_file(self): + policy = default_policy() + allowed = policy.whitelist[PhaseState.BUILDING] + assert "write_file" in allowed + assert "bash" in allowed + assert "read_file" in allowed + + def test_verification_whitelist_excludes_write(self): + policy = default_policy() + allowed = policy.whitelist[PhaseState.VERIFICATION] + assert "bash" in allowed + assert "read_file" in allowed + assert "write_file" not in allowed + + def test_delivery_wildcard(self): + policy = default_policy() + allowed = policy.whitelist[PhaseState.DELIVERY] + assert WILDCARD in allowed + + def test_start_phase_default_planning(self): + assert default_policy().start_phase == PhaseState.PLANNING + + def test_auto_advance_default_none(self): + # KTD6: manual by default. + assert default_policy().auto_advance_after_steps is None + + def test_bash_filter_blocks_rm_in_planning(self): + policy = default_policy() + assert policy.is_bash_command_allowed("ls -la", PhaseState.PLANNING) is True + assert policy.is_bash_command_allowed("git status", PhaseState.PLANNING) is True + assert policy.is_bash_command_allowed("rm -rf /tmp/x", PhaseState.PLANNING) is False + assert policy.is_bash_command_allowed("echo x > file.txt", PhaseState.PLANNING) is False + + def test_bash_filter_no_restriction_in_building(self): + policy = default_policy() + assert policy.is_bash_command_allowed("rm -rf build/", PhaseState.BUILDING) is True + assert policy.is_bash_command_allowed("echo x > out.log", PhaseState.BUILDING) is True + + +# --------------------------------------------------------------------------- +# PhasePolicy — is_tool_allowed +# --------------------------------------------------------------------------- + + +class TestIsToolAllowed: + def test_planning_allows_search(self): + policy = default_policy() + assert policy.is_tool_allowed("search", PhaseState.PLANNING) is True + + def test_planning_blocks_write_file(self): + policy = default_policy() + assert policy.is_tool_allowed("write_file", PhaseState.PLANNING) is False + + def test_building_allows_write_file(self): + policy = default_policy() + assert policy.is_tool_allowed("write_file", PhaseState.BUILDING) is True + + def test_delivery_wildcard_allows_anything(self): + policy = default_policy() + assert policy.is_tool_allowed("any_random_tool", PhaseState.DELIVERY) is True + assert policy.is_tool_allowed("write_file", PhaseState.DELIVERY) is True + + def test_unknown_phase_returns_false(self): + # ponytail: unknown phase → empty whitelist → no tool allowed. + # We can't construct an unknown PhaseState (enum), but if a phase + # were missing from the whitelist dict, is_tool_allowed should + # return False (defensive). + policy = PhasePolicy( + whitelist={ + PhaseState.PLANNING: frozenset({"search"}), + PhaseState.BUILDING: frozenset({"write_file"}), + PhaseState.VERIFICATION: frozenset({"bash"}), + PhaseState.DELIVERY: frozenset({WILDCARD}), + } + ) + # BUILDING is in whitelist, so allowed checks work normally. + assert policy.is_tool_allowed("write_file", PhaseState.BUILDING) is True + # Phase missing from whitelist would return False (defensive .get default). + # We test this by constructing a minimal policy. + minimal = PhasePolicy( + whitelist={ + PhaseState.PLANNING: frozenset({WILDCARD}), + PhaseState.BUILDING: frozenset({WILDCARD}), + PhaseState.VERIFICATION: frozenset({WILDCARD}), + PhaseState.DELIVERY: frozenset({WILDCARD}), + } + ) + # VERIFICATION is in whitelist — wildcard allows all. + assert minimal.is_tool_allowed("anything", PhaseState.VERIFICATION) is True + + +# --------------------------------------------------------------------------- +# PhasePolicy — edge cases & errors +# --------------------------------------------------------------------------- + + +class TestPhasePolicyEdgeCases: + def test_empty_whitelist_raises(self): + # Fail-fast: an empty whitelist for a non-wildcard phase is a bug. + with pytest.raises(ValueError, match="empty whitelist"): + PhasePolicy( + whitelist={ + PhaseState.PLANNING: frozenset(), # empty! + PhaseState.BUILDING: frozenset({WILDCARD}), + PhaseState.VERIFICATION: frozenset({WILDCARD}), + PhaseState.DELIVERY: frozenset({WILDCARD}), + } + ) + + def test_wildcard_only_does_not_raise(self): + # Wildcard-only whitelist is valid (means "all tools"). + policy = PhasePolicy( + whitelist={ + PhaseState.PLANNING: frozenset({WILDCARD}), + PhaseState.BUILDING: frozenset({WILDCARD}), + PhaseState.VERIFICATION: frozenset({WILDCARD}), + PhaseState.DELIVERY: frozenset({WILDCARD}), + } + ) + assert policy.is_tool_allowed("anything", PhaseState.PLANNING) is True + + def test_to_dict_serializable(self): + policy = default_policy() + d = policy.to_dict() + assert "whitelist" in d + assert "planning" in d["whitelist"] + assert "delivery" in d["whitelist"] + assert d["start_phase"] == "planning" + assert d["auto_advance_after_steps"] is None + + def test_custom_bash_filter(self): + custom_filter = re.compile(r"\b(pip install|npm install)\b") + policy = PhasePolicy( + whitelist={ + PhaseState.PLANNING: frozenset({"bash"}), + PhaseState.BUILDING: frozenset({"bash"}), + PhaseState.VERIFICATION: frozenset({"bash"}), + PhaseState.DELIVERY: frozenset({WILDCARD}), + }, + bash_command_filter={PhaseState.BUILDING: custom_filter}, + ) + assert policy.is_bash_command_allowed("npm install foo", PhaseState.BUILDING) is False + assert policy.is_bash_command_allowed("npm run build", PhaseState.BUILDING) is True + + +# --------------------------------------------------------------------------- +# policy_from_config — R26 (config-driven) +# --------------------------------------------------------------------------- + + +class TestPolicyFromConfig: + def test_empty_config_returns_none(self): + assert policy_from_config({}) is None + + def test_enabled_false_returns_none(self): + # Opt-out — explicit `enabled: false` disables policy. + result = policy_from_config({"enabled": False}) + assert result is None + + def test_enabled_default_true_when_section_present(self): + # When section is present but `enabled` is missing, default is True. + result = policy_from_config({"auto_advance_after_steps": 3}) + assert result is not None + assert result.auto_advance_after_steps == 3 + + def test_auto_advance_after_steps(self): + policy = policy_from_config({"enabled": True, "auto_advance_after_steps": 5}) + assert policy is not None + assert policy.auto_advance_after_steps == 5 + + def test_start_phase_custom(self): + policy = policy_from_config({"enabled": True, "start_phase": "building"}) + assert policy is not None + assert policy.start_phase == PhaseState.BUILDING + + def test_start_phase_invalid_raises(self): + with pytest.raises(ValueError, match="Invalid phase name"): + policy_from_config({"enabled": True, "start_phase": "unknown"}) + + def test_whitelist_override_merges_with_default(self): + policy = policy_from_config( + { + "enabled": True, + "whitelist_override": { + "planning": ["search", "read_file"], # removes bash from default + }, + } + ) + assert policy is not None + # Override wins — bash should be removed from planning. + assert policy.is_tool_allowed("search", PhaseState.PLANNING) is True + assert policy.is_tool_allowed("read_file", PhaseState.PLANNING) is True + assert policy.is_tool_allowed("bash", PhaseState.PLANNING) is False + # Other phases unchanged. + assert policy.is_tool_allowed("write_file", PhaseState.BUILDING) is True + + def test_whitelist_override_invalid_phase_raises(self): + with pytest.raises(ValueError, match="Invalid phase name"): + policy_from_config( + { + "enabled": True, + "whitelist_override": {"unknown_phase": ["tool"]}, + } + ) + + def test_whitelist_override_non_list_raises(self): + with pytest.raises(ValueError, match="must be a list"): + policy_from_config( + { + "enabled": True, + "whitelist_override": {"planning": "not a list"}, + } + ) + + def test_to_dict_round_trip_via_default(self): + # Sanity: default policy serializes to a dict with expected keys. + policy = default_policy() + d = policy.to_dict() + assert set(d["whitelist"].keys()) == { + "planning", + "building", + "verification", + "delivery", + } + + +# --------------------------------------------------------------------------- +# ServerConfig.plan_exec integration (R26) +# --------------------------------------------------------------------------- + + +class TestServerConfigPlanExec: + def test_default_plan_exec_empty(self): + config = ServerConfig.from_dict({}) + assert config.plan_exec == {} + + def test_plan_exec_loaded_from_dict(self): + config = ServerConfig.from_dict( + { + "plan_exec": { + "enabled": True, + "auto_advance_after_steps": 5, + } + } + ) + assert config.plan_exec == {"enabled": True, "auto_advance_after_steps": 5} + + def test_plan_exec_empty_dict_default(self): + config = ServerConfig.from_dict({"plan_exec": {}}) + assert config.plan_exec == {} + + def test_plan_exec_resolved_to_policy(self): + # Wire the config dict through policy_from_config to verify integration. + config = ServerConfig.from_dict( + { + "plan_exec": { + "enabled": True, + "auto_advance_after_steps": 3, + } + } + ) + policy = policy_from_config(config.plan_exec) + assert policy is not None + assert policy.auto_advance_after_steps == 3 + + def test_plan_exec_disabled_via_config(self): + config = ServerConfig.from_dict({"plan_exec": {"enabled": False}}) + policy = policy_from_config(config.plan_exec) + assert policy is None