fischer-agentkit/src/agentkit/experts/orchestrator.py

593 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""TeamOrchestrator - 流水线模式专家团队执行引擎.
Lead 分解任务为阶段(PlanPhase),按依赖拓扑排序执行:同层并行,层间串行。
每阶段独立 ConfigDrivenAgent(KTD3 上下文隔离),数据经 SharedWorkspace 传递。
生命周期:FORMING→PLANNING→EXECUTING→SYNTHESIZING→COMPLETED。
U2 重构:按职责拆分为 7 个 mixin,主类保留 execute/_run_pipeline/resume/
_decompose_task/_parse_phases + 共享状态 + LLM/broadcast 辅助方法。
"""
from __future__ import annotations
import asyncio
import json
import logging
import re
from typing import Any
from agentkit.llm.gateway import LLMGateway
from ._debate_runner import DebateRunnerMixin
from ._divergence_detector import DivergenceDetectorMixin
from ._intervention_handler import InterventionHandlerMixin
from ._phase_executor import PhaseExecutorMixin
from ._review_gate import ReviewGateMixin
from ._rollback_handler import RollbackHandlerMixin
from ._synthesizer import SynthesizerMixin
from .expert import Expert
from .plan import (
CollaborationContract,
PhaseStatus,
PhaseType,
PlanPhase,
PlanStatus,
TeamPlan,
)
from .team import ExpertTeam, TeamStatus
logger: logging.Logger = logging.getLogger(__name__)

# Expert-name validation pattern: 1-64 ASCII letters, digits, "_" or "-".
# Keep it identical to the patterns used in router.py / board_router.py.
# NOTE: with `.match()`, `$` also matches just before a trailing "\n";
# use `.fullmatch()` when a trailing newline must be rejected.
_EXPERT_NAME_RE: re.Pattern[str] = re.compile(r"^[a-zA-Z0-9_-]{1,64}$")
class TeamOrchestrator(
    PhaseExecutorMixin,
    DebateRunnerMixin,
    ReviewGateMixin,
    DivergenceDetectorMixin,
    RollbackHandlerMixin,
    SynthesizerMixin,
    InterventionHandlerMixin,
):
    """Pipeline orchestration engine for an expert team.

    The Lead expert decomposes a task into phases with dependencies. Phases run
    in topological order: phases of the same layer in parallel, layers one
    after another. The Lead then synthesizes the completed phase results.

    U2: most method bodies live in the seven mixins above; this class keeps the
    pipeline skeleton, the shared state the mixins operate on, and the
    LLM/broadcast helpers.
    """

    MAX_PHASES = 10  # Maximum phases Lead Expert can decompose
    MAX_RETRIES = 1  # Retry once on phase failure before marking failed
    MAX_REWORKS = 2  # Rework limit; a phase exceeding it is marked failed
    MAX_RISK_FLAGS = 10  # Cap on risk flags, prevents flooding the UI
    MAX_DEBATE_ROUNDS = 4  # Hard cap on debate rounds per phase
    MAX_DEBATES = 3  # Hard cap on auto-inserted debate phases per execution
    DEFAULT_MAX_CONCURRENT_PHASES = 3  # Parallel phases per layer; avoids LLM rate-limit bursts
    # Intervention texts presumably treated as a stop request (U4 "/stop").
    STOP_COMMANDS = frozenset({"/stop", "停止", "stop", "结束"})
    # G9/U4: RollbackExecutor default timeout for validation_command / rollback_command.
    # Override via constructor `rollback_timeout` from `rollback.default_timeout` config.
    DEFAULT_ROLLBACK_TIMEOUT = 30.0

    def __init__(
        self,
        team: ExpertTeam,
        max_concurrent_phases: int | None = None,
        checkpoint: Any = None,
        workspace_root: str | None = None,
        rollback_timeout: float | None = None,
    ) -> None:
        """Bind the orchestrator to *team* and initialise shared state.

        Args:
            team: Expert team whose members execute the phases.
            max_concurrent_phases: Max phases of one layer running at once;
                falsy values fall back to DEFAULT_MAX_CONCURRENT_PHASES.
            checkpoint: Optional checkpoint manager for crash recovery
                (save_plan/save/clear/load_plan/list_checkpoints are called).
            workspace_root: Per G9/U4, the RollbackExecutor cwd.
            rollback_timeout: Timeout for validation/rollback commands; falsy
                values fall back to DEFAULT_ROLLBACK_TIMEOUT.
        """
        self._team = team
        # KTD3: temporary agent names created for context isolation,
        # keyed by phase_id so they can be cleaned up.
        self._temp_agents: dict[str, str] = {}
        # Count of auto-inserted debate phases (bounded by MAX_DEBATES).
        self._debate_count = 0
        # U4: user context accumulated from plain-text interventions; appended
        # to the Lead's synthesis prompt so user guidance influences the result.
        self._user_context: list[str] = []
        # U2: bound same-layer parallelism to avoid LLM rate-limit bursts.
        limit = max_concurrent_phases or self.DEFAULT_MAX_CONCURRENT_PHASES
        self._phase_semaphore = asyncio.Semaphore(limit)
        # U7: pipeline checkpoint for crash recovery.
        self._checkpoint = checkpoint
        # G9/U4: workspace_root drives RollbackExecutor cwd; rollback_timeout its timeout.
        # Both default to no-op-friendly values so existing call sites behave identically.
        self._workspace_root = workspace_root
        self._rollback_timeout = rollback_timeout or self.DEFAULT_ROLLBACK_TIMEOUT

    def _resolve_lead_expert(self) -> Expert | None:
        """Return the active Lead expert, else the first active expert, else None."""
        lead = self._team.lead_expert
        if lead and lead.is_active:
            return lead
        active = self._team.active_experts
        if not active:
            return None
        fallback = active[0]
        logger.warning(f"Lead expert not available, falling back to '{fallback.config.name}'")
        return fallback

    def _cap_phases(self, phases: list[PlanPhase]) -> list[PlanPhase]:
        """Truncate *phases* to MAX_PHASES and drop dependencies on removed phases.

        Without pruning, a kept phase that depends on a truncated one would make
        ``plan.topological_sort()`` raise ValueError (invalid reference) and
        force the single-agent fallback.
        """
        kept = phases[: self.MAX_PHASES]
        if len(kept) < len(phases):
            logger.warning(
                f"Task decomposition produced {len(phases)} phases, "
                f"keeping the first {self.MAX_PHASES}"
            )
            kept_ids = {ph.id for ph in kept}
            for ph in kept:
                ph.depends_on[:] = [dep for dep in ph.depends_on if dep in kept_ids]
        return kept

    async def execute(self, task: str) -> dict[str, Any]:
        """Execute *task* in pipeline mode.

        Lead decomposes → topological sort → execute layers (parallel within a
        layer) → synthesize.

        Returns:
            A dict with ``status``/``result``/``phase_results``; ``error`` is
            set when no expert is available. See _run_pipeline for the
            completion (adds ``plan``) and fallback paths.
        """
        lead = self._resolve_lead_expert()
        if lead is None:
            return {
                "status": "failed",
                "result": None,
                "phase_results": {},
                "error": "No active expert available",
            }

        # MAX_DEBATES is a per-execution budget. Without this reset a reused
        # orchestrator would inherit the previous run's count (resume()
        # recomputes it from the restored plan instead).
        self._debate_count = 0

        plan = TeamPlan(
            task=task,
            lead_expert=lead.config.name,
            status=PlanStatus.EXECUTING,
        )

        # 1. team_formed: experts as IExpertInfo-compatible dicts plus an empty
        #    plan_phases list, matching the frontend contract.
        await self._broadcast_event(
            "team_formed",
            {
                "team_id": self._team.team_id,
                "status": self._team.status.value,
                "lead_expert": lead.config.name,
                "experts": [
                    {
                        "id": e.config.name,
                        "name": e.config.name,
                        "persona": e.config.persona,
                        "avatar": e.config.avatar,
                        "color": e.config.color,
                        "is_lead": e.config.name == lead.config.name,
                        "bound_skills": list(e.config.bound_skills),
                        "status": "active",
                    }
                    for e in self._team.active_experts
                ],
                "plan_phases": [],
            },
        )

        # 2. PLANNING: the Lead decomposes the task into phases.
        self._team.set_status(TeamStatus.PLANNING)
        phases = await self._decompose_task(lead, task)
        if not phases:
            logger.warning("Task decomposition returned no phases, executing as single phase")
            phases = [
                PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)
            ]
        plan.phases = self._cap_phases(phases)

        # U3: optionally add a plan-review debate before execution.
        await self._maybe_add_plan_review_debate(lead, plan, task)

        # 3. plan_update with the phase list.
        await self._broadcast_event(
            "plan_update",
            {
                "plan_id": plan.id,
                "plan_phases": [ph.to_dict() for ph in plan.phases],
            },
        )

        # U7: persist the plan before execution so it can be resumed.
        if self._checkpoint is not None:
            try:
                await self._checkpoint.save_plan(plan)
            except Exception as e:
                logger.warning(f"Checkpoint save_plan failed: {e}")

        # 4. EXECUTING: run the phases.
        self._team.set_status(TeamStatus.EXECUTING)
        phase_results: dict[str, dict[str, Any]] = {}
        return await self._run_pipeline(lead, plan, phase_results, task)

    @staticmethod
    def _next_ready_phases(plan: TeamPlan) -> list[PlanPhase]:
        """Return the PENDING phases of the first topological layer that has any.

        Returns ``[]`` when nothing is pending. ValueError from
        ``plan.topological_sort()`` (circular/invalid dependencies) propagates.
        """
        for layer in plan.topological_sort():
            pending = [ph for ph in layer if ph.status == PhaseStatus.PENDING]
            if pending:
                return pending
        return []

    async def _record_phase_outcome(
        self,
        plan: TeamPlan,
        ph: PlanPhase,
        result: Any,
        phase_results: dict[str, dict[str, Any]],
    ) -> None:
        """Store one gathered phase result, handle failure, then checkpoint.

        *result* is either the phase's result dict or the exception object
        returned by ``asyncio.gather(..., return_exceptions=True)``.
        """
        failed = isinstance(result, BaseException)
        if failed:
            error = str(result)
            logger.error(f"Phase {ph.id} ({ph.name}) failed: {result}")
            plan.update_phase_status(ph.id, PhaseStatus.FAILED, {"error": error})
            phase_results[ph.id] = {"error": error}
            await self._broadcast_event(
                "phase_failed",
                {
                    "phase_id": ph.id,
                    "phase_name": ph.name,
                    "error": error,
                },
            )
            # Phases depending on this one are marked failed as well.
            await self._mark_dependents_failed(ph.id, plan, phase_results)
        else:
            phase_results[ph.id] = result

        # G9/U4: opt-in rollback (KTD6) with checkpoint ordering (R21). Only a
        # failed phase that configures BOTH validation_command and
        # rollback_command goes through _run_phase_rollback, whose boolean
        # decides whether the checkpoint is saved (R21: never persist state
        # whose rollback failed). All other phases checkpoint as before.
        should_save_checkpoint = True
        if failed and ph.validation_command and ph.rollback_command:
            should_save_checkpoint = await self._run_phase_rollback(plan, ph)

        # U7: checkpoint after the phase finalizes (success or failure).
        if should_save_checkpoint and self._checkpoint is not None:
            try:
                await self._checkpoint.save(plan.id, ph, plan.status.value)
            except Exception as e:
                logger.warning(f"Checkpoint save failed for phase {ph.id}: {e}")

    async def _run_pipeline(
        self,
        lead: Expert,
        plan: TeamPlan,
        phase_results: dict[str, dict[str, Any]],
        task: str,
    ) -> dict[str, Any]:
        """Run pending phases layer by layer, then synthesize and return the result.

        Shared by execute() and resume(); *phase_results* may be pre-populated
        by resume() with completed phase outputs and is updated in place.

        Returns:
            ``{"status": "completed", "result", "phase_results", "plan"}`` on
            success; otherwise whatever ``_fallback_to_single_agent`` returns
            (used when no phase completed or the pipeline raised).
        """

        async def _bounded_phase(ph: PlanPhase) -> dict[str, Any]:
            # The semaphore caps how many phases of one layer run at once.
            async with self._phase_semaphore:
                return await self._execute_phase(ph, plan)

        try:
            # Layers run sequentially, phases within a layer in parallel.
            # U3: the ready set is recomputed every iteration so DEBATE phases
            # inserted by divergence detection are picked up.
            while True:
                ready = self._next_ready_phases(plan)
                if not ready:
                    break  # No more pending phases — done

                # U4: process user interventions at the phase boundary.
                # /stop → terminate execution; /debate <topic> → insert DEBATE;
                # plain text → accumulate as user context for Lead synthesis.
                if await self._process_interventions(lead, plan):
                    logger.info("Execution stopped by user intervention")
                    break

                results = await asyncio.gather(
                    *(_bounded_phase(ph) for ph in ready),
                    return_exceptions=True,
                )
                for ph, result in zip(ready, results):
                    await self._record_phase_outcome(plan, ph, result, phase_results)

                # U3: check freshly completed phases for conflicts and insert
                # DEBATE phases if needed (bounded by MAX_DEBATES).
                if self._debate_count < self.MAX_DEBATES:
                    completed_now = [ph for ph in ready if ph.status == PhaseStatus.COMPLETED]
                    if completed_now:
                        await self._check_divergence_and_insert_debates(
                            lead, plan, completed_now
                        )

            # 5. No phase completed → fall back to a single agent.
            completed = plan.completed_phases
            if not completed:
                logger.warning("All phases failed, falling back to single agent")
                return await self._fallback_to_single_agent(task, plan, phase_results)

            # 6. The Lead synthesizes the completed results (BEST strategy).
            self._team.set_status(TeamStatus.SYNTHESIZING)
            plan.status = PlanStatus.COMPLETED
            final_result = await self._synthesize_results(lead, task, completed)
            self._team.set_status(TeamStatus.COMPLETED)

            # 7. team_synthesis event.
            await self._broadcast_event(
                "team_synthesis",
                {
                    "content": final_result.get("content", ""),
                    "phases_completed": len(completed),
                    "phases_total": len(plan.phases),
                },
            )
            # 8. team_dissolved event.
            await self._broadcast_event(
                "team_dissolved",
                {"team_id": self._team.team_id},
            )

            # P2 #13: checkpoints are no longer needed after a successful run.
            if self._checkpoint is not None:
                try:
                    await self._checkpoint.clear(plan.id)
                except Exception as e:
                    logger.warning(f"Checkpoint clear failed: {e}")

            return {
                "status": "completed",
                "result": final_result,
                "phase_results": phase_results,
                "plan": plan,
            }
        except Exception as e:
            if isinstance(e, ValueError):
                # Typically a circular dependency or invalid reference from
                # topological_sort(); the message is enough, no traceback.
                logger.error(f"Pipeline execution failed (invalid plan): {e}")
            else:
                # Unexpected failure: keep the traceback for debugging.
                logger.exception(f"Pipeline execution failed: {e}")
            plan.status = PlanStatus.FAILED
            await self._broadcast_event("team_dissolved", {"team_id": self._team.team_id})
            return await self._fallback_to_single_agent(task, plan, phase_results)

    async def resume(self, plan_id: str) -> dict[str, Any]:
        """Resume plan *plan_id* from its last checkpoints.

        Loads the saved plan, restores COMPLETED phases (with their results)
        and FAILED phases so neither is re-executed, recomputes the debate
        budget, then continues via _run_pipeline. Returns the same dict shape
        as execute().
        """
        if self._checkpoint is None:
            return {
                "status": "failed",
                "result": None,
                "phase_results": {},
                "error": "No checkpoint manager configured",
            }

        # 1. Load the plan.
        plan_dict = await self._checkpoint.load_plan(plan_id)
        if plan_dict is None:
            return {
                "status": "failed",
                "result": None,
                "phase_results": {},
                "error": f"No checkpoint found for plan '{plan_id}'",
            }

        # 2. Reconstruct the TeamPlan.
        plan = TeamPlan.from_dict(plan_dict)
        task = plan.task

        # 3. Collect per-phase checkpoint state.
        checkpoints = await self._checkpoint.list_checkpoints(plan_id)
        phase_results: dict[str, dict[str, Any]] = {}
        completed_phase_ids: set[str] = set()
        failed_phase_ids: set[str] = set()
        for cp in checkpoints:
            if cp.phase_status == "completed":
                completed_phase_ids.add(cp.phase_id)
                if cp.phase_result:
                    phase_results[cp.phase_id] = cp.phase_result
            elif cp.phase_status == "failed":
                # P2 #11: restore FAILED status so they aren't re-executed.
                failed_phase_ids.add(cp.phase_id)

        # Apply checkpoint state; other phases stay PENDING for _run_pipeline.
        for ph in plan.phases:
            if ph.id in completed_phase_ids:
                ph.status = PhaseStatus.COMPLETED
                if phase_results.get(ph.id):
                    ph.result = phase_results[ph.id]
            elif ph.id in failed_phase_ids:
                ph.status = PhaseStatus.FAILED

        # P2 #8: restore the debate count so the MAX_DEBATES limit holds after resume.
        self._debate_count = sum(1 for ph in plan.phases if ph.phase_type == PhaseType.DEBATE)
        logger.info(
            f"Resuming plan {plan_id}: {len(completed_phase_ids)} completed, "
            f"{len(failed_phase_ids)} failed, "
            f"{len(plan.phases) - len(completed_phase_ids) - len(failed_phase_ids)} pending"
        )

        # 4. Pick the lead expert (same fallback rule as execute()).
        lead = self._resolve_lead_expert()
        if lead is None:
            return {
                "status": "failed",
                "result": None,
                "phase_results": phase_results,
                "error": "No active expert available",
            }

        # 5. Resume execution.
        self._team.set_status(TeamStatus.EXECUTING)
        return await self._run_pipeline(lead, plan, phase_results, task)

    async def _decompose_task(self, lead: Expert, task: str) -> list[PlanPhase]:
        """Ask the Lead's LLM to decompose *task* into phases.

        Returns the parsed phases. Falls back to a single "执行" phase carrying
        the whole task when no LLM gateway is available, the LLM call raises,
        or the response yields no valid phases.
        """
        gateway = self._get_llm_gateway(lead)
        if not gateway:
            logger.warning("No LLM gateway available, treating task as single phase")
            return [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)]

        # Prefer assigning phases to members; a Lead-only team assigns to the Lead.
        member_names = [
            e.config.name for e in self._team.active_experts if e.config.name != lead.config.name
        ]
        available_experts = member_names if member_names else [lead.config.name]
        prompt = (
            f"You are the Lead Expert in a pipeline team. Decompose the following task into "
            f"at most {self.MAX_PHASES} phases with dependencies.\n\n"
            f"Task: {task}\n\n"
            f"Available experts: {', '.join(available_experts)}\n\n"
            f"Return a JSON array of phase objects, each with:\n"
            f'- "name": phase name (e.g., "规划", "前端", "后端", "QA", "评审")\n'
            f'- "assigned_expert": name of the expert to assign '
            f"(must be one of: {', '.join(available_experts)})\n"
            f'- "task_description": clear phase task description\n'
            f'- "depends_on": array of phase names this phase depends on (empty array if none)\n'
            f'- "collaboration_contracts": 数组,定义该阶段的协作契约,每个契约包含:\n'
            f' - "from_expert": 提供内容的专家名称\n'
            f' - "to_expert": 接收内容的专家名称\n'
            f' - "content_description": 协作内容描述\n'
            f' 例如:[{{"from_expert":"backend","to_expert":"frontend",'
            f'"content_description":"API 定义"}}]\n\n'
            f"Example:\n"
            f'[{{"name":"规划","assigned_expert":"tech_lead",'
            f'"task_description":"设计架构","depends_on":[],"collaboration_contracts":[]}},'
            f'{{"name":"后端","assigned_expert":"backend",'
            f'"task_description":"实现API","depends_on":["规划"],'
            f'"collaboration_contracts":[{{"from_expert":"backend",'
            f'"to_expert":"frontend","content_description":"API 定义"}}]}},'
            f'{{"name":"前端","assigned_expert":"frontend",'
            f'"task_description":"实现UI","depends_on":["后端"],"collaboration_contracts":[]}}]\n\n'
            f"Return ONLY the JSON array, no other text."
        )
        try:
            response = await gateway.chat(
                messages=[{"role": "user", "content": prompt}],
                model=self._get_model(lead),
            )
            phases = self._parse_phases(response.content, available_experts, lead.config.name)
            if phases:
                return phases
            logger.warning("LLM decomposition returned no valid phases")
        except Exception as e:
            logger.warning(f"LLM task decomposition failed: {e}")
        return [PlanPhase(name="执行", assigned_expert=lead.config.name, task_description=task)]

    @staticmethod
    def _extract_json_array(content: Any) -> list[Any] | None:
        """Return the first JSON array in *content* that contains an object, else None.

        Decoding starts at each "[" via ``json.JSONDecoder.raw_decode``, so
        surrounding prose, markdown fences, or bracketed text after the array
        (e.g. "see [1]") no longer break extraction the way a greedy regex
        spanning the first "[" to the last "]" did.
        """
        if not isinstance(content, str):
            return None
        decoder = json.JSONDecoder()
        for bracket in re.finditer(r"\[", content):
            try:
                value, _end = decoder.raw_decode(content, bracket.start())
            except json.JSONDecodeError:
                continue
            if isinstance(value, list) and any(isinstance(v, dict) for v in value):
                return value
        return None

    @staticmethod
    def _as_clean_str(value: Any) -> str:
        """Return *value* stripped if it is a string, else "" (LLM JSON may hold null/numbers)."""
        return value.strip() if isinstance(value, str) else ""

    @staticmethod
    def _is_valid_expert_name(name: Any) -> bool:
        """True if *name* is a string fully matching _EXPERT_NAME_RE.

        ``fullmatch`` is used because ``match`` with a ``$`` anchor would also
        accept a trailing newline.
        """
        return isinstance(name, str) and _EXPERT_NAME_RE.fullmatch(name) is not None

    @staticmethod
    def _parse_contracts(raw_contracts: Any) -> list[CollaborationContract]:
        """Build CollaborationContract objects from an LLM ``collaboration_contracts`` value.

        A non-list value degrades to ``[]``; a non-dict entry becomes an empty
        ``CollaborationContract()`` placeholder. P1: ``from_expert`` /
        ``to_expert`` values that are not valid expert names are cleared, so a
        contract cannot inject or reference malformed expert names.
        """
        if not isinstance(raw_contracts, list):
            return []
        contracts: list[CollaborationContract] = []
        for raw in raw_contracts:
            if not isinstance(raw, dict):
                contracts.append(CollaborationContract())
                continue
            contract = CollaborationContract.from_dict(raw)
            if contract.from_expert and not TeamOrchestrator._is_valid_expert_name(
                contract.from_expert
            ):
                logger.warning(
                    f"Invalid from_expert '{contract.from_expert}' in contract, clearing"
                )
                contract.from_expert = ""
            if contract.to_expert and not TeamOrchestrator._is_valid_expert_name(
                contract.to_expert
            ):
                logger.warning(f"Invalid to_expert '{contract.to_expert}' in contract, clearing")
                contract.to_expert = ""
            contracts.append(contract)
        return contracts

    @staticmethod
    def _parse_phases(
        content: str, available_experts: list[str], lead_name: str
    ) -> list[PlanPhase]:
        """Parse the Lead's LLM response into a list of PlanPhase.

        Extracts a JSON array from *content* and converts each object entry.
        ``assigned_expert`` must be one of *available_experts*, otherwise
        *lead_name* is used. Collaboration contracts are sanitised and
        ``depends_on`` phase names are resolved to phase IDs; unknown names,
        self-dependencies and duplicates are dropped. Malformed entries are
        skipped instead of failing the whole decomposition.

        Returns:
            Phases in response order, or ``[]`` if no usable array is found.
        """
        items = TeamOrchestrator._extract_json_array(content)
        if items is None:
            return []

        clean = TeamOrchestrator._as_clean_str
        # First pass: create phases (with IDs) and a name -> id mapping.
        name_to_id: dict[str, str] = {}
        unresolved: list[tuple[PlanPhase, list[Any]]] = []
        for item in items:
            if not isinstance(item, dict):
                continue
            name = clean(item.get("name"))
            if not name:
                continue
            assigned = clean(item.get("assigned_expert"))
            if assigned not in available_experts:
                assigned = lead_name  # invalid or missing assignee → Lead
            task_desc = clean(item.get("task_description")) or name
            dep_names = item.get("depends_on", [])
            if not isinstance(dep_names, list):
                dep_names = []
            phase = PlanPhase(
                name=name,
                assigned_expert=assigned,
                task_description=task_desc,
                depends_on=[],  # resolved to phase IDs in the second pass
                collaboration_contracts=TeamOrchestrator._parse_contracts(
                    item.get("collaboration_contracts", [])
                ),
            )
            unresolved.append((phase, dep_names))
            name_to_id[name] = phase.id

        # Second pass: resolve depends_on names to phase IDs.
        phases: list[PlanPhase] = []
        for phase, dep_names in unresolved:
            for dep_name in dep_names:
                # Non-string entries (numbers, objects) can never name a phase.
                dep_id = name_to_id.get(dep_name.strip()) if isinstance(dep_name, str) else None
                if not dep_id:
                    logger.warning(
                        f"Phase '{phase.name}' depends on unknown phase '{dep_name}', ignoring"
                    )
                elif dep_id == phase.id:
                    # A self-dependency is a cycle that would fail topological_sort().
                    logger.warning(f"Phase '{phase.name}' depends on itself, ignoring")
                elif dep_id not in phase.depends_on:
                    phase.depends_on.append(dep_id)
            phases.append(phase)
        return phases

    def _get_model(self, expert: Expert | None = None) -> str:
        """Return ``config.llm["model"]`` of *expert* (default: Lead), or "default" if unset/empty."""
        target = expert or self._team.lead_expert
        if target and target.config.llm:
            return target.config.llm.get("model") or "default"
        return "default"

    def _get_llm_gateway(self, expert: Expert | None = None) -> LLMGateway | None:
        """Return the LLM gateway of *expert* (default: Lead) via ``agent._llm_gateway``.

        Falls back to the first active expert that has a gateway; returns None
        if none does.
        """

        def _gateway_of(candidate: Any) -> LLMGateway | None:
            return getattr(getattr(candidate, "agent", None), "_llm_gateway", None)

        target = expert or self._team.lead_expert
        if target:
            gateway = _gateway_of(target)
            if gateway is not None:
                return gateway
        for exp in self._team.active_experts:
            gateway = _gateway_of(exp)
            if gateway is not None:
                return gateway
        return None

    async def _broadcast_event(self, event_type: str, data: dict[str, Any]) -> None:
        """Send ``{"type": event_type, **data}`` to the team channel via handoff_transport.

        No-op without a transport; send failures are logged, never raised.
        NOTE(review): a "type" key inside *data* overrides *event_type*.
        """
        transport = self._team.handoff_transport
        if not transport:
            return
        try:
            await transport.send(self._team.team_channel, {"type": event_type, **data})
        except Exception as e:
            logger.warning(f"Failed to broadcast event '{event_type}': {e}")