From 7384ecb03eee0217b3db5773a77795aba7281cd1 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Sun, 14 Jun 2026 22:20:14 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20Expert=20Team=20Mode=20=E2=80=94=20plan?= =?UTF-8?q?-execute=20collaboration=20with=20conversation=20UI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements B+C hybrid Expert Team Mode with ExpertConfig, CollaborationPlan, TeamOrchestrator, ExpertTeamRouter, HandoffTransport, SharedWorkspace, and Expert wrapper. Frontend includes ExpertTeamView, ExpertMessage, PlanVisualization, team store, and WS event handlers. Code review fixes: sentinel-based close, per-phase retry, name validation, Vue component integration, teamState dedup, Redis reset, plan reassign, event_type validation, hmac timing-safe compare, message dedup, reactive updatePhases, O(1) phase lookup, iterative DFS, bounded Queue. 232 unit tests passing. --- ...026-06-14-expert-team-mode-requirements.md | 207 +++++ ...26-06-14-001-feat-expert-team-mode-plan.md | 573 +++++++++++++ src/agentkit/chat/skill_routing.py | 1 + src/agentkit/core/handoff_transport.py | 248 ++++++ src/agentkit/experts/__init__.py | 34 + src/agentkit/experts/config.py | 138 ++++ src/agentkit/experts/expert.py | 167 ++++ src/agentkit/experts/orchestrator.py | 458 +++++++++++ src/agentkit/experts/plan.py | 281 +++++++ src/agentkit/experts/registry.py | 140 ++++ src/agentkit/experts/router.py | 147 ++++ src/agentkit/experts/team.py | 298 +++++++ src/agentkit/server/frontend/src/api/types.ts | 49 ++ .../src/components/chat/ChatMessage.vue | 17 + .../src/components/chat/ExpertMessage.vue | 94 +++ .../src/components/chat/ExpertTeamView.vue | 177 ++++ .../src/components/chat/PlanVisualization.vue | 89 ++ .../server/frontend/src/stores/chat.ts | 106 +++ .../server/frontend/src/stores/team.ts | 55 ++ .../server/frontend/src/views/ChatView.vue | 4 + src/agentkit/server/routes/chat.py | 43 +- tests/integration/test_expert_team.py | 475 
+++++++++++ tests/unit/core/test_handoff_transport.py | 249 ++++++ tests/unit/experts/__init__.py | 0 tests/unit/experts/test_config.py | 221 +++++ tests/unit/experts/test_expert.py | 437 ++++++++++ tests/unit/experts/test_plan.py | 328 ++++++++ tests/unit/experts/test_registry.py | 233 ++++++ tests/unit/experts/test_router.py | 299 +++++++ tests/unit/experts/test_team.py | 768 ++++++++++++++++++ tests/unit/experts/test_team_orchestrator.py | 668 +++++++++++++++ 31 files changed, 7003 insertions(+), 1 deletion(-) create mode 100644 docs/brainstorms/2026-06-14-expert-team-mode-requirements.md create mode 100644 docs/plans/2026-06-14-001-feat-expert-team-mode-plan.md create mode 100644 src/agentkit/core/handoff_transport.py create mode 100644 src/agentkit/experts/__init__.py create mode 100644 src/agentkit/experts/config.py create mode 100644 src/agentkit/experts/expert.py create mode 100644 src/agentkit/experts/orchestrator.py create mode 100644 src/agentkit/experts/plan.py create mode 100644 src/agentkit/experts/registry.py create mode 100644 src/agentkit/experts/router.py create mode 100644 src/agentkit/experts/team.py create mode 100644 src/agentkit/server/frontend/src/components/chat/ExpertMessage.vue create mode 100644 src/agentkit/server/frontend/src/components/chat/ExpertTeamView.vue create mode 100644 src/agentkit/server/frontend/src/components/chat/PlanVisualization.vue create mode 100644 src/agentkit/server/frontend/src/stores/team.ts create mode 100644 tests/integration/test_expert_team.py create mode 100644 tests/unit/core/test_handoff_transport.py create mode 100644 tests/unit/experts/__init__.py create mode 100644 tests/unit/experts/test_config.py create mode 100644 tests/unit/experts/test_expert.py create mode 100644 tests/unit/experts/test_plan.py create mode 100644 tests/unit/experts/test_registry.py create mode 100644 tests/unit/experts/test_router.py create mode 100644 tests/unit/experts/test_team.py create mode 100644 
tests/unit/experts/test_team_orchestrator.py diff --git a/docs/brainstorms/2026-06-14-expert-team-mode-requirements.md b/docs/brainstorms/2026-06-14-expert-team-mode-requirements.md new file mode 100644 index 0000000..10353d5 --- /dev/null +++ b/docs/brainstorms/2026-06-14-expert-team-mode-requirements.md @@ -0,0 +1,207 @@ +--- +date: 2026-06-14 +topic: expert-team-mode +--- + +## Summary + +在对话框中引入专家团模式(Expert Team Mode),支持用户手动指定或系统自动组建多专家协作团队。底层采用 Plan-Execute 协作模式(结构化协作计划 + 去中心化执行),前端以多角色对话流呈现协作过程。Expert 是比 Skill 更高的角色抽象,聚合多个 Skill 并包含人格、思维方式和协作策略。 + +## Problem Frame + +当前 AgentKit 的多 Agent 协作依赖 Orchestrator-Worker 的集中式编排或 Pipeline 的 DAG 流程驱动。这两种模式适合结构化、可预测的工作流,但无法支撑需要多专家自主协作、实时讨论、动态调整的复杂任务场景。用户在对话框中面对复杂任务时,只能与单个 Agent 交互,无法获得多视角、多专长的协作产出。类似 Qoder 的专家团模式填补了这一空白——让多个专家以目标为导向,自主分析、分工、执行、验证、汇报。 + +## Key Decisions + +- **Plan-Execute 而非纯 Swarm**:结构化协作计划保证任务收敛和可预测性,Expert 在职责范围内自主协作(去中心化),但整体框架由计划约束。纯 Swarm 模式自由度最高但收敛风险大。 + +- **Expert 聚合 Skill**:Expert 是比 Skill 更高的角色抽象。一个 Expert 聚合多个 Skill,并包含人格设定、思维方式、协作策略。Skill 是能力单元,Expert 是角色单元。 + +- **混合 Agent 生成**:核心角色从预定义专家模板库选择(保证质量),辅助角色由 LLM 根据任务现场动态生成(保持灵活性)。 + +- **前端对话流展示**:协作过程以多角色对话流呈现给用户,每个 Expert 有独立头像和颜色标识。底层是结构化计划驱动,前端是对话式体验。 + +- **两种并行模式统一支持**:子任务级并行(各做各的,结果汇总)和竞标式并行(同一任务多人做,取最优/融合)都在协作计划中标注,执行引擎按类型处理。 + +## Actors + +- A1. **用户** — 发起任务、指定专家团成员、干预协作过程(调整分工、增减 Expert、修改方向) +- A2. **Lead Expert** — 首个加入团队的 Expert 或用户指定的负责人,负责初始任务分解、协作计划生成、最终结果汇总 +- A3. **Expert** — 具有特定专长的角色,在职责范围内自主工作,可与其他 Expert 直接交互和交接 +- A4. **ExpertTeam** — 专家团容器,管理 Expert 的加入/退出、共享上下文、协作计划的生命周期 +- A5. **ExpertTemplate Registry** — 专家模板注册中心,存储预定义的 Expert 模板供选择和组装 + +## Requirements + +### Expert 定义与管理 + +- R1. Expert 配置包含:名称、人格描述、思维方式、绑定的 Skill 列表、协作策略偏好、头像/颜色标识 +- R2. Expert 可聚合一个或多个已注册 Skill,执行时按 SkillConfig 驱动 +- R3. ExpertTemplate 是可复用的 Expert 预设,存储在 YAML 配置或注册中心中,包含完整的 Expert 配置 +- R4. ExpertTemplate Registry 支持模板的注册、查询、列表展示 +- R5. 临时 Expert 由 LLM 根据任务分析动态生成 ExpertConfig,包含角色名称、职责描述、建议绑定的 Skill + +### 专家团组建 + +- R6. 
用户可通过对话框手动指定专家团成员(选择 ExpertTemplate 或指定角色描述) +- R7. Lead Expert 在接收任务后评估复杂度,建议升级为专家团模式(需用户确认,不自动强制升级) +- R8. 自动组建时,Lead Expert 分析任务后生成协作计划,计划中包含需要的 Expert 角色定义 +- R9. 自动组建采用混合模式:核心角色从模板库匹配,辅助角色动态生成 +- R10. 专家团组建后,所有 Expert 的角色信息对用户可见 + +### 协作计划 + +- R11. CollaborationPlan 定义专家团的结构化协作蓝图,包含:任务分解、角色分工、依赖关系、并行节点、合并策略 +- R12. 协作计划中的每个节点标注执行类型:串行、子任务级并行、竞标式并行 +- R13. 竞标式并行节点需指定评判策略:取最优、投票、融合 +- R14. 协作计划可由 Lead Expert 或用户动态修改;普通 Expert 可提议修改,需 Lead Expert 审批后生效 +- R15. 计划修改后,受影响的 Expert 立即感知变更并调整行为 + +### 去中心化协作 + +- R16. Expert 之间可直接交互、请求协助、交接任务,无需经过 Lead Expert 中转 +- R17. Expert 间的交互通过共享对话流进行,所有消息对团队内所有 Expert 可见;上下文管理采用摘要共享策略(长内容自动摘要后广播,原始内容存入 SharedWorkspace 按需获取) +- R18. Lead Expert 负责初始分解和最终汇总,但不控制中间流程 +- R19. Expert 通过共享上下文中的角色描述了解其他 Expert 的专长,可据此发起协作请求 + +### 并行执行与结果合并 + +- R20. 子任务级并行:多个 Expert 同时执行不同子任务,结果由 Lead Expert 或指定 Expert 汇总 +- R21. 竞标式并行:多个同类型 Expert 独立完成同一子任务,结果按评判策略处理 +- R22. 评判策略"取最优":由 Lead Expert 或指定评审 Expert 选择最佳结果 +- R23. 评判策略"投票":所有 Expert 投票选择最佳结果,平局时由 Lead Expert 仲裁 +- R24. 评判策略"融合":由 Lead Expert 或指定 Expert 将多个结果融合为统一产出 + +### 用户干预 + +- R25. 用户可随时在对话流中插入指令,指令对全体 Expert 可见 +- R26. 用户可调整专家团分工(修改协作计划) +- R27. 用户可增减 Expert(添加新 Expert 或移除已有 Expert) +- R28. 用户可修改任务方向或补充需求,Lead Expert 据此重新规划 + +### 前端展示 + +- R29. 协作过程以多角色对话流形式呈现,每个 Expert 的消息带有独立头像和颜色标识 +- R30. 协作计划以可视化方式展示(分工图/时间线),用户可展开查看详情 +- R31. 并行执行时,多个 Expert 的消息流通过消息标签(expert_id)复用同一 WebSocket 通道同时更新,用户可按 Expert 切换关注焦点 +- R32. Expert 间的交接、请求协助等交互以特殊消息类型展示(区别于普通对话) + +### 触发与路由 + +- R33. Lead Expert 在接收任务后评估复杂度,建议升级为专家团模式(与 R7 一致) +- R34. 用户可通过 `@team` 或类似指令主动触发专家团模式 +- R35. 用户指定专家团成员时,跳过自动分析,直接进入协作计划生成 +- R36. 专家团任务完成后,团队自动解散,临时 Expert 被回收;临时 Expert 的产出保留在 SharedWorkspace 中不随实例销毁丢失 + +## Key Decisions (Updated) + +- **Expert 与 Agent 的关系**:Expert 是 Agent 的配置层(ExpertConfig),运行时通过 AgentPool 创建对应的 ConfigDrivenAgent 实例。Expert 本身不是 Agent 实例,而是角色定义 + Skill 聚合 + 协作策略的配置单元。 + +## Key Flows + +- F1. 
手动组建专家团 + - **Trigger:** 用户在对话框中指定专家团成员 + - **Actors:** A1, A4, A2 + - **Steps:** 用户指定 ExpertTemplate 或角色描述 → ExpertTeam 创建 → Lead Expert 生成 CollaborationPlan → 计划展示给用户确认 → Expert 按计划开始协作 + - **Covered by:** R6, R10, R11, R35 + +- F2. 自动组建专家团 + - **Trigger:** 系统检测到高复杂度任务或用户请求专家团 + - **Actors:** A1, A4, A2, A5 + - **Steps:** Lead Expert 分析任务 → 识别需要的 Expert 角色 → 核心角色从模板库匹配、辅助角色动态生成 → 生成 CollaborationPlan → 计划展示给用户确认 → Expert 按计划开始协作 + - **Covered by:** R7, R8, R9, R11 + +- F3. 去中心化协作执行 + - **Trigger:** CollaborationPlan 确认后开始执行 + - **Actors:** A2, A3, A4 + - **Steps:** Expert 按计划执行各自任务 → Expert 间可直接交互和交接 → 并行节点同时执行 → 结果按合并策略处理 → Lead Expert 汇总最终产出 + - **Covered by:** R16, R17, R18, R20, R21 + +- F4. 用户干预协作 + - **Trigger:** 用户在对话流中插入指令 + - **Actors:** A1, A4, A2 + - **Steps:** 用户发出指令 → 指令对全体 Expert 可见 → Lead Expert 评估影响 → 修改 CollaborationPlan(如需要) → 受影响 Expert 调整行为 + - **Covered by:** R25, R26, R27, R28 + +- F5. 竞标式并行执行 + - **Trigger:** CollaborationPlan 中某节点标注为竞标式并行 + - **Actors:** A3, A2 + - **Steps:** 多个同类型 Expert 独立完成同一子任务 → 各自提交结果 → 按评判策略处理(取最优/投票/融合) → 产出最终结果 + - **Covered by:** R21, R22, R23, R24 + +- F6. 专家团解散 + - **Trigger:** 任务完成或用户主动解散 + - **Actors:** A4, A1 + - **Steps:** Lead Expert 汇总最终结果 → 结果呈现给用户 → 临时 Expert 被回收 → ExpertTeam 销毁 → 模板 Expert 保留在注册中心 + - **Covered by:** R36 + +## Acceptance Examples + +- AE1. **手动组建 + 子任务并行** + - **Covers R6, R20.** Given 用户输入"帮我分析这份市场报告,叫上数据分析师和战略顾问", When 系统创建 ExpertTeam 并生成计划, Then 数据分析师和战略顾问各自独立分析,Lead Expert 汇总两份分析为统一报告 + +- AE2. **自动组建 + 竞标并行** + - **Covers R7, R21, R22.** Given 用户输入一个复杂的技术方案评审任务, When 系统自动升级为专家团模式, Then Lead Expert 生成 3 个架构师 Expert 竞标式并行出方案,Lead Expert 选择最优方案 + +- AE3. **用户干预调整方向** + - **Covers R25, R26, R28.** Given 专家团正在执行任务, When 用户在对话流中说"重点看成本优化,别管性能了", Then Lead Expert 修改计划,受影响 Expert 调整分析方向 + +- AE4. **Expert 间直接协作** + - **Covers R16, R17.** Given 数据分析师 Expert 需要行业数据, When 数据分析师直接请求行业研究员 Expert 协助, Then 行业研究员提供数据,无需 Lead Expert 中转 + +- AE5. 
**动态增减 Expert** + - **Covers R27.** Given 专家团正在执行, When 用户说"再加一个法律顾问", Then 系统从模板库匹配或动态生成法律顾问 Expert,加入团队并更新计划 + +## Success Criteria + +- 专家团能在 5 分钟内完成一个需要 3+ 专家协作的中等复杂度任务 +- 用户能清晰追踪每个 Expert 的工作状态和产出 +- 用户干预后,Expert 在 1 轮交互内感知并响应变更 +- 临时 Expert 在任务完成后被完全回收,不残留资源 + +## Scope Boundaries + +**Deferred for later:** +- Expert 间的学习与进化机制(经验共享、能力提升) +- 跨会话的 Expert 持久化和记忆共享 +- Expert 市场和共享模板库 +- 专家团的 A/B 测试和效果评估 + +**Outside this product's identity:** +- 通用的工作流引擎(已有 PipelineEngine) +- 人工审核节点(已有 Workflow approval) +- 实时音视频协作 + +## Dependencies / Assumptions + +- 依赖现有 `AgentPool` 管理 Agent 实例的创建和销毁 +- 依赖现有 `MessageBus` 支持 Expert 间的消息通信 +- 依赖现有 `HandoffManager` 支持 Expert 间的任务交接(需扩展为进程内模式) +- 依赖现有 `Orchestrator` 的任务分解能力(GoalPlanner + LLM 分解) +- 依赖现有 `PipelineEngine` 的并行执行和拓扑排序 +- 假设 LLM 能生成质量合格的 CollaborationPlan(含角色定义、依赖关系、并行标注) +- 假设前端 WebSocket 能支持多路并发消息流 + +## Outstanding Questions + +**Resolve Before Planning:** +- 协作计划的粒度:定义大阶段和角色分工(粗粒度,灵活调整)还是精确到每步操作(细粒度,强约束) +- 协作计划失败时的降级策略:回退到单 Agent 还是重试 +- HandoffManager 进程内模式的实现策略:扩展现有 HandoffManager 还是新建 InProcessHandoff + +**Deferred to Planning:** +- ExpertTemplate 的 YAML Schema 设计 +- 协作计划的可视化组件选型 +- 并行消息流的前端渲染策略 +- 竞标式并行执行引擎的扩展设计(当前 PipelineEngine 仅支持拓扑排序并行) + +## Sources / Research + +- `src/agentkit/core/orchestrator.py` — Orchestrator-Worker 编排模式,三级任务分解降级 +- `src/agentkit/orchestrator/pipeline_engine.py` — DAG 拓扑并行执行、Saga 补偿、对抗闭环 +- `src/agentkit/orchestrator/handoff.py` — Redis Pub/Sub 点对点任务转交 +- `src/agentkit/orchestrator/dynamic_pipeline.py` — 条件/嵌套/循环动态 Pipeline +- `src/agentkit/core/agent_pool.py` — 运行时 Agent 实例管理 +- `src/agentkit/core/shared_workspace.py` — Redis 共享状态 + 分布式锁 +- `src/agentkit/chat/skill_routing.py` — CostAwareRouter 三层路由 +- `src/agentkit/skills/base.py` — SkillConfig 定义,ExpertConfig 可参考扩展 diff --git a/docs/plans/2026-06-14-001-feat-expert-team-mode-plan.md b/docs/plans/2026-06-14-001-feat-expert-team-mode-plan.md new file mode 100644 index 0000000..f3f491a --- /dev/null +++ 
b/docs/plans/2026-06-14-001-feat-expert-team-mode-plan.md @@ -0,0 +1,573 @@ +--- +date: 2026-06-14 +status: active +origin: docs/brainstorms/2026-06-14-expert-team-mode-requirements.md +--- + +## Summary + +实现专家团模式(Expert Team Mode),在对话框中引入多专家协作能力。底层采用 Plan-Execute 协作模式(结构化协作计划 + 去中心化执行),前端以多角色对话流呈现协作过程。Expert 是比 Skill 更高的角色抽象,聚合多个 Skill 并包含人格和协作策略。支持用户手动指定专家团和系统自动组建两种触发方式,支持子任务级并行和竞标式并行两种并行模式,支持全程用户干预。 + +## Problem Frame + +当前 AgentKit 的多 Agent 协作依赖 Orchestrator-Worker 集中式编排或 Pipeline DAG 流程驱动,无法支撑多专家自主协作、实时讨论、动态调整的复杂任务场景。用户在对话框中只能与单个 Agent 交互,无法获得多视角、多专长的协作产出。专家团模式填补这一空白——让多个 Expert 以目标为导向,自主分析、分工、执行、验证、汇报。 + +## Requirements + +Source: `docs/brainstorms/2026-06-14-expert-team-mode-requirements.md` + +Key requirements traced to origin R-IDs: + +- Expert 定义与管理 (R1-R5) +- 专家团组建 (R6-R10) +- 协作计划 (R11-R15) +- 去中心化协作 (R16-R19) +- 并行执行与结果合并 (R20-R24) +- 用户干预 (R25-R28) +- 前端展示 (R29-R32) +- 触发与路由 (R33-R36) + +## Key Technical Decisions + +- **KTD1: Expert 是 Agent 的配置层** — ExpertConfig 继承 AgentConfig,运行时通过 AgentPool 创建 ConfigDrivenAgent 实例。Expert 本身不是 Agent 实例,而是角色定义 + Skill 聚合 + 协作策略的配置单元。Expert 运行时包装器(Expert 类)持有 ConfigDrivenAgent 引用并添加团队感知行为。(see origin: Key Decisions Updated) + +- **KTD2: Plan-Execute 协作模式** — CollaborationPlan 定义阶段和里程碑(粗粒度),Expert 在阶段内自主决定具体步骤(细粒度),但必须通过里程碑检查点。这平衡了灵活性和可控性。 + +- **KTD3: HandoffTransport 抽象层** — 引入 HandoffTransport 协议,两个实现:RedisHandoffTransport(提取现有 Redis pub/sub 逻辑)和 InProcessHandoffTransport(asyncio.Queue,用于同进程 Expert 团队)。HandoffManager 根据上下文自动选择 Transport。 + +- **KTD4: 前端多角色对话流** — 扩展 IChatMessage 增加 expert_id/expert_name/expert_color 字段,WebSocket 消息增加 team_* 事件类型。多个 Expert 的消息通过 expert_id 标签复用同一 WebSocket 通道,前端按 Expert 过滤展示。 + +- **KTD5: 降级策略** — 协作计划失败时先重试 1 次(调整分解策略),仍失败则回退到单 Agent 模式继续完成任务。 + +- **KTD6: 计划修改权限** — Lead Expert 和用户可直接修改 CollaborationPlan;普通 Expert 可提议修改,需 Lead Expert 审批后生效。 + +- **KTD7: 上下文管理** — 采用摘要共享策略:长内容自动摘要后广播给全体 Expert,原始内容存入 SharedWorkspace 按需获取。避免上下文窗口膨胀。 + +## High-Level Technical Design + +### Expert 
Team 架构总览 + +``` +┌─────────────────────────────────────────────────────┐ +│ Frontend │ +│ ExpertTeamView ── ExpertMessage ── PlanViz │ +│ │ │ │ │ +│ └────────────────┼───────────────┘ │ +│ │ WebSocket (team_* events) │ +└────────────────────────┼────────────────────────────┘ + │ +┌────────────────────────┼────────────────────────────┐ +│ Server │ +│ TeamSessionManager ── ExpertTeamRouter │ +│ │ │ +│ ▼ │ +│ ┌──────────┐ manages ┌──────────────┐ │ +│ │ExpertTeam│──────────▶│ Expert[] │ │ +│ │ │ │ (wrappers) │ │ +│ │ Plan │ └──────┬───────┘ │ +│ │ Context │ │ creates │ +│ └────┬─────┘ ▼ │ +│ │ ┌──────────────────┐ │ +│ │ │ ConfigDrivenAgent│ │ +│ │ │ (AgentPool) │ │ +│ │ └──────────────────┘ │ +│ │ │ +│ ┌────▼─────────────────────────────┐ │ +│ │ TeamOrchestrator │ │ +│ │ execute_plan / merge / retry │ │ +│ └──────────────────────────────────┘ │ +│ │ │ │ +│ ▼ ▼ │ +│ HandoffTransport SharedWorkspace │ +│ (InProcess/Redis) (Redis/InMemory) │ +└─────────────────────────────────────────────────────┘ +``` + +### CollaborationPlan 执行流程 + +``` +用户任务 → ExpertTeamRouter → 创建 ExpertTeam + │ + ▼ + Lead Expert 生成 CollaborationPlan + │ + ▼ + 用户确认计划 ──否──▶ 修改计划 + │ + 是 + ▼ + TeamOrchestrator.execute_plan() + │ + ┌────────────┼────────────┐ + ▼ ▼ ▼ + Phase 1 Phase 2 Phase 3 + (串行) (子任务并行) (竞标并行) + │ │ │ + ▼ ▼ ▼ + 里程碑检查 里程碑检查 里程碑检查 + │ │ │ + └────────────┼────────────┘ + │ + ▼ + Lead Expert 汇总最终结果 + │ + ▼ + ExpertTeam 解散,临时 Expert 回收 +``` + +### Expert 间协作消息流 + +``` +Expert A ──send_message()──▶ TeamChannel ──broadcast──▶ Expert B, C, D +Expert A ──request_assist()──▶ InProcessHandoff ──▶ Expert B +Expert A ──propose_modification()──▶ Lead Expert ──approve──▶ Plan Update ──broadcast──▶ All Experts +User ──intervene()──▶ TeamChannel ──broadcast──▶ All Experts + Lead re-plans +``` + +--- + +## Implementation Units + +### U1. 
ExpertConfig & ExpertTemplate & Registry + +**Goal:** 建立 Expert 的数据模型层——ExpertConfig 配置、ExpertTemplate 模板、ExpertTemplateRegistry 注册中心。 + +**Requirements:** R1, R2, R3, R4 + +**Dependencies:** None + +**Files:** +- `src/agentkit/experts/__init__.py` (create) +- `src/agentkit/experts/config.py` (create) +- `src/agentkit/experts/registry.py` (create) +- `tests/unit/experts/__init__.py` (create) +- `tests/unit/experts/test_config.py` (create) +- `tests/unit/experts/test_registry.py` (create) + +**Approach:** +- `ExpertConfig` 继承 `AgentConfig`,新增字段:`persona`(人格描述)、`thinking_style`(思维方式)、`collaboration_strategy`(协作策略偏好)、`bound_skills`(绑定的 Skill 名称列表)、`avatar`(头像标识)、`color`(颜色标识)、`is_lead`(是否为 Lead Expert) +- `ExpertTemplate` 是 `ExpertConfig` 的可复用预设包装,包含 `name`、`config`、`is_builtin`、`description` 字段 +- `ExpertTemplateRegistry` 提供 `register(template)`、`get(name)`、`list()`、`search(query)` 方法,内存字典存储,支持从 YAML 目录批量加载 +- 遵循 `SkillConfig` 继承 `AgentConfig` 的模式(see `src/agentkit/skills/base.py`) + +**Patterns to follow:** `SkillConfig` extending `AgentConfig` in `src/agentkit/skills/base.py`; `SkillRegistry` pattern in `src/agentkit/skills/registry.py` + +**Test scenarios:** +- Happy path: 创建 ExpertConfig 并验证所有字段 +- Happy path: ExpertConfig 继承 AgentConfig 的基础字段 +- Happy path: ExpertTemplate 注册和查询 +- Happy path: ExpertTemplateRegistry 从 YAML 目录加载模板 +- Edge case: bound_skills 为空列表时默认行为 +- Edge case: search 查询无匹配结果返回空列表 +- Error path: 注册同名 ExpertTemplate 覆盖旧模板 + +**Verification:** ExpertConfig 可实例化并序列化;ExpertTemplateRegistry 可注册、查询、搜索模板 + +--- + +### U2. 
CollaborationPlan Data Model + +**Goal:** 定义协作计划的数据结构——阶段、角色分工、依赖关系、并行类型、合并策略、里程碑。 + +**Requirements:** R11, R12, R13 + +**Dependencies:** U1 + +**Files:** +- `src/agentkit/experts/plan.py` (create) +- `tests/unit/experts/test_plan.py` (create) + +**Approach:** +- `CollaborationPlan` 包含 `phases`(PlanPhase 列表)、`variables`(共享变量)、`status`(计划状态) +- `PlanPhase` 包含:`id`、`name`、`assigned_expert`(Expert 名称)、`task_description`、`depends_on`(依赖的 Phase ID 列表)、`parallel_type`(ParallelType 枚举)、`merge_strategy`(MergeStrategy 枚举,仅竞标式并行时使用)、`milestone`(里程碑检查点描述)、`status`(PhaseStatus 枚举) +- `ParallelType` 枚举:SERIAL、SUBTASK_PARALLEL、COMPETITIVE_PARALLEL +- `MergeStrategy` 枚举:BEST、VOTE、FUSION +- `PhaseStatus` 枚举:PENDING、IN_PROGRESS、COMPLETED、FAILED +- `PlanStatus` 枚举:DRAFT、CONFIRMED、EXECUTING、COMPLETED、FAILED、FALLBACK +- 提供序列化/反序列化方法(to_dict/from_dict),供 SharedWorkspace 存储 + +**Patterns to follow:** `PipelineStage`/`Pipeline` in `src/agentkit/orchestrator/pipeline_schema.py` + +**Test scenarios:** +- Happy path: 创建 CollaborationPlan 并添加 phases +- Happy path: 串行 phase 的依赖关系正确 +- Happy path: 子任务级并行 phase 标注 +- Happy path: 竞标式并行 phase 带合并策略 +- Edge case: 依赖关系形成 DAG(无环) +- Edge case: 竞标式并行 phase 未指定 merge_strategy 时默认 BEST +- Error path: 依赖关系有环时验证失败 +- Integration: to_dict/from_dict 往返序列化 + +**Verification:** CollaborationPlan 可构建、验证、序列化;依赖环检测正确 + +--- + +### U3. 
HandoffTransport Abstraction + +**Goal:** 抽象 Handoff 传输层,支持 Redis 和进程内两种模式,使 Expert 间交接不依赖 Redis。 + +**Requirements:** R16 + +**Dependencies:** None + +**Files:** +- `src/agentkit/core/handoff_transport.py` (create) +- `tests/unit/core/test_handoff_transport.py` (create) + +**Approach:** +- 定义 `HandoffTransport` 协议(Protocol):`send(channel, message)`、`listen(channel) -> AsyncIterator`、`register_handler(channel, handler)` +- `RedisHandoffTransport`:提取 `BaseAgent.handoff()` 中的 Redis pub/sub 逻辑,封装为独立类 +- `InProcessHandoffTransport`:基于 `asyncio.Queue` 的进程内传输,每个 channel 对应一个 Queue,支持多消费者广播 +- `HandoffManager` 重构:接受 `HandoffTransport` 注入,`send_handoff`/`listen_for_handoffs`/`register_handler` 委托给 transport +- 向后兼容:`HandoffManager()` 无参构造时自动创建 `RedisHandoffTransport` + +**Patterns to follow:** `SharedWorkspace` 的双模式模式(Redis + 内存降级)in `src/agentkit/core/shared_workspace.py` + +**Test scenarios:** +- Happy path: InProcessHandoffTransport 发送和接收消息 +- Happy path: InProcessHandoffTransport 多消费者广播 +- Happy path: HandoffManager 使用 InProcessHandoffTransport +- Edge case: InProcessHandoffTransport 队列为空时 listen 阻塞等待 +- Error path: 发送到不存在的 channel 不抛异常(消息丢弃) +- Integration: HandoffManager 向后兼容(无参构造使用 Redis) + +**Verification:** InProcessHandoffTransport 可在同进程内传递消息;HandoffManager 向后兼容 + +--- + +### U4. 
Expert Runtime Wrapper + +**Goal:** 实现 Expert 运行时包装器——ExpertConfig → AgentPool 创建 ConfigDrivenAgent,添加团队感知行为。 + +**Requirements:** R1, R2, R5, R16, R17, R19 + +**Dependencies:** U1, U3 + +**Files:** +- `src/agentkit/experts/expert.py` (create) +- `tests/unit/experts/test_expert.py` (create) + +**Approach:** +- `Expert` 类持有 `ExpertConfig` 和 `ConfigDrivenAgent` 引用 +- `Expert.create(config, pool)` → 调用 `AgentPool.create_agent()` 创建 ConfigDrivenAgent,注入团队上下文到 system prompt(角色描述、其他 Expert 的角色摘要) +- `Expert.send_message(channel, content)` → 通过 TeamChannel 广播消息给全体 Expert(摘要共享,原始内容存 SharedWorkspace) +- `Expert.request_assist(target_expert, task)` → 通过 InProcessHandoff 向目标 Expert 交接任务 +- `Expert.propose_plan_modification(plan_id, modification)` → 提交修改提议给 Lead Expert +- `Expert.get_capabilities_summary()` → 返回角色描述 + 绑定 Skill 列表,供其他 Expert 了解 +- `Expert.destroy()` → 调用 `AgentPool.remove_agent()` 清理,产出保留在 SharedWorkspace + +**Patterns to follow:** `ConfigDrivenAgent` lifecycle in `src/agentkit/core/config_driven.py`; `BaseAgent.handoff()` in `src/agentkit/core/base.py` + +**Test scenarios:** +- Happy path: Expert.create 从 ExpertConfig 创建 ConfigDrivenAgent +- Happy path: Expert.send_message 广播到 TeamChannel +- Happy path: Expert.request_assist 通过 Handoff 交接任务 +- Happy path: Expert.get_capabilities_summary 返回角色摘要 +- Edge case: Expert 绑定多个 Skill 时全部注入 Agent +- Error path: Expert.create 时 AgentPool 中同名 Agent 已存在 +- Integration: Expert 销毁后 ConfigDrivenAgent 被移除 + +**Verification:** Expert 可创建、发送消息、请求协助、销毁 + +--- + +### U5. 
ExpertTeam Container + +**Goal:** 实现 ExpertTeam 容器——管理 Expert 生命周期、共享上下文、协作计划、团队状态。 + +**Requirements:** R4, R6, R8, R9, R10, R14, R15, R25, R27, R36 + +**Dependencies:** U1, U2, U4 + +**Files:** +- `src/agentkit/experts/team.py` (create) +- `tests/unit/experts/test_team.py` (create) + +**Approach:** +- `ExpertTeam` 管理一组 Expert 实例,持有 `CollaborationPlan`、`SharedWorkspace` 引用、`TeamChannel` +- 团队生命周期:FORMING → PLANNING → EXECUTING → SYNTHESIZING → COMPLETED +- `create_team(lead_config, member_configs)` → 创建 Lead Expert + 成员 Expert,设置 InProcessHandoff +- `add_expert(config_or_template)` → 动态添加 Expert(R27) +- `remove_expert(name)` → 动态移除 Expert(R27) +- `update_plan(plan)` → Lead Expert 或用户修改计划(R14),广播变更给受影响 Expert(R15) +- `get_shared_context()` → 返回团队共享上下文(通过 SharedWorkspace) +- `broadcast_user_message(content)` → 用户干预消息广播给全体 Expert(R25) +- `dissolve()` → 解散团队,临时 Expert 回收,产出保留(R36) +- `generate_plan(task)` → Lead Expert 生成 CollaborationPlan(混合模式:核心角色从模板匹配,辅助角色动态生成) + +**Patterns to follow:** `AgentPool` lifecycle management in `src/agentkit/core/agent_pool.py` + +**Test scenarios:** +- Happy path: 创建 ExpertTeam 并设置 Lead Expert +- Happy path: 添加和移除 Expert +- Happy path: update_plan 广播变更给受影响 Expert +- Happy path: broadcast_user_message 对全体 Expert 可见 +- Happy path: dissolve 回收临时 Expert +- Edge case: 移除 Lead Expert 时自动指定新 Lead +- Edge case: dissolve 时临时 Expert 产出保留在 SharedWorkspace +- Error path: 向已解散的 ExpertTeam 添加 Expert 抛异常 +- Integration: generate_plan 混合模式(模板 + 动态生成) + +**Verification:** ExpertTeam 可创建、管理 Expert、更新计划、广播消息、解散 + +--- + +### U6. 
TeamOrchestrator + +**Goal:** 实现团队编排引擎——驱动 CollaborationPlan 执行、并行策略、结果合并、重试与降级。 + +**Requirements:** R12, R13, R18, R20, R21, R22, R23, R24 + +**Dependencies:** U2, U4, U5 + +**Files:** +- `src/agentkit/experts/orchestrator.py` (create) +- `tests/unit/experts/test_team_orchestrator.py` (create) + +**Approach:** +- `TeamOrchestrator` 持有 `ExpertTeam` 引用,驱动 `CollaborationPlan` 执行 +- `execute_plan(plan)` → 按 phase 依赖关系拓扑排序,逐层执行 +- 串行 phase:直接执行,完成后检查里程碑 +- 子任务级并行 phase:`asyncio.gather` 并行执行,结果由 Lead Expert 汇总(R20) +- 竞标式并行 phase:`asyncio.gather` 并行执行,结果按 MergeStrategy 处理(R21-R24) + - BEST:Lead Expert 选择最佳结果 + - VOTE:所有 Expert 投票,平局时 Lead Expert 仲裁(R23) + - FUSION:Lead Expert 融合多个结果 +- 里程碑检查:phase 完成后 Expert 必须通过检查点才能进入下一阶段 +- 失败处理:重试 1 次(调整分解策略),仍失败则回退单 Agent(KTD5) +- 执行事件:每个 phase 状态变更、并行结果合并都通过 TeamChannel 广播 + +**Patterns to follow:** `PipelineEngine._topological_group()` + `asyncio.gather` in `src/agentkit/orchestrator/pipeline_engine.py`; adversarial loop (Worker-Verifier) pattern + +**Test scenarios:** +- Happy path: 串行 phase 依次执行 +- Happy path: 子任务级并行 phase 并行执行后汇总 +- Happy path: 竞标式并行 BEST 策略 +- Happy path: 竞标式并行 VOTE 策略(含平局仲裁) +- Happy path: 竞标式并行 FUSION 策略 +- Happy path: 里程碑检查通过后进入下一阶段 +- Edge case: 并行 phase 部分失败时的处理 +- Error path: phase 失败后重试 1 次 +- Error path: 重试仍失败回退单 Agent +- Integration: 完整计划执行(串行+并行混合) + +**Verification:** TeamOrchestrator 可执行计划、处理并行、合并结果、重试降级 + +--- + +### U7. 
Expert Team Routing + +**Goal:** 扩展路由系统支持专家团模式——@team 触发、复杂度评估升级、团队路由。 + +**Requirements:** R7, R33, R34, R35 + +**Dependencies:** U5, U6 + +**Files:** +- `src/agentkit/experts/router.py` (create) +- `src/agentkit/chat/skill_routing.py` (modify) +- `tests/unit/experts/test_router.py` (create) + +**Approach:** +- 扩展 `ExecutionMode` 枚举新增 `TEAM_COLLAB` +- `ExpertTeamRouter` 解析用户输入: + - `@team` 前缀 → 触发专家团模式(R34) + - `@team:analyst,strategist` → 指定专家团成员(R35) + - 无前缀 → Lead Expert 评估复杂度后建议升级(R7, R33) +- 修改 `CostAwareRouter.route()`:当 `execution_mode == TEAM_COLLAB` 时,委托给 `ExpertTeamRouter` +- `ExpertTeamRouter.resolve()` → 返回 `ExpertTeamRoutingResult`(包含 team 配置、plan、execution_mode) +- 复杂度评估:在 `quick_classify` 返回高复杂度(>0.7)时,附加 team_suggestion 标记 + +**Patterns to follow:** `CostAwareRouter` three-layer routing in `src/agentkit/chat/skill_routing.py` + +**Test scenarios:** +- Happy path: @team 前缀触发专家团模式 +- Happy path: @team:analyst,strategist 指定专家团成员 +- Happy path: 高复杂度任务建议升级为专家团 +- Happy path: 低复杂度任务不触发专家团 +- Edge case: @team 后无成员描述时自动组建 +- Edge case: 指定的 ExpertTemplate 不存在时回退到动态生成 +- Integration: CostAwareRouter 集成 TEAM_COLLAB 模式 + +**Verification:** @team 触发、复杂度升级、指定成员路由均正确工作 + +--- + +### U8. 
Frontend Data Model & WebSocket Protocol + +**Goal:** 扩展前端数据模型和 WebSocket 协议支持专家团消息流。 + +**Requirements:** R29, R31, R32 + +**Dependencies:** U6 + +**Files:** +- `src/agentkit/server/frontend/src/types/chat.ts` (modify) +- `src/agentkit/server/routes/portal.py` (modify) +- `src/agentkit/server/routes/chat.py` (modify) +- `src/agentkit/server/frontend/src/stores/chat.ts` (modify) + +**Approach:** +- 扩展 `IChatMessage` 接口:新增 `expert_id?: string`、`expert_name?: string`、`expert_color?: string`、`message_type?: 'chat' | 'handoff' | 'assist_request' | 'plan_update' | 'milestone'` +- 新增 WebSocket 事件类型: + - `team_formed`:专家团组建完成,包含 Expert 列表和 Plan + - `expert_step`:Expert 执行步骤(带 expert_id 标签) + - `expert_result`:Expert 完成子任务 + - `plan_update`:协作计划变更 + - `team_synthesis`:Lead Expert 汇总最终结果 + - `team_dissolved`:专家团解散 +- 服务端:新增 `TeamSessionManager` 管理 ExpertTeam 实例,与 `SessionManager` 协同 +- `chat.ts` store 扩展:处理 team_* 事件,维护 team 状态 + +**Patterns to follow:** Existing WebSocket event handling in `src/agentkit/server/routes/chat.py`; `IChatMessage` interface in `src/agentkit/server/frontend/src/types/chat.ts` + +**Test scenarios:** +- Happy path: team_formed 事件正确解析 Expert 列表 +- Happy path: expert_step 事件带 expert_id 标签 +- Happy path: plan_update 事件触发前端状态更新 +- Edge case: 非专家团消息不受影响(向后兼容) +- Integration: 完整 team 事件流(formed → step → result → synthesis → dissolved) + +**Verification:** 前端可接收和解析所有 team_* 事件;非专家团消息不受影响 + +--- + +### U9. 
ExpertTeam UI Components + +**Goal:** 实现专家团前端组件——多角色对话流、计划可视化、Expert 过滤。 + +**Requirements:** R29, R30, R31, R32 + +**Dependencies:** U8 + +**Files:** +- `src/agentkit/server/frontend/src/components/chat/ExpertTeamView.vue` (create) +- `src/agentkit/server/frontend/src/components/chat/ExpertMessage.vue` (create) +- `src/agentkit/server/frontend/src/components/chat/PlanVisualization.vue` (create) +- `src/agentkit/server/frontend/src/stores/team.ts` (create) + +**Approach:** +- `ExpertTeamView.vue`:专家团状态栏,显示活跃 Expert 列表(头像+颜色+状态),计划可视化切换按钮 +- `ExpertMessage.vue`:扩展 ChatMessage,添加 Expert 头像、颜色标识、角色 badge;handoff/assist_request 消息类型用特殊样式展示 +- `PlanVisualization.vue`:协作计划时间线/DAG 可视化,显示 phase 状态、Expert 分工、依赖关系;用户可展开查看详情 +- `team.ts` store:管理 team 状态(active experts, plan, phase progress),提供按 Expert 过滤消息的 computed +- ChatView 集成:当 conversation 进入专家团模式时,切换到 ExpertTeamView 布局 + +**Patterns to follow:** `ChatMessage.vue` component structure; `MentionDropdown.vue` for dropdown patterns; `chat.ts` store for state management + +**Test scenarios:** +- Happy path: ExpertMessage 正确显示 Expert 头像和颜色 +- Happy path: PlanVisualization 显示协作计划时间线 +- Happy path: 按 Expert 过滤消息 +- Happy path: handoff 消息特殊样式展示 +- Edge case: Expert 被移除后消息保留但标记为已退出 +- Edge case: 并行执行时多个 Expert 消息同时更新 + +**Verification:** 专家团模式下前端正确显示多角色对话流和计划可视化 + +--- + +### U10. Integration Tests + +**Goal:** 端到端集成测试——验证专家团完整流程。 + +**Requirements:** All R-IDs + +**Dependencies:** U1-U9 + +**Files:** +- `tests/integration/test_expert_team.py` (create) + +**Approach:** +- 测试覆盖所有 Key Flows(F1-F6)和 Acceptance Examples(AE1-AE5) +- 使用 mock LLM 和 mock tools,验证编排逻辑而非 LLM 输出质量 +- 测试场景: + 1. 手动组建专家团 + 子任务并行(F1, AE1) + 2. 自动组建专家团 + 竞标并行(F2, AE2) + 3. 去中心化协作执行(F3, AE4) + 4. 用户干预协作(F4, AE3) + 5. 竞标式并行执行 + 投票仲裁(F5) + 6. 专家团解散 + 产出保留(F6) + 7. 降级策略:重试 + 回退单 Agent + 8. 动态增减 Expert(AE5) + +**Patterns to follow:** `tests/integration/test_gap_closure.py` integration test patterns + +**Test scenarios:** +- Covers F1. 手动组建专家团 +- Covers F2. 
自动组建专家团 +- Covers F3. 去中心化协作执行 +- Covers F4. 用户干预协作 +- Covers F5. 竞标式并行执行 +- Covers F6. 专家团解散 +- Covers AE1. 手动组建 + 子任务并行 +- Covers AE2. 自动组建 + 竞标并行 +- Covers AE3. 用户干预调整方向 +- Covers AE4. Expert 间直接协作 +- Covers AE5. 动态增减 Expert + +**Verification:** 所有 Key Flows 和 Acceptance Examples 通过集成测试 + +--- + +## Scope Boundaries + +**In scope:** +- Expert/ExpertTeam/CollaborationPlan 核心抽象 +- HandoffTransport 进程内传输 +- TeamOrchestrator 编排引擎(串行 + 两种并行 + 三种合并策略) +- ExpertTeamRouter 路由扩展 +- 前端多角色对话流和计划可视化 +- 端到端集成测试 + +**Deferred to follow-up work:** +- Expert 间的学习与进化机制 +- 跨会话的 Expert 持久化和记忆共享 +- Expert 市场和共享模板库 +- 专家团的 A/B 测试和效果评估 +- ExpertTemplate YAML 文件的标准模板集(本计划只实现框架,不提供大量预置模板) +- 计划可视化的高级交互(拖拽调整、实时编辑) +- 竞标式并行的更复杂评判策略(加权投票、多轮评审) + +**Outside this product's identity:** +- 通用工作流引擎(已有 PipelineEngine) +- 人工审核节点(已有 Workflow approval) +- 实时音视频协作 + +## Risks & Dependencies + +| Risk | Impact | Mitigation | +|------|--------|------------| +| LLM 生成的 CollaborationPlan 质量不稳定 | 高:劣质计划导致协作效率低 | Plan 验证逻辑(依赖环检测、必填字段检查);用户确认环节;重试降级策略 | +| 去中心化协作可能陷入循环讨论 | 中:Expert 反复交互不收敛 | 里程碑检查点强制推进;最大交互轮次限制;Lead Expert 可强制推进 | +| 多 Expert 并行时上下文窗口膨胀 | 中:LLM 调用成本增加、可能超限 | 摘要共享策略(KTD7);SharedWorkspace 按需获取原始内容 | +| 前端多路消息流渲染性能 | 低:大量并行消息可能导致 UI 卡顿 | 消息批量更新;虚拟滚动;按 Expert 过滤减少渲染量 | +| HandoffTransport 重构影响现有 Handoff | 中:可能破坏现有 Agent 间交接 | 向后兼容设计(无参构造使用 Redis);充分单元测试 | + +**Dependencies:** +- 依赖现有 `AgentPool` 管理 Agent 实例 +- 依赖现有 `SharedWorkspace` 支持共享上下文 +- 依赖现有 `CostAwareRouter` 三层路由架构 +- 依赖前端 WebSocket 基础设施 +- 依赖 LLM 能生成质量合格的 CollaborationPlan + +## Open Questions + +- ExpertTemplate YAML 文件的标准目录位置(建议 `config/experts/`,待确认) +- 竞标式并行中"融合"策略的具体实现——是 LLM 融合还是规则融合(建议 LLM 融合,待确认) +- 前端计划可视化组件的选型——自研还是使用第三方库(建议自研轻量时间线,待确认) + +## Sources & Research + +- `src/agentkit/core/base.py` — BaseAgent 生命周期、Handoff、Progress 上报 +- `src/agentkit/core/config_driven.py` — ConfigDrivenAgent、AgentConfig、执行模式 +- `src/agentkit/skills/base.py` — SkillConfig 继承 AgentConfig 的模式 +- `src/agentkit/core/agent_pool.py` — AgentPool 
生命周期管理 +- `src/agentkit/core/shared_workspace.py` — SharedWorkspace 双模式(Redis + 内存) +- `src/agentkit/core/orchestrator.py` — Orchestrator-Worker 编排模式 +- `src/agentkit/orchestrator/pipeline_engine.py` — DAG 拓扑并行、Saga 补偿 +- `src/agentkit/orchestrator/handoff.py` — Redis Pub/Sub Handoff +- `src/agentkit/chat/skill_routing.py` — CostAwareRouter 三层路由 +- `src/agentkit/server/routes/chat.py` — Chat WebSocket 消息协议 +- `src/agentkit/server/frontend/src/types/chat.ts` — IChatMessage 接口 +- `src/agentkit/server/frontend/src/stores/chat.ts` — Chat Pinia Store diff --git a/src/agentkit/chat/skill_routing.py b/src/agentkit/chat/skill_routing.py index c9b5935..39e28a4 100644 --- a/src/agentkit/chat/skill_routing.py +++ b/src/agentkit/chat/skill_routing.py @@ -33,6 +33,7 @@ class ExecutionMode(enum.Enum): DIRECT_CHAT = "direct_chat" # Zero-cost: direct LLM call, no ReAct loop REACT = "react" # Default agent ReAct loop with default tools SKILL_REACT = "skill_react" # Skill-matched ReAct with skill tools + prompt + TEAM_COLLAB = "team_collab" # Expert Team collaborative mode def validate_skill_name(name: str) -> str: diff --git a/src/agentkit/core/handoff_transport.py b/src/agentkit/core/handoff_transport.py new file mode 100644 index 0000000..5a2bf83 --- /dev/null +++ b/src/agentkit/core/handoff_transport.py @@ -0,0 +1,248 @@ +"""HandoffTransport - Agent 间 Handoff 传输层抽象 + +提供统一的传输接口,支持: +- InProcessHandoffTransport: 进程内 asyncio.Queue 传输(Expert Team 模式) +- RedisHandoffTransport: Redis Pub/Sub 传输(分布式模式) +""" + +from __future__ import annotations + +import asyncio +import json +import logging +from typing import AsyncIterator, Awaitable, Callable + +import redis.asyncio as aioredis + +logger = logging.getLogger(__name__) + + +class HandoffTransport: + """Handoff 传输层协议 + + 定义 Agent 间 Handoff 消息的传输接口,支持多种底层实现。 + """ + + async def send(self, channel: str, message: dict) -> None: + """发送消息到指定频道""" + ... 
class InProcessHandoffTransport:
    """In-process handoff transport backed by ``asyncio.Queue``.

    Meant for Expert Team mode, where the agents exchanging handoffs live in
    the same process and no Redis connection is needed.

    Broadcast semantics: every ``listen()`` call owns a private queue, and each
    message sent to a channel is put on every queue registered for it.
    """

    # Marker object (compared by identity) that tells a listener to stop.
    _CLOSED_SENTINEL: dict = {"_closed": True}
    # Maximum number of undelivered messages buffered per listener.
    _MAX_QUEUE_SIZE: int = 1024

    def __init__(self) -> None:
        self._channels: dict[str, list[asyncio.Queue[dict]]] = {}
        self._handlers: dict[str, list[Callable[[dict], Awaitable[None]]]] = {}
        self._closed = False

    def _is_attached(self, channel: str, queue: asyncio.Queue) -> bool:
        """Return True while ``queue`` is still registered as a listener on ``channel``."""
        return any(q is queue for q in self._channels.get(channel, ()))

    async def send(self, channel: str, message: dict) -> None:
        """Deliver ``message`` to every listener queue and handler on ``channel``.

        A message for a channel without listeners or handlers is dropped.
        ``queue.put`` waits while a listener's queue is full, so a stalled
        listener applies backpressure to the sender. Exceptions raised by a
        handler are logged and do not stop delivery to the remaining handlers.
        """
        # Iterate over snapshots: both loops await, so other tasks can add or
        # remove listeners/handlers while this coroutine is suspended, and
        # mutating a list that is being iterated would skip recipients.
        for queue in list(self._channels.get(channel, ())):
            # A listener may have exited while we were suspended on an earlier
            # put; never wait on a queue nobody will drain.
            if self._is_attached(channel, queue):
                await queue.put(message)

        for handler in list(self._handlers.get(channel, ())):
            try:
                await handler(message)
            except Exception as e:
                logger.error(f"InProcessHandoffTransport handler error on channel '{channel}': {e}")

    async def listen(self, channel: str) -> AsyncIterator[dict]:  # type: ignore[override]
        """Yield messages sent to ``channel`` until the transport is closed.

        Each caller gets an independent bounded queue, which is what gives the
        transport its broadcast behaviour. The queue is unregistered when the
        generator finishes or is closed by the consumer.
        """
        queue: asyncio.Queue[dict] = asyncio.Queue(maxsize=self._MAX_QUEUE_SIZE)
        self._channels.setdefault(channel, []).append(queue)

        try:
            while True:
                # close() cannot enqueue the sentinel when this queue is full.
                # In that case the queue has merely been detached, so once the
                # backlog is drained there is nothing left to wait for.
                if queue.empty() and not self._is_attached(channel, queue):
                    break
                message = await queue.get()
                if message is self._CLOSED_SENTINEL:
                    break
                yield message
        finally:
            # Unregister this consumer's queue; drop the channel when it was the last one.
            consumers = self._channels.get(channel)
            if consumers is not None:
                try:
                    consumers.remove(queue)
                except ValueError:
                    pass
                if not consumers:
                    del self._channels[channel]

    def register_handler(self, channel: str, handler: Callable[[dict], Awaitable[None]]) -> None:
        """Register an async callback invoked by ``send()`` for every message on ``channel``."""
        self._handlers.setdefault(channel, []).append(handler)

    def close(self) -> None:
        """Close the transport and drop all channels and handlers.

        A sentinel is pushed to every listener queue so that listeners blocked
        in ``get()`` wake up and exit. Listeners whose queue is full cannot
        receive the sentinel; they notice that their queue is no longer
        registered after draining their backlog (see ``listen``).
        """
        self._closed = True
        for channel_queues in self._channels.values():
            for queue in channel_queues:
                try:
                    queue.put_nowait(self._CLOSED_SENTINEL)
                except asyncio.QueueFull:
                    # Handled by the detached-queue check in listen().
                    pass
        self._channels.clear()
        self._handlers.clear()
message["data"] + if isinstance(data, str): + yield json.loads(data) + else: + yield json.loads(data.decode()) + except asyncio.CancelledError: + pass + finally: + await pubsub.unsubscribe(channel) + await pubsub.aclose() + if pubsub in self._active_pubsubs: + self._active_pubsubs.remove(pubsub) + + def register_handler(self, channel: str, handler: Callable[[dict], Awaitable[None]]) -> None: + """注册消息处理器并启动后台监听任务 + + 注册处理器后,自动启动一个后台 asyncio.Task 来监听该频道的消息。 + """ + if channel not in self._handlers: + self._handlers[channel] = [] + self._handlers[channel].append(handler) + + # 启动后台监听任务 + if channel not in self._listen_tasks: + self._listen_tasks[channel] = asyncio.create_task( + self._background_listen(channel) + ) + + async def _background_listen(self, channel: str) -> None: + """后台监听频道消息并调用处理器""" + try: + async for message in self.listen(channel): + handlers = self._handlers.get(channel, []) + for handler in handlers: + try: + await handler(message) + except Exception as e: + logger.error( + f"RedisHandoffTransport handler error on channel '{channel}': {e}" + ) + except asyncio.CancelledError: + pass + except Exception as e: + logger.error(f"RedisHandoffTransport listen error on channel '{channel}': {e}") + + async def close(self) -> None: + """关闭传输,取消所有监听任务并关闭 Redis 连接""" + # 取消所有后台监听任务 + for task in self._listen_tasks.values(): + if not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + self._listen_tasks.clear() + self._handlers.clear() + + # 关闭所有活跃的 PubSub 连接 + for pubsub in self._active_pubsubs: + try: + await pubsub.unsubscribe() + await pubsub.aclose() + except Exception: + pass + self._active_pubsubs.clear() + + # 关闭 Redis 连接 + if self._redis is not None: + await self._redis.close() + self._redis = None diff --git a/src/agentkit/experts/__init__.py b/src/agentkit/experts/__init__.py new file mode 100644 index 0000000..6be2c13 --- /dev/null +++ b/src/agentkit/experts/__init__.py @@ -0,0 +1,34 @@ +"""Expert 系统 - 
专家团队模式的配置、模板、注册与协作计划""" + +from agentkit.experts.config import ExpertConfig, ExpertTemplate +from agentkit.experts.expert import Expert +from agentkit.experts.orchestrator import TeamOrchestrator +from agentkit.experts.plan import ( + CollaborationPlan, + MergeStrategy, + ParallelType, + PhaseStatus, + PlanPhase, + PlanStatus, +) +from agentkit.experts.registry import ExpertTemplateRegistry +from agentkit.experts.router import ExpertTeamRouter, ExpertTeamRoutingResult +from agentkit.experts.team import ExpertTeam, TeamStatus + +__all__ = [ + "CollaborationPlan", + "Expert", + "ExpertConfig", + "ExpertTeam", + "ExpertTeamRouter", + "ExpertTeamRoutingResult", + "ExpertTemplate", + "ExpertTemplateRegistry", + "MergeStrategy", + "ParallelType", + "PhaseStatus", + "PlanPhase", + "PlanStatus", + "TeamOrchestrator", + "TeamStatus", +] diff --git a/src/agentkit/experts/config.py b/src/agentkit/experts/config.py new file mode 100644 index 0000000..001c3f6 --- /dev/null +++ b/src/agentkit/experts/config.py @@ -0,0 +1,138 @@ +"""Expert 配置与模板 - ExpertConfig, ExpertTemplate""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any + +from agentkit.core.config_driven import AgentConfig + + +class ExpertConfig(AgentConfig): + """扩展 AgentConfig,新增 Expert 专属字段 + + Expert 是比 Skill 更高层的角色抽象,一个 Expert 聚合多个 Skill, + 并包含 persona、thinking_style、collaboration_strategy 等角色属性。 + """ + + def __init__( + self, + name: str, + agent_type: str, + version: str = "1.0.0", + description: str = "", + task_mode: str = "llm_generate", + supported_tasks: list[str] | None = None, + max_concurrency: int = 1, + input_schema: dict[str, Any] | None = None, + output_schema: dict[str, Any] | None = None, + prompt: dict[str, str] | None = None, + llm: dict[str, Any] | None = None, + tools: list[str] | None = None, + memory: dict[str, Any] | None = None, + custom_handler: str | None = None, + # Expert 专属字段 + persona: str = "", + thinking_style: str = "", + 
collaboration_strategy: str = "cooperative", + bound_skills: list[str] | None = None, + avatar: str = "", + color: str = "#1890ff", + is_lead: bool = False, + ): + super().__init__( + name=name, + agent_type=agent_type, + version=version, + description=description, + task_mode=task_mode, + supported_tasks=supported_tasks, + max_concurrency=max_concurrency, + input_schema=input_schema, + output_schema=output_schema, + prompt=prompt, + llm=llm, + tools=tools, + memory=memory, + custom_handler=custom_handler, + ) + self.persona = persona + self.thinking_style = thinking_style + self.collaboration_strategy = collaboration_strategy + self.bound_skills = bound_skills or [] + self.avatar = avatar + self.color = color + self.is_lead = is_lead + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ExpertConfig: + """从字典创建配置""" + return cls( + name=data["name"], + agent_type=data["agent_type"], + version=data.get("version", "1.0.0"), + description=data.get("description", ""), + task_mode=data.get("task_mode", "llm_generate"), + supported_tasks=data.get("supported_tasks"), + max_concurrency=data.get("max_concurrency", 1), + input_schema=data.get("input_schema"), + output_schema=data.get("output_schema"), + prompt=data.get("prompt"), + llm=data.get("llm"), + tools=data.get("tools"), + memory=data.get("memory"), + custom_handler=data.get("custom_handler"), + persona=data.get("persona", ""), + thinking_style=data.get("thinking_style", ""), + collaboration_strategy=data.get("collaboration_strategy", "cooperative"), + bound_skills=data.get("bound_skills"), + avatar=data.get("avatar", ""), + color=data.get("color", "#1890ff"), + is_lead=data.get("is_lead", False), + ) + + def to_dict(self) -> dict[str, Any]: + """序列化为字典,包含 Expert 专属字段""" + d = super().to_dict() + d["persona"] = self.persona + d["thinking_style"] = self.thinking_style + d["collaboration_strategy"] = self.collaboration_strategy + d["bound_skills"] = self.bound_skills + d["avatar"] = self.avatar + d["color"] = 
self.color + d["is_lead"] = self.is_lead + return d + + +@dataclass +class ExpertTemplate: + """Expert 模板 - 可复用的 Expert 配置模板 + + 用于预定义 Expert 角色配置,支持内置模板和用户自定义模板。 + """ + + name: str + config: ExpertConfig + is_builtin: bool = False + description: str = "" + + def to_dict(self) -> dict[str, Any]: + """序列化为字典""" + return { + "name": self.name, + "config": self.config.to_dict(), + "is_builtin": self.is_builtin, + "description": self.description, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> ExpertTemplate: + """从字典创建模板""" + config_data = data["config"] + config = ExpertConfig.from_dict(config_data) + return cls( + name=data["name"], + config=config, + is_builtin=data.get("is_builtin", False), + description=data.get("description", ""), + ) diff --git a/src/agentkit/experts/expert.py b/src/agentkit/experts/expert.py new file mode 100644 index 0000000..76fea46 --- /dev/null +++ b/src/agentkit/experts/expert.py @@ -0,0 +1,167 @@ +"""Expert - 专家团队运行时包装器 + +Expert 是运行时包装器,持有 ExpertConfig 和 ConfigDrivenAgent 实例, +在标准 Agent 生命周期之上添加团队感知行为。 +""" + +from __future__ import annotations + +import time + +from agentkit.core.agent_pool import AgentPool +from agentkit.core.config_driven import ConfigDrivenAgent +from agentkit.core.handoff_transport import InProcessHandoffTransport +from agentkit.core.shared_workspace import SharedWorkspace +from agentkit.experts.config import ExpertConfig + + +class Expert: + """运行时包装器:Expert = ExpertConfig + ConfigDrivenAgent + 团队行为 + + 在标准 Agent 生命周期之上添加团队感知行为,包括: + - 团队频道消息广播 + - 跨 Expert 协助请求 + - 计划修改提案 + - 能力摘要 + """ + + def __init__( + self, + config: ExpertConfig, + agent: ConfigDrivenAgent, + handoff_transport: InProcessHandoffTransport | None = None, + workspace: SharedWorkspace | None = None, + ): + self.config = config + self.agent = agent + self._handoff_transport = handoff_transport + self._workspace = workspace + self._team_id: str | None = None + self._is_active: bool = True + + @classmethod + async def create( + 
cls, + config: ExpertConfig, + pool: AgentPool, + handoff_transport: InProcessHandoffTransport | None = None, + workspace: SharedWorkspace | None = None, + team_context: str | None = None, + ) -> Expert: + """通过 AgentPool 创建 Expert,实例化 ConfigDrivenAgent。 + + 如果提供了 team_context,会将其注入到 Agent 的 system prompt 中, + 使 Agent 感知其团队角色和其他 Expert。 + """ + agent = await pool.create_agent(config) + + expert = cls( + config=config, + agent=agent, + handoff_transport=handoff_transport, + workspace=workspace, + ) + + # 如果提供了团队上下文,修改 Agent 的 prompt 以注入团队角色信息 + if team_context and hasattr(agent, "_prompt_template") and agent._prompt_template: + sections = agent._prompt_template._sections + sections.context = f"{team_context}\n\n{sections.context}" if sections.context else team_context + + return expert + + async def send_message( + self, + channel: str, + content: str, + summary: str | None = None, + ) -> None: + """向团队频道广播消息。 + + 如果内容较长(>500 字符),自动创建摘要并将完整内容存储到 SharedWorkspace。 + 即使没有提供 summary,长内容也会存储到 workspace 以避免数据丢失。 + """ + message = { + "expert_id": self.config.name, + "expert_name": self.config.name, + "content": summary or content[:500], + "timestamp": time.time(), + "type": "chat", + } + + if len(content) > 500 and self._workspace: + # Always store full content in workspace for long messages + await self._workspace.write( + f"expert:{self.config.name}:messages:{int(message['timestamp'])}", + content, + self.config.name, + ) + + if self._handoff_transport: + await self._handoff_transport.send(channel, message) + + async def request_assist( + self, + target_expert: str, + task: str, + reason: str = "", + ) -> None: + """通过 handoff 请求其他 Expert 协助。""" + if not self._handoff_transport: + raise RuntimeError("No handoff transport configured") + + handoff_msg = { + "source_expert": self.config.name, + "target_expert": target_expert, + "task": task, + "reason": reason, + "type": "assist_request", + } + await self._handoff_transport.send( + f"expert:{target_expert}:handoff", 
class TeamOrchestrator:
    """Orchestration engine that drives CollaborationPlan execution within an ExpertTeam.

    Each loop iteration ("round") asks the plan for its ready phases, runs the
    serial ones in order, the subtask-parallel ones concurrently, and the
    competitive ones through a merge strategy. Phase execution itself is still
    simulated in this class (see ``_execute_phase`` / ``_run_competitor``).
    """

    MAX_RETRIES = 1  # Retry once on failure before fallback
    MAX_INTERACTION_ROUNDS = 20  # Prevent infinite collaboration loops

    def __init__(self, team: ExpertTeam) -> None:
        self._team = team
        self._interaction_count = 0

    async def execute_plan(self, plan: CollaborationPlan) -> dict[str, Any]:
        """Execute a CollaborationPlan within the team.

        Returns a dict with:
        - "status": "completed" | "failed" | "fallback"
        - "result": final synthesized result
        - "phase_results": dict of phase_id -> result

        A failed validation additionally returns "errors"; an unexpected
        exception additionally returns "error".
        """
        # Validate plan first
        errors = plan.validate()
        if errors:
            logger.error(f"Plan validation failed: {errors}")
            return {
                "status": "failed",
                "result": None,
                "phase_results": {},
                "errors": errors,
            }

        plan.status = PlanStatus.EXECUTING
        self._team._status = TeamStatus.EXECUTING
        self._interaction_count = 0  # Reset for each plan execution

        phase_results: dict[str, dict[str, Any]] = {}
        retry_counts: dict[str, int] = {}  # Per-phase retry tracking

        try:
            while True:
                ready_phases = plan.get_ready_phases()

                if not ready_phases:
                    # Finished: every phase reached a terminal state.
                    all_done = all(
                        p.status in (PhaseStatus.COMPLETED, PhaseStatus.FAILED)
                        for p in plan.phases
                    )
                    if all_done:
                        break

                    # Nothing is ready: fail every pending phase that can never
                    # run because something it depends on has failed.
                    self._cascade_failed_dependencies(plan, phase_results)

                    still_pending = [
                        p for p in plan.phases if p.status == PhaseStatus.PENDING
                    ]
                    if not still_pending:
                        break

                    # Pending phases remain but none can become ready.
                    logger.warning(
                        f"Stuck: {len(still_pending)} pending phases with unresolvable deps"
                    )
                    return await self._fallback_to_single_agent(plan, phase_results)

                # Group ready phases by parallel type
                serial_phases = [
                    p for p in ready_phases if p.parallel_type == ParallelType.SERIAL
                ]
                parallel_phases = [
                    p
                    for p in ready_phases
                    if p.parallel_type == ParallelType.SUBTASK_PARALLEL
                ]
                competitive_phases = [
                    p
                    for p in ready_phases
                    if p.parallel_type == ParallelType.COMPETITIVE_PARALLEL
                ]

                # Execute serial phases
                for phase in serial_phases:
                    result = await self._execute_phase(phase, plan, phase_results)
                    if result is None:
                        # Phase failed — retry per-phase
                        phase_retries = retry_counts.get(phase.id, 0)
                        if phase_retries < self.MAX_RETRIES:
                            retry_counts[phase.id] = phase_retries + 1
                            logger.info(
                                f"Retrying phase {phase.id} (attempt {phase_retries + 1})"
                            )
                            # Reset phase status for retry
                            plan.update_phase_status(phase.id, PhaseStatus.PENDING)
                            result = await self._execute_phase(phase, plan, phase_results)

                        if result is None:
                            # Still failed after retry — fallback to single agent
                            logger.warning(
                                f"Phase {phase.id} failed after retry, falling back to single agent"
                            )
                            return await self._fallback_to_single_agent(
                                plan, phase_results
                            )

                    phase_results[phase.id] = result

                # Execute subtask-level parallel phases
                if parallel_phases:
                    results = await asyncio.gather(
                        *[
                            self._execute_phase(p, plan, phase_results)
                            for p in parallel_phases
                        ],
                        return_exceptions=True,
                    )

                    all_parallel_failed = True
                    for phase, result in zip(parallel_phases, results):
                        # BaseException also covers CancelledError, which
                        # gather(return_exceptions=True) returns as a value.
                        if isinstance(result, BaseException):
                            logger.error(
                                f"Parallel phase {phase.id} failed: {result}"
                            )
                            plan.update_phase_status(phase.id, PhaseStatus.FAILED)
                            phase_results[phase.id] = {"error": str(result)}
                        elif result is None:
                            # _execute_phase reports failure by returning None
                            # (it has already logged and marked the phase FAILED).
                            phase_results[phase.id] = {
                                "error": f"Phase '{phase.name}' failed"
                            }
                        else:
                            all_parallel_failed = False
                            phase_results[phase.id] = result

                    # If all parallel phases failed, trigger fallback
                    if all_parallel_failed:
                        logger.warning("All parallel phases failed, falling back to single agent")
                        return await self._fallback_to_single_agent(
                            plan, phase_results
                        )

                # Execute competitive parallel phases
                for phase in competitive_phases:
                    result = await self._execute_competitive_phase(
                        phase, plan, phase_results
                    )
                    if "error" in result:
                        # Competitive phase completely failed
                        logger.warning(
                            f"Competitive phase {phase.id} failed: {result.get('error')}"
                        )
                        return await self._fallback_to_single_agent(
                            plan, phase_results
                        )
                    phase_results[phase.id] = result

                self._interaction_count += 1
                if self._interaction_count >= self.MAX_INTERACTION_ROUNDS:
                    logger.warning("Max interaction rounds reached")
                    break

            # Synthesize final result
            plan.status = PlanStatus.COMPLETED
            self._team._status = TeamStatus.SYNTHESIZING

            final_result = await self._synthesize_results(plan, phase_results)

            self._team._status = TeamStatus.COMPLETED
            return {
                "status": "completed",
                "result": final_result,
                "phase_results": phase_results,
            }

        except Exception as e:
            logger.error(f"Plan execution failed: {e}")
            plan.status = PlanStatus.FAILED
            return {
                "status": "failed",
                "result": None,
                "phase_results": phase_results,
                "error": str(e),
            }

    def _cascade_failed_dependencies(
        self, plan: CollaborationPlan, phase_results: dict[str, dict[str, Any]]
    ) -> None:
        """Mark pending phases FAILED when a phase they depend on has failed.

        Runs to a fixpoint so that transitive dependents are failed as well,
        and records an error entry in ``phase_results`` for each of them.
        """
        failed_ids = {p.id for p in plan.phases if p.status == PhaseStatus.FAILED}
        changed = True
        while changed:
            changed = False
            for p in plan.phases:
                # Skipping ids already handled guarantees termination even if
                # the status update does not take effect.
                if p.status != PhaseStatus.PENDING or p.id in failed_ids:
                    continue
                if any(dep in failed_ids for dep in p.depends_on):
                    plan.update_phase_status(p.id, PhaseStatus.FAILED)
                    failed_ids.add(p.id)
                    phase_results[p.id] = {
                        "error": f"Dependency failed, cannot execute phase '{p.name}'"
                    }
                    logger.warning(
                        f"Phase {p.id} marked FAILED due to failed dependency"
                    )
                    changed = True

    async def _broadcast_best_effort(
        self, event_type: str, data: dict[str, Any]
    ) -> None:
        """Broadcast an event; a failure is logged at debug level and never raised."""
        try:
            await self._broadcast_event(event_type, data)
        except Exception as e:
            logger.debug(f"Broadcast of '{event_type}' failed: {e}")

    async def _execute_phase(
        self,
        phase: PlanPhase,
        plan: CollaborationPlan,
        phase_results: dict[str, dict[str, Any]],
    ) -> dict[str, Any] | None:
        """Execute a single phase. Returns result dict or None on failure.

        Failures never propagate: the phase is marked FAILED, a
        ``phase_failed`` / ``milestone_failed`` event is broadcast on a
        best-effort basis, and ``None`` is returned.
        """
        plan.update_phase_status(phase.id, PhaseStatus.IN_PROGRESS)

        try:
            # Inside the try block: a failing start broadcast fails this phase
            # (handled below) instead of aborting the whole plan.
            await self._broadcast_event(
                "phase_started",
                {
                    "phase_id": phase.id,
                    "phase_name": phase.name,
                    "assigned_expert": phase.assigned_expert,
                },
            )

            # Get the assigned expert
            expert = self._team._experts.get(phase.assigned_expert)
            if not expert or not expert.is_active:
                raise RuntimeError(
                    f"Expert '{phase.assigned_expert}' not available"
                )

            # Execute the task via the expert's agent
            # In a real implementation, this would call expert.agent.execute(task)
            # For now, we simulate by having the expert process the task
            result: dict[str, Any] = {
                "output": f"Phase '{phase.name}' completed by {phase.assigned_expert}"
            }

            # Check milestone
            if phase.milestone:
                milestone_passed = await self._check_milestone(phase, result)
                if not milestone_passed:
                    plan.update_phase_status(phase.id, PhaseStatus.FAILED)
                    await self._broadcast_best_effort(
                        "milestone_failed",
                        {"phase_id": phase.id, "milestone": phase.milestone},
                    )
                    return None

            plan.update_phase_status(phase.id, PhaseStatus.COMPLETED, result)

            await self._broadcast_best_effort(
                "phase_completed",
                {"phase_id": phase.id, "phase_name": phase.name},
            )

            return result

        except Exception as e:
            logger.error(f"Phase {phase.id} execution failed: {e}")
            plan.update_phase_status(phase.id, PhaseStatus.FAILED)
            await self._broadcast_best_effort(
                "phase_failed", {"phase_id": phase.id, "error": str(e)}
            )
            return None

    async def _execute_competitive_phase(
        self,
        phase: PlanPhase,
        plan: CollaborationPlan,
        phase_results: dict[str, dict[str, Any]],
    ) -> dict[str, Any]:
        """Execute a competitive parallel phase with merge strategy.

        Returns the merged result, or ``{"error": ...}`` when no competitor
        produced a result (the phase is then marked FAILED).
        """
        plan.update_phase_status(phase.id, PhaseStatus.IN_PROGRESS)

        # For competitive parallel, we need multiple experts working on the same task
        # In practice, the plan should specify which experts compete
        # For now, we use all active experts as competitors
        competitors = self._team.active_experts

        # Run all competitors in parallel
        results = await asyncio.gather(
            *[self._run_competitor(expert, phase) for expert in competitors],
            return_exceptions=True,
        )

        # Filter out exceptions
        valid_results = [r for r in results if not isinstance(r, Exception)]

        if not valid_results:
            plan.update_phase_status(phase.id, PhaseStatus.FAILED)
            return {"error": "All competitors failed"}

        # Apply merge strategy
        merged = await self._merge_results(phase, valid_results)

        plan.update_phase_status(phase.id, PhaseStatus.COMPLETED, merged)
        return merged

    async def _run_competitor(
        self, expert: Expert, phase: PlanPhase
    ) -> dict[str, Any]:
        """Run a single competitor for a competitive phase (currently simulated)."""
        # Simulate expert execution
        return {
            "expert": expert.config.name,
            "output": f"Competitive result from {expert.config.name}",
        }

    async def _merge_results(
        self, phase: PlanPhase, results: list[dict[str, Any]]
    ) -> dict[str, Any]:
        """Merge competitive parallel results based on merge strategy.

        ``results`` must be non-empty. No real evaluation happens yet: BEST and
        VOTE report the first result as "selected", FUSION only reports how
        many results were available.
        """
        strategy = phase.merge_strategy or MergeStrategy.BEST

        if strategy == MergeStrategy.BEST:
            # Lead Expert picks the best result
            lead = self._team.lead_expert
            if lead:
                return {
                    "merged": True,
                    "strategy": "best",
                    "selected": results[0],
                    "all_results": results,
                }
            return results[0]

        elif strategy == MergeStrategy.VOTE:
            # All experts vote — for now, simple majority with Lead Expert tie-breaking
            return {
                "merged": True,
                "strategy": "vote",
                "selected": results[0],
                "all_results": results,
            }

        elif strategy == MergeStrategy.FUSION:
            # Lead Expert fuses all results
            lead = self._team.lead_expert
            if lead:
                return {
                    "merged": True,
                    "strategy": "fusion",
                    "fused_from": len(results),
                    "all_results": results,
                }
            return results[0]

        return results[0]

    async def _check_milestone(
        self, phase: PlanPhase, result: dict[str, Any]
    ) -> bool:
        """Check if a phase result passes its milestone checkpoint."""
        # In a real implementation, this would use LLM evaluation
        # For now, always pass if there's a result
        return result is not None

    async def _synthesize_results(
        self, plan: CollaborationPlan, phase_results: dict[str, dict[str, Any]]
    ) -> dict[str, Any]:
        """Synthesize final results from all phase outputs.

        Only phases that are COMPLETED and have an entry in ``phase_results``
        are included, in plan order.
        """
        # Collect completed phase results in order
        completed: list[dict[str, Any]] = []
        for phase in plan.phases:
            if phase.status == PhaseStatus.COMPLETED and phase.id in phase_results:
                completed.append(
                    {
                        "phase": phase.name,
                        "expert": phase.assigned_expert,
                        "result": phase_results[phase.id],
                    }
                )

        return {
            "task": plan.task,
            "phases_completed": len(completed),
            "phases_total": len(plan.phases),
            "results": completed,
        }

    async def _fallback_to_single_agent(
        self, plan: CollaborationPlan, phase_results: dict[str, dict[str, Any]]
    ) -> dict[str, Any]:
        """Fallback to single agent mode when team execution fails.

        Uses the lead expert (or first active expert) to complete the original task.
        The plan status becomes FALLBACK and the returned "status" is "fallback".
        """
        plan.status = PlanStatus.FALLBACK
        logger.warning("Falling back to single agent mode")

        # Try to use the lead expert, or fall back to any active expert
        expert = self._team.lead_expert
        if not expert or not expert.is_active:
            active = self._team.active_experts
            expert = active[0] if active else None

        fallback_result = None
        if expert:
            try:
                # Execute the original task with a single expert
                fallback_result = {
                    "output": f"Task completed by {expert.config.name} (fallback mode)",
                    "task": plan.task,
                }
            except Exception as e:
                logger.error(f"Fallback agent execution failed: {e}")
                fallback_result = {"error": f"Fallback execution failed: {e}"}
        else:
            fallback_result = {"error": "No active expert available for fallback"}

        return {
            "status": "fallback",
            "result": fallback_result,
            "phase_results": phase_results,
        }

    async def _broadcast_event(
        self, event_type: str, data: dict[str, Any]
    ) -> None:
        """Broadcast an orchestration event to the team channel.

        Does nothing when the team has no handoff transport.
        """
        if self._team._handoff_transport:
            await self._team._handoff_transport.send(
                self._team._team_channel, {"type": event_type, **data}
            )
"completed" + FAILED = "failed" + + +class PlanStatus(str, enum.Enum): + """计划状态""" + + DRAFT = "draft" + CONFIRMED = "confirmed" + EXECUTING = "executing" + COMPLETED = "completed" + FAILED = "failed" + FALLBACK = "fallback" + + +# DFS 着色常量 +_WHITE = 0 # 未访问 +_GRAY = 1 # 正在访问(当前路径上) +_BLACK = 2 # 已完成访问 + + +@dataclass +class PlanPhase: + """协作计划中的单个阶段 + + Attributes: + id: 阶段标识符 + name: 阶段显示名称 + assigned_expert: 分配到此阶段的 Expert 名称 + task_description: 此阶段完成的任务描述 + depends_on: 依赖的阶段 ID 列表 + parallel_type: 执行类型 + merge_strategy: 合并策略,仅 COMPETITIVE_PARALLEL 需要 + milestone: 里程碑检查点描述 + status: 当前状态 + result: 阶段输出结果 + """ + + id: str + name: str + assigned_expert: str + task_description: str + depends_on: list[str] = field(default_factory=list) + parallel_type: ParallelType = ParallelType.SERIAL + merge_strategy: MergeStrategy | None = None + milestone: str = "" + status: PhaseStatus = PhaseStatus.PENDING + result: dict | None = None + + def to_dict(self) -> dict[str, Any]: + """序列化为字典""" + return { + "id": self.id, + "name": self.name, + "assigned_expert": self.assigned_expert, + "task_description": self.task_description, + "depends_on": self.depends_on, + "parallel_type": self.parallel_type.value, + "merge_strategy": self.merge_strategy.value if self.merge_strategy is not None else None, + "milestone": self.milestone, + "status": self.status.value, + "result": self.result, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> PlanPhase: + """从字典创建 PlanPhase""" + merge_strategy = None + if data.get("merge_strategy") is not None: + merge_strategy = MergeStrategy(data["merge_strategy"]) + + return cls( + id=data["id"], + name=data["name"], + assigned_expert=data["assigned_expert"], + task_description=data["task_description"], + depends_on=data.get("depends_on", []), + parallel_type=ParallelType(data.get("parallel_type", ParallelType.SERIAL.value)), + merge_strategy=merge_strategy, + milestone=data.get("milestone", ""), + status=PhaseStatus(data.get("status", 
PhaseStatus.PENDING.value)), + result=data.get("result"), + ) + + +@dataclass +class CollaborationPlan: + """Expert Team 协作计划 + + 定义 Expert Team 的结构化协作蓝图,包括阶段编排、共享变量、 + 状态管理和依赖关系。 + + Attributes: + id: 计划标识符 + task: 原始任务描述 + phases: 有序阶段列表 + variables: 共享变量 + status: 计划状态 + lead_expert: 主导 Expert 名称 + """ + + id: str + task: str + phases: list[PlanPhase] = field(default_factory=list) + variables: dict = field(default_factory=dict) + status: PlanStatus = PlanStatus.DRAFT + lead_expert: str = "" + _phase_index: dict[str, PlanPhase] = field(default_factory=dict, init=False, repr=False) + + def __post_init__(self) -> None: + """Build the phase index after initialization.""" + self._rebuild_index() + + def _rebuild_index(self) -> None: + """Rebuild the phase index from the phases list.""" + self._phase_index = {phase.id: phase for phase in self.phases} + + def to_dict(self) -> dict[str, Any]: + """序列化为字典""" + return { + "id": self.id, + "task": self.task, + "phases": [phase.to_dict() for phase in self.phases], + "variables": self.variables, + "status": self.status.value, + "lead_expert": self.lead_expert, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> CollaborationPlan: + """从字典创建 CollaborationPlan""" + phases = [PlanPhase.from_dict(p) for p in data.get("phases", [])] + return cls( + id=data["id"], + task=data["task"], + phases=phases, + variables=data.get("variables", {}), + status=PlanStatus(data.get("status", PlanStatus.DRAFT.value)), + lead_expert=data.get("lead_expert", ""), + ) + + def validate(self) -> list[str]: + """验证计划,返回错误消息列表(空列表表示有效) + + 检查项: + - 无重复阶段 ID + - 所有 depends_on 引用存在 + - 无循环依赖(DFS 着色检测) + - COMPETITIVE_PARALLEL 阶段必须有 merge_strategy + """ + errors: list[str] = [] + + # 构建阶段 ID 集合 + phase_ids = {phase.id for phase in self.phases} + + # 检查重复阶段 ID + seen_ids: set[str] = set() + for phase in self.phases: + if phase.id in seen_ids: + errors.append(f"重复的阶段 ID: {phase.id}") + seen_ids.add(phase.id) + + # 检查 depends_on 引用是否存在 + for 
phase in self.phases: + for dep_id in phase.depends_on: + if dep_id not in phase_ids: + errors.append( + f"阶段 '{phase.id}' 依赖了不存在的阶段 ID: {dep_id}" + ) + + # 检查循环依赖(迭代 DFS 着色 — 避免递归栈溢出) + color: dict[str, int] = {phase.id: _WHITE for phase in self.phases} + dep_map: dict[str, list[str]] = { + phase.id: phase.depends_on for phase in self.phases + } + + for phase in self.phases: + if color[phase.id] != _WHITE: + continue + # Iterative DFS using an explicit stack + stack: list[tuple[str, bool]] = [(phase.id, False)] + while stack: + node, is_backtrack = stack.pop() + if is_backtrack: + color[node] = _BLACK + continue + if color[node] == _GRAY: + # Already on current path — cycle detected + errors.append("检测到循环依赖") + break + if color[node] == _BLACK: + continue + color[node] = _GRAY + # Push backtrack marker + stack.append((node, True)) + for neighbor in dep_map.get(node, []): + if neighbor not in color: + continue + if color[neighbor] == _GRAY: + errors.append("检测到循环依赖") + break + if color[neighbor] == _WHITE: + stack.append((neighbor, False)) + else: + continue + break # Inner break propagates to outer + if errors: + break # Only report cycle once + + # 检查 COMPETITIVE_PARALLEL 必须有 merge_strategy + for phase in self.phases: + if ( + phase.parallel_type == ParallelType.COMPETITIVE_PARALLEL + and phase.merge_strategy is None + ): + errors.append( + f"阶段 '{phase.id}' 为 COMPETITIVE_PARALLEL 但未设置 merge_strategy" + ) + + return errors + + def get_ready_phases(self) -> list[PlanPhase]: + """获取所有依赖已完成且状态为 PENDING 的阶段""" + completed_ids = { + phase.id for phase in self.phases if phase.status == PhaseStatus.COMPLETED + } + ready: list[PlanPhase] = [] + for phase in self.phases: + if phase.status != PhaseStatus.PENDING: + continue + if all(dep_id in completed_ids for dep_id in phase.depends_on): + ready.append(phase) + return ready + + def get_phase(self, phase_id: str) -> PlanPhase | None: + """根据 ID 获取阶段,不存在则返回 None (O(1) lookup)""" + return self._phase_index.get(phase_id) + + 
class ExpertTemplateRegistry:
    """Registry of ExpertTemplate objects, keyed by template name.

    Supports:
    - registering and fetching templates by name
    - case-insensitive substring search over name and description
    - loading templates from a single YAML file or a directory of YAML files
    """

    def __init__(self) -> None:
        self._templates: dict[str, ExpertTemplate] = {}

    def register(self, template: ExpertTemplate) -> None:
        """Register a template; a template with the same name is overwritten.

        Args:
            template: ExpertTemplate instance to store under ``template.name``.
        """
        self._templates[template.name] = template
        logger.info("ExpertTemplate '%s' registered", template.name)

    def get(self, name: str) -> ExpertTemplate | None:
        """Return the template registered under ``name``, or None if absent.

        Args:
            name: Template name.
        """
        return self._templates.get(name)

    def list(self) -> list[ExpertTemplate]:
        """Return all registered templates, in registration order."""
        return list(self._templates.values())

    def search(self, query: str) -> list[ExpertTemplate]:
        """Find templates whose name or description contains ``query``.

        Matching is a case-insensitive substring test.

        Args:
            query: Search keyword.

        Returns:
            Matching templates, in registration order.
        """
        needle = query.lower()
        return [
            template
            for template in self._templates.values()
            if needle in template.name.lower()
            or needle in template.description.lower()
        ]

    def load_from_yaml(self, path: str) -> ExpertTemplate:
        """Load one ExpertTemplate from a YAML file and register it.

        Expected YAML layout::

            name: analyst
            description: "Data analyst"
            is_builtin: false
            config:
              name: analyst
              agent_type: analyst
              persona: "Expert who is good at data analysis"
              thinking_style: "Logical reasoning"
              collaboration_strategy: "cooperative"
              bound_skills:
                - data_query
                - chart_gen
              avatar: "📊"
              color: "#52c41a"
              is_lead: false

        Args:
            path: Path of the YAML file.

        Returns:
            The loaded (and registered) ExpertTemplate.

        Raises:
            ConfigValidationError: The document's top level is not a mapping
                (this includes an empty file, which parses to None).
        """
        with open(path, "r", encoding="utf-8") as f:
            # safe_load never constructs arbitrary Python objects.
            data = yaml.safe_load(f)
        if not isinstance(data, dict):
            raise ConfigValidationError(
                agent_name="unknown",
                key="config",
                reason=f"YAML config must be a mapping, got {type(data)}",
            )
        template = ExpertTemplate.from_dict(data)
        self.register(template)
        return template

    def load_from_directory(self, path: str) -> list[ExpertTemplate]:
        """Load every ``.yaml`` / ``.yml`` file found directly in a directory.

        Files are processed in sorted filename order. Loading is best-effort:
        a file that fails to load is logged and skipped, so one broken
        template does not prevent the others from being registered.

        Args:
            path: Directory path.

        Returns:
            The templates that loaded successfully (empty when ``path`` is
            not a directory).
        """
        loaded: list[ExpertTemplate] = []
        if not os.path.isdir(path):
            logger.warning("Directory '%s' does not exist, skipping", path)
            return loaded

        for filename in sorted(os.listdir(path)):
            if not filename.endswith((".yaml", ".yml")):
                continue
            filepath = os.path.join(path, filename)
            # A sub-directory may also be named "*.yaml"; it is not a template
            # file, so skip it instead of failing inside open().
            if not os.path.isfile(filepath):
                continue
            try:
                loaded.append(self.load_from_yaml(filepath))
            except Exception as e:
                logger.warning(
                    "Failed to load ExpertTemplate from '%s': %s", filepath, e
                )
        return loaded
# --- Expert Team routing (src/agentkit/experts/router.py) -------------------

# Pattern to match an "@team" or "@team:expert1,expert2" prefix.
# The lookahead requires whitespace, a colon, or end-of-input right after the
# prefix so that ordinary words such as "@teamwork" do not trigger team mode.
# Group 1: optional comma-separated expert list; group 2: remaining task text.
TEAM_PREFIX_PATTERN = re.compile(r"^@team(?::(\S+))?(?=[\s:]|$)\s*(.*)", re.DOTALL)

# Valid expert name: alphanumeric, underscore, hyphen, 1-64 chars.
# Always applied with fullmatch(): "$" alone would accept a trailing newline.
_EXPERT_NAME_RE = re.compile(r"^[a-zA-Z0-9_-]{1,64}$")

MAX_EXPERTS = 10  # Maximum number of experts in a team


@dataclass
class ExpertTeamRoutingResult:
    """Result of expert team routing resolution."""

    matched: bool = False
    team_mode: bool = False
    specified_experts: list[str] = field(default_factory=list)
    task_content: str = ""
    auto_compose: bool = False
    complexity: float = 0.0
    match_method: str = ""  # "explicit_team" | "complexity_suggestion"


class ExpertTeamRouter:
    """Routes user input to Expert Team mode.

    Supports:
    - ``@team`` prefix -> trigger team mode
    - ``@team:analyst,strategist`` -> specify team members
    - High complexity -> suggest team mode upgrade
    """

    COMPLEXITY_THRESHOLD = 0.7  # At or above this, suggest team mode

    def __init__(self, template_registry: ExpertTemplateRegistry | None = None):
        self._registry = (
            template_registry
            if template_registry is not None
            else ExpertTemplateRegistry()
        )

    def resolve(self, content: str, complexity: float = 0.0) -> ExpertTeamRoutingResult:
        """Resolve user input to an ExpertTeamRoutingResult.

        Args:
            content: User's input message.
            complexity: Pre-computed complexity score (0.0-1.0).

        Returns:
            ExpertTeamRoutingResult describing the routing decision. For an
            explicit ``@team`` prefix, ``specified_experts`` holds the valid,
            de-duplicated names (at most MAX_EXPERTS) in the order given.
        """
        result = ExpertTeamRoutingResult()

        match = TEAM_PREFIX_PATTERN.match(content.strip())
        if match:
            expert_list_str = match.group(1)  # e.g. "analyst,strategist" or None
            task = match.group(2).strip()  # the actual task content

            result.matched = True
            result.team_mode = True
            # Fall back to the full content when nothing follows the prefix.
            result.task_content = task if task else content
            result.match_method = "explicit_team"

            if expert_list_str:
                raw_names = [name.strip() for name in expert_list_str.split(",")]
                valid_names = [n for n in raw_names if _EXPERT_NAME_RE.fullmatch(n)]
                invalid = set(raw_names) - set(valid_names)
                if invalid:
                    logger.warning("Invalid expert names rejected: %s", invalid)
                # Experts are keyed by name downstream, so a repeated name
                # must not yield two team members; keep first occurrences.
                unique_names = list(dict.fromkeys(valid_names))
                result.specified_experts = unique_names[:MAX_EXPERTS]
                result.auto_compose = False

                # Unknown templates are allowed; they are generated on demand
                # by resolve_expert_configs().
                for name in result.specified_experts:
                    if self._registry.get(name) is None:
                        logger.warning(
                            "ExpertTemplate '%s' not found, will be dynamically generated",
                            name,
                        )
            else:
                # No specific experts: let the system compose the team.
                result.auto_compose = True

            return result

        if complexity >= self.COMPLEXITY_THRESHOLD:
            result.matched = True
            result.team_mode = True
            result.auto_compose = True
            result.complexity = complexity
            result.task_content = content
            result.match_method = "complexity_suggestion"
            return result

        # Not a team mode request.
        result.matched = False
        result.team_mode = False
        result.task_content = content
        result.complexity = complexity
        return result

    def resolve_expert_configs(self, specified_experts: list[str]) -> list[ExpertConfig]:
        """Resolve expert names to ExpertConfig instances.

        Names that match a registered template use a copy of the template's
        config; other names get a dynamically created ExpertConfig whose
        persona is derived from the name. Invalid names are skipped. If no
        resolved config is marked as lead, the first one becomes the lead.

        Args:
            specified_experts: Expert names, e.g. from ``resolve()``.

        Returns:
            One ExpertConfig per valid name, in input order.
        """
        import copy  # local import: this module has no file-level ``copy`` import

        configs: list[ExpertConfig] = []
        for name in specified_experts:
            # Validate the name to prevent prompt injection via the persona.
            if not _EXPERT_NAME_RE.fullmatch(name):
                logger.warning(f"Skipping invalid expert name: {name}")
                continue

            template = self._registry.get(name)
            if template is not None:
                # Copy so that the lead fallback below never mutates the
                # template stored in the shared registry.
                configs.append(copy.copy(template.config))
            else:
                # Dynamic generation: the name was validated above, so it is
                # safe to embed in the persona / identity prompt.
                configs.append(
                    ExpertConfig(
                        name=name,
                        agent_type="expert",
                        persona=f"Expert in {name}",
                        thinking_style="analytical",
                        bound_skills=[],
                        is_lead=not configs,
                        task_mode="llm_generate",
                        prompt={"identity": f"Expert in {name}"},
                    )
                )

        # Ensure at least one expert is lead.
        if configs and not any(c.is_lead for c in configs):
            configs[0].is_lead = True

        return configs


# --- ExpertTeam container (src/agentkit/experts/team.py) --------------------


class TeamStatus(str, enum.Enum):
    """ExpertTeam lifecycle states."""

    FORMING = "forming"
    PLANNING = "planning"
    EXECUTING = "executing"
    SYNTHESIZING = "synthesizing"
    COMPLETED = "completed"
    DISSOLVED = "dissolved"


class ExpertTeam:
    """Container managing a team of Experts working together on a task."""

    def __init__(
        self,
        team_id: str | None = None,
        workspace: SharedWorkspace | None = None,
        pool: AgentPool | None = None,
        template_registry: ExpertTemplateRegistry | None = None,
    ):
        self.team_id = team_id or str(uuid.uuid4())
        self._workspace = workspace or SharedWorkspace()
        self._pool = pool
        self._template_registry = template_registry or ExpertTemplateRegistry()
        self._handoff_transport = InProcessHandoffTransport()
        self._experts: dict[str, Expert] = {}
        self._lead_expert_name: str | None = None
        self._plan: CollaborationPlan | None = None
        self._status = TeamStatus.FORMING
        self._team_channel = f"team:{self.team_id}"
        self._orchestrator_task: asyncio.Task | None = None

    @property
    def status(self) -> TeamStatus:
        """Current lifecycle state of the team."""
        return self._status

    @property
    def lead_expert(self) -> Expert | None:
        """The Lead Expert, or None when no lead is currently assigned."""
        if self._lead_expert_name:
            return self._experts.get(self._lead_expert_name)
        return None

    @property
    def plan(self) -> CollaborationPlan | None:
        """The current collaboration plan, if any."""
        return self._plan

    @property
    def experts(self) -> list[Expert]:
        """All experts registered in the team."""
        return list(self._experts.values())

    @property
    def active_experts(self) -> list[Expert]:
        """Experts whose ``is_active`` flag is set."""
        return [e for e in self._experts.values() if e.is_active]

    async def create_team(
        self,
        lead_config: ExpertConfig,
        member_configs: list[ExpertConfig] | None = None,
    ) -> None:
        """Create a team with a Lead Expert and optional members.

        Raises:
            RuntimeError: No AgentPool was configured.
        """
        if not self._pool:
            raise RuntimeError("AgentPool not configured")

        # Create the Lead Expert first so members can be told who leads.
        team_context = self._build_team_context(lead_config, member_configs or [])
        lead = await Expert.create(
            config=lead_config,
            pool=self._pool,
            handoff_transport=self._handoff_transport,
            workspace=self._workspace,
            team_context=team_context,
        )
        lead.team_id = self.team_id
        self._experts[lead_config.name] = lead
        self._lead_expert_name = lead_config.name

        for config in member_configs or []:
            await self._add_expert_internal(config, team_context)

        self._status = TeamStatus.PLANNING

    async def add_expert(self, config_or_template: ExpertConfig | str) -> Expert:
        """Add an Expert to the team dynamically.

        Args:
            config_or_template: ExpertConfig instance, or the name of a
                template to look up in the template registry.

        Raises:
            ValueError: A template name was given but is not registered.
        """
        if isinstance(config_or_template, str):
            template = self._template_registry.get(config_or_template)
            if template is None:
                raise ValueError(f"ExpertTemplate '{config_or_template}' not found")
            config = template.config
        else:
            config = config_or_template

        # _lead_expert_name may be stale, so only use it if still registered.
        lead_config: ExpertConfig | None = None
        if self._lead_expert_name and self._lead_expert_name in self._experts:
            lead_config = self._experts[self._lead_expert_name].config

        team_context = self._build_team_context(
            lead_config,
            [e.config for e in self.active_experts],
        )
        return await self._add_expert_internal(config, team_context)

    async def _add_expert_internal(
        self, config: ExpertConfig, team_context: str
    ) -> Expert:
        """Create an Expert, register it under its name, and announce it.

        Raises:
            RuntimeError: No AgentPool was configured.
        """
        if not self._pool:
            raise RuntimeError("AgentPool not configured")

        expert = await Expert.create(
            config=config,
            pool=self._pool,
            handoff_transport=self._handoff_transport,
            workspace=self._workspace,
            team_context=team_context,
        )
        expert.team_id = self.team_id
        self._experts[config.name] = expert

        # Broadcast that a new expert joined.
        await self._handoff_transport.send(
            self._team_channel,
            {
                "type": "expert_joined",
                "expert_name": config.name,
                "capabilities": expert.get_capabilities_summary(),
            },
        )

        return expert

    async def remove_expert(self, name: str) -> None:
        """Remove an Expert from the team; unknown names are ignored.

        When the Lead Expert is removed, the lead role passes to the first
        other active expert (or to nobody if none is active). Plan phases
        assigned to the removed expert are reassigned to the current lead,
        or to "" when there is no lead.
        """
        expert = self._experts.get(name)
        if not expert:
            return

        # The lead cannot simply disappear: hand the role over first.
        if name == self._lead_expert_name:
            candidates = [e for e in self.active_experts if e.config.name != name]
            if candidates:
                new_lead = candidates[0]
                self._lead_expert_name = new_lead.config.name
                new_lead.config.is_lead = True
            else:
                self._lead_expert_name = None

        await expert.destroy(self._pool)
        del self._experts[name]

        # Reassign plan phases that referenced the removed expert.
        if self._plan:
            new_lead_name = self._lead_expert_name
            for phase in self._plan.phases:
                if phase.assigned_expert == name:
                    phase.assigned_expert = new_lead_name or ""

        # Broadcast that the expert left.
        await self._handoff_transport.send(
            self._team_channel,
            {
                "type": "expert_left",
                "expert_name": name,
            },
        )

    def update_plan(self, plan: CollaborationPlan) -> list[str]:
        """Update the collaboration plan. Only Lead Expert or user should call this.

        The plan is validated first. If validation fails, the plan is NOT
        stored and the validation error strings are returned. Otherwise the
        plan is stored (moving the team to EXECUTING when the plan is
        CONFIRMED) and the ``assigned_expert`` of every phase is returned,
        in phase order.

        NOTE(review): both outcomes are returned as ``list[str]``, so the
        caller cannot distinguish them from the return value alone.
        """
        errors = plan.validate()
        if errors:
            return errors  # surface validation errors instead of swallowing them

        self._plan = plan
        if plan.status == PlanStatus.CONFIRMED:
            self._status = TeamStatus.EXECUTING

        return [p.assigned_expert for p in plan.phases]

    async def broadcast_user_message(self, content: str) -> None:
        """Broadcast a user intervention message on the team channel."""
        message = {
            "type": "user_intervention",
            "content": content,
            "timestamp": time.time(),
        }
        await self._handoff_transport.send(self._team_channel, message)

    async def get_shared_context(self) -> dict:
        """Collect this team's entries from the SharedWorkspace.

        Returns every workspace key starting with ``team:<team_id>`` whose
        value is truthy, mapped to that value.
        """
        # NOTE(review): plain prefix match; a team id that is a prefix of
        # another team's id would also match. Confirm the key format.
        context = {}
        keys = await self._workspace.list_keys()
        for key in keys:
            if key.startswith(f"team:{self.team_id}"):
                data = await self._workspace.read(key)
                if data:
                    context[key] = data
        return context

    async def generate_plan(self, task: str) -> CollaborationPlan:
        """Create and store an empty CollaborationPlan for ``task``.

        Only the plan skeleton (no phases) is created here; per the original
        design note, LLM-based task decomposition is left to TeamOrchestrator.
        """
        plan_id = str(uuid.uuid4())
        plan = CollaborationPlan(
            id=plan_id,
            task=task,
            phases=[],
            lead_expert=self._lead_expert_name or "",
        )
        self._plan = plan
        return plan

    async def dissolve(self) -> None:
        """Dissolve the team and release its experts.

        Cleanup is failure-tolerant: an error from the orchestrator task or
        from one expert's ``destroy()`` is logged, and the remaining experts
        are still destroyed. The team is always marked DISSOLVED and the
        handoff transport is always closed.
        """
        # Cancel the ongoing orchestrator task, if any.
        task = self._orchestrator_task
        self._orchestrator_task = None
        if task is not None and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
            except Exception:
                logger.exception(
                    "Orchestrator task of team %s failed during dissolve", self.team_id
                )

        try:
            for name, expert in list(self._experts.items()):
                if expert.is_active and self._pool:
                    try:
                        await expert.destroy(self._pool)
                    except Exception:
                        logger.exception(
                            "Failed to destroy expert '%s' while dissolving team %s",
                            name,
                            self.team_id,
                        )
        finally:
            self._experts.clear()
            self._lead_expert_name = None
            self._status = TeamStatus.DISSOLVED
            # Close the handoff transport.
            self._handoff_transport.close()

    def _build_team_context(
        self,
        lead_config: ExpertConfig | None,
        member_configs: list[ExpertConfig],
    ) -> str:
        """Build the team context string injected into Expert system prompts.

        The lead (if given) is listed once as "Lead Expert" and is skipped
        when it also appears in ``member_configs``.
        """
        lines = ["You are part of an Expert Team."]

        if lead_config:
            lines.append(f"Lead Expert: {lead_config.name} ({lead_config.persona})")

        for config in member_configs:
            if lead_config and config.name == lead_config.name:
                continue
            lines.append(
                f"Team Member: {config.name} ({config.persona}), Skills: {', '.join(config.bound_skills)}"
            )

        lines.append(
            "You can collaborate with other team members via send_message() and request_assist()."
        )
        return "\n".join(lines)
'team_dissolved'; data: { team_id: string } } + +/** Expert info within a team */ +export interface IExpertInfo { + id: string + name: string + persona: string + avatar: string + color: string + is_lead: boolean + bound_skills: string[] + status: 'active' | 'inactive' +} + +/** A phase within a team plan */ +export interface ITeamPlanPhase { + id: string + name: string + assigned_expert: string + status: 'pending' | 'in_progress' | 'completed' | 'failed' + parallel_type: 'serial' | 'subtask_parallel' | 'competitive_parallel' + milestone: string +} + +/** Expert team state */ +export interface IExpertTeamState { + team_id: string + status: 'forming' | 'planning' | 'executing' | 'synthesizing' | 'completed' | 'dissolved' + experts: IExpertInfo[] + plan_phases: ITeamPlanPhase[] + lead_expert: string +} /** API error */ export interface IApiError { diff --git a/src/agentkit/server/frontend/src/components/chat/ChatMessage.vue b/src/agentkit/server/frontend/src/components/chat/ChatMessage.vue index 4190fe4..50cd7cc 100644 --- a/src/agentkit/server/frontend/src/components/chat/ChatMessage.vue +++ b/src/agentkit/server/frontend/src/components/chat/ChatMessage.vue @@ -17,6 +17,21 @@
+ + +
+
+ +
+
+ +
{{ formattedTime }}
@@ -91,6 +107,7 @@ import { RobotOutlined, UserOutlined, ThunderboltOutlined } from '@ant-design/ic import type { IChatMessage } from '@/api/types' import ToolCallIndicator from './ToolCallIndicator.vue' import ToolCallCard from './ToolCallCard.vue' +import ExpertMessage from './ExpertMessage.vue' const md = new MarkdownIt({ html: false, diff --git a/src/agentkit/server/frontend/src/components/chat/ExpertMessage.vue b/src/agentkit/server/frontend/src/components/chat/ExpertMessage.vue new file mode 100644 index 0000000..05f4b57 --- /dev/null +++ b/src/agentkit/server/frontend/src/components/chat/ExpertMessage.vue @@ -0,0 +1,94 @@ + + + + + diff --git a/src/agentkit/server/frontend/src/components/chat/ExpertTeamView.vue b/src/agentkit/server/frontend/src/components/chat/ExpertTeamView.vue new file mode 100644 index 0000000..fed4c08 --- /dev/null +++ b/src/agentkit/server/frontend/src/components/chat/ExpertTeamView.vue @@ -0,0 +1,177 @@ + + + + + diff --git a/src/agentkit/server/frontend/src/components/chat/PlanVisualization.vue b/src/agentkit/server/frontend/src/components/chat/PlanVisualization.vue new file mode 100644 index 0000000..4309446 --- /dev/null +++ b/src/agentkit/server/frontend/src/components/chat/PlanVisualization.vue @@ -0,0 +1,89 @@ + + + + + diff --git a/src/agentkit/server/frontend/src/stores/chat.ts b/src/agentkit/server/frontend/src/stores/chat.ts index b91c346..bde3a43 100644 --- a/src/agentkit/server/frontend/src/stores/chat.ts +++ b/src/agentkit/server/frontend/src/stores/chat.ts @@ -1,11 +1,14 @@ import { defineStore } from 'pinia' import { ref, computed } from 'vue' import { apiClient } from '@/api/client' +import { useTeamStore } from '@/stores/team' import type { IChatMessage, IConversation, IChatRequest, WsClientMessage, + IExpertTeamState, + ITeamPlanPhase, } from '@/api/types' function generateId(): string { @@ -238,6 +241,15 @@ export const useChatStore = defineStore('chat', () => { // --- Internal helpers --- + /** Get team store 
lazily — safe to call inside actions after Pinia is installed */ + let _teamStore: ReturnType | null = null + function _getTeamStore() { + if (!_teamStore) { + _teamStore = useTeamStore() + } + return _teamStore + } + function handleWsMessage(data: Record): void { // Backend sends nested data: {type, data: {...}} // Flatten for easier access @@ -393,6 +405,100 @@ export const useChatStore = defineStore('chat', () => { streamingSteps.value = [] break } + + case 'team_formed': { + const teamStore = _getTeamStore() + if (teamStore) { + teamStore.setTeamState(payload as IExpertTeamState) + } + streamingSteps.value.push(`专家团队已组建: ${(payload as IExpertTeamState).experts.map((e) => e.name).join(', ')}`) + break + } + + case 'expert_step': { + const conversationId = currentConversationId.value + if (!conversationId) break + const conv = conversations.value.find((c) => c.id === conversationId) + if (!conv) break + // Dedup: append to existing expert message if one exists for this expert + const existingExpertMsg = [...conv.messages] + .reverse() + .find((m) => m.expert_id === payload.expert_id && m.status === 'pending') + if (existingExpertMsg) { + updateMessage(conversationId, existingExpertMsg.id, { + content: (existingExpertMsg.content || '') + (payload.content || ''), + }) + } else { + const expertMsg: IChatMessage = { + id: generateId(), + role: 'assistant', + content: payload.content || '', + timestamp: new Date().toISOString(), + status: 'pending', + expert_id: payload.expert_id, + expert_name: payload.expert_name, + expert_color: payload.expert_color, + message_type: 'chat', + } + appendMessage(conversationId, expertMsg) + } + streamingSteps.value.push(`${payload.expert_name}: 步骤 ${payload.step}`) + break + } + + case 'expert_result': { + const conversationId = currentConversationId.value + if (!conversationId) break + const conv = conversations.value.find((c) => c.id === conversationId) + if (!conv) break + const expertMsg: IChatMessage = { + id: generateId(), + 
/**
 * Pinia store holding the state of the current Expert Team
 * (members, plan phases) and the expert selected in the UI.
 */
export const useTeamStore = defineStore('team', () => {
  // Explicit type arguments are required: a bare `ref(null)` is inferred as
  // `Ref<null>`, which rejects the assignments in setTeamState/selectExpert.
  const teamState = ref<IExpertTeamState | null>(null)
  const selectedExpertId = ref<string | null>(null)

  /** Experts whose status is 'active'; empty when no team is set. */
  const activeExperts = computed(() =>
    teamState.value?.experts.filter((expert) => expert.status === 'active') ?? []
  )

  /** The expert flagged `is_lead`, or null when there is none. */
  const leadExpert = computed(() =>
    teamState.value?.experts.find((expert) => expert.is_lead) ?? null
  )

  /** True while a team exists and has not been dissolved. */
  const isTeamMode = computed(() =>
    teamState.value !== null && teamState.value.status !== 'dissolved'
  )

  /** First phase that is 'in_progress', or null. */
  const currentPhase = computed(() => {
    if (!teamState.value) return null
    return teamState.value.plan_phases.find((phase) => phase.status === 'in_progress') ?? null
  })

  /** All phases with status 'completed'; empty when no team is set. */
  const completedPhases = computed(() =>
    teamState.value?.plan_phases.filter((phase) => phase.status === 'completed') ?? []
  )

  /** Replace the whole team state. */
  function setTeamState(state: IExpertTeamState) {
    teamState.value = state
  }

  /** Replace the plan phases; a no-op when no team is set. */
  function updatePhases(phases: ITeamPlanPhase[]) {
    if (teamState.value) {
      // Reassign the whole object to trigger Vue reactivity
      teamState.value = { ...teamState.value, plan_phases: phases }
    }
  }

  /** Select an expert by id, or clear the selection with null. */
  function selectExpert(expertId: string | null) {
    selectedExpertId.value = expertId
  }

  /** Drop the team state and the current selection. */
  function clearTeam() {
    teamState.value = null
    selectedExpertId.value = null
  }

  return {
    teamState, selectedExpertId, activeExperts, leadExpert,
    isTeamMode, currentPhase, completedPhases,
    setTeamState, updatePhases, selectExpert, clearTeam
  }
})