"""Phase state machine for PLAN_EXEC mode (G6, R24/R25).

Four sequential phases enforce per-step tool whitelists:

    PLANNING → BUILDING → VERIFICATION → DELIVERY

KTD3 (Wave 3 plan): state machine lives in ReActEngine, not skill config.
KTD5: default whitelist matches brainstorm R24 (Planning: think/search;
    Building: write_file; etc.).
KTD6: transitions are LLM-driven via AdvancePhaseTool; auto-advance is opt-in.

U3 (R3): ``default_policy()`` accepts an optional ``workspace_root`` and
populates ``PhasePolicy.verification_commands`` via coding-task detection
(``pyproject.toml`` / ``.py`` presence) — coding tasks force pytest/ruff;
non-coding tasks leave it as ``None`` so Spec-declared commands are used.
"""

from __future__ import annotations

import enum
import logging
import re
from collections.abc import Mapping
from dataclasses import dataclass, field, replace
from pathlib import Path
from typing import Any, Callable

from agentkit.tools.shell import ShellTool

logger = logging.getLogger(__name__)


class PhaseState(enum.Enum):
    """Phases of the SOLO state machine (extends ExecutionMode.PLAN_EXEC)."""

    PLANNING = "planning"
    BUILDING = "building"
    VERIFICATION = "verification"
    DELIVERY = "delivery"

    @classmethod
    def next_of(cls, current: "PhaseState") -> "PhaseState | None":
        """Return the phase after `current`, or None if `current` is the last.

        A value that is not one of the four phases also yields None.
        """
        order = [cls.PLANNING, cls.BUILDING, cls.VERIFICATION, cls.DELIVERY]
        try:
            idx = order.index(current)
        except ValueError:
            return None
        following = idx + 1
        return order[following] if following < len(order) else None

    @classmethod
    def from_string(cls, value: str) -> "PhaseState":
        """Parse a phase name (case-insensitive, surrounding whitespace ignored).

        An existing ``PhaseState`` is returned unchanged so callers may pass
        either form.

        Raises:
            ValueError: if `value` is not a known phase name. This includes
                non-string input (e.g. a YAML integer), which would otherwise
                surface as an AttributeError from ``.lower()``.
        """
        if isinstance(value, cls):
            return value
        if not isinstance(value, str):
            valid = ", ".join(p.value for p in cls)
            raise ValueError(f"Invalid phase name {value!r}. Valid: {valid}")
        try:
            return cls(value.strip().lower())
        except ValueError as e:
            valid = ", ".join(p.value for p in cls)
            raise ValueError(f"Invalid phase name {value!r}. Valid: {valid}") from e


# Wildcard token meaning "all tools allowed in this phase".
WILDCARD = "*"


@dataclass(slots=True)
class PhasePolicy:
    """Per-phase tool whitelist + bash command filter for PLAN_EXEC mode.

    The policy is enforced by ReActEngine._execute_loop before each tool
    dispatch. A tool not in the current phase's whitelist is rejected with a
    structured error returned to the LLM (the loop continues — the LLM gets to
    react to the rejection and either switch tools or call AdvancePhaseTool).

    Wildcard ``"*"`` in a phase's whitelist means "all tools allowed" (used by
    DELIVERY by default).

    `bash_command_filter` values accept either:
      - `Callable[[str], bool]`: returns True if the command is dangerous
        (matches `ShellTool._is_dangerous` semantics); allowed = not dangerous.
      - `re.Pattern`: pattern matches dangerous substrings; allowed = no match.
        Kept for backward compat with Wave 3 configs.
      - `None`: no restriction for this phase.
    """

    whitelist: dict[PhaseState, frozenset[str]]
    bash_command_filter: dict[PhaseState, Callable[[str], bool] | re.Pattern | None] = field(
        default_factory=dict
    )
    auto_advance_after_steps: int | None = None  # None = manual (LLM calls advance_phase)
    start_phase: PhaseState = PhaseState.PLANNING
    # U3/R3: verification commands to run at the VERIFICATION phase's final-answer
    # point. Populated by default_policy() via coding-task detection. None = no
    # opinion (ReActEngine falls back to its own verification_commands param or
    # VerificationLoop defaults). An empty list means "no commands" (verification
    # passes trivially — for non-coding tasks using Spec-declared commands instead).
    verification_commands: list[str] | None = None
    # U4/R11: total step budget for the plan (sum of think+verify+reflect).
    # None = use ReActEngine's max_steps. Provides a checkpoint-reconstructable
    # record of the plan's total step budget (KTD-7).
    step_budget: int | None = None

    def __post_init__(self) -> None:
        """Reject any phase that is listed with an empty whitelist.

        Raises:
            ValueError: if a phase in `whitelist` maps to an empty collection.
        """
        # Fail-fast: empty whitelist for a non-wildcard phase = bug.
        for phase, tools in self.whitelist.items():
            if not tools:
                raise ValueError(
                    f"Phase {phase.value!r} has an empty whitelist — set ['*'] for "
                    f"'all tools allowed' or list specific tool names."
                )

    def is_tool_allowed(self, tool_name: str, phase: PhaseState) -> bool:
        """Return True if `tool_name` is allowed in `phase`.

        A phase absent from `whitelist` allows nothing.
        """
        allowed = self.whitelist.get(phase, frozenset())
        return WILDCARD in allowed or tool_name in allowed

    def is_bash_command_allowed(self, command: str, phase: PhaseState) -> bool:
        """Return True if `command` passes the bash filter for `phase`.

        A None filter (or no entry for `phase`) = no restriction. An empty
        command is allowed (ShellTool separately rejects empty commands) —
        short-circuited here so the ShellTool path emits a clearer "empty
        command" error instead of a phase-violation noise injected back to
        the LLM.
        """
        if not command:
            return True
        filter_value = self.bash_command_filter.get(phase)
        if filter_value is None:
            return True
        if callable(filter_value):
            # Callable contract: returns True if dangerous.
            return not filter_value(command)
        # re.Pattern contract: search() returns a Match if dangerous.
        return not filter_value.search(command)

    def to_dict(self) -> dict[str, Any]:
        """Serialize for logging/telemetry. Not round-trippable (regex/callable → str).

        The returned structure shares no mutable state with the policy:
        whitelists become sorted lists and `verification_commands` is copied.
        """
        commands = self.verification_commands
        return {
            "whitelist": {phase.value: sorted(tools) for phase, tools in self.whitelist.items()},
            "bash_command_filter": {
                # NOTE(review): a callable filter serializes to an empty string,
                # which is indistinguishable from an empty regex pattern —
                # confirm this is the intended telemetry value.
                phase.value: ("" if callable(p) else (p.pattern if p else None))
                for phase, p in self.bash_command_filter.items()
            },
            "auto_advance_after_steps": self.auto_advance_after_steps,
            "start_phase": self.start_phase.value,
            # Copy so telemetry consumers cannot mutate the live policy.
            "verification_commands": None if commands is None else list(commands),
            "step_budget": self.step_budget,
        }


def default_policy(workspace_root: str | Path | None = None) -> PhasePolicy:
    """Return the KTD5 default PhasePolicy.

    Whitelist (R24):
      - PLANNING: search, tool_search, read_file, shell (read-only)
      - BUILDING: write_file, shell (full), read_file, search, tool_search
      - VERIFICATION: shell (test commands), read_file, search
      - DELIVERY: all tools (wildcard)

    Bash filter:
      - PLANNING/VERIFICATION: reuse `ShellTool._is_dangerous` (Wave 4 U1).
        Closes the regex ceiling — catches `:>file`, `dd of=/dev/sda`, chain
        operators, and the full danger taxonomy shared with the ShellTool
        confirmation path.
      - BUILDING/DELIVERY: no filter (full bash)

    U3/R3: ``verification_commands`` is populated via coding-task detection on
    ``workspace_root``. Coding workspaces (``pyproject.toml`` or ``.py``
    present) force ``pytest -x -q`` and ``ruff check src/``. Non-coding
    workspaces get ``None`` (no opinion — Spec-declared commands are used).

    Args:
        workspace_root: Directory handed to ``detect_verification_commands``;
            may be None.
    """
    # U3/R3: coding-task detection. Local import avoids a circular dependency
    # (sandbox.py is standalone, but keeping the import local makes the R3
    # concern visually scoped to default_policy).
    from agentkit.core.sandbox import detect_verification_commands

    verification_cmds = detect_verification_commands(workspace_root)
    # detect_verification_commands returns [] for non-coding workspaces.
    # For non-coding workspaces, leave verification_commands as None so the
    # caller knows "no coding-specific commands" and can substitute Spec-declared
    # commands. For coding workspaces, set the forced pytest/ruff list.
    return PhasePolicy(
        whitelist={
            # Tool name is "shell" (ShellTool default); bash_command_filter
            # gates on the same name. Using "bash" here would make the filter
            # dead code and block the LLM from shell access.
            PhaseState.PLANNING: frozenset({"search", "tool_search", "read_file", "shell"}),
            PhaseState.BUILDING: frozenset(
                {"write_file", "shell", "read_file", "search", "tool_search"}
            ),
            PhaseState.VERIFICATION: frozenset({"shell", "read_file", "search"}),
            PhaseState.DELIVERY: frozenset({WILDCARD}),
        },
        bash_command_filter={
            PhaseState.PLANNING: ShellTool._is_dangerous,
            PhaseState.VERIFICATION: ShellTool._is_dangerous,
            PhaseState.BUILDING: None,
            PhaseState.DELIVERY: None,
        },
        auto_advance_after_steps=None,  # manual by default
        start_phase=PhaseState.PLANNING,
        verification_commands=verification_cmds or None,
    )


def policy_from_config(
    config: dict[str, Any],
    workspace_root: str | Path | None = None,
) -> PhasePolicy | None:
    """Build a PhasePolicy from the `plan_exec` config section.

    Returns None if `config` is empty or `enabled` is False (opt-out).

    Config shape:
        plan_exec:
          enabled: true                  # default true if section present
          auto_advance_after_steps: 5    # optional
          start_phase: planning          # optional, default planning
          whitelist_override:            # optional, merges with default
            planning: [search, read_file]
            building: [write_file, shell]

    Each phase named in ``whitelist_override`` has its tool list replaced
    wholesale; phases not named keep the default whitelist. Use the tool name
    ``shell`` (not ``bash``) — that is the name the default whitelist uses.

    Args:
        config: The `plan_exec` section as a mapping.
        workspace_root: Optional directory forwarded to ``default_policy`` for
            coding-task detection. Defaults to None, matching the previous
            behaviour of calling ``default_policy()`` with no argument.

    Raises:
        ValueError: on an unknown phase name, a ``whitelist_override`` that is
            not a mapping, a per-phase value that is not a list, or an empty
            per-phase list (rejected by ``PhasePolicy``).
    """
    if not config:
        return None
    if config.get("enabled", True) is False:
        return None

    policy = default_policy(workspace_root)
    # Collect every override first so the policy is rebuilt (and re-validated
    # by __post_init__) exactly once.
    changes: dict[str, Any] = {}

    # Start phase
    start_phase_str = config.get("start_phase")
    if start_phase_str:
        changes["start_phase"] = PhaseState.from_string(start_phase_str)

    # Auto-advance override
    if "auto_advance_after_steps" in config:
        changes["auto_advance_after_steps"] = config["auto_advance_after_steps"]

    # Whitelist override — merge with default (override wins on conflict)
    override = config.get("whitelist_override") or {}
    if override:
        if not isinstance(override, Mapping):
            raise ValueError(
                f"whitelist_override must be a mapping, got {type(override).__name__}"
            )
        merged = dict(policy.whitelist)
        for phase_name, tools in override.items():
            phase = PhaseState.from_string(phase_name)
            if not isinstance(tools, list):
                raise ValueError(
                    f"whitelist_override[{phase_name!r}] must be a list, got {type(tools).__name__}"
                )
            merged[phase] = frozenset(str(t) for t in tools)
        changes["whitelist"] = merged

    return replace(policy, **changes) if changes else policy