# Source: fischer-agentkit/tests/unit/test_phase_policy.py (Python; listed as 506 lines, 21 KiB)

"""Unit tests for PhasePolicy + PhaseState (G6 core, R24/R25/R26).
Covers:
- PhaseState enum (next_of, from_string)
- default_policy() KTD5 whitelist
- PhasePolicy.is_tool_allowed / is_bash_command_allowed
- policy_from_config parsing (R26 config-driven)
- ServerConfig.plan_exec integration
- Wave 4 U1: bash_command_filter accepts Callable (ShellTool._is_dangerous reuse)
"""
from __future__ import annotations
import re
import pytest
from agentkit.core.phase import (
WILDCARD,
PhasePolicy,
PhaseState,
default_policy,
policy_from_config,
)
from agentkit.server.config import ServerConfig
from agentkit.tools.shell import ShellTool
# ---------------------------------------------------------------------------
# PhaseState enum
# ---------------------------------------------------------------------------
class TestPhaseState:
    """Checks the PhaseState enum: raw values, ordering, and string parsing."""

    def test_values(self):
        """Every member exposes its lowercase name as ``.value``."""
        expected_values = (
            (PhaseState.PLANNING, "planning"),
            (PhaseState.BUILDING, "building"),
            (PhaseState.VERIFICATION, "verification"),
            (PhaseState.DELIVERY, "delivery"),
        )
        for member, raw in expected_values:
            assert member.value == raw, member

    def test_next_of(self):
        """``next_of`` walks the phases in order and stops after DELIVERY."""
        transitions = (
            (PhaseState.PLANNING, PhaseState.BUILDING),
            (PhaseState.BUILDING, PhaseState.VERIFICATION),
            (PhaseState.VERIFICATION, PhaseState.DELIVERY),
        )
        for current, following in transitions:
            assert PhaseState.next_of(current) == following, current
        # The final phase has no successor.
        assert PhaseState.next_of(PhaseState.DELIVERY) is None

    def test_from_string_case_insensitive(self):
        """Lower-, upper- and mixed-case spellings all resolve to a member."""
        spellings = (
            ("planning", PhaseState.PLANNING),
            ("PLANNING", PhaseState.PLANNING),
            ("Building", PhaseState.BUILDING),
        )
        for text, member in spellings:
            assert PhaseState.from_string(text) == member, text

    def test_from_string_invalid_raises(self):
        """Unrecognised names raise ValueError with a message listing valid names."""
        rejected = (
            ("unknown", "Invalid phase name"),
            ("exploration", "Valid:"),
        )
        for bad_name, message_fragment in rejected:
            with pytest.raises(ValueError, match=message_fragment):
                PhaseState.from_string(bad_name)
# ---------------------------------------------------------------------------
# default_policy() — KTD5 whitelist
# ---------------------------------------------------------------------------
class TestDefaultPolicy:
    """Pins the contents of ``default_policy()``: whitelists, defaults, bash filters."""

    def test_has_all_four_phases(self):
        """The default whitelist has an entry for each of the four phases."""
        whitelist = default_policy().whitelist
        for phase in (
            PhaseState.PLANNING,
            PhaseState.BUILDING,
            PhaseState.VERIFICATION,
            PhaseState.DELIVERY,
        ):
            assert phase in whitelist, phase

    def test_planning_whitelist_matches_r24(self):
        """PLANNING exposes the read/search tools and withholds write_file."""
        planning_tools = default_policy().whitelist[PhaseState.PLANNING]
        for tool in ("search", "read_file", "shell", "tool_search"):
            assert tool in planning_tools, tool
        # Planning must NOT allow write_file.
        assert "write_file" not in planning_tools

    def test_building_whitelist_includes_write_file(self):
        """BUILDING permits writing, shell access and reading."""
        building_tools = default_policy().whitelist[PhaseState.BUILDING]
        for tool in ("write_file", "shell", "read_file"):
            assert tool in building_tools, tool

    def test_verification_whitelist_excludes_write(self):
        """VERIFICATION keeps shell/read_file but drops write_file."""
        verification_tools = default_policy().whitelist[PhaseState.VERIFICATION]
        for tool in ("shell", "read_file"):
            assert tool in verification_tools, tool
        assert "write_file" not in verification_tools

    def test_delivery_wildcard(self):
        """DELIVERY's whitelist contains the wildcard marker."""
        delivery_tools = default_policy().whitelist[PhaseState.DELIVERY]
        assert WILDCARD in delivery_tools

    def test_start_phase_default_planning(self):
        """A default policy starts in PLANNING."""
        policy = default_policy()
        assert policy.start_phase == PhaseState.PLANNING

    def test_auto_advance_default_none(self):
        """KTD6: phase advancement is manual unless configured otherwise."""
        policy = default_policy()
        assert policy.auto_advance_after_steps is None

    def test_bash_filter_blocks_rm_in_planning(self):
        """PLANNING lets read-only commands through and blocks mutating ones."""
        policy = default_policy()
        cases = (
            ("ls -la", True),
            ("git status", True),
            ("rm -rf /tmp/x", False),
            ("echo x > file.txt", False),
        )
        for command, expected in cases:
            assert policy.is_bash_command_allowed(command, PhaseState.PLANNING) is expected, command

    def test_bash_filter_no_restriction_in_building(self):
        """BUILDING accepts commands that PLANNING would reject."""
        policy = default_policy()
        for command in ("rm -rf build/", "echo x > out.log"):
            assert policy.is_bash_command_allowed(command, PhaseState.BUILDING) is True, command

    # --- Wave 4 U1 characterization (Wave 3 behavior preserved) -------------
    # default_policy() wires ShellTool._is_dangerous (a callable) into
    # PLANNING/VERIFICATION. The tests below pin that contract so a regression
    # in ShellTool._is_dangerous or in PhasePolicy dispatch shows up here.

    def test_bash_filter_callable_in_default_policy(self):
        """The PLANNING filter is the ShellTool._is_dangerous callable itself."""
        planning_filter = default_policy().bash_command_filter[PhaseState.PLANNING]
        # Sanity: a callable, not a compiled regex pattern.
        assert callable(planning_filter)
        assert planning_filter is ShellTool._is_dangerous

    def test_bash_filter_characterization_safe_commands(self):
        """Safe read-only commands stay allowed in PLANNING (Wave 3 behavior)."""
        policy = default_policy()
        safe_commands = ("ls -la", "pwd", "git status", "find . -name foo", "cat README.md")
        for command in safe_commands:
            assert policy.is_bash_command_allowed(command, PhaseState.PLANNING) is True, command

    def test_bash_filter_characterization_dangerous_commands(self):
        """Commands the old regex blocked remain blocked in PLANNING."""
        policy = default_policy()
        dangerous_commands = (
            "rm -rf /",
            "rm -rf /tmp/x",
            "mv a b",
            "cp a b",
            "mkdir newdir",
            "chmod 777 file",
            "chown root file",
            "echo x > file.txt",
            "echo x >> file.txt",
        )
        for command in dangerous_commands:
            assert policy.is_bash_command_allowed(command, PhaseState.PLANNING) is False, command

    # --- Wave 4 U1 ceiling closed (edge cases attributed to the old regex) ---

    def test_bash_filter_closes_regex_ceiling_dd_of(self):
        """``dd of=...`` is rejected in PLANNING (reported as missed by the old regex)."""
        policy = default_policy()
        verdict = policy.is_bash_command_allowed("dd of=/dev/sda", PhaseState.PLANNING)
        assert verdict is False

    def test_bash_filter_closes_regex_ceiling_colon_redirect(self):
        """``:>file`` (redirect with no whitespace before ``>``) is rejected."""
        policy = default_policy()
        verdict = policy.is_bash_command_allowed(":>file", PhaseState.PLANNING)
        assert verdict is False

    def test_bash_filter_closes_regex_ceiling_redirection_after_arg(self):
        """An output redirect that follows an argument is classified as dangerous."""
        policy = default_policy()
        verdict = policy.is_bash_command_allowed("echo hello > /tmp/x", PhaseState.PLANNING)
        assert verdict is False

    def test_bash_filter_closes_regex_ceiling_chain_operators(self):
        """Chaining (``;``, ``&&``, ``||``) and command substitution are rejected.

        The callable filter treats every chain operator as dangerous, in line
        with ShellTool's own behavior.
        """
        policy = default_policy()
        chained_commands = (
            "ls; rm -rf /tmp",
            "ls && rm -rf /tmp",
            "ls || rm -rf /tmp",
            "$(rm -rf /tmp)",
            "`rm -rf /tmp`",
        )
        for command in chained_commands:
            assert policy.is_bash_command_allowed(command, PhaseState.PLANNING) is False, command

    def test_bash_filter_closes_regex_ceiling_pipe_with_dangerous_segment(self):
        """A pipeline is allowed only when none of its segments is dangerous."""
        policy = default_policy()
        harmless_pipe = policy.is_bash_command_allowed("echo x | grep y", PhaseState.PLANNING)
        harmful_pipe = policy.is_bash_command_allowed("rm x | cat", PhaseState.PLANNING)
        assert harmless_pipe is True
        assert harmful_pipe is False

    def test_bash_filter_verification_phase_uses_callable(self):
        """VERIFICATION shares the same callable filter as PLANNING.

        Per the original note, ``pytest`` is not among ShellTool's safe command
        prefixes, so the "allowed" checks use known-safe read-only commands;
        widening that safe list is out of scope for Wave 4 U1.
        """
        policy = default_policy()
        assert policy.bash_command_filter[PhaseState.VERIFICATION] is ShellTool._is_dangerous
        cases = (
            ("rm -rf /", False),
            ("ls -la", True),
            ("git status", True),
        )
        for command, expected in cases:
            assert policy.is_bash_command_allowed(command, PhaseState.VERIFICATION) is expected, command

    def test_bash_filter_delivery_phase_no_filter(self):
        """DELIVERY has no bash filter, so any command is allowed."""
        policy = default_policy()
        assert policy.bash_command_filter[PhaseState.DELIVERY] is None
        assert policy.is_bash_command_allowed("rm -rf /", PhaseState.DELIVERY) is True

    def test_bash_filter_empty_command_allowed(self):
        """An empty command is reported as allowed by the policy.

        Per the original note, ShellTool rejects empty commands on its own, so
        the policy does not inject a rejection for them.
        """
        policy = default_policy()
        assert policy.is_bash_command_allowed("", PhaseState.PLANNING) is True
# ---------------------------------------------------------------------------
# PhasePolicy — is_tool_allowed
# ---------------------------------------------------------------------------
class TestIsToolAllowed:
    """Exercises ``PhasePolicy.is_tool_allowed`` on default and hand-built policies."""

    def test_planning_allows_search(self):
        """The default policy lets PLANNING use ``search``."""
        policy = default_policy()
        assert policy.is_tool_allowed("search", PhaseState.PLANNING) is True

    def test_planning_blocks_write_file(self):
        """The default policy refuses ``write_file`` during PLANNING."""
        policy = default_policy()
        assert policy.is_tool_allowed("write_file", PhaseState.PLANNING) is False

    def test_building_allows_write_file(self):
        """The default policy permits ``write_file`` during BUILDING."""
        policy = default_policy()
        assert policy.is_tool_allowed("write_file", PhaseState.BUILDING) is True

    def test_delivery_wildcard_allows_anything(self):
        """DELIVERY's wildcard admits arbitrary tool names."""
        policy = default_policy()
        assert policy.is_tool_allowed("any_random_tool", PhaseState.DELIVERY) is True
        assert policy.is_tool_allowed("write_file", PhaseState.DELIVERY) is True

    def test_unknown_phase_returns_false(self):
        """Tools outside a phase's whitelist are refused; whitelisted ones pass.

        The original version of this test only asserted allowed (``True``)
        outcomes despite its name. The refusal path is now asserted explicitly
        on a strict, hand-built policy.
        """
        # Strict policy: every non-delivery phase admits exactly one tool.
        policy = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: frozenset({"search"}),
                PhaseState.BUILDING: frozenset({"write_file"}),
                PhaseState.VERIFICATION: frozenset({"shell"}),
                PhaseState.DELIVERY: frozenset({WILDCARD}),
            }
        )
        # Whitelisted tool in its own phase is allowed.
        assert policy.is_tool_allowed("write_file", PhaseState.BUILDING) is True
        # Tools absent from a phase's (non-wildcard) whitelist are refused.
        assert policy.is_tool_allowed("write_file", PhaseState.PLANNING) is False
        assert policy.is_tool_allowed("search", PhaseState.BUILDING) is False
        assert policy.is_tool_allowed("write_file", PhaseState.VERIFICATION) is False
        # NOTE(review): a phase missing from the whitelist dict is still not
        # exercised here. An unknown PhaseState cannot be constructed (enum),
        # and it is not visible from this file whether PhasePolicy accepts a
        # partial whitelist — confirm, then add that case.

        # Wildcard-only policy: every phase admits any tool.
        minimal = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: frozenset({WILDCARD}),
                PhaseState.BUILDING: frozenset({WILDCARD}),
                PhaseState.VERIFICATION: frozenset({WILDCARD}),
                PhaseState.DELIVERY: frozenset({WILDCARD}),
            }
        )
        assert minimal.is_tool_allowed("anything", PhaseState.VERIFICATION) is True
# ---------------------------------------------------------------------------
# PhasePolicy — edge cases & errors
# ---------------------------------------------------------------------------
class TestPhasePolicyEdgeCases:
    """Constructor validation, serialization, and custom bash filters of PhasePolicy."""

    def test_empty_whitelist_raises(self):
        """An empty whitelist for a phase is rejected at construction time."""
        broken_whitelist = {
            PhaseState.PLANNING: frozenset(),  # empty on purpose
            PhaseState.BUILDING: frozenset({WILDCARD}),
            PhaseState.VERIFICATION: frozenset({WILDCARD}),
            PhaseState.DELIVERY: frozenset({WILDCARD}),
        }
        # Fail-fast: constructing the policy must raise.
        with pytest.raises(ValueError, match="empty whitelist"):
            PhasePolicy(whitelist=broken_whitelist)

    def test_wildcard_only_does_not_raise(self):
        """A whitelist holding only the wildcard is valid and means "all tools"."""
        all_phases = (
            PhaseState.PLANNING,
            PhaseState.BUILDING,
            PhaseState.VERIFICATION,
            PhaseState.DELIVERY,
        )
        open_whitelist = {phase: frozenset({WILDCARD}) for phase in all_phases}
        policy = PhasePolicy(whitelist=open_whitelist)
        assert policy.is_tool_allowed("anything", PhaseState.PLANNING) is True

    def test_to_dict_serializable(self):
        """``to_dict`` exposes the whitelist, start phase and auto-advance setting."""
        serialized = default_policy().to_dict()
        assert "whitelist" in serialized
        for phase_name in ("planning", "delivery"):
            assert phase_name in serialized["whitelist"], phase_name
        assert serialized["start_phase"] == "planning"
        assert serialized["auto_advance_after_steps"] is None

    def test_to_dict_serializes_callable_as_marker(self):
        """Callable filters serialize as ``"<callable>"``; unfiltered phases as None.

        Wave 4 U1: default_policy wires a callable, and the marker keeps
        logs/telemetry readable.
        """
        filters = default_policy().to_dict()["bash_command_filter"]
        for phase_name in ("planning", "verification"):
            assert filters[phase_name] == "<callable>", phase_name
        for phase_name in ("building", "delivery"):
            assert filters[phase_name] is None, phase_name

    def test_custom_bash_filter(self):
        """A compiled regex filter blocks matching commands in its phase only."""
        install_pattern = re.compile(r"\b(pip install|npm install)\b")
        shell_only = frozenset({"shell"})
        policy = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: shell_only,
                PhaseState.BUILDING: shell_only,
                PhaseState.VERIFICATION: shell_only,
                PhaseState.DELIVERY: frozenset({WILDCARD}),
            },
            bash_command_filter={PhaseState.BUILDING: install_pattern},
        )
        blocked = policy.is_bash_command_allowed("npm install foo", PhaseState.BUILDING)
        permitted = policy.is_bash_command_allowed("npm run build", PhaseState.BUILDING)
        assert blocked is False
        assert permitted is True

    def test_custom_bash_filter_accepts_callable(self):
        """Wave 4 U1: a callable filter returning True marks a command dangerous."""

        def flag_everything(_command: str) -> bool:
            # Every command is reported as dangerous.
            return True

        policy = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: frozenset({"shell"}),
                PhaseState.BUILDING: frozenset({WILDCARD}),
                PhaseState.VERIFICATION: frozenset({WILDCARD}),
                PhaseState.DELIVERY: frozenset({WILDCARD}),
            },
            bash_command_filter={PhaseState.PLANNING: flag_everything},
        )
        for command in ("ls", "rm -rf /"):
            assert policy.is_bash_command_allowed(command, PhaseState.PLANNING) is False, command

    def test_callable_filter_takes_precedence_over_pattern_form(self):
        """Regex and callable filters coexist, each applied to its own phase.

        Wave 4 U1 routing check: PLANNING is wired with a regex and BUILDING
        with a callable; both must block ``rm x`` and allow ``ls``.
        """
        rm_pattern = re.compile(r"\brm\b")
        policy = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: frozenset({"shell"}),
                PhaseState.BUILDING: frozenset({WILDCARD}),
                PhaseState.VERIFICATION: frozenset({WILDCARD}),
                PhaseState.DELIVERY: frozenset({WILDCARD}),
            },
            bash_command_filter={
                PhaseState.PLANNING: rm_pattern,  # regex form
                PhaseState.BUILDING: ShellTool._is_dangerous,  # callable form
            },
        )
        # Same expectations for the regex phase, then the callable phase.
        for phase in (PhaseState.PLANNING, PhaseState.BUILDING):
            assert policy.is_bash_command_allowed("rm x", phase) is False, phase
            assert policy.is_bash_command_allowed("ls", phase) is True, phase
# ---------------------------------------------------------------------------
# policy_from_config — R26 (config-driven)
# ---------------------------------------------------------------------------
class TestPolicyFromConfig:
    """R26: building a PhasePolicy from a config mapping via ``policy_from_config``."""

    def test_empty_config_returns_none(self):
        """An empty mapping yields no policy."""
        built = policy_from_config({})
        assert built is None

    def test_enabled_false_returns_none(self):
        """Opt-out: an explicit ``enabled: false`` disables the policy."""
        assert policy_from_config({"enabled": False}) is None

    def test_enabled_default_true_when_section_present(self):
        """A non-empty section without ``enabled`` is treated as enabled."""
        built = policy_from_config({"auto_advance_after_steps": 3})
        assert built is not None
        assert built.auto_advance_after_steps == 3

    def test_auto_advance_after_steps(self):
        """``auto_advance_after_steps`` is carried onto the policy."""
        settings = {"enabled": True, "auto_advance_after_steps": 5}
        built = policy_from_config(settings)
        assert built is not None
        assert built.auto_advance_after_steps == 5

    def test_start_phase_custom(self):
        """``start_phase`` is parsed from its string name."""
        settings = {"enabled": True, "start_phase": "building"}
        built = policy_from_config(settings)
        assert built is not None
        assert built.start_phase == PhaseState.BUILDING

    def test_start_phase_invalid_raises(self):
        """An unrecognised ``start_phase`` raises ValueError."""
        settings = {"enabled": True, "start_phase": "unknown"}
        with pytest.raises(ValueError, match="Invalid phase name"):
            policy_from_config(settings)

    def test_whitelist_override_merges_with_default(self):
        """An override replaces one phase's whitelist and leaves the others alone."""
        settings = {
            "enabled": True,
            "whitelist_override": {
                # Drops "shell", which the default grants to planning.
                "planning": ["search", "read_file"],
            },
        }
        built = policy_from_config(settings)
        assert built is not None
        # The override wins for PLANNING.
        for tool in ("search", "read_file"):
            assert built.is_tool_allowed(tool, PhaseState.PLANNING) is True, tool
        assert built.is_tool_allowed("shell", PhaseState.PLANNING) is False
        # Phases without an override keep the default whitelist.
        assert built.is_tool_allowed("write_file", PhaseState.BUILDING) is True

    def test_whitelist_override_invalid_phase_raises(self):
        """An override keyed by an unknown phase name raises ValueError."""
        settings = {
            "enabled": True,
            "whitelist_override": {"unknown_phase": ["tool"]},
        }
        with pytest.raises(ValueError, match="Invalid phase name"):
            policy_from_config(settings)

    def test_whitelist_override_non_list_raises(self):
        """An override value that is not a list raises ValueError."""
        settings = {
            "enabled": True,
            "whitelist_override": {"planning": "not a list"},
        }
        with pytest.raises(ValueError, match="must be a list"):
            policy_from_config(settings)

    def test_to_dict_round_trip_via_default(self):
        """The default policy serializes a whitelist keyed by the four phase names."""
        serialized = default_policy().to_dict()
        phase_names = set(serialized["whitelist"].keys())
        assert phase_names == {"planning", "building", "verification", "delivery"}
# ---------------------------------------------------------------------------
# ServerConfig.plan_exec integration (R26)
# ---------------------------------------------------------------------------
class TestServerConfigPlanExec:
    """R26: the ``plan_exec`` section of ServerConfig and its hand-off to the policy."""

    def test_default_plan_exec_empty(self):
        """Without a ``plan_exec`` section the attribute is an empty dict."""
        loaded = ServerConfig.from_dict({})
        assert loaded.plan_exec == {}

    def test_plan_exec_loaded_from_dict(self):
        """The ``plan_exec`` section is stored as given."""
        section = {"enabled": True, "auto_advance_after_steps": 5}
        loaded = ServerConfig.from_dict({"plan_exec": section})
        assert loaded.plan_exec == {"enabled": True, "auto_advance_after_steps": 5}

    def test_plan_exec_empty_dict_default(self):
        """An explicitly empty ``plan_exec`` section stays an empty dict."""
        loaded = ServerConfig.from_dict({"plan_exec": {}})
        assert loaded.plan_exec == {}

    def test_plan_exec_resolved_to_policy(self):
        """The loaded section feeds straight into ``policy_from_config``."""
        raw_config = {
            "plan_exec": {
                "enabled": True,
                "auto_advance_after_steps": 3,
            }
        }
        loaded = ServerConfig.from_dict(raw_config)
        resolved = policy_from_config(loaded.plan_exec)
        assert resolved is not None
        assert resolved.auto_advance_after_steps == 3

    def test_plan_exec_disabled_via_config(self):
        """``enabled: false`` in the server config resolves to no policy."""
        loaded = ServerConfig.from_dict({"plan_exec": {"enabled": False}})
        resolved = policy_from_config(loaded.plan_exec)
        assert resolved is None