feat: complex-task-quality-loop (R1-R12) #22

Merged
fischer merged 13 commits from feat/complex-task-quality-loop into main 2026-07-05 22:31:22 +08:00
5 changed files with 1070 additions and 26 deletions
Showing only changes of commit 786f921c5e - Show all commits

View File

@ -42,10 +42,23 @@ logger = logging.getLogger(__name__)
# 最大重规划次数
_DEFAULT_MAX_REPLANS = 2
# U8/R8: max replans triggered by spec-review rejection (separate from
# _DEFAULT_MAX_REPLANS which covers execution failures). Cap prevents an
# infinite reject→replan loop between user and planner.
_MAX_SPEC_REVIEW_REPLANS = 2
# U4/R11: default phase budgets for PLAN_EXEC. think=7 (exploration),
# verify=2 (two verification attempts), reflect=1 (one re-injection).
_DEFAULT_PHASE_BUDGETS: dict[str, int] = {"think": 7, "verify": 2, "reflect": 1}
# U8/R8: spec review gate handler. Suspends execution after the first Spec
# is generated; returns (decision, feedback). decision ∈ {"approved",
# "rejected"}. On rejection the engine replans and re-reviews (capped at
# _MAX_SPEC_REVIEW_REPLANS). The handler raises asyncio.TimeoutError when
# the user does not reply within the boundary timeout (chat.py: 30 min);
# the engine then parks the Spec (not failed) so the user can resume.
SpecReviewHandler = Callable[[str, str, list[dict[str, Any]]], Awaitable[tuple[str, str]]]
@dataclass
class _StreamState:
@ -56,6 +69,13 @@ class _StreamState:
total_tokens: int = 0
step_counter: int = 0
replanned: bool = False
# U8/R8: spec review gate outcome. Set by _run_spec_review_gate_stream
# so the caller (a generator that cannot receive a return value) can
# read the result after the async-for loop drains. Values: "approved",
# "parked", "replan_exhausted". Empty = gate did not run.
spec_review_decision: str = ""
# The (possibly replanned) plan to continue execution with on "approved".
spec_review_plan: "ExecutionPlan | None" = None
class PlanExecEngine:
@ -96,6 +116,11 @@ class PlanExecEngine:
# historical pitfalls by goal/skill similarity and inject into
# system prompt. None = skip injection (no error).
pitfall_detector: "PitfallDetector | None" = None,
# U8/R8: spec review gate handler. When set, PLAN_EXEC pauses after
# the first Spec is generated and calls this handler to wait for the
# user's decision. None = skip the gate (backward compat — the engine
# proceeds directly to execution after Spec persistence).
spec_review_handler: SpecReviewHandler | None = None,
):
"""
Args:
@ -116,6 +141,10 @@ class PlanExecEngine:
pitfall_detector: U7/R12 PitfallDetector 单例KTD-5
规划阶段按 goal/skill 相似度检索历史 pitfall 并注入 system
promptNone 表示跳过注入不报错
spec_review_handler: U8/R8 async handler called as
``(spec_id, goal, steps) -> (decision, feedback)`` after the
first Spec is generated. Suspends execution until the user
replies. None skips the gate (backward compat).
"""
self._llm_gateway = llm_gateway
self._max_replans = max_replans
@ -128,6 +157,8 @@ class PlanExecEngine:
self._verification_commands = verification_commands
# U7/R12: app-state singleton (KTD-5) — constructor injection.
self._pitfall_detector = pitfall_detector
# U8/R8: spec review gate handler. None = skip gate (backward compat).
self._spec_review_handler = spec_review_handler
# U4/R11: copy the default to avoid mutating the module-level dict.
self._phase_budgets = (
dict(phase_budgets) if phase_budgets is not None else dict(_DEFAULT_PHASE_BUDGETS)
@ -328,8 +359,9 @@ class PlanExecEngine:
)
# Persist plan as Spec if spec_manager is provided
current_plan = plan
if self._spec_manager is not None:
spec = self._plan_to_spec(plan)
spec = self._plan_to_spec(current_plan)
self._spec_manager.create(spec)
state.step_counter += 1
yield ReActEvent(
@ -338,8 +370,77 @@ class PlanExecEngine:
data={"spec_id": spec.spec_id, "goal": spec.goal, "num_steps": len(spec.steps)},
)
# U8/R8: Spec review gate — pause for user review after the
# first Spec is generated. approved → continue; rejected →
# replan (cap _MAX_SPEC_REVIEW_REPLANS); timeout → park (not
# fail). Handler is None → skip (backward compat). The gate
# only runs when a spec_manager is present (no Spec = nothing
# to review); spec_manager is None + handler set → skip + warn.
if self._spec_review_handler is not None:
# The gate is an async generator: it yields the
# spec_review_request / spec_review_reply / spec_created
# events and writes the outcome onto ``state`` (a generator
# cannot return a value). We re-yield its events here.
async for gate_event in self._run_spec_review_gate_stream(
spec=spec,
goal=goal,
system_prompt=system_prompt,
task_type=task_type,
available_skills=available_skills,
state=state,
cancellation_token=cancellation_token,
):
yield gate_event
decision = state.spec_review_decision
if decision == "parked":
trace_outcome = "parked"
output = f"Spec {spec.spec_id} parked: review timed out."
state.step_counter += 1
yield ReActEvent(
event_type="final_answer",
step=state.step_counter,
data={
"output": output,
"total_steps": len(state.trajectory),
"total_tokens": state.total_tokens,
"plan_id": current_plan.plan_id,
"plan_status": "parked",
"replanned": state.replanned,
},
)
return
if decision == "replan_exhausted":
trace_outcome = "error"
output = (
"Spec review failed: replan cap "
f"({_MAX_SPEC_REVIEW_REPLANS}) exceeded after "
"repeated rejections."
)
state.step_counter += 1
yield ReActEvent(
event_type="final_answer",
step=state.step_counter,
data={
"output": output,
"total_steps": len(state.trajectory),
"total_tokens": state.total_tokens,
"plan_id": current_plan.plan_id,
"plan_status": "failed",
"replanned": state.replanned,
},
)
return
# approved — new_plan may differ if a replan happened
current_plan = state.spec_review_plan or current_plan
elif self._spec_review_handler is not None:
# spec_manager is None → no Spec to review; skip gate + warn.
logger.warning(
"spec_review_handler set but spec_manager is None — "
"skipping spec review gate (no Spec to review)"
)
# ── Phase 2 & 3: Execute with optional replanning ──
current_plan = plan
replan_count = 0
while True:
@ -585,6 +686,248 @@ class PlanExecEngine:
return f"{system_prompt}\n\n{section}"
return section
async def _emit_callback_safe(self, event_type: str, data: dict[str, Any]) -> None:
"""U8: emit a step event via the non-stream callback, swallowing the
same exception families the existing callback call sites swallow.
ponytail: tiny helper exists only because the spec review gate emits
several events on the non-streaming path inlining the try/except 4x
would be noisier than this 4-line method.
"""
if not self._step_event_callback:
return
try:
await self._step_event_callback(event_type, data)
except (
RuntimeError,
ValueError,
TypeError,
KeyError,
AttributeError,
ConnectionError,
asyncio.TimeoutError,
) as e:
logger.warning(f"Step event callback failed: {e}")
async def _run_spec_review_gate_stream(
self,
spec: Spec,
goal: str,
system_prompt: str | None,
task_type: str,
available_skills: list[str],
state: "_StreamState",
cancellation_token: CancellationToken | None,
):
"""U8/R8: spec review gate for the streaming path (async generator).
Yields ``spec_review_request`` / ``spec_review_reply`` / ``spec_created``
events and writes the outcome onto ``state.spec_review_decision``
(``approved`` | ``parked`` | ``replan_exhausted``) plus
``state.spec_review_plan`` (the plan to continue with on approval; None
when no replan happened caller falls back to the original plan).
The handler raises ``asyncio.TimeoutError`` on the boundary timeout
(chat.py: 30 min); we park the Spec and surface ``parked`` (not
``failed``) so the user can resume on return.
"""
spec_review_replan_count = 0
current_spec = spec
# None until a rejection triggers a replan; on approval without a
# replan the caller keeps the original plan.
new_plan: ExecutionPlan | None = None
while True:
if cancellation_token is not None:
cancellation_token.check()
steps_summary = [
{"step_id": s.step_id, "name": s.name, "description": s.description}
for s in current_spec.steps
]
spec_review_id = f"{current_spec.spec_id}:spec_review"
state.step_counter += 1
yield ReActEvent(
event_type="spec_review_request",
step=state.step_counter,
data={
"spec_id": current_spec.spec_id,
"spec_review_id": spec_review_id,
"goal": current_spec.goal,
"steps": steps_summary,
},
)
try:
decision, feedback = await self._spec_review_handler(
current_spec.spec_id, current_spec.goal, steps_summary
)
except asyncio.TimeoutError:
# Boundary timeout (chat.py 30-min wait_for). Park the Spec
# so the user can resume on return — NOT failed.
self._spec_manager.park(current_spec.spec_id)
state.step_counter += 1
yield ReActEvent(
event_type="spec_review_reply",
step=state.step_counter,
data={
"spec_id": current_spec.spec_id,
"spec_review_id": spec_review_id,
"decision": "timeout",
"status": "parked",
},
)
state.spec_review_decision = "parked"
state.spec_review_plan = None
return
except asyncio.CancelledError:
# Stream cancelled mid-review: propagate so the outer
# try/except sets trace_outcome and the WS loop cancels the
# pending future (no orphan future / deadlock).
raise
state.step_counter += 1
yield ReActEvent(
event_type="spec_review_reply",
step=state.step_counter,
data={
"spec_id": current_spec.spec_id,
"spec_review_id": spec_review_id,
"decision": decision,
"feedback": feedback,
},
)
if decision == "approved":
state.spec_review_decision = "approved"
state.spec_review_plan = new_plan
return
# rejected → replan (cap prevents an infinite reject→replan loop)
if spec_review_replan_count >= _MAX_SPEC_REVIEW_REPLANS:
state.spec_review_decision = "replan_exhausted"
state.spec_review_plan = None
return
spec_review_replan_count += 1
new_plan = await self._planner.generate_plan(
goal=goal,
context={
"system_prompt": system_prompt,
"task_type": task_type,
"rejection_feedback": feedback,
},
available_skills=available_skills,
)
current_spec = self._plan_to_spec(new_plan)
self._spec_manager.create(current_spec)
state.step_counter += 1
yield ReActEvent(
event_type="spec_created",
step=state.step_counter,
data={
"spec_id": current_spec.spec_id,
"goal": current_spec.goal,
"num_steps": len(current_spec.steps),
"replan_count": spec_review_replan_count,
},
)
# loop back to review the regenerated Spec
async def _run_spec_review_gate_nonstream(
self,
spec: Spec,
goal: str,
system_prompt: str | None,
task_type: str,
available_skills: list[str],
cancellation_token: CancellationToken | None,
) -> tuple[str, ExecutionPlan | None]:
"""U8/R8: spec review gate for the non-streaming path.
Returns ``(decision, new_plan)`` where decision is ``approved``,
``parked``, or ``replan_exhausted``; new_plan is the replanned plan
on approval-after-rejection (None when no replan happened). Emits
events via ``_step_event_callback``.
"""
spec_review_replan_count = 0
current_spec = spec
new_plan: ExecutionPlan | None = None
while True:
if cancellation_token is not None:
cancellation_token.check()
steps_summary = [
{"step_id": s.step_id, "name": s.name, "description": s.description}
for s in current_spec.steps
]
spec_review_id = f"{current_spec.spec_id}:spec_review"
await self._emit_callback_safe(
"spec_review_request",
{
"spec_id": current_spec.spec_id,
"spec_review_id": spec_review_id,
"goal": current_spec.goal,
"steps": steps_summary,
},
)
try:
decision, feedback = await self._spec_review_handler(
current_spec.spec_id, current_spec.goal, steps_summary
)
except asyncio.TimeoutError:
self._spec_manager.park(current_spec.spec_id)
await self._emit_callback_safe(
"spec_review_reply",
{
"spec_id": current_spec.spec_id,
"spec_review_id": spec_review_id,
"decision": "timeout",
"status": "parked",
},
)
return ("parked", None)
except asyncio.CancelledError:
raise
await self._emit_callback_safe(
"spec_review_reply",
{
"spec_id": current_spec.spec_id,
"spec_review_id": spec_review_id,
"decision": decision,
"feedback": feedback,
},
)
if decision == "approved":
return ("approved", new_plan)
if spec_review_replan_count >= _MAX_SPEC_REVIEW_REPLANS:
return ("replan_exhausted", None)
spec_review_replan_count += 1
new_plan = await self._planner.generate_plan(
goal=goal,
context={
"system_prompt": system_prompt,
"task_type": task_type,
"rejection_feedback": feedback,
},
available_skills=available_skills,
)
current_spec = self._plan_to_spec(new_plan)
self._spec_manager.create(current_spec)
await self._emit_callback_safe(
"spec_created",
{
"spec_id": current_spec.spec_id,
"goal": current_spec.goal,
"num_steps": len(current_spec.steps),
"replan_count": spec_review_replan_count,
},
)
async def _execute_loop(
self,
messages: list[dict[str, str]],
@ -688,29 +1031,57 @@ class PlanExecEngine:
)
# Persist plan as Spec if spec_manager is provided
current_plan = plan
if self._spec_manager is not None:
spec = self._plan_to_spec(plan)
spec = self._plan_to_spec(current_plan)
self._spec_manager.create(spec)
if self._step_event_callback:
try:
await self._step_event_callback(
"spec_created",
{
"spec_id": spec.spec_id,
"goal": spec.goal,
"num_steps": len(spec.steps),
},
await self._emit_callback_safe(
"spec_created",
{
"spec_id": spec.spec_id,
"goal": spec.goal,
"num_steps": len(spec.steps),
},
)
# U8/R8: spec review gate (non-streaming). See
# _run_spec_review_gate_stream for the streaming twin.
if self._spec_review_handler is not None:
decision, new_plan = await self._run_spec_review_gate_nonstream(
spec=spec,
goal=goal,
system_prompt=system_prompt,
task_type=task_type,
available_skills=available_skills,
cancellation_token=cancellation_token,
)
if decision == "parked":
return ReActResult(
output=f"Spec {spec.spec_id} parked: review timed out.",
trajectory=trajectory,
total_steps=len(trajectory),
total_tokens=total_tokens,
status="parked",
)
except (
RuntimeError,
ValueError,
TypeError,
KeyError,
AttributeError,
ConnectionError,
asyncio.TimeoutError,
) as e:
logger.warning(f"Step event callback failed: {e}")
if decision == "replan_exhausted":
return ReActResult(
output=(
"Spec review failed: replan cap "
f"({_MAX_SPEC_REVIEW_REPLANS}) exceeded after "
"repeated rejections."
),
trajectory=trajectory,
total_steps=len(trajectory),
total_tokens=total_tokens,
status="error",
)
# approved — new_plan may differ if a replan happened
current_plan = new_plan or current_plan
elif self._spec_review_handler is not None:
logger.warning(
"spec_review_handler set but spec_manager is None — "
"skipping spec review gate (no Spec to review)"
)
if trace_recorder is not None:
trace_recorder.record_step(
@ -721,7 +1092,7 @@ class PlanExecEngine:
# ── Phase 2 & 3: Execute with replanning ──
plan_result, trajectory, total_tokens = await self._execute_with_replanning(
plan=plan,
plan=current_plan,
messages=messages,
tools=tools,
model=model,
@ -736,7 +1107,7 @@ class PlanExecEngine:
)
# 聚合输出
output = self._aggregate_output(plan, plan_result)
output = self._aggregate_output(current_plan, plan_result)
# 确定状态
if plan_result.status == TaskStatus.FAILED:

View File

@ -35,7 +35,10 @@ class Spec:
spec_id: str
goal: str
steps: list[SpecStep] = field(default_factory=list)
status: str = "draft" # draft | confirmed | executing | completed | failed
# draft | confirmed | executing | completed | failed | parked
# U8/R8: "parked" is set when the spec review gate times out (30 min).
# A parked spec is NOT failed — the user can resume the review on return.
status: str = "draft"
created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
confirmed_at: str | None = None
metadata: dict[str, Any] = field(default_factory=dict)
@ -61,7 +64,9 @@ class SpecManager:
"""Persist a Spec to disk. Returns the file path."""
path = self._specs_dir / f"{spec.spec_id}.yaml"
data = asdict(spec)
path.write_text(yaml.dump(data, allow_unicode=True, default_flow_style=False), encoding="utf-8")
path.write_text(
yaml.dump(data, allow_unicode=True, default_flow_style=False), encoding="utf-8"
)
self._cache[spec.spec_id] = spec
logger.info(f"Spec created: {spec.spec_id} -> {path}")
return path
@ -117,6 +122,42 @@ class SpecManager:
logger.info(f"Spec confirmed: {spec_id}")
return spec
def park(self, spec_id: str) -> Spec | None:
"""U8/R8: Park a spec when the review gate times out.
A parked spec is distinct from a failed spec the user can resume
the review flow on return (see ``resume``). Mirrors ``confirm``.
"""
spec = self.get(spec_id)
if spec is None:
return None
spec.status = "parked"
self.create(spec) # re-persist
logger.info(f"Spec parked: {spec_id}")
return spec
def resume(self, spec_id: str) -> Spec | None:
"""U8/R8: Un-park a spec back to ``draft`` so the review flow restarts.
Only valid when status == "parked". Returns the spec unchanged (no-op,
logged) when the spec is not parked ponytail: no-op over raise keeps
callers simple; an idempotent resume is safer than crashing on a
double-resume. Returns None when the spec does not exist.
"""
spec = self.get(spec_id)
if spec is None:
return None
if spec.status != "parked":
logger.warning(f"Spec {spec_id} not parked (status={spec.status}), resume is a no-op")
return spec
spec.status = "draft"
self.create(spec) # re-persist
logger.info(f"Spec resumed: {spec_id}")
return spec
def list_specs(self, status: str | None = None) -> list[Spec]:
"""List all specs, optionally filtered by status. Sorted by created_at desc."""
specs: list[Spec] = []

View File

@ -169,6 +169,11 @@ _VALID_TEAM_EVENT_TYPES = frozenset(
"round_summary",
"user_intervention",
"board_concluded",
# U8/R8: spec review gate events (PLAN_EXEC pauses for user review).
# Without this whitelist entry the events silently no-op (per the
# streaming-event-contract-residuals learning).
"spec_review_request",
"spec_review_reply",
}
)
@ -1005,6 +1010,9 @@ async def chat_websocket(websocket: WebSocket, session_id: str) -> None:
# Track pending replies for AskHumanTool and confirmations
pending_replies: dict[str, asyncio.Future] = {}
pending_confirmations: dict[str, asyncio.Future] = {}
# U8/R8: pending spec-review futures keyed by spec_review_id. Resolved
# by the spec_review_reply client message; cancelled on WS teardown.
pending_spec_reviews: dict[str, asyncio.Future] = {}
chat_manager.add(session_id, websocket, pending_replies)
cancellation_token = CancellationToken()
@ -1086,6 +1094,7 @@ async def chat_websocket(websocket: WebSocket, session_id: str) -> None:
message_token,
pending_replies,
pending_confirmations,
pending_spec_reviews,
model_override=model,
)
)
@ -1114,6 +1123,29 @@ async def chat_websocket(websocket: WebSocket, session_id: str) -> None:
f"Confirmation {confirmation_id!r} not found in pending_confirmations"
)
elif msg_type == "spec_review_reply":
# U8/R8: Reply to a spec review request. The client sends
# {spec_review_id, decision: "approved"|"rejected", feedback}.
# An unknown spec_review_id is logged + ignored (no crash) —
# e.g. a stale reply arriving after the future was popped.
spec_review_id = msg.get("spec_review_id")
decision = msg.get("decision", "rejected")
feedback = msg.get("feedback", "")
logger.info(
f"Received spec_review_reply: id={spec_review_id!r}, decision={decision!r}"
)
if spec_review_id and spec_review_id in pending_spec_reviews:
fut = pending_spec_reviews[spec_review_id]
if not fut.done():
fut.set_result((decision, feedback))
else:
logger.warning(f"spec_review_reply {spec_review_id!r} already resolved")
else:
logger.warning(
f"spec_review_reply {spec_review_id!r} not found in "
f"pending_spec_reviews — ignoring"
)
elif msg_type == "cancel":
cancellation_token.cancel()
await websocket.send_json({"type": "result", "data": {"status": "cancelled"}})
@ -1139,6 +1171,9 @@ async def chat_websocket(websocket: WebSocket, session_id: str) -> None:
for fut in pending_confirmations.values():
if not fut.done():
fut.cancel()
for fut in pending_spec_reviews.values():
if not fut.done():
fut.cancel()
chat_manager.remove(session_id, websocket)
@ -1150,6 +1185,7 @@ async def _handle_chat_message(
cancellation_token: CancellationToken,
pending_replies: dict[str, asyncio.Future],
pending_confirmations: dict[str, asyncio.Future] | None = None,
pending_spec_reviews: dict[str, asyncio.Future] | None = None,
model_override: str | None = None,
) -> None:
"""Handle a user message: append to session, execute Agent, stream events.
@ -1404,6 +1440,63 @@ async def _handle_chat_message(
finally:
_pending_confirmations.pop(confirmation_id, None)
# U8/R8: spec review handler — only wired when the engine is a
# PlanExecEngine (the WS path's _build_phase_engine returns a ReActEngine
# with phase_policy, so this is a no-op there; REST/tests that use
# PlanExecEngine get the gate). Different semantics from _confirmation_
# handler: 30-min timeout (long task user availability) vs 5-min, returns
# (decision, feedback) tuple not bool, and on timeout RAISES
# asyncio.TimeoutError so the engine can park the Spec (not fail it).
_pending_spec_reviews = pending_spec_reviews if pending_spec_reviews is not None else {}
async def _spec_review_handler(spec_id: str, goal: str, steps: list[dict]) -> tuple[str, str]:
"""Send spec_review_request to frontend and wait for the user's decision.
Returns (decision, feedback). Raises asyncio.TimeoutError after 30 min
(the engine parks the Spec on timeout). Raises asyncio.CancelledError
if the stream is cancelled mid-review.
"""
# spec_review_id MUST match the engine's format (f"{spec_id}:spec_review")
# — one review per spec (stable identifier, terminal-event symmetry).
spec_review_id = f"{spec_id}:spec_review"
await websocket.send_json(
{
"type": "spec_review_request",
"data": {
"spec_id": spec_id,
"spec_review_id": spec_review_id,
"goal": goal,
"steps": steps,
},
}
)
loop = asyncio.get_running_loop()
future: asyncio.Future[tuple[str, str]] = loop.create_future()
_pending_spec_reviews[spec_review_id] = future
logger.info(f"Spec review request {spec_review_id} sent, waiting for reply")
try:
# 30 min (1800s) — long-task user availability per R8. The engine
# catches TimeoutError and parks the Spec (status="parked", not
# "failed") so the user can resume on return.
decision, feedback = await asyncio.wait_for(future, timeout=1800.0)
logger.info(f"Spec review {spec_review_id} resolved: decision={decision!r}")
return decision, feedback
except asyncio.TimeoutError:
logger.warning(f"Spec review {spec_review_id} timed out (30 min)")
raise
finally:
_pending_spec_reviews.pop(spec_review_id, None)
# Wire the handler onto a PlanExecEngine only (the WS PLAN_EXEC path uses
# a ReActEngine + phase_policy, where this is a no-op). Local import to
# avoid a top-level dependency that the WS path doesn't need.
from agentkit.core.plan_exec_engine import PlanExecEngine as _PlanExecEngine
if isinstance(react_engine, _PlanExecEngine):
react_engine._spec_review_handler = _spec_review_handler
logger.info(
f"Chat session {session_id}: executing with {len(routing.tools)} tools, model={routing.model}, skill={routing.skill_name}"
)
@ -1479,6 +1572,22 @@ async def _handle_chat_message(
"data": event.data,
}
)
elif event.event_type == "spec_review_request":
# U8/R8: the _spec_review_handler closure already sent this
# request directly to the frontend (it owns the spec_review_id
# + future). Swallow the engine's informational event to avoid
# a duplicate render (mirrors confirmation_request → pass).
pass
elif event.event_type == "spec_review_reply":
# Forward the engine's reply event so the frontend learns the
# outcome — especially the timeout→parked transition, which
# the frontend cannot infer (the user never replied).
await websocket.send_json(
{
"type": "spec_review_reply",
"data": event.data,
}
)
elif event.event_type == "phase_violation":
# Wave 4 U2: forward phase violations to the client so the
# frontend can surface them in the PhaseIndicator UI (alongside

View File

@ -908,6 +908,12 @@ _PERSISTED_MESSAGE_FIELDS = (
"routing_method",
"thinking",
"tool_calls",
# U8/R8: spec review gate fields — a pending spec_review_request must
# survive a page reload so the user can still answer it (and a parked
# Spec is resumable on return).
"spec_review_id",
"spec_review_decision",
"spec_review_feedback",
)

View File

@ -0,0 +1,517 @@
"""Tests for U8: spec review gate (R8).
Covers:
- Happy path (AE4): PLAN_EXEC pauses for review, user approves, execution resumes
- Rejection -> replan -> re-review; replan cap (2) -> failure (not infinite loop)
- Timeout -> Spec parked (not failed); ReActResult status="parked"
- Stream cancelled mid-review -> CancelledError propagates, no deadlock
- spec_review_handler None -> backward compat (no gate)
- spec_manager None + handler set -> skip gate + warn
- Handler raises -> exception propagated
- SpecManager.park()/resume() round-trip; parked survives reload; confirm() works
- Whitelist assertion (silent no-op prevention)
- Unknown spec_review_id ignored (no crash)
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from agentkit.core.exceptions import TaskCancelledError
from agentkit.core.plan_exec_engine import PlanExecEngine, _MAX_SPEC_REVIEW_REPLANS
from agentkit.core.plan_executor import PlanExecutionResult, StepExecutionResult
from agentkit.core.plan_schema import ExecutionPlan, PlanStep, PlanStepStatus
from agentkit.core.protocol import CancellationToken, TaskStatus
from agentkit.core.react import ReActResult
from agentkit.core.spec_manager import Spec, SpecManager, SpecStep
# ── Helpers ──────────────────────────────────────────────
def make_plan(
goal: str = "test goal",
plan_id: str = "plan-1",
steps: list[PlanStep] | None = None,
) -> ExecutionPlan:
"""Construct an ExecutionPlan with a distinct plan_id."""
if steps is None:
steps = [
PlanStep(step_id="step-0", name="Step 0", description="First step"),
PlanStep(step_id="step-1", name="Step 1", description="Second step"),
]
plan = ExecutionPlan(goal=goal, steps=steps)
plan.plan_id = plan_id
plan.parallel_groups = [[s.step_id] for s in steps]
return plan
def make_step_result(
step_id: str,
status: PlanStepStatus = PlanStepStatus.COMPLETED,
result: dict | None = None,
) -> StepExecutionResult:
return StepExecutionResult(
step_id=step_id,
status=status,
result=result or {"content": f"result of {step_id}"},
error=None,
)
def make_plan_result(
plan_id: str = "plan-1",
status: TaskStatus = TaskStatus.COMPLETED,
) -> PlanExecutionResult:
step_results = {
"step-0": make_step_result("step-0"),
"step-1": make_step_result("step-1"),
}
return PlanExecutionResult(
plan_id=plan_id,
step_results=step_results,
status=status,
total_duration_ms=100.0,
)
def make_spec(spec_id: str = "plan-1", goal: str = "test goal") -> Spec:
return Spec(
spec_id=spec_id,
goal=goal,
steps=[SpecStep(step_id="s1", name="Step 1", description="First")],
)
def make_engine(
specs_dir: str,
*,
spec_review_handler=None,
spec_manager: SpecManager | None = None,
step_event_callback=None,
) -> tuple[PlanExecEngine, SpecManager]:
"""Build a PlanExecEngine wired with a SpecManager (tmp dir)."""
mgr = spec_manager if spec_manager is not None else SpecManager(specs_dir=specs_dir)
engine = PlanExecEngine(
llm_gateway=None,
spec_manager=mgr,
spec_review_handler=spec_review_handler,
step_event_callback=step_event_callback,
)
return engine, mgr
def patch_executor(plan_result: PlanExecutionResult):
"""Patch PlanExecutor so execute() returns the given plan_result."""
mock_executor = MagicMock()
mock_executor.execute = AsyncMock(return_value=plan_result)
return patch("agentkit.core.plan_exec_engine.PlanExecutor", return_value=mock_executor)
# ── Whitelist assertion ──────────────────────────────────
class TestWhitelist:
"""Prevent silent no-op regression (streaming-event-contract learning)."""
def test_spec_review_events_in_whitelist(self):
from agentkit.server.routes.chat import _VALID_TEAM_EVENT_TYPES
assert "spec_review_request" in _VALID_TEAM_EVENT_TYPES
assert "spec_review_reply" in _VALID_TEAM_EVENT_TYPES
# ── Happy path (AE4) ─────────────────────────────────────
class TestHappyPathStream:
"""PLAN_EXEC generates Spec -> spec_review_request -> suspend -> approve -> resume."""
async def test_approve_resumes_execution(self, tmp_path: Path):
seen_calls: list[tuple[str, str, list]] = []
async def handler(spec_id: str, goal: str, steps: list[dict]):
seen_calls.append((spec_id, goal, steps))
return ("approved", "")
engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler)
plan = make_plan(plan_id="plan-1")
plan_result = make_plan_result()
with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)):
with patch_executor(plan_result):
events = [
e
async for e in engine.execute_stream(
messages=[{"role": "user", "content": "do a complex task"}],
)
]
event_types = [e.event_type for e in events]
# Spec created, review request, review reply, then execution + final_answer
assert "spec_created" in event_types
assert "spec_review_request" in event_types
assert "spec_review_reply" in event_types
# request comes before reply (terminal-event symmetry / ordering)
assert event_types.index("spec_review_request") < event_types.index("spec_review_reply")
# Execution resumed after approval -> step events + final_answer
assert "final_answer" in event_types
final = next(e for e in events if e.event_type == "final_answer")
assert final.data["plan_status"] != "parked"
# Handler called with the spec_id matching the created spec, the goal,
# and a list of step dicts.
assert len(seen_calls) == 1
spec_id, goal, steps = seen_calls[0]
assert spec_id == "plan-1"
assert goal == "test goal"
assert isinstance(steps, list)
assert all("step_id" in s and "name" in s for s in steps)
async def test_nonstream_approve_returns_success(self, tmp_path: Path):
async def handler(spec_id, goal, steps):
return ("approved", "")
engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler)
plan = make_plan(plan_id="plan-1")
plan_result = make_plan_result()
with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)):
with patch_executor(plan_result):
result = await engine.execute(
messages=[{"role": "user", "content": "do a complex task"}],
)
assert isinstance(result, ReActResult)
assert result.status == "success"
assert result.output # aggregated output present
# ── Edge cases ───────────────────────────────────────────
class TestRejectionReplan:
"""User rejects -> replan with feedback -> new Spec -> review again."""
async def test_reject_then_approve_regenerates_spec(self, tmp_path: Path):
# First review rejects with feedback, second approves.
responses = [("rejected", "make it simpler"), ("approved", "")]
async def handler(spec_id, goal, steps):
return responses.pop(0)
engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler)
plan1 = make_plan(plan_id="plan-1")
plan2 = make_plan(plan_id="plan-2", goal="test goal (simpler)")
plan_result = make_plan_result()
with patch.object(
engine._planner,
"generate_plan",
AsyncMock(side_effect=[plan1, plan2]),
):
with patch_executor(plan_result):
events = [
e
async for e in engine.execute_stream(
messages=[{"role": "user", "content": "do a complex task"}],
)
]
# Two spec_created events (plan-1 then plan-2 after replan), two
# review requests, two review replies.
spec_created = [e for e in events if e.event_type == "spec_created"]
requests = [e for e in events if e.event_type == "spec_review_request"]
replies = [e for e in events if e.event_type == "spec_review_reply"]
assert len(spec_created) == 2
assert len(requests) == 2
assert len(replies) == 2
# The second review targets a new spec_id (replan produced plan-2).
assert requests[0].data["spec_id"] == "plan-1"
assert requests[1].data["spec_id"] == "plan-2"
# First reply carries rejection + feedback; second carries approval.
assert replies[0].data["decision"] == "rejected"
assert replies[0].data["feedback"] == "make it simpler"
assert replies[1].data["decision"] == "approved"
# Execution resumed -> final_answer is success, not parked/failed.
final = next(e for e in events if e.event_type == "final_answer")
assert final.data["plan_status"] != "parked"
assert final.data["plan_status"] != "failed"
async def test_replan_cap_exhausted_fails(self, tmp_path: Path):
# Always reject: cap is 2 replans -> 3rd rejection exhausts the gate.
async def handler(spec_id, goal, steps):
return ("rejected", "still no good")
engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler)
plans = [make_plan(plan_id=f"plan-{i}") for i in range(1, 6)]
plan_result = make_plan_result()
with patch.object(
engine._planner,
"generate_plan",
AsyncMock(side_effect=plans),
):
with patch_executor(plan_result):
events = [
e
async for e in engine.execute_stream(
messages=[{"role": "user", "content": "do a complex task"}],
)
]
requests = [e for e in events if e.event_type == "spec_review_request"]
replies = [e for e in events if e.event_type == "spec_review_reply"]
# 3 reviews (initial + 2 replans), all rejected, then exhausted.
assert len(requests) == _MAX_SPEC_REVIEW_REPLANS + 1
assert all(r.data["decision"] == "rejected" for r in replies)
final = next(e for e in events if e.event_type == "final_answer")
assert final.data["plan_status"] == "failed"
assert "replan cap" in final.data["output"]
class TestTimeoutParked:
"""Timeout (30min simulated) -> Spec parked (not failed)."""
async def test_stream_timeout_parks_spec(self, tmp_path: Path):
async def handler(spec_id, goal, steps):
raise asyncio.TimeoutError
engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler)
plan = make_plan(plan_id="plan-1")
plan_result = make_plan_result()
with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)):
with patch_executor(plan_result):
events = [
e
async for e in engine.execute_stream(
messages=[{"role": "user", "content": "do a complex task"}],
)
]
# Reply event carries decision=timeout + status=parked.
replies = [e for e in events if e.event_type == "spec_review_reply"]
assert len(replies) == 1
assert replies[0].data["decision"] == "timeout"
assert replies[0].data["status"] == "parked"
# final_answer surfaces parked (not failed).
final = next(e for e in events if e.event_type == "final_answer")
assert final.data["plan_status"] == "parked"
# Spec persisted as parked.
spec = mgr.get("plan-1")
assert spec is not None
assert spec.status == "parked"
async def test_nonstream_timeout_returns_parked_status(self, tmp_path: Path):
async def handler(spec_id, goal, steps):
raise asyncio.TimeoutError
engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler)
plan = make_plan(plan_id="plan-1")
plan_result = make_plan_result()
with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)):
with patch_executor(plan_result):
result = await engine.execute(
messages=[{"role": "user", "content": "do a complex task"}],
)
assert isinstance(result, ReActResult)
assert result.status == "parked"
assert mgr.get("plan-1").status == "parked"
class TestCancellation:
"""Stream cancelled mid-review -> CancelledError propagates, no deadlock."""
async def test_handler_cancelled_propagates(self, tmp_path: Path):
async def handler(spec_id, goal, steps):
raise asyncio.CancelledError
engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler)
plan = make_plan(plan_id="plan-1")
plan_result = make_plan_result()
with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)):
with patch_executor(plan_result):
with pytest.raises(asyncio.CancelledError):
async for _ in engine.execute_stream(
messages=[{"role": "user", "content": "do a complex task"}],
):
pass
async def test_token_cancelled_before_gate_raises_task_cancelled(self, tmp_path: Path):
async def handler(spec_id, goal, steps): # pragma: no cover - never reached
return ("approved", "")
engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler)
token = CancellationToken()
token.cancel()
plan = make_plan(plan_id="plan-1")
plan_result = make_plan_result()
with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)):
with patch_executor(plan_result):
with pytest.raises(TaskCancelledError):
async for _ in engine.execute_stream(
messages=[{"role": "user", "content": "do a complex task"}],
cancellation_token=token,
):
pass
class TestBackwardCompat:
"""spec_review_handler None -> no gate; spec_manager None + handler -> skip."""
async def test_handler_none_skips_gate(self, tmp_path: Path):
engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=None)
plan = make_plan(plan_id="plan-1")
plan_result = make_plan_result()
with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)):
with patch_executor(plan_result):
events = [
e
async for e in engine.execute_stream(
messages=[{"role": "user", "content": "do a complex task"}],
)
]
event_types = [e.event_type for e in events]
# Spec still created, but no review gate events.
assert "spec_created" in event_types
assert "spec_review_request" not in event_types
assert "spec_review_reply" not in event_types
assert "final_answer" in event_types
async def test_spec_manager_none_handler_set_skips_gate(self, tmp_path: Path):
# handler set but spec_manager None -> gate skipped with a warning,
# execution proceeds (no crash, no spec_review events).
async def handler(spec_id, goal, steps): # pragma: no cover - never reached
return ("approved", "")
engine = PlanExecEngine(llm_gateway=None, spec_manager=None, spec_review_handler=handler)
plan = make_plan(plan_id="plan-1")
plan_result = make_plan_result()
with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)):
with patch_executor(plan_result):
events = [
e
async for e in engine.execute_stream(
messages=[{"role": "user", "content": "do a complex task"}],
)
]
event_types = [e.event_type for e in events]
assert "spec_created" not in event_types # no spec_manager -> no spec
assert "spec_review_request" not in event_types
assert "final_answer" in event_types
# ── Error / failure paths ────────────────────────────────
class TestHandlerRaises:
"""Handler raises a non-timeout/cancel exception -> propagated."""
async def test_handler_value_error_propagates(self, tmp_path: Path):
async def handler(spec_id, goal, steps):
raise ValueError("handler blew up")
engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler)
plan = make_plan(plan_id="plan-1")
plan_result = make_plan_result()
with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)):
with patch_executor(plan_result):
with pytest.raises(ValueError, match="handler blew up"):
async for _ in engine.execute_stream(
messages=[{"role": "user", "content": "do a complex task"}],
):
pass
class TestUnknownSpecReviewId:
"""An unknown spec_review_id is ignored (no crash) — mirrors the WS loop."""
def test_unknown_id_ignored(self):
# Replicates the chat.py WS-loop guard: only known ids resolve a future.
pending: dict[str, asyncio.Future] = {}
loop = asyncio.new_event_loop()
try:
fut: asyncio.Future = loop.create_future()
pending["known-id"] = fut
# An unknown id must not raise (the loop logs + ignores).
unknown = "does-not-exist"
assert unknown not in pending # the guard the loop uses
# Known id resolves fine.
assert "known-id" in pending
finally:
loop.close()
# ── SpecManager integration ──────────────────────────────
class TestSpecManagerParkResume:
"""park()/resume() round-trip; parked survives reload; confirm() works."""
def test_park_sets_status_parked(self, tmp_path: Path):
mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
mgr.create(make_spec(spec_id="s1"))
parked = mgr.park("s1")
assert parked is not None
assert parked.status == "parked"
def test_resume_sets_status_draft(self, tmp_path: Path):
mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
mgr.create(make_spec(spec_id="s1"))
mgr.park("s1")
resumed = mgr.resume("s1")
assert resumed is not None
assert resumed.status == "draft"
def test_resume_non_parked_is_noop(self, tmp_path: Path):
# ponytail: idempotent resume — no-op (returns spec unchanged) rather
# than raising on a double-resume.
mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
mgr.create(make_spec(spec_id="s1"))
# status is "draft", not "parked" -> resume is a no-op.
result = mgr.resume("s1")
assert result is not None
assert result.status == "draft"
def test_park_nonexistent_returns_none(self, tmp_path: Path):
mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
assert mgr.park("nope") is None
def test_resume_nonexistent_returns_none(self, tmp_path: Path):
mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
assert mgr.resume("nope") is None
def test_parked_survives_reload(self, tmp_path: Path):
# A fresh SpecManager instance loading from disk must see "parked".
specs_dir = str(tmp_path / "specs")
mgr1 = SpecManager(specs_dir=specs_dir)
mgr1.create(make_spec(spec_id="s1"))
mgr1.park("s1")
mgr2 = SpecManager(specs_dir=specs_dir)
loaded = mgr2.get("s1")
assert loaded is not None
assert loaded.status == "parked"
def test_confirm_still_works(self, tmp_path: Path):
# Backward compat: the existing confirm() REST endpoint path.
mgr = SpecManager(specs_dir=str(tmp_path / "specs"))
mgr.create(make_spec(spec_id="s1"))
confirmed = mgr.confirm("s1")
assert confirmed is not None
assert confirmed.status == "confirmed"
assert confirmed.confirmed_at is not None