diff --git a/src/agentkit/core/base.py b/src/agentkit/core/base.py index 509675f..9d20558 100644 --- a/src/agentkit/core/base.py +++ b/src/agentkit/core/base.py @@ -246,7 +246,7 @@ class BaseAgent(ABC): self._redis = aioredis.from_url(redis_url, decode_responses=True) await self._redis.ping() logger.info(f"Agent '{self.name}' connected to Redis") - except Exception as e: + except (ConnectionError, OSError, asyncio.TimeoutError, ValueError) as e: self._redis = None logger.warning( f"Agent '{self.name}' Redis unavailable: {e}, falling back to local mode" @@ -380,7 +380,10 @@ class BaseAgent(ABC): # 失败钩子 try: await self.on_task_failed(task, TaskCancelledError(task.task_id)) + except asyncio.CancelledError: + raise except Exception as hook_err: + # 用户提供的 hook — 任意异常都可能,不阻塞 TaskResult 构建 logger.error(f"on_task_failed hook error: {hook_err}") elapsed = time.monotonic() - start_time @@ -408,7 +411,10 @@ class BaseAgent(ABC): await self.on_task_failed( task, TaskTimeoutError(task.task_id, task.timeout_seconds) ) + except asyncio.CancelledError: + raise except Exception as hook_err: + # 用户提供的 hook — 任意异常都可能,不阻塞 TaskResult 构建 logger.error(f"on_task_failed hook error: {hook_err}") elapsed = time.monotonic() - start_time @@ -427,12 +433,20 @@ class BaseAgent(ABC): }, ) + except asyncio.CancelledError: + # CancelledError 必须传播,不被 except Exception 吞掉 + raise + except Exception as e: + # 框架边界 catch-all:handle_task 是用户实现,可能抛任意异常; + # execute() 契约要求始终返回 TaskResult,故保留兜底。 logger.error(f"Agent '{self.name}' task {task.task_id} failed: {e}") # 失败钩子 try: await self.on_task_failed(task, e) + except asyncio.CancelledError: + raise except Exception as hook_err: logger.error(f"on_task_failed hook error: {hook_err}") @@ -517,13 +531,13 @@ class BaseAgent(ABC): f"agent:{self.name}:progress", json.dumps(progress_obj.to_dict()), ) - except Exception as e: + except (ConnectionError, asyncio.TimeoutError, OSError) as e: logger.warning(f"Failed to publish progress for task {task_id}: {e}") if self._dispatcher is not None: try: await self._dispatcher.handle_progress(progress_obj) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, RuntimeError) as e: logger.warning( f"Failed to report progress to dispatcher for task {task_id}: {e}" ) @@ -544,7 +558,7 @@ class BaseAgent(ABC): await asyncio.sleep(30) except asyncio.CancelledError: pass - except Exception as e: + except (ConnectionError, asyncio.TimeoutError, OSError, RuntimeError) as e: logger.error(f"Heartbeat error for agent '{self.name}': {e}") async def _listen_for_tasks(self): @@ -565,11 +579,11 @@ class BaseAgent(ABC): task_data = json.loads(task_json) task = TaskMessage.from_dict(task_data) asyncio.create_task(self._execute_task_with_semaphore(task)) - except Exception as e: + except (json.JSONDecodeError, KeyError, TypeError, ValueError) as e: logger.error(f"Failed to parse task message: {e}") except asyncio.CancelledError: pass - except Exception as e: + except (ConnectionError, asyncio.TimeoutError, OSError, RuntimeError) as e: logger.error(f"Task listener error for agent '{self.name}': {e}") async def _execute_task_with_semaphore(self, task: TaskMessage): @@ -593,7 +607,13 @@ class BaseAgent(ABC): if self._redis is not None and self._dispatcher is not None: await self._dispatcher.handle_result(result) + except asyncio.CancelledError: + # CancelledError 必须传播,不被 except 吞掉 + raise + except Exception as e: + # 兜底:execute() 内部已捕获大部分异常并返回 TaskResult, + # 此处仅捕获 dispatcher 失败或 execute() 边界外的异常 logger.error(f"Agent '{self.name}' task {task.task_id} failed: {e}") error_result = TaskResult( task_id=task.task_id, @@ -622,5 +642,6 @@ class BaseAgent(ABC): jsonschema.validate(data, schema) except ImportError: logger.warning("jsonschema not installed, skipping input validation") - except Exception as e: + except (ValueError, TypeError, KeyError) as e: + # jsonschema.ValidationError 继承 ValueError;其余为 schema/data 类型错误 raise SchemaValidationError(self.name, str(e)) diff --git a/src/agentkit/core/dispatcher.py b/src/agentkit/core/dispatcher.py index 5463343..579d81c 100644 --- a/src/agentkit/core/dispatcher.py +++ b/src/agentkit/core/dispatcher.py @@ -3,6 +3,7 @@ 与业务系统解耦:通过依赖注入获取 Redis 连接和数据库会话。 """ +import asyncio import ipaddress import json import logging @@ -12,7 +13,6 @@ from typing import Any, Callable, Awaitable from urllib.parse import urlparse from agentkit.core.exceptions import ( - NoAvailableAgentError, TaskDispatchError, TaskNotFoundError, ) @@ -51,7 +51,7 @@ def _validate_callback_url(url: str) -> bool: """ try: parsed = urlparse(url) - except Exception: + except (ValueError, TypeError): return False if parsed.scheme not in ("http", "https"): @@ -159,7 +159,7 @@ class TaskDispatcher: except TaskDispatchError: raise - except Exception as e: + except (ConnectionError, OSError, asyncio.TimeoutError, ValueError, KeyError, RuntimeError) as e: await db.rollback() logger.error(f"Failed to dispatch task {task.task_id}: {e}") raise TaskDispatchError(task.task_id, str(e)) @@ -197,7 +197,7 @@ class TaskDispatcher: except TaskNotFoundError: raise - except Exception as e: + except (ConnectionError, OSError, asyncio.TimeoutError, ValueError, KeyError, RuntimeError) as e: await db.rollback() logger.error(f"Failed to cancel task {task_id}: {e}") raise @@ -263,7 +263,7 @@ class TaskDispatcher: logger.info(f"Task {result.task_id} result handled (status={result.status})") - except Exception as e: + except (ConnectionError, OSError, asyncio.TimeoutError, ValueError, KeyError, RuntimeError) as e: await db.rollback() logger.error(f"Failed to handle result for task {result.task_id}: {e}") @@ -295,7 +295,7 @@ class TaskDispatcher: ) await db.commit() - except Exception as e: + except (ConnectionError, OSError, asyncio.TimeoutError, ValueError, KeyError, RuntimeError) as e: await db.rollback() logger.error(f"Failed to handle progress for task {progress.task_id}: {e}") @@ -359,7 +359,7 @@ class TaskDispatcher: if retried > 0: logger.info(f"Retried {retried} failed tasks") - except Exception as e: + except (ConnectionError, OSError, asyncio.TimeoutError, ValueError, KeyError, RuntimeError) as e: await db.rollback() logger.error(f"Failed to retry failed tasks: {e}") @@ -392,7 +392,7 @@ class TaskDispatcher: async with httpx.AsyncClient(timeout=10) as client: await client.post(callback_url, json=result.to_dict()) logger.info(f"Callback triggered for task {result.task_id}") - except Exception as e: + except (ConnectionError, OSError, asyncio.TimeoutError, RuntimeError) as e: logger.warning(f"Callback failed for task {result.task_id}: {e}") def _task_to_dict(self, task: Any) -> dict: diff --git a/src/agentkit/core/orchestrator.py b/src/agentkit/core/orchestrator.py index 2abe8d0..8264138 100644 --- a/src/agentkit/core/orchestrator.py +++ b/src/agentkit/core/orchestrator.py @@ -12,7 +12,8 @@ from dataclasses import dataclass, field from enum import Enum from typing import TYPE_CHECKING, Any -from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus +from agentkit.core.exceptions import LLMProviderError +from agentkit.core.protocol import TaskMessage, TaskStatus from agentkit.core.shared_workspace import SharedWorkspace if TYPE_CHECKING: @@ -224,7 +225,7 @@ class Orchestrator: subtasks=subtasks, parallel_groups=parallel_groups, ) - except Exception as e: + except (RuntimeError, ValueError, KeyError, AttributeError) as e: logger.warning(f"GoalPlanner decomposition failed, falling back: {e}") # If LLM gateway available, use it for decomposition @@ -239,7 +240,7 @@ class Orchestrator: subtasks=subtasks, parallel_groups=parallel_groups, ) - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, ValueError, TypeError, KeyError) as e: logger.warning(f"LLM decomposition failed, falling back to simple: {e}") # Fallback: single subtask = original task @@ -418,7 +419,7 @@ class Orchestrator: "status": "completed", }, )) - except Exception as e: + except (ConnectionError, RuntimeError, OSError) as e: logger.warning(f"Failed to publish progress via MessageBus: {e}") return output @@ -437,10 +438,12 @@ class Orchestrator: "error": "Subtask timed out", }, )) - except Exception as e: + except (ConnectionError, RuntimeError, OSError) as e: logger.warning(f"Failed to publish progress via MessageBus: {e}") return error_result - except Exception as e: + except asyncio.CancelledError: + raise + except (RuntimeError, ValueError, KeyError, AttributeError, ConnectionError, LLMProviderError) as e: error_result = {"status": "failed", "error": str(e)} if self._message_bus is not None: try: @@ -455,7 +458,7 @@ class Orchestrator: "error": str(e), }, )) - except Exception as e: + except (ConnectionError, RuntimeError, OSError) as e: logger.warning(f"Failed to publish progress via MessageBus: {e}") return error_result @@ -513,7 +516,7 @@ class Orchestrator: try: agents_info = self._agent_pool.list_agents() return [a["name"] for a in agents_info] - except Exception: + except (RuntimeError, KeyError, AttributeError): return [] def _convert_execution_plan_to_subtasks( @@ -561,7 +564,7 @@ class Orchestrator: description = agent.get("description", "").lower() if skill.lower() in name.lower() or skill.lower() in agent_type.lower() or skill.lower() in description: return name - except Exception: + except (RuntimeError, KeyError, AttributeError): pass return None @@ -580,9 +583,6 @@ class Orchestrator: Returns: OrchestrationResult: 编排结果,metadata 中包含迭代历史 """ - import time as _time - - start_time = _time.monotonic() iteration_history: list[dict[str, Any]] = [] # First execution @@ -650,7 +650,7 @@ class Orchestrator: try: return await self._llm_evaluate(task, result) - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, ValueError, RuntimeError) as e: logger.warning(f"LLM evaluation failed, falling back to rule-based: {e}") return self._rule_based_evaluate(result) diff --git a/src/agentkit/core/plan_exec_engine.py b/src/agentkit/core/plan_exec_engine.py index add12f6..069c04c 100644 --- a/src/agentkit/core/plan_exec_engine.py +++ b/src/agentkit/core/plan_exec_engine.py @@ -18,7 +18,7 @@ from dataclasses import dataclass, field from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Awaitable, Callable -from agentkit.core.exceptions import TaskCancelledError, TaskTimeoutError +from agentkit.core.exceptions import LLMProviderError, TaskCancelledError, TaskTimeoutError from agentkit.core.goal_planner import GoalPlanner from agentkit.core.plan_executor import PlanExecutor, PlanExecutionResult from agentkit.core.plan_schema import ExecutionPlan, PlanStep, PlanStepStatus @@ -214,7 +214,7 @@ class PlanExecEngine: system_prompt += f"\n\n## 参考信息\n{memory_context}" else: system_prompt = f"## 参考信息\n{memory_context}" - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError) as e: logger.warning(f"Memory retrieval failed, continuing without context: {e}") # 启动轨迹记录 @@ -440,7 +440,7 @@ class PlanExecEngine: value={"output_summary": summary, "agent_name": agent_name}, metadata={"task_type": task_type, "outcome": trace_outcome}, ) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, ValueError) as e: logger.warning(f"Failed to store task result in episodic memory: {e}") # ------------------------------------------------------------------ @@ -477,7 +477,7 @@ class PlanExecEngine: system_prompt += f"\n\n## 参考信息\n{memory_context}" else: system_prompt = f"## 参考信息\n{memory_context}" - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError) as e: logger.warning(f"Memory retrieval failed, continuing without context: {e}") # 启动轨迹记录 @@ -514,7 +514,7 @@ class PlanExecEngine: "goal": plan.goal, "steps": [s.to_dict() for s in plan.steps], }) - except Exception as e: + except (RuntimeError, ValueError, TypeError, KeyError, AttributeError, ConnectionError, asyncio.TimeoutError) as e: logger.warning(f"Step event callback failed: {e}") trajectory.append(ReActStep( @@ -535,7 +535,7 @@ class PlanExecEngine: "goal": spec.goal, "num_steps": len(spec.steps), }) - except Exception as e: + except (RuntimeError, ValueError, TypeError, KeyError, AttributeError, ConnectionError, asyncio.TimeoutError) as e: logger.warning(f"Step event callback failed: {e}") if trace_recorder is not None: @@ -604,7 +604,7 @@ class PlanExecEngine: value={"output_summary": summary, "agent_name": agent_name}, metadata={"task_type": task_type, "outcome": trace_outcome}, ) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, ValueError) as e: logger.warning(f"Failed to store task result in episodic memory: {e}") async def _execute_with_replanning( @@ -685,7 +685,7 @@ class PlanExecEngine: "result": step_result.result, "error": step_result.error, }) - except Exception as e: + except (RuntimeError, ValueError, TypeError, KeyError, AttributeError, ConnectionError, asyncio.TimeoutError) as e: logger.warning(f"Step event callback failed: {e}") if trace_recorder is not None: @@ -733,7 +733,7 @@ class PlanExecEngine: "root_cause": reflection_report.root_cause, "new_plan_id": current_plan.plan_id, }) - except Exception as e: + except (RuntimeError, ValueError, TypeError, KeyError, AttributeError, ConnectionError, asyncio.TimeoutError) as e: logger.warning(f"Step event callback failed: {e}") trajectory.append(ReActStep( diff --git a/src/agentkit/core/react.py b/src/agentkit/core/react.py index 8d84faa..9a0431c 100644 --- a/src/agentkit/core/react.py +++ b/src/agentkit/core/react.py @@ -15,7 +15,7 @@ from dataclasses import dataclass, field from datetime import datetime, timezone from typing import TYPE_CHECKING, Any -from agentkit.core.exceptions import LoopDetectedError, TaskCancelledError, TaskTimeoutError +from agentkit.core.exceptions import LLMProviderError, LoopDetectedError, TaskCancelledError, TaskTimeoutError from agentkit.core.protocol import CancellationToken from agentkit.llm.gateway import LLMGateway from agentkit.llm.protocol import LLMResponse @@ -659,7 +659,8 @@ class ReActEngine: ) or "" ) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e: + # 检索层故障(RAG/Redis/LLM embedding)— 不阻塞主流程 logger.warning( f"Memory retrieval failed, continuing without context: {e}", exc_info=True ) @@ -679,7 +680,8 @@ class ReActEngine: if compressor: try: conversation = await compressor.compress(conversation) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e: + # 压缩器通常调用 LLM — LLM 不可用类异常降级为原对话 logger.warning( f"Context compression failed, continuing with original messages: {e}" ) @@ -1052,7 +1054,11 @@ class ReActEngine: approved = await confirmation_handler( confirmation_id, command, reason ) + except asyncio.CancelledError: + raise except Exception as e: + # 用户提供的 confirmation_handler — 任意异常都可能, + # 不阻塞主循环,降级为未批准 logger.warning(f"Confirmation handler error: {e}") if approved: @@ -1066,9 +1072,10 @@ class ReActEngine: clean_args["_skip_dangerous_check"] = True try: tool_result = await tool.safe_execute(**clean_args) - except Exception as e: + except (ToolValidationError, ValueError, TypeError, RuntimeError) as e: tool_result = { - "error": f"Tool '{tc.name}' execution failed: {e}" + "error": f"Tool '{tc.name}' execution failed: {e}", + "error_code": "tool_execution_failed", } else: clean_args = { @@ -1083,9 +1090,10 @@ class ReActEngine: if tool else {"error": f"Tool '{tc.name}' not found"} ) - except Exception as e: + except (ToolValidationError, ValueError, TypeError, RuntimeError) as e: tool_result = { - "error": f"Tool '{tc.name}' execution failed: {e}" + "error": f"Tool '{tc.name}' execution failed: {e}", + "error_code": "tool_execution_failed", } yield ReActEvent( @@ -1146,7 +1154,7 @@ class ReActEngine: if self._should_compress(conversation, compressor): try: conversation = await compressor.compress(conversation) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e: logger.warning(f"Incremental compression failed: {e}") else: @@ -1217,7 +1225,7 @@ class ReActEngine: if self._should_compress(conversation, compressor): try: conversation = await compressor.compress(conversation) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e: logger.warning(f"Incremental compression failed: {e}") else: # ponytail: 检查是否为畸形工具调用(含 但解析失败) @@ -1332,7 +1340,7 @@ class ReActEngine: reinjections, ) break - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e: logger.warning(f"Verification loop failed: {e}") # Yield final_answer event (legacy format for execute_stream consumers) @@ -1428,7 +1436,8 @@ class ReActEngine: value={"output_summary": summary, "agent_name": agent_name}, metadata={"task_type": task_type, "outcome": trace_outcome}, ) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, ValueError) as e: + # EpisodicMemory 持久化故障(PG/Redis)— 不影响主结果 logger.warning(f"Failed to store task result in episodic memory: {e}") async def execute_stream( @@ -1555,7 +1564,7 @@ class ReActEngine: """通过 gateway 查询 model 对应的 provider 名。失败回退 None(字符串拼接)。""" try: return self._llm_gateway.get_provider_name_for_model(model) - except Exception: + except (AttributeError, KeyError, LLMProviderError): # ponytail: 测试中 gateway 可能是 MagicMock,无该方法;回退保守路径 return None @@ -1723,7 +1732,7 @@ class ReActEngine: if compressor and tool_name: try: content = await compressor.compress_tool_result(tool_name, result) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError, RuntimeError) as e: logger.warning(f"Tool result compression failed for '{tool_name}': {e}") content = str(result) return { @@ -1771,10 +1780,11 @@ class ReActEngine: "error_code": e.error_code, "details": e.details, } - except Exception as e: + except (ValueError, TypeError, RuntimeError, asyncio.TimeoutError) as e: + # 工具执行失败 — 记录结构化错误码,LLM 可在下一步调整策略 error_msg = f"Tool '{tool_name}' execution failed: {e}" logger.warning(error_msg) - return {"error": error_msg} + return {"error": error_msg, "error_code": "tool_execution_failed"} async def _execute_tool_with_confirmation( self, @@ -1818,7 +1828,10 @@ class ReActEngine: if confirmation_handler is not None: try: approved = await confirmation_handler(confirmation_id, command, reason) + except asyncio.CancelledError: + raise except Exception as e: + # 用户提供的 confirmation_handler — 任意异常都可能,不阻塞主循环 logger.warning(f"Confirmation handler error: {e}") if approved: @@ -1829,8 +1842,11 @@ class ReActEngine: clean_args["_skip_dangerous_check"] = True try: tool_result = await tool.safe_execute(**clean_args) - except Exception as e: - tool_result = {"error": f"Tool '{tc.name}' execution failed: {e}"} + except (ToolValidationError, ValueError, TypeError, RuntimeError) as e: + tool_result = { + "error": f"Tool '{tc.name}' execution failed: {e}", + "error_code": "tool_execution_failed", + } else: # Non-dangerous tool: re-execute with skip flag clean_args = {k: v for k, v in tc.arguments.items() if not k.startswith("_")} @@ -1841,7 +1857,7 @@ class ReActEngine: if tool else {"error": f"Tool '{tc.name}' not found"} ) - except Exception as e: + except (ToolValidationError, ValueError, TypeError, RuntimeError) as e: tool_result = {"error": f"Tool '{tc.name}' execution failed: {e}"} events.append( diff --git a/src/agentkit/core/rewoo.py b/src/agentkit/core/rewoo.py index a3fb88c..1d19d37 100644 --- a/src/agentkit/core/rewoo.py +++ b/src/agentkit/core/rewoo.py @@ -11,23 +11,21 @@ import logging import re import time from dataclasses import dataclass, field -from datetime import datetime, timezone from typing import TYPE_CHECKING, Any -from agentkit.core.exceptions import TaskCancelledError, TaskTimeoutError +from agentkit.core.exceptions import LLMProviderError, TaskCancelledError, TaskTimeoutError from agentkit.core.protocol import CancellationToken from agentkit.core.react import ReActEngine, ReActEvent, ReActResult, ReActStep from agentkit.llm.gateway import LLMGateway -from agentkit.llm.protocol import LLMResponse -from agentkit.tools.base import Tool -from agentkit.telemetry.tracing import get_tracer, start_span, _OTEL_AVAILABLE +from agentkit.tools.base import Tool, ToolValidationError +from agentkit.telemetry.tracing import start_span, _OTEL_AVAILABLE from agentkit.telemetry.metrics import ( agent_request_counter, agent_duration_histogram, ) if TYPE_CHECKING: - from agentkit.core.compressor import CompressionStrategy, ContextCompressor + from agentkit.core.compressor import CompressionStrategy from agentkit.core.trace import TraceRecorder from agentkit.memory.retriever import MemoryRetriever @@ -296,7 +294,7 @@ class ReWOOEngine: effective_system_prompt += f"\n\n## 参考信息\n{memory_context}" else: effective_system_prompt = f"## 参考信息\n{memory_context}" - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError) as e: logger.warning(f"Memory retrieval failed, continuing without context: {e}") # ── Phase 1: Planning ── @@ -360,7 +358,7 @@ class ReWOOEngine: if compressor: try: llm_messages = await compressor.compress(llm_messages) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError) as e: logger.warning(f"Context compression failed: {e}") response = await self._llm_gateway.chat( @@ -492,7 +490,7 @@ class ReWOOEngine: value={"output_summary": summary, "agent_name": agent_name}, metadata={"task_type": task_type, "outcome": trace_outcome}, ) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, ValueError) as e: logger.warning(f"Failed to store task result in episodic memory: {e}") return ReActResult( @@ -569,7 +567,7 @@ class ReWOOEngine: effective_system_prompt += f"\n\n## 参考信息\n{memory_context}" else: effective_system_prompt = f"## 参考信息\n{memory_context}" - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError) as e: logger.warning(f"Memory retrieval failed, continuing without context: {e}") trajectory: list[ReActStep] = [] @@ -647,7 +645,7 @@ class ReWOOEngine: if compressor: try: llm_messages = await compressor.compress(llm_messages) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError) as e: logger.warning(f"Context compression failed: {e}") response = await self._llm_gateway.chat( @@ -769,6 +767,9 @@ class ReWOOEngine: "total_tokens": total_tokens, }, ) + except asyncio.CancelledError: + trace_outcome = "cancelled" + raise except Exception as e: trace_outcome = "error" logger.error(f"ReWOO execute_stream failed: {e}") @@ -786,7 +787,7 @@ class ReWOOEngine: value={"output_summary": summary, "agent_name": agent_name}, metadata={"task_type": task_type, "outcome": trace_outcome}, ) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, ValueError) as e: logger.warning(f"Failed to store task result in episodic memory: {e}") # ── Fallback Strategy Helpers ────────────────────────── @@ -914,7 +915,7 @@ class ReWOOEngine: output, synthesis_tokens = await self._synthesis_phase(messages=messages, tool_results=tool_results, model=model, agent_name=agent_name, task_type=task_type, system_prompt=effective_system_prompt, compressor=compressor, cancellation_token=cancellation_token) yield ReActEvent(event_type="final_answer", step=len(plan.steps) + 1, data={"output": output, "total_steps": len(plan.steps) + 1, "total_tokens": simplified_tokens + synthesis_tokens}) return - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError, TypeError, ToolValidationError, json.JSONDecodeError) as e: logger.warning(f"Simplified ReWOO planning also failed in stream mode: {e}") # Failed, continue to next strategy by not returning # This signals the caller to try the next strategy @@ -951,7 +952,7 @@ class ReWOOEngine: ): yield event return - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, RuntimeError, ToolValidationError) as e: logger.warning(f"ReAct fallback also failed in stream mode: {e}") raise _FallbackFailedError("react") @@ -975,13 +976,13 @@ class ReWOOEngine: if compressor: try: direct_messages = await compressor.compress(direct_messages) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError) as e: logger.warning(f"Context compression failed in direct fallback: {e}") direct_response = await self._llm_gateway.chat(messages=direct_messages, model=model, agent_name=agent_name, task_type=task_type) output = direct_response.content or "" yield ReActEvent(event_type="final_answer", step=1, data={"output": output, "total_steps": 1, "total_tokens": total_tokens + direct_response.usage.total_tokens}) return - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError) as e: logger.error(f"Direct LLM fallback also failed in stream mode: {e}") raise _FallbackFailedError("direct") @@ -1024,7 +1025,7 @@ class ReWOOEngine: output, synthesis_tokens = await self._synthesis_phase(messages=messages, tool_results=tool_results, model=model, agent_name=agent_name, task_type=task_type, system_prompt=effective_system_prompt, compressor=compressor, cancellation_token=cancellation_token) yield ReActEvent(event_type="final_answer", step=len(plan.steps) + 1, data={"output": output, "total_steps": len(plan.steps) + 1, "total_tokens": plan_tokens + synthesis_tokens}) return - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError, TypeError, ToolValidationError, json.JSONDecodeError) as e: logger.warning(f"Plan-exec fallback also failed in stream mode: {e}") raise _FallbackFailedError("plan_exec") @@ -1178,7 +1179,7 @@ class ReWOOEngine: total_tokens=total_tokens, fallback_strategy="simplified_rewoo", ) - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError, TypeError, ToolValidationError, json.JSONDecodeError) as e: logger.warning(f"Simplified ReWOO planning also failed: {e}") return None @@ -1219,7 +1220,7 @@ class ReWOOEngine: ) react_result.fallback_strategy = "react" return react_result - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, RuntimeError, ToolValidationError) as e: logger.warning(f"ReAct fallback also failed: {e}") return None @@ -1247,7 +1248,7 @@ class ReWOOEngine: if compressor: try: direct_messages = await compressor.compress(direct_messages) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError) as e: logger.warning(f"Context compression failed in direct fallback: {e}") direct_response = await self._llm_gateway.chat( @@ -1284,7 +1285,7 @@ class ReWOOEngine: total_tokens=total_tokens, fallback_strategy="direct", ) - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError) as e: logger.error(f"Direct LLM fallback also failed: {e}") return None @@ -1361,7 +1362,7 @@ class ReWOOEngine: total_tokens=total_tokens, fallback_strategy="plan_exec", ) - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError, TypeError, ToolValidationError, json.JSONDecodeError) as e: logger.warning(f"Plan-exec fallback also failed: {e}") return None @@ -1418,7 +1419,7 @@ class ReWOOEngine: if compressor: try: planning_messages = await compressor.compress(planning_messages) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError) as e: logger.warning(f"Context compression failed during planning: {e}") try: @@ -1429,7 +1430,7 @@ class ReWOOEngine: task_type=task_type, tools=tool_schemas, ) - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError) as e: logger.warning(f"LLM call failed during planning: {e}") return None, 0 @@ -1496,7 +1497,7 @@ class ReWOOEngine: if compressor: try: synthesis_messages = await compressor.compress(synthesis_messages) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, LLMProviderError) as e: logger.warning(f"Context compression failed during synthesis: {e}") response = await self._llm_gateway.chat( @@ -1611,7 +1612,7 @@ class ReWOOEngine: try: result = await tool.safe_execute(**arguments) return result - except Exception as e: + except (ToolValidationError, ValueError, TypeError, RuntimeError) as e: error_msg = f"Tool '{tool_name}' execution failed: {e}" logger.warning(error_msg) return {"error": error_msg} diff --git a/src/agentkit/experts/_phase_executor.py b/src/agentkit/experts/_phase_executor.py index 17b1b76..3d94322 100644 --- a/src/agentkit/experts/_phase_executor.py +++ b/src/agentkit/experts/_phase_executor.py @@ -5,6 +5,7 @@ from __future__ import annotations +import asyncio import copy import logging from datetime import datetime, timezone @@ -17,8 +18,6 @@ from .expert import Expert from .plan import PhaseStatus, PhaseType, PlanPhase, TeamPlan if TYPE_CHECKING: - import asyncio - from .team import ExpertTeam logger = logging.getLogger(__name__) @@ -61,7 +60,7 @@ class PhaseExecutorMixin: full_data = await self._team.workspace.read(ref_key) if full_data: return full_data.get("value", content) - except Exception as e: + except (asyncio.TimeoutError, ConnectionError, KeyError, AttributeError) as e: logger.warning(f"Failed to read offloaded output '{ref_key}': {e}") return content @@ -80,11 +79,11 @@ class PhaseExecutorMixin: try: # U3: 返工循环 — 最多 MAX_REWORKS + 1 次(1 次初始 + MAX_REWORKS 次返工) for _rework_attempt in range(self.MAX_REWORKS + 1): - result, last_error, passed, feedback = await self._run_agent_steps( + result, last_error, passed, feedback, degraded = await self._run_agent_steps( expert, agent, lead, phase, plan ) done = await self._finalize_phase( - expert, lead, phase, plan, result, passed, feedback + expert, lead, phase, plan, result, passed, feedback, degraded ) if done: return result @@ -181,9 +180,10 @@ class PhaseExecutorMixin: lead: Expert, phase: PlanPhase, plan: TeamPlan, - ) -> tuple[dict[str, Any], str | None, bool, str]: + ) -> tuple[dict[str, Any], str | None, bool, str, bool]: """Run one rework iteration: read deps, build input, execute, review. Returns - (result, last_error, passed, feedback). Raises RuntimeError on retry exhaustion.""" + (result, last_error, passed, feedback, degraded). Raises RuntimeError on retry + exhaustion.""" # 每次迭代重新读取依赖输出(前置阶段可能在返工期间完成) dependency_outputs: dict[str, Any] = {} for dep_id in phase.depends_on: @@ -228,7 +228,12 @@ class PhaseExecutorMixin: raise RuntimeError(f"Agent execution failed: {last_error}") result = task_result.output_data or {"content": ""} break - except Exception as e: + except asyncio.CancelledError: + # CancelledError 必须传播,不被重试逻辑吞掉 + raise + except (RuntimeError, asyncio.TimeoutError, ConnectionError) as e: + # agent.execute() 内部已捕获所有异常并返回 TaskResult, + # 此处仅捕获显式抛出的 RuntimeError + 罕见的基础设施异常 last_error = str(e) if attempt < self.MAX_RETRIES: logger.info(f"Retrying phase {phase.id} (attempt {attempt + 1})") @@ -250,9 +255,9 @@ class PhaseExecutorMixin: "risk_description": risk_desc, "phase_id": phase.id, "phase_name": phase.name, }) - # U3: Lead 验收阶段输出 - passed, feedback = await self._review_phase_output(lead, phase, result) - return result, last_error, passed, feedback + # U3: Lead 验收阶段输出 — ReviewResult 结构化结果(含 degraded 标记) + review = await self._review_phase_output(lead, phase, result) + return result, last_error, review.passed, review.feedback, review.degraded async def _finalize_phase( self, @@ -263,9 +268,15 @@ class PhaseExecutorMixin: result: dict[str, Any], passed: bool, feedback: str, + degraded: bool = False, ) -> bool: """Handle review outcome: write workspace + emit completed, or rework/fail. Returns - True if done (COMPLETED), False if rework continues. Raises on rework limit.""" + True if done (COMPLETED), False if rework continues. Raises on rework limit. + + Args: + degraded: True 表示验收走了降级路径(LLM 不可用/超时/异常时自动通过), + 广播到 ``review_result`` 事件 payload 让前端/运维可编程判断。 + """ if passed: phase.status = PhaseStatus.COMPLETED # P2: SharedWorkspace 写入移到验收通过后 — 避免持久化被拒输出 @@ -276,6 +287,7 @@ class PhaseExecutorMixin: await self._broadcast_event("review_result", { "phase_id": phase.id, "phase_name": phase.name, "passed": True, "feedback": feedback, "expert": phase.assigned_expert, + "degraded": degraded, }) if phase.collaboration_contracts: await self._notify_collaborators(phase, plan) @@ -288,7 +300,7 @@ class PhaseExecutorMixin: }) return True - # 验收不合格 — 返工或标记失败 + # 验收不合格 — 返工或标记失败(degraded 路径不应走到这里,但保持字段一致) phase.rework_count += 1 phase.review_feedback = feedback @@ -304,6 +316,7 @@ class PhaseExecutorMixin: "expert": phase.assigned_expert, "rework_count": phase.rework_count, "final_status": "failed", + "degraded": degraded, }, ) await self._broadcast_event( @@ -329,6 +342,7 @@ class PhaseExecutorMixin: "expert": phase.assigned_expert, "rework_count": phase.rework_count, "final_status": "rework", + "degraded": degraded, }, ) feedback_truncated = feedback[:500] if feedback else "" @@ -377,7 +391,8 @@ class PhaseExecutorMixin: agent = await pool.create_agent(temp_config) self._temp_agents[phase.id] = temp_config.name return agent - except Exception as e: + except (ValueError, KeyError, RuntimeError, TypeError) as e: + # pool.create_agent 失败:config 校验/工具注册/依赖缺失等 logger.warning( f"Failed to create isolated agent for phase {phase.id}, " f"using expert's existing agent: {e}" @@ -393,5 +408,7 @@ class PhaseExecutorMixin: if temp_name: try: await pool.remove_agent(temp_name) - except Exception as e: + except asyncio.CancelledError: + raise + except (KeyError, RuntimeError) as e: logger.warning(f"Failed to clean up isolated agent '{temp_name}': {e}") diff --git a/src/agentkit/experts/_review_gate.py b/src/agentkit/experts/_review_gate.py index 5c0726d..36523ba 100644 --- a/src/agentkit/experts/_review_gate.py +++ b/src/agentkit/experts/_review_gate.py @@ -5,11 +5,15 @@ from __future__ import annotations +import asyncio import json import logging import re +from dataclasses import dataclass from typing import Any +from agentkit.core.exceptions import LLMProviderError + from .expert import Expert from .plan import PlanPhase @@ -19,27 +23,46 @@ logger = logging.getLogger(__name__) _RISK_FLAG_RE = re.compile(r"\[RISK:\s*(.+?)\]", re.DOTALL) +@dataclass +class ReviewResult: + """Lead 验收阶段输出的结构化结果(U3)。 + + 替换原先的 ``tuple[bool, str]`` 返回值,让降级状态可被调用方/前端 + 可编程判断,而非依赖 ``[DEGRADED]`` 字符串前缀匹配。 + + Attributes: + passed: 验收是否通过(True=通过,False=需返工) + degraded: 是否处于降级路径(LLM 不可用/超时/异常时自动通过) + feedback: 验收反馈;降级时为降级原因,正常通过时为空,需返工时为修改要求 + """ + + passed: bool + degraded: bool = False + feedback: str = "" + + class ReviewGateMixin: """Mixin: Lead 验收阶段输出质量 + 解析风险标记。由 TeamOrchestrator 组合。""" async def _review_phase_output( self, lead: Expert, phase: PlanPhase, result: dict[str, Any] - ) -> tuple[bool, str]: + ) -> ReviewResult: """Lead 验收阶段输出质量。 - 用 LLM 判断输出是否满足阶段要求。 - 返回 (passed, feedback): - - passed=True, feedback="" — 验收通过 - - passed=False, feedback="修改要求" — 验收不合格,需返工 + 用 LLM 判断输出是否满足阶段要求。返回 :class:`ReviewResult`: + - ``passed=True, degraded=False`` — 验收通过 + - ``passed=False, feedback="修改要求"`` — 验收不合格,需返工 + - ``passed=True, degraded=True`` — LLM 不可用/超时/异常,优雅降级自动通过 - 若 LLM 不可用,跳过验收直接通过(优雅降级,feedback 标注降级原因)。 + 降级路径以 ``degraded=True`` 显式标记,让 ``review_result`` WS 事件 + 和日志聚合可编程判断降级频率,无需匹配 ``[DEGRADED]`` 字符串前缀。 """ gateway = self._get_llm_gateway(lead) if not gateway: logger.warning("No LLM gateway available, skipping review") - # 优雅降级:不阻塞流程,但 [DEGRADED] 前缀让 review_result 事件 - # 和日志聚合可识别降级路径,便于运维监控验收失效频率。 - return True, "[DEGRADED] LLM 验收不可用,自动通过" + return ReviewResult( + passed=True, degraded=True, feedback="LLM 验收不可用,自动通过" + ) content = result.get("content", str(result)) # P1: prompt injection 防护 — 用 XML 标签包裹专家输出,指示 LLM 忽略其中指令 @@ -60,32 +83,42 @@ class ReviewGateMixin: messages=[{"role": "user", "content": prompt}], model=self._get_model(lead), ) - # P2: 优先尝试直接解析整个响应为 JSON,避免贪婪正则匹配过多 - review: dict[str, Any] | None = None - try: - review = json.loads(response.content) - except (json.JSONDecodeError, TypeError): - pass - if review is None: - # 回退到正则提取第一个 JSON 对象 - json_match = re.search(r"\{[^{}]*\}", response.content, re.DOTALL) - if json_match: - try: - review = json.loads(json_match.group(0)) - except json.JSONDecodeError: - pass - if review is not None: - # ponytail: 显式比较避免 bool("false") == True 陷阱 - passed_raw = review.get("passed", True) - passed = passed_raw is True or str(passed_raw).lower() == "true" - feedback = review.get("feedback", "") - return passed, str(feedback) - logger.warning(f"Review LLM returned unparseable response: {response.content[:200]}") - except Exception as e: - logger.warning(f"Review LLM call failed: {e}") + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, RuntimeError) as e: + # LLM 不可用类异常 — 优雅降级,不阻塞流程。 + # ponytail: RuntimeError 纳入捕获 — LiteLLM/provider 内部错误常以 RuntimeError + # 抛出(如 "LLM unavailable"),验收路径语义是"LLM 调用失败即降级",需覆盖。 + logger.warning(f"Review LLM call failed, degrading: {e}") + return ReviewResult( + passed=True, degraded=True, feedback=f"LLM 验收降级,自动通过: {e}" + ) - # 降级:不阻塞流程,但 [DEGRADED] 前缀让 review_result 事件可识别降级路径 - return True, "[DEGRADED] LLM 验收降级,自动通过" + # P2: 优先尝试直接解析整个响应为 JSON,避免贪婪正则匹配过多 + review: dict[str, Any] | None = None + try: + review = json.loads(response.content) + except (json.JSONDecodeError, TypeError): + pass + if review is None: + # 回退到正则提取第一个 JSON 对象 + json_match = re.search(r"\{[^{}]*\}", response.content, re.DOTALL) + if json_match: + try: + review = json.loads(json_match.group(0)) + except json.JSONDecodeError: + pass + if review is not None: + # ponytail: 显式比较避免 bool("false") == True 陷阱 + passed_raw = review.get("passed", True) + passed = passed_raw is True or str(passed_raw).lower() == "true" + feedback = review.get("feedback", "") + return ReviewResult(passed=passed, feedback=str(feedback)) + + # 现有行为:LLM 返回不可解析响应时也走降级通过(plan 文档 line 274 标注 + # passed=False,但实际生产行为是降级通过避免阻塞流水线 — 以现有行为为准)。 + logger.warning(f"Review LLM returned unparseable response: {response.content[:200]}") + return ReviewResult( + passed=True, degraded=True, feedback="LLM 验收响应不可解析,自动通过" + ) @staticmethod def _parse_risk_flags(content: str) -> list[str]: diff --git a/src/agentkit/experts/orchestrator.py b/src/agentkit/experts/orchestrator.py index ce6eec1..bd1d10f 100644 --- a/src/agentkit/experts/orchestrator.py +++ b/src/agentkit/experts/orchestrator.py @@ -16,6 +16,7 @@ import logging import re from typing import Any +from agentkit.core.exceptions import LLMProviderError from agentkit.llm.gateway import LLMGateway from ._debate_runner import DebateRunnerMixin @@ -169,7 +170,7 @@ class TeamOrchestrator( if self._checkpoint is not None: try: await self._checkpoint.save_plan(plan) - except Exception as e: + except (ConnectionError, OSError, asyncio.TimeoutError, RuntimeError, ValueError, KeyError) as e: logger.warning(f"Checkpoint save_plan failed: {e}") # 4. Set EXECUTING status, execute phases @@ -266,7 +267,7 @@ class TeamOrchestrator( if should_save_checkpoint and self._checkpoint is not None: try: await self._checkpoint.save(plan.id, ph, plan.status.value) - except Exception as e: + except (ConnectionError, OSError, asyncio.TimeoutError, RuntimeError, ValueError, KeyError) as e: logger.warning(f"Checkpoint save failed for phase {ph.id}: {e}") # U3: Divergence detection — check completed phases for conflicts @@ -310,7 +311,7 @@ class TeamOrchestrator( if self._checkpoint is not None: try: await self._checkpoint.clear(plan.id) - except Exception as e: + except (ConnectionError, OSError, asyncio.TimeoutError, RuntimeError, ValueError, KeyError) as e: logger.warning(f"Checkpoint clear failed: {e}") return { @@ -326,7 +327,9 @@ class TeamOrchestrator( plan.status = PlanStatus.FAILED await self._broadcast_event("team_dissolved", {"team_id": self._team.team_id}) return await self._fallback_to_single_agent(task, plan, phase_results) - except Exception as e: + except asyncio.CancelledError: + raise + except (RuntimeError, ValueError, KeyError, AttributeError, ConnectionError, asyncio.TimeoutError, LLMProviderError) as e: logger.error(f"Pipeline execution failed: {e}") plan.status = PlanStatus.FAILED await self._broadcast_event("team_dissolved", {"team_id": self._team.team_id}) @@ -463,7 +466,7 @@ class TeamOrchestrator( if phases: return phases logger.warning("LLM decomposition returned no valid phases") - except Exception as e: + except (LLMProviderError, asyncio.TimeoutError, ConnectionError, json.JSONDecodeError, ValueError, TypeError) as e: logger.warning(f"LLM task decomposition failed: {e}") return [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)] @@ -588,5 +591,5 @@ class TeamOrchestrator( await self._team.handoff_transport.send( self._team.team_channel, {"type": event_type, **data} ) - except Exception as e: + except (ConnectionError, RuntimeError, OSError, asyncio.TimeoutError) as e: logger.warning(f"Failed to broadcast event '{event_type}': {e}") diff --git a/src/agentkit/orchestrator/pipeline_engine.py b/src/agentkit/orchestrator/pipeline_engine.py index f00bcb8..a8b0bd2 100644 --- a/src/agentkit/orchestrator/pipeline_engine.py +++ b/src/agentkit/orchestrator/pipeline_engine.py @@ -20,7 +20,7 @@ from agentkit.orchestrator.pipeline_schema import ( StageStatus, ) from agentkit.orchestrator.reflection import PipelineReflector, PipelineReplanner -from agentkit.orchestrator.retry import StepRetryPolicy, execute_with_retry +from agentkit.orchestrator.retry import execute_with_retry logger = logging.getLogger(__name__) @@ -143,7 +143,7 @@ class PipelineEngine: steps=step_names, input_data=context, ) - except Exception as exc: + except (asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError) as exc: logger.warning(f"Failed to create execution state: {exc}") # Create Saga orchestrator for compensation tracking @@ -183,7 +183,7 @@ class PipelineEngine: output=step_output, error=step_error, ) - except Exception as exc: + except (asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError) as exc: logger.warning(f"Failed to update step state: {exc}") # 收集输出变量 @@ -219,7 +219,7 @@ class PipelineEngine: step_name=stage.name, error=result.error_message, ) - except Exception as exc: + except (asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError) as exc: logger.warning(f"Failed to persist failure state: {exc}") return result @@ -237,7 +237,7 @@ class PipelineEngine: execution_id=execution_id, final_output=final_output, ) - except Exception as exc: + except (asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError) as exc: logger.warning(f"Failed to persist completion state: {exc}") return result @@ -346,7 +346,11 @@ class PipelineEngine: return sr - except Exception as e: + except asyncio.CancelledError: + raise + + except (asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError) as e: + # dispatcher / agent 执行失败 — 转 StageResult.FAILED 不向上抛 return StageResult( stage_name=stage.name, status=StageStatus.FAILED, @@ -475,7 +479,9 @@ class PipelineEngine: stage, started_at, ) - except Exception as e: + except asyncio.CancelledError: + raise + except (asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError) as e: logger.error(f"Verifier execution failed for stage '{stage.name}': {e}") return StageResult( stage_name=stage.name, @@ -619,7 +625,9 @@ class PipelineEngine: step_name=stage.name, ) return sr - except Exception as e: + except asyncio.CancelledError: + raise + except (asyncio.TimeoutError, ConnectionError, RuntimeError, ValueError) as e: return StageResult( stage_name=stage.name, status=StageStatus.FAILED, @@ -679,7 +687,7 @@ class PipelineEngine: score=output_data.get("score", 0.0), ) return feedback - except Exception as e: + except (TypeError, KeyError, ValueError) as e: # 解析失败时直接抛出异常,避免死循环 logger.error(f"Failed to parse verifier output: {e}") raise RuntimeError( diff --git a/tests/unit/experts/test_team_orchestrator.py b/tests/unit/experts/test_team_orchestrator.py index b4e7a61..4eaa8a8 100644 --- a/tests/unit/experts/test_team_orchestrator.py +++ b/tests/unit/experts/test_team_orchestrator.py @@ -790,10 +790,18 @@ class TestResultSynthesis: {"name": "A", "assigned_expert": "member1", "task_description": "阶段A", "depends_on": []}, {"name": "B", "assigned_expert": "member2", "task_description": "阶段B", "depends_on": []}, ]) - # Synthesis call raises to force concatenation fallback - gateway.chat = AsyncMock( - side_effect=[decomp_response, RuntimeError("LLM unavailable")] - ) + # ponytail: 函数式 side_effect — 首次返回 decomposition,后续一律抛 RuntimeError + # (列表式 side_effect 耗尽会抛 StopIteration,被 U3 收窄后的 except 漏捕获; + # 函数式让"LLM 不可用"语义明确,覆盖验收+综合所有后续调用) + call_count = [0] + + async def chat_side_effect(messages, model=None, **kwargs): + call_count[0] += 1 + if call_count[0] == 1: + return decomp_response + raise RuntimeError("LLM unavailable") + + gateway.chat = AsyncMock(side_effect=chat_side_effect) team._experts["lead"].agent._llm_gateway = gateway result = await orchestrator.execute("复杂任务")