From 771756814f7c9133eca10b910e038d301f329325 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Thu, 18 Jun 2026 13:00:59 +0800 Subject: [PATCH] =?UTF-8?q?fix(review):=20=E4=BF=AE=E5=A4=8D=E4=BB=A3?= =?UTF-8?q?=E7=A0=81=E5=AE=A1=E6=9F=A5=E5=8F=91=E7=8E=B0=E7=9A=84=20P0/P1/?= =?UTF-8?q?P2=20=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P0 (Critical): - orchestrator: plan_update 事件 key 从 phases 改为 plan_phases 匹配前端契约 - orchestrator: team_formed 事件 payload 从 string[] 改为 IExpertInfo[] + plan_phases:[] P1 (High): - orchestrator: 新增 phase_failed 事件广播 (3处: gather 失败/_execute_phase 异常/_mark_dependents_failed 级联) - orchestrator: 新增 team_dissolved 事件广播 (3处: 正常完成/ValueError/Exception) - orchestrator: _mark_dependents_failed 改为 async 以支持事件广播 - orchestrator: gather 结果检查增加 asyncio.CancelledError (Python 3.11+ BaseException) - plan: PhaseStatus.RUNNING 值从 running 改为 in_progress 匹配前端联合类型 - team.ts: updatePhaseStatus 增加 plan_phases undefined 防御守卫 - chat.py: 增加 asyncio.CancelledError 处理 + team.dissolve() 移入 finally 块 P2 (Medium): - orchestrator: _get_isolated_agent 返回类型 Any 改为 ConfigDrivenAgent - orchestrator: _get_llm_gateway 返回类型 Any 改为 LLMGateway | None - orchestrator: 依赖输出从 SharedWorkspace 读取改为内存 dep_phase.result (减少冗余 I/O) - plan: PlanPhase.to_dict() result 序列化为 string 匹配前端 ITeamPlanPhase.result 类型 - types.ts: expert_step.step 类型从 number 改为 string (后端发送 phase ID) Tests: 377 passed (experts + chat_team + expert_team) --- src/agentkit/experts/orchestrator.py | 82 +++++++++++++++---- src/agentkit/experts/plan.py | 11 ++- src/agentkit/server/frontend/src/api/types.ts | 2 +- .../server/frontend/src/stores/team.ts | 2 +- src/agentkit/server/routes/chat.py | 22 ++--- tests/unit/experts/test_plan.py | 20 +++-- tests/unit/experts/test_team_orchestrator.py | 2 +- 7 files changed, 104 insertions(+), 37 deletions(-) diff --git a/src/agentkit/experts/orchestrator.py b/src/agentkit/experts/orchestrator.py index 34bc3c4..a9129f9 100644 --- a/src/agentkit/experts/orchestrator.py +++ b/src/agentkit/experts/orchestrator.py @@ -27,7 +27,9 @@ import re from datetime import datetime, timezone from typing import Any +from agentkit.core.config_driven import ConfigDrivenAgent from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus +from agentkit.llm.gateway import LLMGateway from .expert import Expert from .plan import PhaseStatus, PlanPhase, PlanStatus, TeamPlan @@ -93,13 +95,27 @@ class TeamOrchestrator: ) # 1. Emit team_formed event + # Send experts as IExpertInfo-compatible dicts + plan_phases: [] to match frontend contract await self._broadcast_event( "team_formed", { "team_id": self._team.team_id, "status": self._team.status.value, "lead_expert": lead.config.name, - "experts": [e.config.name for e in self._team.active_experts], + "experts": [ + { + "id": e.config.name, + "name": e.config.name, + "persona": e.config.persona, + "avatar": e.config.avatar, + "color": e.config.color, + "is_lead": e.config.name == lead.config.name, + "bound_skills": list(e.config.bound_skills), + "status": "active", + } + for e in self._team.active_experts + ], + "plan_phases": [], }, ) @@ -117,7 +133,7 @@ class TeamOrchestrator: "plan_update", { "plan_id": plan.id, - "phases": [ph.to_dict() for ph in plan.phases], + "plan_phases": [ph.to_dict() for ph in plan.phases], }, ) @@ -143,14 +159,23 @@ class TeamOrchestrator: ) for ph, result in zip(ready, results): - if isinstance(result, Exception): + if isinstance(result, (Exception, asyncio.CancelledError)): logger.error(f"Phase {ph.id} ({ph.name}) failed: {result}") plan.update_phase_status( ph.id, PhaseStatus.FAILED, {"error": str(result)} ) phase_results[ph.id] = {"error": str(result)} + # Emit phase_failed event + await self._broadcast_event( + "phase_failed", + { + "phase_id": ph.id, + "phase_name": ph.name, + "error": str(result), + }, + ) # Mark dependent phases as failed - self._mark_dependents_failed(ph.id, plan, phase_results) + await self._mark_dependents_failed(ph.id, plan, phase_results) else: phase_results[ph.id] = result @@ -178,6 +203,12 @@ class TeamOrchestrator: }, ) + # 8. Emit team_dissolved event + await self._broadcast_event( + "team_dissolved", + {"team_id": self._team.team_id}, + ) + return { "status": "completed", "result": final_result, @@ -189,10 +220,16 @@ class TeamOrchestrator: # Circular dependency or invalid reference from topological_sort logger.error(f"Pipeline execution failed (invalid plan): {e}") plan.status = PlanStatus.FAILED + await self._broadcast_event( + "team_dissolved", {"team_id": self._team.team_id} + ) return await self._fallback_to_single_agent(task, plan, phase_results) except Exception as e: logger.error(f"Pipeline execution failed: {e}") plan.status = PlanStatus.FAILED + await self._broadcast_event( + "team_dissolved", {"team_id": self._team.team_id} + ) return await self._fallback_to_single_agent(task, plan, phase_results) async def _decompose_task(self, lead: Expert, task: str) -> list[PlanPhase]: @@ -349,15 +386,14 @@ class TeamOrchestrator: }, ) - # Read dependency outputs from SharedWorkspace + # Read dependency outputs from in-memory phase results (faster than workspace) dependency_outputs: dict[str, Any] = {} for dep_id in phase.depends_on: dep_phase = plan.get_phase(dep_id) - if dep_phase and dep_phase.status == PhaseStatus.COMPLETED: - key = f"{plan.id}/phase/{dep_id}/output" - data = await self._team.workspace.read(key) - if data: - dependency_outputs[dep_phase.name] = data.get("value", "") + if dep_phase and dep_phase.status == PhaseStatus.COMPLETED and dep_phase.result: + dependency_outputs[dep_phase.name] = dep_phase.result.get( + "content", str(dep_phase.result) + ) # Emit expert_step event await self._broadcast_event( @@ -473,9 +509,18 @@ class TeamOrchestrator: # Should not reach here phase.status = PhaseStatus.FAILED + # Emit phase_failed event + await self._broadcast_event( + "phase_failed", + { + "phase_id": phase.id, + "phase_name": phase.name, + "error": last_error or "unknown error", + }, + ) raise RuntimeError(f"Phase {phase.id} ({phase.name}) failed: {last_error}") - async def _get_isolated_agent(self, expert: Expert, phase: PlanPhase) -> Any: + async def _get_isolated_agent(self, expert: Expert, phase: PlanPhase) -> ConfigDrivenAgent: """Get an isolated ConfigDrivenAgent instance for the phase. If AgentPool is available, creates a temporary agent with a unique name @@ -516,7 +561,7 @@ class TeamOrchestrator: except Exception as e: logger.warning(f"Failed to clean up isolated agent '{temp_name}': {e}") - def _mark_dependents_failed( + async def _mark_dependents_failed( self, failed_phase_id: str, plan: TeamPlan, phase_results: dict[str, dict[str, Any]] ) -> None: """Mark all phases that depend on the failed phase as FAILED.""" @@ -527,8 +572,17 @@ class TeamOrchestrator: ph.status = PhaseStatus.FAILED ph.result = {"error": f"Dependency phase '{failed_phase_id}' failed"} phase_results[ph.id] = {"error": f"Dependency '{failed_phase_id}' failed"} + # Emit phase_failed event for cascaded failure + await self._broadcast_event( + "phase_failed", + { + "phase_id": ph.id, + "phase_name": ph.name, + "error": f"Dependency phase '{failed_phase_id}' failed", + }, + ) # Recursively mark their dependents - self._mark_dependents_failed(ph.id, plan, phase_results) + await self._mark_dependents_failed(ph.id, plan, phase_results) async def _synthesize_results( self, lead: Expert, task: str, completed_phases: list[PlanPhase] @@ -668,7 +722,7 @@ class TeamOrchestrator: return target.config.llm.get("model", "default") return "default" - def _get_llm_gateway(self, expert: Expert | None = None) -> Any: + def _get_llm_gateway(self, expert: Expert | None = None) -> LLMGateway | None: """Get LLM gateway from the given expert or the lead expert's agent. Falls back to other active experts if the primary target has no gateway. diff --git a/src/agentkit/experts/plan.py b/src/agentkit/experts/plan.py index 2bea4a2..36d303b 100644 --- a/src/agentkit/experts/plan.py +++ b/src/agentkit/experts/plan.py @@ -50,7 +50,7 @@ class PhaseStatus(str, enum.Enum): """阶段状态(流水线模式)""" PENDING = "pending" - RUNNING = "running" + RUNNING = "in_progress" COMPLETED = "completed" FAILED = "failed" @@ -122,6 +122,13 @@ class PlanPhase: def to_dict(self) -> dict[str, Any]: """序列化为字典""" + # Serialize result to string to match frontend ITeamPlanPhase.result type + result_str: str | None = None + if self.result is not None: + if isinstance(self.result, dict): + result_str = self.result.get("content", str(self.result)) + else: + result_str = str(self.result) return { "id": self.id, "name": self.name, @@ -129,7 +136,7 @@ class PlanPhase: "task_description": self.task_description, "depends_on": list(self.depends_on), "status": self.status.value, - "result": self.result, + "result": result_str, } @classmethod diff --git a/src/agentkit/server/frontend/src/api/types.ts b/src/agentkit/server/frontend/src/api/types.ts index 6a9323b..f9c72e8 100644 --- a/src/agentkit/server/frontend/src/api/types.ts +++ b/src/agentkit/server/frontend/src/api/types.ts @@ -101,7 +101,7 @@ export type WsServerMessage = | { type: 'error'; data: { message: string; code?: string } } | { type: 'pong' } | { type: 'team_formed'; data: IExpertTeamState } - | { type: 'expert_step'; data: { expert_id: string; expert_name: string; expert_color: string; content: string; step: number } } + | { type: 'expert_step'; data: { expert_id: string; expert_name: string; expert_color: string; content: string; step: string } } | { type: 'expert_result'; data: { expert_id: string; expert_name: string; expert_color: string; content: string } } | { type: 'plan_update'; data: { plan_phases: ITeamPlanPhase[] } } | { type: 'phase_started'; data: { phase_id: string; phase_name: string; assigned_expert: string; depends_on: string[] } } diff --git a/src/agentkit/server/frontend/src/stores/team.ts b/src/agentkit/server/frontend/src/stores/team.ts index 94f5faa..a51e88a 100644 --- a/src/agentkit/server/frontend/src/stores/team.ts +++ b/src/agentkit/server/frontend/src/stores/team.ts @@ -43,7 +43,7 @@ export const useTeamStore = defineStore('team', () => { status: ITeamPlanPhase['status'], result?: string, ) { - if (!teamState.value) return + if (!teamState.value?.plan_phases) return const phases = teamState.value.plan_phases.map((p) => { if (p.id !== phaseId) return p return result !== undefined diff --git a/src/agentkit/server/routes/chat.py b/src/agentkit/server/routes/chat.py index 6d326d8..29f1d11 100644 --- a/src/agentkit/server/routes/chat.py +++ b/src/agentkit/server/routes/chat.py @@ -391,18 +391,24 @@ async def _execute_team_collab( await team.create_team(lead_config=lead_config, member_configs=member_configs) orchestrator = TeamOrchestrator(team=team) result = await orchestrator.execute(routing_result.task_content) + except asyncio.CancelledError: + logger.info(f"Team collaboration cancelled for session {session_id}") + await websocket.send_json( + {"type": "error", "data": {"message": "团队协作已取消"}} + ) + return True except Exception as e: logger.error(f"Team collaboration failed for session {session_id}: {e}", exc_info=True) await websocket.send_json( {"type": "error", "data": {"message": f"团队协作执行失败: {str(e)[:200]}"}} ) - try: - await team.dissolve() - except Exception: - pass return True finally: - # Always remove handler to avoid leaks + # Always dissolve the team and remove handler to avoid leaks + try: + await team.dissolve() + except Exception as e: + logger.warning(f"Team dissolve failed: {e}") try: team.handoff_transport._handlers.pop(team.team_channel, None) except Exception: @@ -440,12 +446,6 @@ async def _execute_team_collab( agent_name="team_collab", ) - # Dissolve the team to release expert agents - try: - await team.dissolve() - except Exception as e: - logger.warning(f"Team dissolve failed: {e}") - return True diff --git a/tests/unit/experts/test_plan.py b/tests/unit/experts/test_plan.py index 069f195..742f3ad 100644 --- a/tests/unit/experts/test_plan.py +++ b/tests/unit/experts/test_plan.py @@ -351,7 +351,7 @@ class TestPhaseStatus: def test_statuses_exist(self): """阶段状态都存在""" assert PhaseStatus.PENDING == "pending" - assert PhaseStatus.RUNNING == "running" + assert PhaseStatus.RUNNING == "in_progress" assert PhaseStatus.COMPLETED == "completed" assert PhaseStatus.FAILED == "failed" @@ -391,7 +391,11 @@ class TestPlanPhase: assert phase.result is None def test_to_dict_from_dict_roundtrip(self): - """to_dict / from_dict 往返序列化""" + """to_dict / from_dict 往返序列化 + + Note: to_dict() serializes result to string for frontend compatibility, + so the roundtrip is lossy for the result field (dict → string). + """ phase = _make_phase( id="roundtrip_phase", name="往返测试", @@ -399,7 +403,7 @@ class TestPlanPhase: task_description="测试序列化", depends_on=["dep1", "dep2"], status=PhaseStatus.COMPLETED, - result={"key": "value"}, + result={"content": "value"}, ) d = phase.to_dict() restored = PlanPhase.from_dict(d) @@ -410,7 +414,8 @@ class TestPlanPhase: assert restored.task_description == phase.task_description assert restored.depends_on == phase.depends_on assert restored.status == phase.status - assert restored.result == phase.result + # result is serialized to string in to_dict(); from_dict reads it back as string + assert restored.result == "value" def test_to_dict_structure(self): """to_dict 返回正确的字典结构""" @@ -421,7 +426,7 @@ class TestPlanPhase: task_description="测试结构", depends_on=["d1"], status=PhaseStatus.RUNNING, - result={"output": "data"}, + result={"content": "phase output data"}, ) d = phase.to_dict() assert d["id"] == "struct_test" @@ -429,8 +434,9 @@ class TestPlanPhase: assert d["assigned_expert"] == "dev" assert d["task_description"] == "测试结构" assert d["depends_on"] == ["d1"] - assert d["status"] == "running" - assert d["result"] == {"output": "data"} + assert d["status"] == "in_progress" + # result is serialized to string to match frontend ITeamPlanPhase.result type + assert d["result"] == "phase output data" class TestTeamPlanPhases: diff --git a/tests/unit/experts/test_team_orchestrator.py b/tests/unit/experts/test_team_orchestrator.py index feecf70..6528955 100644 --- a/tests/unit/experts/test_team_orchestrator.py +++ b/tests/unit/experts/test_team_orchestrator.py @@ -259,7 +259,7 @@ class TestPipelineExecution: calls = team._handoff_transport.send.call_args_list plan_updates = [c for c in calls if c[0][1].get("type") == "plan_update"] assert len(plan_updates) >= 1 - assert "phases" in plan_updates[0][0][1] + assert "plan_phases" in plan_updates[0][0][1] @pytest.mark.asyncio async def test_pipeline_emits_team_synthesis_event(self):