349 lines
14 KiB
Python
349 lines
14 KiB
Python
"""Unit tests for PhasePolicy + PhaseState (G6 core, R24/R25/R26).
|
|
|
|
Covers:
- PhaseState enum (next_of, from_string)
- default_policy() KTD5 whitelist
- PhasePolicy.is_tool_allowed / is_bash_command_allowed
- policy_from_config parsing (R26 config-driven)
- ServerConfig.plan_exec integration
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import re
|
|
|
|
import pytest
|
|
|
|
from agentkit.core.phase import (
|
|
WILDCARD,
|
|
PhasePolicy,
|
|
PhaseState,
|
|
default_policy,
|
|
policy_from_config,
|
|
)
|
|
from agentkit.server.config import ServerConfig
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PhaseState enum
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPhaseState:
    """Checks for the PhaseState enum: values, ordering and string parsing."""

    def test_values(self):
        """Every member's value is the expected lowercase string."""
        expected_values = [
            (PhaseState.PLANNING, "planning"),
            (PhaseState.BUILDING, "building"),
            (PhaseState.VERIFICATION, "verification"),
            (PhaseState.DELIVERY, "delivery"),
        ]
        for member, raw in expected_values:
            assert member.value == raw

    def test_next_of(self):
        """next_of walks planning -> building -> verification -> delivery."""
        successors = [
            (PhaseState.PLANNING, PhaseState.BUILDING),
            (PhaseState.BUILDING, PhaseState.VERIFICATION),
            (PhaseState.VERIFICATION, PhaseState.DELIVERY),
        ]
        for current, following in successors:
            assert PhaseState.next_of(current) == following
        # The last phase has no successor.
        assert PhaseState.next_of(PhaseState.DELIVERY) is None

    def test_from_string_case_insensitive(self):
        """from_string accepts lower, upper and mixed-case spellings."""
        spellings = [
            ("planning", PhaseState.PLANNING),
            ("PLANNING", PhaseState.PLANNING),
            ("Building", PhaseState.BUILDING),
        ]
        for text, phase in spellings:
            assert PhaseState.from_string(text) == phase

    def test_from_string_invalid_raises(self):
        """Unrecognised names raise ValueError with a descriptive message."""
        rejected = [
            ("unknown", "Invalid phase name"),
            ("exploration", "Valid:"),
        ]
        for bad_name, message_fragment in rejected:
            with pytest.raises(ValueError, match=message_fragment):
                PhaseState.from_string(bad_name)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# default_policy() — KTD5 whitelist
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDefaultPolicy:
    """Checks for the policy returned by default_policy() (KTD5 whitelist)."""

    def test_has_all_four_phases(self):
        """The default whitelist has an entry for each of the four phases."""
        policy = default_policy()
        for phase in (
            PhaseState.PLANNING,
            PhaseState.BUILDING,
            PhaseState.VERIFICATION,
            PhaseState.DELIVERY,
        ):
            assert phase in policy.whitelist

    def test_planning_whitelist_matches_r24(self):
        """Planning allows the read-only tools and excludes write_file."""
        planning_tools = default_policy().whitelist[PhaseState.PLANNING]
        for tool in ("search", "read_file", "bash", "tool_search"):
            assert tool in planning_tools
        # Planning must NOT allow write_file.
        assert "write_file" not in planning_tools

    def test_building_whitelist_includes_write_file(self):
        """Building allows write_file, bash and read_file."""
        building_tools = default_policy().whitelist[PhaseState.BUILDING]
        for tool in ("write_file", "bash", "read_file"):
            assert tool in building_tools

    def test_verification_whitelist_excludes_write(self):
        """Verification allows bash and read_file but not write_file."""
        verification_tools = default_policy().whitelist[PhaseState.VERIFICATION]
        for tool in ("bash", "read_file"):
            assert tool in verification_tools
        assert "write_file" not in verification_tools

    def test_delivery_wildcard(self):
        """The delivery whitelist contains the WILDCARD marker."""
        delivery_tools = default_policy().whitelist[PhaseState.DELIVERY]
        assert WILDCARD in delivery_tools

    def test_start_phase_default_planning(self):
        """The default policy starts in the planning phase."""
        policy = default_policy()
        assert policy.start_phase == PhaseState.PLANNING

    def test_auto_advance_default_none(self):
        """auto_advance_after_steps is unset on the default policy."""
        # KTD6: manual by default.
        policy = default_policy()
        assert policy.auto_advance_after_steps is None

    def test_bash_filter_blocks_rm_in_planning(self):
        """In planning, read-only commands pass; rm and redirection do not."""
        policy = default_policy()
        verdicts = [
            ("ls -la", True),
            ("git status", True),
            ("rm -rf /tmp/x", False),
            ("echo x > file.txt", False),
        ]
        for command, expected in verdicts:
            assert policy.is_bash_command_allowed(command, PhaseState.PLANNING) is expected

    def test_bash_filter_no_restriction_in_building(self):
        """In building, rm and redirection commands are accepted."""
        policy = default_policy()
        for command in ("rm -rf build/", "echo x > out.log"):
            assert policy.is_bash_command_allowed(command, PhaseState.BUILDING) is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PhasePolicy — is_tool_allowed
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestIsToolAllowed:
    """Checks for PhasePolicy.is_tool_allowed across phases and whitelists."""

    def test_planning_allows_search(self):
        """The default policy allows ``search`` during planning."""
        policy = default_policy()
        assert policy.is_tool_allowed("search", PhaseState.PLANNING) is True

    def test_planning_blocks_write_file(self):
        """The default policy rejects ``write_file`` during planning."""
        policy = default_policy()
        assert policy.is_tool_allowed("write_file", PhaseState.PLANNING) is False

    def test_building_allows_write_file(self):
        """The default policy allows ``write_file`` during building."""
        policy = default_policy()
        assert policy.is_tool_allowed("write_file", PhaseState.BUILDING) is True

    def test_delivery_wildcard_allows_anything(self):
        """The delivery wildcard admits both unknown and known tool names."""
        policy = default_policy()
        assert policy.is_tool_allowed("any_random_tool", PhaseState.DELIVERY) is True
        assert policy.is_tool_allowed("write_file", PhaseState.DELIVERY) is True

    def test_unknown_phase_returns_false(self):
        """Exercise the allow and reject paths on hand-built policies.

        PhaseState is an enum, so a genuinely unknown phase cannot be
        constructed here. What this test can observe is that a tool absent
        from a phase's explicit whitelist is rejected, while listed tools
        and wildcard phases are accepted.
        """
        # NOTE(review): a whitelist that omits a phase entirely is not
        # exercised here; whether PhasePolicy accepts such a mapping is not
        # visible from this file -- confirm before adding that case.
        policy = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: frozenset({"search"}),
                PhaseState.BUILDING: frozenset({"write_file"}),
                PhaseState.VERIFICATION: frozenset({"bash"}),
                PhaseState.DELIVERY: frozenset({WILDCARD}),
            }
        )
        # BUILDING lists write_file explicitly, so it is allowed.
        assert policy.is_tool_allowed("write_file", PhaseState.BUILDING) is True
        # Rejection path: PLANNING lists only "search", so anything else
        # must come back False.
        assert policy.is_tool_allowed("write_file", PhaseState.PLANNING) is False
        assert policy.is_tool_allowed("bash", PhaseState.PLANNING) is False

        # A policy whose every phase is wildcard-only.
        minimal = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: frozenset({WILDCARD}),
                PhaseState.BUILDING: frozenset({WILDCARD}),
                PhaseState.VERIFICATION: frozenset({WILDCARD}),
                PhaseState.DELIVERY: frozenset({WILDCARD}),
            }
        )
        # VERIFICATION is in whitelist -- wildcard allows all.
        assert minimal.is_tool_allowed("anything", PhaseState.VERIFICATION) is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# PhasePolicy — edge cases & errors
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPhasePolicyEdgeCases:
    """Construction errors, serialization and custom bash filters."""

    def test_empty_whitelist_raises(self):
        """A phase with an empty tool set is rejected at construction."""
        # Fail-fast: an empty whitelist for a non-wildcard phase is a bug.
        broken_whitelist = {
            PhaseState.PLANNING: frozenset(),  # empty!
            PhaseState.BUILDING: frozenset({WILDCARD}),
            PhaseState.VERIFICATION: frozenset({WILDCARD}),
            PhaseState.DELIVERY: frozenset({WILDCARD}),
        }
        with pytest.raises(ValueError, match="empty whitelist"):
            PhasePolicy(whitelist=broken_whitelist)

    def test_wildcard_only_does_not_raise(self):
        """A whitelist made solely of wildcards constructs and allows tools."""
        # Wildcard-only whitelist is valid (means "all tools").
        all_phases = (
            PhaseState.PLANNING,
            PhaseState.BUILDING,
            PhaseState.VERIFICATION,
            PhaseState.DELIVERY,
        )
        open_policy = PhasePolicy(
            whitelist={phase: frozenset({WILDCARD}) for phase in all_phases}
        )
        assert open_policy.is_tool_allowed("anything", PhaseState.PLANNING) is True

    def test_to_dict_serializable(self):
        """to_dict exposes whitelist, start_phase and auto-advance keys."""
        payload = default_policy().to_dict()
        assert "whitelist" in payload
        for phase_name in ("planning", "delivery"):
            assert phase_name in payload["whitelist"]
        assert payload["start_phase"] == "planning"
        assert payload["auto_advance_after_steps"] is None

    def test_custom_bash_filter(self):
        """A per-phase regex filter blocks matching bash commands."""
        install_pattern = re.compile(r"\b(pip install|npm install)\b")
        bash_only = frozenset({"bash"})
        filtered_policy = PhasePolicy(
            whitelist={
                PhaseState.PLANNING: bash_only,
                PhaseState.BUILDING: bash_only,
                PhaseState.VERIFICATION: bash_only,
                PhaseState.DELIVERY: frozenset({WILDCARD}),
            },
            bash_command_filter={PhaseState.BUILDING: install_pattern},
        )
        verdicts = [
            ("npm install foo", False),
            ("npm run build", True),
        ]
        for command, expected in verdicts:
            assert filtered_policy.is_bash_command_allowed(command, PhaseState.BUILDING) is expected
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# policy_from_config — R26 (config-driven)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPolicyFromConfig:
    """Checks for policy_from_config parsing (R26, config-driven)."""

    def test_empty_config_returns_none(self):
        """An empty config section yields no policy."""
        resolved = policy_from_config({})
        assert resolved is None

    def test_enabled_false_returns_none(self):
        """An explicit ``enabled: false`` yields no policy."""
        # Opt-out — explicit `enabled: false` disables policy.
        assert policy_from_config({"enabled": False}) is None

    def test_enabled_default_true_when_section_present(self):
        """A non-empty section without ``enabled`` still builds a policy."""
        # When section is present but `enabled` is missing, default is True.
        section = {"auto_advance_after_steps": 3}
        resolved = policy_from_config(section)
        assert resolved is not None
        assert resolved.auto_advance_after_steps == 3

    def test_auto_advance_after_steps(self):
        """auto_advance_after_steps is carried over from the config."""
        section = {"enabled": True, "auto_advance_after_steps": 5}
        resolved = policy_from_config(section)
        assert resolved is not None
        assert resolved.auto_advance_after_steps == 5

    def test_start_phase_custom(self):
        """start_phase is parsed from its string name."""
        section = {"enabled": True, "start_phase": "building"}
        resolved = policy_from_config(section)
        assert resolved is not None
        assert resolved.start_phase == PhaseState.BUILDING

    def test_start_phase_invalid_raises(self):
        """An unrecognised start_phase name raises ValueError."""
        section = {"enabled": True, "start_phase": "unknown"}
        with pytest.raises(ValueError, match="Invalid phase name"):
            policy_from_config(section)

    def test_whitelist_override_merges_with_default(self):
        """An override replaces one phase's tools and leaves others alone."""
        section = {
            "enabled": True,
            "whitelist_override": {
                "planning": ["search", "read_file"],  # removes bash from default
            },
        }
        resolved = policy_from_config(section)
        assert resolved is not None
        # Override wins — bash should be removed from planning.
        planning_verdicts = [
            ("search", True),
            ("read_file", True),
            ("bash", False),
        ]
        for tool, expected in planning_verdicts:
            assert resolved.is_tool_allowed(tool, PhaseState.PLANNING) is expected
        # Other phases unchanged.
        assert resolved.is_tool_allowed("write_file", PhaseState.BUILDING) is True

    def test_whitelist_override_invalid_phase_raises(self):
        """An override keyed by an unrecognised phase raises ValueError."""
        section = {
            "enabled": True,
            "whitelist_override": {"unknown_phase": ["tool"]},
        }
        with pytest.raises(ValueError, match="Invalid phase name"):
            policy_from_config(section)

    def test_whitelist_override_non_list_raises(self):
        """An override whose value is not a list raises ValueError."""
        section = {
            "enabled": True,
            "whitelist_override": {"planning": "not a list"},
        }
        with pytest.raises(ValueError, match="must be a list"):
            policy_from_config(section)

    def test_to_dict_round_trip_via_default(self):
        """The serialized default whitelist is keyed by the four phase names."""
        # Sanity: default policy serializes to a dict with expected keys.
        expected_keys = {"planning", "building", "verification", "delivery"}
        serialized = default_policy().to_dict()
        assert set(serialized["whitelist"].keys()) == expected_keys
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# ServerConfig.plan_exec integration (R26)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestServerConfigPlanExec:
    """Checks that ServerConfig carries the ``plan_exec`` section (R26)."""

    def test_default_plan_exec_empty(self):
        """Without a ``plan_exec`` key the attribute equals an empty dict."""
        loaded = ServerConfig.from_dict({})
        assert loaded.plan_exec == {}

    def test_plan_exec_loaded_from_dict(self):
        """A populated ``plan_exec`` section is exposed unchanged."""
        raw = {"plan_exec": {"enabled": True, "auto_advance_after_steps": 5}}
        loaded = ServerConfig.from_dict(raw)
        assert loaded.plan_exec == {"enabled": True, "auto_advance_after_steps": 5}

    def test_plan_exec_empty_dict_default(self):
        """An explicitly empty ``plan_exec`` section stays an empty dict."""
        loaded = ServerConfig.from_dict({"plan_exec": {}})
        assert loaded.plan_exec == {}

    def test_plan_exec_resolved_to_policy(self):
        """The loaded section can be fed to policy_from_config."""
        # Wire the config dict through policy_from_config to verify integration.
        raw = {"plan_exec": {"enabled": True, "auto_advance_after_steps": 3}}
        loaded = ServerConfig.from_dict(raw)
        resolved = policy_from_config(loaded.plan_exec)
        assert resolved is not None
        assert resolved.auto_advance_after_steps == 3

    def test_plan_exec_disabled_via_config(self):
        """``enabled: False`` in the loaded section resolves to no policy."""
        loaded = ServerConfig.from_dict({"plan_exec": {"enabled": False}})
        resolved = policy_from_config(loaded.plan_exec)
        assert resolved is None
|