diff --git a/docs/brainstorms/2026-06-24-agent-debate-collaboration-requirements.md b/docs/brainstorms/2026-06-24-agent-debate-collaboration-requirements.md new file mode 100644 index 0000000..ad198a2 --- /dev/null +++ b/docs/brainstorms/2026-06-24-agent-debate-collaboration-requirements.md @@ -0,0 +1,137 @@ +# Agent 间结构化辩论协作 + +**日期**: 2026-06-24 +**状态**: 待规划 +**范围**: Deep — feature + +## 背景与问题 + +当前 `@team` 多 Agent 协作是 hub-and-spoke 模式:Lead 分解任务 → 专家隔离执行 → Lead 汇总。专家之间不对话、不质疑、不补充。`HandoffTransport` 只做事件广播,无 Agent 间通信通道。 + +用户反馈"体现不出多 Agent 协同"——核心痛点是 **看不到 Agent 间互动**。当前流程本质是"并行单 Agent",不是"协作"。 + +同时存在两个已知缺口: +- `ExecutionMode.TEAM_COLLAB` 是死代码(定义于 `src/agentkit/chat/skill_routing.py:35`,全代码库无产生点) +- CLI 完全没有多 Agent 入口(`src/agentkit/cli/chat.py` 不处理 `@team`/`@board` 前缀,会被当普通文本送给 LLM) + +## 目标 + +让"Lead 主导的结构化辩论"成为 `@team` 模式的通用能力:Lead 能在关键决策点发起辩论,指定专家就分歧点交锋,裁决后继续执行。用户也能手动触发辩论。 + +**不是**:Agent 间自由点对点通信。保持 Lead 主导的可控性。 + +## 成功标准 + +1. 用户在 `@team` 任务执行中,能看到专家间就某个分歧点来回辩论(不是各自独立发言) +2. Lead 能自动检测专家产出间的冲突/分歧,并触发辩论 +3. 用户能在执行期间手动请求就某个点发起辩论 +4. 辩论有明确收敛:Lead 裁决,产出喂给下一阶段 +5. CLI 用户能使用 `@team`/`@board`,且能触发辩论 +6. 简单任务可以跳过辩论,不强制增加延迟 + +## 方案方向:A + C 混合 + +以方向 A(Debate Phase)为主体,吸收方向 C(方案先辩论再执行)作为可选模式。 + +### 两个辩论插入点 + +1. **方案评审辩论**(来自 C):Lead 提出任务分解方案后,先让相关专家质疑/补充方案本身,收敛后才开始执行。可选,由 Lead 判断是否需要。 +2. **决策点辩论**(来自 A):执行过程中,Lead 在关键阶段完成后检测分歧,触发辩论阶段。指定专家就该阶段产出交锋,Lead 裁决。 + +两者都是 `DEBATE` 类型的 `PlanPhase`,只是插入位置不同。 + +### 辩论阶段执行流程 + +``` +Lead 开场:陈述分歧点 + 上下文 + → 专家 A 发言(论证立场) + → 专家 B 发言(反驳或补充) + → (可选)专家 A 回应 + → (可选)专家 B 回应 + → Lead 裁决:采纳/折中/搁置,产出辩论结论 + → 结论写入 SharedWorkspace,喂给下一阶段 +``` + +### 触发机制 + +- **自动**:Lead 在方案评审点和阶段完成后运行分歧检测(LLM 判断),检测到冲突时插入辩论阶段 +- **手动**:用户通过 WS 消息(Web)或命令(CLI)请求辩论,指定主题和参与专家,Lead 插入辩论阶段 + +## 范围边界 + +### 包含 + +- `TeamOrchestrator` 新增 `DEBATE` 阶段类型及执行器 +- Lead 的分歧检测能力(prompt + 判断逻辑) +- `@team` 执行期间用户干预通道(前置工程,顺带修复无 `/stop` 缺口) +- 新增 WebSocket 事件:`debate_started`、`expert_argument`、`debate_resolved` +- 前端辩论过程可视化(专家交锋气泡、裁决结果) +- CLI `@team`/`@board` 前缀处理 + 辩论触发命令 +- "跳过辩论"逃生舱(简单任务/用户显式跳过) + +### 不包含 + +- Agent 间点对点自由通信(保持 Lead 主导) +- `@board` 模式改造(它已经是讨论模式,不混入) +- 团队状态持久化(独立问题,另行规划) +- 辩论成本优化(如缓存、早停等,先验证价值再优化) +- `ExecutionMode.TEAM_COLLAB` 死代码清理(顺手可做,但不作为本需求的核心交付) + +### 延后 + +- 方向 C 全量(辩论优先作为默认模式):先验证 A+C 混合的价值,再考虑是否默认化 +- 自定义团队模板保存:用户在 UI 选的专家组合无法存为模板复用,独立需求 +- `orchestrator/` 子系统与团队流程打通:`PipelineEngine`/`SagaOrchestrator` 等通用编排能力与团队流程的整合,独立规划 + +## 依赖与假设 + +### 依赖 + +- **用户干预通道是前置工程**:当前 `@team` 执行期间用户消息被当新任务处理(`src/agentkit/server/routes/chat.py:_handle_chat_message`)。手动触发辩论要求先建立"执行期间用户干预"通道。这顺带修复团队模式无 `/stop` 的缺口。 + +### 假设 + +- **Lead 的分歧检测能力可靠**:自动触发依赖 Lead(LLM)判断"是否值得辩论"。误报浪费 token,漏报错过辩论。需要好的 prompt 和判断标准。若不可靠,降级为纯手动触发。 +- **辩论的延迟和成本可接受**:方案评审辩论可能让任务启动延迟 30s-1min。目标用户能接受这个代价换取更高质量的协作。 +- **CLI 用户需要多 Agent 能力**:假设 CLI 用户与 Web 用户一样需要多 Agent 协作,而非只用于快速交互。 + +## 关键决策记录 + +| 决策 | 选择 | 理由 | +|------|------|------| +| 互动形态 | Lead 主导的结构化辩论 | 复用 hub-and-spoke 架构,可控且能收敛,不做点对点通信 | +| 触发机制 | 自动 + 手动结合 | 兼顾智能和可控,用户随时能介入 | +| 方案方向 | A + C 混合 | A 最小改动,C 的方案评审辩论提升协作质感,两者都是 DEBATE 阶段类型 | +| CLI 纳入 | 是 | 多 Agent 协作应全端可用,不只在 Web | +| 辩论位置 | 阶段边界 | 不在专家执行中途打断,状态清晰,避免级联重跑 | + +## 风险 + +1. **分歧检测质量**:Lead 判断失误(误报/漏报)影响体验。缓解:提供"是否值得辩论"的明确标准,允许用户关闭自动触发。 +2. **辩论不收敛**:专家反复争论无法收敛。缓解:限制辩论轮次(默认 2 轮,最多 4 轮),Lead 有强制裁决权。 +3. **成本上升**:辩论增加 token 消耗。缓解:逃生舱机制,简单任务跳过;后续可加成本预算阈值。 +4. **CLI 交互复杂度**:终端展示多 Agent 辩论不如 Web 直观。缓解:用 Rich 库的 Panel/Live 渲染,专家发言用不同颜色区分。 + +## 待规划时深入的问题 + +以下问题留给 `ce-plan` 阶段,不在本需求文档展开: + +- `DEBATE` 阶段类型的具体数据结构(`PlanPhase` 扩展字段) +- 分歧检测 prompt 的具体设计 +- 用户干预通道的 WS 协议设计(新消息类型?复用现有?) +- 前端辩论可视化的组件设计 +- CLI `@team`/`@board` 路由的代码路径(复用 Web 侧的 `ExpertTeamRouter`/`BoardRouter`?) +- 辩论结论如何写入 `SharedWorkspace`(键名约定、与阶段产出的关系) + +## 参考文件 + +- 团队流水线执行器: `src/agentkit/experts/orchestrator.py` +- 团队容器: `src/agentkit/experts/team.py` +- 阶段/计划模型: `src/agentkit/experts/plan.py` +- 私董会讨论引擎(可借鉴多轮发言模式): `src/agentkit/experts/board_orchestrator.py` +- WebSocket 拦截入口: `src/agentkit/server/routes/chat.py`(`_execute_team_collab` 第 321 行) +- 死枚举 ExecutionMode: `src/agentkit/chat/skill_routing.py:35` +- CLI chat(无多 Agent): `src/agentkit/cli/chat.py` +- 前端聊天输入: `src/agentkit/server/frontend/src/components/chat/ChatInput.vue` +- 前端事件处理: `src/agentkit/server/frontend/src/stores/chat.ts`(第 870-1200 行) +- 团队模板: `configs/experts/dev_team.yaml` diff --git a/docs/plans/2026-06-24-001-feat-agent-debate-collaboration-plan.md b/docs/plans/2026-06-24-001-feat-agent-debate-collaboration-plan.md new file mode 100644 index 0000000..479df9b --- /dev/null +++ b/docs/plans/2026-06-24-001-feat-agent-debate-collaboration-plan.md @@ -0,0 +1,500 @@ +# feat: Agent 间结构化辩论协作 + +**日期**: 2026-06-24 +**状态**: active +**范围**: Deep — feature +**Origin**: `docs/brainstorms/2026-06-24-agent-debate-collaboration-requirements.md` + +--- + +## Summary + +在 `@team` 多 Agent 协作模式中引入"Lead 主导的结构化辩论"能力。当前专家隔离执行、无互动,本计划让 Lead 能在关键决策点发起辩论(指定专家交锋→裁决),支持自动检测分歧触发 + 用户手动触发。同时修复 CLI 完全缺失多 Agent 入口的问题,并顺带补齐 `@team` 执行期间的用户干预通道(当前无 `/stop`)。 + +--- + +## Problem Frame + +当前 `TeamOrchestrator`(`src/agentkit/experts/orchestrator.py`)是 hub-and-spoke 模式:Lead 分解任务 → 专家隔离执行 → Lead 汇总。`HandoffTransport` 只做事件广播,专家间无通信通道。用户反馈"体现不出多 Agent 协同"——本质是"并行单 Agent"而非协作。 + +同时存在三个已知缺口: +1. `ExecutionMode.TEAM_COLLAB` 是死代码(`src/agentkit/chat/skill_routing.py:35`,全代码库无产生点) +2. CLI 完全没有多 Agent 入口(`src/agentkit/cli/chat.py` 不处理 `@team`/`@board` 前缀) +3. `@team` 执行期间无用户干预通道(`ExpertTeam.broadcast_user_message()` 方法存在但 `TeamOrchestrator.execute()` 从不检查) + +--- + +## Requirements + +源自 `docs/brainstorms/2026-06-24-agent-debate-collaboration-requirements.md`: + +- **R1**: 用户在 `@team` 任务执行中,能看到专家间就某个分歧点来回辩论(不是各自独立发言) +- **R2**: Lead 能自动检测专家产出间的冲突/分歧,并触发辩论 +- **R3**: 用户能在执行期间手动请求就某个点发起辩论 +- **R4**: 辩论有明确收敛:Lead 裁决,产出喂给下一阶段 +- **R5**: CLI 用户能使用 `@team`/`@board`,且能触发辩论 +- **R6**: 简单任务可以跳过辩论,不强制增加延迟 + +--- + +## Key Technical Decisions + +### KTD1: 辩论作为 `DEBATE` 阶段类型,而非独立编排器 + +在 `PlanPhase` 上新增 `phase_type` 字段(`EXECUTION` | `DEBATE`),而非创建独立的 `DebateOrchestrator`。辩论阶段复用现有流水线的拓扑排序、依赖管理、SharedWorkspace 机制。 + +**理由**:最小架构改动。辩论阶段与其他阶段一样有 `depends_on`,只是执行逻辑不同。避免引入第二套编排引擎导致状态管理分裂。 + +**代价**:`TeamOrchestrator._execute_phase()` 需要按 `phase_type` 分派,增加一个分支。可接受。 + +### KTD2: 辩论执行逻辑借鉴 `BoardOrchestrator`,但不复用其类 + +`BoardOrchestrator`(`src/agentkit/experts/board_orchestrator.py`)已实现"成员并行发言→主持人小结"的多轮循环。辩论阶段借鉴这个模式(Lead 开场→专家轮流发言→Lead 裁决),但作为 `TeamOrchestrator._execute_debate_phase()` 方法内联,不实例化 `BoardOrchestrator`。 + +**理由**:`BoardOrchestrator` 绑定 `BoardTeam`(独立容器、独立历史、独立状态机),强行复用会引入两套状态同步。内联一个方法比桥接两个编排器简单。 + +### KTD3: 用户干预通道复用 `ExpertTeam.broadcast_user_message()` + 新增 WS 消息类型 + +`ExpertTeam` 已有 `broadcast_user_message()` 方法(`src/agentkit/experts/team.py:253`),但 `TeamOrchestrator.execute()` 从不检查。方案: +- WS 新增 `team_intervention` 消息类型,`chat.py` 收到后调用 `team.broadcast_user_message()` +- `TeamOrchestrator` 在阶段边界检查干预队列(与 `BoardOrchestrator` 检查 `consume_user_interventions()` 一致) +- 干预消息可以是 `/stop`(停止团队)、`/debate `(触发辩论)、或普通文本(追加上下文) + +**理由**:复用已有方法,不引入新队列。与 `BoardOrchestrator` 的干预检查模式一致,降低认知成本。 + +### KTD4: 分歧检测作为 Lead 的 LLM 判断,带"是否值得辩论"的明确标准 + +自动触发不依赖复杂的一致性算法,而是 Lead 在阶段完成后用 LLM 判断"该阶段产出是否与其他阶段/约束冲突,是否值得辩论"。Prompt 给出明确判断标准(见 U3)。 + +**理由**:YAGNI——不引入冲突检测框架。LLM 判断够用,误报由"跳过辩论"逃生舱兜底。若不可靠,降级为纯手动触发(需求文档已记录此假设)。 + +### KTD5: CLI 复用 `ExpertTeamRouter`/`BoardRouter` + Rich 渲染 + +CLI 在 `chat.py` 的 chat loop 中,于 skill routing 之前拦截 `@team`/`@board` 前缀,复用 Web 侧的 `ExpertTeamRouter.resolve()` 和 `BoardRouter.resolve()`。辩论过程用 Rich 的 `Panel` + 不同颜色渲染专家发言。 + +**理由**:路由逻辑已存在,CLI 只需接入。不重复实现前缀解析。 + +--- + +## High-Level Technical Design + +### 辩论阶段在流水线中的位置 + +``` +Lead 分解任务 → phases[] + ├── [可选] 方案评审辩论 (DEBATE phase, depends_on: 无, 在执行前) + │ Lead 开场 → 专家质疑方案 → Lead 修订 → 产出"确认的方案" + │ + ├── 执行阶段 A (EXECUTION phase) + ├── 执行阶段 B (EXECUTION phase, depends_on: A) + │ + ├── [自动] 决策点辩论 (DEBATE phase, depends_on: B, Lead 检测分歧后动态插入) + │ Lead 陈述分歧 → 专家 A/B 交锋 → Lead 裁决 → 产出"辩论结论" + │ + └── 执行阶段 C (EXECUTION phase, depends_on: 辩论结论) +``` + +### 辩论阶段执行流程(内联于 TeamOrchestrator) + +``` +_execute_debate_phase(phase, plan): + 1. 解析 phase.debate_config: {topic, participants, max_rounds} + 2. Lead 开场:陈述分歧点 + 上下文 → broadcast debate_started + 3. for round in 1..max_rounds: + a. 检查用户干预(/stop 则提前结束) + b. 参与专家并行发言(基于历史 + 角色)→ broadcast expert_argument + c. Lead 小结本轮 → broadcast debate_round_summary + 4. Lead 裁决:采纳/折中/搁置 → broadcast debate_resolved + 5. 结论写入 SharedWorkspace ({plan_id}/phase/{phase_id}/output) + 6. phase.status = COMPLETED +``` + +### 用户干预通道数据流 + +``` +Web 用户 → WS message {type: "team_intervention", content: "/debate 前端框架选型"} + → chat.py _handle_chat_message 检测团队执行中 + → team.broadcast_user_message(content) + → TeamOrchestrator 在阶段边界检查 team.consume_user_interventions() + → 识别 /debate 命令 → 动态插入 DEBATE phase + +CLI 用户 → 输入 /debate 前端框架选型 + → cli/chat.py 检测团队执行中 + → team.broadcast_user_message(content) + → 同上 +``` + +--- + +## Implementation Units + +### U1. 数据模型:PhaseType 枚举 + PlanPhase 扩展 + +**Goal**: 为 `PlanPhase` 增加 `phase_type` 字段和辩论配置,使流水线能区分执行阶段和辩论阶段。 + +**Requirements**: 支撑 R1, R4 + +**Dependencies**: 无 + +**Files**: +- `src/agentkit/experts/plan.py` (修改) +- `tests/unit/experts/test_plan.py` (新建或修改) + +**Approach**: +- 新增 `PhaseType(str, enum.Enum)`: `EXECUTION = "execution"`, `DEBATE = "debate"` +- `PlanPhase` 新增字段: + - `phase_type: PhaseType = PhaseType.EXECUTION`(默认执行,向后兼容) + - `debate_config: dict[str, Any] | None = None`(辩论阶段专用:`topic`, `participants: list[str]`, `max_rounds: int = 2`) +- `to_dict()` / `from_dict()` 序列化新字段 +- `topological_sort()` 无需改动(辩论阶段也有 `depends_on`,与其他阶段一视同仁) + +**Patterns to follow**: 现有 `PlanPhase` 的 dataclass + enum 模式(`src/agentkit/experts/plan.py`) + +**Test scenarios**: +- Happy path: 创建 `DEBATE` 类型 phase,序列化/反序列化后字段保留 +- 向后兼容: 不带 `phase_type` 的旧 dict 反序列化后默认为 `EXECUTION` +- 边界: `debate_config` 为 None 时不影响 EXECUTION 阶段 +- 拓扑排序: 混合 EXECUTION + DEBATE 阶段的依赖图能正确分层 + +**Verification**: `pytest tests/unit/experts/test_plan.py -x -q` 通过 + +--- + +### U2. 辩论阶段执行器(TeamOrchestrator) + +**Goal**: 在 `TeamOrchestrator` 中实现辩论阶段的执行逻辑,借鉴 `BoardOrchestrator` 的多轮发言模式。 + +**Requirements**: R1, R4, R6 + +**Dependencies**: U1 + +**Files**: +- `src/agentkit/experts/orchestrator.py` (修改) +- `tests/unit/experts/test_orchestrator_debate.py` (新建) + +**Approach**: +- `_execute_phase()` 入口按 `phase.phase_type` 分派: + - `EXECUTION` → 现有 `_execute_phase()` 逻辑(重命名为 `_execute_execution_phase()`) + - `DEBATE` → 新增 `_execute_debate_phase()` +- `_execute_debate_phase(phase, plan)`: + 1. 从 `phase.debate_config` 解析 topic/participants/max_rounds + 2. Lead 开场(LLM 生成,陈述分歧点)→ emit `debate_started` + 3. 循环 max_rounds 轮: + - 检查 `team.consume_user_interventions()`(/stop 提前结束) + - 参与专家并行发言(LLM 生成,基于历史 + 角色 prompt)→ emit `expert_argument` + - Lead 小结 → emit `debate_round_summary` + 4. Lead 裁决(LLM 生成,JSON: `decision`, `rationale`, `conclusion`)→ emit `debate_resolved` + 5. 结论写入 SharedWorkspace,`phase.status = COMPLETED` +- 辩论 prompt 借鉴 `BoardOrchestrator._generate_expert_speech()` 的角色注入模式(persona + thinking_style + speaking_style + history) +- **逃生舱**: `debate_config` 可设 `skip: true`,或 Lead 判断"无分歧"时直接跳过(`phase.status = COMPLETED`, result = "无需辩论") + +**Technical design** (directional): +```python +async def _execute_debate_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]: + config = phase.debate_config or {} + topic = config.get("topic", phase.task_description) + participants = config.get("participants", []) + max_rounds = min(config.get("max_rounds", 2), 4) # 硬上限 4 轮 + + # Lead 开场 + lead = self._team.lead_expert + opening = await self._generate_debate_opening(lead, topic, phase) + await self._broadcast_event("debate_started", {...}) + + history = [{"expert": lead.config.name, "content": opening, "round": 0}] + + for round_num in range(1, max_rounds + 1): + # 检查用户干预 + interventions = self._team.consume_user_interventions() + if self._has_stop_command(interventions): + break + + # 参与专家并行发言 + experts = [self._team.get_expert(name) for name in participants if self._team.get_expert(name)] + speeches = await asyncio.gather( + *[self._generate_debate_argument(e, topic, history, round_num) for e in experts], + return_exceptions=True, + ) + for expert, speech in zip(experts, speeches): + if not isinstance(speech, Exception): + history.append({"expert": expert.config.name, "content": speech, "round": round_num}) + await self._broadcast_event("expert_argument", {...}) + + # Lead 小结 + summary = await self._generate_debate_summary(lead, topic, history, round_num) + history.append({"expert": lead.config.name, "content": summary, "round": round_num}) + await self._broadcast_event("debate_round_summary", {...}) + + # Lead 裁决 + verdict = await self._generate_debate_verdict(lead, topic, history) + await self._broadcast_event("debate_resolved", {...}) + + # 写入 SharedWorkspace + result = {"content": verdict.get("conclusion", ""), "verdict": verdict} + phase.status = PhaseStatus.COMPLETED + phase.result = result + return result +``` + +**Patterns to follow**: +- `BoardOrchestrator._generate_expert_speech()` 的角色 prompt 模式(`src/agentkit/experts/board_orchestrator.py:268`) +- `BoardOrchestrator._has_stop_command()` 的停止命令检查(`src/agentkit/experts/board_orchestrator.py:486`) +- `TeamOrchestrator._broadcast_event()` 的事件广播模式 + +**Test scenarios**: +- Happy path: 2 轮辩论,2 个专家参与,Lead 裁决产出结论,phase 状态变为 COMPLETED +- 边界: max_rounds=1 时只辩论一轮就裁决 +- 边界: participants 为空时,Lead 直接给出结论(无辩论) +- 用户停止: 辩论中收到 /stop,提前结束并裁决 +- 逃生舱: `debate_config.skip=true` 时直接跳过,phase 状态 COMPLETED,result="无需辩论" +- 错误路径: LLM 不可用时,Lead 用模板文本裁决,不抛异常 +- 集成: 辩论结论写入 SharedWorkspace,后续 EXECUTION 阶段能读取 + +**Verification**: `pytest tests/unit/experts/test_orchestrator_debate.py -x -q` 通过 + +--- + +### U3. 分歧检测 + 方案评审辩论(自动触发) + +**Goal**: Lead 在阶段完成后自动检测分歧,动态插入辩论阶段;在分解任务后可选发起方案评审辩论。 + +**Requirements**: R2, R6 + +**Dependencies**: U1, U2 + +**Files**: +- `src/agentkit/experts/orchestrator.py` (修改) +- `tests/unit/experts/test_divergence_detection.py` (新建) + +**Approach**: +- 新增 `_detect_divergence(lead, completed_phase, plan) -> bool`: + - Lead 用 LLM 判断该阶段产出是否与其他已完成阶段冲突,或是否存在多个可行方案 + - Prompt 给出明确标准:"以下情况值得辩论:1) 两个阶段产出矛盾 2) 阶段产出与任务约束冲突 3) 存在多个合理方案。其他情况返回 false。" + - LLM 不可用或判断失败时返回 false(宁可漏报不误报) +- `execute()` 主循环修改:每层执行完成后,对每个 completed phase 运行分歧检测,若 true 则动态插入一个 `DEBATE` phase(`depends_on` 指向该 phase),加入下一层 +- 方案评审辩论(可选):`_decompose_task()` 返回 phases 后,Lead 判断"该任务是否需要方案评审",若需要则在 phases 头部插入一个 `DEBATE` phase(topic="方案评审", participants=所有成员, depends_on=[]) +- **跳过逻辑**: `MAX_DEBATES = 3` 限制单次执行最多插入 3 个辩论阶段(防止成本失控);简单任务(phases <= 2)默认跳过方案评审 + +**Patterns to follow**: `TeamOrchestrator._decompose_task()` 的 LLM prompt + JSON 解析模式 + +**Test scenarios**: +- Happy path: 两个阶段产出矛盾,分歧检测返回 true,自动插入辩论阶段 +- Happy path: 阶段产出一致,分歧检测返回 false,不插入辩论 +- 边界: phases <= 2 时跳过方案评审 +- 边界: 已插入 3 个辩论后不再插入(MAX_DEBATES 上限) +- 错误路径: LLM 不可用时分歧检测返回 false +- 集成: 插入的辩论阶段能被 `topological_sort()` 正确分层,后续阶段能依赖辩论结论 + +**Verification**: `pytest tests/unit/experts/test_divergence_detection.py -x -q` 通过 + +--- + +### U4. 用户干预通道 + 手动辩论触发(WS + CLI 共用) + +**Goal**: 建立 `@team` 执行期间的用户干预通道,支持 `/stop`、`/debate `、普通文本追加上下文。 + +**Requirements**: R3, R5 + +**Dependencies**: U1, U2 + +**Files**: +- `src/agentkit/experts/team.py` (修改:补齐干预队列,参考 BoardTeam 模式) +- `src/agentkit/server/routes/chat.py` (修改:`_execute_team_collab` 增加 WS 干预消息处理) +- `src/agentkit/cli/chat.py` (修改:团队执行期间拦截 `/debate`、`/stop` 命令) +- `tests/unit/experts/test_team_intervention.py` (新建) + +**Approach**: +- `ExpertTeam` 补齐干预队列(参考 `BoardTeam` 的 `add_user_intervention()` / `consume_user_interventions()`,`src/agentkit/experts/board.py`): + - `_interventions: asyncio.Queue` (bounded, maxsize=64) + - `add_user_intervention(msg: str)` / `consume_user_interventions() -> list[str]` + - `broadcast_user_message()` 已存在,改为同时入队干预队列 +- WS 侧(`chat.py _execute_team_collab`): + - 团队执行期间,`_handle_chat_message` 收到的消息若来自当前 session,识别为干预 + - 新增 WS 消息类型 `team_intervention`,或复用 `message` 类型 + session 匹配 + - 调用 `team.add_user_intervention(content)` +- CLI 侧(`cli/chat.py`): + - 团队执行期间,用户输入以 `/` 开头时识别为命令:`/stop`、`/debate ` + - 调用 `team.add_user_intervention(content)` +- `TeamOrchestrator` 在阶段边界(每层执行前 + 辩论每轮前)检查 `consume_user_interventions()`: + - `/stop` → 终止执行,走 fallback + - `/debate ` → 动态插入 DEBATE phase + - 其他文本 → 追加到 Lead 上下文(影响后续分解/裁决) + +**Patterns to follow**: +- `BoardTeam.add_user_intervention()` / `consume_user_interventions()`(`src/agentkit/experts/board.py`) +- `BoardOrchestrator._has_stop_command()`(`src/agentkit/experts/board_orchestrator.py:486`) + +**Test scenarios**: +- Happy path: 用户发送 `/debate 前端框架选型`,团队在下一阶段边界插入辩论 +- Happy path: 用户发送 `/stop`,团队终止执行并走 fallback +- Happy path: 用户发送普通文本,Lead 在后续裁决中参考 +- 边界: 干预队列为空时 `consume_user_interventions()` 返回空列表 +- 边界: 多条干预消息累积,一次性消费 +- 集成: WS 干预消息能从 `chat.py` 传到 `ExpertTeam` 再到 `TeamOrchestrator` + +**Verification**: `pytest tests/unit/experts/test_team_intervention.py -x -q` 通过 + +--- + +### U5. 前端辩论可视化 + +**Goal**: 前端展示辩论过程,专家交锋有独立气泡样式,裁决结果清晰可见。 + +**Requirements**: R1 + +**Dependencies**: U1, U2, U4 + +**Files**: +- `src/agentkit/server/frontend/src/stores/chat.ts` (修改:处理新事件) +- `src/agentkit/server/frontend/src/components/chat/` (修改:辩论气泡组件) +- `src/agentkit/server/frontend/src/types/chat.ts` (修改:新增辩论事件类型) + +**Approach**: +- 新增 WS 事件类型声明:`debate_started`、`expert_argument`、`debate_round_summary`、`debate_resolved` +- `chat.ts` 事件处理(参考现有 `expert_step`/`expert_result` 处理,约第 870-1200 行): + - `debate_started`: 显示"辩论开始"分隔线 + 分歧主题 + - `expert_argument`: 专家发言气泡,带"辩论中"标签和轮次标记 + - `debate_round_summary`: Lead 小结,缩进显示 + - `debate_resolved`: 裁决结果,高亮显示(采纳/折中/搁置 + 理由) +- 辩论气泡与普通专家发言气泡视觉区分:边框颜色/图标不同 +- 用户干预入口:团队执行期间,ChatInput 显示"辩论"按钮(发送 `/debate` 命令) + +**Patterns to follow**: +- 现有 `expert_step`/`expert_result` 事件处理模式(`src/agentkit/server/frontend/src/stores/chat.ts`) +- 现有专家气泡组件样式(`src/agentkit/server/frontend/src/components/chat/`) + +**Test scenarios**: +- Happy path: 收到 `debate_started` 后显示辩论分隔线和主题 +- Happy path: 收到 `expert_argument` 后显示带轮次标记的专家辩论气泡 +- Happy path: 收到 `debate_resolved` 后高亮显示裁决结果 +- 边界: 辩论中 WebSocket 断开,已显示的辩论内容保留 +- 集成: 团队执行期间点击"辩论"按钮,发送 `/debate` 命令 + +**Verification**: `npm run typecheck` 通过;手动验证辩论过程可视化 + +--- + +### U6. CLI 多 Agent 入口 + 辩论支持 + +**Goal**: CLI 支持 `@team`/`@board` 前缀触发多 Agent 协作,辩论过程用 Rich 渲染。 + +**Requirements**: R5 + +**Dependencies**: U1, U2, U4 + +**Files**: +- `src/agentkit/cli/chat.py` (修改) +- `tests/unit/cli/test_chat_multiagent.py` (新建) + +**Approach**: +- chat loop 中,在 skill routing 之前拦截 `@team`/`@board` 前缀: + - 复用 `ExpertTeamRouter.resolve()` / `BoardRouter.resolve()` 解析前缀 + - 构建 `ExpertTeam` / `BoardTeam`(复用 Web 侧逻辑,但不经过 WS) + - 注册事件回调:用 Rich 渲染而非 WS 广播 +- 事件渲染(Rich): + - `team_formed`: Panel 显示团队成员 + - `phase_started`/`expert_step`: 带颜色的专家名 + 任务 + - `expert_result`: Markdown 渲染专家产出 + - `debate_started`: 分隔线 + "辩论: {topic}" + - `expert_argument`: 带轮次标记的专家发言 Panel(不同专家不同颜色) + - `debate_resolved`: 高亮裁决结果 Panel + - `team_synthesis`: 最终结果 Markdown 渲染 +- 团队执行期间,用户输入 `/debate`/`/stop` 走干预通道(U4) +- 帮助文本(`_print_help`)补充 `@team`/`@board` 说明 + +**Patterns to follow**: +- 现有 CLI chat loop 的 Rich 渲染模式(`src/agentkit/cli/chat.py`) +- `BoardOrchestrator` 的事件广播模式(改为回调而非 WS) + +**Test scenarios**: +- Happy path: 输入 `@team 开发登录功能`,CLI 显示团队组建 + 阶段执行 + 最终结果 +- Happy path: 输入 `@board 讨论微服务 vs 单体`,CLI 显示多轮讨论 + 总结 +- Happy path: 团队执行中输入 `/debate 前端框架`,CLI 显示辩论过程 +- Happy path: 团队执行中输入 `/stop`,CLI 显示终止 + fallback 结果 +- 边界: `@team` 无任务描述时提示用法 +- 边界: 专家名称不存在时提示错误 +- 集成: CLI `@team` 流程能触发自动分歧检测和辩论(U3) + +**Verification**: `pytest tests/unit/cli/test_chat_multiagent.py -x -q` 通过 + +--- + +## Scope Boundaries + +### 包含 + +- `DEBATE` 阶段类型及执行器 +- Lead 分歧检测(自动触发) +- 用户干预通道(手动触发 + `/stop`) +- 前端辩论可视化 +- CLI `@team`/`@board` 入口 + 辩论支持 +- "跳过辩论"逃生舱 + +### 不包含 + +- Agent 间点对点自由通信(保持 Lead 主导) +- `@board` 模式改造(它已是讨论模式) +- 团队状态持久化(独立问题) +- 辩论成本优化(缓存、早停等,先验证价值) +- `ExecutionMode.TEAM_COLLAB` 死代码清理(顺手可做,非核心交付) + +### 延后到后续工作 + +- 方向 C 全量(辩论优先作为默认模式):先验证 A+C 混合价值 +- 自定义团队模板保存:用户选的专家组合无法存为模板 +- `orchestrator/` 子系统与团队流程打通 +- 辩论成本预算阈值(token 上限触发跳过) + +--- + +## Risks & Dependencies + +### 风险 + +1. **分歧检测质量**:Lead LLM 判断失误(误报浪费 token,漏报错过辩论)。缓解:明确判断标准 prompt + `MAX_DEBATES` 上限 + 用户可关闭自动触发。 +2. **辩论不收敛**:专家反复争论。缓解:硬上限 4 轮 + Lead 强制裁决权。 +3. **成本上升**:辩论增加 token 消耗。缓解:逃生舱 + `MAX_DEBATES=3` + 简单任务跳过方案评审。 +4. **CLI 交互复杂度**:终端展示多 Agent 辩论不如 Web 直观。缓解:Rich Panel + 颜色区分 + 轮次标记。 +5. **WS 干预消息与正常消息混淆**:团队执行期间用户消息可能被当新任务。缓解:session 匹配 + `team_intervention` 消息类型显式区分。 + +### 依赖 + +- U1 是所有后续单元的基础(数据模型) +- U2 依赖 U1(辩论执行器需要 DEBATE 阶段类型) +- U3 依赖 U1 + U2(分歧检测需要插入 DEBATE phase) +- U4 依赖 U1 + U2(手动触发需要干预通道 + 辩论执行器) +- U5 依赖 U1 + U2 + U4(前端需要新事件 + 干预入口) +- U6 依赖 U1 + U2 + U4(CLI 需要路由 + 辩论 + 干预) + +--- + +## Open Questions + +以下问题留给实现阶段,不阻塞规划: + +- `debate_config` 的确切 JSON schema(`participants` 是专家名列表还是 Expert 对象?倾向名字列表,执行时解析) +- WS `team_intervention` 消息的确切格式(是复用 `message` 类型 + flag,还是新类型?倾向新类型,显式优于隐式) +- 前端辩论气泡的具体样式(边框颜色、轮次标记位置)——实现时对齐现有专家气泡风格 +- CLI 辩论渲染是否用 `Live` 动态更新还是逐条打印——倾向逐条打印(辩论是离散事件,不需要流式) + +--- + +## System-Wide Impact + +- **后端**: `experts/` 模块(plan.py, orchestrator.py, team.py)+ `server/routes/chat.py` + `cli/chat.py` +- **前端**: `stores/chat.ts` + `components/chat/` + `types/chat.ts` +- **测试**: 新增 4 个测试文件 +- **配置**: 无新配置项(辩论参数通过 `debate_config` 在运行时传递) +- **文档**: AGENTS.md 的 ExecutionMode 描述需更新(TEAM_COLLAB 死代码清理可顺手做) + +--- + +## Sources & Research + +- 需求文档: `docs/brainstorms/2026-06-24-agent-debate-collaboration-requirements.md` +- 现有团队流水线: `src/agentkit/experts/orchestrator.py` +- 现有私董会讨论引擎(借鉴模式): `src/agentkit/experts/board_orchestrator.py` +- 现有阶段/计划模型: `src/agentkit/experts/plan.py` +- WS 拦截入口: `src/agentkit/server/routes/chat.py`(`_execute_team_collab` 第 321 行) +- CLI chat(当前无多 Agent): `src/agentkit/cli/chat.py` +- 前端事件处理: `src/agentkit/server/frontend/src/stores/chat.ts`(第 870-1200 行) diff --git a/src/agentkit/experts/orchestrator.py b/src/agentkit/experts/orchestrator.py index a9129f9..2bb80c4 100644 --- a/src/agentkit/experts/orchestrator.py +++ b/src/agentkit/experts/orchestrator.py @@ -32,7 +32,7 @@ from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus from agentkit.llm.gateway import LLMGateway from .expert import Expert -from .plan import PhaseStatus, PlanPhase, PlanStatus, TeamPlan +from .plan import PhaseStatus, PhaseType, PlanPhase, PlanStatus, TeamPlan from .team import ExpertTeam, TeamStatus logger = logging.getLogger(__name__) @@ -45,10 +45,17 @@ class TeamOrchestrator: Phases are executed in topological order: same-layer phases run in parallel (asyncio.gather), layers run sequentially. Each phase gets an independent ConfigDrivenAgent instance for context isolation (KTD3). + + Phase types: + - EXECUTION: standard phase, expert independently completes assigned task + - DEBATE: Lead-facilitated debate, designated experts argue a divergence + point, Lead adjudicates and produces a conclusion """ MAX_PHASES = 10 # Maximum phases Lead Expert can decompose MAX_RETRIES = 1 # Retry once on phase failure before marking failed + MAX_DEBATE_ROUNDS = 4 # Hard cap on debate rounds per phase + STOP_COMMANDS = frozenset({"/stop", "停止", "stop", "结束"}) def __init__(self, team: ExpertTeam) -> None: self._team = team @@ -349,7 +356,17 @@ class TeamOrchestrator: return phases async def _execute_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]: - """Execute a single phase using the assigned expert. + """Execute a single phase, dispatching by phase_type. + + EXECUTION phases run the standard expert execution flow. + DEBATE phases run the Lead-facilitated debate flow. + """ + if phase.phase_type == PhaseType.DEBATE: + return await self._execute_debate_phase(phase, plan) + return await self._execute_execution_phase(phase, plan) + + async def _execute_execution_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]: + """Execute a standard EXECUTION phase using the assigned expert. Creates an independent ConfigDrivenAgent instance for context isolation (KTD3). Reads dependency outputs from SharedWorkspace, executes the phase task, @@ -520,6 +537,415 @@ class TeamOrchestrator: ) raise RuntimeError(f"Phase {phase.id} ({phase.name}) failed: {last_error}") + async def _execute_debate_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]: + """Execute a DEBATE phase: Lead-facilitated structured debate. + + Flow: + 1. Parse debate_config (topic, participants, max_rounds, skip) + 2. If skip=True, short-circuit with "no debate needed" + 3. Lead opens with the divergence point + 4. Loop max_rounds: experts argue in parallel, Lead summarizes + 5. Lead adjudicates (decision, rationale, conclusion) + 6. Write conclusion to SharedWorkspace, mark phase COMPLETED + + Borrows the multi-round speech pattern from BoardOrchestrator but + stays inline to avoid bridging two orchestrator state machines. + """ + config = phase.debate_config or {} + topic = config.get("topic", phase.task_description) + participants: list[str] = config.get("participants", []) + max_rounds = min(config.get("max_rounds", 2), self.MAX_DEBATE_ROUNDS) + + # Escape hatch: skip debate entirely + if config.get("skip", False): + logger.info(f"Debate phase {phase.id} skipped (skip=True)") + phase.status = PhaseStatus.COMPLETED + result = {"content": "无需辩论", "skipped": True} + phase.result = result + await self._broadcast_event( + "debate_resolved", + { + "phase_id": phase.id, + "phase_name": phase.name, + "decision": "skipped", + "conclusion": "无需辩论", + "rationale": "debate_config.skip=True", + }, + ) + return result + + lead = self._team.lead_expert + if not lead or not lead.is_active: + active = self._team.active_experts + if not active: + raise RuntimeError("No active expert available for debate") + lead = active[0] + + # Resolve participant experts (filter to active ones) + debate_experts: list[Expert] = [] + for name in participants: + expert = self._team.get_expert(name) + if expert and expert.is_active and expert.config.name != lead.config.name: + debate_experts.append(expert) + + phase.status = PhaseStatus.RUNNING + + # 1. Lead opens the debate + opening = await self._generate_debate_opening(lead, topic, phase, plan) + await self._broadcast_event( + "debate_started", + { + "phase_id": phase.id, + "phase_name": phase.name, + "topic": topic, + "participants": [e.config.name for e in debate_experts], + "max_rounds": max_rounds, + "opening": opening, + }, + ) + + # Debate history for context (Lead opening + expert arguments + Lead summaries) + history: list[dict[str, Any]] = [ + {"expert": lead.config.name, "content": opening, "round": 0, "role": "moderator"} + ] + + # 2. Debate rounds + for round_num in range(1, max_rounds + 1): + # Check for user intervention (/stop) + interventions = self._consume_team_interventions() + if self._has_stop_command(interventions): + logger.info(f"Debate {phase.id} stopped by user at round {round_num}") + break + + if not debate_experts: + # No participants — Lead directly adjudicates + break + + # Experts argue in parallel + speech_results = await asyncio.gather( + *[ + self._generate_debate_argument(e, topic, history, round_num) + for e in debate_experts + ], + return_exceptions=True, + ) + + for expert, speech in zip(debate_experts, speech_results): + if isinstance(speech, Exception): + logger.warning( + f"Expert '{expert.config.name}' debate argument failed: {speech}" + ) + continue + history.append( + { + "expert": expert.config.name, + "content": speech, + "round": round_num, + "role": "expert", + } + ) + await self._broadcast_event( + "expert_argument", + { + "phase_id": phase.id, + "expert_id": expert.config.name, + "expert_name": expert.config.name, + "expert_color": expert.config.color, + "content": speech, + "round": round_num, + "topic": topic, + }, + ) + + # Lead summarizes the round + summary = await self._generate_debate_summary(lead, topic, history, round_num) + if summary: + history.append( + { + "expert": lead.config.name, + "content": summary, + "round": round_num, + "role": "moderator", + } + ) + await self._broadcast_event( + "debate_round_summary", + { + "phase_id": phase.id, + "moderator_name": lead.config.name, + "content": summary, + "round": round_num, + "continue": round_num < max_rounds, + }, + ) + + # 3. Lead adjudicates + verdict = await self._generate_debate_verdict(lead, topic, history) + conclusion = verdict.get("conclusion", "") + decision = verdict.get("decision", "inconclusive") + + await self._broadcast_event( + "debate_resolved", + { + "phase_id": phase.id, + "phase_name": phase.name, + "decision": decision, + "conclusion": conclusion, + "rationale": verdict.get("rationale", ""), + }, + ) + + # 4. Write conclusion to SharedWorkspace + result = {"content": conclusion, "verdict": verdict, "decision": decision} + phase.status = PhaseStatus.COMPLETED + phase.result = result + + output_key = f"{plan.id}/phase/{phase.id}/output" + await self._team.workspace.write(output_key, conclusion, lead.config.name) + + # Emit phase_completed event (consistent with execution phases) + result_summary = conclusion[:200] if len(conclusion) > 200 else conclusion + await self._broadcast_event( + "phase_completed", + { + "phase_id": phase.id, + "phase_name": phase.name, + "result_summary": result_summary, + }, + ) + + return result + + async def _generate_debate_opening( + self, lead: Expert, topic: str, phase: PlanPhase, plan: TeamPlan + ) -> str: + """Generate Lead's opening statement for the debate. + + States the divergence point and context from dependency phases. + """ + gateway = self._get_llm_gateway(lead) + if not gateway: + return f"辩论主题:{topic}。请各位专家发表看法。" + + # Gather dependency outputs for context + dep_context = self._build_dependency_context(phase, plan) + + prompt = ( + f"你是团队 Lead {lead.config.name},正在主持一场结构化辩论。\n\n" + f"辩论主题:{topic}\n" + f"阶段任务:{phase.task_description}\n" + ) + if dep_context: + prompt += f"\n前置阶段产出:\n{dep_context}\n" + prompt += ( + "\n请作为主持人开场:\n" + "- 明确陈述分歧点或需要辩论的核心问题\n" + "- 提供必要的上下文(来自前置阶段的产出)\n" + "- 邀请参与专家发表立场\n" + "- 保持简洁,3-5 句话\n" + ) + + try: + response = await gateway.chat( + messages=[{"role": "user", "content": prompt}], + model=self._get_model(lead), + ) + return response.content.strip() + except Exception as e: + logger.warning(f"Debate opening generation failed: {e}") + return f"辩论主题:{topic}。请各位专家发表看法。" + + async def _generate_debate_argument( + self, expert: Expert, topic: str, history: list[dict[str, Any]], round_num: int + ) -> str: + """Generate an expert's debate argument for the current round. + + Based on expert persona + debate history. Borrows the role-injection + pattern from BoardOrchestrator._generate_expert_speech. + """ + gateway = self._get_llm_gateway(expert) + if not gateway: + return f"[{expert.config.name} 因 LLM 不可用无法发言]" + + history_text = self._format_debate_history(history) + + prompt = ( + f"你是 {expert.config.name},正在参加一场结构化辩论。\n\n" + f"你的角色:{expert.config.persona}\n" + f"你的思维风格:{expert.config.thinking_style}\n" + f"你的表达风格:{expert.config.speaking_style}\n" + f"你的决策框架:{expert.config.decision_framework}\n\n" + f"辩论主题:{topic}\n" + f"当前轮次:第 {round_num} 轮\n\n" + ) + if history_text: + prompt += f"辩论历史:\n{history_text}\n\n" + prompt += ( + "请基于你的角色和决策框架,就辩论主题发表你的论点:\n" + "- 明确你的立场(支持/反对/折中)\n" + "- 给出你的论据和理由\n" + "- 可以引用或反驳之前发言者的观点\n" + "- 2-4 段话,简洁有力\n" + ) + + response = await gateway.chat( + messages=[{"role": "user", "content": prompt}], + model=self._get_model(expert), + ) + return response.content.strip() + + async def _generate_debate_summary( + self, lead: Expert, topic: str, history: list[dict[str, Any]], round_num: int + ) -> str: + """Generate Lead's summary of the current debate round.""" + gateway = self._get_llm_gateway(lead) + if not gateway: + return f"[第 {round_num} 轮辩论小结因 LLM 不可用无法生成]" + + # Get only current round's arguments + round_entries = [h for h in history if h.get("round") == round_num and h["role"] == "expert"] + if not round_entries: + return "" + + round_text = "\n\n".join( + f"[{h['expert']}]: {h['content']}" for h in round_entries + ) + + prompt = ( + f"你是团队 Lead {lead.config.name},正在主持辩论。\n\n" + f"辩论主题:{topic}\n" + f"当前轮次:第 {round_num} 轮\n\n" + f"本轮专家论点:\n{round_text}\n\n" + "请小结本轮辩论:\n" + "- 归纳各方核心论点(2-3 句话)\n" + "- 指出共识点和分歧点\n" + "- 提示下一轮可以深入的方向\n" + "- 保持简洁,3-5 句话\n" + ) + + try: + response = await gateway.chat( + messages=[{"role": "user", "content": prompt}], + model=self._get_model(lead), + ) + return response.content.strip() + except Exception as e: + logger.warning(f"Debate summary generation failed: {e}") + return f"[第 {round_num} 轮辩论完成,小结生成失败]" + + async def _generate_debate_verdict( + self, lead: Expert, topic: str, history: list[dict[str, Any]] + ) -> dict[str, Any]: + """Generate Lead's final verdict for the debate. + + Returns dict with: decision (adopt/compromise/shelve/inconclusive), + rationale, conclusion. + """ + gateway = self._get_llm_gateway(lead) + if not gateway: + return { + "decision": "inconclusive", + "rationale": "LLM 不可用", + "conclusion": f"辩论主题:{topic}。因 LLM 不可用,无法生成裁决。", + } + + history_text = self._format_debate_history(history) + + prompt = ( + f"你是团队 Lead {lead.config.name},需要为这场辩论做出最终裁决。\n\n" + f"辩论主题:{topic}\n\n" + f"完整辩论历史:\n{history_text}\n\n" + "请给出最终裁决。输出 JSON 格式:\n" + "```json\n" + "{\n" + ' "decision": "adopt|compromise|shelve|inconclusive",\n' + ' "rationale": "裁决理由,2-3 句话",\n' + ' "conclusion": "最终结论,作为下一阶段的输入"\n' + "}\n" + "```\n" + "decision 含义:\n" + "- adopt: 采纳某方观点\n" + "- compromise: 折中方案\n" + "- shelve: 搁置争议,后续再议\n" + "- inconclusive: 无法裁决\n" + "只输出 JSON,不要其他文字。" + ) + + try: + response = await gateway.chat( + messages=[{"role": "user", "content": prompt}], + model=self._get_model(lead), + ) + content = response.content.strip() + + # Extract JSON from response + json_match = re.search(r"\{.*\}", content, re.DOTALL) + if json_match: + result = json.loads(json_match.group(0)) + return { + "decision": result.get("decision", "inconclusive"), + "rationale": result.get("rationale", ""), + "conclusion": result.get("conclusion", content), + } + + # JSON parsing failed — return raw content as conclusion + return { + "decision": "inconclusive", + "rationale": "JSON 解析失败", + "conclusion": content, + } + except Exception as e: + logger.warning(f"Debate verdict generation failed: {e}") + return { + "decision": "inconclusive", + "rationale": f"裁决生成失败: {e}", + "conclusion": f"辩论主题:{topic}。裁决生成失败,建议参考辩论历史自行判断。", + } + + def _format_debate_history(self, history: list[dict[str, Any]]) -> str: + """Format debate history as readable text for LLM prompts.""" + if not history: + return "" + lines = [] + for h in history: + role_tag = "主持人" if h.get("role") == "moderator" else "专家" + round_tag = f"[第{h['round']}轮]" if h.get("round", 0) > 0 else "[开场]" + lines.append(f"{round_tag} {role_tag} {h['expert']}:\n{h['content']}") + return "\n\n".join(lines) + + def _build_dependency_context(self, phase: PlanPhase, plan: TeamPlan) -> str: + """Build context text from dependency phase outputs for debate prompts.""" + if not phase.depends_on: + return "" + parts = [] + for dep_id in phase.depends_on: + dep_phase = plan.get_phase(dep_id) + if dep_phase and dep_phase.status == PhaseStatus.COMPLETED and dep_phase.result: + content = dep_phase.result.get("content", str(dep_phase.result)) + parts.append(f"[{dep_phase.name}]:\n{content[:500]}") + return "\n---\n".join(parts) if parts else "" + + def _consume_team_interventions(self) -> list[str]: + """Consume user interventions from the team, if available. + + Checks ExpertTeam for an intervention queue (added in U4). + Falls back to empty list if the team doesn't support interventions yet. + """ + consume = getattr(self._team, "consume_user_interventions", None) + if consume is None: + return [] + try: + return consume() + except Exception: + return [] + + def _has_stop_command(self, interventions: list[str]) -> bool: + """Check if any user intervention contains a stop command.""" + for msg in interventions: + if msg.strip().lower() in self.STOP_COMMANDS: + return True + return False + async def _get_isolated_agent(self, expert: Expert, phase: PlanPhase) -> ConfigDrivenAgent: """Get an isolated ConfigDrivenAgent instance for the phase. diff --git a/tests/unit/experts/test_orchestrator_debate.py b/tests/unit/experts/test_orchestrator_debate.py new file mode 100644 index 0000000..980554f --- /dev/null +++ b/tests/unit/experts/test_orchestrator_debate.py @@ -0,0 +1,923 @@ +"""TeamOrchestrator 辩论阶段执行器单元测试 (U2) + +测试覆盖: +- Happy path: 2 轮辩论,2 个专家参与,Lead 裁决产出结论 +- 边界: max_rounds=1 时只辩论一轮就裁决 +- 边界: participants 为空时,Lead 直接给出结论(无辩论) +- 用户停止: 辩论中收到 /stop,提前结束并裁决 +- 逃生舱: debate_config.skip=true 时直接跳过 +- 错误路径: LLM 不可用时,Lead 用模板文本裁决,不抛异常 +- 集成: 辩论结论写入 SharedWorkspace +- 事件广播: debate_started / expert_argument / debate_round_summary / debate_resolved +- 干预通道: _consume_team_interventions getattr 回退(U4 兼容) +""" + +from __future__ import annotations + +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from agentkit.core.handoff_transport import InProcessHandoffTransport +from agentkit.experts.config import ExpertConfig +from agentkit.experts.expert import Expert +from agentkit.experts.orchestrator import TeamOrchestrator +from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, TeamPlan +from agentkit.experts.team import ExpertTeam + + +# ── 辅助函数 ────────────────────────────────────────────── + + +def _make_expert_config( + name: str = "test_expert", + is_lead: bool = False, + llm: dict | None = None, +) -> ExpertConfig: + """创建测试用 ExpertConfig(含辩论 prompt 所需的角色字段)""" + return ExpertConfig( + name=name, + agent_type="expert", + persona=f"{name}的角色描述", + thinking_style="逻辑推理", + speaking_style="简洁直接", + decision_framework="数据驱动决策", + bound_skills=["skill_a"], + is_lead=is_lead, + task_mode="llm_generate", + prompt={"identity": "测试"}, + llm=llm, + ) + + +def _make_mock_expert( + name: str = "test_expert", + is_lead: bool = False, + is_active: bool = True, + llm: dict | None = None, + gateway: MagicMock | None = None, +) -> MagicMock: + """创建 mock Expert + + Args: + gateway: 如果提供,设置到 expert.agent._llm_gateway 上 + """ + config = _make_expert_config(name=name, is_lead=is_lead, llm=llm) + expert = MagicMock(spec=Expert) + expert.config = config + expert.is_active = is_active + expert.team_id = None + expert.get_capabilities_summary.return_value = { + "name": name, + "persona": config.persona, + "thinking_style": config.thinking_style, + "bound_skills": config.bound_skills, + "is_lead": is_lead, + } + mock_agent = MagicMock() + mock_agent._llm_gateway = gateway + expert.agent = mock_agent + return expert + + +def _make_team_with_experts( + expert_names: list[str] | None = None, + lead_name: str = "lead", + gateway: MagicMock | None = None, +) -> ExpertTeam: + """创建包含 mock experts 的 ExpertTeam + + Args: + gateway: 如果提供,设置到所有 expert 的 agent._llm_gateway 上 + """ + team = ExpertTeam() + transport = AsyncMock(spec=InProcessHandoffTransport) + team._handoff_transport = transport + + if expert_names is None: + expert_names = [lead_name, "member1", "member2"] + + for name in expert_names: + is_lead = name == lead_name + expert = _make_mock_expert(name=name, is_lead=is_lead, gateway=gateway) + team._experts[name] = expert + if is_lead: + team._lead_expert_name = name + + return team + + +def _make_smart_llm_gateway( + opening: str = "开场:我们需要讨论这个分歧点。", + argument_template: str = "[{expert}] 我认为应该采用这个方案。", + summary: str = "本轮小结:双方各有道理。", + verdict: dict | None = None, +) -> AsyncMock: + """创建智能 mock LLM gateway,根据 prompt 内容返回不同响应 + + 通过 prompt 关键词区分:开场 / 论点 / 小结 / 裁决 + 避免依赖并行调用顺序。 + """ + if verdict is None: + verdict = { + "decision": "adopt", + "rationale": "甲方论据更充分", + "conclusion": "采纳甲方方案,按此执行。", + } + verdict_json = json.dumps(verdict, ensure_ascii=False) + + async def chat_side_effect(messages, model=None, **kwargs): + prompt = messages[0]["content"] if messages else "" + response = MagicMock() + # Order matters: check most specific first — verdict/summary prompts + # contain debate history which includes opening/argument text. + if "最终裁决" in prompt: + response.content = f"```json\n{verdict_json}\n```" + elif "小结本轮辩论" in prompt: + response.content = summary + elif "发表你的论点" in prompt: + # Extract expert name from prompt: "你是 {name},正在参加" + import re + + name_match = re.search(r"你是 (\w+),正在参加", prompt) + expert_name = name_match.group(1) if name_match else "expert" + response.content = argument_template.format(expert=expert_name) + elif "主持人开场" in prompt: + response.content = opening + else: + response.content = "默认响应" + return response + + gateway = AsyncMock() + gateway.chat = AsyncMock(side_effect=chat_side_effect) + return gateway + + +def _make_debate_phase( + phase_id: str = "debate_1", + name: str = "架构辩论", + topic: str = "前端框架选型:React vs Vue", + participants: list[str] | None = None, + max_rounds: int = 2, + skip: bool = False, + depends_on: list[str] | None = None, + assigned_expert: str = "lead", +) -> PlanPhase: + """创建测试用 DEBATE 阶段""" + if participants is None: + participants = ["member1", "member2"] + debate_config: dict = { + "topic": topic, + "participants": participants, + "max_rounds": max_rounds, + } + if skip: + debate_config["skip"] = True + return PlanPhase( + id=phase_id, + name=name, + assigned_expert=assigned_expert, + task_description=topic, + depends_on=depends_on or [], + phase_type=PhaseType.DEBATE, + debate_config=debate_config, + ) + + +def _make_plan_with_debate_phase(phase: PlanPhase) -> TeamPlan: + """创建包含单个 DEBATE 阶段的 TeamPlan""" + return TeamPlan( + id="test_plan", + task="测试辩论任务", + phases=[phase], + lead_expert="lead", + ) + + +# ── Happy Path 测试 ─────────────────────────────────────── + + +class TestDebatePhaseHappyPath: + """辩论阶段 happy path 测试""" + + @pytest.mark.asyncio + async def test_two_rounds_two_experts_completes(self): + """2 轮辩论,2 个专家参与,phase 状态变为 COMPLETED""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=2, participants=["member1", "member2"]) + plan = _make_plan_with_debate_phase(phase) + + result = await orchestrator._execute_debate_phase(phase, plan) + + assert phase.status == PhaseStatus.COMPLETED + assert result["content"] == "采纳甲方方案,按此执行。" + assert result["decision"] == "adopt" + assert "verdict" in result + assert result["verdict"]["decision"] == "adopt" + + @pytest.mark.asyncio + async def test_debate_produces_verdict_with_required_fields(self): + """辩论裁决包含 decision / rationale / conclusion 三个字段""" + gateway = _make_smart_llm_gateway( + verdict={ + "decision": "compromise", + "rationale": "双方各有优势", + "conclusion": "采用折中方案。", + } + ) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=1, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + result = await orchestrator._execute_debate_phase(phase, plan) + + assert result["verdict"]["decision"] == "compromise" + assert result["verdict"]["rationale"] == "双方各有优势" + assert result["verdict"]["conclusion"] == "采用折中方案。" + + @pytest.mark.asyncio + async def test_debate_emits_debate_started_event(self): + """辩论开始时广播 debate_started 事件""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=1, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + event_types = [c[0][1]["type"] for c in calls] + assert "debate_started" in event_types + + @pytest.mark.asyncio + async def test_debate_emits_expert_argument_events(self): + """每个专家发言时广播 expert_argument 事件""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=1, participants=["member1", "member2"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + argument_events = [c[0][1] for c in calls if c[0][1].get("type") == "expert_argument"] + # 2 experts × 1 round = 2 argument events + assert len(argument_events) == 2 + expert_ids = {e["expert_id"] for e in argument_events} + assert expert_ids == {"member1", "member2"} + + @pytest.mark.asyncio + async def test_debate_emits_round_summary_events(self): + """每轮辩论结束时广播 debate_round_summary 事件""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=2, participants=["member1", "member2"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + summary_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary" + ] + assert len(summary_events) == 2 # 2 rounds + # Round 1 summary should have continue=True, round 2 continue=False + assert summary_events[0]["round"] == 1 + assert summary_events[0]["continue"] is True + assert summary_events[1]["round"] == 2 + assert summary_events[1]["continue"] is False + + @pytest.mark.asyncio + async def test_debate_emits_debate_resolved_event(self): + """辩论裁决时广播 debate_resolved 事件""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=1, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + resolved_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "debate_resolved" + ] + assert len(resolved_events) == 1 + assert resolved_events[0]["decision"] == "adopt" + assert "conclusion" in resolved_events[0] + + @pytest.mark.asyncio + async def test_debate_emits_phase_completed_event(self): + """辩论阶段完成时广播 phase_completed 事件(与 EXECUTION 阶段一致)""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=1, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + completed_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "phase_completed" + ] + assert len(completed_events) == 1 + assert completed_events[0]["phase_id"] == phase.id + + +# ── 边界测试 ────────────────────────────────────────────── + + +class TestDebatePhaseMaxRounds: + """max_rounds 边界测试""" + + @pytest.mark.asyncio + async def test_max_rounds_one_single_round(self): + """max_rounds=1 时只辩论一轮就裁决""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=1, participants=["member1", "member2"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + # Count expert_argument events: 2 experts × 1 round = 2 + calls = team._handoff_transport.send.call_args_list + argument_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "expert_argument" + ] + assert len(argument_events) == 2 + # Count summary events: 1 round = 1 summary + summary_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary" + ] + assert len(summary_events) == 1 + + @pytest.mark.asyncio + async def test_max_rounds_capped_at_max_debate_rounds(self): + """max_rounds 超过 MAX_DEBATE_ROUNDS 时被截断""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + # Request 10 rounds, should be capped to MAX_DEBATE_ROUNDS (4) + phase = _make_debate_phase(max_rounds=10, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + summary_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary" + ] + assert len(summary_events) == TeamOrchestrator.MAX_DEBATE_ROUNDS + + +class TestDebatePhaseEmptyParticipants: + """participants 为空时的边界测试""" + + @pytest.mark.asyncio + async def test_empty_participants_lead_directly_adjudicates(self): + """participants 为空时,Lead 直接给出结论(无辩论轮次)""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(participants=[], max_rounds=3) + plan = _make_plan_with_debate_phase(phase) + + result = await orchestrator._execute_debate_phase(phase, plan) + + assert phase.status == PhaseStatus.COMPLETED + # Should still have a conclusion from Lead verdict + assert "content" in result + # No expert_argument events should be emitted + calls = team._handoff_transport.send.call_args_list + argument_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "expert_argument" + ] + assert len(argument_events) == 0 + # No round summary events + summary_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "debate_round_summary" + ] + assert len(summary_events) == 0 + + @pytest.mark.asyncio + async def test_empty_participants_still_emits_debate_started(self): + """participants 为空时仍广播 debate_started(含空 participants 列表)""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(participants=[], max_rounds=2) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + started_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "debate_started" + ] + assert len(started_events) == 1 + assert started_events[0]["participants"] == [] + + +# ── 用户停止测试 ────────────────────────────────────────── + + +class TestDebatePhaseUserStop: + """用户 /stop 干预测试""" + + @pytest.mark.asyncio + async def test_stop_command_ends_debate_early(self): + """辩论中收到 /stop,提前结束并裁决""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + # Mock intervention queue: return /stop on first check (round 1) + team.consume_user_interventions = MagicMock(return_value=["/stop"]) + + phase = _make_debate_phase(max_rounds=3, participants=["member1", "member2"]) + plan = _make_plan_with_debate_phase(phase) + + result = await orchestrator._execute_debate_phase(phase, plan) + + assert phase.status == PhaseStatus.COMPLETED + # Should still produce a verdict + assert "content" in result + # No expert_argument events — stopped before round 1 arguments + calls = team._handoff_transport.send.call_args_list + argument_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "expert_argument" + ] + assert len(argument_events) == 0 + + @pytest.mark.asyncio + async def test_chinese_stop_command_ends_debate(self): + """中文 '停止' 命令也能结束辩论""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + team.consume_user_interventions = MagicMock(return_value=["停止"]) + + phase = _make_debate_phase(max_rounds=3, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + assert phase.status == PhaseStatus.COMPLETED + calls = team._handoff_transport.send.call_args_list + argument_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "expert_argument" + ] + assert len(argument_events) == 0 + + @pytest.mark.asyncio + async def test_non_stop_intervention_does_not_end_debate(self): + """非停止命令的干预不会结束辩论""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + # Non-stop intervention should not end the debate + team.consume_user_interventions = MagicMock(return_value=["继续讨论"]) + + phase = _make_debate_phase(max_rounds=1, participants=["member1", "member2"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + # Debate should proceed normally — arguments emitted + calls = team._handoff_transport.send.call_args_list + argument_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "expert_argument" + ] + assert len(argument_events) == 2 # 2 experts × 1 round + + +# ── 逃生舱测试 ──────────────────────────────────────────── + + +class TestDebatePhaseSkipEscapeHatch: + """skip=True 逃生舱测试""" + + @pytest.mark.asyncio + async def test_skip_true_short_circuits_debate(self): + """debate_config.skip=true 时直接跳过,phase 状态 COMPLETED""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(skip=True, participants=["member1", "member2"]) + plan = _make_plan_with_debate_phase(phase) + + result = await orchestrator._execute_debate_phase(phase, plan) + + assert phase.status == PhaseStatus.COMPLETED + assert result["content"] == "无需辩论" + assert result["skipped"] is True + + @pytest.mark.asyncio + async def test_skip_true_does_not_call_llm(self): + """skip=true 时不调用 LLM""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(skip=True) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + # LLM should not be called at all + gateway.chat.assert_not_awaited() + + @pytest.mark.asyncio + async def test_skip_true_emits_debate_resolved_with_skipped_decision(self): + """skip=true 时广播 debate_resolved 事件,decision='skipped'""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(skip=True) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + resolved_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "debate_resolved" + ] + assert len(resolved_events) == 1 + assert resolved_events[0]["decision"] == "skipped" + assert resolved_events[0]["conclusion"] == "无需辩论" + + @pytest.mark.asyncio + async def test_skip_true_no_debate_started_event(self): + """skip=true 时不广播 debate_started 事件""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(skip=True) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + event_types = [c[0][1]["type"] for c in calls] + assert "debate_started" not in event_types + assert "expert_argument" not in event_types + + +# ── LLM 不可用错误路径测试 ──────────────────────────────── + + +class TestDebatePhaseLLMUnavailable: + """LLM 不可用时的错误路径测试""" + + @pytest.mark.asyncio + async def test_no_llm_gateway_uses_template_verdict(self): + """LLM 不可用时,Lead 用模板文本裁决,不抛异常""" + # No gateway provided — all experts have _llm_gateway=None + team = _make_team_with_experts(gateway=None) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=2, participants=["member1", "member2"]) + plan = _make_plan_with_debate_phase(phase) + + result = await orchestrator._execute_debate_phase(phase, plan) + + assert phase.status == PhaseStatus.COMPLETED + # Should have a template conclusion (not raise) + assert "content" in result + assert result["decision"] == "inconclusive" + + @pytest.mark.asyncio + async def test_no_llm_gateway_opening_uses_template(self): + """LLM 不可用时,开场使用模板文本""" + team = _make_team_with_experts(gateway=None) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=1, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + calls = team._handoff_transport.send.call_args_list + started_events = [ + c[0][1] for c in calls if c[0][1].get("type") == "debate_started" + ] + assert len(started_events) == 1 + # Opening should contain the topic (template text) + assert "前端框架选型" in started_events[0]["opening"] + + @pytest.mark.asyncio + async def test_llm_gateway_exception_does_not_crash(self): + """LLM gateway 抛异常时不崩溃,用模板裁决""" + gateway = AsyncMock() + gateway.chat = AsyncMock(side_effect=RuntimeError("LLM service down")) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=1, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + result = await orchestrator._execute_debate_phase(phase, plan) + + assert phase.status == PhaseStatus.COMPLETED + assert result["decision"] == "inconclusive" + + @pytest.mark.asyncio + async def test_verdict_json_parse_failure_returns_inconclusive(self): + """裁决 JSON 解析失败时返回 inconclusive""" + gateway = AsyncMock() + # Return non-JSON for all calls + response = MagicMock() + response.content = "这不是JSON格式" + gateway.chat = AsyncMock(return_value=response) + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=1, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + result = await orchestrator._execute_debate_phase(phase, plan) + + assert phase.status == PhaseStatus.COMPLETED + assert result["decision"] == "inconclusive" + # Conclusion should fall back to raw content + assert "content" in result + + +# ── SharedWorkspace 集成测试 ────────────────────────────── + + +class TestDebatePhaseSharedWorkspace: + """辩论结论写入 SharedWorkspace 测试""" + + @pytest.mark.asyncio + async def test_conclusion_written_to_workspace(self): + """辩论结论写入 SharedWorkspace""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=1, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + # Verify workspace has the debate output + workspace = team.workspace + output_key = f"{plan.id}/phase/{phase.id}/output" + data = await workspace.read(output_key) + assert data is not None + assert data["value"] == "采纳甲方方案,按此执行。" + assert data["agent_id"] == "lead" + + @pytest.mark.asyncio + async def test_phase_result_stored_on_phase_object(self): + """辩论结果存储在 phase.result 上""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(max_rounds=1, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_debate_phase(phase, plan) + + assert phase.result is not None + assert phase.result["content"] == "采纳甲方方案,按此执行。" + assert phase.result["decision"] == "adopt" + assert "verdict" in phase.result + + +# ── 干预通道兼容性测试 ──────────────────────────────────── + + +class TestInterventionChannelCompatibility: + """干预通道 getattr 回退测试(U4 兼容)""" + + @pytest.mark.asyncio + async def test_no_intervention_method_returns_empty(self): + """team 没有 consume_user_interventions 方法时返回空列表""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + # ExpertTeam doesn't have consume_user_interventions yet (U4 not implemented) + assert not hasattr(team, "consume_user_interventions") + + phase = _make_debate_phase(max_rounds=1, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + # Should not raise — falls back to empty list + await orchestrator._execute_debate_phase(phase, plan) + assert phase.status == PhaseStatus.COMPLETED + + @pytest.mark.asyncio + async def test_intervention_method_exception_returns_empty(self): + """consume_user_interventions 抛异常时返回空列表""" + gateway = _make_smart_llm_gateway() + team = _make_team_with_experts(gateway=gateway) + orchestrator = TeamOrchestrator(team) + + # Set a broken intervention method + team.consume_user_interventions = MagicMock(side_effect=RuntimeError("broken")) + + phase = _make_debate_phase(max_rounds=1, participants=["member1"]) + plan = _make_plan_with_debate_phase(phase) + + # Should not raise — exception caught, returns empty list + await orchestrator._execute_debate_phase(phase, plan) + assert phase.status == PhaseStatus.COMPLETED + + +# ── Phase 分发测试 ──────────────────────────────────────── + + +class TestPhaseDispatch: + """_execute_phase 分发器测试""" + + @pytest.mark.asyncio + async def test_execution_phase_dispatches_to_execution_method(self): + """EXECUTION 类型阶段分发到 _execute_execution_phase""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + # Mock both execution methods to track dispatch + orchestrator._execute_execution_phase = AsyncMock( + return_value={"content": "execution result"} + ) + orchestrator._execute_debate_phase = AsyncMock( + return_value={"content": "debate result"} + ) + + phase = PlanPhase(name="执行阶段", assigned_expert="lead", task_description="任务") + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_phase(phase, plan) + + orchestrator._execute_execution_phase.assert_awaited_once_with(phase, plan) + orchestrator._execute_debate_phase.assert_not_awaited() + + @pytest.mark.asyncio + async def test_debate_phase_dispatches_to_debate_method(self): + """DEBATE 类型阶段分发到 _execute_debate_phase""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + orchestrator._execute_execution_phase = AsyncMock( + return_value={"content": "execution result"} + ) + orchestrator._execute_debate_phase = AsyncMock( + return_value={"content": "debate result"} + ) + + phase = _make_debate_phase() + plan = _make_plan_with_debate_phase(phase) + + await orchestrator._execute_phase(phase, plan) + + orchestrator._execute_debate_phase.assert_awaited_once_with(phase, plan) + orchestrator._execute_execution_phase.assert_not_awaited() + + +# ── 辅助方法单元测试 ────────────────────────────────────── + + +class TestHelperMethods: + """辅助方法单元测试""" + + def test_has_stop_command_detects_stop_commands(self): + """_has_stop_command 检测停止命令""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + assert orchestrator._has_stop_command(["/stop"]) is True + assert orchestrator._has_stop_command(["停止"]) is True + assert orchestrator._has_stop_command(["stop"]) is True + assert orchestrator._has_stop_command(["结束"]) is True + + def test_has_stop_command_ignores_non_stop(self): + """_has_stop_command 忽略非停止命令""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + assert orchestrator._has_stop_command(["继续"]) is False + assert orchestrator._has_stop_command(["/continue"]) is False + assert orchestrator._has_stop_command([]) is False + + def test_has_stop_command_case_insensitive(self): + """_has_stop_command 大小写不敏感""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + assert orchestrator._has_stop_command(["STOP"]) is True + assert orchestrator._has_stop_command([" /stop "]) is True + + def test_format_debate_history_empty(self): + """_format_debate_history 空历史返回空字符串""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + assert orchestrator._format_debate_history([]) == "" + + def test_format_debate_history_with_entries(self): + """_format_debate_history 格式化历史条目""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + history = [ + {"expert": "lead", "content": "开场白", "round": 0, "role": "moderator"}, + {"expert": "member1", "content": "我的论点", "round": 1, "role": "expert"}, + ] + result = orchestrator._format_debate_history(history) + assert "开场白" in result + assert "我的论点" in result + assert "主持人" in result + assert "专家" in result + assert "[开场]" in result + assert "[第1轮]" in result + + def test_build_dependency_context_no_deps(self): + """_build_dependency_context 无依赖时返回空字符串""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + phase = _make_debate_phase(depends_on=[]) + plan = _make_plan_with_debate_phase(phase) + + assert orchestrator._build_dependency_context(phase, plan) == "" + + def test_build_dependency_context_with_completed_dep(self): + """_build_dependency_context 包含已完成依赖的输出""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + # Create a dependency phase that's completed + dep_phase = PlanPhase( + id="dep_1", + name="前置阶段", + assigned_expert="lead", + task_description="前置任务", + depends_on=[], + ) + dep_phase.status = PhaseStatus.COMPLETED + dep_phase.result = {"content": "前置阶段输出内容"} + + debate_phase = _make_debate_phase(depends_on=["dep_1"]) + plan = TeamPlan( + id="test_plan", + task="测试", + phases=[dep_phase, debate_phase], + lead_expert="lead", + ) + + context = orchestrator._build_dependency_context(debate_phase, plan) + assert "前置阶段" in context + assert "前置阶段输出内容" in context + + def test_build_dependency_context_ignores_incomplete_dep(self): + """_build_dependency_context 忽略未完成的依赖""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team) + + # Dependency phase is still PENDING + dep_phase = PlanPhase( + id="dep_1", + name="前置阶段", + assigned_expert="lead", + task_description="前置任务", + ) + debate_phase = _make_debate_phase(depends_on=["dep_1"]) + plan = TeamPlan( + id="test_plan", + task="测试", + phases=[dep_phase, debate_phase], + lead_expert="lead", + ) + + context = orchestrator._build_dependency_context(debate_phase, plan) + assert context == ""