feat: complex-task-quality-loop (R1-R12) #22

Merged
fischer merged 13 commits from feat/complex-task-quality-loop into main 2026-07-05 22:31:22 +08:00
7 changed files with 834 additions and 22 deletions
Showing only changes of commit b8418968c2 - Show all commits

View File

@ -7,6 +7,11 @@ KTD3 (Wave 3 plan): state machine lives in ReActEngine, not skill config.
KTD5: default whitelist matches brainstorm R24 (Planning: think/search; KTD5: default whitelist matches brainstorm R24 (Planning: think/search;
Building: write_file; etc.). Building: write_file; etc.).
KTD6: transitions are LLM-driven via AdvancePhaseTool; auto-advance is opt-in. KTD6: transitions are LLM-driven via AdvancePhaseTool; auto-advance is opt-in.
U3 (R3): ``default_policy()`` accepts an optional ``workspace_root`` and
populates ``PhasePolicy.verification_commands`` via coding-task detection
(``pyproject.toml`` / ``.py`` presence) coding tasks force pytest/ruff;
non-coding tasks leave the list empty for Spec-declared commands.
""" """
from __future__ import annotations from __future__ import annotations
@ -15,6 +20,7 @@ import enum
import logging import logging
import re import re
from dataclasses import dataclass, field, replace from dataclasses import dataclass, field, replace
from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
from agentkit.tools.shell import ShellTool from agentkit.tools.shell import ShellTool
@ -78,11 +84,17 @@ class PhasePolicy:
""" """
whitelist: dict[PhaseState, frozenset[str]] whitelist: dict[PhaseState, frozenset[str]]
bash_command_filter: dict[ bash_command_filter: dict[PhaseState, Callable[[str], bool] | re.Pattern | None] = field(
PhaseState, Callable[[str], bool] | re.Pattern | None default_factory=dict
] = field(default_factory=dict) )
auto_advance_after_steps: int | None = None # None = manual (LLM calls advance_phase) auto_advance_after_steps: int | None = None # None = manual (LLM calls advance_phase)
start_phase: PhaseState = PhaseState.PLANNING start_phase: PhaseState = PhaseState.PLANNING
# U3/R3: verification commands to run at the VERIFICATION phase's final-answer
# point. Populated by default_policy() via coding-task detection. None = no
# opinion (ReActEngine falls back to its own verification_commands param or
# VerificationLoop defaults). An empty list means "no commands" (verification
# passes trivially — for non-coding tasks using Spec-declared commands instead).
verification_commands: list[str] | None = None
def __post_init__(self) -> None: def __post_init__(self) -> None:
# Fail-fast: empty whitelist for a non-wildcard phase = bug. # Fail-fast: empty whitelist for a non-wildcard phase = bug.
@ -124,19 +136,16 @@ class PhasePolicy:
return { return {
"whitelist": {phase.value: sorted(tools) for phase, tools in self.whitelist.items()}, "whitelist": {phase.value: sorted(tools) for phase, tools in self.whitelist.items()},
"bash_command_filter": { "bash_command_filter": {
phase.value: ( phase.value: ("<callable>" if callable(p) else (p.pattern if p else None))
"<callable>"
if callable(p)
else (p.pattern if p else None)
)
for phase, p in self.bash_command_filter.items() for phase, p in self.bash_command_filter.items()
}, },
"auto_advance_after_steps": self.auto_advance_after_steps, "auto_advance_after_steps": self.auto_advance_after_steps,
"start_phase": self.start_phase.value, "start_phase": self.start_phase.value,
"verification_commands": self.verification_commands,
} }
def default_policy() -> PhasePolicy: def default_policy(workspace_root: str | Path | None = None) -> PhasePolicy:
"""Return the KTD5 default PhasePolicy. """Return the KTD5 default PhasePolicy.
Whitelist (R24): Whitelist (R24):
@ -151,7 +160,22 @@ def default_policy() -> PhasePolicy:
operators, and the full danger taxonomy shared with the ShellTool operators, and the full danger taxonomy shared with the ShellTool
confirmation path. confirmation path.
- BUILDING/DELIVERY: no filter (full bash) - BUILDING/DELIVERY: no filter (full bash)
U3/R3: ``verification_commands`` is populated via coding-task detection on
``workspace_root``. Coding workspaces (``pyproject.toml`` or ``.py``
present) force ``pytest -x -q`` and ``ruff check src/``. Non-coding
workspaces get ``None`` (no opinion Spec-declared commands are used).
""" """
# U3/R3: coding-task detection. Local import avoids a circular dependency
# (sandbox.py is standalone, but keeping the import local makes the R3
# concern visually scoped to default_policy).
from agentkit.core.sandbox import detect_verification_commands
verification_cmds = detect_verification_commands(workspace_root)
# detect_verification_commands returns [] for non-coding workspaces.
# For non-coding workspaces, leave verification_commands as None so the
# caller knows "no coding-specific commands" and can substitute Spec-declared
# commands. For coding workspaces, set the forced pytest/ruff list.
return PhasePolicy( return PhasePolicy(
whitelist={ whitelist={
# Tool name is "shell" (ShellTool default); bash_command_filter # Tool name is "shell" (ShellTool default); bash_command_filter
@ -172,6 +196,7 @@ def default_policy() -> PhasePolicy:
}, },
auto_advance_after_steps=None, # manual by default auto_advance_after_steps=None, # manual by default
start_phase=PhaseState.PLANNING, start_phase=PhaseState.PLANNING,
verification_commands=verification_cmds if verification_cmds else None,
) )

View File

@ -75,6 +75,13 @@ class PlanExecEngine:
workspace: SharedWorkspace | None = None, workspace: SharedWorkspace | None = None,
step_event_callback: "Callable[[str, dict[str, Any]], Awaitable[None]] | None" = None, step_event_callback: "Callable[[str, dict[str, Any]], Awaitable[None]] | None" = None,
spec_manager: SpecManager | None = None, spec_manager: SpecManager | None = None,
# U3/R2: verification defaults True for PLAN_EXEC (per R2). Threaded
# through to each step's ReActEngine so the canonical verify-at-final-
# answer path (react.py:1303+) runs pytest/ruff on coding tasks.
# DIRECT_CHAT/REACT keep verification_enabled=False (the ReActEngine
# default) — only PLAN_EXEC/TEAM_COLLAB verify.
verification_enabled: bool = True,
verification_commands: list[str] | None = None,
): ):
""" """
Args: Args:
@ -84,6 +91,12 @@ class PlanExecEngine:
workspace: SharedWorkspace 实例用于步骤间状态传递 workspace: SharedWorkspace 实例用于步骤间状态传递
step_event_callback: 步骤事件回调用于非流式执行时推送进度 step_event_callback: 步骤事件回调用于非流式执行时推送进度
spec_manager: SpecManager 实例用于持久化执行计划为 Spec 文档 spec_manager: SpecManager 实例用于持久化执行计划为 Spec 文档
verification_enabled: U3/R2 verification-on-final-answer for
PLAN_EXEC steps. Defaults True (per R2). Each step's
ReActEngine is constructed with this flag.
verification_commands: Optional override for the verification
commands. None = let ReActEngine / VerificationLoop use its
own defaults (pytest -x -q, ruff check src/).
""" """
self._llm_gateway = llm_gateway self._llm_gateway = llm_gateway
self._max_replans = max_replans self._max_replans = max_replans
@ -92,6 +105,8 @@ class PlanExecEngine:
self._step_event_callback = step_event_callback self._step_event_callback = step_event_callback
self._spec_manager = spec_manager self._spec_manager = spec_manager
self._confirmation_handler: Any | None = None self._confirmation_handler: Any | None = None
self._verification_enabled = verification_enabled
self._verification_commands = verification_commands
# 组合子组件 # 组合子组件
self._planner = GoalPlanner(llm_gateway=llm_gateway) self._planner = GoalPlanner(llm_gateway=llm_gateway)
@ -929,6 +944,8 @@ class PlanExecEngine:
system_prompt=system_prompt, system_prompt=system_prompt,
tools=tools, tools=tools,
confirmation_handler=self._confirmation_handler, confirmation_handler=self._confirmation_handler,
verification_enabled=self._verification_enabled,
verification_commands=self._verification_commands,
) )
return PlanExecutor( return PlanExecutor(
agent_pool=step_executor, agent_pool=step_executor,
@ -1089,6 +1106,8 @@ class ReActStepExecutor:
tools: list["Tool"] | None = None, tools: list["Tool"] | None = None,
max_steps: int = 5, max_steps: int = 5,
confirmation_handler: Any | None = None, confirmation_handler: Any | None = None,
verification_enabled: bool = False,
verification_commands: list[str] | None = None,
): ):
self._llm_gateway = llm_gateway self._llm_gateway = llm_gateway
self._messages = messages or [] self._messages = messages or []
@ -1097,6 +1116,8 @@ class ReActStepExecutor:
self._tools = tools or [] self._tools = tools or []
self._max_steps = max_steps self._max_steps = max_steps
self._confirmation_handler = confirmation_handler self._confirmation_handler = confirmation_handler
self._verification_enabled = verification_enabled
self._verification_commands = verification_commands
self._agents: dict[str, _ReActStepAgent] = {} self._agents: dict[str, _ReActStepAgent] = {}
async def create_agent_from_skill(self, skill_name: str): async def create_agent_from_skill(self, skill_name: str):
@ -1110,6 +1131,8 @@ class ReActStepExecutor:
tools=self._tools, tools=self._tools,
max_steps=self._max_steps, max_steps=self._max_steps,
confirmation_handler=self._confirmation_handler, confirmation_handler=self._confirmation_handler,
verification_enabled=self._verification_enabled,
verification_commands=self._verification_commands,
) )
self._agents[skill_name] = agent self._agents[skill_name] = agent
return agent return agent
@ -1126,6 +1149,8 @@ class ReActStepExecutor:
system_prompt=self._system_prompt, system_prompt=self._system_prompt,
tools=self._tools, tools=self._tools,
max_steps=self._max_steps, max_steps=self._max_steps,
verification_enabled=self._verification_enabled,
verification_commands=self._verification_commands,
) )
self._agents[key] = agent self._agents[key] = agent
return agent return agent
@ -1148,6 +1173,8 @@ class _ReActStepAgent:
tools: list["Tool"] | None = None, tools: list["Tool"] | None = None,
max_steps: int = 5, max_steps: int = 5,
confirmation_handler: Any | None = None, confirmation_handler: Any | None = None,
verification_enabled: bool = False,
verification_commands: list[str] | None = None,
): ):
self.name = name self.name = name
self._llm_gateway = llm_gateway self._llm_gateway = llm_gateway
@ -1157,6 +1184,8 @@ class _ReActStepAgent:
self._tools = tools or [] self._tools = tools or []
self._max_steps = max_steps self._max_steps = max_steps
self._confirmation_handler = confirmation_handler self._confirmation_handler = confirmation_handler
self._verification_enabled = verification_enabled
self._verification_commands = verification_commands
async def execute(self, task_msg: TaskMessage) -> "TaskResult": async def execute(self, task_msg: TaskMessage) -> "TaskResult":
"""执行步骤:通过 ReActEngine 循环调用""" """执行步骤:通过 ReActEngine 循环调用"""
@ -1184,6 +1213,8 @@ class _ReActStepAgent:
engine = ReActEngine( engine = ReActEngine(
llm_gateway=self._llm_gateway, llm_gateway=self._llm_gateway,
max_steps=self._max_steps, max_steps=self._max_steps,
verification_enabled=self._verification_enabled,
verification_commands=self._verification_commands,
) )
# 构建 messages # 构建 messages

View File

@ -23,6 +23,7 @@ from agentkit.core.exceptions import (
) )
from agentkit.core.protocol import CancellationToken from agentkit.core.protocol import CancellationToken
from agentkit.core.compressor import estimate_text_tokens from agentkit.core.compressor import estimate_text_tokens
from agentkit.core.sandbox import SandboxNetworkBlockedError
from agentkit.llm.gateway import LLMGateway from agentkit.llm.gateway import LLMGateway
from agentkit.llm.protocol import LLMResponse from agentkit.llm.protocol import LLMResponse
from agentkit.tools.base import Tool, ToolValidationError from agentkit.tools.base import Tool, ToolValidationError
@ -36,6 +37,7 @@ if TYPE_CHECKING:
from agentkit.core.compressor import CompressionStrategy from agentkit.core.compressor import CompressionStrategy
from agentkit.core.middleware import MiddlewareChain from agentkit.core.middleware import MiddlewareChain
from agentkit.core.phase import PhasePolicy, PhaseState from agentkit.core.phase import PhasePolicy, PhaseState
from agentkit.core.sandbox import WorkspaceSandbox
from agentkit.core.trace import TraceRecorder from agentkit.core.trace import TraceRecorder
from agentkit.memory.retriever import MemoryRetriever from agentkit.memory.retriever import MemoryRetriever
@ -182,6 +184,11 @@ class ReActEngine:
# U3/G6: PLAN_EXEC phase policy (opt-in). None = no enforcement # U3/G6: PLAN_EXEC phase policy (opt-in). None = no enforcement
# (backward compat — all existing callers unaffected). # (backward compat — all existing callers unaffected).
phase_policy: "PhasePolicy | None" = None, phase_policy: "PhasePolicy | None" = None,
# U3/RV3: minimum sandbox. When set and the engine is in VERIFICATION
# phase, tool execution is wrapped in sandbox.network_block() so tools
# cannot make outbound network calls during verification. None = no
# sandbox (backward compat for DIRECT_CHAT/REACT and existing tests).
sandbox: "WorkspaceSandbox | None" = None,
): ):
if max_steps < 1: if max_steps < 1:
raise ValueError(f"max_steps must be >= 1, got {max_steps}") raise ValueError(f"max_steps must be >= 1, got {max_steps}")
@ -194,7 +201,16 @@ class ReActEngine:
self._default_timeout = default_timeout self._default_timeout = default_timeout
self._parallel_tools = parallel_tools self._parallel_tools = parallel_tools
self._verification_enabled = verification_enabled self._verification_enabled = verification_enabled
self._verification_commands = verification_commands # U3/R3: if no explicit verification_commands were passed but the
# phase_policy carries coding-task-detected commands (from
# default_policy(workspace_root)), inherit them. Explicit param wins
# so callers can override per-engine.
if verification_commands is not None:
self._verification_commands = verification_commands
elif phase_policy is not None and phase_policy.verification_commands:
self._verification_commands = list(phase_policy.verification_commands)
else:
self._verification_commands = verification_commands
# U2/G2: prompt cache 双块结构开关(True 时 Anthropic 用 cache_control blocks, # U2/G2: prompt cache 双块结构开关(True 时 Anthropic 用 cache_control blocks,
# 其他 provider 走字符串拼接依赖自动前缀缓存) # 其他 provider 走字符串拼接依赖自动前缀缓存)
self._prompt_cache_enable = prompt_cache_enable self._prompt_cache_enable = prompt_cache_enable
@ -240,6 +256,9 @@ class ReActEngine:
# simply ignores the accumulator (the error dict returned to the LLM is # simply ignores the accumulator (the error dict returned to the LLM is
# the only signal there). # the only signal there).
self._phase_violations: list[dict[str, object]] = [] self._phase_violations: list[dict[str, object]] = []
# U3/RV3: minimum sandbox. When set and current phase is VERIFICATION,
# _execute_tool wraps tool.safe_execute() in sandbox.network_block().
self._sandbox = sandbox
def reset(self) -> None: def reset(self) -> None:
"""Reset internal state for reuse across conversations. """Reset internal state for reuse across conversations.
@ -1805,9 +1824,39 @@ class ReActEngine:
# Strip internal metadata keys before passing to tool # Strip internal metadata keys before passing to tool
clean_args = {k: v for k, v in arguments.items() if not k.startswith("_")} clean_args = {k: v for k, v in arguments.items() if not k.startswith("_")}
# U3/RV3: sandbox network block during VERIFICATION phase. When a
# sandbox is configured and the engine is in VERIFICATION, wrap the
# tool call so outbound network access is rejected. The error is
# returned as a structured dict (the loop continues — the LLM sees
# the rejection and can adjust). Other phases and no-sandbox engines
# are unaffected (backward compat).
in_verification = (
self._sandbox is not None
and self._current_phase is not None
and self._current_phase.value == "verification"
)
try: try:
result = await tool.safe_execute(**clean_args) if in_verification:
async with self._sandbox.network_block():
result = await tool.safe_execute(**clean_args)
else:
result = await tool.safe_execute(**clean_args)
return result return result
except SandboxNetworkBlockedError as e:
# Structured error so the LLM understands *why* the call was
# rejected and can react (e.g. switch to a local-only approach).
error_msg = (
f"Tool '{tool_name}' blocked by sandbox: network access is "
f"not allowed during VERIFICATION phase"
)
logger.info("sandbox: %s blocked (%s)", tool_name, e)
return {
"error": error_msg,
"error_code": "sandbox_network_blocked",
"current_phase": "verification",
"tool": tool_name,
}
except ToolValidationError as e: except ToolValidationError as e:
# 保留类型化错误码,不被通用 except 平坦化为字符串 # 保留类型化错误码,不被通用 except 平坦化为字符串
error_msg = f"Tool '{tool_name}' schema validation failed: {e}" error_msg = f"Tool '{tool_name}' schema validation failed: {e}"

View File

@ -0,0 +1,166 @@
"""Minimum sandbox enforcement for VERIFICATION phase (U3, RV3).
Two concerns:
1. **Workspace-write path enforcement** reuses the 3-layer path validation
pattern from ``str_replace_editor.py`` (U1): reject absolute paths, reject
``..`` traversal, and verify ``Path.resolve()`` stays within the workspace
root (catches symlink escape).
2. **Network blocking** an async context manager that patches
``socket.socket.connect`` / ``connect_ex`` to raise during VERIFICATION
tool calls. This catches ``httpx`` / ``requests`` / ``urllib`` at their
common chokepoint (the stdlib socket layer).
ponytail: process-wide socket patch not subprocess-safe. A ``bash`` tool
spawning ``curl`` bypasses this because the child gets its own socket
namespace from the OS. Upgrade path: OS-level network namespace isolation
(``unshare -n`` / netns) or a seccomp filter on ``socket(2)``. The context
manager is sufficient for in-process tool calls (the stated RV3 scope).
Full tiering (read-only / workspace-write / danger) is deferred this module
implements only the minimum: workspace-write + no-network.
"""
from __future__ import annotations
import contextlib
import errno
import logging
import socket
from pathlib import Path
logger = logging.getLogger(__name__)
class SandboxNetworkBlockedError(RuntimeError):
"""Raised when a tool attempts an outbound network call under sandbox."""
class WorkspaceSandbox:
"""Minimum sandbox: workspace-write path enforcement + network blocking.
Construct once per engine (or per VERIFICATION phase) and reuse. The
``validate_path`` method is sync (cheap, no I/O). The ``network_block``
context manager is async because it is used around ``await tool.execute()``.
"""
def __init__(self, workspace_root: str | Path) -> None:
# Resolve once so prefix checks compare against a stable, real
# directory (no symlink inside the workspace root itself).
self._workspace_root: Path = Path(workspace_root).resolve()
@property
def workspace_root(self) -> Path:
return self._workspace_root
# ── path validation (reuses U1 str_replace_editor 3-layer pattern) ──
def validate_path(self, raw_path: str) -> Path:
"""Resolve ``raw_path`` against the workspace root and verify confinement.
Returns the resolved absolute ``Path`` on success. Raises ``ValueError``
if the path is absolute, contains a ``..`` component, or resolves
outside the workspace root (path traversal or symlink escape).
Mirrors ``StrReplaceEditorTool._resolve_within_workspace`` but raises
instead of returning ``None`` this is the security boundary, so a
loud exception is the right signal for misuse from internal callers.
"""
if not isinstance(raw_path, str) or not raw_path:
raise ValueError("sandbox: path must be a non-empty string")
p = Path(raw_path)
if p.is_absolute():
raise ValueError(
f"sandbox: absolute paths are rejected ({raw_path!r}); "
f"use a path relative to the workspace root"
)
if ".." in p.parts:
raise ValueError(f"sandbox: path traversal ('..') is rejected ({raw_path!r})")
resolved = (self._workspace_root / raw_path).resolve()
try:
resolved.relative_to(self._workspace_root)
except ValueError as e:
raise ValueError(
f"sandbox: path {raw_path!r} resolves outside the workspace "
f"root ({self._workspace_root})"
) from e
return resolved
# ── coding-workspace detection ─────────────────────────────────────
def is_coding_workspace(self) -> bool:
"""Return True if the workspace looks like a Python coding project.
Heuristic: ``pyproject.toml`` OR any ``.py`` file in the workspace root
(non-recursive scan of the top level cheap, O(dirent count)).
ponytail: top-level scan only a ``.py`` file nested 3 levels deep
is missed. Upgrade path: recursive walk with a depth cap, or trust
``pyproject.toml`` as the single signal (which it nearly always is).
"""
if (self._workspace_root / "pyproject.toml").exists():
return True
try:
for entry in self._workspace_root.iterdir():
if entry.is_file() and entry.suffix == ".py":
return True
except (PermissionError, OSError) as e:
logger.warning("sandbox: failed to scan workspace root: %s", e)
return False
# ── network blocking ───────────────────────────────────────────────
@contextlib.asynccontextmanager
async def network_block(self):
"""Block outbound network connections within the async context.
Patches ``socket.socket.connect`` and ``connect_ex`` to raise /
return ``ECONNREFUSED`` respectively. Restores the originals on exit,
even if the wrapped code raises.
Already-connected sockets (e.g. an LLM gateway keep-alive pool) are
unaffected only *new* ``connect()`` calls are blocked. This is the
correct granularity: the LLM gateway talks over its existing
connection, while a tool trying to ``requests.get(...)`` makes a new
connect and is rejected.
"""
original_connect = socket.socket.connect
original_connect_ex = socket.socket.connect_ex
def _blocked_connect(self_sock, *args, **kwargs): # noqa: ANN001
raise SandboxNetworkBlockedError(
"Network access blocked by sandbox during VERIFICATION phase"
)
def _blocked_connect_ex(self_sock, *args, **kwargs): # noqa: ANN001
# connect_ex returns an errno instead of raising (POSIX contract).
return errno.ECONNREFUSED
socket.socket.connect = _blocked_connect # type: ignore[method-assign]
socket.socket.connect_ex = _blocked_connect_ex # type: ignore[method-assign]
logger.debug("sandbox: network block engaged")
try:
yield
finally:
socket.socket.connect = original_connect # type: ignore[method-assign]
socket.socket.connect_ex = original_connect_ex # type: ignore[method-assign]
logger.debug("sandbox: network block released")
def detect_verification_commands(workspace_root: str | Path | None) -> list[str]:
"""Return the verification commands appropriate for the workspace.
Coding workspaces (``pyproject.toml`` or ``.py`` present) force
``pytest -x -q`` and ``ruff check src/`` (R3). Non-coding workspaces return
an empty list the caller (VerificationLoop) then falls back to its own
default, or the Spec-declared verification commands are used.
A ``None`` workspace returns an empty list (conservative: don't assume a
coding project without evidence).
"""
if workspace_root is None:
return []
sandbox = WorkspaceSandbox(workspace_root)
if sandbox.is_coding_workspace():
return ["pytest -x -q", "ruff check src/"]
return []

View File

@ -74,6 +74,11 @@ class TeamOrchestrator(
checkpoint: object | None = None, checkpoint: object | None = None,
workspace_root: str | None = None, workspace_root: str | None = None,
rollback_timeout: float | None = None, rollback_timeout: float | None = None,
# U3/R2: verification defaults True for TEAM_COLLAB (per R2). Applied
# to each phase's isolated agent engine so the canonical verify-at-
# final-answer path (react.py:1303+) runs on coding tasks.
verification_enabled: bool = True,
verification_commands: list[str] | None = None,
) -> None: ) -> None:
self._team = team self._team = team
# Track temporary agent names created for context isolation (KTD3) # Track temporary agent names created for context isolation (KTD3)
@ -93,6 +98,47 @@ class TeamOrchestrator(
# Both default to no-op-friendly values so existing call sites behave identically. # Both default to no-op-friendly values so existing call sites behave identically.
self._workspace_root = workspace_root self._workspace_root = workspace_root
self._rollback_timeout = rollback_timeout or self.DEFAULT_ROLLBACK_TIMEOUT self._rollback_timeout = rollback_timeout or self.DEFAULT_ROLLBACK_TIMEOUT
# U3/R2: verification defaults for TEAM_COLLAB.
self._verification_enabled = verification_enabled
# U3/R3: if no explicit commands, detect from workspace (coding-task
# detection forces pytest/ruff). None workspace → None commands →
# ReActEngine/VerificationLoop uses its own defaults.
if verification_commands is not None:
self._verification_commands = verification_commands
else:
from agentkit.core.sandbox import detect_verification_commands
self._verification_commands = detect_verification_commands(workspace_root) or None
async def _get_isolated_agent(self, expert: Expert, phase: PlanPhase):
"""Override to apply verification defaults to freshly created agents.
Calls the mixin's ``_get_isolated_agent`` (which creates an isolated
ConfigDrivenAgent via the pool), then for freshly created temp agents
only (not the shared fallback ``expert.agent``) flips the engine's
``_verification_enabled`` flag and sets ``_verification_commands`` so
the canonical verify-at-final-answer path runs for TEAM_COLLAB.
We mutate the engine's private attributes directly because the pool
constructs the ReActEngine without a verification_enabled parameter
(the pool is shared across modes). The temp agent is cleaned up after
the phase, so this mutation is scoped and does not leak into other
team executions or the shared expert agent.
"""
agent = await super()._get_isolated_agent(expert, phase)
# Only configure freshly-created temp agents (not the shared fallback).
# _temp_agents[phase.id] is set by the mixin only on successful
# pool.create_agent — its presence means this is a fresh agent.
if (
self._verification_enabled
and phase.id in self._temp_agents
and getattr(agent, "_react_engine", None) is not None
):
engine = agent._react_engine # type: ignore[attr-defined]
engine._verification_enabled = True # type: ignore[attr-defined]
if self._verification_commands is not None:
engine._verification_commands = self._verification_commands # type: ignore[attr-defined] # noqa: E501
return agent
async def execute(self, task: str) -> dict[str, object]: async def execute(self, task: str) -> dict[str, object]:
"""Execute a task in pipeline mode. Lead decomposes → topological sort → """Execute a task in pipeline mode. Lead decomposes → topological sort →
@ -169,7 +215,14 @@ class TeamOrchestrator(
if self._checkpoint is not None: if self._checkpoint is not None:
try: try:
await self._checkpoint.save_plan(plan) await self._checkpoint.save_plan(plan)
except (ConnectionError, OSError, asyncio.TimeoutError, RuntimeError, ValueError, KeyError) as e: except (
ConnectionError,
OSError,
asyncio.TimeoutError,
RuntimeError,
ValueError,
KeyError,
) as e:
logger.warning(f"Checkpoint save_plan failed: {e}") logger.warning(f"Checkpoint save_plan failed: {e}")
# 4. Set EXECUTING status, execute phases # 4. Set EXECUTING status, execute phases
@ -266,7 +319,14 @@ class TeamOrchestrator(
if should_save_checkpoint and self._checkpoint is not None: if should_save_checkpoint and self._checkpoint is not None:
try: try:
await self._checkpoint.save(plan.id, ph, plan.status.value) await self._checkpoint.save(plan.id, ph, plan.status.value)
except (ConnectionError, OSError, asyncio.TimeoutError, RuntimeError, ValueError, KeyError) as e: except (
ConnectionError,
OSError,
asyncio.TimeoutError,
RuntimeError,
ValueError,
KeyError,
) as e:
logger.warning(f"Checkpoint save failed for phase {ph.id}: {e}") logger.warning(f"Checkpoint save failed for phase {ph.id}: {e}")
# U3: Divergence detection — check completed phases for conflicts # U3: Divergence detection — check completed phases for conflicts
@ -289,6 +349,7 @@ class TeamOrchestrator(
# U3: 流式综合 — 每个 chunk 广播 team_synthesis_chunk # U3: 流式综合 — 每个 chunk 广播 team_synthesis_chunk
# P2 fix: 携带 synthesis_id 让前端去重 streaming milestone避免附身到上一次孤儿 # P2 fix: 携带 synthesis_id 让前端去重 streaming milestone避免附身到上一次孤儿
synthesis_id = f"{plan.id}:synthesis" synthesis_id = f"{plan.id}:synthesis"
async def _broadcast_synthesis_chunk(data: dict[str, object]) -> None: async def _broadcast_synthesis_chunk(data: dict[str, object]) -> None:
# data 可能是 {"chunk": "..."} 或 {"value": "..."}synthesizer 决定) # data 可能是 {"chunk": "..."} 或 {"value": "..."}synthesizer 决定)
# 统一注入 synthesis_id不破坏原 data 结构 # 统一注入 synthesis_id不破坏原 data 结构
@ -306,18 +367,27 @@ class TeamOrchestrator(
except asyncio.CancelledError: except asyncio.CancelledError:
await self._broadcast_event( await self._broadcast_event(
"team_synthesis", "team_synthesis",
{"content": "", "phases_completed": len(completed), {
"phases_total": len(plan.phases), "status": "cancelled", "content": "",
"synthesis_id": synthesis_id}, "phases_completed": len(completed),
"phases_total": len(plan.phases),
"status": "cancelled",
"synthesis_id": synthesis_id,
},
) )
raise raise
except Exception as synth_err: except Exception as synth_err:
logger.error(f"Synthesis streaming failed: {synth_err}") logger.error(f"Synthesis streaming failed: {synth_err}")
await self._broadcast_event( await self._broadcast_event(
"team_synthesis", "team_synthesis",
{"content": "", "phases_completed": len(completed), {
"phases_total": len(plan.phases), "status": "error", "content": "",
"error": str(synth_err), "synthesis_id": synthesis_id}, "phases_completed": len(completed),
"phases_total": len(plan.phases),
"status": "error",
"error": str(synth_err),
"synthesis_id": synthesis_id,
},
) )
raise # 让外层 except 决定是否 fallback raise # 让外层 except 决定是否 fallback
@ -345,7 +415,14 @@ class TeamOrchestrator(
if self._checkpoint is not None: if self._checkpoint is not None:
try: try:
await self._checkpoint.clear(plan.id) await self._checkpoint.clear(plan.id)
except (ConnectionError, OSError, asyncio.TimeoutError, RuntimeError, ValueError, KeyError) as e: except (
ConnectionError,
OSError,
asyncio.TimeoutError,
RuntimeError,
ValueError,
KeyError,
) as e:
logger.warning(f"Checkpoint clear failed: {e}") logger.warning(f"Checkpoint clear failed: {e}")
return { return {
@ -363,7 +440,15 @@ class TeamOrchestrator(
return await self._fallback_to_single_agent(task, plan, phase_results) return await self._fallback_to_single_agent(task, plan, phase_results)
except asyncio.CancelledError: except asyncio.CancelledError:
raise raise
except (RuntimeError, ValueError, KeyError, AttributeError, ConnectionError, asyncio.TimeoutError, LLMProviderError) as e: except (
RuntimeError,
ValueError,
KeyError,
AttributeError,
ConnectionError,
asyncio.TimeoutError,
LLMProviderError,
) as e:
logger.error(f"Pipeline execution failed: {e}") logger.error(f"Pipeline execution failed: {e}")
plan.status = PlanStatus.FAILED plan.status = PlanStatus.FAILED
await self._broadcast_event("team_dissolved", {"team_id": self._team.team_id}) await self._broadcast_event("team_dissolved", {"team_id": self._team.team_id})
@ -500,7 +585,14 @@ class TeamOrchestrator(
if phases: if phases:
return phases return phases
logger.warning("LLM decomposition returned no valid phases") logger.warning("LLM decomposition returned no valid phases")
except (LLMProviderError, asyncio.TimeoutError, ConnectionError, json.JSONDecodeError, ValueError, TypeError) as e: except (
LLMProviderError,
asyncio.TimeoutError,
ConnectionError,
json.JSONDecodeError,
ValueError,
TypeError,
) as e:
logger.warning(f"LLM task decomposition failed: {e}") logger.warning(f"LLM task decomposition failed: {e}")
return [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)] return [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)]

173
tests/unit/test_sandbox.py Normal file
View File

@ -0,0 +1,173 @@
"""Unit tests for the minimum sandbox (U3, RV3).
Covers:
- WorkspaceSandbox.validate_path happy path + 3-layer security (absolute,
``..`` traversal, symlink escape)
- WorkspaceSandbox.is_coding_workspace pyproject.toml / .py detection
- WorkspaceSandbox.network_block socket connect blocked inside context,
restored after exit, no effect outside
- detect_verification_commands coding / non-coding / None workspace
"""
from __future__ import annotations
import socket
from pathlib import Path
import pytest
from agentkit.core.sandbox import (
SandboxNetworkBlockedError,
WorkspaceSandbox,
detect_verification_commands,
)
# ── fixtures ──────────────────────────────────────────────────────────
@pytest.fixture
def workspace(tmp_path: Path) -> Path:
return tmp_path
@pytest.fixture
def sandbox(workspace: Path) -> WorkspaceSandbox:
return WorkspaceSandbox(workspace_root=workspace)
# ── validate_path ─────────────────────────────────────────────────────
def test_validate_path_resolves_relative(sandbox: WorkspaceSandbox, workspace: Path) -> None:
resolved = sandbox.validate_path("src/main.py")
assert resolved == (workspace / "src" / "main.py").resolve()
def test_validate_path_rejects_absolute(sandbox: WorkspaceSandbox) -> None:
with pytest.raises(ValueError, match="absolute paths are rejected"):
sandbox.validate_path("/etc/passwd")
def test_validate_path_rejects_traversal(sandbox: WorkspaceSandbox) -> None:
with pytest.raises(ValueError, match="path traversal"):
sandbox.validate_path("../../etc/passwd")
def test_validate_path_rejects_empty(sandbox: WorkspaceSandbox) -> None:
with pytest.raises(ValueError, match="non-empty string"):
sandbox.validate_path("")
def test_validate_path_rejects_symlink_escape(
sandbox: WorkspaceSandbox, workspace: Path, tmp_path_factory: pytest.TempPathFactory
) -> None:
outside = tmp_path_factory.mktemp("outside")
link = workspace / "escape"
link.symlink_to(outside)
with pytest.raises(ValueError, match="resolves outside the workspace"):
sandbox.validate_path("escape/secret.txt")
def test_validate_path_allows_nested(sandbox: WorkspaceSandbox, workspace: Path) -> None:
resolved = sandbox.validate_path("a/b/c/d.txt")
assert resolved == (workspace / "a" / "b" / "c" / "d.txt").resolve()
# ── is_coding_workspace ───────────────────────────────────────────────
def test_is_coding_workspace_pyproject(sandbox: WorkspaceSandbox, workspace: Path) -> None:
(workspace / "pyproject.toml").write_text("[project]\nname='x'\n")
assert sandbox.is_coding_workspace() is True
def test_is_coding_workspace_py_file(sandbox: WorkspaceSandbox, workspace: Path) -> None:
(workspace / "main.py").write_text("print('hi')")
assert sandbox.is_coding_workspace() is True
def test_is_coding_workspace_empty(sandbox: WorkspaceSandbox) -> None:
assert sandbox.is_coding_workspace() is False
def test_is_coding_workspace_non_python(sandbox: WorkspaceSandbox, workspace: Path) -> None:
(workspace / "README.md").write_text("# not python")
(workspace / "index.js").write_text("console.log('hi')")
assert sandbox.is_coding_workspace() is False
# ── network_block ─────────────────────────────────────────────────────
async def test_network_block_blocks_connect(sandbox: WorkspaceSandbox) -> None:
async with sandbox.network_block():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
with pytest.raises(SandboxNetworkBlockedError, match="blocked by sandbox"):
sock.connect(("127.0.0.1", 1))
finally:
sock.close()
async def test_network_block_restores_after_exit(sandbox: WorkspaceSandbox) -> None:
original = socket.socket.connect
async with sandbox.network_block():
assert socket.socket.connect is not original
assert socket.socket.connect is original
async def test_network_block_restores_on_exception(sandbox: WorkspaceSandbox) -> None:
original = socket.socket.connect
with pytest.raises(RuntimeError, match="boom"):
async with sandbox.network_block():
raise RuntimeError("boom")
assert socket.socket.connect is original
async def test_network_block_connect_ex_returns_errno(sandbox: WorkspaceSandbox) -> None:
import errno as errno_mod
async with sandbox.network_block():
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
rc = sock.connect_ex(("127.0.0.1", 1))
assert rc == errno_mod.ECONNREFUSED
finally:
sock.close()
async def test_no_network_block_outside_context(sandbox: WorkspaceSandbox) -> None:
"""Sockets connect normally when the block is not active."""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# connect_ex to a closed port returns ECONNREFUSED, not the sandbox error.
rc = sock.connect_ex(("127.0.0.1", 1))
assert rc != 0 # some connection error (expected — nothing listening)
# The key assertion: no SandboxNetworkBlockedError was raised, meaning
# the block is not active outside its context.
finally:
sock.close()
# ── detect_verification_commands ──────────────────────────────────────
def test_detect_verification_commands_coding(workspace: Path) -> None:
(workspace / "pyproject.toml").write_text("[project]\nname='x'\n")
cmds = detect_verification_commands(workspace)
assert cmds == ["pytest -x -q", "ruff check src/"]
def test_detect_verification_commands_non_coding(workspace: Path) -> None:
(workspace / "README.md").write_text("# docs only")
cmds = detect_verification_commands(workspace)
assert cmds == []
def test_detect_verification_commands_none() -> None:
assert detect_verification_commands(None) == []
def test_detect_verification_commands_empty_workspace(workspace: Path) -> None:
assert detect_verification_commands(workspace) == []

View File

@ -0,0 +1,276 @@
"""Unit tests for verification defaults (U3, R2/R3) + sandbox integration.
Covers:
- default_policy(workspace_root) coding-task detection sets verification_commands
- PhasePolicy.verification_commands field default None, to_dict() round-trip
- PlanExecEngine verification_enabled defaults True (R2), thread-through
- TeamOrchestrator verification_enabled defaults True (R2)
- ReActEngine verification_commands inherited from phase_policy; default
verification_enabled stays False (RV2 DIRECT_CHAT/REACT do not verify)
- ReActEngine._execute_tool sandbox blocks network during VERIFICATION,
no block in other phases or when sandbox is None
"""
from __future__ import annotations
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock
from agentkit.core.phase import PhasePolicy, PhaseState, WILDCARD, default_policy
from agentkit.core.plan_exec_engine import PlanExecEngine, ReActStepExecutor
from agentkit.core.react import ReActEngine
from agentkit.core.sandbox import WorkspaceSandbox
from agentkit.tools.base import Tool
# ── helpers ───────────────────────────────────────────────────────────
def make_mock_gateway() -> MagicMock:
"""A minimal mock LLMGateway for ReActEngine construction."""
from agentkit.llm.gateway import LLMGateway
gateway = MagicMock(spec=LLMGateway)
gateway.chat = AsyncMock(return_value=MagicMock())
return gateway
class _NetworkTool(Tool):
"""A test tool that attempts a socket connect — used to verify the sandbox
network block is active during VERIFICATION.
Catches ``OSError`` (e.g. ``ConnectionRefusedError``) so that when the
sandbox is NOT active, the tool returns a normal result dict. When the
sandbox IS active, ``SandboxNetworkBlockedError`` (a ``RuntimeError``,
not an ``OSError``) propagates past this catch to ``_execute_tool``'s
dedicated handler.
"""
def __init__(self) -> None:
super().__init__(
name="net_tool",
description="test tool that connects a socket",
input_schema={"type": "object", "properties": {}, "additionalProperties": False},
)
async def execute(self, **kwargs) -> dict[str, object]:
import socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
sock.connect(("127.0.0.1", 1))
except OSError as e:
# Normal connection refusal (no listener) — proves the sandbox
# did NOT intercept the connect.
return {"ok": False, "error": type(e).__name__}
finally:
sock.close()
return {"ok": True}
# ── default_policy + PhasePolicy.verification_commands ────────────────
def test_default_policy_no_workspace_has_none_commands() -> None:
policy = default_policy()
assert policy.verification_commands is None
def test_default_policy_coding_workspace_forces_pytest_ruff(tmp_path: Path) -> None:
(tmp_path / "pyproject.toml").write_text("[project]\nname='x'\n")
policy = default_policy(workspace_root=tmp_path)
assert policy.verification_commands == ["pytest -x -q", "ruff check src/"]
def test_default_policy_non_coding_workspace_has_none_commands(tmp_path: Path) -> None:
(tmp_path / "README.md").write_text("# docs only")
policy = default_policy(workspace_root=tmp_path)
assert policy.verification_commands is None
def test_default_policy_empty_workspace_has_none_commands(tmp_path: Path) -> None:
policy = default_policy(workspace_root=tmp_path)
assert policy.verification_commands is None
def test_phase_policy_verification_commands_defaults_none() -> None:
policy = PhasePolicy(
whitelist={PhaseState.PLANNING: frozenset({WILDCARD})},
)
assert policy.verification_commands is None
def test_phase_policy_to_dict_includes_verification_commands() -> None:
policy = PhasePolicy(
whitelist={PhaseState.PLANNING: frozenset({WILDCARD})},
verification_commands=["pytest -x -q"],
)
d = policy.to_dict()
assert d["verification_commands"] == ["pytest -x -q"]
# ── PlanExecEngine defaults (R2) ──────────────────────────────────────
def test_plan_exec_engine_verification_enabled_defaults_true() -> None:
engine = PlanExecEngine(llm_gateway=None)
assert engine._verification_enabled is True
def test_plan_exec_engine_verification_enabled_can_be_disabled() -> None:
engine = PlanExecEngine(llm_gateway=None, verification_enabled=False)
assert engine._verification_enabled is False
def test_plan_exec_engine_verification_commands_threaded() -> None:
cmds = ["pytest -x -q", "ruff check src/"]
engine = PlanExecEngine(llm_gateway=None, verification_commands=cmds)
assert engine._verification_commands == cmds
def test_react_step_executor_threads_verification_params() -> None:
executor = ReActStepExecutor(
verification_enabled=True,
verification_commands=["pytest"],
)
assert executor._verification_enabled is True
assert executor._verification_commands == ["pytest"]
# ── TeamOrchestrator defaults (R2) ────────────────────────────────────
def test_team_orchestrator_verification_enabled_defaults_true() -> None:
from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.team import ExpertTeam
team = MagicMock(spec=ExpertTeam)
orch = TeamOrchestrator(team=team)
assert orch._verification_enabled is True
def test_team_orchestrator_verification_can_be_disabled() -> None:
from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.team import ExpertTeam
team = MagicMock(spec=ExpertTeam)
orch = TeamOrchestrator(team=team, verification_enabled=False)
assert orch._verification_enabled is False
def test_team_orchestrator_detects_commands_from_workspace(tmp_path: Path) -> None:
from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.team import ExpertTeam
(tmp_path / "pyproject.toml").write_text("[project]\nname='x'\n")
team = MagicMock(spec=ExpertTeam)
orch = TeamOrchestrator(team=team, workspace_root=str(tmp_path))
assert orch._verification_commands == ["pytest -x -q", "ruff check src/"]
# ── ReActEngine: verification_commands inheritance + default (RV2) ────
def test_react_engine_default_verification_enabled_stays_false() -> None:
"""RV2: DIRECT_CHAT/REACT do not verify by default."""
engine = ReActEngine(llm_gateway=make_mock_gateway())
assert engine._verification_enabled is False
def test_react_engine_inherits_verification_commands_from_phase_policy() -> None:
policy = PhasePolicy(
whitelist={PhaseState.PLANNING: frozenset({WILDCARD})},
verification_commands=["pytest -x -q", "ruff check src/"],
)
engine = ReActEngine(
llm_gateway=make_mock_gateway(),
phase_policy=policy,
)
assert engine._verification_commands == ["pytest -x -q", "ruff check src/"]
def test_react_engine_explicit_commands_override_phase_policy() -> None:
policy = PhasePolicy(
whitelist={PhaseState.PLANNING: frozenset({WILDCARD})},
verification_commands=["pytest -x -q", "ruff check src/"],
)
engine = ReActEngine(
llm_gateway=make_mock_gateway(),
phase_policy=policy,
verification_commands=["echo custom"],
)
assert engine._verification_commands == ["echo custom"]
def test_react_engine_no_policy_no_commands() -> None:
engine = ReActEngine(llm_gateway=make_mock_gateway())
assert engine._verification_commands is None
# ── ReActEngine._execute_tool sandbox integration (RV3) ───────────────
async def test_execute_tool_blocks_network_in_verification_phase() -> None:
"""Sandbox blocks a tool's network call during VERIFICATION phase and
returns a structured error instead of raising."""
policy = PhasePolicy(
whitelist={
PhaseState.VERIFICATION: frozenset({"net_tool"}),
PhaseState.PLANNING: frozenset({WILDCARD}),
},
start_phase=PhaseState.VERIFICATION,
)
sandbox = WorkspaceSandbox(workspace_root=Path("/tmp"))
engine = ReActEngine(
llm_gateway=make_mock_gateway(),
phase_policy=policy,
sandbox=sandbox,
)
tool = _NetworkTool()
result = await engine._execute_tool("net_tool", {}, [tool])
assert result["error_code"] == "sandbox_network_blocked"
assert result["current_phase"] == "verification"
assert result["tool"] == "net_tool"
async def test_execute_tool_no_block_outside_verification() -> None:
"""Sandbox does not block tool calls in non-VERIFICATION phases."""
policy = PhasePolicy(
whitelist={
PhaseState.PLANNING: frozenset({"net_tool"}),
PhaseState.VERIFICATION: frozenset({WILDCARD}),
},
start_phase=PhaseState.PLANNING,
)
sandbox = WorkspaceSandbox(workspace_root=Path("/tmp"))
engine = ReActEngine(
llm_gateway=make_mock_gateway(),
phase_policy=policy,
sandbox=sandbox,
)
tool = _NetworkTool()
# In PLANNING phase, the tool should attempt the connect and fail with
# a connection error (not sandbox block). The connect to port 1 on
# localhost will fail with ECONNREFUSED — we just assert it's NOT the
# sandbox error code.
result = await engine._execute_tool("net_tool", {}, [tool])
assert result.get("error_code") != "sandbox_network_blocked"
async def test_execute_tool_no_sandbox_no_block() -> None:
"""No sandbox configured → no network blocking even in VERIFICATION."""
policy = PhasePolicy(
whitelist={
PhaseState.VERIFICATION: frozenset({"net_tool"}),
PhaseState.PLANNING: frozenset({WILDCARD}),
},
start_phase=PhaseState.VERIFICATION,
)
engine = ReActEngine(
llm_gateway=make_mock_gateway(),
phase_policy=policy,
sandbox=None,
)
tool = _NetworkTool()
result = await engine._execute_tool("net_tool", {}, [tool])
assert result.get("error_code") != "sandbox_network_blocked"