feat(U2): G6 PhaseState + PhasePolicy + ServerConfig.plan_exec
- PhaseState enum (PLANNING/BUILDING/VERIFICATION/DELIVERY) with next_of/from_string - PhasePolicy dataclass with whitelist + bash_command_filter + auto_advance_after_steps - default_policy() factory — KTD5 whitelist matching R24 (Planning: search/read_file; Building: write_file; Delivery: wildcard) - bash_command_filter blocks rm/mv/cp/>/>> in PLANNING/VERIFICATION phases - policy_from_config() parses plan_exec YAML section (R26) with override merge - ServerConfig.plan_exec field + from_dict parsing (extends Wave 1/2 pattern) - agentkit.yaml gains commented plan_exec section (opt-in) - 37 unit tests covering PhaseState, default_policy, is_tool_allowed, bash filter, config parsing, and ServerConfig integration
This commit is contained in:
parent
50885fbc62
commit
abf758fa9c
|
|
@ -51,6 +51,19 @@ fallback_chain:
|
||||||
max_retries: 1 # ReflexionEngine max_reflections override
|
max_retries: 1 # ReflexionEngine max_reflections override
|
||||||
emergency:
|
emergency:
|
||||||
enabled: true
|
enabled: true
|
||||||
|
# G6/U2: PLAN_EXEC phase policy — SOLO four-stage state machine.
|
||||||
|
# When `enabled: true`, chat WebSocket PLAN_EXEC requests build a PhasePolicy
|
||||||
|
# (Planning → Building → Verification → Delivery) and enforce per-step tool
|
||||||
|
# whitelists (R24). Transitions are LLM-driven via AdvancePhaseTool; set
|
||||||
|
# `auto_advance_after_steps` to auto-advance as a safety net (KTD6).
|
||||||
|
# Commented to preserve default behavior — uncomment to enable.
|
||||||
|
# plan_exec:
|
||||||
|
# enabled: true
|
||||||
|
# auto_advance_after_steps: 5 # optional, default = manual (LLM calls advance_phase)
|
||||||
|
# start_phase: planning # optional, default = planning
|
||||||
|
# whitelist_override: # optional, merges with default (override wins)
|
||||||
|
# planning: [search, read_file, bash]
|
||||||
|
# building: [write_file, bash, read_file]
|
||||||
session: {backend: memory}
|
session: {backend: memory}
|
||||||
bus: {backend: memory}
|
bus: {backend: memory}
|
||||||
task_store: {backend: memory}
|
task_store: {backend: memory}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,218 @@
|
||||||
|
"""Phase state machine for PLAN_EXEC mode (G6, R24/R25).
|
||||||
|
|
||||||
|
Four sequential phases enforce per-step tool whitelists:
|
||||||
|
PLANNING → BUILDING → VERIFICATION → DELIVERY
|
||||||
|
|
||||||
|
KTD3 (Wave 3 plan): state machine lives in ReActEngine, not skill config.
|
||||||
|
KTD5: default whitelist matches brainstorm R24 (Planning: think/search;
|
||||||
|
Building: write_file; etc.).
|
||||||
|
KTD6: transitions are LLM-driven via AdvancePhaseTool; auto-advance is opt-in.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import enum
|
||||||
|
import logging
|
||||||
|
import re
|
||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class PhaseState(enum.Enum):
|
||||||
|
"""Phases of the SOLO state machine (extends ExecutionMode.PLAN_EXEC)."""
|
||||||
|
|
||||||
|
PLANNING = "planning"
|
||||||
|
BUILDING = "building"
|
||||||
|
VERIFICATION = "verification"
|
||||||
|
DELIVERY = "delivery"
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def next_of(cls, current: "PhaseState") -> "PhaseState | None":
|
||||||
|
"""Return the phase after `current`, or None if `current` is the last."""
|
||||||
|
order = [cls.PLANNING, cls.BUILDING, cls.VERIFICATION, cls.DELIVERY]
|
||||||
|
try:
|
||||||
|
idx = order.index(current)
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
if idx + 1 >= len(order):
|
||||||
|
return None
|
||||||
|
return order[idx + 1]
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_string(cls, value: str) -> "PhaseState":
|
||||||
|
"""Parse from string (case-insensitive). Raises ValueError on unknown."""
|
||||||
|
try:
|
||||||
|
return cls(value.lower())
|
||||||
|
except ValueError as e:
|
||||||
|
valid = ", ".join(p.value for p in cls)
|
||||||
|
raise ValueError(f"Invalid phase name {value!r}. Valid: {valid}") from e
|
||||||
|
|
||||||
|
|
||||||
|
# Wildcard token meaning "all tools allowed in this phase".
|
||||||
|
WILDCARD = "*"
|
||||||
|
|
||||||
|
# Default bash command filter for PLANNING and VERIFICATION phases — blocks
|
||||||
|
# commands that mutate the filesystem or execute arbitrary code.
|
||||||
|
# ponytail: regex is intentionally conservative; misses some shell idioms
|
||||||
|
# (e.g., `:>file`, `dd of=file`). Ceiling: a real shell parser would catch
|
||||||
|
# more. Upgrade path = reuse ShellTool._is_dangerous() at enforcement time.
|
||||||
|
# Note: `\b` is a word boundary — works for word commands (rm/mv) but NOT
|
||||||
|
# for `>`/`>>` operators (not word chars). Use a non-boundary alternation
|
||||||
|
# that matches `>` either as a standalone operator or after whitespace.
|
||||||
|
_DEFAULT_BASH_FILTER = re.compile(r"\b(rm|mv|cp|mkdir|rmdir|chmod|chown)\b|(?<!\S)>|>>")
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(slots=True)
|
||||||
|
class PhasePolicy:
|
||||||
|
"""Per-phase tool whitelist + bash command filter for PLAN_EXEC mode.
|
||||||
|
|
||||||
|
The policy is enforced by ReActEngine._execute_loop before each tool
|
||||||
|
dispatch. A tool not in the current phase's whitelist is rejected with
|
||||||
|
a structured error returned to the LLM (the loop continues — the LLM
|
||||||
|
gets to react to the rejection and either switch tools or call
|
||||||
|
AdvancePhaseTool).
|
||||||
|
|
||||||
|
Wildcard ``"*"`` in a phase's whitelist means "all tools allowed"
|
||||||
|
(used by DELIVERY by default).
|
||||||
|
"""
|
||||||
|
|
||||||
|
whitelist: dict[PhaseState, frozenset[str]]
|
||||||
|
bash_command_filter: dict[PhaseState, re.Pattern | None] = field(default_factory=dict)
|
||||||
|
auto_advance_after_steps: int | None = None # None = manual (LLM calls advance_phase)
|
||||||
|
start_phase: PhaseState = PhaseState.PLANNING
|
||||||
|
|
||||||
|
def __post_init__(self) -> None:
|
||||||
|
# Fail-fast: empty whitelist for a non-wildcard phase = bug.
|
||||||
|
for phase, tools in self.whitelist.items():
|
||||||
|
if not tools:
|
||||||
|
raise ValueError(
|
||||||
|
f"Phase {phase.value!r} has an empty whitelist — set ['*'] for "
|
||||||
|
f"'all tools allowed' or list specific tool names."
|
||||||
|
)
|
||||||
|
|
||||||
|
def is_tool_allowed(self, tool_name: str, phase: PhaseState) -> bool:
|
||||||
|
"""Return True if `tool_name` is allowed in `phase`."""
|
||||||
|
allowed = self.whitelist.get(phase, frozenset())
|
||||||
|
if WILDCARD in allowed:
|
||||||
|
return True
|
||||||
|
return tool_name in allowed
|
||||||
|
|
||||||
|
def is_bash_command_allowed(self, command: str, phase: PhaseState) -> bool:
|
||||||
|
"""Return True if `command` passes the bash filter for `phase`.
|
||||||
|
|
||||||
|
A None filter = no restriction. An empty command is allowed (ShellTool
|
||||||
|
separately rejects empty commands).
|
||||||
|
"""
|
||||||
|
pattern = self.bash_command_filter.get(phase)
|
||||||
|
if pattern is None:
|
||||||
|
return True
|
||||||
|
return not pattern.search(command)
|
||||||
|
|
||||||
|
def to_dict(self) -> dict[str, Any]:
|
||||||
|
"""Serialize for logging/telemetry. Not round-trippable (regex → str)."""
|
||||||
|
return {
|
||||||
|
"whitelist": {phase.value: sorted(tools) for phase, tools in self.whitelist.items()},
|
||||||
|
"bash_command_filter": {
|
||||||
|
phase.value: (p.pattern if p else None)
|
||||||
|
for phase, p in self.bash_command_filter.items()
|
||||||
|
},
|
||||||
|
"auto_advance_after_steps": self.auto_advance_after_steps,
|
||||||
|
"start_phase": self.start_phase.value,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def default_policy() -> PhasePolicy:
|
||||||
|
"""Return the KTD5 default PhasePolicy.
|
||||||
|
|
||||||
|
Whitelist (R24):
|
||||||
|
- PLANNING: search, tool_search, read_file, bash (read-only)
|
||||||
|
- BUILDING: write_file, bash (full), read_file, search
|
||||||
|
- VERIFICATION: bash (test commands), read_file, search
|
||||||
|
- DELIVERY: all tools (wildcard)
|
||||||
|
|
||||||
|
Bash filter:
|
||||||
|
- PLANNING/VERIFICATION: blocks filesystem-mutating commands
|
||||||
|
(rm/mv/cp/mkdir/chmod/chown/>/>>)
|
||||||
|
- BUILDING/DELIVERY: no filter (full bash)
|
||||||
|
"""
|
||||||
|
return PhasePolicy(
|
||||||
|
whitelist={
|
||||||
|
PhaseState.PLANNING: frozenset({"search", "tool_search", "read_file", "bash"}),
|
||||||
|
PhaseState.BUILDING: frozenset(
|
||||||
|
{"write_file", "bash", "read_file", "search", "tool_search"}
|
||||||
|
),
|
||||||
|
PhaseState.VERIFICATION: frozenset({"bash", "read_file", "search"}),
|
||||||
|
PhaseState.DELIVERY: frozenset({WILDCARD}),
|
||||||
|
},
|
||||||
|
bash_command_filter={
|
||||||
|
PhaseState.PLANNING: _DEFAULT_BASH_FILTER,
|
||||||
|
PhaseState.VERIFICATION: _DEFAULT_BASH_FILTER,
|
||||||
|
PhaseState.BUILDING: None,
|
||||||
|
PhaseState.DELIVERY: None,
|
||||||
|
},
|
||||||
|
auto_advance_after_steps=None, # manual by default
|
||||||
|
start_phase=PhaseState.PLANNING,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def policy_from_config(config: dict[str, Any]) -> PhasePolicy | None:
|
||||||
|
"""Build a PhasePolicy from the `plan_exec` config section.
|
||||||
|
|
||||||
|
Returns None if `config` is empty or `enabled` is False (opt-out).
|
||||||
|
|
||||||
|
Config shape:
|
||||||
|
plan_exec:
|
||||||
|
enabled: true # default true if section present
|
||||||
|
auto_advance_after_steps: 5 # optional
|
||||||
|
start_phase: planning # optional, default planning
|
||||||
|
whitelist_override: # optional, merges with default
|
||||||
|
planning: [search, read_file]
|
||||||
|
building: [write_file, bash]
|
||||||
|
"""
|
||||||
|
if not config:
|
||||||
|
return None
|
||||||
|
if config.get("enabled", True) is False:
|
||||||
|
return None
|
||||||
|
|
||||||
|
policy = default_policy()
|
||||||
|
|
||||||
|
# Start phase
|
||||||
|
start_phase_str = config.get("start_phase")
|
||||||
|
if start_phase_str:
|
||||||
|
policy = PhasePolicy(
|
||||||
|
whitelist=policy.whitelist,
|
||||||
|
bash_command_filter=policy.bash_command_filter,
|
||||||
|
auto_advance_after_steps=config.get("auto_advance_after_steps"),
|
||||||
|
start_phase=PhaseState.from_string(start_phase_str),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Auto-advance override
|
||||||
|
if "auto_advance_after_steps" in config:
|
||||||
|
policy = PhasePolicy(
|
||||||
|
whitelist=policy.whitelist,
|
||||||
|
bash_command_filter=policy.bash_command_filter,
|
||||||
|
auto_advance_after_steps=config["auto_advance_after_steps"],
|
||||||
|
start_phase=policy.start_phase,
|
||||||
|
)
|
||||||
|
|
||||||
|
# Whitelist override — merge with default (override wins on conflict)
|
||||||
|
override = config.get("whitelist_override") or {}
|
||||||
|
if override:
|
||||||
|
new_whitelist = dict(policy.whitelist)
|
||||||
|
for phase_name, tools in override.items():
|
||||||
|
phase = PhaseState.from_string(phase_name)
|
||||||
|
if not isinstance(tools, list):
|
||||||
|
raise ValueError(
|
||||||
|
f"whitelist_override[{phase_name!r}] must be a list, got {type(tools).__name__}"
|
||||||
|
)
|
||||||
|
new_whitelist[phase] = frozenset(str(t) for t in tools)
|
||||||
|
policy = PhasePolicy(
|
||||||
|
whitelist=new_whitelist,
|
||||||
|
bash_command_filter=policy.bash_command_filter,
|
||||||
|
auto_advance_after_steps=policy.auto_advance_after_steps,
|
||||||
|
start_phase=policy.start_phase,
|
||||||
|
)
|
||||||
|
|
||||||
|
return policy
|
||||||
|
|
@ -121,6 +121,9 @@ class ServerConfig:
|
||||||
verification: dict[str, Any] | None = None,
|
verification: dict[str, Any] | None = None,
|
||||||
rollback: dict[str, Any] | None = None,
|
rollback: dict[str, Any] | None = None,
|
||||||
fallback_chain: dict[str, Any] | None = None,
|
fallback_chain: dict[str, Any] | None = None,
|
||||||
|
# G6/U2: PLAN_EXEC phase policy config (opt-in — None = disabled).
|
||||||
|
# Parsed via PhasePolicy.policy_from_config() at chat.py wiring time.
|
||||||
|
plan_exec: dict[str, Any] | None = None,
|
||||||
on_change: Callable[["ServerConfig"], None] | None = None,
|
on_change: Callable[["ServerConfig"], None] | None = None,
|
||||||
):
|
):
|
||||||
self.host = host
|
self.host = host
|
||||||
|
|
@ -161,6 +164,10 @@ class ServerConfig:
|
||||||
# G7/U3: fallback_chain.{recovery,emergency}.{enabled,max_retries}
|
# G7/U3: fallback_chain.{recovery,emergency}.{enabled,max_retries}
|
||||||
# controls three-tier chain at chat.py REST send_message (KTD5).
|
# controls three-tier chain at chat.py REST send_message (KTD5).
|
||||||
self.fallback_chain = fallback_chain or {}
|
self.fallback_chain = fallback_chain or {}
|
||||||
|
# G6/U2: plan_exec phase policy config (opt-in — empty dict = disabled).
|
||||||
|
# Resolved to PhasePolicy via agentkit.core.phase.policy_from_config()
|
||||||
|
# at chat.py WebSocket wiring time (U4).
|
||||||
|
self.plan_exec = plan_exec or {}
|
||||||
self.on_change = on_change
|
self.on_change = on_change
|
||||||
|
|
||||||
# Config watching state
|
# Config watching state
|
||||||
|
|
@ -252,6 +259,8 @@ class ServerConfig:
|
||||||
rollback_data = data.get("rollback", {})
|
rollback_data = data.get("rollback", {})
|
||||||
# G7/U3: fallback_chain 配置 (从 YAML 读取)
|
# G7/U3: fallback_chain 配置 (从 YAML 读取)
|
||||||
fallback_chain_data = data.get("fallback_chain", {})
|
fallback_chain_data = data.get("fallback_chain", {})
|
||||||
|
# G6/U2: plan_exec phase policy 配置 (从 YAML 读取, opt-in)
|
||||||
|
plan_exec_data = data.get("plan_exec", {})
|
||||||
|
|
||||||
return cls(
|
return cls(
|
||||||
host=server.get("host", "0.0.0.0"),
|
host=server.get("host", "0.0.0.0"),
|
||||||
|
|
@ -285,6 +294,7 @@ class ServerConfig:
|
||||||
verification=verification_data,
|
verification=verification_data,
|
||||||
rollback=rollback_data,
|
rollback=rollback_data,
|
||||||
fallback_chain=fallback_chain_data,
|
fallback_chain=fallback_chain_data,
|
||||||
|
plan_exec=plan_exec_data,
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,348 @@
|
||||||
|
"""Unit tests for PhasePolicy + PhaseState (G6 core, R24/R25/R26).
|
||||||
|
|
||||||
|
Covers:
|
||||||
|
- PhaseState enum (next_of, from_string)
|
||||||
|
- default_policy() KTD5 whitelist
|
||||||
|
- PhasePolicy.is_tool_allowed / is_bash_command_allowed
|
||||||
|
- policy_from_config parsing (R26 config-driven)
|
||||||
|
- ServerConfig.plan_exec integration
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import re
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from agentkit.core.phase import (
|
||||||
|
WILDCARD,
|
||||||
|
PhasePolicy,
|
||||||
|
PhaseState,
|
||||||
|
default_policy,
|
||||||
|
policy_from_config,
|
||||||
|
)
|
||||||
|
from agentkit.server.config import ServerConfig
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# PhaseState enum
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestPhaseState:
|
||||||
|
def test_values(self):
|
||||||
|
assert PhaseState.PLANNING.value == "planning"
|
||||||
|
assert PhaseState.BUILDING.value == "building"
|
||||||
|
assert PhaseState.VERIFICATION.value == "verification"
|
||||||
|
assert PhaseState.DELIVERY.value == "delivery"
|
||||||
|
|
||||||
|
def test_next_of(self):
|
||||||
|
assert PhaseState.next_of(PhaseState.PLANNING) == PhaseState.BUILDING
|
||||||
|
assert PhaseState.next_of(PhaseState.BUILDING) == PhaseState.VERIFICATION
|
||||||
|
assert PhaseState.next_of(PhaseState.VERIFICATION) == PhaseState.DELIVERY
|
||||||
|
assert PhaseState.next_of(PhaseState.DELIVERY) is None
|
||||||
|
|
||||||
|
def test_from_string_case_insensitive(self):
|
||||||
|
assert PhaseState.from_string("planning") == PhaseState.PLANNING
|
||||||
|
assert PhaseState.from_string("PLANNING") == PhaseState.PLANNING
|
||||||
|
assert PhaseState.from_string("Building") == PhaseState.BUILDING
|
||||||
|
|
||||||
|
def test_from_string_invalid_raises(self):
|
||||||
|
with pytest.raises(ValueError, match="Invalid phase name"):
|
||||||
|
PhaseState.from_string("unknown")
|
||||||
|
with pytest.raises(ValueError, match="Valid:"):
|
||||||
|
PhaseState.from_string("exploration")
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# default_policy() — KTD5 whitelist
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestDefaultPolicy:
|
||||||
|
def test_has_all_four_phases(self):
|
||||||
|
policy = default_policy()
|
||||||
|
assert PhaseState.PLANNING in policy.whitelist
|
||||||
|
assert PhaseState.BUILDING in policy.whitelist
|
||||||
|
assert PhaseState.VERIFICATION in policy.whitelist
|
||||||
|
assert PhaseState.DELIVERY in policy.whitelist
|
||||||
|
|
||||||
|
def test_planning_whitelist_matches_r24(self):
|
||||||
|
policy = default_policy()
|
||||||
|
allowed = policy.whitelist[PhaseState.PLANNING]
|
||||||
|
assert "search" in allowed
|
||||||
|
assert "read_file" in allowed
|
||||||
|
assert "bash" in allowed
|
||||||
|
assert "tool_search" in allowed
|
||||||
|
# Planning must NOT allow write_file.
|
||||||
|
assert "write_file" not in allowed
|
||||||
|
|
||||||
|
def test_building_whitelist_includes_write_file(self):
|
||||||
|
policy = default_policy()
|
||||||
|
allowed = policy.whitelist[PhaseState.BUILDING]
|
||||||
|
assert "write_file" in allowed
|
||||||
|
assert "bash" in allowed
|
||||||
|
assert "read_file" in allowed
|
||||||
|
|
||||||
|
def test_verification_whitelist_excludes_write(self):
|
||||||
|
policy = default_policy()
|
||||||
|
allowed = policy.whitelist[PhaseState.VERIFICATION]
|
||||||
|
assert "bash" in allowed
|
||||||
|
assert "read_file" in allowed
|
||||||
|
assert "write_file" not in allowed
|
||||||
|
|
||||||
|
def test_delivery_wildcard(self):
|
||||||
|
policy = default_policy()
|
||||||
|
allowed = policy.whitelist[PhaseState.DELIVERY]
|
||||||
|
assert WILDCARD in allowed
|
||||||
|
|
||||||
|
def test_start_phase_default_planning(self):
|
||||||
|
assert default_policy().start_phase == PhaseState.PLANNING
|
||||||
|
|
||||||
|
def test_auto_advance_default_none(self):
|
||||||
|
# KTD6: manual by default.
|
||||||
|
assert default_policy().auto_advance_after_steps is None
|
||||||
|
|
||||||
|
def test_bash_filter_blocks_rm_in_planning(self):
|
||||||
|
policy = default_policy()
|
||||||
|
assert policy.is_bash_command_allowed("ls -la", PhaseState.PLANNING) is True
|
||||||
|
assert policy.is_bash_command_allowed("git status", PhaseState.PLANNING) is True
|
||||||
|
assert policy.is_bash_command_allowed("rm -rf /tmp/x", PhaseState.PLANNING) is False
|
||||||
|
assert policy.is_bash_command_allowed("echo x > file.txt", PhaseState.PLANNING) is False
|
||||||
|
|
||||||
|
def test_bash_filter_no_restriction_in_building(self):
|
||||||
|
policy = default_policy()
|
||||||
|
assert policy.is_bash_command_allowed("rm -rf build/", PhaseState.BUILDING) is True
|
||||||
|
assert policy.is_bash_command_allowed("echo x > out.log", PhaseState.BUILDING) is True
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# PhasePolicy — is_tool_allowed
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestIsToolAllowed:
|
||||||
|
def test_planning_allows_search(self):
|
||||||
|
policy = default_policy()
|
||||||
|
assert policy.is_tool_allowed("search", PhaseState.PLANNING) is True
|
||||||
|
|
||||||
|
def test_planning_blocks_write_file(self):
|
||||||
|
policy = default_policy()
|
||||||
|
assert policy.is_tool_allowed("write_file", PhaseState.PLANNING) is False
|
||||||
|
|
||||||
|
def test_building_allows_write_file(self):
|
||||||
|
policy = default_policy()
|
||||||
|
assert policy.is_tool_allowed("write_file", PhaseState.BUILDING) is True
|
||||||
|
|
||||||
|
def test_delivery_wildcard_allows_anything(self):
|
||||||
|
policy = default_policy()
|
||||||
|
assert policy.is_tool_allowed("any_random_tool", PhaseState.DELIVERY) is True
|
||||||
|
assert policy.is_tool_allowed("write_file", PhaseState.DELIVERY) is True
|
||||||
|
|
||||||
|
def test_unknown_phase_returns_false(self):
|
||||||
|
# ponytail: unknown phase → empty whitelist → no tool allowed.
|
||||||
|
# We can't construct an unknown PhaseState (enum), but if a phase
|
||||||
|
# were missing from the whitelist dict, is_tool_allowed should
|
||||||
|
# return False (defensive).
|
||||||
|
policy = PhasePolicy(
|
||||||
|
whitelist={
|
||||||
|
PhaseState.PLANNING: frozenset({"search"}),
|
||||||
|
PhaseState.BUILDING: frozenset({"write_file"}),
|
||||||
|
PhaseState.VERIFICATION: frozenset({"bash"}),
|
||||||
|
PhaseState.DELIVERY: frozenset({WILDCARD}),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
# BUILDING is in whitelist, so allowed checks work normally.
|
||||||
|
assert policy.is_tool_allowed("write_file", PhaseState.BUILDING) is True
|
||||||
|
# Phase missing from whitelist would return False (defensive .get default).
|
||||||
|
# We test this by constructing a minimal policy.
|
||||||
|
minimal = PhasePolicy(
|
||||||
|
whitelist={
|
||||||
|
PhaseState.PLANNING: frozenset({WILDCARD}),
|
||||||
|
PhaseState.BUILDING: frozenset({WILDCARD}),
|
||||||
|
PhaseState.VERIFICATION: frozenset({WILDCARD}),
|
||||||
|
PhaseState.DELIVERY: frozenset({WILDCARD}),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
# VERIFICATION is in whitelist — wildcard allows all.
|
||||||
|
assert minimal.is_tool_allowed("anything", PhaseState.VERIFICATION) is True
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# PhasePolicy — edge cases & errors
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestPhasePolicyEdgeCases:
|
||||||
|
def test_empty_whitelist_raises(self):
|
||||||
|
# Fail-fast: an empty whitelist for a non-wildcard phase is a bug.
|
||||||
|
with pytest.raises(ValueError, match="empty whitelist"):
|
||||||
|
PhasePolicy(
|
||||||
|
whitelist={
|
||||||
|
PhaseState.PLANNING: frozenset(), # empty!
|
||||||
|
PhaseState.BUILDING: frozenset({WILDCARD}),
|
||||||
|
PhaseState.VERIFICATION: frozenset({WILDCARD}),
|
||||||
|
PhaseState.DELIVERY: frozenset({WILDCARD}),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_wildcard_only_does_not_raise(self):
|
||||||
|
# Wildcard-only whitelist is valid (means "all tools").
|
||||||
|
policy = PhasePolicy(
|
||||||
|
whitelist={
|
||||||
|
PhaseState.PLANNING: frozenset({WILDCARD}),
|
||||||
|
PhaseState.BUILDING: frozenset({WILDCARD}),
|
||||||
|
PhaseState.VERIFICATION: frozenset({WILDCARD}),
|
||||||
|
PhaseState.DELIVERY: frozenset({WILDCARD}),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert policy.is_tool_allowed("anything", PhaseState.PLANNING) is True
|
||||||
|
|
||||||
|
def test_to_dict_serializable(self):
|
||||||
|
policy = default_policy()
|
||||||
|
d = policy.to_dict()
|
||||||
|
assert "whitelist" in d
|
||||||
|
assert "planning" in d["whitelist"]
|
||||||
|
assert "delivery" in d["whitelist"]
|
||||||
|
assert d["start_phase"] == "planning"
|
||||||
|
assert d["auto_advance_after_steps"] is None
|
||||||
|
|
||||||
|
def test_custom_bash_filter(self):
|
||||||
|
custom_filter = re.compile(r"\b(pip install|npm install)\b")
|
||||||
|
policy = PhasePolicy(
|
||||||
|
whitelist={
|
||||||
|
PhaseState.PLANNING: frozenset({"bash"}),
|
||||||
|
PhaseState.BUILDING: frozenset({"bash"}),
|
||||||
|
PhaseState.VERIFICATION: frozenset({"bash"}),
|
||||||
|
PhaseState.DELIVERY: frozenset({WILDCARD}),
|
||||||
|
},
|
||||||
|
bash_command_filter={PhaseState.BUILDING: custom_filter},
|
||||||
|
)
|
||||||
|
assert policy.is_bash_command_allowed("npm install foo", PhaseState.BUILDING) is False
|
||||||
|
assert policy.is_bash_command_allowed("npm run build", PhaseState.BUILDING) is True
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# policy_from_config — R26 (config-driven)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestPolicyFromConfig:
|
||||||
|
def test_empty_config_returns_none(self):
|
||||||
|
assert policy_from_config({}) is None
|
||||||
|
|
||||||
|
def test_enabled_false_returns_none(self):
|
||||||
|
# Opt-out — explicit `enabled: false` disables policy.
|
||||||
|
result = policy_from_config({"enabled": False})
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
def test_enabled_default_true_when_section_present(self):
|
||||||
|
# When section is present but `enabled` is missing, default is True.
|
||||||
|
result = policy_from_config({"auto_advance_after_steps": 3})
|
||||||
|
assert result is not None
|
||||||
|
assert result.auto_advance_after_steps == 3
|
||||||
|
|
||||||
|
def test_auto_advance_after_steps(self):
|
||||||
|
policy = policy_from_config({"enabled": True, "auto_advance_after_steps": 5})
|
||||||
|
assert policy is not None
|
||||||
|
assert policy.auto_advance_after_steps == 5
|
||||||
|
|
||||||
|
def test_start_phase_custom(self):
|
||||||
|
policy = policy_from_config({"enabled": True, "start_phase": "building"})
|
||||||
|
assert policy is not None
|
||||||
|
assert policy.start_phase == PhaseState.BUILDING
|
||||||
|
|
||||||
|
def test_start_phase_invalid_raises(self):
|
||||||
|
with pytest.raises(ValueError, match="Invalid phase name"):
|
||||||
|
policy_from_config({"enabled": True, "start_phase": "unknown"})
|
||||||
|
|
||||||
|
def test_whitelist_override_merges_with_default(self):
|
||||||
|
policy = policy_from_config(
|
||||||
|
{
|
||||||
|
"enabled": True,
|
||||||
|
"whitelist_override": {
|
||||||
|
"planning": ["search", "read_file"], # removes bash from default
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert policy is not None
|
||||||
|
# Override wins — bash should be removed from planning.
|
||||||
|
assert policy.is_tool_allowed("search", PhaseState.PLANNING) is True
|
||||||
|
assert policy.is_tool_allowed("read_file", PhaseState.PLANNING) is True
|
||||||
|
assert policy.is_tool_allowed("bash", PhaseState.PLANNING) is False
|
||||||
|
# Other phases unchanged.
|
||||||
|
assert policy.is_tool_allowed("write_file", PhaseState.BUILDING) is True
|
||||||
|
|
||||||
|
def test_whitelist_override_invalid_phase_raises(self):
|
||||||
|
with pytest.raises(ValueError, match="Invalid phase name"):
|
||||||
|
policy_from_config(
|
||||||
|
{
|
||||||
|
"enabled": True,
|
||||||
|
"whitelist_override": {"unknown_phase": ["tool"]},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_whitelist_override_non_list_raises(self):
|
||||||
|
with pytest.raises(ValueError, match="must be a list"):
|
||||||
|
policy_from_config(
|
||||||
|
{
|
||||||
|
"enabled": True,
|
||||||
|
"whitelist_override": {"planning": "not a list"},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_to_dict_round_trip_via_default(self):
|
||||||
|
# Sanity: default policy serializes to a dict with expected keys.
|
||||||
|
policy = default_policy()
|
||||||
|
d = policy.to_dict()
|
||||||
|
assert set(d["whitelist"].keys()) == {
|
||||||
|
"planning",
|
||||||
|
"building",
|
||||||
|
"verification",
|
||||||
|
"delivery",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# ServerConfig.plan_exec integration (R26)
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class TestServerConfigPlanExec:
|
||||||
|
def test_default_plan_exec_empty(self):
|
||||||
|
config = ServerConfig.from_dict({})
|
||||||
|
assert config.plan_exec == {}
|
||||||
|
|
||||||
|
def test_plan_exec_loaded_from_dict(self):
|
||||||
|
config = ServerConfig.from_dict(
|
||||||
|
{
|
||||||
|
"plan_exec": {
|
||||||
|
"enabled": True,
|
||||||
|
"auto_advance_after_steps": 5,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
assert config.plan_exec == {"enabled": True, "auto_advance_after_steps": 5}
|
||||||
|
|
||||||
|
def test_plan_exec_empty_dict_default(self):
|
||||||
|
config = ServerConfig.from_dict({"plan_exec": {}})
|
||||||
|
assert config.plan_exec == {}
|
||||||
|
|
||||||
|
def test_plan_exec_resolved_to_policy(self):
|
||||||
|
# Wire the config dict through policy_from_config to verify integration.
|
||||||
|
config = ServerConfig.from_dict(
|
||||||
|
{
|
||||||
|
"plan_exec": {
|
||||||
|
"enabled": True,
|
||||||
|
"auto_advance_after_steps": 3,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)
|
||||||
|
policy = policy_from_config(config.plan_exec)
|
||||||
|
assert policy is not None
|
||||||
|
assert policy.auto_advance_after_steps == 3
|
||||||
|
|
||||||
|
def test_plan_exec_disabled_via_config(self):
|
||||||
|
config = ServerConfig.from_dict({"plan_exec": {"enabled": False}})
|
||||||
|
policy = policy_from_config(config.plan_exec)
|
||||||
|
assert policy is None
|
||||||
Loading…
Reference in New Issue