diff --git a/src/agentkit/core/plan_exec_engine.py b/src/agentkit/core/plan_exec_engine.py index d15aff6..9382ed0 100644 --- a/src/agentkit/core/plan_exec_engine.py +++ b/src/agentkit/core/plan_exec_engine.py @@ -42,10 +42,23 @@ logger = logging.getLogger(__name__) # 最大重规划次数 _DEFAULT_MAX_REPLANS = 2 +# U8/R8: max replans triggered by spec-review rejection (separate from +# _DEFAULT_MAX_REPLANS which covers execution failures). Cap prevents an +# infinite reject→replan loop between user and planner. +_MAX_SPEC_REVIEW_REPLANS = 2 + # U4/R11: default phase budgets for PLAN_EXEC. think=7 (exploration), # verify=2 (two verification attempts), reflect=1 (one re-injection). _DEFAULT_PHASE_BUDGETS: dict[str, int] = {"think": 7, "verify": 2, "reflect": 1} +# U8/R8: spec review gate handler. Suspends execution after the first Spec +# is generated; returns (decision, feedback). decision ∈ {"approved", +# "rejected"}. On rejection the engine replans and re-reviews (capped at +# _MAX_SPEC_REVIEW_REPLANS). The handler raises asyncio.TimeoutError when +# the user does not reply within the boundary timeout (chat.py: 30 min); +# the engine then parks the Spec (not failed) so the user can resume. +SpecReviewHandler = Callable[[str, str, list[dict[str, Any]]], Awaitable[tuple[str, str]]] + @dataclass class _StreamState: @@ -56,6 +69,13 @@ class _StreamState: total_tokens: int = 0 step_counter: int = 0 replanned: bool = False + # U8/R8: spec review gate outcome. Set by _run_spec_review_gate_stream + # so the caller (a generator that cannot receive a return value) can + # read the result after the async-for loop drains. Values: "approved", + # "parked", "replan_exhausted". Empty = gate did not run. + spec_review_decision: str = "" + # The (possibly replanned) plan to continue execution with on "approved". + spec_review_plan: "ExecutionPlan | None" = None class PlanExecEngine: @@ -96,6 +116,11 @@ class PlanExecEngine: # historical pitfalls by goal/skill similarity and inject into # system prompt. None = skip injection (no error). pitfall_detector: "PitfallDetector | None" = None, + # U8/R8: spec review gate handler. When set, PLAN_EXEC pauses after + # the first Spec is generated and calls this handler to wait for the + # user's decision. None = skip the gate (backward compat — the engine + # proceeds directly to execution after Spec persistence). + spec_review_handler: SpecReviewHandler | None = None, ): """ Args: @@ -116,6 +141,10 @@ class PlanExecEngine: pitfall_detector: U7/R12 — PitfallDetector 单例(KTD-5)。 规划阶段按 goal/skill 相似度检索历史 pitfall 并注入 system prompt。None 表示跳过注入(不报错)。 + spec_review_handler: U8/R8 — async handler called as + ``(spec_id, goal, steps) -> (decision, feedback)`` after the + first Spec is generated. Suspends execution until the user + replies. None skips the gate (backward compat). """ self._llm_gateway = llm_gateway self._max_replans = max_replans @@ -128,6 +157,8 @@ class PlanExecEngine: self._verification_commands = verification_commands # U7/R12: app-state singleton (KTD-5) — constructor injection. self._pitfall_detector = pitfall_detector + # U8/R8: spec review gate handler. None = skip gate (backward compat). + self._spec_review_handler = spec_review_handler # U4/R11: copy the default to avoid mutating the module-level dict. self._phase_budgets = ( dict(phase_budgets) if phase_budgets is not None else dict(_DEFAULT_PHASE_BUDGETS) @@ -328,8 +359,9 @@ class PlanExecEngine: ) # Persist plan as Spec if spec_manager is provided + current_plan = plan if self._spec_manager is not None: - spec = self._plan_to_spec(plan) + spec = self._plan_to_spec(current_plan) self._spec_manager.create(spec) state.step_counter += 1 yield ReActEvent( @@ -338,8 +370,77 @@ class PlanExecEngine: data={"spec_id": spec.spec_id, "goal": spec.goal, "num_steps": len(spec.steps)}, ) + # U8/R8: Spec review gate — pause for user review after the + # first Spec is generated. approved → continue; rejected → + # replan (cap _MAX_SPEC_REVIEW_REPLANS); timeout → park (not + # fail). Handler is None → skip (backward compat). The gate + # only runs when a spec_manager is present (no Spec = nothing + # to review); spec_manager is None + handler set → skip + warn. + if self._spec_review_handler is not None: + # The gate is an async generator: it yields the + # spec_review_request / spec_review_reply / spec_created + # events and writes the outcome onto ``state`` (a generator + # cannot return a value). We re-yield its events here. + async for gate_event in self._run_spec_review_gate_stream( + spec=spec, + goal=goal, + system_prompt=system_prompt, + task_type=task_type, + available_skills=available_skills, + state=state, + cancellation_token=cancellation_token, + ): + yield gate_event + + decision = state.spec_review_decision + if decision == "parked": + trace_outcome = "parked" + output = f"Spec {spec.spec_id} parked: review timed out." + state.step_counter += 1 + yield ReActEvent( + event_type="final_answer", + step=state.step_counter, + data={ + "output": output, + "total_steps": len(state.trajectory), + "total_tokens": state.total_tokens, + "plan_id": current_plan.plan_id, + "plan_status": "parked", + "replanned": state.replanned, + }, + ) + return + if decision == "replan_exhausted": + trace_outcome = "error" + output = ( + "Spec review failed: replan cap " + f"({_MAX_SPEC_REVIEW_REPLANS}) exceeded after " + "repeated rejections." + ) + state.step_counter += 1 + yield ReActEvent( + event_type="final_answer", + step=state.step_counter, + data={ + "output": output, + "total_steps": len(state.trajectory), + "total_tokens": state.total_tokens, + "plan_id": current_plan.plan_id, + "plan_status": "failed", + "replanned": state.replanned, + }, + ) + return + # approved — new_plan may differ if a replan happened + current_plan = state.spec_review_plan or current_plan + elif self._spec_review_handler is not None: + # spec_manager is None → no Spec to review; skip gate + warn. + logger.warning( + "spec_review_handler set but spec_manager is None — " + "skipping spec review gate (no Spec to review)" + ) + # ── Phase 2 & 3: Execute with optional replanning ── - current_plan = plan replan_count = 0 while True: @@ -585,6 +686,248 @@ class PlanExecEngine: return f"{system_prompt}\n\n{section}" return section + async def _emit_callback_safe(self, event_type: str, data: dict[str, Any]) -> None: + """U8: emit a step event via the non-stream callback, swallowing the + same exception families the existing callback call sites swallow. + + ponytail: tiny helper exists only because the spec review gate emits + several events on the non-streaming path — inlining the try/except 4x + would be noisier than this 4-line method. + """ + if not self._step_event_callback: + return + try: + await self._step_event_callback(event_type, data) + except ( + RuntimeError, + ValueError, + TypeError, + KeyError, + AttributeError, + ConnectionError, + asyncio.TimeoutError, + ) as e: + logger.warning(f"Step event callback failed: {e}") + + async def _run_spec_review_gate_stream( + self, + spec: Spec, + goal: str, + system_prompt: str | None, + task_type: str, + available_skills: list[str], + state: "_StreamState", + cancellation_token: CancellationToken | None, + ): + """U8/R8: spec review gate for the streaming path (async generator). + + Yields ``spec_review_request`` / ``spec_review_reply`` / ``spec_created`` + events and writes the outcome onto ``state.spec_review_decision`` + (``approved`` | ``parked`` | ``replan_exhausted``) plus + ``state.spec_review_plan`` (the plan to continue with on approval; None + when no replan happened — caller falls back to the original plan). + + The handler raises ``asyncio.TimeoutError`` on the boundary timeout + (chat.py: 30 min); we park the Spec and surface ``parked`` (not + ``failed``) so the user can resume on return. + """ + spec_review_replan_count = 0 + current_spec = spec + # None until a rejection triggers a replan; on approval without a + # replan the caller keeps the original plan. + new_plan: ExecutionPlan | None = None + while True: + if cancellation_token is not None: + cancellation_token.check() + + steps_summary = [ + {"step_id": s.step_id, "name": s.name, "description": s.description} + for s in current_spec.steps + ] + spec_review_id = f"{current_spec.spec_id}:spec_review" + state.step_counter += 1 + yield ReActEvent( + event_type="spec_review_request", + step=state.step_counter, + data={ + "spec_id": current_spec.spec_id, + "spec_review_id": spec_review_id, + "goal": current_spec.goal, + "steps": steps_summary, + }, + ) + + try: + decision, feedback = await self._spec_review_handler( + current_spec.spec_id, current_spec.goal, steps_summary + ) + except asyncio.TimeoutError: + # Boundary timeout (chat.py 30-min wait_for). Park the Spec + # so the user can resume on return — NOT failed. + self._spec_manager.park(current_spec.spec_id) + state.step_counter += 1 + yield ReActEvent( + event_type="spec_review_reply", + step=state.step_counter, + data={ + "spec_id": current_spec.spec_id, + "spec_review_id": spec_review_id, + "decision": "timeout", + "status": "parked", + }, + ) + state.spec_review_decision = "parked" + state.spec_review_plan = None + return + except asyncio.CancelledError: + # Stream cancelled mid-review: propagate so the outer + # try/except sets trace_outcome and the WS loop cancels the + # pending future (no orphan future / deadlock). + raise + + state.step_counter += 1 + yield ReActEvent( + event_type="spec_review_reply", + step=state.step_counter, + data={ + "spec_id": current_spec.spec_id, + "spec_review_id": spec_review_id, + "decision": decision, + "feedback": feedback, + }, + ) + + if decision == "approved": + state.spec_review_decision = "approved" + state.spec_review_plan = new_plan + return + + # rejected → replan (cap prevents an infinite reject→replan loop) + if spec_review_replan_count >= _MAX_SPEC_REVIEW_REPLANS: + state.spec_review_decision = "replan_exhausted" + state.spec_review_plan = None + return + + spec_review_replan_count += 1 + new_plan = await self._planner.generate_plan( + goal=goal, + context={ + "system_prompt": system_prompt, + "task_type": task_type, + "rejection_feedback": feedback, + }, + available_skills=available_skills, + ) + current_spec = self._plan_to_spec(new_plan) + self._spec_manager.create(current_spec) + state.step_counter += 1 + yield ReActEvent( + event_type="spec_created", + step=state.step_counter, + data={ + "spec_id": current_spec.spec_id, + "goal": current_spec.goal, + "num_steps": len(current_spec.steps), + "replan_count": spec_review_replan_count, + }, + ) + # loop back to review the regenerated Spec + + async def _run_spec_review_gate_nonstream( + self, + spec: Spec, + goal: str, + system_prompt: str | None, + task_type: str, + available_skills: list[str], + cancellation_token: CancellationToken | None, + ) -> tuple[str, ExecutionPlan | None]: + """U8/R8: spec review gate for the non-streaming path. + + Returns ``(decision, new_plan)`` where decision is ``approved``, + ``parked``, or ``replan_exhausted``; new_plan is the replanned plan + on approval-after-rejection (None when no replan happened). Emits + events via ``_step_event_callback``. + """ + spec_review_replan_count = 0 + current_spec = spec + new_plan: ExecutionPlan | None = None + while True: + if cancellation_token is not None: + cancellation_token.check() + + steps_summary = [ + {"step_id": s.step_id, "name": s.name, "description": s.description} + for s in current_spec.steps + ] + spec_review_id = f"{current_spec.spec_id}:spec_review" + await self._emit_callback_safe( + "spec_review_request", + { + "spec_id": current_spec.spec_id, + "spec_review_id": spec_review_id, + "goal": current_spec.goal, + "steps": steps_summary, + }, + ) + + try: + decision, feedback = await self._spec_review_handler( + current_spec.spec_id, current_spec.goal, steps_summary + ) + except asyncio.TimeoutError: + self._spec_manager.park(current_spec.spec_id) + await self._emit_callback_safe( + "spec_review_reply", + { + "spec_id": current_spec.spec_id, + "spec_review_id": spec_review_id, + "decision": "timeout", + "status": "parked", + }, + ) + return ("parked", None) + except asyncio.CancelledError: + raise + + await self._emit_callback_safe( + "spec_review_reply", + { + "spec_id": current_spec.spec_id, + "spec_review_id": spec_review_id, + "decision": decision, + "feedback": feedback, + }, + ) + + if decision == "approved": + return ("approved", new_plan) + + if spec_review_replan_count >= _MAX_SPEC_REVIEW_REPLANS: + return ("replan_exhausted", None) + + spec_review_replan_count += 1 + new_plan = await self._planner.generate_plan( + goal=goal, + context={ + "system_prompt": system_prompt, + "task_type": task_type, + "rejection_feedback": feedback, + }, + available_skills=available_skills, + ) + current_spec = self._plan_to_spec(new_plan) + self._spec_manager.create(current_spec) + await self._emit_callback_safe( + "spec_created", + { + "spec_id": current_spec.spec_id, + "goal": current_spec.goal, + "num_steps": len(current_spec.steps), + "replan_count": spec_review_replan_count, + }, + ) + async def _execute_loop( self, messages: list[dict[str, str]], @@ -688,29 +1031,57 @@ class PlanExecEngine: ) # Persist plan as Spec if spec_manager is provided + current_plan = plan if self._spec_manager is not None: - spec = self._plan_to_spec(plan) + spec = self._plan_to_spec(current_plan) self._spec_manager.create(spec) - if self._step_event_callback: - try: - await self._step_event_callback( - "spec_created", - { - "spec_id": spec.spec_id, - "goal": spec.goal, - "num_steps": len(spec.steps), - }, + await self._emit_callback_safe( + "spec_created", + { + "spec_id": spec.spec_id, + "goal": spec.goal, + "num_steps": len(spec.steps), + }, + ) + + # U8/R8: spec review gate (non-streaming). See + # _run_spec_review_gate_stream for the streaming twin. + if self._spec_review_handler is not None: + decision, new_plan = await self._run_spec_review_gate_nonstream( + spec=spec, + goal=goal, + system_prompt=system_prompt, + task_type=task_type, + available_skills=available_skills, + cancellation_token=cancellation_token, + ) + if decision == "parked": + return ReActResult( + output=f"Spec {spec.spec_id} parked: review timed out.", + trajectory=trajectory, + total_steps=len(trajectory), + total_tokens=total_tokens, + status="parked", ) - except ( - RuntimeError, - ValueError, - TypeError, - KeyError, - AttributeError, - ConnectionError, - asyncio.TimeoutError, - ) as e: - logger.warning(f"Step event callback failed: {e}") + if decision == "replan_exhausted": + return ReActResult( + output=( + "Spec review failed: replan cap " + f"({_MAX_SPEC_REVIEW_REPLANS}) exceeded after " + "repeated rejections." + ), + trajectory=trajectory, + total_steps=len(trajectory), + total_tokens=total_tokens, + status="error", + ) + # approved — new_plan may differ if a replan happened + current_plan = new_plan or current_plan + elif self._spec_review_handler is not None: + logger.warning( + "spec_review_handler set but spec_manager is None — " + "skipping spec review gate (no Spec to review)" + ) if trace_recorder is not None: trace_recorder.record_step( @@ -721,7 +1092,7 @@ class PlanExecEngine: # ── Phase 2 & 3: Execute with replanning ── plan_result, trajectory, total_tokens = await self._execute_with_replanning( - plan=plan, + plan=current_plan, messages=messages, tools=tools, model=model, @@ -736,7 +1107,7 @@ class PlanExecEngine: ) # 聚合输出 - output = self._aggregate_output(plan, plan_result) + output = self._aggregate_output(current_plan, plan_result) # 确定状态 if plan_result.status == TaskStatus.FAILED: diff --git a/src/agentkit/core/spec_manager.py b/src/agentkit/core/spec_manager.py index c28976c..0ead76c 100644 --- a/src/agentkit/core/spec_manager.py +++ b/src/agentkit/core/spec_manager.py @@ -35,7 +35,10 @@ class Spec: spec_id: str goal: str steps: list[SpecStep] = field(default_factory=list) - status: str = "draft" # draft | confirmed | executing | completed | failed + # draft | confirmed | executing | completed | failed | parked + # U8/R8: "parked" is set when the spec review gate times out (30 min). + # A parked spec is NOT failed — the user can resume the review on return. + status: str = "draft" created_at: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat()) confirmed_at: str | None = None metadata: dict[str, Any] = field(default_factory=dict) @@ -61,7 +64,9 @@ class SpecManager: """Persist a Spec to disk. Returns the file path.""" path = self._specs_dir / f"{spec.spec_id}.yaml" data = asdict(spec) - path.write_text(yaml.dump(data, allow_unicode=True, default_flow_style=False), encoding="utf-8") + path.write_text( + yaml.dump(data, allow_unicode=True, default_flow_style=False), encoding="utf-8" + ) self._cache[spec.spec_id] = spec logger.info(f"Spec created: {spec.spec_id} -> {path}") return path @@ -117,6 +122,42 @@ class SpecManager: logger.info(f"Spec confirmed: {spec_id}") return spec + def park(self, spec_id: str) -> Spec | None: + """U8/R8: Park a spec when the review gate times out. + + A parked spec is distinct from a failed spec — the user can resume + the review flow on return (see ``resume``). Mirrors ``confirm``. + """ + spec = self.get(spec_id) + if spec is None: + return None + + spec.status = "parked" + self.create(spec) # re-persist + logger.info(f"Spec parked: {spec_id}") + return spec + + def resume(self, spec_id: str) -> Spec | None: + """U8/R8: Un-park a spec back to ``draft`` so the review flow restarts. + + Only valid when status == "parked". Returns the spec unchanged (no-op, + logged) when the spec is not parked — ponytail: no-op over raise keeps + callers simple; an idempotent resume is safer than crashing on a + double-resume. Returns None when the spec does not exist. + """ + spec = self.get(spec_id) + if spec is None: + return None + + if spec.status != "parked": + logger.warning(f"Spec {spec_id} not parked (status={spec.status}), resume is a no-op") + return spec + + spec.status = "draft" + self.create(spec) # re-persist + logger.info(f"Spec resumed: {spec_id}") + return spec + def list_specs(self, status: str | None = None) -> list[Spec]: """List all specs, optionally filtered by status. Sorted by created_at desc.""" specs: list[Spec] = [] diff --git a/src/agentkit/server/routes/chat.py b/src/agentkit/server/routes/chat.py index ed9aa49..269d8bd 100644 --- a/src/agentkit/server/routes/chat.py +++ b/src/agentkit/server/routes/chat.py @@ -169,6 +169,11 @@ _VALID_TEAM_EVENT_TYPES = frozenset( "round_summary", "user_intervention", "board_concluded", + # U8/R8: spec review gate events (PLAN_EXEC pauses for user review). + # Without this whitelist entry the events silently no-op (per the + # streaming-event-contract-residuals learning). + "spec_review_request", + "spec_review_reply", } ) @@ -1005,6 +1010,9 @@ async def chat_websocket(websocket: WebSocket, session_id: str) -> None: # Track pending replies for AskHumanTool and confirmations pending_replies: dict[str, asyncio.Future] = {} pending_confirmations: dict[str, asyncio.Future] = {} + # U8/R8: pending spec-review futures keyed by spec_review_id. Resolved + # by the spec_review_reply client message; cancelled on WS teardown. + pending_spec_reviews: dict[str, asyncio.Future] = {} chat_manager.add(session_id, websocket, pending_replies) cancellation_token = CancellationToken() @@ -1086,6 +1094,7 @@ async def chat_websocket(websocket: WebSocket, session_id: str) -> None: message_token, pending_replies, pending_confirmations, + pending_spec_reviews, model_override=model, ) ) @@ -1114,6 +1123,29 @@ async def chat_websocket(websocket: WebSocket, session_id: str) -> None: f"Confirmation {confirmation_id!r} not found in pending_confirmations" ) + elif msg_type == "spec_review_reply": + # U8/R8: Reply to a spec review request. The client sends + # {spec_review_id, decision: "approved"|"rejected", feedback}. + # An unknown spec_review_id is logged + ignored (no crash) — + # e.g. a stale reply arriving after the future was popped. + spec_review_id = msg.get("spec_review_id") + decision = msg.get("decision", "rejected") + feedback = msg.get("feedback", "") + logger.info( + f"Received spec_review_reply: id={spec_review_id!r}, decision={decision!r}" + ) + if spec_review_id and spec_review_id in pending_spec_reviews: + fut = pending_spec_reviews[spec_review_id] + if not fut.done(): + fut.set_result((decision, feedback)) + else: + logger.warning(f"spec_review_reply {spec_review_id!r} already resolved") + else: + logger.warning( + f"spec_review_reply {spec_review_id!r} not found in " + f"pending_spec_reviews — ignoring" + ) + elif msg_type == "cancel": cancellation_token.cancel() await websocket.send_json({"type": "result", "data": {"status": "cancelled"}}) @@ -1139,6 +1171,9 @@ async def chat_websocket(websocket: WebSocket, session_id: str) -> None: for fut in pending_confirmations.values(): if not fut.done(): fut.cancel() + for fut in pending_spec_reviews.values(): + if not fut.done(): + fut.cancel() chat_manager.remove(session_id, websocket) @@ -1150,6 +1185,7 @@ async def _handle_chat_message( cancellation_token: CancellationToken, pending_replies: dict[str, asyncio.Future], pending_confirmations: dict[str, asyncio.Future] | None = None, + pending_spec_reviews: dict[str, asyncio.Future] | None = None, model_override: str | None = None, ) -> None: """Handle a user message: append to session, execute Agent, stream events. @@ -1404,6 +1440,63 @@ async def _handle_chat_message( finally: _pending_confirmations.pop(confirmation_id, None) + # U8/R8: spec review handler — only wired when the engine is a + # PlanExecEngine (the WS path's _build_phase_engine returns a ReActEngine + # with phase_policy, so this is a no-op there; REST/tests that use + # PlanExecEngine get the gate). Different semantics from _confirmation_ + # handler: 30-min timeout (long task user availability) vs 5-min, returns + # (decision, feedback) tuple not bool, and on timeout RAISES + # asyncio.TimeoutError so the engine can park the Spec (not fail it). + _pending_spec_reviews = pending_spec_reviews if pending_spec_reviews is not None else {} + + async def _spec_review_handler(spec_id: str, goal: str, steps: list[dict]) -> tuple[str, str]: + """Send spec_review_request to frontend and wait for the user's decision. + + Returns (decision, feedback). Raises asyncio.TimeoutError after 30 min + (the engine parks the Spec on timeout). Raises asyncio.CancelledError + if the stream is cancelled mid-review. + """ + # spec_review_id MUST match the engine's format (f"{spec_id}:spec_review") + # — one review per spec (stable identifier, terminal-event symmetry). + spec_review_id = f"{spec_id}:spec_review" + await websocket.send_json( + { + "type": "spec_review_request", + "data": { + "spec_id": spec_id, + "spec_review_id": spec_review_id, + "goal": goal, + "steps": steps, + }, + } + ) + + loop = asyncio.get_running_loop() + future: asyncio.Future[tuple[str, str]] = loop.create_future() + _pending_spec_reviews[spec_review_id] = future + logger.info(f"Spec review request {spec_review_id} sent, waiting for reply") + + try: + # 30 min (1800s) — long-task user availability per R8. The engine + # catches TimeoutError and parks the Spec (status="parked", not + # "failed") so the user can resume on return. + decision, feedback = await asyncio.wait_for(future, timeout=1800.0) + logger.info(f"Spec review {spec_review_id} resolved: decision={decision!r}") + return decision, feedback + except asyncio.TimeoutError: + logger.warning(f"Spec review {spec_review_id} timed out (30 min)") + raise + finally: + _pending_spec_reviews.pop(spec_review_id, None) + + # Wire the handler onto a PlanExecEngine only (the WS PLAN_EXEC path uses + # a ReActEngine + phase_policy, where this is a no-op). Local import to + # avoid a top-level dependency that the WS path doesn't need. + from agentkit.core.plan_exec_engine import PlanExecEngine as _PlanExecEngine + + if isinstance(react_engine, _PlanExecEngine): + react_engine._spec_review_handler = _spec_review_handler + logger.info( f"Chat session {session_id}: executing with {len(routing.tools)} tools, model={routing.model}, skill={routing.skill_name}" ) @@ -1479,6 +1572,22 @@ async def _handle_chat_message( "data": event.data, } ) + elif event.event_type == "spec_review_request": + # U8/R8: the _spec_review_handler closure already sent this + # request directly to the frontend (it owns the spec_review_id + # + future). Swallow the engine's informational event to avoid + # a duplicate render (mirrors confirmation_request → pass). + pass + elif event.event_type == "spec_review_reply": + # Forward the engine's reply event so the frontend learns the + # outcome — especially the timeout→parked transition, which + # the frontend cannot infer (the user never replied). + await websocket.send_json( + { + "type": "spec_review_reply", + "data": event.data, + } + ) elif event.event_type == "phase_violation": # Wave 4 U2: forward phase violations to the client so the # frontend can surface them in the PhaseIndicator UI (alongside diff --git a/src/agentkit/server/routes/portal.py b/src/agentkit/server/routes/portal.py index a9dad8e..1661361 100644 --- a/src/agentkit/server/routes/portal.py +++ b/src/agentkit/server/routes/portal.py @@ -908,6 +908,12 @@ _PERSISTED_MESSAGE_FIELDS = ( "routing_method", "thinking", "tool_calls", + # U8/R8: spec review gate fields — a pending spec_review_request must + # survive a page reload so the user can still answer it (and a parked + # Spec is resumable on return). + "spec_review_id", + "spec_review_decision", + "spec_review_feedback", ) diff --git a/tests/unit/test_spec_review_gate.py b/tests/unit/test_spec_review_gate.py new file mode 100644 index 0000000..7642157 --- /dev/null +++ b/tests/unit/test_spec_review_gate.py @@ -0,0 +1,517 @@ +"""Tests for U8: spec review gate (R8). + +Covers: +- Happy path (AE4): PLAN_EXEC pauses for review, user approves, execution resumes +- Rejection -> replan -> re-review; replan cap (2) -> failure (not infinite loop) +- Timeout -> Spec parked (not failed); ReActResult status="parked" +- Stream cancelled mid-review -> CancelledError propagates, no deadlock +- spec_review_handler None -> backward compat (no gate) +- spec_manager None + handler set -> skip gate + warn +- Handler raises -> exception propagated +- SpecManager.park()/resume() round-trip; parked survives reload; confirm() works +- Whitelist assertion (silent no-op prevention) +- Unknown spec_review_id ignored (no crash) +""" + +from __future__ import annotations + +import asyncio +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from agentkit.core.exceptions import TaskCancelledError +from agentkit.core.plan_exec_engine import PlanExecEngine, _MAX_SPEC_REVIEW_REPLANS +from agentkit.core.plan_executor import PlanExecutionResult, StepExecutionResult +from agentkit.core.plan_schema import ExecutionPlan, PlanStep, PlanStepStatus +from agentkit.core.protocol import CancellationToken, TaskStatus +from agentkit.core.react import ReActResult +from agentkit.core.spec_manager import Spec, SpecManager, SpecStep + + +# ── Helpers ────────────────────────────────────────────── + + +def make_plan( + goal: str = "test goal", + plan_id: str = "plan-1", + steps: list[PlanStep] | None = None, +) -> ExecutionPlan: + """Construct an ExecutionPlan with a distinct plan_id.""" + if steps is None: + steps = [ + PlanStep(step_id="step-0", name="Step 0", description="First step"), + PlanStep(step_id="step-1", name="Step 1", description="Second step"), + ] + plan = ExecutionPlan(goal=goal, steps=steps) + plan.plan_id = plan_id + plan.parallel_groups = [[s.step_id] for s in steps] + return plan + + +def make_step_result( + step_id: str, + status: PlanStepStatus = PlanStepStatus.COMPLETED, + result: dict | None = None, +) -> StepExecutionResult: + return StepExecutionResult( + step_id=step_id, + status=status, + result=result or {"content": f"result of {step_id}"}, + error=None, + ) + + +def make_plan_result( + plan_id: str = "plan-1", + status: TaskStatus = TaskStatus.COMPLETED, +) -> PlanExecutionResult: + step_results = { + "step-0": make_step_result("step-0"), + "step-1": make_step_result("step-1"), + } + return PlanExecutionResult( + plan_id=plan_id, + step_results=step_results, + status=status, + total_duration_ms=100.0, + ) + + +def make_spec(spec_id: str = "plan-1", goal: str = "test goal") -> Spec: + return Spec( + spec_id=spec_id, + goal=goal, + steps=[SpecStep(step_id="s1", name="Step 1", description="First")], + ) + + +def make_engine( + specs_dir: str, + *, + spec_review_handler=None, + spec_manager: SpecManager | None = None, + step_event_callback=None, +) -> tuple[PlanExecEngine, SpecManager]: + """Build a PlanExecEngine wired with a SpecManager (tmp dir).""" + mgr = spec_manager if spec_manager is not None else SpecManager(specs_dir=specs_dir) + engine = PlanExecEngine( + llm_gateway=None, + spec_manager=mgr, + spec_review_handler=spec_review_handler, + step_event_callback=step_event_callback, + ) + return engine, mgr + + +def patch_executor(plan_result: PlanExecutionResult): + """Patch PlanExecutor so execute() returns the given plan_result.""" + mock_executor = MagicMock() + mock_executor.execute = AsyncMock(return_value=plan_result) + return patch("agentkit.core.plan_exec_engine.PlanExecutor", return_value=mock_executor) + + +# ── Whitelist assertion ────────────────────────────────── + + +class TestWhitelist: + """Prevent silent no-op regression (streaming-event-contract learning).""" + + def test_spec_review_events_in_whitelist(self): + from agentkit.server.routes.chat import _VALID_TEAM_EVENT_TYPES + + assert "spec_review_request" in _VALID_TEAM_EVENT_TYPES + assert "spec_review_reply" in _VALID_TEAM_EVENT_TYPES + + +# ── Happy path (AE4) ───────────────────────────────────── + + +class TestHappyPathStream: + """PLAN_EXEC generates Spec -> spec_review_request -> suspend -> approve -> resume.""" + + async def test_approve_resumes_execution(self, tmp_path: Path): + seen_calls: list[tuple[str, str, list]] = [] + + async def handler(spec_id: str, goal: str, steps: list[dict]): + seen_calls.append((spec_id, goal, steps)) + return ("approved", "") + + engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler) + plan = make_plan(plan_id="plan-1") + plan_result = make_plan_result() + + with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)): + with patch_executor(plan_result): + events = [ + e + async for e in engine.execute_stream( + messages=[{"role": "user", "content": "do a complex task"}], + ) + ] + + event_types = [e.event_type for e in events] + # Spec created, review request, review reply, then execution + final_answer + assert "spec_created" in event_types + assert "spec_review_request" in event_types + assert "spec_review_reply" in event_types + # request comes before reply (terminal-event symmetry / ordering) + assert event_types.index("spec_review_request") < event_types.index("spec_review_reply") + # Execution resumed after approval -> step events + final_answer + assert "final_answer" in event_types + final = next(e for e in events if e.event_type == "final_answer") + assert final.data["plan_status"] != "parked" + + # Handler called with the spec_id matching the created spec, the goal, + # and a list of step dicts. + assert len(seen_calls) == 1 + spec_id, goal, steps = seen_calls[0] + assert spec_id == "plan-1" + assert goal == "test goal" + assert isinstance(steps, list) + assert all("step_id" in s and "name" in s for s in steps) + + async def test_nonstream_approve_returns_success(self, tmp_path: Path): + async def handler(spec_id, goal, steps): + return ("approved", "") + + engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler) + plan = make_plan(plan_id="plan-1") + plan_result = make_plan_result() + + with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)): + with patch_executor(plan_result): + result = await engine.execute( + messages=[{"role": "user", "content": "do a complex task"}], + ) + + assert isinstance(result, ReActResult) + assert result.status == "success" + assert result.output # aggregated output present + + +# ── Edge cases ─────────────────────────────────────────── + + +class TestRejectionReplan: + """User rejects -> replan with feedback -> new Spec -> review again.""" + + async def test_reject_then_approve_regenerates_spec(self, tmp_path: Path): + # First review rejects with feedback, second approves. + responses = [("rejected", "make it simpler"), ("approved", "")] + + async def handler(spec_id, goal, steps): + return responses.pop(0) + + engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler) + plan1 = make_plan(plan_id="plan-1") + plan2 = make_plan(plan_id="plan-2", goal="test goal (simpler)") + plan_result = make_plan_result() + + with patch.object( + engine._planner, + "generate_plan", + AsyncMock(side_effect=[plan1, plan2]), + ): + with patch_executor(plan_result): + events = [ + e + async for e in engine.execute_stream( + messages=[{"role": "user", "content": "do a complex task"}], + ) + ] + + # Two spec_created events (plan-1 then plan-2 after replan), two + # review requests, two review replies. + spec_created = [e for e in events if e.event_type == "spec_created"] + requests = [e for e in events if e.event_type == "spec_review_request"] + replies = [e for e in events if e.event_type == "spec_review_reply"] + assert len(spec_created) == 2 + assert len(requests) == 2 + assert len(replies) == 2 + # The second review targets a new spec_id (replan produced plan-2). + assert requests[0].data["spec_id"] == "plan-1" + assert requests[1].data["spec_id"] == "plan-2" + # First reply carries rejection + feedback; second carries approval. + assert replies[0].data["decision"] == "rejected" + assert replies[0].data["feedback"] == "make it simpler" + assert replies[1].data["decision"] == "approved" + # Execution resumed -> final_answer is success, not parked/failed. + final = next(e for e in events if e.event_type == "final_answer") + assert final.data["plan_status"] != "parked" + assert final.data["plan_status"] != "failed" + + async def test_replan_cap_exhausted_fails(self, tmp_path: Path): + # Always reject: cap is 2 replans -> 3rd rejection exhausts the gate. + async def handler(spec_id, goal, steps): + return ("rejected", "still no good") + + engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler) + plans = [make_plan(plan_id=f"plan-{i}") for i in range(1, 6)] + plan_result = make_plan_result() + + with patch.object( + engine._planner, + "generate_plan", + AsyncMock(side_effect=plans), + ): + with patch_executor(plan_result): + events = [ + e + async for e in engine.execute_stream( + messages=[{"role": "user", "content": "do a complex task"}], + ) + ] + + requests = [e for e in events if e.event_type == "spec_review_request"] + replies = [e for e in events if e.event_type == "spec_review_reply"] + # 3 reviews (initial + 2 replans), all rejected, then exhausted. + assert len(requests) == _MAX_SPEC_REVIEW_REPLANS + 1 + assert all(r.data["decision"] == "rejected" for r in replies) + final = next(e for e in events if e.event_type == "final_answer") + assert final.data["plan_status"] == "failed" + assert "replan cap" in final.data["output"] + + +class TestTimeoutParked: + """Timeout (30min simulated) -> Spec parked (not failed).""" + + async def test_stream_timeout_parks_spec(self, tmp_path: Path): + async def handler(spec_id, goal, steps): + raise asyncio.TimeoutError + + engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler) + plan = make_plan(plan_id="plan-1") + plan_result = make_plan_result() + + with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)): + with patch_executor(plan_result): + events = [ + e + async for e in engine.execute_stream( + messages=[{"role": "user", "content": "do a complex task"}], + ) + ] + + # Reply event carries decision=timeout + status=parked. + replies = [e for e in events if e.event_type == "spec_review_reply"] + assert len(replies) == 1 + assert replies[0].data["decision"] == "timeout" + assert replies[0].data["status"] == "parked" + # final_answer surfaces parked (not failed). + final = next(e for e in events if e.event_type == "final_answer") + assert final.data["plan_status"] == "parked" + # Spec persisted as parked. + spec = mgr.get("plan-1") + assert spec is not None + assert spec.status == "parked" + + async def test_nonstream_timeout_returns_parked_status(self, tmp_path: Path): + async def handler(spec_id, goal, steps): + raise asyncio.TimeoutError + + engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler) + plan = make_plan(plan_id="plan-1") + plan_result = make_plan_result() + + with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)): + with patch_executor(plan_result): + result = await engine.execute( + messages=[{"role": "user", "content": "do a complex task"}], + ) + + assert isinstance(result, ReActResult) + assert result.status == "parked" + assert mgr.get("plan-1").status == "parked" + + +class TestCancellation: + """Stream cancelled mid-review -> CancelledError propagates, no deadlock.""" + + async def test_handler_cancelled_propagates(self, tmp_path: Path): + async def handler(spec_id, goal, steps): + raise asyncio.CancelledError + + engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler) + plan = make_plan(plan_id="plan-1") + plan_result = make_plan_result() + + with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)): + with patch_executor(plan_result): + with pytest.raises(asyncio.CancelledError): + async for _ in engine.execute_stream( + messages=[{"role": "user", "content": "do a complex task"}], + ): + pass + + async def test_token_cancelled_before_gate_raises_task_cancelled(self, tmp_path: Path): + async def handler(spec_id, goal, steps): # pragma: no cover - never reached + return ("approved", "") + + engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler) + token = CancellationToken() + token.cancel() + plan = make_plan(plan_id="plan-1") + plan_result = make_plan_result() + + with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)): + with patch_executor(plan_result): + with pytest.raises(TaskCancelledError): + async for _ in engine.execute_stream( + messages=[{"role": "user", "content": "do a complex task"}], + cancellation_token=token, + ): + pass + + +class TestBackwardCompat: + """spec_review_handler None -> no gate; spec_manager None + handler -> skip.""" + + async def test_handler_none_skips_gate(self, tmp_path: Path): + engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=None) + plan = make_plan(plan_id="plan-1") + plan_result = make_plan_result() + + with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)): + with patch_executor(plan_result): + events = [ + e + async for e in engine.execute_stream( + messages=[{"role": "user", "content": "do a complex task"}], + ) + ] + + event_types = [e.event_type for e in events] + # Spec still created, but no review gate events. + assert "spec_created" in event_types + assert "spec_review_request" not in event_types + assert "spec_review_reply" not in event_types + assert "final_answer" in event_types + + async def test_spec_manager_none_handler_set_skips_gate(self, tmp_path: Path): + # handler set but spec_manager None -> gate skipped with a warning, + # execution proceeds (no crash, no spec_review events). + async def handler(spec_id, goal, steps): # pragma: no cover - never reached + return ("approved", "") + + engine = PlanExecEngine(llm_gateway=None, spec_manager=None, spec_review_handler=handler) + plan = make_plan(plan_id="plan-1") + plan_result = make_plan_result() + + with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)): + with patch_executor(plan_result): + events = [ + e + async for e in engine.execute_stream( + messages=[{"role": "user", "content": "do a complex task"}], + ) + ] + + event_types = [e.event_type for e in events] + assert "spec_created" not in event_types # no spec_manager -> no spec + assert "spec_review_request" not in event_types + assert "final_answer" in event_types + + +# ── Error / failure paths ──────────────────────────────── + + +class TestHandlerRaises: + """Handler raises a non-timeout/cancel exception -> propagated.""" + + async def test_handler_value_error_propagates(self, tmp_path: Path): + async def handler(spec_id, goal, steps): + raise ValueError("handler blew up") + + engine, mgr = make_engine(str(tmp_path / "specs"), spec_review_handler=handler) + plan = make_plan(plan_id="plan-1") + plan_result = make_plan_result() + + with patch.object(engine._planner, "generate_plan", AsyncMock(return_value=plan)): + with patch_executor(plan_result): + with pytest.raises(ValueError, match="handler blew up"): + async for _ in engine.execute_stream( + messages=[{"role": "user", "content": "do a complex task"}], + ): + pass + + +class TestUnknownSpecReviewId: + """An unknown spec_review_id is ignored (no crash) — mirrors the WS loop.""" + + def test_unknown_id_ignored(self): + # Replicates the chat.py WS-loop guard: only known ids resolve a future. + pending: dict[str, asyncio.Future] = {} + loop = asyncio.new_event_loop() + try: + fut: asyncio.Future = loop.create_future() + pending["known-id"] = fut + # An unknown id must not raise (the loop logs + ignores). + unknown = "does-not-exist" + assert unknown not in pending # the guard the loop uses + # Known id resolves fine. + assert "known-id" in pending + finally: + loop.close() + + +# ── SpecManager integration ────────────────────────────── + + +class TestSpecManagerParkResume: + """park()/resume() round-trip; parked survives reload; confirm() works.""" + + def test_park_sets_status_parked(self, tmp_path: Path): + mgr = SpecManager(specs_dir=str(tmp_path / "specs")) + mgr.create(make_spec(spec_id="s1")) + parked = mgr.park("s1") + assert parked is not None + assert parked.status == "parked" + + def test_resume_sets_status_draft(self, tmp_path: Path): + mgr = SpecManager(specs_dir=str(tmp_path / "specs")) + mgr.create(make_spec(spec_id="s1")) + mgr.park("s1") + resumed = mgr.resume("s1") + assert resumed is not None + assert resumed.status == "draft" + + def test_resume_non_parked_is_noop(self, tmp_path: Path): + # ponytail: idempotent resume — no-op (returns spec unchanged) rather + # than raising on a double-resume. + mgr = SpecManager(specs_dir=str(tmp_path / "specs")) + mgr.create(make_spec(spec_id="s1")) + # status is "draft", not "parked" -> resume is a no-op. + result = mgr.resume("s1") + assert result is not None + assert result.status == "draft" + + def test_park_nonexistent_returns_none(self, tmp_path: Path): + mgr = SpecManager(specs_dir=str(tmp_path / "specs")) + assert mgr.park("nope") is None + + def test_resume_nonexistent_returns_none(self, tmp_path: Path): + mgr = SpecManager(specs_dir=str(tmp_path / "specs")) + assert mgr.resume("nope") is None + + def test_parked_survives_reload(self, tmp_path: Path): + # A fresh SpecManager instance loading from disk must see "parked". + specs_dir = str(tmp_path / "specs") + mgr1 = SpecManager(specs_dir=specs_dir) + mgr1.create(make_spec(spec_id="s1")) + mgr1.park("s1") + + mgr2 = SpecManager(specs_dir=specs_dir) + loaded = mgr2.get("s1") + assert loaded is not None + assert loaded.status == "parked" + + def test_confirm_still_works(self, tmp_path: Path): + # Backward compat: the existing confirm() REST endpoint path. + mgr = SpecManager(specs_dir=str(tmp_path / "specs")) + mgr.create(make_spec(spec_id="s1")) + confirmed = mgr.confirm("s1") + assert confirmed is not None + assert confirmed.status == "confirmed" + assert confirmed.confirmed_at is not None