27 KiB
| title | status | created | plan_type | depth | origin | branch |
|---|---|---|---|---|---|---|
| feat: AgentKit Phase 6 — 工具生态与生产化 | active | 2026-06-07 | feat | deep | Phase 5 完成后行业对标评估 + GEO 系统本期需求 | feat/agentkit-phase6-toolkit |
AgentKit Phase 6 — 工具生态与生产化
Summary
基于 Phase 5 智能化升级(L4 级),Phase 6 聚焦三大目标:补齐 MCP stdio 传输层并集成开源工具生态(L3→L5)、生产化 GEO Pipeline(L3→L4)、基础可观测性(L0→L3)。以"GEO Skill 端到端可执行、Pipeline 可靠运行、优化效果可度量"为验收底线,同时确保架构设计支持未来非 GEO 场景扩展。
Problem Frame
Phase 5 完成后,AgentKit 在智能化方向(记忆、进化、RAG)达到行业前列,但在工程化方向存在三个关键缺口:
三大能力缺口
-
工具生态极度匮乏(L3 级)
- 仅 1 个内置工具(
retrieve_knowledge),7 个 GEO Skill 是空壳 Prompt - MCP 仅支持 HTTP/SSE 传输,无法对接 12000+ stdio MCP Server 生态
- 无搜索、爬取、浏览器、Schema 等基础能力,GEO 业务无法端到端闭环
- ConfigDrivenAgent 的 MCP 配置仅支持
dict[str, str](name→URL),无法配置 stdio 传输
- 仅 1 个内置工具(
-
Pipeline 不可靠(L3 级)
- Pipeline 执行状态无持久化,服务重启后丢失
- Dispatcher 轮询结果(1 秒间隔),非事件驱动
- 步骤失败即中断,无重试/补偿机制
- GEO 核心业务流程(检测→分析→优化→追踪)无法保证可靠执行
-
不可观测(L0 级)
- 无分布式追踪,无法定位跨 Agent 调用链瓶颈
- 无业务指标(引用检测准确率、优化效果对比)
- 无法向客户证明 GEO 产品的价值
成熟度目标
| 模块 | Phase 5 后 | Phase 6 目标 |
|---|---|---|
| MCP/工具生态 | 40% | 85% |
| Pipeline 可靠性 | 60% | 85% |
| 可观测性 | 0% | 60% |
| 整体 | L4 | L4+ |
Scope Boundaries
In Scope:
- MCP stdio 传输层实现
- MCP Server YAML 声明式配置体系
- 集成开源 MCP Server(百度搜索、Playwright、one-search)
- 内置 Python 工具封装(Crawl4AI、extruct、pydantic-schemaorg)
- Pipeline 执行状态持久化(Redis 热状态 + PG 冷持久化)
- Pipeline 步骤级重试 + 补偿机制
- OpenTelemetry 基础 trace + metric 集成
- GEO Skill 端到端工具绑定验证
Out of Scope:
- MCP Server 运行时动态注册(后续扩展)
- MCP resources/prompts 能力暴露
- 完整分布式追踪上下文传播(需改 Agent 间协议)
- K8s 部署清单
- 前端 Dashboard UI
- 性能压测
Deferred to Follow-Up Work:
- Agent 间协商/辩论/投票协议
- MCP Server 健康检查与自动重启
- 评估自动化流水线(定时评估、CI/CD 集成)
- 多模态支持(图片/文件输入)
Key Technical Decisions
KTD-1: MCP stdio 传输采用子进程管理模式
决策: StdioTransport 通过 asyncio.create_subprocess_exec 启动 MCP Server 子进程,通过 stdin/stdout 进行 JSON-RPC 消息收发。
理由: MCP 协议规范明确 stdio 为本地性、高性能、安全性好的传输方式。所有主流 MCP Server(baidu-search-mcp、@playwright/mcp 等)均支持 stdio 模式。子进程模式无需额外网络端口,资源隔离性好。
替代方案: 使用官方 mcp Python SDK 的 stdio_client() 上下文管理器。但该 SDK 引入重依赖(httpx-sse、pydantic 版本冲突风险),且 AgentKit 已有完整的 Transport 抽象层,自建 StdioTransport 更轻量可控。
KTD-2: MCP Server 配置采用 YAML 声明式静态加载
决策: 在 agentkit.yaml 中新增 mcp 配置节,声明式定义 MCP Server,应用启动时加载。
理由: GEO 场景的工具集固定(搜索+爬取+浏览器+Schema),无需运行时动态变更。YAML 声明式配置简单可靠,与现有 skills、llm 配置风格一致。后续可扩展动态注册 API。
KTD-3: Pipeline 状态采用 Redis 热状态 + PostgreSQL 冷持久化双写
决策: Pipeline 执行中的实时状态存 Redis(Hash + Sorted Set),完成后异步写入 PostgreSQL(JSONB)做持久化。
理由: Redis 提供亚毫秒级状态读写,适合运行中 Pipeline 的并发控制和实时监控。PostgreSQL 提供持久化、复杂查询和审计能力。两者互补,参考 Temporal 的 Event Sourcing 思想但简化实现。
KTD-4: OpenTelemetry 集成采用基础 trace + metric 模式
决策: 为 Agent 执行、Tool 调用、LLM 调用、Pipeline 步骤创建 OTel span,记录耗时/状态/Token 用量。不实现跨 Agent 的 trace context 传播。
理由: 基础 trace + metric 已能满足 GEO 场景的监控需求(延迟分布、成功率、Token 消耗趋势)。完整分布式追踪需改 Agent 间调用协议(HandoffMessage 需携带 traceparent),侵入性高,留作后续。
KTD-5: 工具集成采用 MCP Server + Python 库双轨模式
决策: 搜索和浏览器能力通过 MCP Server(子进程 stdio)集成;爬取和 Schema 能力通过 Python 库直接封装为 Tool。
理由: MCP Server 模式适合独立进程、有 npm 安装生态的工具(baidu-search-mcp、@playwright/mcp);Python 库模式适合轻量级、无独立进程需求的工具(Crawl4AI、extruct、pydantic-schemaorg)。双轨模式各取所长。
High-Level Technical Design
MCP stdio 传输与工具集成架构
agentkit.yaml
└── mcp:
└── servers:
├── baidu-search: { transport: stdio, command: npx, args: [baidu-search-mcp] }
├── playwright: { transport: stdio, command: npx, args: [@playwright/mcp] }
└── one-search: { transport: stdio, command: npx, args: [one-search-mcp] }
AgentKit Server 启动
├── 1. 加载 mcp 配置
├── 2. MCPManager 初始化
│ ├── 为每个 stdio server 创建 StdioTransport → 启动子进程
│ ├── 为每个 http/sse server 创建 HTTPTransport/SSETransport
│ ├── 执行 initialize 握手
│ └── 调用 tools/list 发现工具 → 注册到 ToolRegistry
├── 3. 内置 Python 工具注册
│ ├── WebCrawlTool (Crawl4AI)
│ ├── SchemaExtractTool (extruct)
│ └── SchemaGenerateTool (pydantic-schemaorg)
└── 4. Skill 绑定工具
├── citation_detector → baidu_search + web_crawl
├── competitor_analyzer → baidu_search + web_crawl + playwright
├── geo_optimizer → schema_generate
└── monitor → baidu_search + hotnews
Pipeline 状态持久化架构
Pipeline 执行流程
├── 1. 创建执行 → Redis Hash (pipeline:{id}) + Sorted Set (pipeline:index)
├── 2. 步骤开始 → 更新 Redis status=running, current_step
├── 3. 步骤完成 → 更新 Redis completed_steps, step_results
├── 4. 步骤失败 → 更新 Redis status=failed → 触发重试或补偿
├── 5. 执行完成 → 异步写入 PostgreSQL pipeline_executions + pipeline_step_history
└── 6. Redis TTL 7 天自动清理
状态查询
├── 实时状态(运行中)→ Redis
├── 历史查询/统计 → PostgreSQL
└── Redis miss → fallback PostgreSQL
OpenTelemetry Span 层级
[Root Span] POST /api/v1/tasks (2.3s)
├── [Span] agent.execute (2.2s)
│ ├── attributes: agent.name, agent.type
│ ├── [Span] gen_ai.chat qwen-max (1.8s)
│ │ ├── attributes: gen_ai.system, gen_ai.request.model, gen_ai.usage.input_tokens, gen_ai.usage.output_tokens
│ ├── [Span] tool.call baidu_search (0.12s)
│ │ ├── attributes: tool.name, tool.duration_ms
│ └── [Span] pipeline.step geo_optimizer (0.28s)
│ ├── attributes: pipeline.name, step.name, step.status
Implementation Units
Phase A (P0) — MCP stdio 传输与工具生态
U1: StdioTransport 传输层
Goal: 实现 MCP stdio 传输层,通过子进程 stdin/stdout 进行 JSON-RPC 通信,为对接开源 MCP Server 生态奠定基础。
Dependencies: 无
Files:
src/agentkit/mcp/transport.py— 新增 StdioTransport 类tests/unit/test_stdio_transport.py— 传输层测试
Approach:
-
新增
StdioTransport(Transport)类,核心状态:_process: asyncio.subprocess.Process— 子进程实例_request_id: int— 自增请求 ID_pending: dict[int, asyncio.Future]— 等待中的请求_reader_task: asyncio.Task— stdout 读取协程_connected: bool— 连接标志
-
connect()— 通过asyncio.create_subprocess_exec(command, *args, env=env, stdin=PIPE, stdout=PIPE, stderr=PIPE)启动子进程,启动_read_stdout()协程,发送initialize请求完成握手 -
disconnect()— 发送notifications/cancelled,关闭 stdin,等待子进程退出(超时后 kill),取消 reader task -
send_request()— 构造 JSON-RPC 消息,写入 stdin(process.stdin.write(json_line + b"\n")),创建 Future 放入_pending,await Future -
_read_stdout()— 持续从 stdout 逐行读取 JSON-RPC 响应/通知,根据id匹配_pending中的 Future 并 set_result;无id的为通知,放入通知队列 -
消息帧格式:每行一个 JSON 对象,UTF-8 编码,换行符分隔(遵循 MCP stdio 规范)
-
stderr 日志转发到 Python logger
Patterns to follow: 现有 HTTPTransport / SSETransport 的抽象方法实现模式
Test scenarios:
- 启动子进程并完成 initialize 握手
- 发送 tools/list 请求并接收响应
- 发送 tools/call 请求并接收响应
- 子进程异常退出时检测并抛出 TransportError
- disconnect 时正确关闭子进程
- 并发请求的 ID 匹配正确性
- 子进程 stderr 输出转发到 logger
- 连接超时处理
Verification: StdioTransport 能与真实 MCP Server(如 baidu-search-mcp)完成完整的 initialize → tools/list → tools/call 流程
U2: MCP Server 配置体系
Goal: 在 agentkit.yaml 中新增 mcp 配置节,支持声明式定义 MCP Server(stdio/http/sse),应用启动时自动加载并注册工具。
Dependencies: U1
Files:
src/agentkit/server/config.py— 新增 MCPServerConfig 数据模型和解析逻辑src/agentkit/mcp/manager.py— 新增 MCPManager 类src/agentkit/server/app.py— 集成 MCPManager 到应用启动流程tests/unit/test_mcp_config.py— 配置解析测试tests/unit/test_mcp_manager.py— Manager 生命周期测试
Approach:
-
新增
MCPServerConfig数据模型:@dataclass class MCPServerConfig: transport: str # "stdio" | "streamable_http" | "sse" command: str | None = None # stdio 专用 args: list[str] | None = None # stdio 专用 env: dict[str, str] | None = None # stdio 专用 url: str | None = None # http/sse 专用 headers: dict[str, str] | None = None # http/sse 专用 timeout: float = 30.0 -
YAML 配置格式:
mcp: servers: baidu-search: transport: stdio command: npx args: ["-y", "baidu-search-mcp", "--max-result=5"] playwright: transport: stdio command: npx args: ["-y", "@playwright/mcp@latest"] remote-rag: transport: streamable_http url: "http://localhost:8002/mcp" -
新增
MCPManager类:__init__(configs: dict[str, MCPServerConfig])— 接收配置async start_all()— 为每个配置创建 Transport,连接,发现工具,注册到 ToolRegistryasync stop_all()— 断开所有 Transportget_tool(server_name, tool_name)— 获取特定工具list_all_tools()— 列出所有已注册工具- 健康检查:定期 ping 各 server,标记不可用
-
集成到
create_app():在 lifespan 中调用MCPManager.start_all(),shutdown 时调用stop_all() -
ConfigDrivenAgent 的
_register_mcp_tools()改为从 MCPManager 获取已注册工具,而非自行创建 MCPClient
Patterns to follow: 现有 LLMGateway 的 Provider 注册模式、SkillRegistry 的加载模式
Test scenarios:
- 解析 stdio 类型 MCP Server 配置
- 解析 streamable_http 类型 MCP Server 配置
- 解析 sse 类型 MCP Server 配置
- 缺少必需字段时抛出验证错误
- MCPManager 启动时为每个 server 创建 Transport
- MCPManager 停止时断开所有 Transport
- 工具发现并注册到 ToolRegistry
- 配置中环境变量
${VAR:-default}解析 - server 启动失败时不影响其他 server
Verification: 在 agentkit.yaml 中配置 baidu-search-mcp,启动应用后能通过 API 调用百度搜索工具
U3: 内置 Python 工具封装
Goal: 将 Crawl4AI、extruct、pydantic-schemaorg 封装为 AgentKit Tool,提供网页抓取、Schema 提取和 Schema 生成能力。
Dependencies: 无(独立于 MCP,纯 Python 封装)
Files:
src/agentkit/tools/web_crawl.py— WebCrawlTool(Crawl4AI 封装)src/agentkit/tools/schema_tools.py— SchemaExtractTool + SchemaGenerateTooltests/unit/test_web_crawl_tool.py— 爬取工具测试tests/unit/test_schema_tools.py— Schema 工具测试
Approach:
-
WebCrawlTool — 封装 Crawl4AI:
execute(url, format="markdown", css_selector=None, js_wait=None)→{"content": ..., "status_code": ..., "links": [...]}- 内部使用
AsyncWebCrawler,支持 Markdown/HTML 输出 - CSS 选择器提取结构化数据
- 优雅降级:Crawl4AI 未安装时返回安装提示
-
SchemaExtractTool — 封装 extruct:
execute(url_or_html, formats=["json-ld", "microdata"])→{"schemas": [...]}- 从 HTML 中提取 JSON-LD / Microdata / RDFa 结构化数据
- 支持 URL 自动抓取 + 直接 HTML 输入
-
SchemaGenerateTool — 封装 pydantic-schemaorg:
execute(schema_type, properties)→{"jsonld": "..."}- 生成指定类型(Organization、Product、Article 等)的 JSON-LD 标记
- 支持常见 GEO Schema 类型:Organization、WebPage、FAQPage、HowTo
-
所有工具遵循 Tool 基类接口,自动推断 input_schema
-
可选依赖:Crawl4AI、extruct、pydantic-schemaorg 均为可选安装,
pip install agentkit[tools]
Patterns to follow: 现有 FunctionTool 的函数包装模式、retrieve_knowledge 工具的自动注册模式
Test scenarios:
- WebCrawlTool 抓取网页返回 Markdown 内容
- WebCrawlTool CSS 选择器提取结构化数据
- WebCrawlTool 无效 URL 返回错误
- WebCrawlTool Crawl4AI 未安装时优雅降级
- SchemaExtractTool 从 HTML 提取 JSON-LD
- SchemaExtractTool 从 URL 提取 Microdata
- SchemaExtractTool 无 Schema 数据时返回空列表
- SchemaGenerateTool 生成 Organization JSON-LD
- SchemaGenerateTool 生成 FAQPage JSON-LD
- SchemaGenerateTool 无效 schema_type 时返回错误
Verification: WebCrawlTool 能抓取真实网页,SchemaExtractTool 能提取真实网页的结构化数据,SchemaGenerateTool 能生成有效的 JSON-LD
U4: GEO Skill 工具绑定与端到端验证
Goal: 将搜索、爬取、浏览器、Schema 工具绑定到 7 个 GEO Skill,验证端到端可执行性。
Dependencies: U2, U3
Files:
configs/skills/citation_detector.yaml— 绑定 baidu_search + web_crawlconfigs/skills/competitor_analyzer.yaml— 绑定 baidu_search + web_crawl + playwrightconfigs/skills/geo_optimizer.yaml— 绑定 schema_generateconfigs/skills/monitor.yaml— 绑定 baidu_searchconfigs/skills/schema_advisor.yaml— 绑定 schema_extract + schema_generateconfigs/skills/trend_agent.yaml— 绑定 baidu_search + web_crawlconfigs/pipelines/geo_full_pipeline.yaml— 更新 Pipeline 配置tests/integration/test_geo_e2e.py— 端到端集成测试
Approach:
-
在每个 Skill YAML 中新增
tools字段,声明所需工具:tools: - baidu_search # 来自 MCP Server - web_crawl # 内置 Python 工具 -
ConfigDrivenAgent 加载 Skill 时,从 ToolRegistry 查找并绑定声明的工具
-
更新 GEO Pipeline YAML,确保步骤间数据映射正确
-
编写端到端集成测试:citation_detector 从搜索→爬取→分析完整流程
Patterns to follow: 现有 Skill YAML 配置格式、ConfigDrivenAgent 的工具注册模式
Test scenarios:
- citation_detector 绑定搜索+爬取工具后能执行完整检测流程
- competitor_analyzer 绑定搜索+浏览器工具后能执行竞品分析
- geo_optimizer 绑定 Schema 生成工具后能输出 JSON-LD
- schema_advisor 绑定提取+生成工具后能分析并建议 Schema
- GEO Pipeline 端到端执行:检测→分析→优化→追踪
- 工具不可用时 Skill 优雅降级(返回错误信息而非崩溃)
Verification: 完整 GEO Pipeline 能从品牌搜索→竞品分析→Schema 优化端到端执行
Phase B (P1) — Pipeline 生产化
U5: Pipeline 状态持久化
Goal: 实现 Pipeline 执行状态的 Redis 热状态 + PostgreSQL 冷持久化双写,确保服务重启后状态不丢失。
Dependencies: 无
Files:
src/agentkit/orchestrator/pipeline_state.py— PipelineStateRedis + PipelineStatePGsrc/agentkit/orchestrator/pipeline_models.py— PipelineExecution + PipelineStepHistory ORMsrc/agentkit/orchestrator/pipeline_engine.py— 修改执行引擎集成状态持久化tests/unit/test_pipeline_state.py— 状态管理测试
Approach:
-
PipelineStateRedis — Redis 热状态管理:
create_execution()— 创建执行,写入 Hash(pipeline:{id})+ Sorted Set(pipeline:index)update_step()— 更新步骤状态(原子操作)complete_execution()/fail_execution()— 标记执行完成/失败get_execution()— 获取执行状态list_executions()— 按时间倒序获取执行列表- TTL 7 天自动清理
-
PipelineStatePG — PostgreSQL 冷持久化:
PipelineExecution表:id, pipeline_name, status, current_step, completed_steps(JSONB), step_results(JSONB), input_data(JSONB), final_output(JSONB), error_message, created_at, updated_atPipelineStepHistory表:id, execution_id, step_name, status, input_data(JSONB), output_data(JSONB), error_message, duration_ms, started_at, completed_atpersist_execution()— 执行完成后异步写入 PGquery_executions()— 支持按状态/时间/名称查询
-
PipelineEngine 修改:
- 执行前调用
state.create_execution() - 步骤开始/完成/失败时调用
state.update_step() - 执行完成后调用
state.complete_execution()+ 异步pg.persist_execution() - 状态管理器通过构造函数注入,支持无状态模式(测试用)
- 执行前调用
Patterns to follow: 现有 TaskStore 的 Redis/内存双模式设计、EpisodeModel 的 SQLAlchemy ORM 模式
Test scenarios:
- 创建 Pipeline 执行并写入 Redis
- 更新步骤状态(开始/完成/失败)
- 标记执行完成并持久化到 PG
- 标记执行失败并记录错误信息
- 从 Redis 获取执行状态
- 从 PG 查询历史执行
- Redis miss 时 fallback 到 PG
- TTL 过期后 Redis 自动清理
- 无 Redis 时降级到内存模式
Verification: Pipeline 执行后重启服务,能从 PG 恢复历史执行记录
U6: Pipeline 步骤级重试与补偿
Goal: 为 Pipeline 步骤实现指数退避重试和 Saga 补偿机制,确保步骤失败后可自动恢复或优雅回滚。
Dependencies: U5
Files:
src/agentkit/orchestrator/retry.py— StepRetryPolicy + step_retry 装饰器src/agentkit/orchestrator/compensation.py— SagaStep + SagaOrchestratorsrc/agentkit/orchestrator/pipeline_engine.py— 集成重试和补偿src/agentkit/skills/geo_pipeline.py— GEO Pipeline 步骤补偿定义tests/unit/test_pipeline_retry.py— 重试测试tests/unit/test_pipeline_compensation.py— 补偿测试
Approach:
-
StepRetryPolicy — 步骤级重试策略:
max_attempts: int = 3— 最大重试次数base_delay: float = 1.0— 基础延迟max_delay: float = 60.0— 最大延迟exponential_base: float = 2.0— 指数基数jitter: bool = True— 随机抖动retryable_exceptions: tuple = (ConnectionError, TimeoutError)— 可重试异常- 退避公式:
delay = min(base_delay * exponential_base^attempt + jitter, max_delay)
-
PipelineStep 扩展 — 新增字段:
retry_policy: StepRetryPolicy | None— 步骤级重试配置compensate: str | None— 补偿 Skill 名称continue_on_failure: bool = False— 失败后是否继续
-
SagaOrchestrator — 补偿编排器:
- 执行步骤成功 → 记录到 completed_steps 栈
- 步骤失败且不可重试 → 按 LIFO 顺序执行已完成步骤的 compensate
- 补偿失败 → 记录并告警,不中断其他补偿
- 补偿结果写入 PipelineState
-
GEO Pipeline 补偿定义:
detect→ 无需补偿(只读)analyze_competitor→ 无需补偿(只读)optimize→compensate: revert_optimization(回滚优化变更)schema→ 无需补偿(Schema 生成是幂等的)monitor→ 无需补偿(只读)
Patterns to follow: 现有 RetryPolicy(LLM 重试)的指数退避模式、GEPA 的 FitnessScore Pareto 模式
Test scenarios:
- 步骤首次成功,不触发重试
- 步骤首次失败、重试后成功
- 步骤达到最大重试次数后标记失败
- 指数退避延迟计算正确
- 可重试异常触发重试,不可重试异常直接失败
- 步骤失败触发 LIFO 补偿
- 补偿步骤执行成功
- 补偿步骤执行失败时记录告警但不中断
- continue_on_failure 步骤失败后继续执行后续步骤
- GEO Pipeline 步骤补偿定义正确
Verification: 模拟 optimize 步骤失败后,补偿步骤 revert_optimization 被正确触发
Phase C (P2) — 可观测性
U7: OpenTelemetry 基础集成
Goal: 为 Agent 执行、Tool 调用、LLM 调用、Pipeline 步骤创建 OTel span 和 metric,遵循 GenAI Semantic Conventions。
Dependencies: 无
Files:
src/agentkit/telemetry/__init__.py— 模块入口src/agentkit/telemetry/setup.py— OTel 初始化(TracerProvider + MeterProvider + FastAPI 自动插桩)src/agentkit/telemetry/tracing.py— trace_agent / trace_tool / trace_llm / trace_pipeline_step 装饰器src/agentkit/telemetry/metrics.py— Agent/Tool/LLM/Pipeline 指标定义src/agentkit/server/app.py— 集成 OTel 初始化src/agentkit/core/react.py— ReAct 引擎埋点src/agentkit/llm/gateway.py— LLM Gateway 埋点src/agentkit/tools/base.py— Tool 基类埋点tests/unit/test_telemetry.py— 可观测性测试
Approach:
-
OTel 初始化 (
telemetry/setup.py):setup_telemetry(app, config)— 配置 TracerProvider + MeterProvider- 支持 OTLP gRPC/HTTP 导出器(可配置 endpoint)
- FastAPI 自动插桩(排除 health/metrics 端点)
- 可选依赖:
pip install agentkit[otel] - 未安装时所有 trace/metric 操作为 no-op
-
Tracing 装饰器 (
telemetry/tracing.py):trace_agent(agent_name)— 创建agent.executespan,记录 agent.name, agent.type, 成功/失败trace_tool(tool_name)— 创建tool.callspan,记录 tool.name, tool.duration_mstrace_llm(provider, model)— 创建gen_ai.chatspan,遵循 GenAI Semantic Conventions:gen_ai.system, gen_ai.request.model, gen_ai.usage.input_tokens, gen_ai.usage.output_tokenstrace_pipeline_step(pipeline_name, step_name)— 创建pipeline.stepspan
-
Metrics (
telemetry/metrics.py):agent.request.total— Counter,Agent 请求总数agent.execution.duration— Histogram,Agent 执行延迟gen_ai.usage.tokens— Histogram,Token 消耗分布tool.call.duration— Histogram,Tool 调用延迟pipeline.step.duration— Histogram,Pipeline 步骤延迟pipeline.execution.duration— Histogram,Pipeline 总延迟
-
埋点位置:
BaseAgent.execute()— trace_agentTool.safe_execute()— trace_toolLLMGateway.chat()/chat_stream()— trace_llmPipelineEngine._execute_step()— trace_pipeline_step
-
配置:
telemetry: enabled: true service_name: "fischer-agentkit" otlp_endpoint: "http://localhost:4317" # OTel Collector export_metrics: true export_traces: true
Patterns to follow: GenAI Semantic Conventions (gen_ai.* 属性)、FastAPI 自动插桩模式
Test scenarios:
- OTel 未安装时 trace/metric 操作为 no-op,不影响正常执行
- OTel 安装后 Agent 执行创建 span
- OTel 安装后 Tool 调用创建子 span
- OTel 安装后 LLM 调用记录 gen_ai.* 属性
- OTel 安装后 Pipeline 步骤创建 span
- Agent 执行失败时 span 状态为 ERROR
- Token 用量正确记录到 span 属性
- 指标计数器正确递增
- 配置 enabled=false 时不创建 span
- FastAPI 请求自动创建 root span
Verification: 启动应用后,Jaeger/Grafana Tempo 能看到完整的 Agent→Tool→LLM 调用链
Risks & Dependencies
| 风险 | 影响 | 缓解措施 |
|---|---|---|
| MCP Server 子进程管理复杂 | 子进程僵尸/泄漏 | 严格的超时控制 + 进程健康检查 + 优雅关闭 |
| baidu-search-mcp 等 npm 包稳定性 | 搜索功能不可用 | one-search-mcp 作为备选 + 内置 DuckDuckGo 回退 |
| Crawl4AI 依赖 Playwright 浏览器 | 安装体积大、CI 环境复杂 | 可选安装 + HTTP 策略降级(无浏览器模式) |
| OTel 依赖链较长 | 增加安装复杂度 | 可选依赖 agentkit[otel],未安装时 no-op |
| Pipeline PG 持久化需数据库迁移 | 部署复杂度增加 | 复用现有 PostgreSQL + Alembic 迁移 |
| MCP stdio 子进程在 Docker 中权限问题 | 容器化部署受阻 | Dockerfile 中预装 npx + Node.js |
Open Questions
- MCP Server 子进程最大并发数:多个 Agent 同时调用同一 MCP Server 时,是否需要连接池?MCP stdio 规范建议单连接,可能需要多实例。
- Crawl4AI 的浏览器依赖:生产环境是否需要无浏览器模式?Crawl4AI 的 HTTP 策略是否足够?
- OTel Collector 部署:GEO 生产环境是否有 OTel Collector?如果没有,是否需要内置简单的内存导出器?
Success Criteria
- 工具生态:MCP stdio 传输可用,至少 3 个开源 MCP Server 可集成,3 个内置 Python 工具可用
- GEO 端到端:citation_detector 能从搜索→爬取→分析完整执行,GEO Pipeline 端到端可运行
- Pipeline 可靠:步骤失败后自动重试(3 次),不可恢复时触发补偿,执行状态重启后可查
- 可观测:Agent/Tool/LLM 调用链在 Jaeger 中可见,Token 用量和延迟指标可查
- 测试:所有新增代码有单元测试,GEO Pipeline 有端到端集成测试