feat(experts):重写 TeamOrchestrator 为流水线模式 + TeamStatus.PLANNING

This commit is contained in:
chiguyong 2026-06-18 01:39:22 +08:00
parent 1075598ebf
commit 0f8ea6e21e
4 changed files with 996 additions and 457 deletions

View File

@ -1,27 +1,26 @@
"""TeamOrchestrator - hub-and-spoke 专家团队执行引擎 """TeamOrchestrator - 流水线模式专家团队执行引擎
驱动 ExpertTeam hub-and-spoke 模式下执行任务 驱动 ExpertTeam 流水线模式下执行任务
1. Lead Expert 接收任务自主分解为子任务 1. Lead Expert 接收任务分解为阶段PlanPhase阶段间有依赖关系depends_on
2. 并行 spawn Task每个 Task 是独立 Agent 执行实例深度=1 2. 按依赖拓扑排序同层无依赖阶段并行asyncio.gather层间串行
3. 等待所有 Task 完成 3. 每个阶段创建独立 ConfigDrivenAgent 实例上下文隔离KTD3
4. Lead Expert 汇总结果BEST 策略 4. 阶段间数据通过 SharedWorkspace 传递{task_id}/phase/{phase_id}/output
5. 返回最终结果 5. Lead Expert 汇总所有阶段结果BEST 策略
6. 返回最终结果
约束 生命周期FORMING PLANNING EXECUTING SYNTHESIZING COMPLETED
- Task 深度=1Task 不能再 spawn Task
- Task 之间无通信
- Lead Expert 持有所有状态
设计依据 设计依据
- Claude Code: Task 工具深度=1 Agent 不能再生子 Agent - KTD2: Lead 分解为阶段而非子任务支持流水线串行阶段
- Codex: spawn_agent 层级式结果返回父 Agent - KTD3: 上下文隔离独立 ConfigDrivenAgent 实例
- 去中心化协作的通信复杂度 O()hub-and-spoke O(N) - KTD6: PLANNING 状态在分解阶段设置
""" """
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
import copy
import json import json
import logging import logging
import re import re
@ -31,40 +30,47 @@ from typing import Any
from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus
from .expert import Expert from .expert import Expert
from .plan import PlanStatus, SubTask, SubTaskStatus, TeamPlan from .plan import PhaseStatus, PlanPhase, PlanStatus, TeamPlan
from .team import ExpertTeam, TeamStatus from .team import ExpertTeam, TeamStatus
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class TeamOrchestrator: class TeamOrchestrator:
"""Hub-and-spoke orchestration engine. """Pipeline orchestration engine.
Lead Expert acts as the hub: it decomposes the task, dispatches subtasks Lead Expert decomposes the task into phases with dependencies (depends_on).
to member experts (spokes) in parallel, and synthesizes the final result. Phases are executed in topological order: same-layer phases run in parallel
(asyncio.gather), layers run sequentially. Each phase gets an independent
ConfigDrivenAgent instance for context isolation (KTD3).
""" """
MAX_SUBTASKS = 10 # Maximum subtasks Lead Expert can decompose MAX_PHASES = 10 # Maximum phases Lead Expert can decompose
MAX_RETRIES = 1 # Retry once on subtask failure before marking failed MAX_RETRIES = 1 # Retry once on phase failure before marking failed
def __init__(self, team: ExpertTeam) -> None: def __init__(self, team: ExpertTeam) -> None:
self._team = team self._team = team
# Track temporary agent names created for context isolation (KTD3)
# Maps phase_id -> temp_agent_name for cleanup
self._temp_agents: dict[str, str] = {}
async def execute(self, task: str) -> dict[str, Any]: async def execute(self, task: str) -> dict[str, Any]:
"""Execute a task in hub-and-spoke mode. """Execute a task in pipeline mode.
Flow: Flow:
1. Emit team_formed event 1. Emit team_formed event
2. Lead Expert decomposes task into subtasks 2. Set PLANNING status, Lead Expert decomposes task into phases
3. Spawn parallel subtasks (each independent Agent execution) 3. Emit plan_update with phase list
4. Wait for all subtasks to complete 4. Set EXECUTING status, topological sort, execute layers:
5. Lead Expert synthesizes results (BEST strategy) - Same-layer phases parallel (asyncio.gather)
6. Emit team_synthesis and team_dissolved events - Layer-by-layer sequential
5. Set SYNTHESIZING status, Lead synthesizes results (BEST strategy)
6. Set COMPLETED status, emit team_synthesis event
Returns a dict with: Returns a dict with:
- "status": "completed" | "failed" | "fallback" - "status": "completed" | "failed" | "fallback"
- "result": final synthesized result - "result": final synthesized result
- "subtask_results": dict of subtask_id -> result - "phase_results": dict of phase_id -> result
- "plan": TeamPlan instance - "plan": TeamPlan instance
""" """
lead = self._team.lead_expert lead = self._team.lead_expert
@ -74,7 +80,7 @@ class TeamOrchestrator:
return { return {
"status": "failed", "status": "failed",
"result": None, "result": None,
"subtask_results": {}, "phase_results": {},
"error": "No active expert available", "error": "No active expert available",
} }
lead = active[0] lead = active[0]
@ -85,7 +91,6 @@ class TeamOrchestrator:
lead_expert=lead.config.name, lead_expert=lead.config.name,
status=PlanStatus.EXECUTING, status=PlanStatus.EXECUTING,
) )
self._team.set_status(TeamStatus.EXECUTING)
# 1. Emit team_formed event # 1. Emit team_formed event
await self._broadcast_event( await self._broadcast_event(
@ -98,53 +103,68 @@ class TeamOrchestrator:
}, },
) )
# 2. Set PLANNING status, Lead decomposes task into phases
self._team.set_status(TeamStatus.PLANNING)
phases = await self._decompose_task(lead, task)
if not phases:
logger.warning("Task decomposition returned no phases, executing as single phase")
phases = [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)]
plan.phases = phases[: self.MAX_PHASES]
# 3. Emit plan_update with phase list
await self._broadcast_event(
"plan_update",
{
"plan_id": plan.id,
"phases": [ph.to_dict() for ph in plan.phases],
},
)
# 4. Set EXECUTING status, execute phases
self._team.set_status(TeamStatus.EXECUTING)
phase_results: dict[str, dict[str, Any]] = {}
try: try:
# 2. Lead Expert decomposes task into subtasks # Topological sort phases into execution layers
subtasks = await self._decompose_task(lead, task) layers = plan.topological_sort()
if not subtasks:
# If decomposition fails, treat the whole task as a single subtask
logger.warning("Task decomposition returned no subtasks, executing as single task")
subtasks = [SubTask(description=task, assigned_expert=lead.config.name)]
plan.subtasks = subtasks[: self.MAX_SUBTASKS] # Execute layers sequentially, phases within layer in parallel
for layer in layers:
# Filter out already-failed phases (from dependency failures)
ready = [ph for ph in layer if ph.status == PhaseStatus.PENDING]
if not ready:
continue
# 3. Emit plan_update with subtask list # Execute all phases in this layer in parallel
await self._broadcast_event( results = await asyncio.gather(
"plan_update", *[self._execute_phase(ph, plan) for ph in ready],
{ return_exceptions=True,
"plan_id": plan.id, )
"subtasks": [st.to_dict() for st in plan.subtasks],
},
)
# 4. Spawn parallel subtasks for ph, result in zip(ready, results):
subtask_results: dict[str, dict[str, Any]] = {} if isinstance(result, Exception):
results = await asyncio.gather( logger.error(f"Phase {ph.id} ({ph.name}) failed: {result}")
*[self._execute_subtask(st) for st in plan.subtasks], plan.update_phase_status(
return_exceptions=True, ph.id, PhaseStatus.FAILED, {"error": str(result)}
) )
phase_results[ph.id] = {"error": str(result)}
# Mark dependent phases as failed
self._mark_dependents_failed(ph.id, plan, phase_results)
else:
phase_results[ph.id] = result
for subtask, result in zip(plan.subtasks, results): # 5. Check if all phases failed
if isinstance(result, Exception): completed = plan.completed_phases
logger.error(f"Subtask {subtask.id} failed: {result}")
plan.update_subtask_status(
subtask.id, SubTaskStatus.FAILED, {"error": str(result)}
)
subtask_results[subtask.id] = {"error": str(result)}
else:
subtask_results[subtask.id] = result
# 5. Check if all subtasks failed
completed = plan.completed_subtasks
if not completed: if not completed:
logger.warning("All subtasks failed, falling back to single agent") logger.warning("All phases failed, falling back to single agent")
return await self._fallback_to_single_agent(task, plan, subtask_results) return await self._fallback_to_single_agent(task, plan, phase_results)
# 6. Lead Expert synthesizes results (BEST strategy) # 6. Lead Expert synthesizes results (BEST strategy)
self._team.set_status(TeamStatus.SYNTHESIZING) self._team.set_status(TeamStatus.SYNTHESIZING)
plan.status = PlanStatus.COMPLETED plan.status = PlanStatus.COMPLETED
final_result = await self._synthesize_results(lead, task, [st for st in completed]) final_result = await self._synthesize_results(lead, task, completed)
self._team.set_status(TeamStatus.COMPLETED) self._team.set_status(TeamStatus.COMPLETED)
@ -153,33 +173,38 @@ class TeamOrchestrator:
"team_synthesis", "team_synthesis",
{ {
"content": final_result.get("content", ""), "content": final_result.get("content", ""),
"subtasks_completed": len(completed), "phases_completed": len(completed),
"subtasks_total": len(plan.subtasks), "phases_total": len(plan.phases),
}, },
) )
return { return {
"status": "completed", "status": "completed",
"result": final_result, "result": final_result,
"subtask_results": subtask_results, "phase_results": phase_results,
"plan": plan, "plan": plan,
} }
except Exception as e: except ValueError as e:
logger.error(f"Hub-and-spoke execution failed: {e}") # Circular dependency or invalid reference from topological_sort
logger.error(f"Pipeline execution failed (invalid plan): {e}")
plan.status = PlanStatus.FAILED plan.status = PlanStatus.FAILED
return await self._fallback_to_single_agent(task, plan, subtask_results) return await self._fallback_to_single_agent(task, plan, phase_results)
except Exception as e:
logger.error(f"Pipeline execution failed: {e}")
plan.status = PlanStatus.FAILED
return await self._fallback_to_single_agent(task, plan, phase_results)
async def _decompose_task(self, lead: Expert, task: str) -> list[SubTask]: async def _decompose_task(self, lead: Expert, task: str) -> list[PlanPhase]:
"""Lead Expert decomposes task into subtasks using LLM. """Lead Expert decomposes task into phases using LLM.
Returns a list of SubTask instances. If LLM decomposition fails, Returns a list of PlanPhase instances. If LLM decomposition fails,
returns a single subtask with the original task. returns a single phase with the original task.
""" """
gateway = self._get_llm_gateway(lead) gateway = self._get_llm_gateway(lead)
if not gateway: if not gateway:
logger.warning("No LLM gateway available, treating task as single subtask") logger.warning("No LLM gateway available, treating task as single phase")
return [SubTask(description=task, assigned_expert=lead.config.name)] return [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)]
member_names = [ member_names = [
e.config.name for e in self._team.active_experts if e.config.name != lead.config.name e.config.name for e in self._team.active_experts if e.config.name != lead.config.name
@ -187,38 +212,47 @@ class TeamOrchestrator:
available_experts = member_names if member_names else [lead.config.name] available_experts = member_names if member_names else [lead.config.name]
prompt = ( prompt = (
f"You are the Lead Expert in a team. Decompose the following task into " f"You are the Lead Expert in a pipeline team. Decompose the following task into "
f"at most {self.MAX_SUBTASKS} independent subtasks that can be executed in parallel.\n\n" f"at most {self.MAX_PHASES} phases with dependencies.\n\n"
f"Task: {task}\n\n" f"Task: {task}\n\n"
f"Available experts: {', '.join(available_experts)}\n\n" f"Available experts: {', '.join(available_experts)}\n\n"
f"Return a JSON array of objects, each with:\n" f"Return a JSON array of phase objects, each with:\n"
f'- "description": clear subtask description\n' f'- "name": phase name (e.g., "规划", "前端", "后端", "QA", "评审")\n'
f'- "assigned_expert": name of the expert to assign (must be one of: {", ".join(available_experts)})\n\n' f'- "assigned_expert": name of the expert to assign '
f'(must be one of: {", ".join(available_experts)})\n'
f'- "task_description": clear phase task description\n'
f'- "depends_on": array of phase names this phase depends on (empty array if none)\n\n'
f"Example:\n"
f'[{{"name":"规划","assigned_expert":"tech_lead",'
f'"task_description":"设计架构","depends_on":[]}},'
f'{{"name":"前端","assigned_expert":"frontend",'
f'"task_description":"实现UI","depends_on":["规划"]}}]\n\n'
f"Return ONLY the JSON array, no other text." f"Return ONLY the JSON array, no other text."
) )
try: try:
response = await gateway.chat( response = await gateway.chat(
messages=[{"role": "user", "content": prompt}], messages=[{"role": "user", "content": prompt}],
model="default", model=self._get_model(lead),
) )
subtasks = self._parse_subtasks(response.content, available_experts, lead.config.name) phases = self._parse_phases(response.content, available_experts, lead.config.name)
if subtasks: if phases:
return subtasks return phases
logger.warning("LLM decomposition returned no valid subtasks") logger.warning("LLM decomposition returned no valid phases")
except Exception as e: except Exception as e:
logger.warning(f"LLM task decomposition failed: {e}") logger.warning(f"LLM task decomposition failed: {e}")
return [SubTask(description=task, assigned_expert=lead.config.name)] return [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)]
@staticmethod @staticmethod
def _parse_subtasks( def _parse_phases(
content: str, available_experts: list[str], lead_name: str content: str, available_experts: list[str], lead_name: str
) -> list[SubTask]: ) -> list[PlanPhase]:
"""Parse LLM response into SubTask list. """Parse LLM response into PlanPhase list.
Extracts JSON array from the response content and creates SubTask instances. Extracts JSON array from the response content and creates PlanPhase instances.
Validates assigned_expert against available_experts list. Resolves depends_on from phase names to phase IDs. Validates assigned_expert
against available_experts list.
""" """
# Try to extract JSON array from the response # Try to extract JSON array from the response
json_match = re.search(r"\[.*\]", content, re.DOTALL) json_match = re.search(r"\[.*\]", content, re.DOTALL)
@ -233,46 +267,97 @@ class TeamOrchestrator:
if not isinstance(items, list): if not isinstance(items, list):
return [] return []
subtasks: list[SubTask] = [] # First pass: create phases with IDs, build name->id mapping
name_to_id: dict[str, str] = {}
raw_phases: list[dict[str, Any]] = []
for item in items: for item in items:
if not isinstance(item, dict): if not isinstance(item, dict):
continue continue
description = item.get("description", "").strip() name = item.get("name", "").strip()
if not description: if not name:
continue continue
assigned = item.get("assigned_expert", "").strip() assigned = item.get("assigned_expert", "").strip()
# Validate assigned expert; fall back to lead if invalid # Validate assigned expert; fall back to lead if invalid
if assigned not in available_experts: if assigned not in available_experts:
assigned = lead_name assigned = lead_name
subtasks.append(SubTask(description=description, assigned_expert=assigned)) task_desc = item.get("task_description", "").strip() or name
return subtasks depends_on_names = item.get("depends_on", [])
if not isinstance(depends_on_names, list):
depends_on_names = []
async def _execute_subtask(self, subtask: SubTask) -> dict[str, Any]: phase = PlanPhase(
"""Execute a single subtask using the assigned expert. name=name,
assigned_expert=assigned,
task_description=task_desc,
depends_on=[], # Will resolve to IDs in second pass
)
raw_phases.append({"phase": phase, "depends_on_names": depends_on_names})
name_to_id[name] = phase.id
Each subtask is an independent Agent execution (Task depth=1). # Second pass: resolve depends_on from names to IDs
Subtasks cannot spawn further subtasks. phases: list[PlanPhase] = []
for entry in raw_phases:
phase = entry["phase"]
for dep_name in entry["depends_on_names"]:
dep_id = name_to_id.get(dep_name)
if dep_id:
phase.depends_on.append(dep_id)
else:
logger.warning(
f"Phase '{phase.name}' depends on unknown phase '{dep_name}', ignoring"
)
phases.append(phase)
return phases
async def _execute_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]:
"""Execute a single phase using the assigned expert.
Creates an independent ConfigDrivenAgent instance for context isolation (KTD3).
Reads dependency outputs from SharedWorkspace, executes the phase task,
writes the phase output to SharedWorkspace.
""" """
# Resolve the assigned expert # Resolve the assigned expert
expert = self._team.get_expert(subtask.assigned_expert) expert = self._team.get_expert(phase.assigned_expert)
if not expert or not expert.is_active: if not expert or not expert.is_active:
# Fallback to lead expert or first active expert
expert = self._team.lead_expert expert = self._team.lead_expert
if not expert or not expert.is_active: if not expert or not expert.is_active:
active = self._team.active_experts active = self._team.active_experts
if not active: if not active:
raise RuntimeError( raise RuntimeError(
f"Expert '{subtask.assigned_expert}' not available and no active fallback" f"Expert '{phase.assigned_expert}' not available and no active fallback"
) )
expert = active[0] expert = active[0]
logger.warning( logger.warning(
f"Expert '{subtask.assigned_expert}' not available, " f"Expert '{phase.assigned_expert}' not available, "
f"falling back to '{expert.config.name}'" f"falling back to '{expert.config.name}'"
) )
phase.assigned_expert = expert.config.name
# Update subtask status # Update phase status
subtask.status = SubTaskStatus.RUNNING phase.status = PhaseStatus.RUNNING
subtask.assigned_expert = expert.config.name
# Emit phase_started event
await self._broadcast_event(
"phase_started",
{
"phase_id": phase.id,
"phase_name": phase.name,
"assigned_expert": phase.assigned_expert,
"depends_on": list(phase.depends_on),
},
)
# Read dependency outputs from SharedWorkspace
dependency_outputs: dict[str, Any] = {}
for dep_id in phase.depends_on:
dep_phase = plan.get_phase(dep_id)
if dep_phase and dep_phase.status == PhaseStatus.COMPLETED:
key = f"{plan.id}/phase/{dep_id}/output"
data = await self._team.workspace.read(key)
if data:
dependency_outputs[dep_phase.name] = data.get("value", "")
# Emit expert_step event # Emit expert_step event
await self._broadcast_event( await self._broadcast_event(
@ -281,79 +366,180 @@ class TeamOrchestrator:
"expert_id": expert.config.name, "expert_id": expert.config.name,
"expert_name": expert.config.name, "expert_name": expert.config.name,
"expert_color": expert.config.color, "expert_color": expert.config.color,
"content": subtask.description, "content": phase.task_description,
"step": subtask.id, "step": phase.id,
"phase_id": phase.id,
"phase_name": phase.name,
}, },
) )
# Build TaskMessage for execution # Build TaskMessage for execution with context isolation
# Context includes: task description + persona + dependency outputs
input_data: dict[str, Any] = {
"task": phase.task_description,
"team_id": self._team.team_id,
"phase_id": phase.id,
"phase_name": phase.name,
"is_phase": True,
"dependency_outputs": dependency_outputs,
}
if dependency_outputs:
input_data["context"] = (
"前置阶段输出:\n"
+ "\n---\n".join(
f"[{name}]:\n{output[:500] if isinstance(output, str) else str(output)[:500]}"
for name, output in dependency_outputs.items()
)
)
task_msg = TaskMessage( task_msg = TaskMessage(
task_id=subtask.id, task_id=phase.id,
agent_name=expert.config.name, agent_name=expert.config.name,
task_type="team_subtask", task_type="team_phase",
priority=0, priority=0,
input_data={ input_data=input_data,
"task": subtask.description,
"team_id": self._team.team_id,
"is_subtask": True, # Marker: depth=1, cannot spawn further subtasks
},
callback_url=None, callback_url=None,
created_at=datetime.now(timezone.utc), created_at=datetime.now(timezone.utc),
) )
# Execute with retry # Execute with context isolation: try creating independent agent via pool
agent = await self._get_isolated_agent(expert, phase)
last_error: str | None = None last_error: str | None = None
for attempt in range(self.MAX_RETRIES + 1): result: dict[str, Any] | None = None
try:
task_result: TaskResult = await expert.agent.execute(task_msg)
if task_result.status != TaskStatus.COMPLETED.value: try:
last_error = task_result.error_message or "unknown error" for attempt in range(self.MAX_RETRIES + 1):
try:
task_result: TaskResult = await agent.execute(task_msg)
if task_result.status != TaskStatus.COMPLETED.value:
last_error = task_result.error_message or "unknown error"
if attempt < self.MAX_RETRIES:
logger.info(f"Retrying phase {phase.id} (attempt {attempt + 1})")
continue
raise RuntimeError(f"Agent execution failed: {last_error}")
result = task_result.output_data or {"content": ""}
# Update phase status
phase.status = PhaseStatus.COMPLETED
phase.result = result
# Write phase output to SharedWorkspace
output_key = f"{plan.id}/phase/{phase.id}/output"
await self._team.workspace.write(
output_key,
result.get("content", str(result)),
expert.config.name,
)
# Emit expert_result event
await self._broadcast_event(
"expert_result",
{
"expert_id": expert.config.name,
"expert_name": expert.config.name,
"expert_color": expert.config.color,
"content": result.get("content", str(result)),
"phase_id": phase.id,
},
)
# Emit phase_completed event
result_summary = result.get("content", str(result))
if isinstance(result_summary, str) and len(result_summary) > 200:
result_summary = result_summary[:200] + "..."
await self._broadcast_event(
"phase_completed",
{
"phase_id": phase.id,
"phase_name": phase.name,
"result_summary": result_summary,
},
)
return result
except Exception as e:
last_error = str(e)
if attempt < self.MAX_RETRIES: if attempt < self.MAX_RETRIES:
logger.info(f"Retrying subtask {subtask.id} (attempt {attempt + 1})") logger.info(f"Retrying phase {phase.id} (attempt {attempt + 1})")
continue continue
raise RuntimeError(f"Agent execution failed: {last_error}") raise
result = task_result.output_data or {"content": ""} finally:
# Clean up isolated agent if we created one
await self._cleanup_isolated_agent(phase)
subtask.status = SubTaskStatus.COMPLETED # Should not reach here
subtask.result = result phase.status = PhaseStatus.FAILED
raise RuntimeError(f"Phase {phase.id} ({phase.name}) failed: {last_error}")
# Emit expert_result event async def _get_isolated_agent(self, expert: Expert, phase: PlanPhase) -> Any:
await self._broadcast_event( """Get an isolated ConfigDrivenAgent instance for the phase.
"expert_result",
{
"expert_id": expert.config.name,
"expert_name": expert.config.name,
"expert_color": expert.config.color,
"content": result.get("content", str(result)),
"subtask_id": subtask.id,
},
)
return result If AgentPool is available, creates a temporary agent with a unique name
for context isolation (KTD3). Otherwise, falls back to the expert's
existing agent.
"""
pool = self._team.pool
if pool is None:
# No pool available (e.g., in tests), use expert's existing agent
return expert.agent
# Create a temporary config with unique name for this phase
temp_config = copy.deepcopy(expert.config)
temp_config.name = f"{expert.config.name}__phase_{phase.id[:8]}"
try:
agent = await pool.create_agent(temp_config)
# Track for cleanup
self._temp_agents[phase.id] = temp_config.name
return agent
except Exception as e:
logger.warning(
f"Failed to create isolated agent for phase {phase.id}, "
f"using expert's existing agent: {e}"
)
return expert.agent
async def _cleanup_isolated_agent(self, phase: PlanPhase) -> None:
"""Clean up the temporary isolated agent if one was created."""
pool = self._team.pool
if pool is None:
return
temp_name = self._temp_agents.pop(phase.id, None)
if temp_name:
try:
await pool.remove_agent(temp_name)
except Exception as e: except Exception as e:
last_error = str(e) logger.warning(f"Failed to clean up isolated agent '{temp_name}': {e}")
if attempt < self.MAX_RETRIES:
logger.info(f"Retrying subtask {subtask.id} (attempt {attempt + 1})")
continue
raise
# Should not reach here, but just in case def _mark_dependents_failed(
subtask.status = SubTaskStatus.FAILED self, failed_phase_id: str, plan: TeamPlan, phase_results: dict[str, dict[str, Any]]
raise RuntimeError(f"Subtask {subtask.id} failed: {last_error}") ) -> None:
"""Mark all phases that depend on the failed phase as FAILED."""
for ph in plan.phases:
if ph.status != PhaseStatus.PENDING:
continue
if failed_phase_id in ph.depends_on:
ph.status = PhaseStatus.FAILED
ph.result = {"error": f"Dependency phase '{failed_phase_id}' failed"}
phase_results[ph.id] = {"error": f"Dependency '{failed_phase_id}' failed"}
# Recursively mark their dependents
self._mark_dependents_failed(ph.id, plan, phase_results)
async def _synthesize_results( async def _synthesize_results(
self, lead: Expert, task: str, completed_subtasks: list[SubTask] self, lead: Expert, task: str, completed_phases: list[PlanPhase]
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Lead Expert synthesizes results using BEST strategy. """Lead Expert synthesizes results using BEST strategy.
The Lead Expert evaluates all completed subtask results and produces The Lead Expert evaluates all completed phase results and produces
a final synthesized result. Uses LLM when available, otherwise a final synthesized result. Uses LLM when available, otherwise
concatenates results. concatenates results.
""" """
results = [st.result or {} for st in completed_subtasks] results = [ph.result or {} for ph in completed_phases]
if not results: if not results:
return {"content": ""} return {"content": ""}
@ -363,7 +549,7 @@ class TeamOrchestrator:
return { return {
"content": content, "content": content,
"strategy": "best", "strategy": "best",
"subtasks_completed": 1, "phases_completed": 1,
} }
gateway = self._get_llm_gateway(lead) gateway = self._get_llm_gateway(lead)
@ -375,22 +561,22 @@ class TeamOrchestrator:
return { return {
"content": combined, "content": combined,
"strategy": "best", "strategy": "best",
"subtasks_completed": len(results), "phases_completed": len(results),
} }
# Build result summaries for LLM evaluation # Build result summaries for LLM evaluation
summaries = [] summaries = []
for i, st in enumerate(completed_subtasks): for i, ph in enumerate(completed_phases):
r = st.result or {} r = ph.result or {}
content = r.get("content", str(r)) if isinstance(r, dict) else str(r) content = r.get("content", str(r)) if isinstance(r, dict) else str(r)
summaries.append( summaries.append(
f"Subtask {i + 1} (by {st.assigned_expert}, task: {st.description[:100]}):\n" f"Phase {i + 1}: {ph.name} (by {ph.assigned_expert}, task: {ph.task_description[:100]}):\n"
f"{content[:500]}" f"{content[:500]}"
) )
prompt = ( prompt = (
f"Original task: {task}\n\n" f"Original task: {task}\n\n"
f"Below are {len(results)} subtask results from your team members. " f"Below are {len(results)} phase results from your team members. "
f"Synthesize them into a single comprehensive final result that " f"Synthesize them into a single comprehensive final result that "
f"best addresses the original task.\n\n" f"best addresses the original task.\n\n"
+ "\n---\n".join(summaries) + "\n---\n".join(summaries)
@ -400,12 +586,12 @@ class TeamOrchestrator:
try: try:
response = await gateway.chat( response = await gateway.chat(
messages=[{"role": "user", "content": prompt}], messages=[{"role": "user", "content": prompt}],
model="default", model=self._get_model(lead),
) )
return { return {
"content": response.content.strip(), "content": response.content.strip(),
"strategy": "best", "strategy": "best",
"subtasks_completed": len(results), "phases_completed": len(results),
} }
except Exception as e: except Exception as e:
logger.warning(f"LLM synthesis failed, falling back to concatenation: {e}") logger.warning(f"LLM synthesis failed, falling back to concatenation: {e}")
@ -415,16 +601,16 @@ class TeamOrchestrator:
return { return {
"content": combined, "content": combined,
"strategy": "best", "strategy": "best",
"subtasks_completed": len(results), "phases_completed": len(results),
} }
async def _fallback_to_single_agent( async def _fallback_to_single_agent(
self, self,
task: str, task: str,
plan: TeamPlan, plan: TeamPlan,
subtask_results: dict[str, dict[str, Any]], phase_results: dict[str, dict[str, Any]],
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Fallback to single agent mode when hub-and-spoke execution fails. """Fallback to single agent mode when pipeline execution fails.
Uses the lead expert (or first active expert) to complete the original task. Uses the lead expert (or first active expert) to complete the original task.
""" """
@ -446,7 +632,7 @@ class TeamOrchestrator:
priority=0, priority=0,
input_data={ input_data={
"task": task, "task": task,
"subtask_results": subtask_results, "phase_results": phase_results,
"team_id": self._team.team_id, "team_id": self._team.team_id,
}, },
callback_url=None, callback_url=None,
@ -465,10 +651,23 @@ class TeamOrchestrator:
return { return {
"status": "fallback", "status": "fallback",
"result": fallback_result, "result": fallback_result,
"subtask_results": subtask_results, "phase_results": phase_results,
"plan": plan, "plan": plan,
} }
def _get_model(self, expert: Expert | None = None) -> str:
"""Get LLM model name from expert config.
Reads expert.config.llm (dict[str, Any] | None) and returns the model
name. Falls back to "default" if not configured.
V4 verified: ExpertConfig.llm is dict[str, Any] | None.
"""
target = expert or self._team.lead_expert
if target and target.config.llm:
return target.config.llm.get("model", "default")
return "default"
def _get_llm_gateway(self, expert: Expert | None = None) -> Any: def _get_llm_gateway(self, expert: Expert | None = None) -> Any:
"""Get LLM gateway from the given expert or the lead expert's agent. """Get LLM gateway from the given expert or the lead expert's agent.
@ -492,7 +691,8 @@ class TeamOrchestrator:
Events are emitted via handoff_transport for WebSocket relay. Events are emitted via handoff_transport for WebSocket relay.
Supported event types: team_formed, expert_step, expert_result, Supported event types: team_formed, expert_step, expert_result,
plan_update, team_synthesis, team_dissolved. plan_update, phase_started, phase_completed, phase_failed,
team_synthesis, team_dissolved.
""" """
if self._team.handoff_transport: if self._team.handoff_transport:
try: try:

View File

@ -29,9 +29,17 @@ logger = logging.getLogger(__name__)
class TeamStatus(str, enum.Enum): class TeamStatus(str, enum.Enum):
"""ExpertTeam lifecycle states.""" """ExpertTeam lifecycle states.
流水线模式生命周期
FORMING PLANNING EXECUTING SYNTHESIZING COMPLETED DISSOLVED
PLANNING 状态在 Lead Expert 分解任务为阶段时设置KTD6
与前端 IExpertTeamState.status 'planning' 值对齐
"""
FORMING = "forming" FORMING = "forming"
PLANNING = "planning"
EXECUTING = "executing" EXECUTING = "executing"
SYNTHESIZING = "synthesizing" SYNTHESIZING = "synthesizing"
COMPLETED = "completed" COMPLETED = "completed"
@ -108,6 +116,15 @@ class ExpertTeam:
"""Public read access to the team's communication channel.""" """Public read access to the team's communication channel."""
return self._team_channel return self._team_channel
@property
def pool(self) -> AgentPool | None:
"""Public read access to the team's AgentPool.
Used by TeamOrchestrator to create independent ConfigDrivenAgent
instances for context isolation in pipeline mode (KTD3).
"""
return self._pool
def get_expert(self, name: str) -> Expert | None: def get_expert(self, name: str) -> Expert | None:
"""Get an expert by name. Returns None if not found.""" """Get an expert by name. Returns None if not found."""
return self._experts.get(name) return self._experts.get(name)
@ -149,7 +166,8 @@ class ExpertTeam:
for config in member_configs: for config in member_configs:
await self._add_expert_internal(config, team_context) await self._add_expert_internal(config, team_context)
self._status = TeamStatus.EXECUTING # KTD6: 设置 PLANNING 状态Lead Expert 即将分解任务为阶段)
self._status = TeamStatus.PLANNING
async def add_expert(self, config_or_template: ExpertConfig | str) -> Expert: async def add_expert(self, config_or_template: ExpertConfig | str) -> Expert:
"""Add an Expert to the team dynamically. """Add an Expert to the team dynamically.

View File

@ -146,7 +146,7 @@ class TestExpertTeamCreateTeam:
assert team._lead_expert_name == "lead" assert team._lead_expert_name == "lead"
assert team.lead_expert is mock_expert assert team.lead_expert is mock_expert
assert team.status == TeamStatus.EXECUTING assert team.status == TeamStatus.PLANNING
assert mock_expert.team_id == team.team_id assert mock_expert.team_id == team.team_id
@pytest.mark.asyncio @pytest.mark.asyncio
@ -168,7 +168,7 @@ class TestExpertTeamCreateTeam:
assert len(team.experts) == 2 assert len(team.experts) == 2
assert team._lead_expert_name == "lead" assert team._lead_expert_name == "lead"
assert team.status == TeamStatus.EXECUTING assert team.status == TeamStatus.PLANNING
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_create_team_without_pool_raises(self): async def test_create_team_without_pool_raises(self):

File diff suppressed because it is too large Load Diff