"""Unit tests for PhasePolicy + PhaseState (G6 core, R24/R25/R26). Covers: - PhaseState enum (next_of, from_string) - default_policy() KTD5 whitelist - PhasePolicy.is_tool_allowed / is_bash_command_allowed - policy_from_config parsing (R26 config-driven) - ServerConfig.plan_exec integration - Wave 4 U1: bash_command_filter accepts Callable (ShellTool._is_dangerous reuse) """ from __future__ import annotations import re import pytest from agentkit.core.phase import ( WILDCARD, PhasePolicy, PhaseState, default_policy, policy_from_config, ) from agentkit.server.config import ServerConfig from agentkit.tools.shell import ShellTool # --------------------------------------------------------------------------- # PhaseState enum # --------------------------------------------------------------------------- class TestPhaseState: def test_values(self): assert PhaseState.PLANNING.value == "planning" assert PhaseState.BUILDING.value == "building" assert PhaseState.VERIFICATION.value == "verification" assert PhaseState.DELIVERY.value == "delivery" def test_next_of(self): assert PhaseState.next_of(PhaseState.PLANNING) == PhaseState.BUILDING assert PhaseState.next_of(PhaseState.BUILDING) == PhaseState.VERIFICATION assert PhaseState.next_of(PhaseState.VERIFICATION) == PhaseState.DELIVERY assert PhaseState.next_of(PhaseState.DELIVERY) is None def test_from_string_case_insensitive(self): assert PhaseState.from_string("planning") == PhaseState.PLANNING assert PhaseState.from_string("PLANNING") == PhaseState.PLANNING assert PhaseState.from_string("Building") == PhaseState.BUILDING def test_from_string_invalid_raises(self): with pytest.raises(ValueError, match="Invalid phase name"): PhaseState.from_string("unknown") with pytest.raises(ValueError, match="Valid:"): PhaseState.from_string("exploration") # --------------------------------------------------------------------------- # default_policy() — KTD5 whitelist # 
# ---------------------------------------------------------------------------


class TestDefaultPolicy:
    def test_has_all_four_phases(self):
        # Every phase must have an entry in the default whitelist.
        whitelist = default_policy().whitelist
        for phase in (
            PhaseState.PLANNING,
            PhaseState.BUILDING,
            PhaseState.VERIFICATION,
            PhaseState.DELIVERY,
        ):
            assert phase in whitelist

    def test_planning_whitelist_matches_r24(self):
        planning_tools = default_policy().whitelist[PhaseState.PLANNING]
        for tool in ("search", "read_file", "shell", "tool_search"):
            assert tool in planning_tools
        # Writing files is off-limits while planning.
        assert "write_file" not in planning_tools

    def test_building_whitelist_includes_write_file(self):
        building_tools = default_policy().whitelist[PhaseState.BUILDING]
        for tool in ("write_file", "shell", "read_file"):
            assert tool in building_tools

    def test_verification_whitelist_excludes_write(self):
        verification_tools = default_policy().whitelist[PhaseState.VERIFICATION]
        for tool in ("shell", "read_file"):
            assert tool in verification_tools
        assert "write_file" not in verification_tools

    def test_delivery_wildcard(self):
        delivery_tools = default_policy().whitelist[PhaseState.DELIVERY]
        assert WILDCARD in delivery_tools

    def test_start_phase_default_planning(self):
        policy = default_policy()
        assert policy.start_phase == PhaseState.PLANNING

    def test_auto_advance_default_none(self):
        # KTD6: phases advance manually unless configured otherwise.
        policy = default_policy()
        assert policy.auto_advance_after_steps is None

    def test_bash_filter_blocks_rm_in_planning(self):
        policy = default_policy()
        expectations = (
            ("ls -la", True),
            ("git status", True),
            ("rm -rf /tmp/x", False),
            ("echo x > file.txt", False),
        )
        for command, verdict in expectations:
            allowed = policy.is_bash_command_allowed(command, PhaseState.PLANNING)
            assert allowed is verdict, command

    def test_bash_filter_no_restriction_in_building(self):
        policy = default_policy()
        for command in ("rm -rf build/", "echo x > out.log"):
            allowed = policy.is_bash_command_allowed(command, PhaseState.BUILDING)
            assert allowed is True, command

    # --- Wave 4 U1 characterization (Wave 3 behavior preserved) -----------------
    # default_policy() wires ShellTool._is_dangerous (a Callable) into the
    # PLANNING and VERIFICATION phases. The tests below pin that contract so a
    # regression in ShellTool._is_dangerous or in PhasePolicy dispatch shows
    # up here.

    def test_bash_filter_callable_in_default_policy(self):
        # The default PLANNING filter is the ShellTool callable itself, not a
        # compiled regex.
        wired = default_policy().bash_command_filter[PhaseState.PLANNING]
        assert callable(wired)
        assert wired is ShellTool._is_dangerous

    def test_bash_filter_characterization_safe_commands(self):
        # Wave 3 behavior preserved: read-only commands stay allowed.
        policy = default_policy()
        read_only = ("ls -la", "pwd", "git status", "find . -name foo", "cat README.md")
        for command in read_only:
            allowed = policy.is_bash_command_allowed(command, PhaseState.PLANNING)
            assert allowed is True, command

    def test_bash_filter_characterization_dangerous_commands(self):
        # Wave 3 behavior preserved: everything the old regex blocked is
        # still blocked.
        policy = default_policy()
        mutating = (
            "rm -rf /",
            "rm -rf /tmp/x",
            "mv a b",
            "cp a b",
            "mkdir newdir",
            "chmod 777 file",
            "chown root file",
            "echo x > file.txt",
            "echo x >> file.txt",
        )
        for command in mutating:
            allowed = policy.is_bash_command_allowed(command, PhaseState.PLANNING)
            assert allowed is False, command

    # --- Wave 4 U1 ceiling closed (new edge cases the old regex missed) ---------

    def test_bash_filter_closes_regex_ceiling_dd_of(self):
        # Per the original note, the old regex let `dd of=/dev/sda` through
        # (no word-boundary match for "dd").
        verdict = default_policy().is_bash_command_allowed(
            "dd of=/dev/sda", PhaseState.PLANNING
        )
        assert verdict is False

    def test_bash_filter_closes_regex_ceiling_colon_redirect(self):
        # Per the original note, the old regex let `:>file` through because
        # there is no whitespace before `>`.
        verdict = default_policy().is_bash_command_allowed(
            ":>file", PhaseState.PLANNING
        )
        assert verdict is False

    def test_bash_filter_closes_regex_ceiling_redirection_after_arg(self):
        # The old regex looked for `>` at the start of the command or after
        # whitespace; `echo hello > /tmp/x` reportedly slipped through because
        # the wrong alternative matched. The callable filter must flag it.
        # NOTE(review): the original comment quotes the old pattern as `(?`,
        # which is truncated — restore the full pattern if it is known.
        verdict = default_policy().is_bash_command_allowed(
            "echo hello > /tmp/x", PhaseState.PLANNING
        )
        assert verdict is False

    def test_bash_filter_closes_regex_ceiling_chain_operators(self):
        # The old regex did not treat `;`, `&&`, `||` as dangerous. The
        # callable filter rejects chain operators and command substitution
        # (matching ShellTool behavior).
        policy = default_policy()
        chained = (
            "ls; rm -rf /tmp",
            "ls && rm -rf /tmp",
            "ls || rm -rf /tmp",
            "$(rm -rf /tmp)",
            "`rm -rf /tmp`",
        )
        for command in chained:
            allowed = policy.is_bash_command_allowed(command, PhaseState.PLANNING)
            assert allowed is False, command

    def test_bash_filter_closes_regex_ceiling_pipe_with_dangerous_segment(self):
        # The old regex scanned the whole command string: `echo x | grep y`
        # was allowed (no dangerous token) and `rm x | cat` was blocked
        # (matched `\brm\b`). The callable filter is described as splitting on
        # pipes and checking each segment; the outcome for these two commands
        # is the same.
        # NOTE(review): as described, old and new behavior agree on both
        # inputs, so it is unclear which gap this test closes.
        policy = default_policy()
        harmless = policy.is_bash_command_allowed("echo x | grep y", PhaseState.PLANNING)
        harmful = policy.is_bash_command_allowed("rm x | cat", PhaseState.PLANNING)
        assert harmless is True
        assert harmful is False

    def test_bash_filter_verification_phase_uses_callable(self):
        # VERIFICATION shares the same callable as PLANNING.
        # `pytest` is NOT in ShellTool._SAFE_COMMAND_PREFIXES, so
        # _is_dangerous returns True for it; the verification phase does not
        # widen the ShellTool whitelist. The "allowed" assertions therefore
        # use known-safe read-only commands. (Wave 4 U1 reuses
        # ShellTool._is_dangerous as-is; growing its safe list is out of scope.)
        policy = default_policy()
        phase = PhaseState.VERIFICATION
        assert policy.bash_command_filter[phase] is ShellTool._is_dangerous
        expectations = (
            ("rm -rf /", False),
            ("ls -la", True),
            ("git status", True),
        )
        for command, verdict in expectations:
            assert policy.is_bash_command_allowed(command, phase) is verdict, command

    def test_bash_filter_delivery_phase_no_filter(self):
        # DELIVERY carries no filter, so any bash command passes.
        policy = default_policy()
        assert policy.bash_command_filter[PhaseState.DELIVERY] is None
        verdict = policy.is_bash_command_allowed("rm -rf /", PhaseState.DELIVERY)
        assert verdict is True

    def test_bash_filter_empty_command_allowed(self):
        # The policy must not run the filter on empty input: ShellTool rejects
        # empty commands on its own, so the policy reports "allowed" and no
        # rejection is injected to the LLM.
        verdict = default_policy().is_bash_command_allowed("", PhaseState.PLANNING)
        assert verdict is True


# ---------------------------------------------------------------------------
# PhasePolicy — is_tool_allowed
# ---------------------------------------------------------------------------


class TestIsToolAllowed:
    def test_planning_allows_search(self):
        verdict = default_policy().is_tool_allowed("search", PhaseState.PLANNING)
        assert verdict is True

    def test_planning_blocks_write_file(self):
        verdict = default_policy().is_tool_allowed("write_file", PhaseState.PLANNING)
        assert verdict is False

    def test_building_allows_write_file(self):
        verdict = default_policy().is_tool_allowed("write_file", PhaseState.BUILDING)
        assert verdict is True

    def test_delivery_wildcard_allows_anything(self):
        policy = default_policy()
        for tool in ("any_random_tool", "write_file"):
            assert policy.is_tool_allowed(tool, PhaseState.DELIVERY) is True, tool

    def test_unknown_phase_returns_false(self):
        # Intent (from the original notes): a phase absent from the whitelist
        # dict should make is_tool_allowed return False. An unknown PhaseState
        # cannot be constructed because it is an enum.
        # NOTE(review): both policies below list all four phases, so the
        # "missing phase -> False" path named by this test is never exercised.
        explicit = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: frozenset({"search"}),
                PhaseState.BUILDING: frozenset({"write_file"}),
                PhaseState.VERIFICATION: frozenset({"shell"}),
                PhaseState.DELIVERY: frozenset({WILDCARD}),
            }
        )
        # BUILDING is listed, so the normal lookup applies.
        assert explicit.is_tool_allowed("write_file", PhaseState.BUILDING) is True

        allow_all = frozenset({WILDCARD})
        permissive = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: allow_all,
                PhaseState.BUILDING: allow_all,
                PhaseState.VERIFICATION: allow_all,
                PhaseState.DELIVERY: allow_all,
            }
        )
        # VERIFICATION is listed with the wildcard, so any tool passes.
        assert permissive.is_tool_allowed("anything", PhaseState.VERIFICATION) is True


# ---------------------------------------------------------------------------
# PhasePolicy — edge cases & errors
# ---------------------------------------------------------------------------


class TestPhasePolicyEdgeCases:
    def test_empty_whitelist_raises(self):
        # Fail-fast: a phase with an empty (non-wildcard) whitelist is a bug.
        allow_all = frozenset({WILDCARD})
        broken_whitelist = {
            PhaseState.PLANNING: frozenset(),  # empty!
            PhaseState.BUILDING: allow_all,
            PhaseState.VERIFICATION: allow_all,
            PhaseState.DELIVERY: allow_all,
        }
        with pytest.raises(ValueError, match="empty whitelist"):
            PhasePolicy(whitelist=broken_whitelist)

    def test_wildcard_only_does_not_raise(self):
        # A whitelist holding only the wildcard is valid ("all tools").
        allow_all = frozenset({WILDCARD})
        policy = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: allow_all,
                PhaseState.BUILDING: allow_all,
                PhaseState.VERIFICATION: allow_all,
                PhaseState.DELIVERY: allow_all,
            }
        )
        assert policy.is_tool_allowed("anything", PhaseState.PLANNING) is True

    def test_to_dict_serializable(self):
        payload = default_policy().to_dict()
        assert "whitelist" in payload
        for phase_key in ("planning", "delivery"):
            assert phase_key in payload["whitelist"]
        assert payload["start_phase"] == "planning"
        assert payload["auto_advance_after_steps"] is None

    def test_to_dict_serializes_callable_as_marker(self):
        # Wave 4 U1: default_policy wires a Callable, and to_dict must render
        # it as a string marker so logs/telemetry stay readable.
        # NOTE(review): the expected marker here is the empty string, which
        # looks like an angle-bracketed tag that was stripped from this file —
        # confirm the real value against PhasePolicy.to_dict.
        filters = default_policy().to_dict()["bash_command_filter"]
        assert filters["planning"] == ""
        assert filters["verification"] == ""
        assert filters["building"] is None
        assert filters["delivery"] is None

    def test_custom_bash_filter(self):
        # A compiled regex is accepted as a per-phase filter; commands that
        # match it are rejected.
        installer_pattern = re.compile(r"\b(pip install|npm install)\b")
        shell_only = frozenset({"shell"})
        policy = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: shell_only,
                PhaseState.BUILDING: shell_only,
                PhaseState.VERIFICATION: shell_only,
                PhaseState.DELIVERY: frozenset({WILDCARD}),
            },
            bash_command_filter={PhaseState.BUILDING: installer_pattern},
        )
        blocked = policy.is_bash_command_allowed("npm install foo", PhaseState.BUILDING)
        passed = policy.is_bash_command_allowed("npm run build", PhaseState.BUILDING)
        assert blocked is False
        assert passed is True

    def test_custom_bash_filter_accepts_callable(self):
        # Wave 4 U1: callable form. The callable returns True for "dangerous".
        def always_dangerous(_command: str) -> bool:
            return True  # flag every command

        allow_all = frozenset({WILDCARD})
        policy = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: frozenset({"shell"}),
                PhaseState.BUILDING: allow_all,
                PhaseState.VERIFICATION: allow_all,
                PhaseState.DELIVERY: allow_all,
            },
            bash_command_filter={PhaseState.PLANNING: always_dangerous},
        )
        for command in ("ls", "rm -rf /"):
            allowed = policy.is_bash_command_allowed(command, PhaseState.PLANNING)
            assert allowed is False, command

    def test_callable_filter_takes_precedence_over_pattern_form(self):
        # Wave 4 U1: regex and callable filters can live in the same policy;
        # each phase is routed through whichever form it was given.
        # NOTE(review): the name says "precedence", but the assertions check
        # per-phase coexistence — consider renaming.
        rm_pattern = re.compile(r"\brm\b")
        allow_all = frozenset({WILDCARD})
        policy = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: frozenset({"shell"}),
                PhaseState.BUILDING: allow_all,
                PhaseState.VERIFICATION: allow_all,
                PhaseState.DELIVERY: allow_all,
            },
            bash_command_filter={
                PhaseState.PLANNING: rm_pattern,  # regex form
                PhaseState.BUILDING: ShellTool._is_dangerous,  # callable form
            },
        )
        # Same expectations for the regex phase first, then the callable phase.
        for phase in (PhaseState.PLANNING, PhaseState.BUILDING):
            assert policy.is_bash_command_allowed("rm x", phase) is False
            assert policy.is_bash_command_allowed("ls", phase) is True


# ---------------------------------------------------------------------------
# policy_from_config — R26 (config-driven)
# ---------------------------------------------------------------------------


class TestPolicyFromConfig:
    def test_empty_config_returns_none(self):
        policy = policy_from_config({})
        assert policy is None

    def test_enabled_false_returns_none(self):
        # Opt-out: an explicit `enabled: false` disables the policy.
        assert policy_from_config({"enabled": False}) is None

    def test_enabled_default_true_when_section_present(self):
        # A non-empty section without `enabled` is treated as enabled.
        policy = policy_from_config({"auto_advance_after_steps": 3})
        assert policy is not None
        assert policy.auto_advance_after_steps == 3

    def test_auto_advance_after_steps(self):
        section = {"enabled": True, "auto_advance_after_steps": 5}
        policy = policy_from_config(section)
        assert policy is not None
        assert policy.auto_advance_after_steps == 5

    def test_start_phase_custom(self):
        section = {"enabled": True, "start_phase": "building"}
        policy = policy_from_config(section)
        assert policy is not None
        assert policy.start_phase == PhaseState.BUILDING

    def test_start_phase_invalid_raises(self):
        section = {"enabled": True, "start_phase": "unknown"}
        with pytest.raises(ValueError, match="Invalid phase name"):
            policy_from_config(section)

    def test_whitelist_override_merges_with_default(self):
        section = {
            "enabled": True,
            "whitelist_override": {
                "planning": ["search", "read_file"],  # drops shell from default
            },
        }
        policy = policy_from_config(section)
        assert policy is not None
        # The override replaces the planning list, so shell is gone there.
        planning_expectations = (
            ("search", True),
            ("read_file", True),
            ("shell", False),
        )
        for tool, verdict in planning_expectations:
            assert policy.is_tool_allowed(tool, PhaseState.PLANNING) is verdict, tool
        # Phases without an override keep their defaults.
        assert policy.is_tool_allowed("write_file", PhaseState.BUILDING) is True

    def test_whitelist_override_invalid_phase_raises(self):
        section = {
            "enabled": True,
            "whitelist_override": {"unknown_phase": ["tool"]},
        }
        with pytest.raises(ValueError, match="Invalid phase name"):
            policy_from_config(section)

    def test_whitelist_override_non_list_raises(self):
        section = {
            "enabled": True,
            "whitelist_override": {"planning": "not a list"},
        }
        with pytest.raises(ValueError, match="must be a list"):
            policy_from_config(section)

    def test_to_dict_round_trip_via_default(self):
        # Sanity: the default policy serializes one whitelist entry per phase.
        # NOTE(review): this only checks the key set of default_policy().to_dict()
        # and does not involve policy_from_config or a round trip.
        whitelist_keys = set(default_policy().to_dict()["whitelist"].keys())
        assert whitelist_keys == {
            "planning",
            "building",
            "verification",
            "delivery",
        }


# ---------------------------------------------------------------------------
# ServerConfig.plan_exec integration (R26)
# ---------------------------------------------------------------------------


class TestServerConfigPlanExec:
    def test_default_plan_exec_empty(self):
        server_config = ServerConfig.from_dict({})
        assert server_config.plan_exec == {}

    def test_plan_exec_loaded_from_dict(self):
        raw = {
            "plan_exec": {
                "enabled": True,
                "auto_advance_after_steps": 5,
            }
        }
        server_config = ServerConfig.from_dict(raw)
        assert server_config.plan_exec == {
            "enabled": True,
            "auto_advance_after_steps": 5,
        }

    def test_plan_exec_empty_dict_default(self):
        server_config = ServerConfig.from_dict({"plan_exec": {}})
        assert server_config.plan_exec == {}

    def test_plan_exec_resolved_to_policy(self):
        # Feed the loaded section through policy_from_config to check that
        # the two pieces integrate.
        raw = {
            "plan_exec": {
                "enabled": True,
                "auto_advance_after_steps": 3,
            }
        }
        server_config = ServerConfig.from_dict(raw)
        policy = policy_from_config(server_config.plan_exec)
        assert policy is not None
        assert policy.auto_advance_after_steps == 3

    def test_plan_exec_disabled_via_config(self):
        server_config = ServerConfig.from_dict({"plan_exec": {"enabled": False}})
        assert policy_from_config(server_config.plan_exec) is None