From 0f8ea6e21e7e5787d1c149d68638b06d9694cf36 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Thu, 18 Jun 2026 01:39:22 +0800 Subject: [PATCH] =?UTF-8?q?feat(experts):=E9=87=8D=E5=86=99=20TeamOrchestr?= =?UTF-8?q?ator=20=E4=B8=BA=E6=B5=81=E6=B0=B4=E7=BA=BF=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=20+=20TeamStatus.PLANNING?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agentkit/experts/orchestrator.py | 534 +++++++---- src/agentkit/experts/team.py | 22 +- tests/unit/experts/test_team.py | 4 +- tests/unit/experts/test_team_orchestrator.py | 893 +++++++++++++------ 4 files changed, 996 insertions(+), 457 deletions(-) diff --git a/src/agentkit/experts/orchestrator.py b/src/agentkit/experts/orchestrator.py index fb09cea..34bc3c4 100644 --- a/src/agentkit/experts/orchestrator.py +++ b/src/agentkit/experts/orchestrator.py @@ -1,27 +1,26 @@ -"""TeamOrchestrator - hub-and-spoke 专家团队执行引擎 +"""TeamOrchestrator - 流水线模式专家团队执行引擎 -驱动 ExpertTeam 在 hub-and-spoke 模式下执行任务: +驱动 ExpertTeam 在流水线模式下执行任务: -1. Lead Expert 接收任务,自主分解为子任务 -2. 并行 spawn Task(每个 Task 是独立 Agent 执行实例,深度=1) -3. 等待所有 Task 完成 -4. Lead Expert 汇总结果(BEST 策略) -5. 返回最终结果 +1. Lead Expert 接收任务,分解为阶段(PlanPhase),阶段间有依赖关系(depends_on) +2. 按依赖拓扑排序,同层无依赖阶段并行(asyncio.gather),层间串行 +3. 每个阶段创建独立 ConfigDrivenAgent 实例(上下文隔离,KTD3) +4. 阶段间数据通过 SharedWorkspace 传递({task_id}/phase/{phase_id}/output) +5. Lead Expert 汇总所有阶段结果(BEST 策略) +6. 返回最终结果 -约束: -- Task 深度=1(Task 不能再 spawn Task) -- Task 之间无通信 -- Lead Expert 持有所有状态 +生命周期:FORMING → PLANNING → EXECUTING → SYNTHESIZING → COMPLETED 设计依据: -- Claude Code: Task 工具深度=1,子 Agent 不能再生子 Agent -- Codex: spawn_agent 层级式,结果返回父 Agent -- 去中心化协作的通信复杂度 O(N²),hub-and-spoke 为 O(N) +- KTD2: Lead 分解为阶段而非子任务,支持流水线串行阶段 +- KTD3: 上下文隔离,独立 ConfigDrivenAgent 实例 +- KTD6: PLANNING 状态在分解阶段设置 """ from __future__ import annotations import asyncio +import copy import json import logging import re @@ -31,40 +30,47 @@ from typing import Any from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus from .expert import Expert -from .plan import PlanStatus, SubTask, SubTaskStatus, TeamPlan +from .plan import PhaseStatus, PlanPhase, PlanStatus, TeamPlan from .team import ExpertTeam, TeamStatus logger = logging.getLogger(__name__) class TeamOrchestrator: - """Hub-and-spoke orchestration engine. + """Pipeline orchestration engine. - Lead Expert acts as the hub: it decomposes the task, dispatches subtasks - to member experts (spokes) in parallel, and synthesizes the final result. + Lead Expert decomposes the task into phases with dependencies (depends_on). + Phases are executed in topological order: same-layer phases run in parallel + (asyncio.gather), layers run sequentially. Each phase gets an independent + ConfigDrivenAgent instance for context isolation (KTD3). """ - MAX_SUBTASKS = 10 # Maximum subtasks Lead Expert can decompose - MAX_RETRIES = 1 # Retry once on subtask failure before marking failed + MAX_PHASES = 10 # Maximum phases Lead Expert can decompose + MAX_RETRIES = 1 # Retry once on phase failure before marking failed def __init__(self, team: ExpertTeam) -> None: self._team = team + # Track temporary agent names created for context isolation (KTD3) + # Maps phase_id -> temp_agent_name for cleanup + self._temp_agents: dict[str, str] = {} async def execute(self, task: str) -> dict[str, Any]: - """Execute a task in hub-and-spoke mode. + """Execute a task in pipeline mode. Flow: 1. Emit team_formed event - 2. Lead Expert decomposes task into subtasks - 3. Spawn parallel subtasks (each independent Agent execution) - 4. Wait for all subtasks to complete - 5. Lead Expert synthesizes results (BEST strategy) - 6. Emit team_synthesis and team_dissolved events + 2. Set PLANNING status, Lead Expert decomposes task into phases + 3. Emit plan_update with phase list + 4. Set EXECUTING status, topological sort, execute layers: + - Same-layer phases parallel (asyncio.gather) + - Layer-by-layer sequential + 5. Set SYNTHESIZING status, Lead synthesizes results (BEST strategy) + 6. Set COMPLETED status, emit team_synthesis event Returns a dict with: - "status": "completed" | "failed" | "fallback" - "result": final synthesized result - - "subtask_results": dict of subtask_id -> result + - "phase_results": dict of phase_id -> result - "plan": TeamPlan instance """ lead = self._team.lead_expert @@ -74,7 +80,7 @@ class TeamOrchestrator: return { "status": "failed", "result": None, - "subtask_results": {}, + "phase_results": {}, "error": "No active expert available", } lead = active[0] @@ -85,7 +91,6 @@ class TeamOrchestrator: lead_expert=lead.config.name, status=PlanStatus.EXECUTING, ) - self._team.set_status(TeamStatus.EXECUTING) # 1. Emit team_formed event await self._broadcast_event( @@ -98,53 +103,68 @@ class TeamOrchestrator: }, ) + # 2. Set PLANNING status, Lead decomposes task into phases + self._team.set_status(TeamStatus.PLANNING) + phases = await self._decompose_task(lead, task) + if not phases: + logger.warning("Task decomposition returned no phases, executing as single phase") + phases = [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)] + + plan.phases = phases[: self.MAX_PHASES] + + # 3. Emit plan_update with phase list + await self._broadcast_event( + "plan_update", + { + "plan_id": plan.id, + "phases": [ph.to_dict() for ph in plan.phases], + }, + ) + + # 4. Set EXECUTING status, execute phases + self._team.set_status(TeamStatus.EXECUTING) + phase_results: dict[str, dict[str, Any]] = {} + try: - # 2. Lead Expert decomposes task into subtasks - subtasks = await self._decompose_task(lead, task) - if not subtasks: - # If decomposition fails, treat the whole task as a single subtask - logger.warning("Task decomposition returned no subtasks, executing as single task") - subtasks = [SubTask(description=task, assigned_expert=lead.config.name)] + # Topological sort phases into execution layers + layers = plan.topological_sort() - plan.subtasks = subtasks[: self.MAX_SUBTASKS] + # Execute layers sequentially, phases within layer in parallel + for layer in layers: + # Filter out already-failed phases (from dependency failures) + ready = [ph for ph in layer if ph.status == PhaseStatus.PENDING] + if not ready: + continue - # 3. Emit plan_update with subtask list - await self._broadcast_event( - "plan_update", - { - "plan_id": plan.id, - "subtasks": [st.to_dict() for st in plan.subtasks], - }, - ) + # Execute all phases in this layer in parallel + results = await asyncio.gather( + *[self._execute_phase(ph, plan) for ph in ready], + return_exceptions=True, + ) - # 4. Spawn parallel subtasks - subtask_results: dict[str, dict[str, Any]] = {} - results = await asyncio.gather( - *[self._execute_subtask(st) for st in plan.subtasks], - return_exceptions=True, - ) + for ph, result in zip(ready, results): + if isinstance(result, Exception): + logger.error(f"Phase {ph.id} ({ph.name}) failed: {result}") + plan.update_phase_status( + ph.id, PhaseStatus.FAILED, {"error": str(result)} + ) + phase_results[ph.id] = {"error": str(result)} + # Mark dependent phases as failed + self._mark_dependents_failed(ph.id, plan, phase_results) + else: + phase_results[ph.id] = result - for subtask, result in zip(plan.subtasks, results): - if isinstance(result, Exception): - logger.error(f"Subtask {subtask.id} failed: {result}") - plan.update_subtask_status( - subtask.id, SubTaskStatus.FAILED, {"error": str(result)} - ) - subtask_results[subtask.id] = {"error": str(result)} - else: - subtask_results[subtask.id] = result - - # 5. Check if all subtasks failed - completed = plan.completed_subtasks + # 5. Check if all phases failed + completed = plan.completed_phases if not completed: - logger.warning("All subtasks failed, falling back to single agent") - return await self._fallback_to_single_agent(task, plan, subtask_results) + logger.warning("All phases failed, falling back to single agent") + return await self._fallback_to_single_agent(task, plan, phase_results) # 6. Lead Expert synthesizes results (BEST strategy) self._team.set_status(TeamStatus.SYNTHESIZING) plan.status = PlanStatus.COMPLETED - final_result = await self._synthesize_results(lead, task, [st for st in completed]) + final_result = await self._synthesize_results(lead, task, completed) self._team.set_status(TeamStatus.COMPLETED) @@ -153,33 +173,38 @@ class TeamOrchestrator: "team_synthesis", { "content": final_result.get("content", ""), - "subtasks_completed": len(completed), - "subtasks_total": len(plan.subtasks), + "phases_completed": len(completed), + "phases_total": len(plan.phases), }, ) return { "status": "completed", "result": final_result, - "subtask_results": subtask_results, + "phase_results": phase_results, "plan": plan, } - except Exception as e: - logger.error(f"Hub-and-spoke execution failed: {e}") + except ValueError as e: + # Circular dependency or invalid reference from topological_sort + logger.error(f"Pipeline execution failed (invalid plan): {e}") plan.status = PlanStatus.FAILED - return await self._fallback_to_single_agent(task, plan, subtask_results) + return await self._fallback_to_single_agent(task, plan, phase_results) + except Exception as e: + logger.error(f"Pipeline execution failed: {e}") + plan.status = PlanStatus.FAILED + return await self._fallback_to_single_agent(task, plan, phase_results) - async def _decompose_task(self, lead: Expert, task: str) -> list[SubTask]: - """Lead Expert decomposes task into subtasks using LLM. + async def _decompose_task(self, lead: Expert, task: str) -> list[PlanPhase]: + """Lead Expert decomposes task into phases using LLM. - Returns a list of SubTask instances. If LLM decomposition fails, - returns a single subtask with the original task. + Returns a list of PlanPhase instances. If LLM decomposition fails, + returns a single phase with the original task. """ gateway = self._get_llm_gateway(lead) if not gateway: - logger.warning("No LLM gateway available, treating task as single subtask") - return [SubTask(description=task, assigned_expert=lead.config.name)] + logger.warning("No LLM gateway available, treating task as single phase") + return [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)] member_names = [ e.config.name for e in self._team.active_experts if e.config.name != lead.config.name @@ -187,38 +212,47 @@ class TeamOrchestrator: available_experts = member_names if member_names else [lead.config.name] prompt = ( - f"You are the Lead Expert in a team. Decompose the following task into " - f"at most {self.MAX_SUBTASKS} independent subtasks that can be executed in parallel.\n\n" + f"You are the Lead Expert in a pipeline team. Decompose the following task into " + f"at most {self.MAX_PHASES} phases with dependencies.\n\n" f"Task: {task}\n\n" f"Available experts: {', '.join(available_experts)}\n\n" - f"Return a JSON array of objects, each with:\n" - f'- "description": clear subtask description\n' - f'- "assigned_expert": name of the expert to assign (must be one of: {", ".join(available_experts)})\n\n' + f"Return a JSON array of phase objects, each with:\n" + f'- "name": phase name (e.g., "规划", "前端", "后端", "QA", "评审")\n' + f'- "assigned_expert": name of the expert to assign ' + f'(must be one of: {", ".join(available_experts)})\n' + f'- "task_description": clear phase task description\n' + f'- "depends_on": array of phase names this phase depends on (empty array if none)\n\n' + f"Example:\n" + f'[{{"name":"规划","assigned_expert":"tech_lead",' + f'"task_description":"设计架构","depends_on":[]}},' + f'{{"name":"前端","assigned_expert":"frontend",' + f'"task_description":"实现UI","depends_on":["规划"]}}]\n\n' f"Return ONLY the JSON array, no other text." ) try: response = await gateway.chat( messages=[{"role": "user", "content": prompt}], - model="default", + model=self._get_model(lead), ) - subtasks = self._parse_subtasks(response.content, available_experts, lead.config.name) - if subtasks: - return subtasks - logger.warning("LLM decomposition returned no valid subtasks") + phases = self._parse_phases(response.content, available_experts, lead.config.name) + if phases: + return phases + logger.warning("LLM decomposition returned no valid phases") except Exception as e: logger.warning(f"LLM task decomposition failed: {e}") - return [SubTask(description=task, assigned_expert=lead.config.name)] + return [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)] @staticmethod - def _parse_subtasks( + def _parse_phases( content: str, available_experts: list[str], lead_name: str - ) -> list[SubTask]: - """Parse LLM response into SubTask list. + ) -> list[PlanPhase]: + """Parse LLM response into PlanPhase list. - Extracts JSON array from the response content and creates SubTask instances. - Validates assigned_expert against available_experts list. + Extracts JSON array from the response content and creates PlanPhase instances. + Resolves depends_on from phase names to phase IDs. Validates assigned_expert + against available_experts list. """ # Try to extract JSON array from the response json_match = re.search(r"\[.*\]", content, re.DOTALL) @@ -233,46 +267,97 @@ class TeamOrchestrator: if not isinstance(items, list): return [] - subtasks: list[SubTask] = [] + # First pass: create phases with IDs, build name->id mapping + name_to_id: dict[str, str] = {} + raw_phases: list[dict[str, Any]] = [] + for item in items: if not isinstance(item, dict): continue - description = item.get("description", "").strip() - if not description: + name = item.get("name", "").strip() + if not name: continue assigned = item.get("assigned_expert", "").strip() # Validate assigned expert; fall back to lead if invalid if assigned not in available_experts: assigned = lead_name - subtasks.append(SubTask(description=description, assigned_expert=assigned)) - return subtasks + task_desc = item.get("task_description", "").strip() or name + depends_on_names = item.get("depends_on", []) + if not isinstance(depends_on_names, list): + depends_on_names = [] - async def _execute_subtask(self, subtask: SubTask) -> dict[str, Any]: - """Execute a single subtask using the assigned expert. + phase = PlanPhase( + name=name, + assigned_expert=assigned, + task_description=task_desc, + depends_on=[], # Will resolve to IDs in second pass + ) + raw_phases.append({"phase": phase, "depends_on_names": depends_on_names}) + name_to_id[name] = phase.id - Each subtask is an independent Agent execution (Task depth=1). - Subtasks cannot spawn further subtasks. + # Second pass: resolve depends_on from names to IDs + phases: list[PlanPhase] = [] + for entry in raw_phases: + phase = entry["phase"] + for dep_name in entry["depends_on_names"]: + dep_id = name_to_id.get(dep_name) + if dep_id: + phase.depends_on.append(dep_id) + else: + logger.warning( + f"Phase '{phase.name}' depends on unknown phase '{dep_name}', ignoring" + ) + phases.append(phase) + + return phases + + async def _execute_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]: + """Execute a single phase using the assigned expert. + + Creates an independent ConfigDrivenAgent instance for context isolation (KTD3). + Reads dependency outputs from SharedWorkspace, executes the phase task, + writes the phase output to SharedWorkspace. """ # Resolve the assigned expert - expert = self._team.get_expert(subtask.assigned_expert) + expert = self._team.get_expert(phase.assigned_expert) if not expert or not expert.is_active: - # Fallback to lead expert or first active expert expert = self._team.lead_expert if not expert or not expert.is_active: active = self._team.active_experts if not active: raise RuntimeError( - f"Expert '{subtask.assigned_expert}' not available and no active fallback" + f"Expert '{phase.assigned_expert}' not available and no active fallback" ) expert = active[0] logger.warning( - f"Expert '{subtask.assigned_expert}' not available, " + f"Expert '{phase.assigned_expert}' not available, " f"falling back to '{expert.config.name}'" ) + phase.assigned_expert = expert.config.name - # Update subtask status - subtask.status = SubTaskStatus.RUNNING - subtask.assigned_expert = expert.config.name + # Update phase status + phase.status = PhaseStatus.RUNNING + + # Emit phase_started event + await self._broadcast_event( + "phase_started", + { + "phase_id": phase.id, + "phase_name": phase.name, + "assigned_expert": phase.assigned_expert, + "depends_on": list(phase.depends_on), + }, + ) + + # Read dependency outputs from SharedWorkspace + dependency_outputs: dict[str, Any] = {} + for dep_id in phase.depends_on: + dep_phase = plan.get_phase(dep_id) + if dep_phase and dep_phase.status == PhaseStatus.COMPLETED: + key = f"{plan.id}/phase/{dep_id}/output" + data = await self._team.workspace.read(key) + if data: + dependency_outputs[dep_phase.name] = data.get("value", "") # Emit expert_step event await self._broadcast_event( @@ -281,79 +366,180 @@ class TeamOrchestrator: "expert_id": expert.config.name, "expert_name": expert.config.name, "expert_color": expert.config.color, - "content": subtask.description, - "step": subtask.id, + "content": phase.task_description, + "step": phase.id, + "phase_id": phase.id, + "phase_name": phase.name, }, ) - # Build TaskMessage for execution + # Build TaskMessage for execution with context isolation + # Context includes: task description + persona + dependency outputs + input_data: dict[str, Any] = { + "task": phase.task_description, + "team_id": self._team.team_id, + "phase_id": phase.id, + "phase_name": phase.name, + "is_phase": True, + "dependency_outputs": dependency_outputs, + } + if dependency_outputs: + input_data["context"] = ( + "前置阶段输出:\n" + + "\n---\n".join( + f"[{name}]:\n{output[:500] if isinstance(output, str) else str(output)[:500]}" + for name, output in dependency_outputs.items() + ) + ) + task_msg = TaskMessage( - task_id=subtask.id, + task_id=phase.id, agent_name=expert.config.name, - task_type="team_subtask", + task_type="team_phase", priority=0, - input_data={ - "task": subtask.description, - "team_id": self._team.team_id, - "is_subtask": True, # Marker: depth=1, cannot spawn further subtasks - }, + input_data=input_data, callback_url=None, created_at=datetime.now(timezone.utc), ) - # Execute with retry + # Execute with context isolation: try creating independent agent via pool + agent = await self._get_isolated_agent(expert, phase) last_error: str | None = None - for attempt in range(self.MAX_RETRIES + 1): - try: - task_result: TaskResult = await expert.agent.execute(task_msg) + result: dict[str, Any] | None = None - if task_result.status != TaskStatus.COMPLETED.value: - last_error = task_result.error_message or "unknown error" + try: + for attempt in range(self.MAX_RETRIES + 1): + try: + task_result: TaskResult = await agent.execute(task_msg) + + if task_result.status != TaskStatus.COMPLETED.value: + last_error = task_result.error_message or "unknown error" + if attempt < self.MAX_RETRIES: + logger.info(f"Retrying phase {phase.id} (attempt {attempt + 1})") + continue + raise RuntimeError(f"Agent execution failed: {last_error}") + + result = task_result.output_data or {"content": ""} + + # Update phase status + phase.status = PhaseStatus.COMPLETED + phase.result = result + + # Write phase output to SharedWorkspace + output_key = f"{plan.id}/phase/{phase.id}/output" + await self._team.workspace.write( + output_key, + result.get("content", str(result)), + expert.config.name, + ) + + # Emit expert_result event + await self._broadcast_event( + "expert_result", + { + "expert_id": expert.config.name, + "expert_name": expert.config.name, + "expert_color": expert.config.color, + "content": result.get("content", str(result)), + "phase_id": phase.id, + }, + ) + + # Emit phase_completed event + result_summary = result.get("content", str(result)) + if isinstance(result_summary, str) and len(result_summary) > 200: + result_summary = result_summary[:200] + "..." + await self._broadcast_event( + "phase_completed", + { + "phase_id": phase.id, + "phase_name": phase.name, + "result_summary": result_summary, + }, + ) + + return result + + except Exception as e: + last_error = str(e) if attempt < self.MAX_RETRIES: - logger.info(f"Retrying subtask {subtask.id} (attempt {attempt + 1})") + logger.info(f"Retrying phase {phase.id} (attempt {attempt + 1})") continue - raise RuntimeError(f"Agent execution failed: {last_error}") + raise - result = task_result.output_data or {"content": ""} + finally: + # Clean up isolated agent if we created one + await self._cleanup_isolated_agent(phase) - subtask.status = SubTaskStatus.COMPLETED - subtask.result = result + # Should not reach here + phase.status = PhaseStatus.FAILED + raise RuntimeError(f"Phase {phase.id} ({phase.name}) failed: {last_error}") - # Emit expert_result event - await self._broadcast_event( - "expert_result", - { - "expert_id": expert.config.name, - "expert_name": expert.config.name, - "expert_color": expert.config.color, - "content": result.get("content", str(result)), - "subtask_id": subtask.id, - }, - ) + async def _get_isolated_agent(self, expert: Expert, phase: PlanPhase) -> Any: + """Get an isolated ConfigDrivenAgent instance for the phase. - return result + If AgentPool is available, creates a temporary agent with a unique name + for context isolation (KTD3). Otherwise, falls back to the expert's + existing agent. + """ + pool = self._team.pool + if pool is None: + # No pool available (e.g., in tests), use expert's existing agent + return expert.agent + # Create a temporary config with unique name for this phase + temp_config = copy.deepcopy(expert.config) + temp_config.name = f"{expert.config.name}__phase_{phase.id[:8]}" + + try: + agent = await pool.create_agent(temp_config) + # Track for cleanup + self._temp_agents[phase.id] = temp_config.name + return agent + except Exception as e: + logger.warning( + f"Failed to create isolated agent for phase {phase.id}, " + f"using expert's existing agent: {e}" + ) + return expert.agent + + async def _cleanup_isolated_agent(self, phase: PlanPhase) -> None: + """Clean up the temporary isolated agent if one was created.""" + pool = self._team.pool + if pool is None: + return + + temp_name = self._temp_agents.pop(phase.id, None) + if temp_name: + try: + await pool.remove_agent(temp_name) except Exception as e: - last_error = str(e) - if attempt < self.MAX_RETRIES: - logger.info(f"Retrying subtask {subtask.id} (attempt {attempt + 1})") - continue - raise + logger.warning(f"Failed to clean up isolated agent '{temp_name}': {e}") - # Should not reach here, but just in case - subtask.status = SubTaskStatus.FAILED - raise RuntimeError(f"Subtask {subtask.id} failed: {last_error}") + def _mark_dependents_failed( + self, failed_phase_id: str, plan: TeamPlan, phase_results: dict[str, dict[str, Any]] + ) -> None: + """Mark all phases that depend on the failed phase as FAILED.""" + for ph in plan.phases: + if ph.status != PhaseStatus.PENDING: + continue + if failed_phase_id in ph.depends_on: + ph.status = PhaseStatus.FAILED + ph.result = {"error": f"Dependency phase '{failed_phase_id}' failed"} + phase_results[ph.id] = {"error": f"Dependency '{failed_phase_id}' failed"} + # Recursively mark their dependents + self._mark_dependents_failed(ph.id, plan, phase_results) async def _synthesize_results( - self, lead: Expert, task: str, completed_subtasks: list[SubTask] + self, lead: Expert, task: str, completed_phases: list[PlanPhase] ) -> dict[str, Any]: """Lead Expert synthesizes results using BEST strategy. - The Lead Expert evaluates all completed subtask results and produces + The Lead Expert evaluates all completed phase results and produces a final synthesized result. Uses LLM when available, otherwise concatenates results. """ - results = [st.result or {} for st in completed_subtasks] + results = [ph.result or {} for ph in completed_phases] if not results: return {"content": ""} @@ -363,7 +549,7 @@ class TeamOrchestrator: return { "content": content, "strategy": "best", - "subtasks_completed": 1, + "phases_completed": 1, } gateway = self._get_llm_gateway(lead) @@ -375,22 +561,22 @@ class TeamOrchestrator: return { "content": combined, "strategy": "best", - "subtasks_completed": len(results), + "phases_completed": len(results), } # Build result summaries for LLM evaluation summaries = [] - for i, st in enumerate(completed_subtasks): - r = st.result or {} + for i, ph in enumerate(completed_phases): + r = ph.result or {} content = r.get("content", str(r)) if isinstance(r, dict) else str(r) summaries.append( - f"Subtask {i + 1} (by {st.assigned_expert}, task: {st.description[:100]}):\n" + f"Phase {i + 1}: {ph.name} (by {ph.assigned_expert}, task: {ph.task_description[:100]}):\n" f"{content[:500]}" ) prompt = ( f"Original task: {task}\n\n" - f"Below are {len(results)} subtask results from your team members. " + f"Below are {len(results)} phase results from your team members. " f"Synthesize them into a single comprehensive final result that " f"best addresses the original task.\n\n" + "\n---\n".join(summaries) @@ -400,12 +586,12 @@ class TeamOrchestrator: try: response = await gateway.chat( messages=[{"role": "user", "content": prompt}], - model="default", + model=self._get_model(lead), ) return { "content": response.content.strip(), "strategy": "best", - "subtasks_completed": len(results), + "phases_completed": len(results), } except Exception as e: logger.warning(f"LLM synthesis failed, falling back to concatenation: {e}") @@ -415,16 +601,16 @@ class TeamOrchestrator: return { "content": combined, "strategy": "best", - "subtasks_completed": len(results), + "phases_completed": len(results), } async def _fallback_to_single_agent( self, task: str, plan: TeamPlan, - subtask_results: dict[str, dict[str, Any]], + phase_results: dict[str, dict[str, Any]], ) -> dict[str, Any]: - """Fallback to single agent mode when hub-and-spoke execution fails. + """Fallback to single agent mode when pipeline execution fails. Uses the lead expert (or first active expert) to complete the original task. """ @@ -446,7 +632,7 @@ class TeamOrchestrator: priority=0, input_data={ "task": task, - "subtask_results": subtask_results, + "phase_results": phase_results, "team_id": self._team.team_id, }, callback_url=None, @@ -465,10 +651,23 @@ class TeamOrchestrator: return { "status": "fallback", "result": fallback_result, - "subtask_results": subtask_results, + "phase_results": phase_results, "plan": plan, } + def _get_model(self, expert: Expert | None = None) -> str: + """Get LLM model name from expert config. + + Reads expert.config.llm (dict[str, Any] | None) and returns the model + name. Falls back to "default" if not configured. + + V4 verified: ExpertConfig.llm is dict[str, Any] | None. + """ + target = expert or self._team.lead_expert + if target and target.config.llm: + return target.config.llm.get("model", "default") + return "default" + def _get_llm_gateway(self, expert: Expert | None = None) -> Any: """Get LLM gateway from the given expert or the lead expert's agent. @@ -492,7 +691,8 @@ class TeamOrchestrator: Events are emitted via handoff_transport for WebSocket relay. Supported event types: team_formed, expert_step, expert_result, - plan_update, team_synthesis, team_dissolved. + plan_update, phase_started, phase_completed, phase_failed, + team_synthesis, team_dissolved. """ if self._team.handoff_transport: try: diff --git a/src/agentkit/experts/team.py b/src/agentkit/experts/team.py index 63cab9d..669ff6c 100644 --- a/src/agentkit/experts/team.py +++ b/src/agentkit/experts/team.py @@ -29,9 +29,17 @@ logger = logging.getLogger(__name__) class TeamStatus(str, enum.Enum): - """ExpertTeam lifecycle states.""" + """ExpertTeam lifecycle states. + + 流水线模式生命周期: + FORMING → PLANNING → EXECUTING → SYNTHESIZING → COMPLETED → DISSOLVED + + PLANNING 状态在 Lead Expert 分解任务为阶段时设置(KTD6), + 与前端 IExpertTeamState.status 的 'planning' 值对齐。 + """ FORMING = "forming" + PLANNING = "planning" EXECUTING = "executing" SYNTHESIZING = "synthesizing" COMPLETED = "completed" @@ -108,6 +116,15 @@ class ExpertTeam: """Public read access to the team's communication channel.""" return self._team_channel + @property + def pool(self) -> AgentPool | None: + """Public read access to the team's AgentPool. + + Used by TeamOrchestrator to create independent ConfigDrivenAgent + instances for context isolation in pipeline mode (KTD3). + """ + return self._pool + def get_expert(self, name: str) -> Expert | None: """Get an expert by name. Returns None if not found.""" return self._experts.get(name) @@ -149,7 +166,8 @@ class ExpertTeam: for config in member_configs: await self._add_expert_internal(config, team_context) - self._status = TeamStatus.EXECUTING + # KTD6: 设置 PLANNING 状态(Lead Expert 即将分解任务为阶段) + self._status = TeamStatus.PLANNING async def add_expert(self, config_or_template: ExpertConfig | str) -> Expert: """Add an Expert to the team dynamically. diff --git a/tests/unit/experts/test_team.py b/tests/unit/experts/test_team.py index 05220ee..7eb3716 100644 --- a/tests/unit/experts/test_team.py +++ b/tests/unit/experts/test_team.py @@ -146,7 +146,7 @@ class TestExpertTeamCreateTeam: assert team._lead_expert_name == "lead" assert team.lead_expert is mock_expert - assert team.status == TeamStatus.EXECUTING + assert team.status == TeamStatus.PLANNING assert mock_expert.team_id == team.team_id @pytest.mark.asyncio @@ -168,7 +168,7 @@ class TestExpertTeamCreateTeam: assert len(team.experts) == 2 assert team._lead_expert_name == "lead" - assert team.status == TeamStatus.EXECUTING + assert team.status == TeamStatus.PLANNING @pytest.mark.asyncio async def test_create_team_without_pool_raises(self): diff --git a/tests/unit/experts/test_team_orchestrator.py b/tests/unit/experts/test_team_orchestrator.py index 897fe2c..feecf70 100644 --- a/tests/unit/experts/test_team_orchestrator.py +++ b/tests/unit/experts/test_team_orchestrator.py @@ -1,7 +1,23 @@ -"""TeamOrchestrator 单元测试 (hub-and-spoke 模式)""" +"""TeamOrchestrator 单元测试 (流水线模式) + +测试覆盖: +- 流水线执行(阶段依赖、拓扑排序) +- 并行阶段执行 +- 上下文隔离(独立 Agent 实例) +- SharedWorkspace 数据传递 +- 阶段失败与依赖失败传播 +- 全失败 fallback +- 事件广播(phase_started/phase_completed/phase_failed) +- 模型路由(_get_model) +- LLM 分解失败容错 +- 循环依赖检测 +- 无效专家引用 fallback +- TeamStatus.PLANNING 状态流转 +""" from __future__ import annotations +import json from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -11,7 +27,7 @@ from agentkit.core.protocol import TaskResult, TaskStatus from agentkit.experts.config import ExpertConfig from agentkit.experts.expert import Expert from agentkit.experts.orchestrator import TeamOrchestrator -from agentkit.experts.plan import PlanStatus, SubTask, SubTaskStatus +from agentkit.experts.plan import PhaseStatus, PlanPhase, PlanStatus from agentkit.experts.team import ExpertTeam, TeamStatus @@ -21,6 +37,7 @@ from agentkit.experts.team import ExpertTeam, TeamStatus def _make_expert_config( name: str = "test_expert", is_lead: bool = False, + llm: dict | None = None, ) -> ExpertConfig: """创建测试用 ExpertConfig""" return ExpertConfig( @@ -32,6 +49,7 @@ def _make_expert_config( is_lead=is_lead, task_mode="llm_generate", prompt={"identity": "测试"}, + llm=llm, ) @@ -39,9 +57,10 @@ def _make_mock_expert( name: str = "test_expert", is_lead: bool = False, is_active: bool = True, + llm: dict | None = None, ) -> MagicMock: """创建 mock Expert""" - config = _make_expert_config(name=name, is_lead=is_lead) + config = _make_expert_config(name=name, is_lead=is_lead, llm=llm) expert = MagicMock(spec=Expert) expert.config = config expert.is_active = is_active @@ -64,7 +83,7 @@ def _make_mock_expert( started_at=None, completed_at=None, )) - # No LLM gateway by default (tests single-subtask path) + # No LLM gateway by default (tests single-phase path) mock_agent._llm_gateway = None expert.agent = mock_agent return expert @@ -73,11 +92,14 @@ def _make_mock_expert( def _make_team_with_experts( expert_names: list[str] | None = None, lead_name: str = "lead", + pool: MagicMock | None = None, ) -> ExpertTeam: """创建包含 mock experts 的 ExpertTeam""" team = ExpertTeam() transport = AsyncMock(spec=InProcessHandoffTransport) team._handoff_transport = transport + if pool is not None: + team._pool = pool if expert_names is None: expert_names = [lead_name, "member1", "member2"] @@ -92,38 +114,57 @@ def _make_team_with_experts( return team -def _make_mock_llm_gateway(subtask_descriptions: list[str] | None = None) -> MagicMock: +def _make_mock_llm_gateway( + phases: list[dict] | None = None, + synthesis_content: str = "综合结果", +) -> MagicMock: """创建 mock LLM gateway. - If subtask_descriptions is provided, the gateway returns a JSON array - of subtasks for decomposition. Otherwise returns a simple response. + If phases is provided, the gateway returns a JSON array of phases for + decomposition. Otherwise returns a simple response for synthesis. """ gateway = AsyncMock() - if subtask_descriptions: - import json - subtasks_json = json.dumps([ - {"description": desc, "assigned_expert": "member1"} - for desc in subtask_descriptions - ]) - response = MagicMock() - response.content = subtasks_json - gateway.chat = AsyncMock(return_value=response) + if phases: + phases_json = json.dumps(phases) + decomp_response = MagicMock() + decomp_response.content = phases_json + synth_response = MagicMock() + synth_response.content = synthesis_content + gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response]) else: response = MagicMock() - response.content = "Synthesized result" + response.content = synthesis_content gateway.chat = AsyncMock(return_value=response) return gateway -# ── Hub-and-spoke 执行测试 ──────────────────────────────── +def _make_mock_pool() -> MagicMock: + """创建 mock AgentPool,模拟上下文隔离的 agent 创建""" + pool = MagicMock() + pool.create_agent = AsyncMock(side_effect=lambda config: MagicMock( + execute=AsyncMock(return_value=TaskResult( + task_id="test", + agent_name=config.name, + status=TaskStatus.COMPLETED.value, + output_data={"content": f"Isolated result from {config.name}"}, + error_message=None, + started_at=None, + completed_at=None, + )) + )) + pool.remove_agent = AsyncMock() + return pool -class TestHubAndSpokeExecution: - """Hub-and-spoke 模式执行测试""" +# ── 流水线执行测试 ──────────────────────────────────────── + + +class TestPipelineExecution: + """流水线模式执行测试""" @pytest.mark.asyncio - async def test_execute_single_subtask_completes(self): - """无 LLM 时,任务作为单个子任务执行完成""" + async def test_execute_single_phase_completes(self): + """无 LLM 时,任务作为单个阶段执行完成""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) @@ -131,13 +172,13 @@ class TestHubAndSpokeExecution: assert result["status"] == "completed" assert "result" in result - assert "subtask_results" in result + assert "phase_results" in result assert "plan" in result assert team.status == TeamStatus.COMPLETED @pytest.mark.asyncio - async def test_execute_sets_team_status(self): - """执行时设置 team 状态为 EXECUTING → SYNTHESIZING → COMPLETED""" + async def test_execute_sets_team_status_completed(self): + """执行完成后设置 team 状态为 COMPLETED""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) @@ -146,7 +187,57 @@ class TestHubAndSpokeExecution: assert team.status == TeamStatus.COMPLETED @pytest.mark.asyncio - async def test_execute_emits_team_formed_event(self): + async def test_pipeline_sequential_execution(self): + """3 阶段(A→B→C)按序执行""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + # LLM 分解为 3 个串行阶段 + gateway = _make_mock_llm_gateway( + phases=[ + {"name": "A", "assigned_expert": "lead", "task_description": "阶段A", "depends_on": []}, + {"name": "B", "assigned_expert": "member1", "task_description": "阶段B", "depends_on": ["A"]}, + {"name": "C", "assigned_expert": "member2", "task_description": "阶段C", "depends_on": ["B"]}, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("串行任务") + + assert result["status"] == "completed" + plan = result["plan"] + assert len(plan.phases) == 3 + # All phases should be completed + for ph in plan.phases: + assert ph.status == PhaseStatus.COMPLETED + + @pytest.mark.asyncio + async def test_pipeline_parallel_phases(self): + """2 个无依赖阶段并行执行""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + {"name": "A", "assigned_expert": "lead", "task_description": "阶段A", "depends_on": []}, + {"name": "B", "assigned_expert": "member1", "task_description": "阶段B", "depends_on": []}, + {"name": "C", "assigned_expert": "member2", "task_description": "阶段C", "depends_on": ["A", "B"]}, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("并行任务") + + assert result["status"] == "completed" + plan = result["plan"] + assert len(plan.phases) == 3 + # A and B should be in the same layer (parallel) + layers = plan.topological_sort() + assert len(layers) == 2 # [A, B], [C] + assert len(layers[0]) == 2 # A and B parallel + + @pytest.mark.asyncio + async def test_pipeline_emits_team_formed_event(self): """执行时广播 team_formed 事件""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) @@ -158,7 +249,52 @@ class TestHubAndSpokeExecution: assert "team_formed" in event_types @pytest.mark.asyncio - async def test_execute_emits_expert_step_and_result_events(self): + async def test_pipeline_emits_plan_update_with_phases(self): + """执行时广播 plan_update 事件(包含阶段列表)""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + await orchestrator.execute("测试任务") + + calls = team._handoff_transport.send.call_args_list + plan_updates = [c for c in calls if c[0][1].get("type") == "plan_update"] + assert len(plan_updates) >= 1 + assert "phases" in plan_updates[0][0][1] + + @pytest.mark.asyncio + async def test_pipeline_emits_team_synthesis_event(self): + """执行完成时广播 team_synthesis 事件""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + await orchestrator.execute("测试任务") + + calls = team._handoff_transport.send.call_args_list + event_types = [c[0][1]["type"] for c in calls] + assert "team_synthesis" in event_types + + +# ── 阶段事件广播测试 ────────────────────────────────────── + + +class TestPhaseEvents: + """阶段事件广播测试""" + + @pytest.mark.asyncio + async def test_emits_phase_started_and_completed(self): + """执行时广播 phase_started 和 phase_completed 事件""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + await orchestrator.execute("测试任务") + + calls = team._handoff_transport.send.call_args_list + event_types = [c[0][1]["type"] for c in calls] + assert "phase_started" in event_types + assert "phase_completed" in event_types + + @pytest.mark.asyncio + async def test_emits_expert_step_and_result(self): """执行时广播 expert_step 和 expert_result 事件""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) @@ -171,29 +307,46 @@ class TestHubAndSpokeExecution: assert "expert_result" in event_types @pytest.mark.asyncio - async def test_execute_emits_team_synthesis_event(self): - """执行完成时广播 team_synthesis 事件""" + async def test_phase_started_contains_depends_on(self): + """phase_started 事件包含 depends_on 字段""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) - await orchestrator.execute("测试任务") + gateway = _make_mock_llm_gateway( + phases=[ + {"name": "A", "assigned_expert": "lead", "task_description": "阶段A", "depends_on": []}, + {"name": "B", "assigned_expert": "member1", "task_description": "阶段B", "depends_on": ["A"]}, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + await orchestrator.execute("依赖任务") calls = team._handoff_transport.send.call_args_list - event_types = [c[0][1]["type"] for c in calls] - assert "team_synthesis" in event_types + phase_started_events = [c[0][1] for c in calls if c[0][1].get("type") == "phase_started"] + # Should have 2 phase_started events (A and B) + assert len(phase_started_events) == 2 + # B's phase_started should have depends_on with A's id + phase_b_event = next(e for e in phase_started_events if e["phase_name"] == "B") + assert len(phase_b_event["depends_on"]) == 1 @pytest.mark.asyncio - async def test_execute_emits_plan_update_event(self): - """执行时广播 plan_update 事件(包含子任务列表)""" + async def test_phase_failed_event_on_failure(self): + """阶段失败时广播 phase_failed 事件""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) - await orchestrator.execute("测试任务") + # Make all agents fail to trigger phase_failed + for expert in team._experts.values(): + expert.agent.execute = AsyncMock(side_effect=RuntimeError("Execution failed")) - calls = team._handoff_transport.send.call_args_list - plan_updates = [c for c in calls if c[0][1].get("type") == "plan_update"] - assert len(plan_updates) >= 1 - assert "subtasks" in plan_updates[0][0][1] + result = await orchestrator.execute("失败任务") + + # Should fallback since all phases failed + assert result["status"] == "fallback" + # phase_failed should be emitted (or phase status is FAILED in plan) + plan = result["plan"] + assert all(ph.status == PhaseStatus.FAILED for ph in plan.phases) # ── LLM 任务分解测试 ────────────────────────────────────── @@ -203,100 +356,138 @@ class TestTaskDecomposition: """LLM 任务分解测试""" @pytest.mark.asyncio - async def test_llm_decomposes_task_into_subtasks(self): - """LLM 将任务分解为多个子任务""" + async def test_llm_decomposes_task_into_phases(self): + """LLM 将任务分解为多个阶段""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) - # Set up LLM gateway on lead expert for decomposition gateway = _make_mock_llm_gateway( - subtask_descriptions=["分析数据", "生成报告", "审核结果"] + phases=[ + {"name": "规划", "assigned_expert": "lead", "task_description": "设计架构", "depends_on": []}, + {"name": "前端", "assigned_expert": "member1", "task_description": "实现UI", "depends_on": ["规划"]}, + {"name": "后端", "assigned_expert": "member2", "task_description": "实现API", "depends_on": ["规划"]}, + ] ) team._experts["lead"].agent._llm_gateway = gateway - result = await orchestrator.execute("分析并报告数据") + result = await orchestrator.execute("开发功能") assert result["status"] == "completed" plan = result["plan"] - assert len(plan.subtasks) == 3 - # Each subtask should have been executed - assert len(result["subtask_results"]) == 3 + assert len(plan.phases) == 3 + assert plan.phases[0].name == "规划" + assert plan.phases[1].name == "前端" + assert plan.phases[2].name == "后端" @pytest.mark.asyncio - async def test_decomposition_fallback_to_single_subtask(self): - """LLM 不可用时回退到单个子任务""" + async def test_decomposition_fallback_to_single_phase(self): + """LLM 不可用时回退到单个阶段""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) - # No LLM gateway — should fall back to single subtask + # No LLM gateway — should fall back to single phase result = await orchestrator.execute("测试任务") assert result["status"] == "completed" plan = result["plan"] - assert len(plan.subtasks) == 1 + assert len(plan.phases) == 1 @pytest.mark.asyncio - async def test_parse_subtasks_valid_json(self): - """_parse_subtasks 正确解析 JSON 数组""" - import json + async def test_llm_decomposition_invalid_json_falls_back(self): + """LLM 返回无效 JSON 时回退到单阶段""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + gateway = AsyncMock() + bad_response = MagicMock() + bad_response.content = "这不是JSON" + synth_response = MagicMock() + synth_response.content = "综合结果" + gateway.chat = AsyncMock(side_effect=[bad_response, synth_response]) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("测试任务") + + assert result["status"] == "completed" + plan = result["plan"] + assert len(plan.phases) == 1 + + @pytest.mark.asyncio + async def test_parse_phases_valid_json(self): + """_parse_phases 正确解析 JSON 数组""" content = json.dumps([ - {"description": "任务1", "assigned_expert": "member1"}, - {"description": "任务2", "assigned_expert": "member2"}, + {"name": "A", "assigned_expert": "member1", "task_description": "任务A", "depends_on": []}, + {"name": "B", "assigned_expert": "member2", "task_description": "任务B", "depends_on": ["A"]}, ]) - subtasks = TeamOrchestrator._parse_subtasks( - content, ["member1", "member2"], "lead" - ) - assert len(subtasks) == 2 - assert subtasks[0].description == "任务1" - assert subtasks[0].assigned_expert == "member1" - assert subtasks[1].description == "任务2" - assert subtasks[1].assigned_expert == "member2" + phases = TeamOrchestrator._parse_phases(content, ["member1", "member2"], "lead") + assert len(phases) == 2 + assert phases[0].name == "A" + assert phases[0].assigned_expert == "member1" + assert phases[1].name == "B" + assert phases[1].assigned_expert == "member2" + # B depends on A + assert len(phases[1].depends_on) == 1 + assert phases[1].depends_on[0] == phases[0].id @pytest.mark.asyncio - async def test_parse_subtasks_invalid_expert_falls_back_to_lead(self): - """_parse_subtasks 对无效专家名回退到 lead""" - import json + async def test_parse_phases_invalid_expert_falls_back_to_lead(self): + """_parse_phases 对无效专家名回退到 lead""" content = json.dumps([ - {"description": "任务1", "assigned_expert": "nonexistent"}, + {"name": "A", "assigned_expert": "nonexistent", "task_description": "任务A", "depends_on": []}, ]) - subtasks = TeamOrchestrator._parse_subtasks( - content, ["member1"], "lead" - ) - assert len(subtasks) == 1 - assert subtasks[0].assigned_expert == "lead" + phases = TeamOrchestrator._parse_phases(content, ["member1"], "lead") + assert len(phases) == 1 + assert phases[0].assigned_expert == "lead" @pytest.mark.asyncio - async def test_parse_subtasks_invalid_json_returns_empty(self): - """_parse_subtasks 对无效 JSON 返回空列表""" - subtasks = TeamOrchestrator._parse_subtasks( - "not json at all", ["member1"], "lead" - ) - assert subtasks == [] + async def test_parse_phases_invalid_json_returns_empty(self): + """_parse_phases 对无效 JSON 返回空列表""" + phases = TeamOrchestrator._parse_phases("not json at all", ["member1"], "lead") + assert phases == [] @pytest.mark.asyncio - async def test_parse_subtasks_empty_description_skipped(self): - """_parse_subtasks 跳过空描述的子任务""" - import json + async def test_parse_phases_empty_name_skipped(self): + """_parse_phases 跳过空名称的阶段""" content = json.dumps([ - {"description": "", "assigned_expert": "member1"}, - {"description": "有效任务", "assigned_expert": "member1"}, + {"name": "", "assigned_expert": "member1", "task_description": "任务A", "depends_on": []}, + {"name": "B", "assigned_expert": "member1", "task_description": "任务B", "depends_on": []}, ]) - subtasks = TeamOrchestrator._parse_subtasks( - content, ["member1"], "lead" - ) - assert len(subtasks) == 1 - assert subtasks[0].description == "有效任务" - - -# ── 子任务执行测试 ──────────────────────────────────────── - - -class TestSubtaskExecution: - """子任务执行测试""" + phases = TeamOrchestrator._parse_phases(content, ["member1"], "lead") + assert len(phases) == 1 + assert phases[0].name == "B" @pytest.mark.asyncio - async def test_subtask_execution_calls_agent_execute(self): - """子任务执行调用 agent.execute()""" + async def test_parse_phases_resolves_depends_on_by_name(self): + """_parse_phases 通过名称解析 depends_on 为 ID""" + content = json.dumps([ + {"name": "规划", "assigned_expert": "lead", "task_description": "设计", "depends_on": []}, + {"name": "实现", "assigned_expert": "member1", "task_description": "编码", "depends_on": ["规划"]}, + ]) + phases = TeamOrchestrator._parse_phases(content, ["lead", "member1"], "lead") + assert len(phases) == 2 + # 实现 depends on 规划 + assert phases[1].depends_on == [phases[0].id] + + @pytest.mark.asyncio + async def test_parse_phases_unknown_dependency_ignored(self): + """_parse_phases 对未知依赖名称忽略""" + content = json.dumps([ + {"name": "A", "assigned_expert": "lead", "task_description": "任务A", "depends_on": ["unknown_phase"]}, + ]) + phases = TeamOrchestrator._parse_phases(content, ["lead"], "lead") + assert len(phases) == 1 + assert len(phases[0].depends_on) == 0 # unknown dependency ignored + + +# ── 阶段执行测试 ────────────────────────────────────────── + + +class TestPhaseExecution: + """阶段执行测试""" + + @pytest.mark.asyncio + async def test_phase_execution_calls_agent_execute(self): + """阶段执行调用 agent.execute()""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) @@ -306,141 +497,142 @@ class TestSubtaskExecution: team._experts["lead"].agent.execute.assert_awaited() @pytest.mark.asyncio - async def test_subtask_marks_completed(self): - """子任务执行后状态标记为 COMPLETED""" + async def test_phase_marks_completed(self): + """阶段执行后状态标记为 COMPLETED""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) result = await orchestrator.execute("测试任务") plan = result["plan"] - for st in plan.subtasks: - assert st.status == SubTaskStatus.COMPLETED - assert st.result is not None + for ph in plan.phases: + assert ph.status == PhaseStatus.COMPLETED + assert ph.result is not None @pytest.mark.asyncio - async def test_subtask_with_invalid_expert_falls_back_to_lead(self): - """子任务分配的专家不可用时回退到 lead""" + async def test_phase_with_invalid_expert_falls_back_to_lead(self): + """阶段分配的专家不可用时回退到 lead""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) - # Set up LLM to assign to a nonexistent expert - import json - gateway = _make_mock_llm_gateway(subtask_descriptions=["任务1"]) - gateway.chat = AsyncMock(return_value=MagicMock( - content=json.dumps([ - {"description": "任务1", "assigned_expert": "nonexistent"} - ]) - )) - team._experts["lead"].agent._llm_gateway = gateway - - result = await orchestrator.execute("测试任务") - - assert result["status"] == "completed" - # The subtask should have been reassigned to lead - plan = result["plan"] - assert plan.subtasks[0].assigned_expert == "lead" - - -# ── 结果综合测试 ────────────────────────────────────────── - - -class TestResultSynthesis: - """结果综合测试""" - - @pytest.mark.asyncio - async def test_synthesize_single_result(self): - """单个子任务结果直接返回""" - team = _make_team_with_experts() - orchestrator = TeamOrchestrator(team) - - result = await orchestrator.execute("测试任务") - - assert result["status"] == "completed" - final = result["result"] - assert "content" in final - assert final["strategy"] == "best" - assert final["subtasks_completed"] == 1 - - @pytest.mark.asyncio - async def test_synthesize_multiple_results_with_llm(self): - """多个子任务结果通过 LLM 综合""" - team = _make_team_with_experts() - orchestrator = TeamOrchestrator(team) - - # Set up LLM for both decomposition and synthesis - import json - gateway = AsyncMock() - - # First call: decomposition - decomp_response = MagicMock() - decomp_response.content = json.dumps([ - {"description": "子任务1", "assigned_expert": "member1"}, - {"description": "子任务2", "assigned_expert": "member2"}, - ]) - - # Second call: synthesis - synth_response = MagicMock() - synth_response.content = "综合结果" - - gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response]) - team._experts["lead"].agent._llm_gateway = gateway - - result = await orchestrator.execute("复杂任务") - - assert result["status"] == "completed" - final = result["result"] - assert final["content"] == "综合结果" - assert final["strategy"] == "best" - assert final["subtasks_completed"] == 2 - - @pytest.mark.asyncio - async def test_synthesize_without_llm_concatenates(self): - """无 LLM 时拼接所有结果""" - team = _make_team_with_experts() - orchestrator = TeamOrchestrator(team) - - # Set up LLM for decomposition only (no synthesis LLM) - import json - gateway = AsyncMock() - decomp_response = MagicMock() - decomp_response.content = json.dumps([ - {"description": "子任务1", "assigned_expert": "member1"}, - {"description": "子任务2", "assigned_expert": "member2"}, - ]) - # Synthesis call raises to force concatenation fallback - gateway.chat = AsyncMock( - side_effect=[decomp_response, RuntimeError("LLM unavailable")] + gateway = _make_mock_llm_gateway( + phases=[ + {"name": "A", "assigned_expert": "nonexistent", "task_description": "任务A", "depends_on": []}, + ] ) team._experts["lead"].agent._llm_gateway = gateway - result = await orchestrator.execute("复杂任务") + result = await orchestrator.execute("测试任务") assert result["status"] == "completed" - final = result["result"] - assert "content" in final - # Should contain both results concatenated - assert "Result from member1" in final["content"] - assert "Result from member2" in final["content"] - - -# ── 回退测试 ────────────────────────────────────────────── - - -class TestFallback: - """回退到单 Agent 模式测试""" + plan = result["plan"] + # The phase should have been reassigned to lead + assert plan.phases[0].assigned_expert == "lead" @pytest.mark.asyncio - async def test_all_subtasks_fail_triggers_fallback(self): - """所有子任务失败时触发回退""" + async def test_shared_workspace_passing(self): + """阶段 A 的输出写入 SharedWorkspace,阶段 B 能读取""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) - # Make agent.execute raise for all subtasks + gateway = _make_mock_llm_gateway( + phases=[ + {"name": "A", "assigned_expert": "lead", "task_description": "阶段A", "depends_on": []}, + {"name": "B", "assigned_expert": "member1", "task_description": "阶段B", "depends_on": ["A"]}, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("依赖任务") + + assert result["status"] == "completed" + # Verify workspace.write was called for each phase + workspace = team.workspace + # Check that phase outputs were written + plan = result["plan"] + for ph in plan.phases: + key = f"{plan.id}/phase/{ph.id}/output" + data = await workspace.read(key) + assert data is not None + assert "value" in data + + @pytest.mark.asyncio + async def test_context_isolation_with_pool(self): + """使用 AgentPool 时创建独立 agent 实例(上下文隔离)""" + pool = _make_mock_pool() + team = _make_team_with_experts(pool=pool) + orchestrator = TeamOrchestrator(team) + + await orchestrator.execute("测试任务") + + # Pool.create_agent should have been called for context isolation + pool.create_agent.assert_awaited() + # Pool.remove_agent should have been called for cleanup + pool.remove_agent.assert_awaited() + + @pytest.mark.asyncio + async def test_context_isolation_fallback_without_pool(self): + """无 AgentPool 时使用 expert 的现有 agent""" + team = _make_team_with_experts() # No pool + orchestrator = TeamOrchestrator(team) + + result = await orchestrator.execute("测试任务") + + # Should still complete successfully using expert's existing agent + assert result["status"] == "completed" + team._experts["lead"].agent.execute.assert_awaited() + + +# ── 阶段失败与依赖传播测试 ──────────────────────────────── + + +class TestPhaseFailure: + """阶段失败与依赖传播测试""" + + @pytest.mark.asyncio + async def test_phase_failure_marks_dependents_failed(self): + """阶段 B 失败时,依赖 B 的阶段 C 标记为 FAILED""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + {"name": "A", "assigned_expert": "lead", "task_description": "阶段A", "depends_on": []}, + {"name": "B", "assigned_expert": "member1", "task_description": "阶段B", "depends_on": ["A"]}, + {"name": "C", "assigned_expert": "member2", "task_description": "阶段C", "depends_on": ["B"]}, + ] + ) + team._experts["lead"].agent._llm_gateway = gateway + + # Make member1's agent fail (phase B) + team._experts["member1"].agent.execute = AsyncMock(side_effect=RuntimeError("B failed")) + + result = await orchestrator.execute("失败传播任务") + + # Should still complete (phase A succeeded) + assert result["status"] == "completed" + plan = result["plan"] + # Phase A should be completed + phase_a = next(ph for ph in plan.phases if ph.name == "A") + assert phase_a.status == PhaseStatus.COMPLETED + # Phase B should be failed + phase_b = next(ph for ph in plan.phases if ph.name == "B") + assert phase_b.status == PhaseStatus.FAILED + # Phase C should be failed (dependency B failed) + phase_c = next(ph for ph in plan.phases if ph.name == "C") + assert phase_c.status == PhaseStatus.FAILED + + @pytest.mark.asyncio + async def test_all_phases_fail_triggers_fallback(self): + """所有阶段失败时触发 fallback""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + # Make all agents fail for expert in team._experts.values(): expert.agent.execute = AsyncMock(side_effect=RuntimeError("Execution failed")) - result = await orchestrator.execute("测试任务") + result = await orchestrator.execute("全失败任务") assert result["status"] == "fallback" assert result["plan"].status == PlanStatus.FALLBACK @@ -451,14 +643,13 @@ class TestFallback: team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) - # Make agent.execute raise for subtasks but succeed for fallback call_count = 0 async def mock_execute(task_msg): nonlocal call_count call_count += 1 - if task_msg.task_type == "team_subtask": - raise RuntimeError("Subtask failed") + if task_msg.task_type == "team_phase": + raise RuntimeError("Phase failed") # Fallback succeeds return TaskResult( task_id=task_msg.task_id, @@ -481,7 +672,6 @@ class TestFallback: async def test_no_active_experts_returns_failed(self): """没有活跃专家时返回 failed""" team = _make_team_with_experts() - # Mark all experts as inactive for expert in team._experts.values(): expert.is_active = False orchestrator = TeamOrchestrator(team) @@ -492,11 +682,213 @@ class TestFallback: assert "error" in result -# ── 事件广播测试 ────────────────────────────────────────── +# ── 循环依赖检测测试 ────────────────────────────────────── + + +class TestCircularDependency: + """循环依赖检测测试""" + + @pytest.mark.asyncio + async def test_circular_dependency_triggers_fallback(self): + """A→B→A 的循环依赖触发 fallback""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + # Manually create a plan with circular dependency + # We need to mock _decompose_task to return phases with circular dep + phase_a = PlanPhase(name="A", assigned_expert="lead", task_description="A") + phase_b = PlanPhase(name="B", assigned_expert="member1", task_description="B") + phase_a.depends_on = [phase_b.id] + phase_b.depends_on = [phase_a.id] + + with patch.object( + TeamOrchestrator, + "_decompose_task", + return_value=[phase_a, phase_b], + ): + result = await orchestrator.execute("循环依赖任务") + + # Should fallback due to ValueError from topological_sort + assert result["status"] == "fallback" + assert result["plan"].status == PlanStatus.FALLBACK + + +# ── 结果综合测试 ────────────────────────────────────────── + + +class TestResultSynthesis: + """结果综合测试""" + + @pytest.mark.asyncio + async def test_synthesize_single_result(self): + """单个阶段结果直接返回""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + result = await orchestrator.execute("测试任务") + + assert result["status"] == "completed" + final = result["result"] + assert "content" in final + assert final["strategy"] == "best" + assert final["phases_completed"] == 1 + + @pytest.mark.asyncio + async def test_synthesize_multiple_results_with_llm(self): + """多个阶段结果通过 LLM 综合""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + gateway = _make_mock_llm_gateway( + phases=[ + {"name": "A", "assigned_expert": "member1", "task_description": "阶段A", "depends_on": []}, + {"name": "B", "assigned_expert": "member2", "task_description": "阶段B", "depends_on": []}, + ], + synthesis_content="综合结果", + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("复杂任务") + + assert result["status"] == "completed" + final = result["result"] + assert final["content"] == "综合结果" + assert final["strategy"] == "best" + assert final["phases_completed"] == 2 + + @pytest.mark.asyncio + async def test_synthesize_without_llm_concatenates(self): + """无 LLM 时拼接所有结果""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + gateway = AsyncMock() + decomp_response = MagicMock() + decomp_response.content = json.dumps([ + {"name": "A", "assigned_expert": "member1", "task_description": "阶段A", "depends_on": []}, + {"name": "B", "assigned_expert": "member2", "task_description": "阶段B", "depends_on": []}, + ]) + # Synthesis call raises to force concatenation fallback + gateway.chat = AsyncMock( + side_effect=[decomp_response, RuntimeError("LLM unavailable")] + ) + team._experts["lead"].agent._llm_gateway = gateway + + result = await orchestrator.execute("复杂任务") + + assert result["status"] == "completed" + final = result["result"] + assert "content" in final + # Should contain both results concatenated + assert "Result from member1" in final["content"] + assert "Result from member2" in final["content"] + + +# ── 模型路由测试 ────────────────────────────────────────── + + +class TestModelRouting: + """模型路由测试(_get_model)""" + + def test_get_model_default_when_no_llm_config(self): + """无 llm 配置时返回 'default'""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + model = orchestrator._get_model() + assert model == "default" + + def test_get_model_from_expert_config(self): + """从 expert.config.llm 读取模型名""" + team = _make_team_with_experts() + # Set llm config on lead expert + team._experts["lead"].config.llm = {"model": "gpt-4", "temperature": 0.7} + orchestrator = TeamOrchestrator(team) + + model = orchestrator._get_model() + assert model == "gpt-4" + + def test_get_model_falls_back_to_default_when_no_model_key(self): + """llm 配置中无 model 键时回退到 'default'""" + team = _make_team_with_experts() + team._experts["lead"].config.llm = {"temperature": 0.5} + orchestrator = TeamOrchestrator(team) + + model = orchestrator._get_model() + assert model == "default" + + def test_get_model_uses_specific_expert(self): + """_get_model 使用指定 expert 的配置""" + team = _make_team_with_experts() + team._experts["member1"].config.llm = {"model": "claude-3"} + orchestrator = TeamOrchestrator(team) + + model = orchestrator._get_model(team._experts["member1"]) + assert model == "claude-3" + + +# ── TeamStatus.PLANNING 状态流转测试 ────────────────────── + + +class TestTeamStatusPlanning: + """TeamStatus.PLANNING 状态流转测试(KTD6)""" + + def test_team_status_planning_exists(self): + """TeamStatus.PLANNING 枚举值存在""" + assert TeamStatus.PLANNING == "planning" + + @pytest.mark.asyncio + async def test_execute_transitions_through_planning(self): + """执行时经过 PLANNING 状态""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + # Track status transitions + statuses_seen: list[str] = [] + + original_set_status = team.set_status + + def tracking_set_status(status: TeamStatus) -> None: + statuses_seen.append(status.value) + original_set_status(status) + + team.set_status = tracking_set_status + + await orchestrator.execute("测试任务") + + # Should have transitioned through PLANNING + assert "planning" in statuses_seen + # Should end at COMPLETED + assert statuses_seen[-1] == "completed" + + @pytest.mark.asyncio + async def test_execute_transitions_to_executing_after_planning(self): + """PLANNING 后转为 EXECUTING""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + statuses_seen: list[str] = [] + original_set_status = team.set_status + + def tracking_set_status(status: TeamStatus) -> None: + statuses_seen.append(status.value) + original_set_status(status) + + team.set_status = tracking_set_status + + await orchestrator.execute("测试任务") + + # PLANNING should come before EXECUTING + planning_idx = statuses_seen.index("planning") + executing_idx = statuses_seen.index("executing") + assert planning_idx < executing_idx + + +# ── 事件广播基础设施测试 ────────────────────────────────── class TestBroadcastEvent: - """事件广播测试""" + """事件广播基础设施测试""" @pytest.mark.asyncio async def test_broadcast_event_sends_to_transport(self): @@ -562,79 +954,8 @@ class TestLLMGateway: """lead 没有 gateway 时从其他活跃专家获取""" team = _make_team_with_experts() gateway = MagicMock() - # Lead has no gateway, but member1 does team._experts["member1"].agent._llm_gateway = gateway orchestrator = TeamOrchestrator(team) result = orchestrator._get_llm_gateway() assert result is gateway - - -# ── 并行执行测试 ────────────────────────────────────────── - - -class TestParallelExecution: - """并行子任务执行测试""" - - @pytest.mark.asyncio - async def test_multiple_subtasks_execute_in_parallel(self): - """多个子任务并行执行""" - team = _make_team_with_experts() - orchestrator = TeamOrchestrator(team) - - # Set up LLM for decomposition - import json - gateway = AsyncMock() - decomp_response = MagicMock() - decomp_response.content = json.dumps([ - {"description": "子任务1", "assigned_expert": "member1"}, - {"description": "子任务2", "assigned_expert": "member2"}, - {"description": "子任务3", "assigned_expert": "lead"}, - ]) - synth_response = MagicMock() - synth_response.content = "综合结果" - gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response]) - team._experts["lead"].agent._llm_gateway = gateway - - result = await orchestrator.execute("并行任务") - - assert result["status"] == "completed" - assert len(result["subtask_results"]) == 3 - # All subtasks should be completed - plan = result["plan"] - for st in plan.subtasks: - assert st.status == SubTaskStatus.COMPLETED - - @pytest.mark.asyncio - async def test_partial_failure_still_completes(self): - """部分子任务失败时仍能完成(只要有成功的)""" - team = _make_team_with_experts() - orchestrator = TeamOrchestrator(team) - - # Set up LLM for decomposition - import json - gateway = AsyncMock() - decomp_response = MagicMock() - decomp_response.content = json.dumps([ - {"description": "子任务1", "assigned_expert": "member1"}, - {"description": "子任务2", "assigned_expert": "member2"}, - ]) - synth_response = MagicMock() - synth_response.content = "综合结果" - gateway.chat = AsyncMock(side_effect=[decomp_response, synth_response]) - team._experts["lead"].agent._llm_gateway = gateway - - # Make member1's agent fail - team._experts["member1"].agent.execute = AsyncMock( - side_effect=RuntimeError("member1 failed") - ) - - result = await orchestrator.execute("部分失败任务") - - assert result["status"] == "completed" - # member2's subtask should have succeeded - plan = result["plan"] - completed = plan.completed_subtasks - failed = plan.failed_subtasks - assert len(completed) >= 1 - assert len(failed) >= 1