fischer-agentkit/src/agentkit/core/phase.py

207 lines
7.9 KiB
Python

"""Phase state machine for PLAN_EXEC mode (G6, R24/R25).
Four sequential phases enforce per-step tool whitelists:
PLANNING → BUILDING → VERIFICATION → DELIVERY
KTD3 (Wave 3 plan): state machine lives in ReActEngine, not skill config.
KTD5: default whitelist matches brainstorm R24 (Planning: think/search;
Building: write_file; etc.).
KTD6: transitions are LLM-driven via AdvancePhaseTool; auto-advance is opt-in.
"""
from __future__ import annotations
import enum
import logging
import re
from dataclasses import dataclass, field, replace
from typing import Any
logger = logging.getLogger(__name__)
class PhaseState(enum.Enum):
"""Phases of the SOLO state machine (extends ExecutionMode.PLAN_EXEC)."""
PLANNING = "planning"
BUILDING = "building"
VERIFICATION = "verification"
DELIVERY = "delivery"
@classmethod
def next_of(cls, current: "PhaseState") -> "PhaseState | None":
"""Return the phase after `current`, or None if `current` is the last."""
order = [cls.PLANNING, cls.BUILDING, cls.VERIFICATION, cls.DELIVERY]
try:
idx = order.index(current)
except ValueError:
return None
if idx + 1 >= len(order):
return None
return order[idx + 1]
@classmethod
def from_string(cls, value: str) -> "PhaseState":
"""Parse from string (case-insensitive). Raises ValueError on unknown."""
try:
return cls(value.lower())
except ValueError as e:
valid = ", ".join(p.value for p in cls)
raise ValueError(f"Invalid phase name {value!r}. Valid: {valid}") from e
# Wildcard token meaning "all tools allowed in this phase".
WILDCARD = "*"
# Default bash command filter for PLANNING and VERIFICATION phases — blocks
# commands that mutate the filesystem or execute arbitrary code.
# ponytail: regex is intentionally conservative; misses some shell idioms
# (e.g., `:>file`, `dd of=file`). Ceiling: a real shell parser would catch
# more. Upgrade path = reuse ShellTool._is_dangerous() at enforcement time.
# Note: `\b` is a word boundary — works for word commands (rm/mv) but NOT
# for `>`/`>>` operators (not word chars). Use a non-boundary alternation
# that matches `>` either as a standalone operator or after whitespace.
_DEFAULT_BASH_FILTER = re.compile(r"\b(rm|mv|cp|mkdir|rmdir|chmod|chown)\b|(?<!\S)>|>>")
@dataclass(slots=True)
class PhasePolicy:
"""Per-phase tool whitelist + bash command filter for PLAN_EXEC mode.
The policy is enforced by ReActEngine._execute_loop before each tool
dispatch. A tool not in the current phase's whitelist is rejected with
a structured error returned to the LLM (the loop continues — the LLM
gets to react to the rejection and either switch tools or call
AdvancePhaseTool).
Wildcard ``"*"`` in a phase's whitelist means "all tools allowed"
(used by DELIVERY by default).
"""
whitelist: dict[PhaseState, frozenset[str]]
bash_command_filter: dict[PhaseState, re.Pattern | None] = field(default_factory=dict)
auto_advance_after_steps: int | None = None # None = manual (LLM calls advance_phase)
start_phase: PhaseState = PhaseState.PLANNING
def __post_init__(self) -> None:
# Fail-fast: empty whitelist for a non-wildcard phase = bug.
for phase, tools in self.whitelist.items():
if not tools:
raise ValueError(
f"Phase {phase.value!r} has an empty whitelist — set ['*'] for "
f"'all tools allowed' or list specific tool names."
)
def is_tool_allowed(self, tool_name: str, phase: PhaseState) -> bool:
"""Return True if `tool_name` is allowed in `phase`."""
allowed = self.whitelist.get(phase, frozenset())
if WILDCARD in allowed:
return True
return tool_name in allowed
def is_bash_command_allowed(self, command: str, phase: PhaseState) -> bool:
"""Return True if `command` passes the bash filter for `phase`.
A None filter = no restriction. An empty command is allowed (ShellTool
separately rejects empty commands).
"""
pattern = self.bash_command_filter.get(phase)
if pattern is None:
return True
return not pattern.search(command)
def to_dict(self) -> dict[str, Any]:
"""Serialize for logging/telemetry. Not round-trippable (regex → str)."""
return {
"whitelist": {phase.value: sorted(tools) for phase, tools in self.whitelist.items()},
"bash_command_filter": {
phase.value: (p.pattern if p else None)
for phase, p in self.bash_command_filter.items()
},
"auto_advance_after_steps": self.auto_advance_after_steps,
"start_phase": self.start_phase.value,
}
def default_policy() -> PhasePolicy:
"""Return the KTD5 default PhasePolicy.
Whitelist (R24):
- PLANNING: search, tool_search, read_file, shell (read-only)
- BUILDING: write_file, shell (full), read_file, search
- VERIFICATION: shell (test commands), read_file, search
- DELIVERY: all tools (wildcard)
Bash filter:
- PLANNING/VERIFICATION: blocks filesystem-mutating commands
(rm/mv/cp/mkdir/chmod/chown/>/>>)
- BUILDING/DELIVERY: no filter (full bash)
"""
return PhasePolicy(
whitelist={
# Tool name is "shell" (ShellTool default); bash_command_filter
# gates on the same name. Using "bash" here would make the filter
# dead code and block the LLM from shell access.
PhaseState.PLANNING: frozenset({"search", "tool_search", "read_file", "shell"}),
PhaseState.BUILDING: frozenset(
{"write_file", "shell", "read_file", "search", "tool_search"}
),
PhaseState.VERIFICATION: frozenset({"shell", "read_file", "search"}),
PhaseState.DELIVERY: frozenset({WILDCARD}),
},
bash_command_filter={
PhaseState.PLANNING: _DEFAULT_BASH_FILTER,
PhaseState.VERIFICATION: _DEFAULT_BASH_FILTER,
PhaseState.BUILDING: None,
PhaseState.DELIVERY: None,
},
auto_advance_after_steps=None, # manual by default
start_phase=PhaseState.PLANNING,
)
def policy_from_config(config: dict[str, Any]) -> PhasePolicy | None:
"""Build a PhasePolicy from the `plan_exec` config section.
Returns None if `config` is empty or `enabled` is False (opt-out).
Config shape:
plan_exec:
enabled: true # default true if section present
auto_advance_after_steps: 5 # optional
start_phase: planning # optional, default planning
whitelist_override: # optional, merges with default
planning: [search, read_file]
building: [write_file, bash]
"""
if not config:
return None
if config.get("enabled", True) is False:
return None
policy = default_policy()
# Start phase
start_phase_str = config.get("start_phase")
if start_phase_str:
policy = replace(policy, start_phase=PhaseState.from_string(start_phase_str))
# Auto-advance override
if "auto_advance_after_steps" in config:
policy = replace(policy, auto_advance_after_steps=config["auto_advance_after_steps"])
# Whitelist override — merge with default (override wins on conflict)
override = config.get("whitelist_override") or {}
if override:
new_whitelist = dict(policy.whitelist)
for phase_name, tools in override.items():
phase = PhaseState.from_string(phase_name)
if not isinstance(tools, list):
raise ValueError(
f"whitelist_override[{phase_name!r}] must be a list, got {type(tools).__name__}"
)
new_whitelist[phase] = frozenset(str(t) for t in tools)
policy = replace(policy, whitelist=new_whitelist)
return policy