diff --git a/docs/plans/2026-06-07-012-feat-agentkit-phase6-toolkit-plan.md b/docs/plans/2026-06-07-012-feat-agentkit-phase6-toolkit-plan.md new file mode 100644 index 0000000..e3b201a --- /dev/null +++ b/docs/plans/2026-06-07-012-feat-agentkit-phase6-toolkit-plan.md @@ -0,0 +1,617 @@ +--- +title: "feat: AgentKit Phase 6 — 工具生态与生产化" +status: active +created: 2026-06-07 +plan_type: feat +depth: deep +origin: Phase 5 完成后行业对标评估 + GEO 系统本期需求 +branch: feat/agentkit-phase6-toolkit +--- + +# AgentKit Phase 6 — 工具生态与生产化 + +## Summary + +基于 Phase 5 智能化升级(L4 级),Phase 6 聚焦三大目标:**补齐 MCP stdio 传输层并集成开源工具生态**(L3→L5)、**生产化 GEO Pipeline**(L3→L4)、**基础可观测性**(L0→L3)。以"GEO Skill 端到端可执行、Pipeline 可靠运行、优化效果可度量"为验收底线,同时确保架构设计支持未来非 GEO 场景扩展。 + +## Problem Frame + +Phase 5 完成后,AgentKit 在智能化方向(记忆、进化、RAG)达到行业前列,但在工程化方向存在三个关键缺口: + +### 三大能力缺口 + +1. **工具生态极度匮乏(L3 级)** + - 仅 1 个内置工具(`retrieve_knowledge`),7 个 GEO Skill 是空壳 Prompt + - MCP 仅支持 HTTP/SSE 传输,无法对接 12000+ stdio MCP Server 生态 + - 无搜索、爬取、浏览器、Schema 等基础能力,GEO 业务无法端到端闭环 + - ConfigDrivenAgent 的 MCP 配置仅支持 `dict[str, str]`(name→URL),无法配置 stdio 传输 + +2. **Pipeline 不可靠(L3 级)** + - Pipeline 执行状态无持久化,服务重启后丢失 + - Dispatcher 轮询结果(1 秒间隔),非事件驱动 + - 步骤失败即中断,无重试/补偿机制 + - GEO 核心业务流程(检测→分析→优化→追踪)无法保证可靠执行 + +3. **不可观测(L0 级)** + - 无分布式追踪,无法定位跨 Agent 调用链瓶颈 + - 无业务指标(引用检测准确率、优化效果对比) + - 无法向客户证明 GEO 产品的价值 + +### 成熟度目标 + +| 模块 | Phase 5 后 | Phase 6 目标 | +|------|-----------|-------------| +| MCP/工具生态 | 40% | 85% | +| Pipeline 可靠性 | 60% | 85% | +| 可观测性 | 0% | 60% | +| 整体 | L4 | L4+ | + +## Scope Boundaries + +**In Scope:** +- MCP stdio 传输层实现 +- MCP Server YAML 声明式配置体系 +- 集成开源 MCP Server(百度搜索、Playwright、one-search) +- 内置 Python 工具封装(Crawl4AI、extruct、pydantic-schemaorg) +- Pipeline 执行状态持久化(Redis 热状态 + PG 冷持久化) +- Pipeline 步骤级重试 + 补偿机制 +- OpenTelemetry 基础 trace + metric 集成 +- GEO Skill 端到端工具绑定验证 + +**Out of Scope:** +- MCP Server 运行时动态注册(后续扩展) +- MCP resources/prompts 能力暴露 +- 完整分布式追踪上下文传播(需改 Agent 间协议) +- K8s 部署清单 +- 前端 Dashboard UI +- 性能压测 + +**Deferred to Follow-Up Work:** +- Agent 间协商/辩论/投票协议 +- MCP Server 健康检查与自动重启 +- 评估自动化流水线(定时评估、CI/CD 集成) +- 多模态支持(图片/文件输入) + +--- + +## Key Technical Decisions + +### KTD-1: MCP stdio 传输采用子进程管理模式 + +**决策**: StdioTransport 通过 `asyncio.create_subprocess_exec` 启动 MCP Server 子进程,通过 stdin/stdout 进行 JSON-RPC 消息收发。 + +**理由**: MCP 协议规范明确 stdio 为本地性、高性能、安全性好的传输方式。所有主流 MCP Server(baidu-search-mcp、@playwright/mcp 等)均支持 stdio 模式。子进程模式无需额外网络端口,资源隔离性好。 + +**替代方案**: 使用官方 `mcp` Python SDK 的 `stdio_client()` 上下文管理器。但该 SDK 引入重依赖(`httpx-sse`、`pydantic` 版本冲突风险),且 AgentKit 已有完整的 Transport 抽象层,自建 StdioTransport 更轻量可控。 + +### KTD-2: MCP Server 配置采用 YAML 声明式静态加载 + +**决策**: 在 `agentkit.yaml` 中新增 `mcp` 配置节,声明式定义 MCP Server,应用启动时加载。 + +**理由**: GEO 场景的工具集固定(搜索+爬取+浏览器+Schema),无需运行时动态变更。YAML 声明式配置简单可靠,与现有 `skills`、`llm` 配置风格一致。后续可扩展动态注册 API。 + +### KTD-3: Pipeline 状态采用 Redis 热状态 + PostgreSQL 冷持久化双写 + +**决策**: Pipeline 执行中的实时状态存 Redis(Hash + Sorted Set),完成后异步写入 PostgreSQL(JSONB)做持久化。 + +**理由**: Redis 提供亚毫秒级状态读写,适合运行中 Pipeline 的并发控制和实时监控。PostgreSQL 提供持久化、复杂查询和审计能力。两者互补,参考 Temporal 的 Event Sourcing 思想但简化实现。 + +### KTD-4: OpenTelemetry 集成采用基础 trace + metric 模式 + +**决策**: 为 Agent 执行、Tool 调用、LLM 调用、Pipeline 步骤创建 OTel span,记录耗时/状态/Token 用量。不实现跨 Agent 的 trace context 传播。 + +**理由**: 基础 trace + metric 已能满足 GEO 场景的监控需求(延迟分布、成功率、Token 消耗趋势)。完整分布式追踪需改 Agent 间调用协议(HandoffMessage 需携带 traceparent),侵入性高,留作后续。 + +### KTD-5: 工具集成采用 MCP Server + Python 库双轨模式 + +**决策**: 搜索和浏览器能力通过 MCP Server(子进程 stdio)集成;爬取和 Schema 能力通过 Python 库直接封装为 Tool。 + +**理由**: MCP Server 模式适合独立进程、有 npm 安装生态的工具(baidu-search-mcp、@playwright/mcp);Python 库模式适合轻量级、无独立进程需求的工具(Crawl4AI、extruct、pydantic-schemaorg)。双轨模式各取所长。 + +--- + +## High-Level Technical Design + +### MCP stdio 传输与工具集成架构 + +``` +agentkit.yaml +└── mcp: + └── servers: + ├── baidu-search: { transport: stdio, command: npx, args: [baidu-search-mcp] } + ├── playwright: { transport: stdio, command: npx, args: [@playwright/mcp] } + └── one-search: { transport: stdio, command: npx, args: [one-search-mcp] } + +AgentKit Server 启动 +├── 1. 加载 mcp 配置 +├── 2. MCPManager 初始化 +│ ├── 为每个 stdio server 创建 StdioTransport → 启动子进程 +│ ├── 为每个 http/sse server 创建 HTTPTransport/SSETransport +│ ├── 执行 initialize 握手 +│ └── 调用 tools/list 发现工具 → 注册到 ToolRegistry +├── 3. 内置 Python 工具注册 +│ ├── WebCrawlTool (Crawl4AI) +│ ├── SchemaExtractTool (extruct) +│ └── SchemaGenerateTool (pydantic-schemaorg) +└── 4. Skill 绑定工具 + ├── citation_detector → baidu_search + web_crawl + ├── competitor_analyzer → baidu_search + web_crawl + playwright + ├── geo_optimizer → schema_generate + └── monitor → baidu_search + hotnews +``` + +### Pipeline 状态持久化架构 + +``` +Pipeline 执行流程 +├── 1. 创建执行 → Redis Hash (pipeline:{id}) + Sorted Set (pipeline:index) +├── 2. 步骤开始 → 更新 Redis status=running, current_step +├── 3. 步骤完成 → 更新 Redis completed_steps, step_results +├── 4. 步骤失败 → 更新 Redis status=failed → 触发重试或补偿 +├── 5. 执行完成 → 异步写入 PostgreSQL pipeline_executions + pipeline_step_history +└── 6. Redis TTL 7 天自动清理 + +状态查询 +├── 实时状态(运行中)→ Redis +├── 历史查询/统计 → PostgreSQL +└── Redis miss → fallback PostgreSQL +``` + +### OpenTelemetry Span 层级 + +``` +[Root Span] POST /api/v1/tasks (2.3s) +├── [Span] agent.execute (2.2s) +│ ├── attributes: agent.name, agent.type +│ ├── [Span] gen_ai.chat qwen-max (1.8s) +│ │ ├── attributes: gen_ai.system, gen_ai.request.model, gen_ai.usage.input_tokens, gen_ai.usage.output_tokens +│ ├── [Span] tool.call baidu_search (0.12s) +│ │ ├── attributes: tool.name, tool.duration_ms +│ └── [Span] pipeline.step geo_optimizer (0.28s) +│ ├── attributes: pipeline.name, step.name, step.status +``` + +--- + +## Implementation Units + +### Phase A (P0) — MCP stdio 传输与工具生态 + +--- + +#### U1: StdioTransport 传输层 + +**Goal:** 实现 MCP stdio 传输层,通过子进程 stdin/stdout 进行 JSON-RPC 通信,为对接开源 MCP Server 生态奠定基础。 + +**Dependencies:** 无 + +**Files:** +- `src/agentkit/mcp/transport.py` — 新增 StdioTransport 类 +- `tests/unit/test_stdio_transport.py` — 传输层测试 + +**Approach:** + +1. 新增 `StdioTransport(Transport)` 类,核心状态: + - `_process: asyncio.subprocess.Process` — 子进程实例 + - `_request_id: int` — 自增请求 ID + - `_pending: dict[int, asyncio.Future]` — 等待中的请求 + - `_reader_task: asyncio.Task` — stdout 读取协程 + - `_connected: bool` — 连接标志 + +2. `connect()` — 通过 `asyncio.create_subprocess_exec(command, *args, env=env, stdin=PIPE, stdout=PIPE, stderr=PIPE)` 启动子进程,启动 `_read_stdout()` 协程,发送 `initialize` 请求完成握手 + +3. `disconnect()` — 发送 `notifications/cancelled`,关闭 stdin,等待子进程退出(超时后 kill),取消 reader task + +4. `send_request()` — 构造 JSON-RPC 消息,写入 stdin(`process.stdin.write(json_line + b"\n")`),创建 Future 放入 `_pending`,await Future + +5. `_read_stdout()` — 持续从 stdout 逐行读取 JSON-RPC 响应/通知,根据 `id` 匹配 `_pending` 中的 Future 并 set_result;无 `id` 的为通知,放入通知队列 + +6. 消息帧格式:每行一个 JSON 对象,UTF-8 编码,换行符分隔(遵循 MCP stdio 规范) + +7. stderr 日志转发到 Python logger + +**Patterns to follow:** 现有 `HTTPTransport` / `SSETransport` 的抽象方法实现模式 + +**Test scenarios:** +- 启动子进程并完成 initialize 握手 +- 发送 tools/list 请求并接收响应 +- 发送 tools/call 请求并接收响应 +- 子进程异常退出时检测并抛出 TransportError +- disconnect 时正确关闭子进程 +- 并发请求的 ID 匹配正确性 +- 子进程 stderr 输出转发到 logger +- 连接超时处理 + +**Verification:** StdioTransport 能与真实 MCP Server(如 baidu-search-mcp)完成完整的 initialize → tools/list → tools/call 流程 + +--- + +#### U2: MCP Server 配置体系 + +**Goal:** 在 agentkit.yaml 中新增 `mcp` 配置节,支持声明式定义 MCP Server(stdio/http/sse),应用启动时自动加载并注册工具。 + +**Dependencies:** U1 + +**Files:** +- `src/agentkit/server/config.py` — 新增 MCPServerConfig 数据模型和解析逻辑 +- `src/agentkit/mcp/manager.py` — 新增 MCPManager 类 +- `src/agentkit/server/app.py` — 集成 MCPManager 到应用启动流程 +- `tests/unit/test_mcp_config.py` — 配置解析测试 +- `tests/unit/test_mcp_manager.py` — Manager 生命周期测试 + +**Approach:** + +1. 新增 `MCPServerConfig` 数据模型: + ```python + @dataclass + class MCPServerConfig: + transport: str # "stdio" | "streamable_http" | "sse" + command: str | None = None # stdio 专用 + args: list[str] | None = None # stdio 专用 + env: dict[str, str] | None = None # stdio 专用 + url: str | None = None # http/sse 专用 + headers: dict[str, str] | None = None # http/sse 专用 + timeout: float = 30.0 + ``` + +2. YAML 配置格式: + ```yaml + mcp: + servers: + baidu-search: + transport: stdio + command: npx + args: ["-y", "baidu-search-mcp", "--max-result=5"] + playwright: + transport: stdio + command: npx + args: ["-y", "@playwright/mcp@latest"] + remote-rag: + transport: streamable_http + url: "http://localhost:8002/mcp" + ``` + +3. 新增 `MCPManager` 类: + - `__init__(configs: dict[str, MCPServerConfig])` — 接收配置 + - `async start_all()` — 为每个配置创建 Transport,连接,发现工具,注册到 ToolRegistry + - `async stop_all()` — 断开所有 Transport + - `get_tool(server_name, tool_name)` — 获取特定工具 + - `list_all_tools()` — 列出所有已注册工具 + - 健康检查:定期 ping 各 server,标记不可用 + +4. 集成到 `create_app()`:在 lifespan 中调用 `MCPManager.start_all()`,shutdown 时调用 `stop_all()` + +5. ConfigDrivenAgent 的 `_register_mcp_tools()` 改为从 MCPManager 获取已注册工具,而非自行创建 MCPClient + +**Patterns to follow:** 现有 `LLMGateway` 的 Provider 注册模式、`SkillRegistry` 的加载模式 + +**Test scenarios:** +- 解析 stdio 类型 MCP Server 配置 +- 解析 streamable_http 类型 MCP Server 配置 +- 解析 sse 类型 MCP Server 配置 +- 缺少必需字段时抛出验证错误 +- MCPManager 启动时为每个 server 创建 Transport +- MCPManager 停止时断开所有 Transport +- 工具发现并注册到 ToolRegistry +- 配置中环境变量 `${VAR:-default}` 解析 +- server 启动失败时不影响其他 server + +**Verification:** 在 agentkit.yaml 中配置 baidu-search-mcp,启动应用后能通过 API 调用百度搜索工具 + +--- + +#### U3: 内置 Python 工具封装 + +**Goal:** 将 Crawl4AI、extruct、pydantic-schemaorg 封装为 AgentKit Tool,提供网页抓取、Schema 提取和 Schema 生成能力。 + +**Dependencies:** 无(独立于 MCP,纯 Python 封装) + +**Files:** +- `src/agentkit/tools/web_crawl.py` — WebCrawlTool(Crawl4AI 封装) +- `src/agentkit/tools/schema_tools.py` — SchemaExtractTool + SchemaGenerateTool +- `tests/unit/test_web_crawl_tool.py` — 爬取工具测试 +- `tests/unit/test_schema_tools.py` — Schema 工具测试 + +**Approach:** + +1. **WebCrawlTool** — 封装 Crawl4AI: + - `execute(url, format="markdown", css_selector=None, js_wait=None)` → `{"content": ..., "status_code": ..., "links": [...]}` + - 内部使用 `AsyncWebCrawler`,支持 Markdown/HTML 输出 + - CSS 选择器提取结构化数据 + - 优雅降级:Crawl4AI 未安装时返回安装提示 + +2. **SchemaExtractTool** — 封装 extruct: + - `execute(url_or_html, formats=["json-ld", "microdata"])` → `{"schemas": [...]}` + - 从 HTML 中提取 JSON-LD / Microdata / RDFa 结构化数据 + - 支持 URL 自动抓取 + 直接 HTML 输入 + +3. **SchemaGenerateTool** — 封装 pydantic-schemaorg: + - `execute(schema_type, properties)` → `{"jsonld": "..."}` + - 生成指定类型(Organization、Product、Article 等)的 JSON-LD 标记 + - 支持常见 GEO Schema 类型:Organization、WebPage、FAQPage、HowTo + +4. 所有工具遵循 Tool 基类接口,自动推断 input_schema + +5. 可选依赖:Crawl4AI、extruct、pydantic-schemaorg 均为可选安装,`pip install agentkit[tools]` + +**Patterns to follow:** 现有 `FunctionTool` 的函数包装模式、`retrieve_knowledge` 工具的自动注册模式 + +**Test scenarios:** +- WebCrawlTool 抓取网页返回 Markdown 内容 +- WebCrawlTool CSS 选择器提取结构化数据 +- WebCrawlTool 无效 URL 返回错误 +- WebCrawlTool Crawl4AI 未安装时优雅降级 +- SchemaExtractTool 从 HTML 提取 JSON-LD +- SchemaExtractTool 从 URL 提取 Microdata +- SchemaExtractTool 无 Schema 数据时返回空列表 +- SchemaGenerateTool 生成 Organization JSON-LD +- SchemaGenerateTool 生成 FAQPage JSON-LD +- SchemaGenerateTool 无效 schema_type 时返回错误 + +**Verification:** WebCrawlTool 能抓取真实网页,SchemaExtractTool 能提取真实网页的结构化数据,SchemaGenerateTool 能生成有效的 JSON-LD + +--- + +#### U4: GEO Skill 工具绑定与端到端验证 + +**Goal:** 将搜索、爬取、浏览器、Schema 工具绑定到 7 个 GEO Skill,验证端到端可执行性。 + +**Dependencies:** U2, U3 + +**Files:** +- `configs/skills/citation_detector.yaml` — 绑定 baidu_search + web_crawl +- `configs/skills/competitor_analyzer.yaml` — 绑定 baidu_search + web_crawl + playwright +- `configs/skills/geo_optimizer.yaml` — 绑定 schema_generate +- `configs/skills/monitor.yaml` — 绑定 baidu_search +- `configs/skills/schema_advisor.yaml` — 绑定 schema_extract + schema_generate +- `configs/skills/trend_agent.yaml` — 绑定 baidu_search + web_crawl +- `configs/pipelines/geo_full_pipeline.yaml` — 更新 Pipeline 配置 +- `tests/integration/test_geo_e2e.py` — 端到端集成测试 + +**Approach:** + +1. 在每个 Skill YAML 中新增 `tools` 字段,声明所需工具: + ```yaml + tools: + - baidu_search # 来自 MCP Server + - web_crawl # 内置 Python 工具 + ``` + +2. ConfigDrivenAgent 加载 Skill 时,从 ToolRegistry 查找并绑定声明的工具 + +3. 更新 GEO Pipeline YAML,确保步骤间数据映射正确 + +4. 编写端到端集成测试:citation_detector 从搜索→爬取→分析完整流程 + +**Patterns to follow:** 现有 Skill YAML 配置格式、ConfigDrivenAgent 的工具注册模式 + +**Test scenarios:** +- citation_detector 绑定搜索+爬取工具后能执行完整检测流程 +- competitor_analyzer 绑定搜索+浏览器工具后能执行竞品分析 +- geo_optimizer 绑定 Schema 生成工具后能输出 JSON-LD +- schema_advisor 绑定提取+生成工具后能分析并建议 Schema +- GEO Pipeline 端到端执行:检测→分析→优化→追踪 +- 工具不可用时 Skill 优雅降级(返回错误信息而非崩溃) + +**Verification:** 完整 GEO Pipeline 能从品牌搜索→竞品分析→Schema 优化端到端执行 + +--- + +### Phase B (P1) — Pipeline 生产化 + +--- + +#### U5: Pipeline 状态持久化 + +**Goal:** 实现 Pipeline 执行状态的 Redis 热状态 + PostgreSQL 冷持久化双写,确保服务重启后状态不丢失。 + +**Dependencies:** 无 + +**Files:** +- `src/agentkit/orchestrator/pipeline_state.py` — PipelineStateRedis + PipelineStatePG +- `src/agentkit/orchestrator/pipeline_models.py` — PipelineExecution + PipelineStepHistory ORM +- `src/agentkit/orchestrator/pipeline_engine.py` — 修改执行引擎集成状态持久化 +- `tests/unit/test_pipeline_state.py` — 状态管理测试 + +**Approach:** + +1. **PipelineStateRedis** — Redis 热状态管理: + - `create_execution()` — 创建执行,写入 Hash(`pipeline:{id}`)+ Sorted Set(`pipeline:index`) + - `update_step()` — 更新步骤状态(原子操作) + - `complete_execution()` / `fail_execution()` — 标记执行完成/失败 + - `get_execution()` — 获取执行状态 + - `list_executions()` — 按时间倒序获取执行列表 + - TTL 7 天自动清理 + +2. **PipelineStatePG** — PostgreSQL 冷持久化: + - `PipelineExecution` 表:id, pipeline_name, status, current_step, completed_steps(JSONB), step_results(JSONB), input_data(JSONB), final_output(JSONB), error_message, created_at, updated_at + - `PipelineStepHistory` 表:id, execution_id, step_name, status, input_data(JSONB), output_data(JSONB), error_message, duration_ms, started_at, completed_at + - `persist_execution()` — 执行完成后异步写入 PG + - `query_executions()` — 支持按状态/时间/名称查询 + +3. **PipelineEngine 修改**: + - 执行前调用 `state.create_execution()` + - 步骤开始/完成/失败时调用 `state.update_step()` + - 执行完成后调用 `state.complete_execution()` + 异步 `pg.persist_execution()` + - 状态管理器通过构造函数注入,支持无状态模式(测试用) + +**Patterns to follow:** 现有 `TaskStore` 的 Redis/内存双模式设计、`EpisodeModel` 的 SQLAlchemy ORM 模式 + +**Test scenarios:** +- 创建 Pipeline 执行并写入 Redis +- 更新步骤状态(开始/完成/失败) +- 标记执行完成并持久化到 PG +- 标记执行失败并记录错误信息 +- 从 Redis 获取执行状态 +- 从 PG 查询历史执行 +- Redis miss 时 fallback 到 PG +- TTL 过期后 Redis 自动清理 +- 无 Redis 时降级到内存模式 + +**Verification:** Pipeline 执行后重启服务,能从 PG 恢复历史执行记录 + +--- + +#### U6: Pipeline 步骤级重试与补偿 + +**Goal:** 为 Pipeline 步骤实现指数退避重试和 Saga 补偿机制,确保步骤失败后可自动恢复或优雅回滚。 + +**Dependencies:** U5 + +**Files:** +- `src/agentkit/orchestrator/retry.py` — StepRetryPolicy + step_retry 装饰器 +- `src/agentkit/orchestrator/compensation.py` — SagaStep + SagaOrchestrator +- `src/agentkit/orchestrator/pipeline_engine.py` — 集成重试和补偿 +- `src/agentkit/skills/geo_pipeline.py` — GEO Pipeline 步骤补偿定义 +- `tests/unit/test_pipeline_retry.py` — 重试测试 +- `tests/unit/test_pipeline_compensation.py` — 补偿测试 + +**Approach:** + +1. **StepRetryPolicy** — 步骤级重试策略: + - `max_attempts: int = 3` — 最大重试次数 + - `base_delay: float = 1.0` — 基础延迟 + - `max_delay: float = 60.0` — 最大延迟 + - `exponential_base: float = 2.0` — 指数基数 + - `jitter: bool = True` — 随机抖动 + - `retryable_exceptions: tuple = (ConnectionError, TimeoutError)` — 可重试异常 + - 退避公式:`delay = min(base_delay * exponential_base^attempt + jitter, max_delay)` + +2. **PipelineStep 扩展** — 新增字段: + - `retry_policy: StepRetryPolicy | None` — 步骤级重试配置 + - `compensate: str | None` — 补偿 Skill 名称 + - `continue_on_failure: bool = False` — 失败后是否继续 + +3. **SagaOrchestrator** — 补偿编排器: + - 执行步骤成功 → 记录到 completed_steps 栈 + - 步骤失败且不可重试 → 按 LIFO 顺序执行已完成步骤的 compensate + - 补偿失败 → 记录并告警,不中断其他补偿 + - 补偿结果写入 PipelineState + +4. **GEO Pipeline 补偿定义**: + - `detect` → 无需补偿(只读) + - `analyze_competitor` → 无需补偿(只读) + - `optimize` → `compensate: revert_optimization`(回滚优化变更) + - `schema` → 无需补偿(Schema 生成是幂等的) + - `monitor` → 无需补偿(只读) + +**Patterns to follow:** 现有 `RetryPolicy`(LLM 重试)的指数退避模式、GEPA 的 FitnessScore Pareto 模式 + +**Test scenarios:** +- 步骤首次成功,不触发重试 +- 步骤首次失败、重试后成功 +- 步骤达到最大重试次数后标记失败 +- 指数退避延迟计算正确 +- 可重试异常触发重试,不可重试异常直接失败 +- 步骤失败触发 LIFO 补偿 +- 补偿步骤执行成功 +- 补偿步骤执行失败时记录告警但不中断 +- continue_on_failure 步骤失败后继续执行后续步骤 +- GEO Pipeline 步骤补偿定义正确 + +**Verification:** 模拟 optimize 步骤失败后,补偿步骤 revert_optimization 被正确触发 + +--- + +### Phase C (P2) — 可观测性 + +--- + +#### U7: OpenTelemetry 基础集成 + +**Goal:** 为 Agent 执行、Tool 调用、LLM 调用、Pipeline 步骤创建 OTel span 和 metric,遵循 GenAI Semantic Conventions。 + +**Dependencies:** 无 + +**Files:** +- `src/agentkit/telemetry/__init__.py` — 模块入口 +- `src/agentkit/telemetry/setup.py` — OTel 初始化(TracerProvider + MeterProvider + FastAPI 自动插桩) +- `src/agentkit/telemetry/tracing.py` — trace_agent / trace_tool / trace_llm / trace_pipeline_step 装饰器 +- `src/agentkit/telemetry/metrics.py` — Agent/Tool/LLM/Pipeline 指标定义 +- `src/agentkit/server/app.py` — 集成 OTel 初始化 +- `src/agentkit/core/react.py` — ReAct 引擎埋点 +- `src/agentkit/llm/gateway.py` — LLM Gateway 埋点 +- `src/agentkit/tools/base.py` — Tool 基类埋点 +- `tests/unit/test_telemetry.py` — 可观测性测试 + +**Approach:** + +1. **OTel 初始化** (`telemetry/setup.py`): + - `setup_telemetry(app, config)` — 配置 TracerProvider + MeterProvider + - 支持 OTLP gRPC/HTTP 导出器(可配置 endpoint) + - FastAPI 自动插桩(排除 health/metrics 端点) + - 可选依赖:`pip install agentkit[otel]` + - 未安装时所有 trace/metric 操作为 no-op + +2. **Tracing 装饰器** (`telemetry/tracing.py`): + - `trace_agent(agent_name)` — 创建 `agent.execute` span,记录 agent.name, agent.type, 成功/失败 + - `trace_tool(tool_name)` — 创建 `tool.call` span,记录 tool.name, tool.duration_ms + - `trace_llm(provider, model)` — 创建 `gen_ai.chat` span,遵循 GenAI Semantic Conventions:gen_ai.system, gen_ai.request.model, gen_ai.usage.input_tokens, gen_ai.usage.output_tokens + - `trace_pipeline_step(pipeline_name, step_name)` — 创建 `pipeline.step` span + +3. **Metrics** (`telemetry/metrics.py`): + - `agent.request.total` — Counter,Agent 请求总数 + - `agent.execution.duration` — Histogram,Agent 执行延迟 + - `gen_ai.usage.tokens` — Histogram,Token 消耗分布 + - `tool.call.duration` — Histogram,Tool 调用延迟 + - `pipeline.step.duration` — Histogram,Pipeline 步骤延迟 + - `pipeline.execution.duration` — Histogram,Pipeline 总延迟 + +4. **埋点位置**: + - `BaseAgent.execute()` — trace_agent + - `Tool.safe_execute()` — trace_tool + - `LLMGateway.chat()` / `chat_stream()` — trace_llm + - `PipelineEngine._execute_step()` — trace_pipeline_step + +5. **配置**: + ```yaml + telemetry: + enabled: true + service_name: "fischer-agentkit" + otlp_endpoint: "http://localhost:4317" # OTel Collector + export_metrics: true + export_traces: true + ``` + +**Patterns to follow:** GenAI Semantic Conventions (`gen_ai.*` 属性)、FastAPI 自动插桩模式 + +**Test scenarios:** +- OTel 未安装时 trace/metric 操作为 no-op,不影响正常执行 +- OTel 安装后 Agent 执行创建 span +- OTel 安装后 Tool 调用创建子 span +- OTel 安装后 LLM 调用记录 gen_ai.* 属性 +- OTel 安装后 Pipeline 步骤创建 span +- Agent 执行失败时 span 状态为 ERROR +- Token 用量正确记录到 span 属性 +- 指标计数器正确递增 +- 配置 enabled=false 时不创建 span +- FastAPI 请求自动创建 root span + +**Verification:** 启动应用后,Jaeger/Grafana Tempo 能看到完整的 Agent→Tool→LLM 调用链 + +--- + +## Risks & Dependencies + +| 风险 | 影响 | 缓解措施 | +|------|------|---------| +| MCP Server 子进程管理复杂 | 子进程僵尸/泄漏 | 严格的超时控制 + 进程健康检查 + 优雅关闭 | +| baidu-search-mcp 等 npm 包稳定性 | 搜索功能不可用 | one-search-mcp 作为备选 + 内置 DuckDuckGo 回退 | +| Crawl4AI 依赖 Playwright 浏览器 | 安装体积大、CI 环境复杂 | 可选安装 + HTTP 策略降级(无浏览器模式) | +| OTel 依赖链较长 | 增加安装复杂度 | 可选依赖 `agentkit[otel]`,未安装时 no-op | +| Pipeline PG 持久化需数据库迁移 | 部署复杂度增加 | 复用现有 PostgreSQL + Alembic 迁移 | +| MCP stdio 子进程在 Docker 中权限问题 | 容器化部署受阻 | Dockerfile 中预装 npx + Node.js | + +## Open Questions + +1. **MCP Server 子进程最大并发数**:多个 Agent 同时调用同一 MCP Server 时,是否需要连接池?MCP stdio 规范建议单连接,可能需要多实例。 +2. **Crawl4AI 的浏览器依赖**:生产环境是否需要无浏览器模式?Crawl4AI 的 HTTP 策略是否足够? +3. **OTel Collector 部署**:GEO 生产环境是否有 OTel Collector?如果没有,是否需要内置简单的内存导出器? + +## Success Criteria + +1. **工具生态**:MCP stdio 传输可用,至少 3 个开源 MCP Server 可集成,3 个内置 Python 工具可用 +2. **GEO 端到端**:citation_detector 能从搜索→爬取→分析完整执行,GEO Pipeline 端到端可运行 +3. **Pipeline 可靠**:步骤失败后自动重试(3 次),不可恢复时触发补偿,执行状态重启后可查 +4. **可观测**:Agent/Tool/LLM 调用链在 Jaeger 中可见,Token 用量和延迟指标可查 +5. **测试**:所有新增代码有单元测试,GEO Pipeline 有端到端集成测试