fischer-agentkit/docs/plans/2026-06-06-011-feat-agentki...

538 lines
21 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

---
title: "feat: AgentKit Phase 5 — 智能进化与多Agent协作"
status: completed
created: 2026-06-06
plan_type: feat
depth: deep
origin: Phase 4 完成后成熟度评估 + L4/L5 级能力建设需求
branch: feat/agentkit-phase5-intelligence
---
# AgentKit Phase 5 — 智能进化与多Agent协作
## Summary
基于 Phase 4 企业级生产化升级(整体 L3 级Phase 5 聚焦三大核心能力跃迁:**RAG 自纠正闭环**L3→L4、**多 Agent 协作编排**L3→L4、**GEPA 遗传算法进化**L3→L5。同时完成国内 Provider 接入和 Contextual Retrieval 优化,以"GEO 系统 RAG 质量可度量、多 Skill 自动编排、Prompt 自主进化"为验收底线。
## Problem Frame
Phase 4 完成后AgentKit 达到 L3 级别(生产可用),但存在三个关键能力缺口:
### 三大能力缺口
1. **RAG 不可自纠L3 级)**
- 检索结果无质量评估,错误检索直接传递给 LLM 生成
- 缺少"检索→评估→改写→重检索"闭环
- EpisodicMemory ORM 集成未完成session_factory=None
- 无 Contextual Retrieval分块后上下文丢失
2. **多 Agent 无法协作L3 级)**
- HandoffManager 仅支持单向转交,无双向协作通信
- 缺少中央编排器协调多 Agent 并行/串行执行
- 无共享工作空间Agent 间只能通过 Handoff 传递 context
- GEO 8 个 Skill 缺少端到端 Pipeline 编排
3. **进化系统非遗传L3 级)**
- 当前进化是单个体逐任务优化,无种群/代际概念
- 缺少交叉算子Crossover无法发现跨模块组合
- StrategyTuner 仅支持 2 个参数,无多维策略空间
- 缺少多目标适应度(准确率+延迟+成本)
### 成熟度目标
| 模块 | Phase 4 后 | Phase 5 目标 |
|------|-----------|-------------|
| 进化系统 | 75% | 90% |
| 记忆/RAG | 85% | 95% |
| 核心引擎 | 90% | 95% |
| LLM Gateway | 85% | 95% |
| Server | 90% | 92% |
| 整体 | L3 | L4 |
## Scope Boundaries
**In Scope:**
- RAG 自纠正循环CRAG 模式)
- Contextual Retrieval上下文增强分块
- 多 Agent Orchestrator-Worker 编排
- 共享工作空间
- GEPA 遗传算法进化框架
- 国内 Provider文心/豆包/元宝)
- Ragas 评估管线
- GEO Pipeline 编排
**Out of Scope:**
- 前端 UI 开发GEO Dashboard 属于独立项目)
- 分布式追踪OpenTelemetryPhase 6
- 本地向量库ChromaDB/FAISSPhase 6
- 多跳推理检索Phase 6
- Agent 能力发现和动态路由Phase 6
## Implementation Units
### Phase A (P0) — RAG 质量闭环
---
#### U1: RAG 自纠正循环CRAG
**Goal:** 实现 Corrective RAG 模式,检索结果经评估后决定通过/改写/降级,形成自纠正闭环。
**Files:**
- Create: `src/agentkit/memory/rag_loop.py`
- Create: `src/agentkit/memory/relevance_scorer.py`
- Modify: `src/agentkit/memory/retriever.py`
- Create: `tests/unit/test_rag_loop.py`
- Create: `tests/unit/test_relevance_scorer.py`
**Approach:**
1. 实现 `RelevanceScorer`轻量级评估器对检索结果逐文档评分0-1基于查询-文档语义相似度 + 关键词重叠
2. 实现 `RAGSelfCorrectionLoop`:状态机驱动的检索-评估-纠正循环
- 状态RETRIEVE → EVALUATE → CORRECT/DEGRADE → GENERATE
- 评估RelevanceScorer 评分阈值判断correct/ambiguous/incorrect
- 纠正QueryTransformer 改写查询,重新检索(最多 max_retries 次)
- 降级:超过重试次数,返回降级结果(标记 low_confidence
3. 集成到 MemoryRetriever`enable_self_correction=True` 时,检索走 CRAG 循环
4. 熔断器max_retries=3防止无限循环
**Patterns to follow:**
- `src/agentkit/memory/query_transformer.py` — 策略模式LLM/Rule/NoOp
- `src/agentkit/llm/retry.py` — CircuitBreaker 熔断模式
- `src/agentkit/core/react.py` — 状态机驱动的循环
**Verification:**
- 单元测试RelevanceScorer 评分准确性、RAGSelfCorrectionLoop 状态转换、熔断器触发
- 集成测试:低质量检索触发自纠正、高质量检索直接通过、超限降级
---
#### U2: Contextual Retrieval上下文增强分块
**Goal:** 在嵌入前为每个文档块添加 LLM 生成的上下文前缀,解决分块后上下文丢失问题。
**Files:**
- Create: `src/agentkit/memory/contextual_retrieval.py`
- Modify: `src/agentkit/memory/http_rag.py`
- Create: `tests/unit/test_contextual_retrieval.py`
**Approach:**
1. 实现 `ContextualChunker`
- 输入:原始文档 + 分块列表
- 处理:对每个块,调用 LLM优先用轻量模型生成简洁上下文语句
- 输出:增强后的块(上下文前缀 + 原始内容)
- Prompt 模板:`"给定完整文档和文档中的一个特定块,请编写简短的上下文,帮助理解这个块在整体中的位置。仅输出上下文,不要解释。"`
2. 集成到 HttpRAGService
- `ingest()` 方法可选启用 contextual_chunking
- 使用 EmbeddingCache 缓存上下文生成结果
3. 成本优化:
- 文档级 Prompt Caching同一文档的多个块共享文档前缀
- 批处理batch_size=8
**Patterns to follow:**
- `src/agentkit/memory/embedder.py` — EmbeddingCache 缓存模式
- `src/agentkit/memory/query_transformer.py` — LLM 调用 + 模板模式
**Verification:**
- 单元测试:上下文生成正确性、缓存命中/失效、批处理逻辑
- 对比测试:有/无 Contextual Retrieval 的检索质量差异
---
#### U3: EpisodicMemory ORM 集成完成
**Goal:** 完成 EpisodicMemory 与 PostgreSQL 的完整 ORM 集成,替换当前的 session_factory=None 状态。
**Files:**
- Modify: `src/agentkit/memory/episodic.py`
- Modify: `src/agentkit/server/app.py`
- Create: `src/agentkit/memory/models.py`
- Modify: `tests/unit/test_episodic_memory.py`
- Modify: `tests/unit/test_episodic_vector_search.py`
**Approach:**
1. 定义 `EpisodeModel` ORM 模型SQLAlchemy
- 字段id, agent_id, task_type, content, embedding(vector), quality_score, created_at, metadata(JSON)
- pgvector 索引ivfflat 或 hnsw
2. 修改 EpisodicMemory
- 注入 session_factory 和 EpisodeModel
- `store()` → INSERT INTO episodes
- `retrieve()` → pgvector 原生搜索cosine distance
- 移除客户端 O(N) 全量扫描降级路径
3. 修改 Server 初始化:
- app.py 中创建真实的 session_factory 和 EpisodeModel
- 数据库表自动创建alembic 迁移)
**Patterns to follow:**
- `src/agentkit/evolution/models.py` — ORM 模型定义
- `src/agentkit/evolution/evolution_store.py` — SQLAlchemy session 使用模式
- `src/agentkit/server/app.py` — 服务初始化
**Verification:**
- 单元测试ORM CRUD、pgvector 搜索、时间衰减评分
- 集成测试Server 启动后 EpisodicMemory 可用
---
### Phase B (P1) — 多 Agent 协作
---
#### U4: 多 Agent Orchestrator
**Goal:** 实现中央编排器,支持 Orchestrator-Worker 模式的多 Agent 协作。
**Files:**
- Create: `src/agentkit/core/orchestrator.py`
- Create: `src/agentkit/core/shared_workspace.py`
- Modify: `src/agentkit/core/protocol.py`
- Create: `tests/unit/test_orchestrator.py`
- Create: `tests/unit/test_shared_workspace.py`
**Approach:**
1. 定义 `AgentRole` 枚举ORCHESTRATOR, WORKER, REVIEWER
2. 实现 `SharedWorkspace`
- 基于 Redis 的共享状态存储
- 操作write(key, value, agent_id), read(key), subscribe(key), lock(key)
- 支持版本控制和冲突检测
3. 实现 `Orchestrator`
- 任务分解LLM 驱动将复杂任务拆解为子任务
- Agent 分配:基于 Skill 能力匹配子任务到 Worker Agent
- 执行监控:跟踪子任务状态,处理超时/失败
- 结果聚合:汇总 Worker 结果,生成最终输出
4. 扩展 Protocol
- 新增 `CollaborationMessage`agent_id, target_agent_id, message_type(request/response/broadcast), payload
- 新增 `SubTask`task_id, parent_task_id, assigned_agent, status, result
**Patterns to follow:**
- `src/agentkit/core/base.py` — BaseAgent 生命周期模式
- `src/agentkit/core/agent_pool.py` — Agent 实例池管理
- `src/agentkit/core/dispatcher.py` — Redis Queue 任务分发
- `src/agentkit/skills/pipeline.py` — Pipeline 编排模式
**Verification:**
- 单元测试任务分解、Agent 分配、结果聚合、超时处理
- 集成测试2-3 个 Agent 协作完成复杂任务
---
#### U5: GEO Pipeline 编排
**Goal:** 实现 GEO 端到端工作流编排(检测→分析→优化→追踪),作为多 Agent 协作的实际应用。
**Files:**
- Create: `src/agentkit/skills/geo_pipeline.py`
- Create: `configs/pipelines/geo_full_pipeline.yaml`
- Modify: `src/agentkit/server/routes/tasks.py`
- Create: `tests/unit/test_geo_pipeline.py`
**Approach:**
1. 定义 GEO Pipeline YAML 配置:
```yaml
name: geo_full_pipeline
steps:
- name: detect
skill: citation_detector
input_mapping: {brand: $.input.brand, platforms: $.input.platforms}
- name: analyze_competitor
skill: competitor_analyzer
input_mapping: {brand: $.input.brand, detection_result: $.steps.detect.output}
depends_on: [detect]
- name: analyze_trend
skill: trend_agent
input_mapping: {brand: $.input.brand}
depends_on: [detect]
parallel_with: [analyze_competitor]
- name: optimize
skill: geo_optimizer
input_mapping: {brand: $.input.brand, analysis: $.steps.analyze_competitor.output}
depends_on: [analyze_competitor, analyze_trend]
- name: schema
skill: schema_advisor
input_mapping: {brand: $.input.brand, optimization: $.steps.optimize.output}
depends_on: [optimize]
- name: monitor
skill: monitor
input_mapping: {brand: $.input.brand}
depends_on: [optimize]
```
2. 实现 `GEOPipeline`
- 加载 YAML 配置,构建 DAG
- 拓扑排序确定执行顺序
- 并行执行无依赖的步骤
- 步骤间数据通过 SharedWorkspace 传递
3. 集成到 Server
- `POST /api/v1/pipelines/execute` 端点
- 支持 WebSocket 推送 Pipeline 进度
**Patterns to follow:**
- `src/agentkit/skills/pipeline.py` — SkillPipeline 编排
- `src/agentkit/core/config_driven.py` — 配置驱动模式
- `configs/skills/*.yaml` — YAML 配置格式
**Verification:**
- 单元测试DAG 构建、拓扑排序、并行执行、步骤间数据传递
- 集成测试:完整 GEO Pipeline 端到端执行
---
### Phase C (P1) — GEPA 遗传算法进化
---
#### U6: GEPA 种群与遗传算子
**Goal:** 实现 GEPAGenetic-Pareto Prompt Evolution核心框架包括种群管理、交叉/变异算子、Pareto 选择。
**Files:**
- Create: `src/agentkit/evolution/genetic.py`
- Modify: `src/agentkit/evolution/lifecycle.py`
- Create: `tests/unit/test_genetic_evolution.py`
**Approach:**
1. 定义核心数据结构:
- `PromptChromosome`:一个完整的 Prompt 变体identity + instructions + demos + constraints
- `GEPAPopulation`:种群管理(初始化、添加、淘汰、获取精英)
- `FitnessScore`多目标适应度accuracy, latency, cost
2. 实现遗传算子:
- `CrossoverOperator`:从两个父代 Prompt 生成子代
- 指令段交叉:交换 instructions 的子段落
- Demo 交叉:交换 few-shot 示例
- 约束交叉:交换约束条件
- `MutationOperator`:基于 LLM 反思的结构化变异
- 指令变异LLM 重写指令段落
- Demo 变异:替换/重排 few-shot 示例
- 约束变异:增删约束条件
- `SelectionStrategy`
- 锦标赛选择Tournament Selection
- 精英保留Elitism保留 top-k 最优个体
3. Pareto 前沿维护:
- 多目标非支配排序
- 拥挤度距离计算
- 保留 Pareto 前沿上的最优解
4. 集成到 EvolutionMixin
-`evolution_mode=gepa` 时,使用遗传进化替代逐任务优化
- 代际进化:每 N 个任务触发一代进化
**Patterns to follow:**
- `src/agentkit/evolution/prompt_optimizer.py` — Prompt 优化模式
- `src/agentkit/evolution/ab_tester.py` — A/B 测试和统计检验
- `src/agentkit/evolution/llm_reflector.py` — LLM 驱动反思
**Verification:**
- 单元测试CrossoverOperator 交叉正确性、MutationOperator 变异合理性、Pareto 前沿维护、锦标赛选择
- 集成测试3-5 代进化后 Prompt 质量提升
---
#### U7: 多目标适应度与策略空间扩展
**Goal:** 实现多目标适应度评估和扩展的策略空间,使进化系统能优化准确率+延迟+成本的综合表现。
**Files:**
- Create: `src/agentkit/evolution/fitness.py`
- Modify: `src/agentkit/evolution/strategy_tuner.py`
- Create: `tests/unit/test_fitness.py`
**Approach:**
1. 实现 `MultiObjectiveFitness`
- 维度accuracy0-1、latencyms越低越好、costtoken 数,越低越好)
- 归一化:各维度归一化到 [0, 1]
- 加权组合:可配置权重(默认 accuracy=0.6, latency=0.2, cost=0.2
- Pareto 支配判断a 支配 b ⟺ a 在所有维度 ≥ b 且至少一个维度 > b
2. 扩展 StrategyTuner
- 参数空间扩展temperature, max_iterations, tool_weights, top_k, retrieval_mode
- Bayesian 优化升级:从 1D 升级到多维 Bayesian Optimization使用高斯过程
- 约束支持:参数范围约束(如 temperature ∈ [0, 2]
3. 适应度数据收集:
- 从 TraceRecorder 提取任务执行指标
- 从 UsageTracker 提取 token 使用量
- 从 QualityGate 提取质量评分
**Patterns to follow:**
- `src/agentkit/evolution/strategy_tuner.py` — 当前 1D 优化模式
- `src/agentkit/core/trace.py` — 执行轨迹记录
- `src/agentkit/llm/providers/tracker.py` — Usage 追踪
**Verification:**
- 单元测试多目标归一化、Pareto 支配判断、Bayesian 优化收敛性
- 集成测试:多目标进化后综合表现提升
---
### Phase D (P2) — 生态扩展
---
#### U8: 国内 Provider 实现(文心/豆包/元宝)
**Goal:** 实现文心、豆包、元宝三个国内 LLM Provider扩展 AgentKit 的 AI 引擎覆盖。
**Files:**
- Create: `src/agentkit/llm/providers/wenxin.py`
- Create: `src/agentkit/llm/providers/doubao.py`
- Create: `src/agentkit/llm/providers/yuanbao.py`
- Modify: `src/agentkit/llm/providers/__init__.py`
- Modify: `src/agentkit/llm/config.py`
- Create: `tests/unit/test_wenxin_provider.py`
- Create: `tests/unit/test_doubao_provider.py`
- Create: `tests/unit/test_yuanbao_provider.py`
**Approach:**
1. **WenxinProvider**(百度文心):
- 鉴权AK/SK → access_token缓存 29 天)
- API`https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/{model}`
- 模型映射ernie-4.5-turbo-128k, ernie-5.0, ernie-x1.1
- 特有功能web_search 联网搜索
- 流式SSE
2. **DoubaoProvider**(字节豆包):
- 鉴权:火山引擎 IAMBearer token
- API`https://ark.cn-beijing.volces.com/api/v3/chat/completions`
- 模型映射doubao-pro-32k, doubao-lite
- 特有功能Function Calling
- 流式SSE
3. **YuanbaoProvider**(腾讯混元):
- 鉴权Bearer API Key
- API`https://api.hunyuan.cloud.tencent.com/v1/chat/completions`OpenAI 兼容)
- 模型映射hunyuan-turbos-latest, hunyuan-2.0
- 特有功能enable_enhancement 增强模式
- 流式SSE
4. 统一注册到 LLMGateway
- 配置格式:`wenxin/ernie-4.5-turbo-128k`, `doubao/doubao-pro-32k`, `yuanbao/hunyuan-turbos-latest`
- 环境变量WENXIN_AK/SK, DOUBAO_API_KEY, YUANBAO_API_KEY
**Patterns to follow:**
- `src/agentkit/llm/providers/openai.py` — OpenAICompatibleProvider 模式
- `src/agentkit/llm/providers/anthropic.py` — 原生 API Provider 模式
- `src/agentkit/llm/providers/gemini.py` — 原生 API Provider 模式
**Verification:**
- 单元测试:鉴权流程、请求格式、响应解析、流式处理、错误处理
- 集成测试:通过 Gateway 调用各 Providermock 模式)
---
#### U9: Ragas 评估管线
**Goal:** 集成 Ragas 评估框架,为 RAG 质量提供可度量的指标体系。
**Files:**
- Create: `src/agentkit/evaluation/__init__.py`
- Create: `src/agentkit/evaluation/ragas_evaluator.py`
- Create: `src/agentkit/evaluation/dataset_builder.py`
- Create: `tests/unit/test_ragas_evaluator.py`
**Approach:**
1. 实现 `RagasEvaluator`
- 指标Faithfulness, AnswerRelevancy, ContextPrecision, ContextRecall
- LLM Judge使用配置的 LLM 作为 Judge
- 评估流程:构建评估数据集 → 调用 Ragas evaluate → 返回指标 DataFrame
2. 实现 `EvalDatasetBuilder`
- 从 TraceRecorder 提取历史任务数据
- 转换为 Ragas 格式user_input, response, retrieved_contexts, reference
- 支持人工标注 reference 的导入
3. Server 集成:
- `POST /api/v1/evaluation/run`:触发评估
- `GET /api/v1/evaluation/results`:获取评估结果
4. 评估触发策略:
- 手动触发API 调用
- 定时触发:配置 cron 表达式
- 进化触发:每 N 代进化后自动评估
**Patterns to follow:**
- `src/agentkit/core/trace.py` — 执行轨迹数据
- `src/agentkit/memory/retriever.py` — 检索结果数据
- `src/agentkit/server/routes/evolution.py` — API 路由模式
**Verification:**
- 单元测试:数据集构建、评估流程、指标计算
- 集成测试:端到端评估(使用 mock LLM Judge
---
#### U10: Agent 状态锁优化与配置热加载完善
**Goal:** 完善 Phase 4 U12 的 Agent 状态锁和配置热加载,修复已知问题。
**Files:**
- Modify: `src/agentkit/core/base.py`
- Modify: `src/agentkit/server/app.py`
- Modify: `src/agentkit/server/config.py`
- Modify: `tests/unit/test_base_agent.py`
**Approach:**
1. 状态锁优化:
- 当前 asyncio.Lock 在高并发下可能死锁,改用 asyncio.Event + 超时
- 增加锁状态查询 API`GET /api/v1/agents/{id}/lock-status`
2. 配置热加载完善:
- 修复 `_on_config_change` 中 skill 配置变更不生效的问题
- 增加配置变更审计日志
- 增加配置回滚机制(保留最近 N 个配置版本)
3. 优雅滚动更新:
- 等待当前任务完成后再应用配置变更
- 新任务使用新配置,进行中的任务继续使用旧配置
**Patterns to follow:**
- `src/agentkit/core/base.py` — Agent 状态管理
- `src/agentkit/server/config.py` — 配置加载
**Verification:**
- 单元测试:锁超时、配置变更生效、配置回滚
- 集成测试:运行中任务不受配置变更影响
---
## Dependencies
```
U1 (CRAG) ─────────────────────────────────────┐
U2 (Contextual Retrieval) ──────────────────────┤
U3 (EpisodicMemory ORM) ───────────────────────┤
├──→ U9 (Ragas 评估)
U4 (Orchestrator) ──→ U5 (GEO Pipeline) ───────┤
U6 (GEPA 种群) ──→ U7 (多目标适应度) ───────────┤
U8 (国内 Provider) ────────────────────────────┤
U10 (状态锁优化) ──────────────────────────────┘
```
- U1, U2, U3 互相独立,可并行
- U4 是 U5 的前置依赖
- U6 是 U7 的前置依赖
- U9 依赖 U1需要 CRAG 的检索结果做评估)
- U8, U10 独立,可随时执行
## Test Strategy
### 新增测试文件
| Unit | 测试文件 | 预估用例数 |
|------|----------|-----------|
| U1 | test_rag_loop.py, test_relevance_scorer.py | 25 |
| U2 | test_contextual_retrieval.py | 15 |
| U3 | test_episodic_memory.py (更新), test_episodic_vector_search.py (更新) | 10 |
| U4 | test_orchestrator.py, test_shared_workspace.py | 25 |
| U5 | test_geo_pipeline.py | 15 |
| U6 | test_genetic_evolution.py | 20 |
| U7 | test_fitness.py | 15 |
| U8 | test_wenxin_provider.py, test_doubao_provider.py, test_yuanbao_provider.py | 30 |
| U9 | test_ragas_evaluator.py | 15 |
| U10 | test_base_agent.py (更新) | 10 |
### 验收标准
- 所有测试通过0 failed
- 总测试数 ≥ 1500当前 1353 + 新增 ~180
- 新增代码测试覆盖率 ≥ 85%
## Risk Assessment
| 风险 | 概率 | 影响 | 缓解措施 |
|------|------|------|---------|
| GEPA 进化效果不显著 | 中 | 中 | 保留 Phase 4 的逐任务优化作为 fallback |
| 多 Agent 编排死锁 | 中 | 高 | 超时机制 + 死锁检测 + 优雅降级 |
| 国内 Provider API 变更 | 低 | 低 | 抽象层隔离 + 配置化端点 |
| Ragas 评估成本过高 | 中 | 低 | 使用轻量模型做 Judge + 采样评估 |
| Contextual Retrieval 延迟 | 低 | 中 | Prompt Caching + 批处理 + 异步预处理 |