diff --git a/AGENTS.md b/AGENTS.md index 8572622..7830293 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -152,7 +152,7 @@ Config search: `--config` path > `./agentkit.yaml` > `~/.agentkit/agentkit.yaml` ## Conventions - Skill configs: `configs/skills/*.yaml` (15 presets) -- LLM configs: `configs/llm_config.yaml` +- LLM configs: `agentkit.yaml` llm section (unified with server config) - Pipeline configs: `configs/pipelines/*.yaml` - Expert templates: registered via `ExpertTemplateRegistry` - All Pydantic models use `model_config = ConfigDict(...)` not `class Config` diff --git a/configs/geo_server.py b/configs/geo_server.py index 9b62e0a..65a89fb 100644 --- a/configs/geo_server.py +++ b/configs/geo_server.py @@ -1,23 +1,21 @@ """GEO AgentKit Server 启动入口 -工厂函数 create_geo_app() 初始化 LLM Gateway、Tool Registry、Skill Registry, -然后创建 FastAPI 应用。 +工厂函数 create_geo_app() 使用 agentkit.yaml 统一配置, +初始化 LLM Gateway、Tool Registry、Skill Registry,然后创建 FastAPI 应用。 使用方式: uvicorn configs.geo_server:create_geo_app --factory --host 0.0.0.0 --port 8001 """ +from __future__ import annotations + import logging import os -from agentkit.core.agent_pool import AgentPool -from agentkit.llm.config import LLMConfig -from agentkit.llm.gateway import LLMGateway -from agentkit.llm.providers.openai import OpenAICompatibleProvider -from agentkit.quality.gate import QualityGate -from agentkit.quality.output import OutputStandardizer -from agentkit.router.intent import IntentRouter -from agentkit.server.app import create_app +from fastapi import FastAPI + +from agentkit.server.app import _build_llm_gateway, create_app +from agentkit.server.config import ServerConfig from agentkit.skills.loader import SkillLoader from agentkit.skills.registry import SkillRegistry from agentkit.tools.registry import ToolRegistry @@ -27,58 +25,25 @@ logger = logging.getLogger(__name__) # ─── 配置路径 ─── CONFIGS_DIR = os.path.dirname(os.path.abspath(__file__)) -LLM_CONFIG_PATH = os.path.join(CONFIGS_DIR, "llm_config.yaml") 
SKILLS_DIR = os.path.join(CONFIGS_DIR, "skills") - -def _substitute_env_vars(config_path: str) -> dict: - """加载 YAML 配置并替换 ${VAR} 环境变量""" - import yaml - - with open(config_path, encoding="utf-8") as f: - raw = f.read() - - # 递归替换 ${VAR_NAME} 和 ${VAR_NAME:-default} 格式 - import re - def _replace_env(match): - var_expr = match.group(1) - if ":-" in var_expr: - var_name, default = var_expr.split(":-", 1) - return os.getenv(var_name, default) - return os.getenv(var_expr, match.group(0)) - - resolved = re.sub(r"\$\{([^}]+)\}", _replace_env, raw) - return yaml.safe_load(resolved) +# agentkit.yaml is loaded from the project root only (the parent directory of configs/); there is no fallback to the configs directory +_PROJECT_ROOT = os.path.dirname(CONFIGS_DIR) +_AGENTKIT_YAML = os.path.join(_PROJECT_ROOT, "agentkit.yaml") -def _init_llm_gateway() -> LLMGateway: - """初始化 LLM Gateway 并注册 Provider""" - config_data = _substitute_env_vars(LLM_CONFIG_PATH) - config = LLMConfig.from_dict(config_data) - - gateway = LLMGateway(config) - - for provider_name, pconf in config.providers.items(): - if not pconf.api_key: - logger.warning(f"Skipping provider '{provider_name}': no API key") - continue - models = list(pconf.models.keys()) if pconf.models else [] - default_model = models[0] if models else "gpt-4o-mini" - provider = OpenAICompatibleProvider( - api_key=pconf.api_key, - base_url=pconf.base_url, - default_model=default_model, - ) - gateway.register_provider(provider_name, provider) - logger.info(f"Provider '{provider_name}' registered with model '{default_model}'") - - return gateway +def _load_server_config() -> ServerConfig: + """Load ServerConfig from agentkit.yaml with env var resolution.""" + if os.path.isfile(_AGENTKIT_YAML): + return ServerConfig.from_yaml(_AGENTKIT_YAML) + raise FileNotFoundError(f"agentkit.yaml not found at {_AGENTKIT_YAML}") def _init_tool_registry() -> ToolRegistry: """初始化 Tool Registry 并注册 GEO Tools""" registry = ToolRegistry() from configs.geo_tools import register_geo_tools + register_geo_tools(registry) return registry @@ -92,9 +57,10 @@ def 
_init_skill_registry(tool_registry: ToolRegistry) -> SkillRegistry: return registry -def create_geo_app() -> "FastAPI": +def create_geo_app() -> FastAPI: """GEO AgentKit Server FastAPI 工厂函数""" - llm_gateway = _init_llm_gateway() + config = _load_server_config() + llm_gateway = _build_llm_gateway(config) tool_registry = _init_tool_registry() skill_registry = _init_skill_registry(tool_registry) @@ -105,7 +71,9 @@ def create_geo_app() -> "FastAPI": ) app.title = "GEO AgentKit Server" - logger.info(f"GEO AgentKit Server initialized: {len(skill_registry.list_skills())} skills, " - f"{len(tool_registry.list_tools())} tools") + logger.info( + f"GEO AgentKit Server initialized: {len(skill_registry.list_skills())} skills, " + f"{len(tool_registry.list_tools())} tools" + ) return app diff --git a/configs/llm_config.yaml b/configs/llm_config.yaml deleted file mode 100644 index 49ef15e..0000000 --- a/configs/llm_config.yaml +++ /dev/null @@ -1,31 +0,0 @@ -# LLM Provider 配置 — 仅 Docker/GEO 部署模式使用 -# 标准 CLI 模式 (agentkit serve/gui/chat) 使用 agentkit.yaml 的 llm 段 -# 环境变量替换:${VAR_NAME} 由 geo_server._substitute_env_vars() 处理 - -providers: - dashscope: - api_key: "${DASHSCOPE_API_KEY}" - base_url: "${DASHSCOPE_BASE_URL:-https://coding.dashscope.aliyuncs.com/v1}" - models: - qwen3-coder-plus: - max_tokens: 64000 - cost_per_1k_input: 0.00014 - cost_per_1k_output: 0.00028 - qwen-plus: - max_tokens: 128000 - cost_per_1k_input: 0.0008 - cost_per_1k_output: 0.002 - qwen3-max: - max_tokens: 128000 - cost_per_1k_input: 0.002 - cost_per_1k_output: 0.006 - qwen-turbo: - max_tokens: 128000 - cost_per_1k_input: 0.0003 - cost_per_1k_output: 0.0006 - -model_aliases: - default: "dashscope/qwen3-coder-plus" - fast: "dashscope/qwen-turbo" - powerful: "dashscope/qwen3-max" - chat: "dashscope/qwen-plus" diff --git a/docs/brainstorms/2026-06-15-autonomous-task-execution-requirements.md b/docs/brainstorms/2026-06-15-autonomous-task-execution-requirements.md new file mode 100644 index 0000000..b191f1c --- 
/dev/null +++ b/docs/brainstorms/2026-06-15-autonomous-task-execution-requirements.md @@ -0,0 +1,142 @@ +--- +date: 2026-06-15 +topic: autonomous-task-execution +--- + +## Summary + +打通 PlanExecEngine 和 TeamOrchestrator 的执行层,让 AgentKit 能真正自主拆解和执行多步任务。用户一句话描述复杂需求,Agent 自主生成可执行计划、逐步调用工具、返回完整结果。 + +## Problem Frame + +AgentKit 已有完整的任务规划框架骨架——ReAct/PlanExec/ReWOO/Reflexion 四种推理引擎、TeamOrchestrator 多 Agent 协作、PipelineEngine DAG 编排——但执行层未跑通。TeamOrchestrator 的 `_execute_phase` 返回模拟字符串,PlanExecEngine 的步骤执行器功能简单,SharedWorkspace 未集成到执行层。结果是用户提出复杂需求后,Agent 只能简单对话回复,无法真正拆解执行。当前最需要的是把已有框架跑通,而不是新建能力。 + +--- + +## Key Decisions + +**执行层对接优先于新能力建设。** 现有四种引擎和 TeamOrchestrator 框架已完整,最关键的差距是执行层模拟代码未替换为真实 Agent 调用。先跑通再迭代。 + +**验证场景选多步研究任务。** "帮我分析竞品并生成报告"这类任务天然需要搜索→分析→生成三步闭环,且不依赖本地开发环境,最能体现自主执行能力。 + +**步骤间状态通过 SharedWorkspace 传递。** PlanExecEngine 已有 `dependency_results` 机制但仅通过 prompt 注入,需要结构化状态管理。SharedWorkspace 已存在但未在执行层集成。 + +--- + +## Requirements + +**执行层打通** + +- R1. TeamOrchestrator._execute_phase 调用 Agent.execute() 执行真实任务,替代当前返回模拟字符串的实现 +- R2. PlanExecEngine._LLMStepAgent 升级为完整 ReAct 循环执行器,支持工具调用和多步推理 +- R3. SharedWorkspace 集成到 PlanExecEngine 和 TeamOrchestrator 执行层,步骤间可读写结构化状态 +- R4. GoalPlanner 的 LLM prompt 调优,确保任务分解质量(子任务可执行、依赖关系正确、无遗漏) + +**执行闭环** + +- R5. 每个执行步骤完成后,结果自动写入 SharedWorkspace 并通知下游依赖步骤 +- R6. 步骤执行失败时,PlanExecEngine 触发重规划(已有 PipelineReflector/PipelineReplanner,需集成) +- R7. TeamOrchestrator 的 COMPETITIVE_PARALLEL 模式下,合并策略(BEST/VOTE/FUSION)从真实执行结果中选择 + +**验证场景** + +- R8. 多步研究任务端到端验证:用户输入"分析 X 竞品并生成报告",Agent 自主拆解为搜索→分析→生成三步,逐步执行并返回完整报告 +- R9. 执行过程通过 WebSocket 实时推送进度事件(plan_created / step_started / step_completed / plan_completed) + +--- + +## Key Flows + +- F1. 自主任务执行主流程 + - **Trigger:** 用户提交复杂任务(复杂度 > 0.7,路由到 TEAM_COLLAB 或 SKILL_REACT) + - **Actors:** 用户, CostAwareRouter, GoalPlanner, PlanExecEngine/TeamOrchestrator, SharedWorkspace + - **Steps:** + 1. CostAwareRouter 路由到合适的执行模式 + 2. GoalPlanner 将目标分解为子任务计划(含依赖关系) + 3. 
PlanExecEngine 按依赖拓扑执行子任务,每个子任务通过 ReAct 循环调用工具 + 4. 每步结果写入 SharedWorkspace,触发下游就绪检查 + 5. 全部完成后综合结果返回用户 + - **Outcome:** 用户收到完整的任务执行结果,过程可观测 + +- F2. 执行失败重规划 + - **Trigger:** 某步骤执行失败(工具调用异常/LLM 返回无效结果/超时) + - **Actors:** PlanExecEngine, PipelineReflector, PipelineReplanner + - **Steps:** + 1. PlanExecEngine 捕获步骤失败 + 2. PipelineReflector 分析失败原因 + 3. PipelineReplanner 生成修正后的计划片段 + 4. PlanExecEngine 用修正计划替换失败步骤,继续执行 + - **Outcome:** 任务从失败中恢复,无需用户手动干预 + +--- + +## Acceptance Examples + +- AE1. **多步研究任务成功执行** + - **Covers R1, R2, R3, R5, R8.** + - **Given:** 用户输入"分析飞书和钉钉的竞品对比并生成报告" + - **When:** Agent 自主执行搜索→分析→生成三步 + - **Then:** 返回包含竞品对比分析的完整报告,每步结果可在 SharedWorkspace 中追溯 + +- AE2. **步骤失败自动重规划** + - **Covers R6.** + - **Given:** Agent 执行"搜索竞品信息"步骤时搜索工具返回空结果 + - **When:** PlanExecEngine 触发重规划 + - **Then:** Agent 调整策略(如换搜索关键词/换搜索工具),重新执行该步骤 + +- AE3. **团队竞争模式真实执行** + - **Covers R1, R7.** + - **Given:** 两个 Expert 竞争执行同一分析任务 + - **When:** 各自独立执行并返回结果 + - **Then:** Lead Expert 根据合并策略选择/融合最佳结果 + +--- + +## Success Criteria + +- 多步研究任务端到端成功率 > 80%(任务完成且结果包含所有子步骤输出) +- 执行过程通过 WebSocket 实时可观测(每个步骤有 started/completed 事件) +- 步骤失败时自动重规划成功率 > 50%(至少一次重规划后任务完成) + +--- + +## Scope Boundaries + +**Deferred for later:** +- 执行持久化与断点恢复(集成 PipelineState 到 PlanExecEngine) +- 自适应执行监控(token 预算控制、耗时趋势、策略动态调整) +- 人机协作规划 UI(用户实时调整计划) +- 计划模板库(复用历史成功计划) + +**Outside this scope:** +- 动态工具发现与运行时组合(Agent 自主发现新工具) +- 跨任务长期记忆(任务间经验迁移) +- 多层级嵌套计划(子计划递归分解) + +--- + +## Dependencies / Assumptions + +- LLM Gateway 已配置且可用(至少一个 provider 有有效 API key) +- 搜索工具(WebSearchTool/BaiduSearchTool)已注册且可用 +- SharedWorkspace 数据结构已定义(需确认与现有 `expert_team.SharedWorkspace` 的兼容性) +- GoalPlanner 的 LLM 分解质量足够支撑多步任务(可能需要 prompt 迭代) + +--- + +## Outstanding Questions + +- **Resolve Before Planning:** SharedWorkspace 的现有实现(`expert_team.SharedWorkspace`)是否可直接复用于 PlanExecEngine,还是需要新建? 
+- **Deferred to Planning:** TeamOrchestrator 的 `_execute_phase` 如何与 Agent.execute() 的异步签名对接(execute 是 async 方法,当前 _execute_phase 也是 async) + +--- + +## Sources / Research + +- `src/agentkit/core/react.py` — ReActEngine 完整实现,think-act-observe 循环 +- `src/agentkit/core/plan_exec_engine.py` — PlanExecEngine,含 _LLMStepAgent 和 dependency_results +- `src/agentkit/experts/orchestrator.py` — TeamOrchestrator,_execute_phase 为模拟代码 +- `src/agentkit/experts/team.py` — ExpertTeam 和 SharedWorkspace 定义 +- `src/agentkit/orchestrator/pipeline_engine.py` — PipelineEngine,含反思-重规划闭环 +- `src/agentkit/orchestrator/reflection.py` — PipelineReflector / PipelineReplanner +- `src/agentkit/core/goal_planner.py` — GoalPlanner,规则+LLM混合分解 diff --git a/docs/plans/2026-06-15-001-feat-autonomous-task-execution-plan.md b/docs/plans/2026-06-15-001-feat-autonomous-task-execution-plan.md new file mode 100644 index 0000000..0c1900b --- /dev/null +++ b/docs/plans/2026-06-15-001-feat-autonomous-task-execution-plan.md @@ -0,0 +1,402 @@ +--- +date: 2026-06-15 +status: active +origin: docs/brainstorms/2026-06-15-autonomous-task-execution-requirements.md +--- + +## Summary + +打通 PlanExecEngine 和 TeamOrchestrator 的执行层,将模拟代码替换为真实的 Agent/ReActEngine 调用,集成 SharedWorkspace 实现步骤间状态传递,并添加 WebSocket 进度事件。用多步研究任务端到端验证闭环。 + +## Problem Frame + +AgentKit 的任务规划框架骨架完整(四种推理引擎 + TeamOrchestrator + PipelineEngine),但执行层未跑通:`_execute_phase` 返回模拟字符串,`_LLMStepAgent` 只做单次 LLM 调用不支持工具,SharedWorkspace 未集成到执行层。用户提出复杂需求后 Agent 无法真正拆解执行。本计划将已有框架跑通,而非新建能力。 + +--- + +## Key Technical Decisions + +**KTD-1. _LLMStepAgent 替换为 ReActStepExecutor。** 现有 `_LLMStepAgent` 只做单次 `llm_gateway.chat()` 调用,不支持工具。新建 `ReActStepExecutor` 类,内部创建 `ReActEngine` 实例执行步骤,支持工具调用和多步推理。保留 `_LLMStepAgent` 作为无工具场景的轻量回退。 + +**KTD-2. SharedWorkspace 直接复用。** 现有 `SharedWorkspace`(`core/shared_workspace.py`)是通用 key-value 存储,支持版本控制和分布式锁。PlanExecEngine 直接注入 SharedWorkspace 实例,步骤结果写入 `plan:{plan_id}:step:{step_id}:result`,无需新建状态管理。 + +**KTD-3. 
TeamOrchestrator 通过 Expert.agent.execute() 执行。** `Expert.agent` 是 `ConfigDrivenAgent` 实例,其 `execute(TaskMessage)` 是 final 方法,内部根据 execution_mode 选择 ReAct/PlanExec/ReWOO/Reflexion 引擎。直接调用即可,无需手动创建 ReActEngine。 + +**KTD-4. 进度事件通过 HandoffTransport -> WebSocket 桥接。** TeamOrchestrator 已通过 `_broadcast_event` 向 HandoffTransport 发送事件。在 Chat WebSocket handler 中注册 HandoffTransport handler,将 team 事件转发为 WebSocket 消息。PlanExecEngine 的步骤事件通过回调函数注入。 + +--- + +## Requirements Trace + +| R-ID | Implementation Units | +|------|---------------------| +| R1 | U1, U2 | +| R2 | U3 | +| R3 | U4 | +| R4 | U5 | +| R5 | U4 | +| R6 | U6 | +| R7 | U2 | +| R8 | U7 | +| R9 | U8 | + +--- + +## High-Level Technical Design + +```mermaid +flowchart TB + subgraph User Request + A[用户输入复杂任务] --> B[CostAwareRouter] + end + + subgraph Routing + B -->|complexity > 0.7| C[TEAM_COLLAB] + B -->|0.3-0.7| D[SKILL_REACT / REACT] + end + + subgraph PlanExecEngine Path + D --> E[GoalPlanner] + E --> F[ExecutionPlan] + F --> G[ReActStepExecutor] + G -->|per step| H[ReActEngine.execute] + H --> I[Tool Calls] + I --> J[SharedWorkspace.write] + J -->|next step| G + end + + subgraph TeamOrchestrator Path + C --> K[ExpertTeam.form] + K --> L[CollaborationPlan] + L --> M[_execute_phase] + M -->|real call| N[Expert.agent.execute] + N --> O[TaskResult] + O --> P[_merge_results] + end + + subgraph Events + H --> Q[StepEvent callback] + M --> R[HandoffTransport broadcast] + Q --> S[WebSocket emit] + R --> S + end +``` + +--- + +## Implementation Units + +### U1. TeamOrchestrator._execute_phase 真实执行 + +**Goal:** 将 `_execute_phase` 从模拟代码改为调用 `Expert.agent.execute(TaskMessage)` 执行真实任务 + +**Requirements:** R1 + +**Dependencies:** None + +**Files:** +- `src/agentkit/experts/orchestrator.py` — 修改 `_execute_phase` 和 `_run_competitor` +- `tests/unit/experts/test_orchestrator.py` — 新增/更新测试 + +**Approach:** +1. 在 `_execute_phase` 中,获取 `expert = self._team._experts.get(phase.assigned_expert)` +2. 
构建 `TaskMessage`:`task_id=phase.phase_id`, `agent_name=expert.config.name`, `task_type="team_phase"`, `input_data={"phase_name": phase.name, "phase_description": phase.description, "team_id": self.team_id}` +3. 从 SharedWorkspace 读取前置阶段结果,注入 `input_data["dependency_results"]` +4. 调用 `result = await expert.agent.execute(task_msg)` +5. 处理 `TaskResult`:成功则写入 SharedWorkspace 并广播 `phase_completed`,失败则广播 `phase_failed` +6. 同样修改 `_run_competitor`,调用 `expert.agent.execute()` 替代模拟返回 + +**Patterns to follow:** `BaseAgent.execute()` 的 final 方法模式(`core/base.py`),`TaskMessage`/`TaskResult` 协议(`core/protocol.py`) + +**Test scenarios:** +- Happy path: _execute_phase 调用 expert.agent.execute() 并返回 TaskResult +- Expert not found: assigned_expert 不在 _experts 中时回退到 lead_expert +- Execution failure: agent.execute() 返回 FAILED 状态时广播 phase_failed +- Covers AE3: 两个 Expert 竞争执行,各自调用 agent.execute() + +**Verification:** 单元测试通过,mock Agent 返回 TaskResult,验证 _execute_phase 正确处理成功/失败 + +--- + +### U2. TeamOrchestrator 合并策略从真实结果选择 + +**Goal:** COMPETITIVE_PARALLEL 模式下,合并策略(BEST/VOTE/FUSION)从真实 TaskResult 中选择/融合 + +**Requirements:** R1, R7 + +**Dependencies:** U1 + +**Files:** +- `src/agentkit/experts/orchestrator.py` — 修改 `_merge_results` +- `tests/unit/experts/test_orchestrator.py` + +**Approach:** +1. `_merge_results` 当前接收 `list[dict]`,改为接收 `list[tuple[Expert, TaskResult]]` +2. BEST 策略:Lead Expert 的 LLM 评估各 TaskResult.output_data,选择最佳 +3. VOTE 策略:每个 Expert 的 LLM 对其他结果评分,最高分胜出 +4. FUSION 策略:Lead Expert 的 LLM 融合所有 output_data +5. 无 LLM Gateway 时回退到当前简化逻辑(选择第一个结果) + +**Patterns to follow:** `PipelineReflector` 的 LLM 调用模式(`orchestrator/reflection.py`) + +**Test scenarios:** +- BEST: 3 个 Expert 结果,Lead Expert 选择最佳 +- VOTE: 3 个 Expert 结果,投票选择 +- FUSION: 3 个 Expert 结果,Lead Expert 融合 +- No LLM Gateway: 回退到选择第一个结果 + +**Verification:** 单元测试验证三种合并策略从真实 TaskResult 中选择 + +--- + +### U3. 
ReActStepExecutor 替代 _LLMStepAgent + +**Goal:** 新建 ReActStepExecutor,内部使用 ReActEngine 执行步骤,支持工具调用和多步推理 + +**Requirements:** R2 + +**Dependencies:** None + +**Files:** +- `src/agentkit/core/plan_exec_engine.py` — 新增 `ReActStepExecutor` 类,修改 `PlanExecutor` 使用新执行器 +- `tests/unit/core/test_plan_exec_engine.py` — 新增测试 + +**Approach:** +1. 新建 `ReActStepExecutor` 类,构造函数接收 `llm_gateway`, `tools`, `max_steps=5`, `model="default"`, `system_prompt=None` +2. `execute(task_msg: TaskMessage) -> TaskResult` 方法: + - 从 `task_msg.input_data` 提取 `step_name`, `step_description`, `dependency_results` + - 构建 messages:`[{"role": "user", "content": step_description}]` + - 如有 `dependency_results`,追加到 content + - 创建 `ReActEngine(llm_gateway, max_steps=max_steps)` + - 调用 `react_engine.execute(messages, tools, model, system_prompt)` + - 将 `ReActResult.output` 包装为 `TaskResult(output_data={"content": result.output, "steps": result.total_steps, "tokens": result.total_tokens})` +3. `PlanExecutor` 新增 `step_executor_type` 参数:`"react"`(默认)或 `"llm"`(回退到 _LLMStepAgent) +4. `PlanExecutor._execute_step` 根据 `step_executor_type` 选择执行器 + +**Patterns to follow:** `ReActEngine.execute()` 的签名和返回值(`core/react.py`),`_LLMStepAgent` 的接口(`plan_exec_engine.py`) + +**Test scenarios:** +- Happy path: ReActStepExecutor 调用 ReActEngine,返回包含工具调用结果的 TaskResult +- No tools: 无工具时回退到纯 LLM 调用 +- Multi-step: ReActEngine 执行 3 步 think-act-observe 循环 +- Tool failure: 工具调用异常时 ReActEngine 返回 partial status + +**Verification:** 单元测试 mock ReActEngine,验证 ReActStepExecutor 正确调用和包装结果 + +--- + +### U4. SharedWorkspace 集成到执行层 + +**Goal:** PlanExecEngine 和 TeamOrchestrator 通过 SharedWorkspace 传递步骤间状态 + +**Requirements:** R3, R5 + +**Dependencies:** U1, U3 + +**Files:** +- `src/agentkit/core/plan_exec_engine.py` — 注入 SharedWorkspace,步骤结果写入/读取 +- `src/agentkit/experts/orchestrator.py` — 阶段结果写入/读取 SharedWorkspace +- `tests/unit/core/test_plan_exec_engine.py` +- `tests/unit/experts/test_orchestrator.py` + +**Approach:** +1. 
`PlanExecutor` 构造函数新增 `workspace: SharedWorkspace | None = None` 参数 +2. 步骤完成后:`workspace.write(f"plan:{plan_id}:step:{step_id}:result", result_data, agent_id=step_id)` +3. 步骤执行前:从 workspace 读取依赖步骤结果,注入 `input_data["dependency_results"]` +4. `TeamOrchestrator` 构造函数新增 `workspace: SharedWorkspace | None = None`,默认使用 `team._workspace` +5. 阶段完成后写入 `workspace.write(f"team:{team_id}:phase:{phase_id}:result", ...)` +6. 阶段执行前读取前置阶段结果 + +**Patterns to follow:** `ExpertTeam._workspace` 的使用模式(`experts/team.py`),`SharedWorkspace.write/read` API(`core/shared_workspace.py`) + +**Test scenarios:** +- PlanExecEngine: 步骤 A 完成后结果写入 workspace,步骤 B 执行前从 workspace 读取 +- TeamOrchestrator: 阶段 A 结果写入 workspace,阶段 B 读取 +- No workspace: workspace=None 时回退到原有 dependency_results 机制 +- Concurrent write: 两个并行步骤同时写入 workspace,版本号递增 + +**Verification:** 单元测试验证 workspace 读写和依赖传递 + +--- + +### U5. GoalPlanner prompt 调优 + +**Goal:** 提升 GoalPlanner 的任务分解质量,确保子任务可执行、依赖关系正确 + +**Requirements:** R4 + +**Dependencies:** None + +**Files:** +- `src/agentkit/core/goal_planner.py` — 优化 LLM prompt 和规则分解逻辑 +- `tests/unit/core/test_goal_planner.py` + +**Approach:** +1. 优化 `_llm_decompose` 的 prompt:明确要求输出 JSON 格式,包含 step_id/name/description/dependencies/required_tools 字段 +2. 添加 few-shot 示例:展示"分析竞品并生成报告"的标准分解(搜索→分析→生成) +3. 规则分解增强:识别"搜索/查找/分析/生成/报告/对比"等常见任务动词,映射到标准步骤模板 +4. 添加分解质量自检:LLM 分解后,用第二次 LLM 调用验证步骤是否完整、依赖是否合理 +5. 添加 `required_tools` 字段到 PlanStep,指定步骤需要的工具(如搜索步骤需要 web_search) + +**Patterns to follow:** 现有 `_rule_based_decompose` 和 `_llm_decompose` 模式 + +**Test scenarios:** +- "分析竞品并生成报告" → 3 步分解(搜索→分析→生成),依赖关系正确 +- "搜索最新AI论文" → 1 步分解,required_tools=["web_search"] +- "对比A和B的优缺点" → 2 步分解(分别搜索→对比分析) +- LLM 分解失败 → 回退到规则分解 + +**Verification:** 单元测试验证分解质量和依赖关系 + +--- + +### U6. 
PlanExecEngine 失败重规划集成 + +**Goal:** 步骤执行失败时,集成 PipelineReflector/PipelineReplanner 触发自动重规划 + +**Requirements:** R6 + +**Dependencies:** U3 + +**Files:** +- `src/agentkit/core/plan_exec_engine.py` — 修改 `_execute_plan` 失败处理逻辑 +- `tests/unit/core/test_plan_exec_engine.py` + +**Approach:** +1. `PlanExecutor` 已有 `_plan_to_pipeline` / `_pipeline_to_plan` 桥接方法(plan_exec_engine.py 第549-664行) +2. 在 `_execute_plan` 的步骤失败分支中: + - 调用 `reflector.reflect(pipeline, pipeline_result, replan_count)` 获取 ReflectionReport + - 调用 `replanner.replan(pipeline, pipeline_result, reflection_report)` 获取修正后的 Pipeline + - 将修正后的 Pipeline 转回 ExecutionPlan + - 用 `_merge_completed_results` 保留已完成步骤的结果 + - 继续执行修正后的计划 +3. 添加 `max_replan_attempts` 参数(默认 2),超过后回退到单 Agent 模式 +4. 广播 `replanning` 事件,包含失败原因和修正计划 + +**Patterns to follow:** `PipelineEngine` 的反思-重规划闭环(`orchestrator/pipeline_engine.py`),现有 `_plan_to_pipeline` 桥接 + +**Test scenarios:** +- Covers AE2: 搜索步骤失败 → Reflector 分析原因 → Replanner 生成修正计划 → 重新执行成功 +- Max replan exceeded: 连续 2 次重规划仍失败 → 回退到单 Agent +- Partial completion: 3 步中第 2 步失败,重规划后保留第 1 步结果 + +**Verification:** 单元测试 mock Reflector/Replanner,验证重规划流程 + +--- + +### U7. 多步研究任务端到端验证 + +**Goal:** 用"分析竞品并生成报告"场景验证完整闭环 + +**Requirements:** R8 + +**Dependencies:** U1, U3, U4, U5, U6 + +**Files:** +- `tests/integration/test_autonomous_research_task.py` — 新增集成测试 +- `src/agentkit/core/plan_exec_engine.py` — 确保 ReActStepExecutor 与搜索工具集成 +- `configs/skills/research.yaml` — 新增研究任务 Skill 配置 + +**Approach:** +1. 创建 `research` Skill 配置,绑定 `web_search` + `web_crawl` + `ask_human` 工具 +2. 集成测试:mock LLM Gateway 返回预设响应,mock 搜索工具返回预设结果 +3. 验证流程:用户输入 → GoalPlanner 分解 → PlanExecEngine 执行 → SharedWorkspace 状态传递 → 最终报告 +4. 验证步骤间依赖:搜索步骤结果被分析步骤读取 +5. 
验证失败重规划:搜索工具返回空结果时触发重规划 + +**Test scenarios:** +- Covers AE1: "分析飞书和钉钉的竞品对比" → 搜索→分析→生成完整报告 +- Tool integration: ReActStepExecutor 调用 web_search 工具 +- Dependency chain: 搜索结果传递到分析步骤 +- Failure recovery: 搜索失败 → 重规划 → 换关键词重新搜索 + +**Verification:** 集成测试通过,端到端输出包含搜索结果和分析报告 + +--- + +### U8. WebSocket 进度事件 + +**Goal:** 执行过程通过 WebSocket 实时推送进度事件 + +**Requirements:** R9 + +**Dependencies:** U1, U3 + +**Files:** +- `src/agentkit/server/routes/chat.py` — 注册 HandoffTransport handler,转发 team 事件 +- `src/agentkit/core/plan_exec_engine.py` — 添加 step_event_callback 参数 +- `src/agentkit/server/routes/portal.py` — 添加 plan/step 事件类型 +- `tests/unit/server/test_chat_ws_events.py` — 新增测试 + +**Approach:** +1. PlanExecEngine 新增 `step_event_callback: Callable[[str, dict], Awaitable[None]] | None` 参数 +2. 步骤状态变更时调用 callback:`plan_created`, `step_started`, `step_completed`, `step_failed`, `plan_completed`, `replanning` +3. Chat WebSocket handler 中,当 ExpertTeam 模式激活时,注册 HandoffTransport handler 将 team 事件转发为 WebSocket 消息 +4. Portal WebSocket 添加新事件类型:`plan_step`(步骤进度)和 `plan_update`(计划变更) +5. 
前端 `WsServerMessage` 类型添加 `plan_step` 和 `plan_update` 事件支持 + +**Patterns to follow:** 现有 `emit_team_event` 模式(`server/routes/chat.py`),Portal WebSocket 事件格式 + +**Test scenarios:** +- PlanExecEngine: 步骤开始/完成时 callback 被调用,事件类型正确 +- TeamOrchestrator: HandoffTransport 事件转发到 WebSocket +- Portal: plan_step 事件包含 step_id, step_name, status +- No callback: callback=None 时不影响执行 + +**Verification:** 单元测试验证事件回调被正确调用 + +--- + +## Scope Boundaries + +**In scope:** +- 打通 PlanExecEngine 和 TeamOrchestrator 执行层 +- SharedWorkspace 集成 +- GoalPlanner prompt 调优 +- 失败重规划集成 +- WebSocket 进度事件 +- 多步研究任务验证 + +**Deferred to follow-up work:** +- 执行持久化与断点恢复 +- 自适应执行监控(token 预算、耗时趋势) +- 人机协作规划 UI +- 计划模板库 +- 前端进度可视化组件 + +**Outside this scope:** +- 动态工具发现与运行时组合 +- 跨任务长期记忆 +- 多层级嵌套计划 + +--- + +## Risks & Mitigations + +| Risk | Impact | Mitigation | +|------|--------|-----------| +| ReActEngine 步骤级执行 token 消耗高 | 每步骤可能消耗大量 token | ReActStepExecutor 默认 max_steps=5,限制循环次数 | +| GoalPlanner 分解质量不稳定 | 复杂任务可能分解不合理 | 添加分解质量自检 + few-shot 示例 | +| SharedWorkspace 并发写入冲突 | 并行步骤同时写入可能冲突 | SharedWorkspace 内置版本控制和分布式锁 | +| HandoffTransport -> WebSocket 桥接延迟 | 事件转发可能增加延迟 | InProcess 模式下延迟极低(asyncio.Queue) | + +--- + +## Open Questions + +- **Deferred to implementation:** ReActStepExecutor 的 system_prompt 是否需要根据步骤类型动态生成(如搜索步骤 vs 分析步骤) +- **Deferred to implementation:** 前端 WsServerMessage 类型更新是否需要同步修改 chat store 的事件处理逻辑 + +--- + +## Sources & Research + +- `src/agentkit/core/react.py` — ReActEngine 完整实现 +- `src/agentkit/core/plan_exec_engine.py` — PlanExecEngine 和 _LLMStepAgent +- `src/agentkit/experts/orchestrator.py` — TeamOrchestrator mock 实现 +- `src/agentkit/experts/team.py` — ExpertTeam 和 SharedWorkspace +- `src/agentkit/core/shared_workspace.py` — SharedWorkspace API +- `src/agentkit/orchestrator/reflection.py` — PipelineReflector / PipelineReplanner +- `src/agentkit/core/goal_planner.py` — GoalPlanner +- `src/agentkit/core/protocol.py` — TaskMessage / TaskResult 协议 +- `src/agentkit/server/routes/chat.py` — 
Chat WebSocket 和 emit_team_event diff --git a/pyproject.toml b/pyproject.toml index 2b33fb1..4e5190d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -44,10 +44,13 @@ dev = [ "pytest-asyncio>=0.23", "pytest-cov>=5.0", "pytest-httpx>=0.30", + "pytest-timeout>=2.2", + "pytest-html>=4.1", "testcontainers[postgres,redis]>=4.0", "ruff>=0.4", "fastapi>=0.110", "uvicorn>=0.27", + "websockets>=12.0", ] [tool.setuptools.packages.find] @@ -60,6 +63,9 @@ markers = [ "integration: mark test as integration test (requires docker)", "redis: mark test as requiring Redis", "postgres: mark test as requiring PostgreSQL", + "e2e: end-to-end backtest (requires server)", + "e2e_basic: basic function correctness test", + "e2e_capability: agent intelligence capability test", ] [tool.ruff] diff --git a/src/agentkit/chat/skill_routing.py b/src/agentkit/chat/skill_routing.py index 39e28a4..36483a3 100644 --- a/src/agentkit/chat/skill_routing.py +++ b/src/agentkit/chat/skill_routing.py @@ -30,19 +30,17 @@ class ExecutionMode(enum.Enum): field instead of string-matching match_method. """ - DIRECT_CHAT = "direct_chat" # Zero-cost: direct LLM call, no ReAct loop - REACT = "react" # Default agent ReAct loop with default tools - SKILL_REACT = "skill_react" # Skill-matched ReAct with skill tools + prompt - TEAM_COLLAB = "team_collab" # Expert Team collaborative mode + DIRECT_CHAT = "direct_chat" # Zero-cost: direct LLM call, no ReAct loop + REACT = "react" # Default agent ReAct loop with default tools + SKILL_REACT = "skill_react" # Skill-matched ReAct with skill tools + prompt + TEAM_COLLAB = "team_collab" # Expert Team collaborative mode def validate_skill_name(name: str) -> str: """Validate and normalize a skill name. 
Raises ValueError on invalid input.""" normalized = name.strip().lower() if not _SKILL_NAME_RE.match(normalized): - raise ValueError( - f"Invalid skill name '{name}': must match [a-z0-9][a-z0-9_-]{{0,63}}" - ) + raise ValueError(f"Invalid skill name '{name}': must match [a-z0-9][a-z0-9_-]{{0,63}}") return normalized @@ -78,7 +76,7 @@ def parse_skill_prefix(content: str) -> tuple[str | None, str]: parts = content.split(" ", 1) skill_ref = parts[0][7:] # strip "@skill:" explicit_skill = skill_ref.strip() - clean = parts[1].strip() if len(parts) > 1 else content[7 + len(skill_ref):].strip() + clean = parts[1].strip() if len(parts) > 1 else content[7 + len(skill_ref) :].strip() return explicit_skill, clean @@ -132,7 +130,9 @@ async def resolve_skill_routing( result.match_confidence = 1.0 logger.info(f"Session {session_id}: using explicit skill '{explicit_skill}'") except Exception as e: - logger.warning(f"Session {session_id}: explicit skill '{explicit_skill}' not found: {e}") + logger.warning( + f"Session {session_id}: explicit skill '{explicit_skill}' not found: {e}" + ) # Reset so we don't enter skill branch with stale data result.skill_name = None result.skill_config = None @@ -179,12 +179,35 @@ async def resolve_skill_routing( # the task might need tools. If so, skip this match # and let it fall through to default agent. 
tool_hints = [ - "执行", "运行", "命令", "终端", "shell", "bash", - "搜索", "查找", "联网", "搜索", "search", - "安装", "部署", "启动", "停止", "重启", - "文件", "目录", "创建", "删除", "修改", - "run", "execute", "install", "deploy", - "start", "stop", "restart", "file", + "执行", + "运行", + "命令", + "终端", + "shell", + "bash", + "搜索", + "查找", + "联网", + "搜索", + "search", + "安装", + "部署", + "启动", + "停止", + "重启", + "文件", + "目录", + "创建", + "删除", + "修改", + "run", + "execute", + "install", + "deploy", + "start", + "stop", + "restart", + "file", ] content_lower = clean_content.lower() needs_tools = any(h in content_lower for h in tool_hints) @@ -215,7 +238,9 @@ async def resolve_skill_routing( f"via {routing_result.method} (confidence={routing_result.confidence})" ) except Exception as e: - logger.warning(f"Session {session_id}: skill '{skill_name}' found by router but not in registry: {e}") + logger.warning( + f"Session {session_id}: skill '{skill_name}' found by router but not in registry: {e}" + ) except Exception as e: logger.warning(f"Skill routing failed for session {session_id}: {e}") @@ -234,7 +259,11 @@ async def resolve_skill_routing( merged_tools.append(tool) result.tools = merged_tools - result.model = result.skill_config.llm.get("model", default_model) if result.skill_config.llm else default_model + result.model = ( + result.skill_config.llm.get("model", default_model) + if result.skill_config.llm + else default_model + ) result.agent_name = result.skill_name result.execution_mode = ExecutionMode.SKILL_REACT else: @@ -267,9 +296,9 @@ def _build_tools_description(tools: list) -> str: """Build a text description of tools for the system prompt.""" lines = [] for tool in tools: - desc = getattr(tool, 'description', '') + desc = getattr(tool, "description", "") lines.append(f"- **{tool.name}**: {desc}") - schema = getattr(tool, 'input_schema', None) + schema = getattr(tool, "input_schema", None) if schema and "properties" in schema: params = list(schema["properties"].keys()) if params: @@ -299,13 
+328,13 @@ _IDENTITY_RE = re.compile( re.IGNORECASE, ) -_SENTENCE_SPLIT_RE = re.compile(r'[,。!?;\n,.!?;]') +_SENTENCE_SPLIT_RE = re.compile(r"[,。!?;\n,.!?;]") def _tokenize_content(content: str) -> list[str]: """Tokenize content for capability matching. Supports Chinese and English.""" # 1. Split by punctuation and whitespace - segments = re.split(r'[\s,,。!?、;:\n]+', content) + segments = re.split(r"[\s,,。!?、;:\n]+", content) # 2. For long Chinese segments, add 2-gram supplements tokens = [] @@ -316,12 +345,32 @@ def _tokenize_content(content: str) -> list[str]: tokens.append(seg) # Add 2-grams for Chinese compound words for i in range(len(seg) - 1): - bigram = seg[i:i+2] - if all('\u4e00' <= c <= '\u9fff' for c in bigram): + bigram = seg[i : i + 2] + if all("\u4e00" <= c <= "\u9fff" for c in bigram): tokens.append(bigram) # 3. Filter stopwords - stopwords = {"的", "了", "是", "在", "和", "与", "也", "都", "就", "要", "会", "我", "你", "他", "这", "那", "有", "没", "不"} + stopwords = { + "的", + "了", + "是", + "在", + "和", + "与", + "也", + "都", + "就", + "要", + "会", + "我", + "你", + "他", + "这", + "那", + "有", + "没", + "不", + } tokens = [t for t in tokens if t not in stopwords and len(t) > 1][:10] return tokens @@ -337,60 +386,194 @@ class HeuristicClassifier: # 高复杂度暗示词(需要工具或多步推理) # 中文关键词使用子串匹配(中文无自然词边界) _HIGH_COMPLEXITY_HINTS_CN = { - "执行", "运行", "命令", "终端", "安装", "部署", "启动", "停止", "重启", - "配置", "搜索", "查找", "联网", "文件", "目录", "创建", "删除", "修改", - "编辑", "分析", "比较", "对比", "评估", "调研", "研究", "设计", "规划", - "方案", "架构", "实现", "开发", "代码", "编程", "函数", "接口", "调试", + "执行", + "运行", + "命令", + "终端", + "安装", + "部署", + "启动", + "停止", + "重启", + "配置", + "搜索", + "查找", + "联网", + "文件", + "目录", + "创建", + "删除", + "修改", + "编辑", + "分析", + "比较", + "对比", + "评估", + "调研", + "研究", + "设计", + "规划", + "方案", + "架构", + "实现", + "开发", + "代码", + "编程", + "函数", + "接口", + "调试", "重构", } # 英文关键词使用词边界匹配(避免子串误匹配如 "profile" 匹配 "file") _HIGH_COMPLEXITY_HINTS_EN = { - "shell", "bash", "script", "search", "query", "directory", - "execute", 
"install", "deploy", "restart", "modify", - "analyze", "compare", "evaluate", "research", "design", - "implement", "develop", "refactor", "debug", - "python", "javascript", "typescript", "sql", + "shell", + "bash", + "script", + "search", + "query", + "directory", + "execute", + "install", + "deploy", + "restart", + "modify", + "analyze", + "compare", + "evaluate", + "research", + "design", + "implement", + "develop", + "refactor", + "debug", + "python", + "javascript", + "typescript", + "sql", } # 英文短词需要精确匹配(避免子串误匹配) _HIGH_COMPLEXITY_EXACT_EN = { - "run", "find", "start", "stop", "file", "create", "delete", - "plan", "build", "code", "program", "function", "class", - "interface", "api", + "run", + "find", + "start", + "stop", + "file", + "create", + "delete", + "plan", + "build", + "code", + "program", + "function", + "class", + "interface", + "api", } # 中等复杂度暗示词(简单问题但需思考) # 注意:不包含"怎么",因为"怎么样"是闲聊而非工具需求 _MEDIUM_COMPLEXITY_HINTS_CN = { - "如何", "怎样", "为什么", "什么原因", "区别", - "推荐", "建议", "选择", "哪个", + "如何", + "怎样", + "为什么", + "什么原因", + "区别", + "推荐", + "建议", + "选择", + "哪个", } _MEDIUM_COMPLEXITY_HINTS_EN = { - "difference", "explain", "recommend", "suggest", "choose", + "difference", + "explain", + "recommend", + "suggest", + "choose", } # 英文短词精确匹配 _MEDIUM_COMPLEXITY_EXACT_EN = { - "how", "why", "what", "which", + "how", + "why", + "what", + "which", } + # 低复杂度暗示词(问候/闲聊/简单定义,不需要工具) + _LOW_COMPLEXITY_HINTS_CN = { + "你好", + "嗨", + "早上好", + "下午好", + "晚上好", + "再见", + "谢谢", + "辛苦", + "你是谁", + "你叫什么", + "你是什么", + "自我介绍", + "天气", + "今天", + "怎么样", + "闲聊", + "聊天", + } + + _LOW_COMPLEXITY_HINTS_EN = { + "hello", + "hi", + "hey", + "good morning", + "good afternoon", + "good evening", + "goodbye", + "thanks", + "who are you", + "what are you", + "your name", + "introduce yourself", + "how are you", + "chat", + } + + # 否定上下文模式("不要X"中的X不计入高复杂度匹配) + # 匹配1-4个中文字符或1个英文单词(避免匹配过长串如"分析,直接告诉我答案") + _NEGATION_PATTERNS = re.compile( + r"(?:不要|无需|不用|不需要|别|don'?t|no need|without|not)\s*" + 
r"([\u4e00-\u9fff]{1,4}|[a-zA-Z]+)", + re.IGNORECASE, + ) + + # 短疑问句模式(以?或?结尾且长度<30) + _SHORT_QUESTION_RE = re.compile(r"[??]\s*$") + # 预编译英文词边界正则 _HIGH_EN_RE = re.compile( - r'\b(' + '|'.join(re.escape(w) for w in sorted(_HIGH_COMPLEXITY_HINTS_EN, key=len, reverse=True)) + r')\b', + r"\b(" + + "|".join(re.escape(w) for w in sorted(_HIGH_COMPLEXITY_HINTS_EN, key=len, reverse=True)) + + r")\b", re.IGNORECASE, ) _HIGH_EXACT_RE = re.compile( - r'\b(' + '|'.join(re.escape(w) for w in sorted(_HIGH_COMPLEXITY_EXACT_EN, key=len, reverse=True)) + r')\b', + r"\b(" + + "|".join(re.escape(w) for w in sorted(_HIGH_COMPLEXITY_EXACT_EN, key=len, reverse=True)) + + r")\b", re.IGNORECASE, ) _MEDIUM_EN_RE = re.compile( - r'\b(' + '|'.join(re.escape(w) for w in sorted(_MEDIUM_COMPLEXITY_HINTS_EN, key=len, reverse=True)) + r')\b', + r"\b(" + + "|".join(re.escape(w) for w in sorted(_MEDIUM_COMPLEXITY_HINTS_EN, key=len, reverse=True)) + + r")\b", re.IGNORECASE, ) _MEDIUM_EXACT_RE = re.compile( - r'\b(' + '|'.join(re.escape(w) for w in sorted(_MEDIUM_COMPLEXITY_EXACT_EN, key=len, reverse=True)) + r')\b', + r"\b(" + + "|".join(re.escape(w) for w in sorted(_MEDIUM_COMPLEXITY_EXACT_EN, key=len, reverse=True)) + + r")\b", re.IGNORECASE, ) @@ -398,9 +581,12 @@ class HeuristicClassifier: """评估消息复杂度 (0.0-1.0)。 评分规则: + - 低复杂度信号(问候/闲聊/身份查询)→ 0.05 - 短消息 (<20字符) 且无复杂度暗示 → 0.1 - - 含中等复杂度关键词 → 0.4-0.5 - - 含高复杂度关键词 → 0.7-0.9 + - 含中等复杂度关键词 → 0.35 + - 含高复杂度关键词 → 0.65-0.8 + - 否定上下文中的高复杂度词不计入匹配 + - 短疑问句额外扣减 - 多句/长消息 → 额外加成 - 代码模式 (反引号/括号) → 额外加成 """ @@ -410,48 +596,84 @@ class HeuristicClassifier: content_lower = content.lower() score = 0.0 - # 1. 关键词匹配 + # 0. 
低复杂度信号检测(优先级最高) + low_hits_cn = sum(1 for h in self._LOW_COMPLEXITY_HINTS_CN if h in content_lower) + low_hits_en = sum( + 1 for h in self._LOW_COMPLEXITY_HINTS_EN if h in content_lower + ) + if low_hits_cn + low_hits_en > 0: + score = 0.05 # 问候/闲聊直接给极低分 + # 低复杂度信号下不再累加高复杂度词的分数 + # 但仍保留长度和多句的微调 + length = len(content) + if length > 200: + score += 0.05 + elif length > 100: + score += 0.03 + return max(0.0, min(1.0, score)) + + # 1. 否定上下文检测 — 提取被否定的词 + negated_words: set[str] = set() + for match in self._NEGATION_PATTERNS.finditer(content_lower): + negated_words.add(match.group(1).lower()) + + # 2. 关键词匹配(排除否定上下文中的词) # 中文:子串匹配 - high_hits = sum(1 for h in self._HIGH_COMPLEXITY_HINTS_CN if h in content_lower) - medium_hits = sum(1 for m in self._MEDIUM_COMPLEXITY_HINTS_CN if m in content_lower) + high_hits = sum( + 1 + for h in self._HIGH_COMPLEXITY_HINTS_CN + if h in content_lower and h not in negated_words + ) + medium_hits = sum( + 1 for m in self._MEDIUM_COMPLEXITY_HINTS_CN if m in content_lower + ) # 英文:词边界匹配 - high_hits += len(self._HIGH_EN_RE.findall(content)) - high_hits += len(self._HIGH_EXACT_RE.findall(content)) - medium_hits += len(self._MEDIUM_EN_RE.findall(content)) - medium_hits += len(self._MEDIUM_EXACT_RE.findall(content)) + high_en_matches = self._HIGH_EN_RE.findall(content) + self._HIGH_EXACT_RE.findall( + content + ) + high_hits += sum( + 1 for w in high_en_matches if w.lower() not in negated_words + ) + medium_hits += len(self._MEDIUM_EN_RE.findall(content)) + len( + self._MEDIUM_EXACT_RE.findall(content) + ) if high_hits >= 2: - score = 0.8 + score = 0.80 elif high_hits == 1: score = 0.65 elif medium_hits >= 1: - score = 0.45 + score = 0.35 else: - score = 0.15 + score = 0.10 - # 2. 消息长度加成 + # 3. 消息长度加成 length = len(content) if length > 200: score += 0.15 elif length > 100: - score += 0.1 + score += 0.10 elif length > 50: score += 0.05 - # 3. 多句加成(逗号/句号/换行分隔) + # 4. 
多句加成(逗号/句号/换行分隔) sentence_count = len(_SENTENCE_SPLIT_RE.split(content)) if sentence_count >= 4: - score += 0.1 + score += 0.10 elif sentence_count >= 2: score += 0.05 - # 4. 代码模式加成 - if '`' in content or '```' in content: + # 5. 代码模式加成 + if "`" in content or "```" in content: score += 0.15 - if re.search(r'[\{\}\[\]\(\)]', content): + if re.search(r"[\{\}\[\]\(\)]", content): score += 0.05 + # 6. 短疑问句扣减(以?或?结尾且长度<30) + if self._SHORT_QUESTION_RE.search(content) and len(content) < 30: + score -= 0.10 + return max(0.0, min(1.0, score)) @@ -472,6 +694,7 @@ class CostAwareRouter: classifier: str = "heuristic", merged_llm_classify: bool = True, semantic_router: Any = None, # SemanticRouter | None + expert_team_router: Any = None, # ExpertTeamRouter | None ): self._llm_gateway = llm_gateway self._model = model @@ -480,6 +703,7 @@ class CostAwareRouter: self._classifier = classifier self._merged_llm_classify = merged_llm_classify self._semantic_router = semantic_router + self._expert_team_router = expert_team_router self._auction_house = AuctionHouse() if auction_enabled else None if classifier not in ("heuristic", "llm"): raise ValueError(f"Invalid classifier: {classifier!r}, must be 'heuristic' or 'llm'") @@ -524,12 +748,12 @@ class CostAwareRouter: return 0.5 prompt = ( - 'You are a complexity classifier. Rate the complexity of the user request on a scale of 0.0 to 1.0.\n' - '0.0 = trivial greeting, 0.3 = simple question, 0.5 = moderate task, ' - '0.7 = complex multi-step task, 1.0 = very complex research task.\n\n' - '---BEGIN USER REQUEST---\n' - f'{content}\n' - '---END USER REQUEST---\n\n' + "You are a complexity classifier. 
Rate the complexity of the user request on a scale of 0.0 to 1.0.\n" + "0.0 = trivial greeting, 0.3 = simple question, 0.5 = moderate task, " + "0.7 = complex multi-step task, 1.0 = very complex research task.\n\n" + "---BEGIN USER REQUEST---\n" + f"{content}\n" + "---END USER REQUEST---\n\n" 'Respond ONLY with a JSON object: {"complexity": }' ) try: @@ -592,14 +816,14 @@ class CostAwareRouter: skill_list_str = ", ".join(skill_hints) if skill_hints else "none" prompt = ( - 'You are a routing classifier. Analyze the user request and output:\n' - '1. complexity (0.0-1.0): how complex is this request\n' - '2. intent: the primary intent category\n' - '3. skill_hint: the best matching skill name, or null if none match\n\n' - f'Available skills: [{skill_list_str}]\n\n' - '---BEGIN USER REQUEST---\n' - f'{content}\n' - '---END USER REQUEST---\n\n' + "You are a routing classifier. Analyze the user request and output:\n" + "1. complexity (0.0-1.0): how complex is this request\n" + "2. intent: the primary intent category\n" + "3. 
skill_hint: the best matching skill name, or null if none match\n\n" + f"Available skills: [{skill_list_str}]\n\n" + "---BEGIN USER REQUEST---\n" + f"{content}\n" + "---END USER REQUEST---\n\n" 'Respond ONLY with a JSON object: {"complexity": , "intent": , "skill_hint": }' ) @@ -629,7 +853,9 @@ class CostAwareRouter: execution_mode=ExecutionMode.SKILL_REACT, ) # Merge tools - agent_tools = agent_tool_registry.list_tools() if agent_tool_registry else default_tools + agent_tools = ( + agent_tool_registry.list_tools() if agent_tool_registry else default_tools + ) seen_names = set() merged_tools = [] for tool in result.skill_tools + agent_tools: @@ -637,9 +863,15 @@ class CostAwareRouter: seen_names.add(tool.name) merged_tools.append(tool) result.tools = merged_tools - result.model = result.skill_config.llm.get("model", default_model) if result.skill_config.llm else default_model + result.model = ( + result.skill_config.llm.get("model", default_model) + if result.skill_config.llm + else default_model + ) result.agent_name = skill_hint - result.system_prompt = build_skill_system_prompt(result.skill_config) or default_system_prompt + result.system_prompt = ( + build_skill_system_prompt(result.skill_config) or default_system_prompt + ) # Append available tools to system prompt so LLM knows what it can call if result.tools: tools_desc = _build_tools_description(result.tools) @@ -651,14 +883,18 @@ class CostAwareRouter: "Always prefer using tools over guessing.\n" ) if result.system_prompt: - result.system_prompt += f"{tool_instruction}\n## Available Tools\n{tools_desc}" + result.system_prompt += ( + f"{tool_instruction}\n## Available Tools\n{tools_desc}" + ) logger.info( f"Session {session_id}: merged LLM classify routed to skill '{skill_hint}' " f"(complexity={merged_complexity:.2f})" ) return result except Exception as e: - logger.warning(f"Session {session_id}: merged LLM skill_hint '{skill_hint}' not found: {e}") + logger.warning( + f"Session {session_id}: merged LLM 
skill_hint '{skill_hint}' not found: {e}" + ) # No valid skill_hint — use complexity to decide routing if merged_complexity < 0.3: @@ -703,7 +939,9 @@ class CostAwareRouter: execution_mode=ExecutionMode.REACT, ) except (json.JSONDecodeError, TypeError, ValueError) as e: - logger.warning(f"CostAwareRouter _classify_merged parse failed: {e}, falling back to default") + logger.warning( + f"CostAwareRouter _classify_merged parse failed: {e}, falling back to default" + ) return SkillRoutingResult( clean_content=content, system_prompt=default_system_prompt, @@ -733,6 +971,39 @@ class CostAwareRouter: # -- Layer 2: Capability matching / Auction (optional) ----------------- + def _try_team_upgrade( + self, + result: SkillRoutingResult, + content: str, + complexity: float, + trace: list[dict] | None, + ) -> SkillRoutingResult: + """Attempt to upgrade REACT → TEAM_COLLAB when complexity is high and experts are available.""" + if ( + result.execution_mode == ExecutionMode.REACT + and complexity >= 0.7 + and self._expert_team_router is not None + ): + try: + if self._expert_team_router.can_handle(content): + team_result = self._expert_team_router.resolve(content, complexity) + if team_result.team_mode: + result.execution_mode = ExecutionMode.TEAM_COLLAB + if trace is not None: + trace.append( + { + "layer": 2, + "method": "team_upgrade", + "from_mode": "REACT", + "to_mode": "TEAM_COLLAB", + "team_match_method": team_result.match_method, + "complexity": complexity, + } + ) + except Exception as e: + logger.warning(f"CostAwareRouter team upgrade check failed: {e}") + return result + async def _route_layer2( self, content: str, @@ -752,10 +1023,18 @@ class CostAwareRouter: content_words = _tokenize_content(content) # --- Vickrey auction path (when enabled) --- - if self._auction_enabled and self._auction_house is not None and self._org_context is not None: + if ( + self._auction_enabled + and self._auction_house is not None + and self._org_context is not None + ): try: # Gather 
candidate agents from org_context - all_agents = self._org_context.list_agents() if hasattr(self._org_context, "list_agents") else [] + all_agents = ( + self._org_context.list_agents() + if hasattr(self._org_context, "list_agents") + else [] + ) # Filter agents that have at least one relevant capability candidate_agents = [] for agent_profile in all_agents: @@ -772,7 +1051,11 @@ class CostAwareRouter: best_name = best if isinstance(best, str) else getattr(best, "name", str(best)) existing_names = {a.name for a in candidate_agents} if best_name not in existing_names: - profile = self._org_context.get_agent_profile(best_name) if hasattr(self._org_context, "get_agent_profile") else best + profile = ( + self._org_context.get_agent_profile(best_name) + if hasattr(self._org_context, "get_agent_profile") + else best + ) if hasattr(profile, "name"): candidate_agents.append(profile) @@ -780,20 +1063,38 @@ class CostAwareRouter: # Build Bid objects for each candidate bids = [] for agent_profile in candidate_agents: - name = agent_profile.name if hasattr(agent_profile, "name") else str(agent_profile) - caps = agent_profile.capabilities if hasattr(agent_profile, "capabilities") else [] - arch = agent_profile.agent_type if hasattr(agent_profile, "agent_type") else "react" + name = ( + agent_profile.name + if hasattr(agent_profile, "name") + else str(agent_profile) + ) + caps = ( + agent_profile.capabilities + if hasattr(agent_profile, "capabilities") + else [] + ) + arch = ( + agent_profile.agent_type + if hasattr(agent_profile, "agent_type") + else "react" + ) # Use current_load as a proxy for estimated_cost (higher load → higher cost) - estimated_cost = float(agent_profile.current_load + 1) if hasattr(agent_profile, "current_load") else 1.0 - bids.append(Bid( - agent_name=name, - architecture=arch, - estimated_steps=1, - estimated_cost=estimated_cost, - confidence=0.8, - payment_offer=estimated_cost, - capabilities=caps, - )) + estimated_cost = ( + 
float(agent_profile.current_load + 1) + if hasattr(agent_profile, "current_load") + else 1.0 + ) + bids.append( + Bid( + agent_name=name, + architecture=arch, + estimated_steps=1, + estimated_cost=estimated_cost, + confidence=0.8, + payment_offer=estimated_cost, + capabilities=caps, + ) + ) auction_result = await self._auction_house.run_vickrey_auction( task_description=content, @@ -816,14 +1117,16 @@ class CostAwareRouter: execution_mode=ExecutionMode.REACT, ) if trace is not None: - trace.append({ - "layer": 2, - "method": "vickrey_auction", - "agent_name": winner_name, - "complexity": complexity, - "selection_reason": auction_result.selection_reason, - }) - return result + trace.append( + { + "layer": 2, + "method": "vickrey_auction", + "agent_name": winner_name, + "complexity": complexity, + "selection_reason": auction_result.selection_reason, + } + ) + return self._try_team_upgrade(result, content, complexity, trace) # No winner from auction → fall through to capability matching except Exception as e: logger.warning(f"CostAwareRouter Layer 2 Vickrey auction failed: {e}") @@ -833,7 +1136,11 @@ class CostAwareRouter: try: best_agent = self._org_context.find_best_agent(required_capabilities=content_words) if best_agent is not None: - agent_name = best_agent if isinstance(best_agent, str) else getattr(best_agent, "name", str(best_agent)) + agent_name = ( + best_agent + if isinstance(best_agent, str) + else getattr(best_agent, "name", str(best_agent)) + ) result = SkillRoutingResult( clean_content=content, matched=True, @@ -847,13 +1154,15 @@ class CostAwareRouter: execution_mode=ExecutionMode.REACT, ) if trace is not None: - trace.append({ - "layer": 2, - "method": "capability", - "agent_name": agent_name, - "complexity": complexity, - }) - return result + trace.append( + { + "layer": 2, + "method": "capability", + "agent_name": agent_name, + "complexity": complexity, + } + ) + return self._try_team_upgrade(result, content, complexity, trace) except Exception as 
e: logger.warning(f"CostAwareRouter Layer 2 org_context.find_best_agent failed: {e}") @@ -871,12 +1180,14 @@ class CostAwareRouter: ) result.complexity = complexity if trace is not None: - trace.append({ - "layer": 2, - "method": "intent_router_fallback", - "complexity": complexity, - }) - return result + trace.append( + { + "layer": 2, + "method": "intent_router_fallback", + "complexity": complexity, + } + ) + return self._try_team_upgrade(result, content, complexity, trace) # -- Main entry point --------------------------------------------------- @@ -933,12 +1244,14 @@ class CostAwareRouter: ) result.match_method = result.match_method or "explicit_skill" result.complexity = 0.0 - trace.append({ - "layer": 0, - "method": "explicit_skill", - "matched": result.matched, - "cost": "zero", - }) + trace.append( + { + "layer": 0, + "method": "explicit_skill", + "matched": result.matched, + "cost": "zero", + } + ) result.execution_trace = trace if transparency != "SILENT" else [] result.transparency_level = transparency span.set_attribute("route.layer", result.match_method or "explicit_skill") @@ -958,12 +1271,14 @@ class CostAwareRouter: complexity=0.0, execution_mode=ExecutionMode.DIRECT_CHAT, ) - trace.append({ - "layer": 0, - "method": match_type, - "matched": False, - "cost": "zero", - }) + trace.append( + { + "layer": 0, + "method": match_type, + "matched": False, + "cost": "zero", + } + ) result.execution_trace = trace if transparency != "SILENT" else [] result.transparency_level = transparency span.set_attribute("route.layer", match_type) @@ -973,18 +1288,22 @@ class CostAwareRouter: # ---- Layer 1: Complexity classification ---- if self._classifier == "heuristic": complexity = self._heuristic.classify(clean_content) - trace.append({ - "layer": 1, - "method": "heuristic_classify", - "complexity": complexity, - }) + trace.append( + { + "layer": 1, + "method": "heuristic_classify", + "complexity": complexity, + } + ) else: complexity = await 
self.quick_classify(clean_content) - trace.append({ - "layer": 1, - "method": "quick_classify", - "complexity": complexity, - }) + trace.append( + { + "layer": 1, + "method": "quick_classify", + "complexity": complexity, + } + ) # Low complexity → direct chat if complexity < 0.3: @@ -1000,12 +1319,14 @@ class CostAwareRouter: complexity=complexity, execution_mode=ExecutionMode.DIRECT_CHAT, ) - trace.append({ - "layer": 1, - "method": "low_complexity", - "complexity": complexity, - "routed_to": "default", - }) + trace.append( + { + "layer": 1, + "method": "low_complexity", + "complexity": complexity, + "routed_to": "default", + } + ) result.execution_trace = trace if transparency != "SILENT" else [] result.transparency_level = transparency span.set_attribute("route.layer", "low_complexity") @@ -1019,13 +1340,15 @@ class CostAwareRouter: semantic_result = await self._semantic_router.route(clean_content) if semantic_result.confidence == "high" and semantic_result.skill_name: # Direct skill match — skip Layer 2 - trace.append({ - "layer": 1.5, - "method": "semantic_high", - "skill": semantic_result.skill_name, - "similarity": round(semantic_result.similarity, 3), - "cost": "zero", - }) + trace.append( + { + "layer": 1.5, + "method": "semantic_high", + "skill": semantic_result.skill_name, + "similarity": round(semantic_result.similarity, 3), + "cost": "zero", + } + ) result = await resolve_skill_routing( content=content, skill_registry=skill_registry, @@ -1051,19 +1374,23 @@ class CostAwareRouter: elif semantic_result.confidence == "medium" and semantic_result.skill_name: # Pass skill hint to Layer 1.5 merged classify or Layer 2 skill_hint = semantic_result.skill_name - trace.append({ - "layer": 1.5, - "method": "semantic_medium", - "skill_hint": skill_hint, - "similarity": round(semantic_result.similarity, 3), - }) + trace.append( + { + "layer": 1.5, + "method": "semantic_medium", + "skill_hint": skill_hint, + "similarity": round(semantic_result.similarity, 3), + } + ) 
except Exception as e: logger.warning(f"Semantic routing failed, falling through: {e}") - trace.append({ - "layer": 1.5, - "method": "semantic_error", - "error": str(e), - }) + trace.append( + { + "layer": 1.5, + "method": "semantic_error", + "error": str(e), + } + ) # Medium complexity → merged LLM classify or IntentRouter if complexity <= 0.7: @@ -1082,13 +1409,19 @@ class CostAwareRouter: complexity=complexity, ) # If merged classify returned high complexity, delegate to Layer 2 - if result.complexity > 0.7 and result.match_method and result.match_method.startswith("merged_llm_high"): - trace.append({ - "layer": 1, - "method": "merged_llm_high", - "complexity": result.complexity, - "delegated_to_layer2": True, - }) + if ( + result.complexity > 0.7 + and result.match_method + and result.match_method.startswith("merged_llm_high") + ): + trace.append( + { + "layer": 1, + "method": "merged_llm_high", + "complexity": result.complexity, + "delegated_to_layer2": True, + } + ) layer2_result = await self._route_layer2( content=content, skill_registry=skill_registry, @@ -1119,12 +1452,14 @@ class CostAwareRouter: session_id=session_id, ) result.complexity = result.complexity if result.complexity > 0 else complexity - trace.append({ - "layer": 1, - "method": result.match_method or "merged_llm", - "complexity": result.complexity, - "matched": result.matched, - }) + trace.append( + { + "layer": 1, + "method": result.match_method or "merged_llm", + "complexity": result.complexity, + "matched": result.matched, + } + ) result.execution_trace = trace if transparency != "SILENT" else [] result.transparency_level = transparency span.set_attribute("route.layer", result.match_method or "merged_llm") @@ -1132,12 +1467,14 @@ class CostAwareRouter: return result # ---- Layer 2: Capability matching / Auction (high complexity) ---- - trace.append({ - "layer": 2, - "method": "capability_or_auction", - "complexity": complexity, - "auction_enabled": self._auction_enabled, - }) + 
trace.append( + { + "layer": 2, + "method": "capability_or_auction", + "complexity": complexity, + "auction_enabled": self._auction_enabled, + } + ) result = await self._route_layer2( content=content, skill_registry=skill_registry, diff --git a/src/agentkit/core/goal_planner.py b/src/agentkit/core/goal_planner.py index d0c249f..6e2dfa5 100644 --- a/src/agentkit/core/goal_planner.py +++ b/src/agentkit/core/goal_planner.py @@ -21,7 +21,6 @@ from typing import Any from agentkit.core.plan_schema import ( ExecutionPlan, PlanStep, - PlanStepStatus, SkillGap, SkillGapLevel, ) @@ -268,7 +267,62 @@ class GoalPlanner: goal: str, available_skills: list[str], ) -> list[PlanStep]: - """分解简单目标为单步计划""" + """分解简单目标 + + 尝试通过常见任务动词模式识别多步骤结构, + 如果无法识别则回退到单步计划。 + """ + # 常见多步骤任务模式 + task_patterns: list[tuple[list[str], str]] = [ + # (verb_patterns, task_type) + (["搜索", "查找", "调研", "search", "find", "research"], "research"), + (["分析", "对比", "比较", "analyze", "compare"], "analysis"), + (["生成", "撰写", "写", "报告", "generate", "write", "report"], "generation"), + (["部署", "发布", "deploy", "release"], "deployment"), + (["测试", "验证", "test", "verify"], "testing"), + ] + + # 检查目标是否包含多个阶段的动词 + matched_phases: list[tuple[str, str]] = [] # (verb, phase_type) + for verbs, phase_type in task_patterns: + for verb in verbs: + if verb in goal.lower(): + matched_phases.append((verb, phase_type)) + break + + # 如果匹配到 2+ 个不同阶段,生成多步计划 + unique_phases = list(dict.fromkeys(pt for _, pt in matched_phases)) + if len(unique_phases) >= 2: + steps: list[PlanStep] = [] + for i, phase_type in enumerate(unique_phases): + phase_names = { + "research": "搜索调研", + "analysis": "分析处理", + "generation": "生成输出", + "deployment": "部署执行", + "testing": "测试验证", + } + phase_descs = { + "research": "搜索和收集相关信息", + "analysis": "分析收集到的信息,提取关键洞察", + "generation": "基于分析结果生成最终输出", + "deployment": "执行部署操作", + "testing": "验证执行结果", + } + required_skills = self._infer_required_skills( + phase_descs.get(phase_type, phase_type), available_skills + 
) + steps.append(PlanStep( + step_id=f"step-{i}", + name=phase_names.get(phase_type, f"Phase {i}"), + description=phase_descs.get(phase_type, f"Execute {phase_type} phase"), + dependencies=[f"step-{i - 1}"] if i > 0 else [], + parallel_group=i, + required_skills=required_skills, + )) + return steps + + # 回退到单步计划 required_skills = self._infer_required_skills(goal, available_skills) return [ PlanStep( @@ -386,17 +440,31 @@ class GoalPlanner: skills_str = ", ".join(available_skills) if available_skills else "无" prompt = ( - f"Refine the following execution plan for the given goal.\n\n" + "You are a task decomposition expert. Break down the given goal into a structured " + "execution plan with clear, actionable steps.\n\n" f"Goal: {goal}\n\n" - f"Initial Plan (generated by rules):\n{initial_summary}\n\n" - f"Available Skills: {skills_str}\n\n" + f"Initial Plan (generated by rules, may need improvement):\n{initial_summary}\n\n" + f"Available Skills/Tools: {skills_str}\n\n" f"Context: {json.dumps(context, ensure_ascii=False) if context else 'None'}\n\n" - 'Respond ONLY with a JSON array of steps: ' - '[{"name": "...", "description": "...", "dependencies": [], ' - '"required_skills": [...]}]\n' - "The dependencies field lists step indices (0-based) that must complete first.\n" - "Each step should have a clear, specific description (at least 20 characters).\n" - "Do not include any other text." 
+ "## Requirements for each step:\n" + "- name: Short descriptive name (5-10 words)\n" + "- description: Detailed description of what to do (at least 20 characters)\n" + "- dependencies: List of step indices (0-based) that must complete before this step\n" + "- required_tools: List of tool/skill names from Available Skills that this step needs\n\n" + "## Example (goal: 'Analyze competitor products and generate report'):\n" + '[\n' + ' {"name": "Search competitor info", "description": "Search the web for detailed ' + 'information about each competitor product, including features, pricing, and reviews", ' + '"dependencies": [], "required_tools": ["web_search"]},\n' + ' {"name": "Analyze and compare", "description": "Analyze the gathered information, ' + 'identify key differences, strengths and weaknesses of each competitor", ' + '"dependencies": [0], "required_tools": []},\n' + ' {"name": "Generate comparison report", "description": "Compile the analysis into ' + 'a structured comparison report with recommendations", "dependencies": [1], ' + '"required_tools": []}\n' + ']\n\n' + "Respond ONLY with a JSON array of steps in the same format. " + "Do not include any other text or markdown." 
) try: @@ -412,13 +480,15 @@ class GoalPlanner: steps: list[PlanStep] = [] for i, defn in enumerate(step_defs): depends_on = [f"step-{j}" for j in defn.get("dependencies", [])] + # Support both required_tools (new) and required_skills (legacy) + required = defn.get("required_tools", defn.get("required_skills", [])) steps.append(PlanStep( step_id=f"step-{i}", name=defn.get("name", f"Step {i}"), description=defn.get("description", ""), dependencies=depends_on, parallel_group=0, # 后续由 _build_parallel_groups 重新计算 - required_skills=defn.get("required_skills", []), + required_skills=required, )) return ExecutionPlan( diff --git a/src/agentkit/core/plan_exec_engine.py b/src/agentkit/core/plan_exec_engine.py index db380b7..c00684c 100644 --- a/src/agentkit/core/plan_exec_engine.py +++ b/src/agentkit/core/plan_exec_engine.py @@ -14,22 +14,22 @@ from __future__ import annotations import asyncio import json import logging -import time from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Awaitable, Callable from agentkit.core.exceptions import TaskCancelledError, TaskTimeoutError from agentkit.core.goal_planner import GoalPlanner -from agentkit.core.plan_executor import PlanExecutor, PlanExecutionResult, StepExecutionResult +from agentkit.core.plan_executor import PlanExecutor, PlanExecutionResult from agentkit.core.plan_schema import ExecutionPlan, PlanStep, PlanStepStatus from agentkit.core.protocol import CancellationToken, TaskMessage, TaskResult, TaskStatus from agentkit.core.react import ReActEvent, ReActResult, ReActStep +from agentkit.core.shared_workspace import SharedWorkspace from agentkit.orchestrator.reflection import PipelineReflector, PipelineReplanner -from agentkit.orchestrator.pipeline_schema import Pipeline, PipelineResult, ReflectionReport, StageResult, StageStatus +from agentkit.orchestrator.pipeline_schema import Pipeline, PipelineResult, 
StageResult, StageStatus if TYPE_CHECKING: - from agentkit.core.compressor import CompressionStrategy, ContextCompressor + from agentkit.core.compressor import CompressionStrategy from agentkit.core.trace import TraceRecorder from agentkit.memory.retriever import MemoryRetriever from agentkit.llm.gateway import LLMGateway @@ -71,16 +71,23 @@ class PlanExecEngine: llm_gateway: "LLMGateway | None" = None, max_replans: int = _DEFAULT_MAX_REPLANS, default_timeout: float = 300.0, + workspace: SharedWorkspace | None = None, + step_event_callback: "Callable[[str, dict[str, Any]], Awaitable[None]] | None" = None, ): """ Args: llm_gateway: LLM Gateway,传递给 GoalPlanner / PipelineReplanner max_replans: 最大重规划次数 default_timeout: 默认超时秒数 + workspace: SharedWorkspace 实例,用于步骤间状态传递 + step_event_callback: 步骤事件回调,用于非流式执行时推送进度 """ self._llm_gateway = llm_gateway self._max_replans = max_replans self._default_timeout = default_timeout + self._workspace = workspace + self._step_event_callback = step_event_callback + self._confirmation_handler: Any | None = None # 组合子组件 self._planner = GoalPlanner(llm_gateway=llm_gateway) @@ -106,6 +113,7 @@ class PlanExecEngine: retrieval_config: dict[str, Any] | None = None, cancellation_token: CancellationToken | None = None, timeout_seconds: float | None = None, + confirmation_handler: Any | None = None, ) -> ReActResult: """执行 Plan-and-Execute 流程 @@ -113,6 +121,7 @@ class PlanExecEngine: 2. Executor Phase: 逐步执行 3. 
Replanner Phase: 失败时重规划 """ + self._confirmation_handler = confirmation_handler effective_timeout = timeout_seconds if timeout_seconds is not None else self._default_timeout try: @@ -174,6 +183,7 @@ class PlanExecEngine: retrieval_config: dict[str, Any] | None = None, cancellation_token: CancellationToken | None = None, timeout_seconds: float | None = None, + confirmation_handler: Any | None = None, ): """执行 Plan-and-Execute 流程,逐步 yield ReActEvent @@ -185,6 +195,7 @@ class PlanExecEngine: - "replanning": 触发重规划 - "final_answer": 最终结果 """ + self._confirmation_handler = confirmation_handler # Memory retrieval if memory_retriever: try: @@ -274,6 +285,16 @@ class PlanExecEngine: plan_result = await executor.execute(current_plan, task_msg) + # Write step results to workspace for cross-execution state sharing + if self._workspace: + for sid, step_result in plan_result.step_results.items(): + if step_result.status == PlanStepStatus.COMPLETED and step_result.result: + await self._workspace.write( + f"plan:{current_plan.plan_id}:step:{sid}:result", + step_result.result, + agent_id=agent_name or "plan_exec", + ) + # 将步骤结果映射到 trajectory 并 yield 事件 for sid, step_result in plan_result.step_results.items(): plan_step = current_plan.get_step(sid) @@ -470,6 +491,17 @@ class PlanExecEngine: available_skills=available_skills, ) + # Emit plan_generated event + if self._step_event_callback: + try: + await self._step_event_callback("plan_generated", { + "plan_id": plan.plan_id, + "goal": plan.goal, + "steps": [s.to_dict() for s in plan.steps], + }) + except Exception as e: + logger.warning(f"Step event callback failed: {e}") + trajectory.append(ReActStep( step=1, action="plan_generated", @@ -591,6 +623,16 @@ class PlanExecEngine: plan_result = await executor.execute(current_plan, task_msg) + # Write step results to workspace for cross-execution state sharing + if self._workspace: + for sid, step_result in plan_result.step_results.items(): + if step_result.status == 
PlanStepStatus.COMPLETED and step_result.result: + await self._workspace.write( + f"plan:{current_plan.plan_id}:step:{sid}:result", + step_result.result, + agent_id=agent_name or "plan_exec", + ) + # 将步骤结果映射到 trajectory for sid, step_result in plan_result.step_results.items(): plan_step = current_plan.get_step(sid) @@ -603,6 +645,20 @@ class PlanExecEngine: tokens=0, )) + # Emit step event callback + if self._step_event_callback: + event_type = "step_completed" if step_result.status == PlanStepStatus.COMPLETED else "step_failed" + try: + await self._step_event_callback(event_type, { + "step_id": sid, + "step_name": step_name, + "status": step_result.status.value, + "result": step_result.result, + "error": step_result.error, + }) + except Exception as e: + logger.warning(f"Step event callback failed: {e}") + if trace_recorder is not None: trace_recorder.record_step( step=len(trajectory), @@ -640,6 +696,17 @@ class PlanExecEngine: # 保留已完成步骤的结果到新计划 self._merge_completed_results(current_plan, plan_result) + # Emit replanning event + if self._step_event_callback: + try: + await self._step_event_callback("replanning", { + "replan_count": replan_count, + "root_cause": reflection_report.root_cause, + "new_plan_id": current_plan.plan_id, + }) + except Exception as e: + logger.warning(f"Step event callback failed: {e}") + trajectory.append(ReActStep( step=len(trajectory) + 1, action="replanning", @@ -712,18 +779,31 @@ class PlanExecEngine: model: str, system_prompt: str | None, tools: list["Tool"] | None, + step_executor_type: str = "react", ) -> PlanExecutor: """创建 PlanExecutor 实例 - 使用 _LLMStepExecutor 作为 agent_pool,使每个步骤通过 LLM 直接调用执行。 + Args: + step_executor_type: "react" 使用 ReActStepExecutor(默认,支持工具调用), + "llm" 使用 _LLMStepExecutor(纯 LLM 调用,无工具) """ - step_executor = _LLMStepExecutor( - llm_gateway=self._llm_gateway, - messages=messages, - model=model, - system_prompt=system_prompt, - tools=tools, - ) + if step_executor_type == "llm": + step_executor: _LLMStepExecutor | 
ReActStepExecutor = _LLMStepExecutor( + llm_gateway=self._llm_gateway, + messages=messages, + model=model, + system_prompt=system_prompt, + tools=tools, + ) + else: + step_executor = ReActStepExecutor( + llm_gateway=self._llm_gateway, + messages=messages, + model=model, + system_prompt=system_prompt, + tools=tools, + confirmation_handler=self._confirmation_handler, + ) return PlanExecutor( agent_pool=step_executor, max_retries=1, @@ -845,7 +925,7 @@ class PlanExecEngine: name = plan_step.name if plan_step else sid failed_info.append(f"- {name}: {sr.error if sr else 'unknown error'}") if failed_info: - return f"Plan execution failed.\nFailed steps:\n" + "\n".join(failed_info) + return "Plan execution failed.\nFailed steps:\n" + "\n".join(failed_info) return "Plan execution completed with no output." # 简单聚合:将所有成功步骤结果格式化 @@ -909,6 +989,151 @@ class _LLMStepExecutor: return agent +class ReActStepExecutor: + """ReAct 循环步骤执行器 + + 使用 ReActEngine 执行每个 PlanStep,支持工具调用和多步推理。 + 作为 PlanExecutor 的 agent_pool 替代品。 + """ + + def __init__( + self, + llm_gateway: "LLMGateway | None" = None, + messages: list[dict[str, str]] | None = None, + model: str = "default", + system_prompt: str | None = None, + tools: list["Tool"] | None = None, + max_steps: int = 5, + confirmation_handler: Any | None = None, + ): + self._llm_gateway = llm_gateway + self._messages = messages or [] + self._model = model + self._system_prompt = system_prompt + self._tools = tools or [] + self._max_steps = max_steps + self._confirmation_handler = confirmation_handler + self._agents: dict[str, _ReActStepAgent] = {} + + async def create_agent_from_skill(self, skill_name: str): + """创建 ReAct 步骤 Agent""" + agent = _ReActStepAgent( + name=skill_name, + llm_gateway=self._llm_gateway, + messages=self._messages, + model=self._model, + system_prompt=self._system_prompt, + tools=self._tools, + max_steps=self._max_steps, + confirmation_handler=self._confirmation_handler, + ) + self._agents[skill_name] = agent + return 
agent + + def get_agent(self, key: str): + """获取已创建的 Agent""" + if key in self._agents: + return self._agents[key] + agent = _ReActStepAgent( + name=key, + llm_gateway=self._llm_gateway, + messages=self._messages, + model=self._model, + system_prompt=self._system_prompt, + tools=self._tools, + max_steps=self._max_steps, + ) + self._agents[key] = agent + return agent + + +class _ReActStepAgent: + """ReAct 循环步骤 Agent + + 将 PlanStep 的描述作为任务交给 ReActEngine 执行, + 支持工具调用和多步 think-act-observe 循环。 + """ + + def __init__( + self, + name: str, + llm_gateway: "LLMGateway | None" = None, + messages: list[dict[str, str]] | None = None, + model: str = "default", + system_prompt: str | None = None, + tools: list["Tool"] | None = None, + max_steps: int = 5, + confirmation_handler: Any | None = None, + ): + self.name = name + self._llm_gateway = llm_gateway + self._messages = messages or [] + self._model = model + self._system_prompt = system_prompt + self._tools = tools or [] + self._max_steps = max_steps + self._confirmation_handler = confirmation_handler + + async def execute(self, task_msg: TaskMessage) -> "TaskResult": + """执行步骤:通过 ReActEngine 循环调用""" + if self._llm_gateway is None: + raise RuntimeError(f"No LLM gateway available for step '{task_msg.task_id}'") + + from agentkit.core.react import ReActEngine + + input_data = task_msg.input_data + step_name = input_data.get("step_name", task_msg.task_id) + step_description = input_data.get("step_description", "") + dep_results = input_data.get("dependency_results", {}) + + # 构建步骤 prompt + prompt_parts = [f"Execute the following task step:\n\nStep: {step_name}\nDescription: {step_description}"] + if dep_results: + prompt_parts.append( + f"\nResults from previous steps:\n{json.dumps(dep_results, ensure_ascii=False, indent=2)}" + ) + prompt_parts.append("\nProvide a clear, structured result for this step.") + + # 构建 ReActEngine + engine = ReActEngine( + llm_gateway=self._llm_gateway, + max_steps=self._max_steps, + ) + + # 构建 
messages + step_messages: list[dict[str, str]] = list(self._messages) + step_messages.append({"role": "user", "content": "\n".join(prompt_parts)}) + + # 执行 ReAct 循环 + react_result = await engine.execute( + messages=step_messages, + tools=self._tools if self._tools else None, + model=self._model, + agent_name=self.name, + system_prompt=self._system_prompt, + confirmation_handler=self._confirmation_handler, + ) + + now = datetime.now(timezone.utc) + status = TaskStatus.COMPLETED.value + if react_result.status in ("timeout", "cancelled"): + status = TaskStatus.FAILED.value + + return TaskResult( + task_id=task_msg.task_id, + agent_name=self.name, + status=status, + output_data={ + "content": react_result.output, + "steps": react_result.total_steps, + "tokens": react_result.total_tokens, + }, + error_message=None if react_result.status == "success" else react_result.status, + started_at=now, + completed_at=now, + ) + + class _LLMStepAgent: """LLM 直接调用步骤 Agent diff --git a/src/agentkit/experts/orchestrator.py b/src/agentkit/experts/orchestrator.py index 075bb97..14fe1f4 100644 --- a/src/agentkit/experts/orchestrator.py +++ b/src/agentkit/experts/orchestrator.py @@ -12,8 +12,12 @@ from __future__ import annotations import asyncio import logging +from datetime import datetime, timezone from typing import Any +from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus +from agentkit.core.shared_workspace import SharedWorkspace + from .expert import Expert from .plan import ( CollaborationPlan, @@ -33,10 +37,18 @@ class TeamOrchestrator: MAX_RETRIES = 1 # Retry once on failure before fallback MAX_INTERACTION_ROUNDS = 20 # Prevent infinite collaboration loops + MAX_REPLANS = 2 # Maximum replanning attempts before fallback - def __init__(self, team: ExpertTeam) -> None: + def __init__( + self, + team: ExpertTeam, + workspace: "SharedWorkspace | None" = None, + max_replans: int = 2, + ) -> None: self._team = team + self._workspace = workspace or team._workspace 
self._interaction_count = 0 + self._max_replans = max_replans async def execute_plan(self, plan: CollaborationPlan) -> dict[str, Any]: """Execute a CollaborationPlan within the team. @@ -58,11 +70,12 @@ class TeamOrchestrator: } plan.status = PlanStatus.EXECUTING - self._team._status = TeamStatus.EXECUTING + self._team.set_status(TeamStatus.EXECUTING) self._interaction_count = 0 # Reset for each plan execution phase_results: dict[str, dict[str, Any]] = {} retry_counts: dict[str, int] = {} # Per-phase retry tracking + replan_count = 0 try: while True: @@ -144,13 +157,33 @@ class TeamOrchestrator: result = await self._execute_phase(phase, plan, phase_results) if result is None: - # Still failed after retry — fallback to single agent - logger.warning( - f"Phase {phase.id} failed after retry, falling back to single agent" - ) - return await self._fallback_to_single_agent( - plan, phase_results - ) + # Still failed after retry — try replanning before fallback + if replan_count < self._max_replans: + replan_count += 1 + logger.info( + f"Phase {phase.id} failed after retry, " + f"attempting replan ({replan_count}/{self._max_replans})" + ) + await self._broadcast_event( + "replanning", + { + "phase_id": phase.id, + "replan_count": replan_count, + "reason": "phase_failed", + }, + ) + # Reset phase status for replan + plan.update_phase_status(phase.id, PhaseStatus.PENDING) + result = await self._execute_phase(phase, plan, phase_results) + + if result is None: + # Still failed after replan — fallback to single agent + logger.warning( + f"Phase {phase.id} failed after replan, falling back to single agent" + ) + return await self._fallback_to_single_agent( + plan, phase_results + ) phase_results[phase.id] = result @@ -205,11 +238,11 @@ class TeamOrchestrator: # Synthesize final result plan.status = PlanStatus.COMPLETED - self._team._status = TeamStatus.SYNTHESIZING + self._team.set_status(TeamStatus.SYNTHESIZING) final_result = await self._synthesize_results(plan, 
phase_results) - self._team._status = TeamStatus.COMPLETED + self._team.set_status(TeamStatus.COMPLETED) return { "status": "completed", "result": final_result, @@ -247,18 +280,75 @@ class TeamOrchestrator: ) # Get the assigned expert - expert = self._team._experts.get(phase.assigned_expert) + expert = self._team.get_expert(phase.assigned_expert) if not expert or not expert.is_active: - raise RuntimeError( - f"Expert '{phase.assigned_expert}' not available" + # Fallback to lead expert or first active expert + expert = self._team.lead_expert + if not expert or not expert.is_active: + active = self._team.active_experts + if not active: + raise RuntimeError( + f"Expert '{phase.assigned_expert}' not available and no active fallback" + ) + expert = active[0] + logger.warning( + f"Expert '{phase.assigned_expert}' not available, " + f"falling back to '{expert.config.name}'" ) - # Execute the task via the expert's agent - # In a real implementation, this would call expert.agent.execute(task) - # For now, we simulate by having the expert process the task - result: dict[str, Any] = { - "output": f"Phase '{phase.name}' completed by {phase.assigned_expert}" + # Build TaskMessage for real execution + input_data: dict[str, Any] = { + "phase_name": phase.name, + "phase_description": phase.task_description or phase.name, + "team_id": self._team.team_id, } + # Inject dependency results from previous phases + if phase.depends_on: + dep_results: dict[str, dict[str, Any]] = {} + for dep_id in phase.depends_on: + # Try workspace first, then fall back to in-memory phase_results + if self._workspace: + ws_data = await self._workspace.read( + f"team:{self._team.team_id}:phase:{dep_id}:result" + ) + if ws_data: + dep_results[dep_id] = ws_data.get("value", {}) + continue + if dep_id in phase_results: + dep_results[dep_id] = phase_results[dep_id] + if dep_results: + input_data["dependency_results"] = dep_results + + task_msg = TaskMessage( + task_id=phase.id, + agent_name=expert.config.name, 
+ task_type="team_phase", + priority=0, + input_data=input_data, + callback_url=None, + created_at=datetime.now(timezone.utc), + ) + + # Execute the task via the expert's agent + task_result: TaskResult = await expert.agent.execute(task_msg) + + if task_result.status != TaskStatus.COMPLETED.value: + raise RuntimeError( + f"Agent execution failed: {task_result.error_message or 'unknown error'}" + ) + + result = task_result.output_data or {"content": ""} + + # Write result to workspace for cross-phase state sharing + if self._workspace: + try: + await self._workspace.write( + f"team:{self._team.team_id}:phase:{phase.id}:result", + result, + agent_id=expert.config.name, + ) + except Exception as e: + logger.warning(f"Workspace write failed for phase {phase.id}: {e}") # Check milestone if phase.milestone: @@ -334,60 +424,268 @@ class TeamOrchestrator: self, expert: Expert, phase: PlanPhase ) -> dict[str, Any]: """Run a single competitor for a competitive phase.""" - # Simulate expert execution + # Build TaskMessage for real execution + task_msg = TaskMessage( + task_id=f"{phase.id}_{expert.config.name}", + agent_name=expert.config.name, + task_type="team_competitive", + priority=0, + input_data={ + "phase_name": phase.name, + "phase_description": phase.task_description or phase.name, + "team_id": self._team.team_id, + }, + callback_url=None, + created_at=datetime.now(timezone.utc), + ) + + task_result: TaskResult = await expert.agent.execute(task_msg) + + if task_result.status != TaskStatus.COMPLETED.value: + raise RuntimeError( + f"Competitor {expert.config.name} failed: {task_result.error_message or 'unknown'}" + ) + return { "expert": expert.config.name, - "output": f"Competitive result from {expert.config.name}", + "output": task_result.output_data or {}, + "status": task_result.status, } + def _get_llm_gateway(self) -> Any: + """Get LLM gateway from the lead expert's agent.""" + lead = self._team.lead_expert + if lead and hasattr(lead, "agent") and 
hasattr(lead.agent, "_llm_gateway"): + return lead.agent._llm_gateway + # Fallback: try first active expert + for expert in self._team.active_experts: + if hasattr(expert, "agent") and hasattr(expert.agent, "_llm_gateway"): + return expert.agent._llm_gateway + return None + + @staticmethod + def _build_result_summaries(results: list[dict[str, Any]], max_len: int = 500) -> list[str]: + """Build text summaries from competitor results for LLM evaluation.""" + summaries = [] + for i, r in enumerate(results): + output = r.get("output", {}) + content = output.get("content", str(output)) if isinstance(output, dict) else str(output) + summaries.append(f"Result {i + 1} (by {r.get('expert', 'unknown')}):\n{content[:max_len]}") + return summaries + + async def _llm_pick_best( + self, task: str, results: list[dict[str, Any]] + ) -> dict[str, Any]: + """Use LLM to evaluate and pick the best result.""" + gateway = self._get_llm_gateway() + if not gateway: + return results[0] + + # Build evaluation prompt + result_summaries = self._build_result_summaries(results) + + prompt = ( + f"Task: {task}\n\n" + f"Below are {len(results)} candidate results. Pick the BEST one based on " + f"completeness, accuracy, and relevance to the task.\n\n" + + "\n---\n".join(result_summaries) + + "\n\nReply with ONLY the number of the best result (e.g., '1' or '2')." 
+ ) + + try: + response = await gateway.chat( + messages=[{"role": "user", "content": prompt}], + model="default", + ) + choice = response.content.strip() + # Parse the number from the response + for ch in choice: + if ch.isdigit(): + idx = int(ch) - 1 + if 0 <= idx < len(results): + return results[idx] + except Exception as e: + logger.warning(f"LLM best-pick failed, falling back to first result: {e}") + + return results[0] + + async def _llm_vote( + self, task: str, results: list[dict[str, Any]] + ) -> dict[str, Any]: + """Use LLM voting to select the best result.""" + gateway = self._get_llm_gateway() + if not gateway: + return results[0] + + scores: dict[int, float] = {} + result_summaries = self._build_result_summaries(results) + + # Each expert votes by ranking results (excluding their own) + for voter_idx, r in enumerate(results): + # Build summaries excluding the voter's own result to avoid self-voting bias + other_indices = [i for i in range(len(results)) if i != voter_idx] + other_summaries = [result_summaries[i] for i in other_indices] + + prompt = ( + f"Task: {task}\n\n" + f"Below are {len(other_summaries)} candidate results. Rank them from best to worst.\n\n" + + "\n---\n".join(other_summaries) + + "\n\nReply with ONLY a comma-separated list of result numbers, best first (e.g., '2,1,3')." 
+ ) + try: + response = await gateway.chat( + messages=[{"role": "user", "content": prompt}], + model="default", + ) + # Parse ranking: map back to original indices + for rank_pos, ch in enumerate(response.content.strip().split(",")): + ch = ch.strip() + if ch.isdigit(): + local_idx = int(ch) - 1 + if 0 <= local_idx < len(other_indices): + original_idx = other_indices[local_idx] + scores[original_idx] = scores.get(original_idx, 0) + ( + len(other_indices) - rank_pos + ) + except Exception as e: + logger.warning(f"Voter {voter_idx} vote failed: {e}") + # On failure, distribute 1 point evenly across other results + for oi in other_indices: + scores[oi] = scores.get(oi, 0) + 1 + + if not scores: + return results[0] + + best_idx = max(scores, key=scores.get) # type: ignore[arg-type] + return results[best_idx] + + async def _llm_fuse( + self, task: str, results: list[dict[str, Any]] + ) -> dict[str, Any]: + """Use LLM to fuse multiple results into one.""" + gateway = self._get_llm_gateway() + if not gateway: + # Fallback: concatenate all results + combined = "\n\n".join( + str(r.get("output", {})) for r in results + ) + return {"content": combined, "fused_from": len(results)} + + result_summaries = self._build_result_summaries(results, max_len=800) + + prompt = ( + f"Task: {task}\n\n" + f"Below are {len(results)} results from different experts working on the same task. " + f"Fuse them into a single comprehensive result that combines the best elements.\n\n" + + "\n---\n".join(result_summaries) + + "\n\nProvide the fused result directly." 
+ ) + + try: + response = await gateway.chat( + messages=[{"role": "user", "content": prompt}], + model="default", + ) + return { + "content": response.content.strip(), + "fused_from": len(results), + "strategy": "fusion", + } + except Exception as e: + logger.warning(f"LLM fusion failed, falling back to concatenation: {e}") + combined = "\n\n".join( + str(r.get("output", {})) for r in results + ) + return {"content": combined, "fused_from": len(results)} + async def _merge_results( self, phase: PlanPhase, results: list[dict[str, Any]] ) -> dict[str, Any]: """Merge competitive parallel results based on merge strategy.""" + if not results: + return {} + strategy = phase.merge_strategy or MergeStrategy.BEST + task_desc = phase.task_description or phase.name if strategy == MergeStrategy.BEST: - # Lead Expert picks the best result - lead = self._team.lead_expert - if lead: - return { - "merged": True, - "strategy": "best", - "selected": results[0], - "all_results": results, - } - return results[0] + selected = await self._llm_pick_best(task_desc, results) + return { + "merged": True, + "strategy": "best", + "selected": selected, + "all_results": results, + } elif strategy == MergeStrategy.VOTE: - # All experts vote — for now, simple majority with Lead Expert tie-breaking + selected = await self._llm_vote(task_desc, results) return { "merged": True, "strategy": "vote", - "selected": results[0], + "selected": selected, "all_results": results, } elif strategy == MergeStrategy.FUSION: - # Lead Expert fuses all results - lead = self._team.lead_expert - if lead: - return { - "merged": True, - "strategy": "fusion", - "fused_from": len(results), - "all_results": results, - } - return results[0] + fused = await self._llm_fuse(task_desc, results) + return { + "merged": True, + "strategy": "fusion", + "fused_from": len(results), + "selected": fused, + "all_results": results, + } return results[0] async def _check_milestone( self, phase: PlanPhase, result: dict[str, Any] ) -> 
bool: - """Check if a phase result passes its milestone checkpoint.""" - # In a real implementation, this would use LLM evaluation - # For now, always pass if there's a result - return result is not None + """Check if a phase result passes its milestone checkpoint. + + Uses LLM evaluation when available, falls back to basic content check. + """ + milestone = phase.milestone + if not milestone: + return True + + # Basic check: result must have non-empty content + output = result.get("output", result) if isinstance(result, dict) else result + content = output.get("content", str(output)) if isinstance(output, dict) else str(output) + if not content or content.strip() == "": + return False + + # LLM-based milestone evaluation + gateway = self._get_llm_gateway() + if not gateway: + # Without LLM, do basic keyword matching + milestone_lower = milestone.lower() + content_lower = content.lower() + # Check if milestone keywords appear in content + keywords = [w for w in milestone_lower.split() if len(w) > 2] + if keywords and not any(kw in content_lower for kw in keywords): + return False + return True + + prompt = ( + f"Task: {phase.task_description or phase.name}\n" + f"Milestone requirement: {milestone}\n" + f"Result:\n{content[:500]}\n\n" + f"Does this result meet the milestone requirement? " + f"Reply with ONLY 'yes' or 'no'." 
+ ) + + try: + response = await gateway.chat( + messages=[{"role": "user", "content": prompt}], + model="default", + ) + answer = response.content.strip().lower() + return answer.startswith("yes") + except Exception as e: + logger.warning(f"Milestone LLM check failed for phase {phase.id}: {e}") + # On LLM failure, pass the milestone (conservative — don't block on infra issues) + return True async def _synthesize_results( self, plan: CollaborationPlan, phase_results: dict[str, dict[str, Any]] @@ -431,10 +729,23 @@ class TeamOrchestrator: fallback_result = None if expert: try: - # Execute the original task with a single expert - fallback_result = { - "output": f"Task completed by {expert.config.name} (fallback mode)", - "task": plan.task, + # Execute the original task with a single expert via real agent + task_msg = TaskMessage( + task_id=f"fallback_{plan.id}", + agent_name=expert.config.name, + task_type="fallback", + priority=0, + input_data={ + "task": plan.task, + "phase_results": phase_results, + "team_id": self._team.team_id, + }, + callback_url=None, + created_at=datetime.now(timezone.utc), + ) + task_result: TaskResult = await expert.agent.execute(task_msg) + fallback_result = task_result.output_data or { + "content": f"Task completed by {expert.config.name} (fallback mode)" } except Exception as e: logger.error(f"Fallback agent execution failed: {e}") @@ -452,7 +763,7 @@ class TeamOrchestrator: self, event_type: str, data: dict[str, Any] ) -> None: """Broadcast an orchestration event to the team channel.""" - if self._team._handoff_transport: - await self._team._handoff_transport.send( - self._team._team_channel, {"type": event_type, **data} + if self._team.handoff_transport: + await self._team.handoff_transport.send( + self._team.team_channel, {"type": event_type, **data} ) diff --git a/src/agentkit/experts/router.py b/src/agentkit/experts/router.py index c8f116d..8dc3bb4 100644 --- a/src/agentkit/experts/router.py +++ b/src/agentkit/experts/router.py @@ 
-3,9 +3,8 @@ import logging import re from dataclasses import dataclass, field -from typing import Any -from .config import ExpertConfig, ExpertTemplate +from .config import ExpertConfig from .registry import ExpertTemplateRegistry logger = logging.getLogger(__name__) @@ -67,7 +66,9 @@ class ExpertTeamRouter: result.matched = True result.team_mode = True - result.task_content = task if task else content # Fall back to full content when no task after prefix + result.task_content = ( + task if task else content + ) # Fall back to full content when no task after prefix result.match_method = "explicit_team" if expert_list_str: @@ -84,7 +85,9 @@ class ExpertTeamRouter: for name in result.specified_experts: template = self._registry.get(name) if template is None: - logger.warning(f"ExpertTemplate '{name}' not found, will be dynamically generated") + logger.warning( + f"ExpertTemplate '{name}' not found, will be dynamically generated" + ) else: # No specific experts — auto-compose result.auto_compose = True @@ -108,6 +111,28 @@ class ExpertTeamRouter: result.complexity = complexity return result + def can_handle(self, content: str) -> bool: + """Check whether any registered expert template can handle the given content. + + Used by CostAwareRouter to decide whether to upgrade REACT → TEAM_COLLAB. + Returns True if at least one template's name or description overlaps with + content tokens, or if any templates exist (auto-compose can always form a team). 
+ """ + if not self._registry or not self._registry._templates: + return False + content_lower = content.lower() + for template in self._registry._templates.values(): + if template.name.lower() in content_lower: + return True + if template.description and any( + word in content_lower + for word in template.description.lower().split() + if len(word) > 2 + ): + return True + # Auto-compose can form a team from any available templates + return bool(self._registry._templates) + def resolve_expert_configs(self, specified_experts: list[str]) -> list[ExpertConfig]: """Resolve expert names to ExpertConfig instances. diff --git a/src/agentkit/experts/team.py b/src/agentkit/experts/team.py index 93319cf..cc1295b 100644 --- a/src/agentkit/experts/team.py +++ b/src/agentkit/experts/team.py @@ -78,6 +78,29 @@ class ExpertTeam: def active_experts(self) -> list[Expert]: return [e for e in self._experts.values() if e.is_active] + @property + def workspace(self) -> SharedWorkspace: + """Public read access to the team's shared workspace.""" + return self._workspace + + @property + def handoff_transport(self): + """Public read access to the team's handoff transport.""" + return self._handoff_transport + + @property + def team_channel(self) -> str: + """Public read access to the team's communication channel.""" + return self._team_channel + + def get_expert(self, name: str) -> Expert | None: + """Get an expert by name. 
Returns None if not found.""" + return self._experts.get(name) + + def set_status(self, status: TeamStatus) -> None: + """Update the team's status.""" + self._status = status + async def create_team( self, lead_config: ExpertConfig, diff --git a/src/agentkit/server/routes/chat.py b/src/agentkit/server/routes/chat.py index 4b6c38b..c4a0564 100644 --- a/src/agentkit/server/routes/chat.py +++ b/src/agentkit/server/routes/chat.py @@ -99,6 +99,8 @@ chat_manager = ChatConnectionManager() _VALID_TEAM_EVENT_TYPES = frozenset({ "team_formed", "expert_step", "expert_result", "plan_update", "team_synthesis", "team_dissolved", + "plan_step", "phase_started", "phase_completed", "phase_failed", + "replanning", }) diff --git a/tests/unit/core/test_autonomous_research_task.py b/tests/unit/core/test_autonomous_research_task.py new file mode 100644 index 0000000..8e3ce64 --- /dev/null +++ b/tests/unit/core/test_autonomous_research_task.py @@ -0,0 +1,250 @@ +"""U7: 多步研究任务端到端验证 + +用 mock LLM Gateway 验证 PlanExecEngine + SharedWorkspace + ReActStepExecutor 的完整闭环。 +场景:"分析竞品并生成报告" → 搜索→分析→生成 +""" + +from __future__ import annotations + +import json +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from agentkit.core.plan_exec_engine import PlanExecEngine +from agentkit.core.shared_workspace import SharedWorkspace +from agentkit.llm.protocol import LLMResponse, TokenUsage + + +def _make_mock_gateway(responses: list[str]) -> MagicMock: + """Create a mock LLM Gateway that returns preset responses in order. + + Each response is wrapped in an LLMResponse dataclass to match the real + LLMGateway.chat() return format. 
+ """ + gateway = MagicMock() + call_count = 0 + + async def _chat(messages, model="default", **kwargs): + nonlocal call_count + if call_count < len(responses): + resp = responses[call_count] + call_count += 1 + return LLMResponse( + content=resp, + model=model, + usage=TokenUsage(prompt_tokens=10, completion_tokens=len(resp) // 4), + ) + return LLMResponse( + content='{"error": "no more responses"}', + model=model, + usage=TokenUsage(prompt_tokens=0, completion_tokens=0), + ) + + gateway.chat = AsyncMock(side_effect=_chat) + return gateway + + +class TestMultiStepResearchTask: + """多步研究任务端到端验证""" + + @pytest.mark.asyncio + async def test_plan_exec_research_task_with_workspace(self): + """验证 PlanExecEngine 执行多步研究任务,SharedWorkspace 传递状态""" + workspace = SharedWorkspace() + + # Mock LLM responses: + # 1. GoalPlanner: decompose "分析竞品并生成报告" into 3 steps + # 2-4. Step executor responses for each step + planner_response = json.dumps({ + "goal": "分析竞品并生成报告", + "steps": [ + { + "step_id": "search", + "name": "搜索竞品信息", + "description": "搜索飞书和钉钉的竞品信息", + "dependencies": [], + "required_tools": ["web_search"], + }, + { + "step_id": "analyze", + "name": "分析竞品对比", + "description": "基于搜索结果分析飞书和钉钉的优缺点", + "dependencies": ["search"], + "required_tools": [], + }, + { + "step_id": "report", + "name": "生成报告", + "description": "基于分析结果生成竞品对比报告", + "dependencies": ["analyze"], + "required_tools": [], + }, + ], + }) + + step_responses = [ + planner_response, + "搜索完成:飞书是字节跳动的企业协作平台,钉钉是阿里巴巴的企业通讯工具", + "分析完成:飞书优势在于文档协作,钉钉优势在于即时通讯和考勤管理", + "报告:飞书与钉钉竞品对比报告\n1. 文档协作:飞书领先\n2. 即时通讯:钉钉领先\n3. 
考勤管理:钉钉领先", + ] + + gateway = _make_mock_gateway(step_responses) + + engine = PlanExecEngine( + llm_gateway=gateway, + workspace=workspace, + max_replans=0, + ) + + result = await engine.execute( + messages=[{"role": "user", "content": "分析竞品并生成报告"}], + tools=[], + model="default", + agent_name="research_agent", + ) + + # Verify execution completed + assert result.status in ("success", "partial") + assert result.output is not None + assert len(result.output) > 0 + + # Verify workspace has step results + keys = await workspace.list_keys() + plan_keys = [k for k in keys if k.startswith("plan:")] + assert len(plan_keys) > 0, f"Expected workspace keys for plan steps, got: {keys}" + + @pytest.mark.asyncio + async def test_workspace_dependency_chain(self): + """验证 SharedWorkspace 在步骤间正确传递依赖结果""" + workspace = SharedWorkspace() + + # Write a dependency result + await workspace.write( + "plan:test_plan:step:search:result", + {"content": "搜索结果:飞书和钉钉的信息"}, + agent_id="search_agent", + ) + + # Read it back + data = await workspace.read("plan:test_plan:step:search:result") + assert data is not None + assert data["value"]["content"] == "搜索结果:飞书和钉钉的信息" + assert data["version"] == 1 + + # Write a second version + version = await workspace.write( + "plan:test_plan:step:search:result", + {"content": "更新后的搜索结果"}, + agent_id="search_agent", + ) + assert version == 2 + + # Verify version incremented + data = await workspace.read("plan:test_plan:step:search:result") + assert data["version"] == 2 + + @pytest.mark.asyncio + async def test_step_event_callback(self): + """验证 step_event_callback 在步骤完成时被调用""" + events: list[tuple[str, dict]] = [] + + async def on_step_event(event_type: str, data: dict): + events.append((event_type, data)) + + planner_response = json.dumps({ + "goal": "简单任务", + "steps": [ + { + "step_id": "step1", + "name": "执行步骤1", + "description": "执行一个简单步骤", + "dependencies": [], + "required_tools": [], + }, + ], + }) + + gateway = _make_mock_gateway([planner_response, 
"步骤1完成"]) + + engine = PlanExecEngine( + llm_gateway=gateway, + step_event_callback=on_step_event, + max_replans=0, + ) + + await engine.execute( + messages=[{"role": "user", "content": "简单任务"}], + tools=[], + ) + + # Verify callback was called + event_types = [e[0] for e in events] + assert "plan_generated" in event_types, f"Expected plan_generated event, got: {event_types}" + + # Verify step event contains expected data + step_events = [e for e in events if e[0] in ("step_completed", "step_failed")] + assert len(step_events) > 0, f"Expected step events, got: {events}" + + @pytest.mark.asyncio + async def test_no_workspace_fallback(self): + """验证 workspace=None 时不影响执行""" + planner_response = json.dumps({ + "goal": "无workspace任务", + "steps": [ + { + "step_id": "step1", + "name": "步骤1", + "description": "执行步骤", + "dependencies": [], + "required_tools": [], + }, + ], + }) + + gateway = _make_mock_gateway([planner_response, "完成"]) + + engine = PlanExecEngine( + llm_gateway=gateway, + workspace=None, + max_replans=0, + ) + + result = await engine.execute( + messages=[{"role": "user", "content": "无workspace任务"}], + tools=[], + ) + + assert result.status in ("success", "partial") + + @pytest.mark.asyncio + async def test_no_callback_fallback(self): + """验证 callback=None 时不影响执行""" + planner_response = json.dumps({ + "goal": "无callback任务", + "steps": [ + { + "step_id": "step1", + "name": "步骤1", + "description": "执行步骤", + "dependencies": [], + "required_tools": [], + }, + ], + }) + + gateway = _make_mock_gateway([planner_response, "完成"]) + + engine = PlanExecEngine( + llm_gateway=gateway, + step_event_callback=None, + max_replans=0, + ) + + result = await engine.execute( + messages=[{"role": "user", "content": "无callback任务"}], + tools=[], + ) + + assert result.status in ("success", "partial") diff --git a/tests/unit/experts/test_team_orchestrator.py b/tests/unit/experts/test_team_orchestrator.py index 24931ce..09c69f1 100644 --- 
a/tests/unit/experts/test_team_orchestrator.py +++ b/tests/unit/experts/test_team_orchestrator.py @@ -7,6 +7,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest from agentkit.core.handoff_transport import InProcessHandoffTransport +from agentkit.core.protocol import TaskResult, TaskStatus from agentkit.experts.config import ExpertConfig from agentkit.experts.expert import Expert from agentkit.experts.orchestrator import TeamOrchestrator @@ -59,6 +60,18 @@ def _make_mock_expert( "bound_skills": config.bound_skills, "is_lead": is_lead, } + # Mock agent.execute() to return a successful TaskResult + mock_agent = MagicMock() + mock_agent.execute = AsyncMock(return_value=TaskResult( + task_id="test", + agent_name=name, + status=TaskStatus.COMPLETED.value, + output_data={"content": f"Result from {name}"}, + error_message=None, + started_at=None, + completed_at=None, + )) + expert.agent = mock_agent return expert @@ -189,7 +202,7 @@ class TestSerialPhaseExecution: await orchestrator.execute_plan(plan) assert plan.status == PlanStatus.COMPLETED - assert team._status == TeamStatus.COMPLETED + assert team.status == TeamStatus.COMPLETED # ── 子任务并行阶段执行测试 ──────────────────────────────── @@ -227,7 +240,7 @@ class TestSubtaskParallelExecution: call_count += 1 if phase.id == "phase_1": raise RuntimeError("Simulated failure") - return await original_execute_phase(phase, p, pr) + return await original_execute(phase, p, pr) with patch.object( orchestrator, "_execute_phase", side_effect=mock_execute_phase @@ -421,6 +434,55 @@ class TestRetryAndFallback: assert result["status"] == "fallback" assert plan.status == PlanStatus.FALLBACK + @pytest.mark.asyncio + async def test_replan_before_fallback_on_failure(self): + """重试失败后尝试 replan,replan 成功则不 fallback""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team, max_replans=1) + plan = _make_serial_plan(num_phases=1) + + call_count = 0 + + async def mock_execute_phase(phase, p, pr): + nonlocal 
call_count + call_count += 1 + if call_count <= 2: + # First call + retry both fail + p.update_phase_status(phase.id, PhaseStatus.FAILED) + return None + # Replan attempt succeeds + p.update_phase_status(phase.id, PhaseStatus.COMPLETED, {"output": "replan ok"}) + return {"output": "replan ok"} + + with patch.object( + orchestrator, "_execute_phase", side_effect=mock_execute_phase + ): + result = await orchestrator.execute_plan(plan) + + # After retry fails → replan → succeeds, should complete + assert call_count == 3 # 1 initial + 1 retry + 1 replan + assert result["status"] == "completed" + + @pytest.mark.asyncio + async def test_replan_exhausted_then_fallback(self): + """replan 次数用尽后 fallback""" + team = _make_team_with_experts() + orchestrator = TeamOrchestrator(team, max_replans=2) + plan = _make_serial_plan(num_phases=1) + + async def mock_execute_phase(phase, p, pr): + # Always fail + p.update_phase_status(phase.id, PhaseStatus.FAILED) + return None + + with patch.object( + orchestrator, "_execute_phase", side_effect=mock_execute_phase + ): + result = await orchestrator.execute_plan(plan) + + # Exhausted retries + replans → fallback + assert result["status"] == "fallback" + # ── 最大交互轮次测试 ────────────────────────────────────── @@ -438,7 +500,7 @@ class TestMaxInteractionRounds: # Create a plan with many phases that would take many rounds plan = _make_serial_plan(num_phases=5) - result = await orchestrator.execute_plan(plan) + await orchestrator.execute_plan(plan) # Should stop after 1 round, not completing all phases # Only the first phase should complete (1 interaction round) @@ -629,8 +691,8 @@ class TestExpertUnavailable: """Expert 不可用测试""" @pytest.mark.asyncio - async def test_inactive_expert_causes_phase_failure(self): - """分配的 Expert 不活跃导致阶段失败""" + async def test_inactive_expert_falls_back_to_active(self): + """分配的 Expert 不活跃时自动降级到其他可用 Expert""" team = _make_team_with_experts() # Mark the lead expert as inactive team._experts["lead"].is_active = False @@ 
-639,12 +701,12 @@ class TestExpertUnavailable: result = await orchestrator.execute_plan(plan) - # Phase should fail because expert is not active → retry → still fail → fallback - assert result["status"] == "fallback" + # Phase should complete via fallback expert (member1) + assert result["status"] == "completed" @pytest.mark.asyncio - async def test_nonexistent_expert_causes_phase_failure(self): - """分配的 Expert 不存在导致阶段失败""" + async def test_nonexistent_expert_falls_back_to_lead(self): + """分配的 Expert 不存在时自动降级到 lead expert""" team = _make_team_with_experts() orchestrator = TeamOrchestrator(team) @@ -664,5 +726,20 @@ class TestExpertUnavailable: result = await orchestrator.execute_plan(plan) - # Expert doesn't exist → phase fails → retry → still fails → fallback + # Phase should complete via fallback to lead expert + assert result["status"] == "completed" + + @pytest.mark.asyncio + async def test_all_experts_unavailable_causes_failure(self): + """所有 Expert 都不可用时阶段失败""" + team = _make_team_with_experts() + # Mark all experts as inactive + for expert in team._experts.values(): + expert.is_active = False + orchestrator = TeamOrchestrator(team) + plan = _make_serial_plan(num_phases=1) + + result = await orchestrator.execute_plan(plan) + + # No expert available → phase fails → fallback assert result["status"] == "fallback"