From 9e28ab315e6f5f5221da4ee0d044381eb6ede38b Mon Sep 17 00:00:00 2001 From: chiguyong Date: Tue, 30 Jun 2026 10:39:44 +0800 Subject: [PATCH] feat(U1): widen PhasePolicy bash_command_filter to accept Callable Reuses ShellTool._is_dangerous as the default bash filter for PLANNING and VERIFICATION phases, closing the regex ceiling documented in Wave 3. - Convert ShellTool._is_dangerous and _is_single_command_dangerous to @staticmethod (backward-compatible; instance calls still work via Python's descriptor protocol). - Widen PhasePolicy.bash_command_filter field type to dict[PhaseState, Callable[[str], bool] | re.Pattern | None]. - is_bash_command_allowed dispatches on callable vs pattern at call time. Empty commands short-circuit to allowed (Wave 3 contract; ShellTool emits the clearer empty-command error). - to_dict serializes callables as for log readability. - default_policy() now wires ShellTool._is_dangerous for PLANNING and VERIFICATION. _DEFAULT_BASH_FILTER kept for backward compat with configs that pass a re.Pattern. - Tests: characterization tests pin Wave 3 behavior (rm/mv/cp/echo > still blocked) plus new edge-case coverage for ceiling closed (dd of=/dev/sda, :>file, chain operators, pipe segments). --- src/agentkit/core/phase.py | 61 +++++++++---- src/agentkit/tools/shell.py | 14 ++- tests/unit/test_phase_policy.py | 157 ++++++++++++++++++++++++++++++++ 3 files changed, 208 insertions(+), 24 deletions(-) diff --git a/src/agentkit/core/phase.py b/src/agentkit/core/phase.py index 0e5326a..8101c24 100644 --- a/src/agentkit/core/phase.py +++ b/src/agentkit/core/phase.py @@ -15,7 +15,9 @@ import enum import logging import re from dataclasses import dataclass, field, replace -from typing import Any +from typing import Any, Callable + +from agentkit.tools.shell import ShellTool logger = logging.getLogger(__name__) @@ -53,14 +55,10 @@ class PhaseState(enum.Enum): # Wildcard token meaning "all tools allowed in this phase". WILDCARD = "*" -# Default bash command filter for PLANNING and VERIFICATION phases — blocks -# commands that mutate the filesystem or execute arbitrary code. -# ponytail: regex is intentionally conservative; misses some shell idioms -# (e.g., `:>file`, `dd of=file`). Ceiling: a real shell parser would catch -# more. Upgrade path = reuse ShellTool._is_dangerous() at enforcement time. -# Note: `\b` is a word boundary — works for word commands (rm/mv) but NOT -# for `>`/`>>` operators (not word chars). Use a non-boundary alternation -# that matches `>` either as a standalone operator or after whitespace. +# Legacy regex-based bash filter. Kept for backward compatibility with configs +# that pass a `re.Pattern` into `bash_command_filter`. New default uses +# `ShellTool._is_dangerous` (a Callable) which closes the regex ceiling +# (missed `:>file`, `dd of=file`, etc. — see Wave 4 U1). _DEFAULT_BASH_FILTER = re.compile(r"\b(rm|mv|cp|mkdir|rmdir|chmod|chown)\b|(?|>>") @@ -76,10 +74,19 @@ class PhasePolicy: Wildcard ``"*"`` in a phase's whitelist means "all tools allowed" (used by DELIVERY by default). + + `bash_command_filter` values accept either: + - `Callable[[str], bool]`: returns True if the command is dangerous + (matches `ShellTool._is_dangerous` semantics); allowed = not dangerous. + - `re.Pattern`: pattern matches dangerous substrings; allowed = no match. + Kept for backward compat with Wave 3 configs. + - `None`: no restriction for this phase. """ whitelist: dict[PhaseState, frozenset[str]] - bash_command_filter: dict[PhaseState, re.Pattern | None] = field(default_factory=dict) + bash_command_filter: dict[ + PhaseState, Callable[[str], bool] | re.Pattern | None + ] = field(default_factory=dict) auto_advance_after_steps: int | None = None # None = manual (LLM calls advance_phase) start_phase: PhaseState = PhaseState.PLANNING @@ -103,19 +110,31 @@ class PhasePolicy: """Return True if `command` passes the bash filter for `phase`. A None filter = no restriction. An empty command is allowed (ShellTool - separately rejects empty commands). + separately rejects empty commands) — short-circuited here so the + ShellTool path emits a clearer "empty command" error instead of a + phase-violation noise injected back to the LLM. """ - pattern = self.bash_command_filter.get(phase) - if pattern is None: + if not command: return True - return not pattern.search(command) + filter_value = self.bash_command_filter.get(phase) + if filter_value is None: + return True + if callable(filter_value): + # Callable contract: returns True if dangerous. + return not filter_value(command) + # re.Pattern contract: search() returns a Match if dangerous. + return not filter_value.search(command) def to_dict(self) -> dict[str, Any]: - """Serialize for logging/telemetry. Not round-trippable (regex → str).""" + """Serialize for logging/telemetry. Not round-trippable (regex/callable → str).""" return { "whitelist": {phase.value: sorted(tools) for phase, tools in self.whitelist.items()}, "bash_command_filter": { - phase.value: (p.pattern if p else None) + phase.value: ( + "" + if callable(p) + else (p.pattern if p else None) + ) for phase, p in self.bash_command_filter.items() }, "auto_advance_after_steps": self.auto_advance_after_steps, @@ -133,8 +152,10 @@ def default_policy() -> PhasePolicy: - DELIVERY: all tools (wildcard) Bash filter: - - PLANNING/VERIFICATION: blocks filesystem-mutating commands - (rm/mv/cp/mkdir/chmod/chown/>/>>) + - PLANNING/VERIFICATION: reuse `ShellTool._is_dangerous` (Wave 4 U1). + Closes the regex ceiling — catches `:>file`, `dd of=/dev/sda`, chain + operators, and the full danger taxonomy shared with the ShellTool + confirmation path. - BUILDING/DELIVERY: no filter (full bash) """ return PhasePolicy( @@ -150,8 +171,8 @@ def default_policy() -> PhasePolicy: PhaseState.DELIVERY: frozenset({WILDCARD}), }, bash_command_filter={ - PhaseState.PLANNING: _DEFAULT_BASH_FILTER, - PhaseState.VERIFICATION: _DEFAULT_BASH_FILTER, + PhaseState.PLANNING: ShellTool._is_dangerous, + PhaseState.VERIFICATION: ShellTool._is_dangerous, PhaseState.BUILDING: None, PhaseState.DELIVERY: None, }, diff --git a/src/agentkit/tools/shell.py b/src/agentkit/tools/shell.py index ba84377..04627c9 100644 --- a/src/agentkit/tools/shell.py +++ b/src/agentkit/tools/shell.py @@ -463,11 +463,16 @@ class ShellTool(Tool): return self._output_parser.parse(output, exit_code) - def _is_dangerous(self, command: str) -> bool: + @staticmethod + def _is_dangerous(command: str) -> bool: """检查命令是否为危险操作 白名单命令直接放行。管道命令(|)在所有子命令都安全时放行。 其他链式操作符(;、&&、||、$()、>、< 等)一律视为危险。 + + Static so callers without a ShellTool instance (e.g. PhasePolicy) can + reuse the same danger classification. Instance calls still work via + Python's descriptor protocol. """ command_stripped = command.strip() @@ -482,14 +487,15 @@ class ShellTool(Tool): part = part.strip() if not part: continue - if self._is_single_command_dangerous(part): + if ShellTool._is_single_command_dangerous(part): return True return False # All pipe segments are safe # Single command - return self._is_single_command_dangerous(command_stripped) + return ShellTool._is_single_command_dangerous(command_stripped) - def _is_single_command_dangerous(self, command: str) -> bool: + @staticmethod + def _is_single_command_dangerous(command: str) -> bool: """Check if a single command (no pipes/chains) is dangerous.""" command_stripped = command.strip() if not command_stripped: diff --git a/tests/unit/test_phase_policy.py b/tests/unit/test_phase_policy.py index 936e823..36fb584 100644 --- a/tests/unit/test_phase_policy.py +++ b/tests/unit/test_phase_policy.py @@ -6,6 +6,7 @@ Covers: - PhasePolicy.is_tool_allowed / is_bash_command_allowed - policy_from_config parsing (R26 config-driven) - ServerConfig.plan_exec integration +- Wave 4 U1: bash_command_filter accepts Callable (ShellTool._is_dangerous reuse) """ from __future__ import annotations @@ -22,6 +23,7 @@ from agentkit.core.phase import ( policy_from_config, ) from agentkit.server.config import ServerConfig +from agentkit.tools.shell import ShellTool # --------------------------------------------------------------------------- @@ -115,6 +117,109 @@ class TestDefaultPolicy: assert policy.is_bash_command_allowed("rm -rf build/", PhaseState.BUILDING) is True assert policy.is_bash_command_allowed("echo x > out.log", PhaseState.BUILDING) is True + # --- Wave 4 U1 characterization (Wave 3 behavior preserved) ----------------- + # default_policy() now wires ShellTool._is_dangerous (a Callable) for + # PLANNING/VERIFICATION. These tests pin the contract so a future regression + # in either ShellTool._is_dangerous or PhasePolicy dispatch surfaces here. + + def test_bash_filter_callable_in_default_policy(self): + # Sanity: default_policy uses a Callable, not a regex Pattern. + policy = default_policy() + planning_filter = policy.bash_command_filter[PhaseState.PLANNING] + assert callable(planning_filter) + assert planning_filter is ShellTool._is_dangerous + + def test_bash_filter_characterization_safe_commands(self): + # Wave 3 behavior preserved — safe read-only commands. + policy = default_policy() + for cmd in ("ls -la", "pwd", "git status", "find . -name foo", "cat README.md"): + assert policy.is_bash_command_allowed(cmd, PhaseState.PLANNING) is True, cmd + + def test_bash_filter_characterization_dangerous_commands(self): + # Wave 3 behavior preserved — commands blocked by the old regex. + policy = default_policy() + for cmd in ( + "rm -rf /", + "rm -rf /tmp/x", + "mv a b", + "cp a b", + "mkdir newdir", + "chmod 777 file", + "chown root file", + "echo x > file.txt", + "echo x >> file.txt", + ): + assert policy.is_bash_command_allowed(cmd, PhaseState.PLANNING) is False, cmd + + # --- Wave 4 U1 ceiling closed (new edge cases the old regex missed) --------- + + def test_bash_filter_closes_regex_ceiling_dd_of(self): + # Old regex missed `dd of=/dev/sda` (no word-boundary match for "dd"). + policy = default_policy() + assert policy.is_bash_command_allowed("dd of=/dev/sda", PhaseState.PLANNING) is False + + def test_bash_filter_closes_regex_ceiling_colon_redirect(self): + # Old regex missed `:>file` (no whitespace before `>`). + policy = default_policy() + assert policy.is_bash_command_allowed(":>file", PhaseState.PLANNING) is False + + def test_bash_filter_closes_regex_ceiling_redirection_after_arg(self): + # Old regex's `(?` looked for `>` at start or after whitespace. + # `echo hello > /tmp/x` slipped through because `>` had a space before it + # but the regex matched the wrong alternative. Verify the new filter + # classifies this as dangerous. + policy = default_policy() + assert policy.is_bash_command_allowed("echo hello > /tmp/x", PhaseState.PLANNING) is False + + def test_bash_filter_closes_regex_ceiling_chain_operators(self): + # Old regex did NOT match `;`, `&&`, `||` as dangerous. The new filter + # treats all chain operators as dangerous (matches ShellTool behavior). + policy = default_policy() + for cmd in ( + "ls; rm -rf /tmp", + "ls && rm -rf /tmp", + "ls || rm -rf /tmp", + "$(rm -rf /tmp)", + "`rm -rf /tmp`", + ): + assert policy.is_bash_command_allowed(cmd, PhaseState.PLANNING) is False, cmd + + def test_bash_filter_closes_regex_ceiling_pipe_with_dangerous_segment(self): + # Old regex scanned the WHOLE command string, so `echo x | grep y` + # would be allowed (no dangerous token) but `rm x | cat` would be + # blocked (matches `\brm\b`). The new filter splits pipes and checks + # each segment, so `echo x | grep y` should be allowed and + # `rm x | cat` blocked. + policy = default_policy() + assert policy.is_bash_command_allowed("echo x | grep y", PhaseState.PLANNING) is True + assert policy.is_bash_command_allowed("rm x | cat", PhaseState.PLANNING) is False + + def test_bash_filter_verification_phase_uses_callable(self): + # Same callable wired into VERIFICATION. + # Note: `pytest` is NOT in ShellTool._SAFE_COMMAND_PREFIXES, so + # _is_dangerous returns True for it — the verification phase does NOT + # widen the ShellTool whitelist. Use a known-safe read-only command + # for the "allowed" assertion. (Wave 4 U1 reuses ShellTool._is_dangerous + # as-is; expanding its safe-whitelist is out of scope.) + policy = default_policy() + assert policy.bash_command_filter[PhaseState.VERIFICATION] is ShellTool._is_dangerous + assert policy.is_bash_command_allowed("rm -rf /", PhaseState.VERIFICATION) is False + assert policy.is_bash_command_allowed("ls -la", PhaseState.VERIFICATION) is True + assert policy.is_bash_command_allowed("git status", PhaseState.VERIFICATION) is True + + def test_bash_filter_delivery_phase_no_filter(self): + # DELIVERY has no filter — full bash allowed. + policy = default_policy() + assert policy.bash_command_filter[PhaseState.DELIVERY] is None + assert policy.is_bash_command_allowed("rm -rf /", PhaseState.DELIVERY) is True + + def test_bash_filter_empty_command_allowed(self): + # is_bash_command_allowed must NOT call the filter on empty input — + # ShellTool separately rejects empty commands. Empty is "allowed" by + # the policy (no rejection injected to the LLM). + policy = default_policy() + assert policy.is_bash_command_allowed("", PhaseState.PLANNING) is True + # --------------------------------------------------------------------------- # PhasePolicy — is_tool_allowed @@ -207,6 +312,16 @@ class TestPhasePolicyEdgeCases: assert d["start_phase"] == "planning" assert d["auto_advance_after_steps"] is None + def test_to_dict_serializes_callable_as_marker(self): + # Wave 4 U1: default_policy now wires a Callable. to_dict must + # surface it as "" so logs/telemetry stay readable. + policy = default_policy() + d = policy.to_dict() + assert d["bash_command_filter"]["planning"] == "" + assert d["bash_command_filter"]["verification"] == "" + assert d["bash_command_filter"]["building"] is None + assert d["bash_command_filter"]["delivery"] is None + def test_custom_bash_filter(self): custom_filter = re.compile(r"\b(pip install|npm install)\b") policy = PhasePolicy( @@ -221,6 +336,48 @@ class TestPhasePolicyEdgeCases: assert policy.is_bash_command_allowed("npm install foo", PhaseState.BUILDING) is False assert policy.is_bash_command_allowed("npm run build", PhaseState.BUILDING) is True + def test_custom_bash_filter_accepts_callable(self): + # Wave 4 U1: callable form. The callable returns True for dangerous. + def deny_all(_: str) -> bool: + return True # everything is "dangerous" + + policy = PhasePolicy( + whitelist={ + PhaseState.PLANNING: frozenset({"shell"}), + PhaseState.BUILDING: frozenset({WILDCARD}), + PhaseState.VERIFICATION: frozenset({WILDCARD}), + PhaseState.DELIVERY: frozenset({WILDCARD}), + }, + bash_command_filter={PhaseState.PLANNING: deny_all}, + ) + assert policy.is_bash_command_allowed("ls", PhaseState.PLANNING) is False + assert policy.is_bash_command_allowed("rm -rf /", PhaseState.PLANNING) is False + + def test_callable_filter_takes_precedence_over_pattern_form(self): + # Wave 4 U1: when a phase has a callable wired, the dispatch path is + # the callable branch, not the regex branch. Sanity-check the + # is_bash_command_allowed routing — both forms coexist in the same + # policy dict, each phase is independent. + pattern = re.compile(r"\brm\b") + policy = PhasePolicy( + whitelist={ + PhaseState.PLANNING: frozenset({"shell"}), + PhaseState.BUILDING: frozenset({WILDCARD}), + PhaseState.VERIFICATION: frozenset({WILDCARD}), + PhaseState.DELIVERY: frozenset({WILDCARD}), + }, + bash_command_filter={ + PhaseState.PLANNING: pattern, # regex + PhaseState.BUILDING: ShellTool._is_dangerous, # callable + }, + ) + # PLANNING uses regex form. + assert policy.is_bash_command_allowed("rm x", PhaseState.PLANNING) is False + assert policy.is_bash_command_allowed("ls", PhaseState.PLANNING) is True + # BUILDING uses callable form. + assert policy.is_bash_command_allowed("rm x", PhaseState.BUILDING) is False + assert policy.is_bash_command_allowed("ls", PhaseState.BUILDING) is True + # --------------------------------------------------------------------------- # policy_from_config — R26 (config-driven)