diff --git a/src/agentkit/core/config_driven.py b/src/agentkit/core/config_driven.py index 822015b..33a762d 100644 --- a/src/agentkit/core/config_driven.py +++ b/src/agentkit/core/config_driven.py @@ -72,6 +72,11 @@ async def drain_pending_evolution_tasks() -> None: await asyncio.gather(*_pending_evolution_tasks, return_exceptions=True) +def get_evolution_dropped_count() -> int: + """Return the number of evolution tasks dropped due to backpressure.""" + return _evolution_dropped_count + + class AgentConfig: """Agent 配置模型,从 YAML 或 Dict 构建""" @@ -739,7 +744,20 @@ class ConfigDrivenAgent(BaseAgent, EvolutionMixin): Shared by all _handle_*_stream methods to avoid duplicating the message-rendering logic that mirrors the sync _handle_* methods. + + Portal path: if ``task.input_data["messages"]`` is present (a list of + ``{role, content}`` dicts), use those pre-built messages directly + instead of rendering the prompt template. This lets the portal route + through ``execute_stream`` (inheriting evolution hooks + trace_outcome + propagation) while keeping its external message-building logic. """ + prebuilt = task.input_data.get("messages") + if prebuilt is not None: + system_prompt = task.input_data.get("system_prompt") + user_messages = [m for m in prebuilt if m.get("role") != "system"] + if not user_messages: + user_messages = [{"role": "user", "content": str(task.input_data)}] + return system_prompt, user_messages variables = task.input_data.copy() variables["task_type"] = task.task_type if self._prompt_template: @@ -774,22 +792,35 @@ class ConfigDrivenAgent(BaseAgent, EvolutionMixin): token = CancellationToken() self._active_tokens[task.task_id] = token _stream_output: dict = {} + _stream_trace_outcome: str = "success" _stream_error: BaseException | None = None _stream_completed = False + _stream_started_at = datetime.now(timezone.utc) try: await self._register_mcp_tools() async for event in self.handle_task_stream(task): if event.event_type == "final_answer": _raw = event.data.get("output", "") _stream_output = {"content": _raw} if isinstance(_raw, str) else _raw + # PLAN_EXEC path may embed trace_outcome in final_answer. + _to = event.data.get("trace_outcome") + if _to: + _stream_trace_outcome = _to + elif event.event_type == "final_result": + # REACT path: final_result carries ReActResult.status. + _result = event.data.get("result") + if _result is not None: + _stream_trace_outcome = getattr(_result, "status", "success") yield event _stream_completed = True except asyncio.CancelledError as ce: # Cancellation must propagate, but hooks still fire (U2 edge case). _stream_error = ce + _stream_trace_outcome = "cancelled" raise except Exception as e: _stream_error = e + _stream_trace_outcome = "error" raise finally: # async generator 的 finally 在 generator 关闭时执行(GC/aclose/正常结束) @@ -797,6 +828,12 @@ class ConfigDrivenAgent(BaseAgent, EvolutionMixin): # KTD-4: lifecycle parity — fire evolution hooks fire-and-forget. try: now = datetime.now(timezone.utc) + # KTD-8: propagate trace_outcome into output_data so + # lifecycle._is_failure_path() can detect non-success outcomes. + if _stream_output: + _stream_output["trace_outcome"] = _stream_trace_outcome + else: + _stream_output = {"trace_outcome": _stream_trace_outcome} if _stream_error is not None: if isinstance(_stream_error, (asyncio.CancelledError, TaskCancelledError)): status = TaskStatus.CANCELLED @@ -810,17 +847,29 @@ class ConfigDrivenAgent(BaseAgent, EvolutionMixin): status=status, output_data=None, error_message=err_msg, - started_at=now, + started_at=_stream_started_at, completed_at=now, ) elif _stream_completed: + # KTD-8: map non-success trace_outcomes to FAILED. + if _stream_trace_outcome in ( + "gave_up_after_reflections", + "verify_failed", + "verify_quota_exhausted", + "failed", + ): + status = TaskStatus.FAILED + err_msg = _stream_trace_outcome + else: + status = TaskStatus.COMPLETED + err_msg = None result = TaskResult( task_id=task.task_id, agent_name=self.name, - status=TaskStatus.COMPLETED, + status=status, output_data=_stream_output, - error_message=None, - started_at=now, + error_message=err_msg, + started_at=_stream_started_at, completed_at=now, ) else: @@ -831,7 +880,7 @@ class ConfigDrivenAgent(BaseAgent, EvolutionMixin): status=TaskStatus.CANCELLED, output_data=None, error_message="stream closed before completion", - started_at=now, + started_at=_stream_started_at, completed_at=now, ) self._trigger_evolution_hooks(task, result) diff --git a/src/agentkit/core/plan_exec_engine.py b/src/agentkit/core/plan_exec_engine.py index cd64d47..0146f7b 100644 --- a/src/agentkit/core/plan_exec_engine.py +++ b/src/agentkit/core/plan_exec_engine.py @@ -121,6 +121,10 @@ class PlanExecEngine: # user's decision. None = skip the gate (backward compat — the engine # proceeds directly to execution after Spec persistence). spec_review_handler: SpecReviewHandler | None = None, + # KTD-2/R4: max reflections for ReActEngine reinjection→reflection + # escalation. Threaded through to each step's ReActEngine so the + # verify-failed path can escalate from reinjection to full reflection. + max_reflections: int = 2, ): """ Args: @@ -159,6 +163,8 @@ class PlanExecEngine: self._pitfall_detector = pitfall_detector # U8/R8: spec review gate handler. None = skip gate (backward compat). self._spec_review_handler = spec_review_handler + # KTD-2/R4: max reflections threaded to each step's ReActEngine. + self._max_reflections = max_reflections # U4/R11: copy the default to avoid mutating the module-level dict. self._phase_budgets = ( dict(phase_budgets) if phase_budgets is not None else dict(_DEFAULT_PHASE_BUDGETS) @@ -605,9 +611,10 @@ class PlanExecEngine: "output": output, "total_steps": len(state.trajectory), "total_tokens": state.total_tokens, - "plan_id": plan.plan_id, + "plan_id": current_plan.plan_id, "plan_status": plan_result.status.value, "replanned": state.replanned, + "trace_outcome": trace_outcome, }, ) @@ -637,7 +644,7 @@ class PlanExecEngine: async def _inject_pitfall_warnings( self, goal: str, - plan_steps: list[Any], + plan_steps: list[PlanStep], task_type: str, actor: str, system_prompt: str | None, @@ -1432,6 +1439,7 @@ class PlanExecEngine: verification_enabled=self._verification_enabled, verification_commands=self._verification_commands, phase_budgets=self._phase_budgets, + max_reflections=self._max_reflections, ) return PlanExecutor( agent_pool=step_executor, @@ -1590,11 +1598,13 @@ class ReActStepExecutor: model: str = "default", system_prompt: str | None = None, tools: list["Tool"] | None = None, - max_steps: int = 5, + max_steps: int = 10, confirmation_handler: Any | None = None, verification_enabled: bool = False, verification_commands: list[str] | None = None, phase_budgets: dict[str, int] | None = None, + # KTD-2/R4: threaded through to each step's ReActEngine. + max_reflections: int = 2, ): self._llm_gateway = llm_gateway self._messages = messages or [] @@ -1607,6 +1617,8 @@ class ReActStepExecutor: self._verification_commands = verification_commands # U4/R11: thread through to each step's ReActEngine. self._phase_budgets = phase_budgets + # KTD-2/R4: thread through to each step's ReActEngine. + self._max_reflections = max_reflections self._agents: dict[str, _ReActStepAgent] = {} async def create_agent_from_skill(self, skill_name: str): @@ -1623,6 +1635,7 @@ class ReActStepExecutor: verification_enabled=self._verification_enabled, verification_commands=self._verification_commands, phase_budgets=self._phase_budgets, + max_reflections=self._max_reflections, ) self._agents[skill_name] = agent return agent @@ -1642,6 +1655,7 @@ class ReActStepExecutor: verification_enabled=self._verification_enabled, verification_commands=self._verification_commands, phase_budgets=self._phase_budgets, + max_reflections=self._max_reflections, ) self._agents[key] = agent return agent @@ -1662,11 +1676,12 @@ class _ReActStepAgent: model: str = "default", system_prompt: str | None = None, tools: list["Tool"] | None = None, - max_steps: int = 5, + max_steps: int = 10, confirmation_handler: Any | None = None, verification_enabled: bool = False, verification_commands: list[str] | None = None, phase_budgets: dict[str, int] | None = None, + max_reflections: int = 2, ): self.name = name self._llm_gateway = llm_gateway @@ -1680,6 +1695,7 @@ class _ReActStepAgent: self._verification_commands = verification_commands # U4/R11: per-phase step quotas, passed to ReActEngine. self._phase_budgets = phase_budgets + self._max_reflections = max_reflections async def execute(self, task_msg: TaskMessage) -> "TaskResult": """执行步骤:通过 ReActEngine 循环调用""" @@ -1710,6 +1726,7 @@ class _ReActStepAgent: verification_enabled=self._verification_enabled, verification_commands=self._verification_commands, phase_budgets=self._phase_budgets, + max_reflections=self._max_reflections, ) # 构建 messages @@ -1728,7 +1745,13 @@ class _ReActStepAgent: now = datetime.now(timezone.utc) status = TaskStatus.COMPLETED.value - if react_result.status in ("timeout", "cancelled"): + if react_result.status in ( + "timeout", + "cancelled", + "verify_failed", + "gave_up_after_reflections", + "failed", + ): status = TaskStatus.FAILED.value return TaskResult( diff --git a/src/agentkit/core/react.py b/src/agentkit/core/react.py index 15bb080..f39991b 100644 --- a/src/agentkit/core/react.py +++ b/src/agentkit/core/react.py @@ -33,10 +33,12 @@ from agentkit.telemetry.metrics import ( agent_duration_histogram, ) +from agentkit.core.phase import PhaseState + if TYPE_CHECKING: from agentkit.core.compressor import CompressionStrategy from agentkit.core.middleware import MiddlewareChain - from agentkit.core.phase import PhasePolicy, PhaseState + from agentkit.core.phase import PhasePolicy from agentkit.core.sandbox import WorkspaceSandbox from agentkit.core.trace import TraceRecorder from agentkit.evolution.pitfall_detector import PitfallWarning @@ -420,8 +422,6 @@ class ReActEngine: """ if self._phase_policy is None or self._current_phase is None: return - from agentkit.core.phase import PhaseState - while self._current_phase not in (PhaseState.VERIFICATION, PhaseState.DELIVERY): nxt = self.advance_phase() if nxt is None: @@ -446,8 +446,6 @@ class ReActEngine: """ if self._phase_policy is None or self._current_phase is None: return None - from agentkit.core.phase import PhaseState - nxt = PhaseState.next_of(self._current_phase) if nxt is None: # Already at DELIVERY — return None to signal no transition. @@ -890,8 +888,8 @@ class ReActEngine: trace_outcome = "success" # U4/G1: verify 失败回灌计数器。受 max_steps 上限约束(不无限循环)。 - # U4/KTD-7: initialize from restored budget state (checkpoint resume). - reinjections = self._reflect_count + # U4/KTD-7: _reflect_count is initialized from restored budget state + # (checkpoint resume) and used directly — no redundant local copy. _loop_start = time.monotonic() while step < self._max_steps: @@ -913,9 +911,7 @@ class ReActEngine: and self._phase_policy is not None and self._current_phase is not None ): - from agentkit.core.phase import PhaseState as _PS - - if self._current_phase in (_PS.PLANNING, _PS.BUILDING): + if self._current_phase in (PhaseState.PLANNING, PhaseState.BUILDING): self._think_count += 1 think_quota = self._phase_budgets.get("think") if think_quota is not None and self._think_count >= think_quota: @@ -1547,7 +1543,7 @@ class ReActEngine: vresult = await vloop.verify() if not vresult.passed: if ( - reinjections < self._max_reinjections + self._reflect_count < self._max_reinjections and step < self._max_steps ): errors_text = "\n".join(vresult.errors) @@ -1557,7 +1553,6 @@ class ReActEngine: "content": (f"验证失败,错误如下:\n{errors_text}"), } ) - reinjections += 1 # U4/R10: track reflect count for # checkpoint reconstruction (KTD-7). self._reflect_count += 1 @@ -1574,7 +1569,7 @@ class ReActEngine: data={ "message": ( f"验证失败,已注入错误信息让 LLM 自纠正 " - f"(reinjection {reinjections}/{self._max_reinjections})" + f"(reinjection {self._reflect_count}/{self._max_reinjections})" ), "verify_errors": vresult.errors, }, @@ -1681,7 +1676,7 @@ class ReActEngine: logger.info( "Verification failed after %d reinjections, " "%d reflections, interrupting with verify log", - reinjections, + self._reflect_count, self._reflection_count, ) break @@ -2136,7 +2131,7 @@ class ReActEngine: in_verification = ( self._sandbox is not None and self._current_phase is not None - and self._current_phase.value == "verification" + and self._current_phase == PhaseState.VERIFICATION ) try: diff --git a/src/agentkit/core/sandbox.py b/src/agentkit/core/sandbox.py index 759aa0a..cbb943d 100644 --- a/src/agentkit/core/sandbox.py +++ b/src/agentkit/core/sandbox.py @@ -28,10 +28,24 @@ import contextlib import errno import logging import socket +import threading from pathlib import Path logger = logging.getLogger(__name__) +# Reentrancy counter for ``network_block``. Concurrent VERIFICATION phases +# (parallel PLAN_EXEC steps) each enter the context manager; only the first +# entry (0 -> 1) patches ``socket.socket.connect``, and only the last exit +# (1 -> 0) restores it. Naive save/restore would unpatch on the first exit +# while other phases are still expecting the block to be in effect, breaking +# sandboxing for any phase that started later. +# ponytail: process-wide counter — not subprocess-safe (inherited fork state +# is irrelevant because the monkey-patch lives in the parent's socket module). +_network_block_count: int = 0 +_network_block_lock = threading.Lock() +_original_socket_connect = socket.socket.connect +_original_socket_connect_ex = socket.socket.connect_ex + class SandboxNetworkBlockedError(RuntimeError): """Raised when a tool attempts an outbound network call under sandbox.""" @@ -115,17 +129,23 @@ class WorkspaceSandbox: """Block outbound network connections within the async context. Patches ``socket.socket.connect`` and ``connect_ex`` to raise / - return ``ECONNREFUSED`` respectively. Restores the originals on exit, - even if the wrapped code raises. + return ``ECONNREFUSED`` respectively. Restores the originals on the + last concurrent exit, even if the wrapped code raises. Already-connected sockets (e.g. an LLM gateway keep-alive pool) are unaffected — only *new* ``connect()`` calls are blocked. This is the correct granularity: the LLM gateway talks over its existing connection, while a tool trying to ``requests.get(...)`` makes a new connect and is rejected. + + Reentrancy: a module-level counter guards the patch. Concurrent + VERIFICATION phases (parallel PLAN_EXEC steps) each enter/exit; the + patch is engaged on count 0->1 and released on count 1->0. Without + this, the first exit would restore the original connect while later + phases are still expecting the block, terminating new LLM gateway / + Redis / PostgreSQL connections in those phases. """ - original_connect = socket.socket.connect - original_connect_ex = socket.socket.connect_ex + global _network_block_count # noqa: PLW0603 def _blocked_connect(self_sock, *args, **kwargs): # noqa: ANN001 raise SandboxNetworkBlockedError( @@ -136,15 +156,26 @@ class WorkspaceSandbox: # connect_ex returns an errno instead of raising (POSIX contract). return errno.ECONNREFUSED - socket.socket.connect = _blocked_connect # type: ignore[method-assign] - socket.socket.connect_ex = _blocked_connect_ex # type: ignore[method-assign] - logger.debug("sandbox: network block engaged") + with _network_block_lock: + _network_block_count += 1 + if _network_block_count == 1: + socket.socket.connect = _blocked_connect # type: ignore[method-assign] + socket.socket.connect_ex = _blocked_connect_ex # type: ignore[method-assign] + logger.debug("sandbox: network block engaged (count=1)") try: yield finally: - socket.socket.connect = original_connect # type: ignore[method-assign] - socket.socket.connect_ex = original_connect_ex # type: ignore[method-assign] - logger.debug("sandbox: network block released") + with _network_block_lock: + _network_block_count -= 1 + if _network_block_count == 0: + socket.socket.connect = _original_socket_connect # type: ignore[method-assign] + socket.socket.connect_ex = _original_socket_connect_ex # type: ignore[method-assign] + logger.debug("sandbox: network block released (count=0)") + else: + logger.debug( + "sandbox: network block still held (count=%d)", + _network_block_count, + ) def detect_verification_commands(workspace_root: str | Path | None) -> list[str]: diff --git a/src/agentkit/server/app.py b/src/agentkit/server/app.py index ed75eaa..988b31b 100644 --- a/src/agentkit/server/app.py +++ b/src/agentkit/server/app.py @@ -805,7 +805,14 @@ async def lifespan(app: FastAPI): try: from agentkit.core.config_driven import drain_pending_evolution_tasks - await drain_pending_evolution_tasks() + await asyncio.wait_for(drain_pending_evolution_tasks(), timeout=10.0) + except asyncio.TimeoutError: + from agentkit.core.config_driven import _pending_evolution_tasks + + logger.warning( + "drain_pending_evolution_tasks 超时 10s, %d 个任务被放弃", + len(_pending_evolution_tasks), + ) except Exception: logger.debug("drain_pending_evolution_tasks 异常已忽略", exc_info=True) diff --git a/src/agentkit/server/routes/chat.py b/src/agentkit/server/routes/chat.py index 354236b..3df74d0 100644 --- a/src/agentkit/server/routes/chat.py +++ b/src/agentkit/server/routes/chat.py @@ -1494,6 +1494,23 @@ async def _handle_chat_message( }, } ) + # U8/R8: persist the spec_review_request so it survives a page reload. + # The frontend reconstructs the pending review card from the restored + # message metadata (spec_review_id + goal + steps). + try: + await sm.append_message( + session_id=session_id, + role=MessageRole.ASSISTANT, + content=f"[Spec Review] {goal}", + metadata={ + "message_type": "spec_review_request", + "spec_review_id": spec_review_id, + "spec_review_goal": goal, + "spec_review_steps": steps, + }, + ) + except Exception: + logger.debug("Failed to persist spec_review_request", exc_info=True) loop = asyncio.get_running_loop() future: asyncio.Future[tuple[str, str]] = loop.create_future() @@ -1506,19 +1523,58 @@ async def _handle_chat_message( # "failed") so the user can resume on return. decision, feedback = await asyncio.wait_for(future, timeout=1800.0) logger.info(f"Spec review {spec_review_id} resolved: decision={decision!r}") + # Persist the decision so the frontend can show the outcome after + # a reload (e.g. timeout→parked transition the user never saw). + try: + await sm.append_message( + session_id=session_id, + role=MessageRole.ASSISTANT, + content=f"[Spec Review Decision] {decision}: {feedback}", + metadata={ + "message_type": "spec_review_reply", + "spec_review_id": spec_review_id, + "spec_review_decision": decision, + "spec_review_feedback": feedback, + }, + ) + except Exception: + logger.debug("Failed to persist spec_review_reply", exc_info=True) return decision, feedback except asyncio.TimeoutError: logger.warning(f"Spec review {spec_review_id} timed out (30 min)") + # Persist the timeout→parked transition so the frontend can show + # the parked state after a reload. + try: + await sm.append_message( + session_id=session_id, + role=MessageRole.ASSISTANT, + content=f"[Spec Review Timed Out] {spec_review_id}", + metadata={ + "message_type": "spec_review_reply", + "spec_review_id": spec_review_id, + "spec_review_decision": "parked", + "spec_review_feedback": "timed out (30 min)", + }, + ) + except Exception: + logger.debug("Failed to persist spec_review timeout", exc_info=True) raise finally: _pending_spec_reviews.pop(spec_review_id, None) - # Wire the handler onto a PlanExecEngine only (the WS PLAN_EXEC path uses - # a ReActEngine + phase_policy, where this is a no-op). Local import to - # avoid a top-level dependency that the WS path doesn't need. - from agentkit.core.plan_exec_engine import PlanExecEngine as _PlanExecEngine - - if isinstance(react_engine, _PlanExecEngine): + # U8/R8: spec review gate wiring. The WS PLAN_EXEC path uses + # ``_build_phase_engine`` which returns a ``ReActEngine`` with + # ``phase_policy`` (NOT a ``PlanExecEngine``), so the gate cannot be + # wired here — ``ReActEngine`` does not read ``_spec_review_handler``. + # The gate only fires when ``ConfigDrivenAgent.execute_stream`` → + # ``_handle_plan_exec_stream`` → ``PlanExecEngine.execute_stream`` runs, + # which is the portal/task path (not the WS chat path). + # ponytail: known ceiling — WS chat PLAN_EXEC (phase_policy mechanism) + # does not support spec review. Upgrade path: route WS PLAN_EXEC through + # ``ConfigDrivenAgent.execute_stream`` to unify with the portal path and + # inherit the gate. The ``_spec_review_handler`` closure + event handlers + # below are kept so the upgrade is a routing change, not a rewrite. + if hasattr(react_engine, "_spec_review_handler"): react_engine._spec_review_handler = _spec_review_handler logger.info( diff --git a/src/agentkit/server/routes/portal.py b/src/agentkit/server/routes/portal.py index 1661361..8001630 100644 --- a/src/agentkit/server/routes/portal.py +++ b/src/agentkit/server/routes/portal.py @@ -23,7 +23,7 @@ from pydantic import BaseModel from agentkit.core.config_driven import ConfigDrivenAgent from agentkit.core.event_queue import EventQueue -from agentkit.core.protocol import Event, TaskEventType, TaskStatus, TurnEventType +from agentkit.core.protocol import Event, TaskEventType, TaskMessage, TaskStatus, TurnEventType from agentkit.core.react import ReActEngine from agentkit.chat.skill_routing import ExecutionMode, SkillRoutingResult from agentkit.chat.request_preprocessor import RequestPreprocessor @@ -73,6 +73,42 @@ def _ensure_non_empty(text: str | None) -> str: return EMPTY_LLM_RESPONSE +def _build_portal_task( + *, + agent_name: str, + messages: list[dict[str, str]], + system_prompt: str | None, + timeout_seconds: float | None, + conversation_id: str | None = None, + task_id: str | None = None, +) -> TaskMessage: + """Construct a TaskMessage for routing through ConfigDrivenAgent.execute_stream. + + The portal builds messages externally (history + user message). The + ``messages`` key in input_data tells _build_llm_messages to use them + directly instead of rendering the prompt template. This lets the portal + inherit evolution hooks + trace_outcome propagation from execute_stream's + finally block (KTD-4/KTD-8). + """ + from datetime import datetime, timezone + + return TaskMessage( + task_id=task_id or str(uuid.uuid4()), + agent_name=agent_name, + task_type="chat", + priority=0, + input_data={ + "messages": messages, + "system_prompt": system_prompt, + "content": messages[-1].get("content", "") if messages else "", + }, + callback_url=None, + created_at=datetime.now(timezone.utc), + timeout_seconds=int(timeout_seconds) if timeout_seconds else 300, + conversation_id=conversation_id, + ) + + async def _emit_event_safe( event_queue: EventQueue | None, event_type: str, @@ -556,38 +592,39 @@ async def chat(request: ChatRequest, req: Request, _auth: None = Depends(_verify ) react_config = agent.get_react_config() - react_engine = getattr(agent, "_react_engine", None) - if react_engine is None: - react_engine = ReActEngine( + # KTD-4/KTD-8: route through ConfigDrivenAgent.execute_stream so the + # finally block fires evolution hooks + propagates trace_outcome. The + # portal builds messages externally; _build_portal_task packages them + # into a TaskMessage whose input_data["messages"] is used directly by + # _build_llm_messages (bypassing the prompt template). + _react_engine = getattr(agent, "_react_engine", None) + if _react_engine is None: + _react_engine = ReActEngine( llm_gateway=llm_gateway, max_steps=react_config["max_steps"], ) + agent._react_engine = _react_engine else: - react_engine.reset() + _react_engine.reset() messages = [{"role": "user", "content": request.message}] # Inject conversation history history_msgs = await _build_history_messages(conv.id) for hm in reversed(history_msgs): messages.insert(0, hm) - tools = agent.get_tools() - model = agent.get_model() system_prompt = getattr(agent, "_system_prompt", None) or agent.get_system_prompt() timeout_seconds = react_config["timeout_seconds"] + portal_task = _build_portal_task( + agent_name=agent.name, + messages=messages, + system_prompt=system_prompt, + timeout_seconds=timeout_seconds, + conversation_id=conv.id, + ) collected_output: list[str] = [] try: - # U2 verify: calls react_engine.execute_stream directly, bypassing - # ConfigDrivenAgent.execute_stream — evolution hooks NOT propagated - # here. Routing through agent.execute_stream is tracked separately. - async for event in react_engine.execute_stream( - messages=messages, - tools=tools, - model=model, - agent_name=agent.name, - system_prompt=system_prompt, - timeout_seconds=timeout_seconds, - ): + async for event in agent.execute_stream(portal_task): if event.event_type == "final_answer": collected_output.append(event.data.get("output", "")) except asyncio.CancelledError: @@ -684,34 +721,32 @@ async def chat_stream(request: ChatRequest, req: Request, _auth: None = Depends( ) react_config = agent.get_react_config() - react_engine = getattr(agent, "_react_engine", None) - if react_engine is None: - react_engine = ReActEngine( + # KTD-4/KTD-8: route through ConfigDrivenAgent.execute_stream + # (evolution hooks + trace_outcome propagation in finally block). + _react_engine = getattr(agent, "_react_engine", None) + if _react_engine is None: + _react_engine = ReActEngine( llm_gateway=llm_gateway, max_steps=react_config["max_steps"], ) + agent._react_engine = _react_engine else: - react_engine.reset() + _react_engine.reset() messages = [{"role": "user", "content": request.message}] - tools = agent.get_tools() - model = agent.get_model() system_prompt = getattr(agent, "_system_prompt", None) or agent.get_system_prompt() timeout_seconds = react_config["timeout_seconds"] + portal_task = _build_portal_task( + agent_name=agent.name, + messages=messages, + system_prompt=system_prompt, + timeout_seconds=timeout_seconds, + conversation_id=conv.id, + ) collected_output: list[str] = [] try: - # U2 verify: calls react_engine.execute_stream directly, bypassing - # ConfigDrivenAgent.execute_stream — evolution hooks NOT propagated - # here. Routing through agent.execute_stream is tracked separately. - async for event in react_engine.execute_stream( - messages=messages, - tools=tools, - model=model, - agent_name=agent.name, - system_prompt=system_prompt, - timeout_seconds=timeout_seconds, - ): + async for event in agent.execute_stream(portal_task): if event.event_type == "final_answer": collected_output.append(event.data.get("output", "")) yield { @@ -967,11 +1002,8 @@ def _derive_title_from_messages(messages: list) -> str: async def _execute_react_background( - react_engine: ReActEngine, + agent: ConfigDrivenAgent, messages: list[dict], - tools: list, - model: str, - agent_name: str, system_prompt: str | None, timeout_seconds: float | None, conv_id: str, @@ -987,6 +1019,10 @@ async def _execute_react_background( Results are always persisted to the conversation store, regardless of whether a WebSocket subscriber is active. Task status is tracked in TaskStore when provided. + + KTD-4/KTD-8: routes through ``agent.execute_stream`` (not + ``react_engine.execute_stream`` directly) so the finally block fires + evolution hooks and propagates trace_outcome. """ collected_output: list[str] = [] try: @@ -1005,17 +1041,15 @@ async def _execute_react_background( ): logger.warning("Failed to update TaskStore RUNNING", exc_info=True) - # U2 verify: calls react_engine.execute_stream directly, bypassing - # ConfigDrivenAgent.execute_stream — evolution hooks NOT propagated - # here. Routing through agent.execute_stream is tracked separately. - async for event in react_engine.execute_stream( + portal_task = _build_portal_task( + agent_name=agent.name, messages=messages, - tools=tools, - model=model, - agent_name=agent_name, system_prompt=system_prompt, timeout_seconds=timeout_seconds, - ): + conversation_id=conv_id, + task_id=task_id, + ) + async for event in agent.execute_stream(portal_task): if event.event_type == "final_answer": collected_output.append(event.data.get("output", "")) @@ -1219,6 +1253,14 @@ async def portal_websocket(websocket: WebSocket): task_id: str | None = None # Track the active background task so cancel can propagate to it. active_bg_task: asyncio.Task | None = None + # U8/R8: pending spec review futures. The portal WS path doesn't wire + # _spec_review_handler on the agent (the background task architecture + # makes EventQueue-based request/reply non-trivial), so this dict is + # typically empty. It exists so stale spec_review_reply messages from + # the frontend are handled gracefully instead of silently ignored. + # ponytail: upgrade path — wire _spec_review_handler via EventQueue + + # future, mirroring chat.py's _spec_review_handler closure. + pending_spec_reviews: dict[str, asyncio.Future[tuple[str, str]]] = {} try: while True: @@ -1256,6 +1298,32 @@ async def portal_websocket(websocket: WebSocket): await websocket.send_json({"type": "pong"}) continue + if msg_type == "spec_review_reply": + # U8/R8: mirror chat.py:1126 — resolve a pending spec review + # future. Typically a no-op in the portal WS path (the + # _spec_review_handler isn't wired), but handles stale replies + # gracefully. + spec_review_id = msg.get("spec_review_id") + decision = msg.get("decision", "rejected") + feedback = msg.get("feedback", "") + logger.info( + f"Received spec_review_reply: id={spec_review_id!r}, decision={decision!r}" + ) + if spec_review_id and spec_review_id in pending_spec_reviews: + fut = pending_spec_reviews[spec_review_id] + if not fut.done(): + fut.set_result((decision, feedback)) + else: + logger.warning( + f"spec_review_reply {spec_review_id!r} already resolved" + ) + else: + logger.warning( + f"spec_review_reply {spec_review_id!r} not found in " + f"pending_spec_reviews — ignoring" + ) + continue + if msg_type == "resume": # Frontend reconnected and wants to resume a running task resume_task_id = msg.get("task_id", "") @@ -1800,15 +1868,17 @@ async def portal_websocket(websocket: WebSocket): # Execute via ReAct stream react_config = agent.get_react_config() - # Reuse agent's ReActEngine if available (aligned with chat.py pattern) - react_engine = getattr(agent, "_react_engine", None) - if react_engine is None: - react_engine = ReActEngine( + # KTD-4/KTD-8: route through ConfigDrivenAgent.execute_stream + # (evolution hooks + trace_outcome propagation in finally block). + _react_engine = getattr(agent, "_react_engine", None) + if _react_engine is None: + _react_engine = ReActEngine( llm_gateway=llm_gateway, max_steps=react_config["max_steps"], ) + agent._react_engine = _react_engine else: - react_engine.reset() + _react_engine.reset() messages = [{"role": "user", "content": message_text}] # Inject conversation history for context continuity @@ -1829,11 +1899,8 @@ async def portal_websocket(websocket: WebSocket): # background task continues running and persists the result. bg_task = asyncio.create_task( _execute_react_background( - react_engine=react_engine, + agent=agent, messages=messages, - tools=tools, - model=model, - agent_name=agent.name, system_prompt=system_prompt, timeout_seconds=timeout_seconds, conv_id=conv.id, diff --git a/tests/unit/server/test_portal_ws_background.py b/tests/unit/server/test_portal_ws_background.py index 4da3089..85bb8f4 100644 --- a/tests/unit/server/test_portal_ws_background.py +++ b/tests/unit/server/test_portal_ws_background.py @@ -38,10 +38,12 @@ class FakeConversationStore: class FakeReactEngine: """Fake ReAct engine that yields events from a predefined list.""" + name = "test-agent" + def __init__(self, events: list[Event]) -> None: self._events = events - async def execute_stream(self, **kwargs): + async def execute_stream(self, task): for event in self._events: yield event @@ -49,11 +51,13 @@ class FakeReactEngine: class FailingReactEngine: """Fake ReAct engine that raises an exception after yielding some events.""" + name = "test-agent" + def __init__(self, events: list[Event], error: Exception) -> None: self._events = events self._error = error - async def execute_stream(self, **kwargs): + async def execute_stream(self, task): for event in self._events: yield event raise self._error @@ -76,11 +80,13 @@ def _make_event( class SlowFakeReactEngine: """Fake ReAct engine with a delay to allow status checks during execution.""" + name = "test-agent" + def __init__(self, events: list[Event], delay: float = 0.1) -> None: self._events = events self._delay = delay - async def execute_stream(self, **kwargs): + async def execute_stream(self, task): for event in self._events: await asyncio.sleep(self._delay) yield event @@ -93,11 +99,13 @@ class CancellableReactEngine: Event so the test can cancel the task and verify CancelledError cleanup. """ + name = "test-agent" + def __init__(self, first_event: Event) -> None: self._first_event = first_event self.started = asyncio.Event() - async def execute_stream(self, **kwargs): + async def execute_stream(self, task): yield self._first_event self.started.set() # Block forever until cancelled @@ -130,11 +138,8 @@ class TestExecuteReactBackground: eq = EventQueue() await _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -162,11 +167,8 @@ class TestExecuteReactBackground: eq = EventQueue() await _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -190,11 +192,8 @@ class TestExecuteReactBackground: eq = EventQueue() await _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -228,11 +227,8 @@ class TestExecuteReactBackground: await asyncio.sleep(0.05) await _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -270,11 +266,8 @@ class TestExecuteReactBackground: await asyncio.sleep(0.05) await _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -318,11 +311,8 @@ class TestTaskStoreIntegration: # Start background task bg_task = asyncio.create_task( _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -365,11 +355,8 @@ class TestTaskStoreIntegration: ) await _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -394,11 +381,8 @@ class TestTaskStoreIntegration: # Should not raise await _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -552,11 +536,8 @@ class TestCancelledErrorPath: bg_task = asyncio.create_task( _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -590,11 +571,8 @@ class TestCancelledErrorPath: bg_task = asyncio.create_task( _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -636,11 +614,8 @@ class TestCancelledErrorPath: bg_task = asyncio.create_task( _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -769,11 +744,8 @@ class TestCancelPropagation: # Simulate the background task as portal.py would create it active_bg_task: asyncio.Task | None = asyncio.create_task( _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="cancel-conv", @@ -814,11 +786,8 @@ class TestCancelPropagation: bg_task = asyncio.create_task( _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -865,11 +834,8 @@ class TestWebSocketDisconnectNoCancel: # Start the background task (as portal.py would) bg_task = asyncio.create_task( _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="test-conv", @@ -912,11 +878,8 @@ class TestWebSocketDisconnectNoCancel: bg_task = asyncio.create_task( _execute_react_background( - react_engine=engine, + agent=engine, messages=[], - tools=[], - model="test-model", - agent_name="test-agent", system_prompt=None, timeout_seconds=None, conv_id="resume-conv", diff --git a/tests/unit/test_execute_stream_hooks.py b/tests/unit/test_execute_stream_hooks.py index 0098e78..83a6e94 100644 --- a/tests/unit/test_execute_stream_hooks.py +++ b/tests/unit/test_execute_stream_hooks.py @@ -112,7 +112,8 @@ class TestExecuteStreamHooks: assert events[0].event_type == "final_answer" assert len(fired) == 1 assert fired[0].status == TaskStatus.COMPLETED - assert fired[0].output_data == {"content": "hello world"} + # KTD-8: output_data includes trace_outcome for lifecycle._is_failure_path() + assert fired[0].output_data == {"content": "hello world", "trace_outcome": "success"} async def test_failure_fires_on_task_failed(self): """Stream exception fires evolve_after_task with FAILED status."""