40 KiB
| title | type | status | date |
|---|---|---|---|
| refactor: GEO Agent Framework — 统一架构重构为独立项目 | refactor | active | 2026-06-04 |
Summary
将 GEO 项目的 8 个 Agent 从代码重复的独立类重构为独立 Python 包(fischer-agentkit),采用统一 Agent Core + 配置驱动 + 可插拔 Tool/Skill/Memory 架构。新增 MCP 协议支持、三层记忆系统(Working/Episodic/Semantic)、Level 3 自我进化闭环(经验积累 → Prompt 自动优化 → 策略调整)、多 Agent 协同增强(并行执行、Handoff、动态 Pipeline),并实现 Agent 与业务系统的完全解耦。
Problem Frame
当前 GEO 项目的 Agent 体系存在以下结构性问题:
- 代码重复:8 个 Agent 的
execute()方法包含完全相同的计时、try/except、TaskResult 构建逻辑(~250 行模板代码),每新增一个 Agent 需复制 ~150 行 - 架构耦合:Agent 直接依赖业务 Service(CitationService、MonitorService 等),无法独立部署和复用
- 能力缺失:无记忆系统(每次执行无状态)、无自我进化(Prompt 硬编码不可调优)、无技能插件(能力静态绑定)、无 MCP 支持
- 编排局限:Pipeline 仅支持串行 DAG、无 Agent 间 Handoff、无动态路由
- 配置僵化:AgentType 硬编码枚举、Prompt 与 Agent 紧耦合、新增 Agent 需改 5+ 文件
Requirements
Agent Core 统一架构
R1. Agent 框架必须作为独立 Python 包(fischer-agentkit),可独立安装、版本管理、跨项目复用
R2. 所有 Agent 共享统一的 BaseAgent 生命周期(start → listen → execute → reflect → evolve → stop),子类只需实现 handle_task() 返回业务数据
R3. Agent 定义支持三种模式:Python 声明式(@task 装饰器)、YAML 配置驱动(零代码)、混合模式
R4. AgentType 从硬编码枚举改为动态注册,新增 Agent 类型无需修改框架代码
R5. Agent 的 input/output 支持 JSON Schema 声明与校验
Tool/Skill 插件系统
R6. 实现 Tool 抽象基类,支持 FunctionTool(函数工具)、AgentTool(Agent 即工具)、MCPTool(MCP 协议工具)三种类型
R7. 实现 ToolRegistry,支持工具的注册、发现、版本管理、标签分类
R8. 实现 MCP Server,将现有 Agent 能力暴露为 MCP 工具供外部调用
R9. 实现 MCP Client,Agent 可调用外部 MCP 工具服务器
R10. 工具支持组合:顺序链、并行扇出/扇入、动态选择
记忆系统
R11. 实现 Working Memory(Redis-based),存储当前任务的上下文和中间状态,生命周期为单次任务 R12. 实现 Episodic Memory(向量+关系数据库),记录每次任务的输入/输出/效果/反思,支持语义检索相似历史案例 R13. 实现 Semantic Memory,复用现有 RAG 知识库,所有 Agent 均可通过统一接口检索知识 R14. 记忆检索采用混合策略:向量语义 + 关键词 + 知识图谱 + 时间衰减 + RRF 融合排序
自我进化(Level 3)
R15. 实现经验积累:每次任务执行后自动记录成功/失败模式、效果指标、反思总结 R16. 实现 Prompt 自动优化:基于 DSPy 风格的编译器,从任务结果中自动生成/优化 Prompt 指令和 few-shot 示例 R17. 实现策略调整:根据历史效果数据自动调整 Agent 参数(temperature、tool 选择权重、Pipeline 路径) R18. 进化变更必须经过 A/B 测试验证后才可生效,支持回滚
多 Agent 协同与业务编排
R19. Pipeline Engine 支持同层并行执行(无依赖的 stages 使用 asyncio.gather 并行)
R20. 实现 Handoff 机制:Agent 可在运行时将任务转交给另一个 Agent,携带上下文
R21. 支持动态 Pipeline:运行时根据条件选择子流程、嵌套 Pipeline
R22. Agent 间通信支持事件驱动(Redis Pub/Sub),替代轮询等待
Agent 与业务解耦
R23. Agent 框架不依赖任何 GEO 业务代码(Service、Model、Repository),通过 Tool 接口调用业务能力 R24. 业务系统通过 Tool 注册将自身能力暴露给 Agent,Agent 通过 ToolRegistry 发现和调用 R25. Agent 配置(Prompt、Tool 绑定、Memory 策略)存储在数据库,支持热更新
Key Technical Decisions
KTD1. 独立 Python 包架构:fischer-agentkit 作为独立包发布到私有 PyPI,GEO 项目通过 pip install 引入。包结构遵循 src/ layout,支持独立测试和版本管理。理由:解耦 Agent 基础设施与业务代码,支持跨项目复用。
KTD2. 统一 Agent Core 的 execute 模板上移:将计时、try/except、TaskResult 构建、进度上报等公共逻辑全部上移到 BaseAgent.execute(),子类只需实现 handle_task(task) -> dict。理由:消除 8 个 Agent 中 ~250 行重复代码。
KTD3. Tool 抽象三层架构:FunctionTool(进程内函数调用)→ MCPTool(跨进程 MCP 协议调用)→ AgentTool(将 Agent 包装为 Tool)。理由:覆盖从简单到复杂的所有工具场景,MCP 作为标准协议确保生态兼容性。
KTD4. MCP 双向支持:同时实现 MCP Server(暴露 Agent 能力)和 MCP Client(调用外部工具)。传输层优先支持 Streamable HTTP(2025 标准),兼容 SSE。理由:MCP 是 2025 年工具协议事实标准,双向支持最大化生态连接能力。
KTD5. 三层记忆架构:Working Memory(Redis,单任务生命周期)→ Episodic Memory(pgvector + PostgreSQL,任务经验)→ Semantic Memory(复用现有 RAG + 知识图谱)。理由:不同记忆类型有不同生命周期和检索模式,分层实现职责清晰。
KTD6. Level 3 进化采用 DSPy 风格编译器:定义 Signature(输入/输出 schema)→ Module(可组合 Prompt 策略)→ Optimizer(自动优化),从任务结果中自动构建 few-shot 示例和优化指令。理由:DSPy 是目前最成熟的 Prompt 自动优化范式,其编译器模式与 Agent 的 execute-reflect-evolve 生命周期天然契合。
KTD7. 配置驱动的 Agent 定义:Agent 的元数据、Prompt、Tool 绑定、Memory 策略均通过 YAML/数据库配置,运行时由 ConfigDrivenAgent 自动组装。理由:将新增 Agent 从写 150 行代码降为 10-20 行配置。
KTD8. Pipeline 并行执行:将拓扑排序后的 stages 按依赖层级分组,同层内使用 asyncio.gather 并行执行。理由:引用检测和趋势分析等无依赖任务应并行,当前串行执行浪费时间。
KTD9. Handoff 基于 Redis Pub/Sub:Agent 通过 agent:{name}:handoff 频道发送转交请求,目标 Agent 监听并接管。理由:与现有 Redis Queue 架构一致,无需引入新组件。
High-Level Technical Design
整体架构
flowchart TB
subgraph FischerAgentKit["fischer-agentkit (独立包)"]
direction TB
Core["Agent Core"]
Tools["Tool System"]
Memory["Memory System"]
Evolution["Evolution Engine"]
Orchestrator["Orchestrator"]
MCP["MCP Layer"]
subgraph Core
BaseAgent["BaseAgent<br/>生命周期管理"]
ConfigDriven["ConfigDrivenAgent<br/>配置驱动"]
Registry["AgentRegistry<br/>注册发现"]
Dispatcher["TaskDispatcher<br/>任务调度"]
Protocol["Protocol<br/>通信协议"]
end
subgraph Tools
ToolRegistry["ToolRegistry<br/>注册发现"]
FunctionTool["FunctionTool<br/>函数工具"]
MCPTool["MCPTool<br/>MCP工具"]
AgentTool["AgentTool<br/>Agent即工具"]
end
subgraph Memory
Working["Working Memory<br/>Redis"]
Episodic["Episodic Memory<br/>pgvector+PG"]
Semantic["Semantic Memory<br/>RAG+Graph"]
Retriever["MemoryRetriever<br/>混合检索"]
end
subgraph Evolution
Reflector["Reflector<br/>执行反思"]
PromptOptimizer["PromptOptimizer<br/>DSPy编译器"]
StrategyTuner["StrategyTuner<br/>策略调优"]
ABTester["ABTester<br/>A/B测试"]
end
subgraph Orchestrator
PipelineEngine["PipelineEngine<br/>DAG+并行"]
Handoff["Handoff<br/>任务转交"]
DynamicPipeline["DynamicPipeline<br/>运行时组合"]
end
subgraph MCP
MCPServer["MCP Server<br/>暴露能力"]
MCPClient["MCP Client<br/>调用外部"]
end
end
subgraph GEOBusiness["GEO 业务系统"]
Services["业务 Services"]
Models["数据 Models"]
Repos["Repositories"]
end
Core --> Tools
Core --> Memory
Core --> Evolution
Core --> Orchestrator
Tools --> MCP
Services -.->|"注册为 FunctionTool"| ToolRegistry
Semantic -.->|"复用"| Services
Agent 生命周期
stateDiagram-v2
[*] --> Offline
Offline --> Online: start()
Online --> Listening: listen_for_tasks()
Listening --> Executing: receive_task()
Executing --> Reflecting: handle_task() 完成
Reflecting --> Evolving: reflect() 有改进建议
Reflecting --> Listening: reflect() 无改进
Evolving --> Listening: evolve() 完成
Executing --> Listening: handle_task() 异常
Online --> Offline: stop()
Listening --> Offline: stop()
state Executing {
[*] --> LoadMemory: 加载记忆
LoadMemory --> PlanTask: 规划任务
PlanTask --> RunTools: 执行工具
RunTools --> StoreMemory: 存储记忆
StoreMemory --> BuildResult: 构建结果
BuildResult --> [*]
}
state Reflecting {
[*] --> EvaluateOutcome: 评估结果
EvaluateOutcome --> ExtractPatterns: 提取模式
ExtractPatterns --> GenerateInsights: 生成洞察
GenerateInsights --> [*]
}
state Evolving {
[*] --> OptimizePrompt: 优化Prompt
OptimizePrompt --> TuneStrategy: 调优策略
TuneStrategy --> ABTest: A/B测试
ABTest --> ApplyOrRollback: 应用或回滚
ApplyOrRollback --> [*]
}
Tool 系统架构
flowchart LR
subgraph Agent["Agent"]
Executor["Task Executor"]
end
subgraph ToolRegistry["ToolRegistry"]
FT["FunctionTool<br/>name, description<br/>input_schema, output_schema<br/>execute(**kwargs)"]
MT["MCPTool<br/>server_url, tool_name<br/>input_schema, output_schema<br/>execute(**kwargs)"]
AT["AgentTool<br/>agent_name<br/>input_mapping, output_mapping<br/>execute(**kwargs)"]
end
subgraph MCPServer["MCP Server"]
MCPEndpoints["/tools/list<br/>/tools/call<br/>/resources/read"]
end
subgraph ExternalMCP["External MCP Servers"]
ExtTool1["File System"]
ExtTool2["GitHub"]
ExtTool3["Postgres"]
end
Executor -->|"select & call"| ToolRegistry
FT -->|"进程内调用"| BusinessLogic["业务函数"]
MT -->|"HTTP/SSE"| MCPServer
MT -->|"HTTP/SSE"| ExternalMCP
AT -->|"dispatch"| TargetAgent["目标 Agent"]
MCPServer -->|"暴露"| AgentCapabilities["Agent 能力"]
记忆系统数据流
flowchart TB
Task["新任务到达"] --> WM["Working Memory<br/>加载当前任务上下文"]
WM --> EM["Episodic Memory<br/>检索相似历史案例"]
EM --> SM["Semantic Memory<br/>检索知识库"]
SM --> Context["组装上下文"]
Context --> Execute["执行任务"]
Execute --> WM_Write["写入 Working Memory<br/>中间状态"]
WM_Write --> Execute
Execute --> EM_Write["写入 Episodic Memory<br/>任务经验"]
Execute --> Reflect["反思评估"]
Reflect --> EM_Update["更新 Episodic Memory<br/>反思结果"]
subgraph 检索策略
VS["向量语义检索<br/>权重 0.5"]
KS["关键词精确匹配<br/>权重 0.2"]
GT["知识图谱关联<br/>权重 0.3"]
RRF["RRF 融合排序"]
TD["时间衰减"]
VS --> RRF
KS --> RRF
GT --> RRF
RRF --> TD
end
进化闭环
flowchart TB
Execute["任务执行"] --> Result["任务结果"]
Result --> Evaluate["效果评估<br/>成功/失败/质量分"]
Evaluate --> Pattern["模式提取<br/>成功模式/失败模式"]
Pattern --> Optimize["优化生成"]
Optimize --> PromptOpt["Prompt 优化<br/>DSPy 编译器"]
Optimize --> StrategyOpt["策略调优<br/>参数/工具权重"]
Optimize --> PipelineOpt["Pipeline 优化<br/>路径选择"]
PromptOpt --> ABTest["A/B 测试"]
StrategyOpt --> ABTest
PipelineOpt --> ABTest
ABTest -->|"效果提升"| Apply["应用变更"]
ABTest -->|"效果下降"| Rollback["回滚"]
ABTest -->|"不确定"| Extend["延长测试"]
Apply --> Record["记录进化日志"]
Rollback --> Record
Record --> Execute
Output Structure
fischer-agentkit/ # 独立 Python 包
├── src/
│ └── agentkit/
│ ├── __init__.py
│ ├── core/
│ │ ├── __init__.py
│ │ ├── base.py # BaseAgent 生命周期
│ │ ├── config_driven.py # ConfigDrivenAgent 配置驱动
│ │ ├── registry.py # AgentRegistry 注册发现
│ │ ├── dispatcher.py # TaskDispatcher 任务调度
│ │ ├── protocol.py # 通信协议与数据结构
│ │ ├── exceptions.py # 异常体系
│ │ └── standalone.py # 独立启动器(自动发现)
│ ├── tools/
│ │ ├── __init__.py
│ │ ├── base.py # Tool 抽象基类
│ │ ├── function_tool.py # FunctionTool
│ │ ├── agent_tool.py # AgentTool
│ │ ├── mcp_tool.py # MCPTool
│ │ └── registry.py # ToolRegistry
│ ├── memory/
│ │ ├── __init__.py
│ │ ├── base.py # Memory 抽象基类
│ │ ├── working.py # Working Memory (Redis)
│ │ ├── episodic.py # Episodic Memory (pgvector+PG)
│ │ ├── semantic.py # Semantic Memory (RAG+Graph)
│ │ └── retriever.py # 混合检索器
│ ├── evolution/
│ │ ├── __init__.py
│ │ ├── reflector.py # 执行反思
│ │ ├── prompt_optimizer.py # DSPy 风格 Prompt 优化器
│ │ ├── strategy_tuner.py # 策略调优
│ │ ├── ab_tester.py # A/B 测试框架
│ │ └── evolution_store.py # 进化日志存储
│ ├── orchestrator/
│ │ ├── __init__.py
│ │ ├── pipeline_engine.py # DAG + 并行 Pipeline
│ │ ├── pipeline_schema.py # Pipeline 数据模型
│ │ ├── pipeline_loader.py # YAML 加载器
│ │ ├── handoff.py # Handoff 机制
│ │ └── dynamic_pipeline.py # 动态 Pipeline 组合
│ ├── mcp/
│ │ ├── __init__.py
│ │ ├── server.py # MCP Server
│ │ ├── client.py # MCP Client
│ │ └── transport.py # 传输层 (HTTP/SSE)
│ └── prompts/
│ ├── __init__.py
│ ├── template.py # PromptTemplate
│ └── section.py # PromptSection
├── tests/
│ ├── unit/
│ │ ├── test_base_agent.py
│ │ ├── test_config_driven.py
│ │ ├── test_tool_registry.py
│ │ ├── test_function_tool.py
│ │ ├── test_mcp_tool.py
│ │ ├── test_agent_tool.py
│ │ ├── test_working_memory.py
│ │ ├── test_episodic_memory.py
│ │ ├── test_semantic_memory.py
│ │ ├── test_memory_retriever.py
│ │ ├── test_reflector.py
│ │ ├── test_prompt_optimizer.py
│ │ ├── test_strategy_tuner.py
│ │ ├── test_ab_tester.py
│ │ ├── test_pipeline_parallel.py
│ │ ├── test_handoff.py
│ │ ├── test_dynamic_pipeline.py
│ │ ├── test_mcp_server.py
│ │ └── test_mcp_client.py
│ └── integration/
│ ├── test_agent_lifecycle.py
│ ├── test_tool_composition.py
│ ├── test_evolution_loop.py
│ └── test_mcp_roundtrip.py
├── pyproject.toml
└── README.md
geo/backend/ # GEO 业务系统(改造后)
├── app/
│ ├── agent_framework/ # 保留为适配层
│ │ ├── __init__.py
│ │ ├── agents/ # 改为配置驱动
│ │ │ ├── __init__.py
│ │ │ ├── configs/ # Agent YAML 配置
│ │ │ │ ├── citation_detector.yaml
│ │ │ │ ├── content_generator.yaml
│ │ │ │ ├── deai_agent.yaml
│ │ │ │ ├── geo_optimizer.yaml
│ │ │ │ ├── monitor.yaml
│ │ │ │ ├── schema_advisor.yaml
│ │ │ │ ├── competitor_analyzer.yaml
│ │ │ │ └── trend_agent.yaml
│ │ │ └── custom_handlers/ # 仅复杂 Agent 需自定义 handler
│ │ │ ├── citation_handler.py
│ │ │ └── monitor_handler.py
│ │ ├── tools/ # 业务 Tool 注册
│ │ │ ├── __init__.py
│ │ │ ├── citation_tools.py
│ │ │ ├── content_tools.py
│ │ │ ├── knowledge_tools.py
│ │ │ ├── monitor_tools.py
│ │ │ └── schema_tools.py
│ │ └── prompts/ # Prompt 模板(保留,可热更新)
│ ├── models/
│ │ └── agent.py # 新增 evolution_logs, episodic_memories 表
│ └── ...
Implementation Units
U1. 独立包脚手架与 BaseAgent 重构
Goal: 创建 fischer-agentkit 独立包,重构 BaseAgent 将 execute 模板代码上移,子类只需实现 handle_task()。
Dependencies: 无
Files:
fischer-agentkit/src/agentkit/__init__.pyfischer-agentkit/src/agentkit/core/__init__.pyfischer-agentkit/src/agentkit/core/base.pyfischer-agentkit/src/agentkit/core/protocol.pyfischer-agentkit/src/agentkit/core/exceptions.pyfischer-agentkit/src/agentkit/core/registry.pyfischer-agentkit/src/agentkit/core/dispatcher.pyfischer-agentkit/pyproject.tomlfischer-agentkit/tests/unit/test_base_agent.py
Approach:
- 创建
fischer-agentkit包,使用src/layout,pyproject.toml声明依赖(redis[hiredis],pydantic>=2.0,sqlalchemy[asyncio]>=2.0) - 从 GEO 项目迁移
protocol.py(AgentCapability, TaskMessage, TaskResult, TaskProgress, TaskStatus, AgentStatus),移除 AgentType 硬编码枚举,改为动态注册 - 重构
BaseAgent:execute()方法变为 final(非抽象),包含完整的计时、try/except、TaskResult 构建、进度上报- 新增抽象方法
handle_task(task) -> dict,子类只需返回 output_data - 新增生命周期钩子:
on_task_start(),on_task_complete(),on_task_failed() - 新增
tools: list[Tool]属性(默认空,U3 实现) - 新增
memory: Memory属性(默认空,U4 实现)
- 迁移
registry.py,dispatcher.py,exceptions.py,去除 GEO 业务依赖 AgentRegistry.get_available_agent()增加负载均衡策略(轮询/最少任务)
Patterns to follow: 现有 backend/app/agent_framework/base.py 的双模式(Redis/本地)设计
Test scenarios:
- BaseAgent 子类实现 handle_task 返回 dict,execute 自动包装为 TaskResult
- handle_task 抛异常时 execute 自动构建 FAILED TaskResult
- on_task_start/complete/failed 钩子按序调用
- AgentRegistry 动态注册新 AgentType 不报错
- AgentRegistry.get_available_agent 轮询策略返回不同 Agent
Verification: fischer-agentkit 可独立 pip install -e .,单元测试全部通过
U2. Protocol 扩展与配置驱动 Agent
Goal: 扩展 Protocol 支持 JSON Schema 校验、Handoff 消息;实现 ConfigDrivenAgent,支持 YAML/数据库配置驱动 Agent 定义。
Dependencies: U1
Files:
fischer-agentkit/src/agentkit/core/protocol.py(扩展)fischer-agentkit/src/agentkit/core/config_driven.pyfischer-agentkit/src/agentkit/core/standalone.pyfischer-agentkit/src/agentkit/prompts/template.pyfischer-agentkit/src/agentkit/prompts/section.pyfischer-agentkit/tests/unit/test_config_driven.pyfischer-agentkit/tests/unit/test_protocol.py
Approach:
- Protocol 扩展:
AgentCapability增加input_schema: dict | None、output_schema: dict | None(JSON Schema)- 新增
HandoffMessage数据类(source_agent, target_agent, task_context, reason) - 新增
EvolutionEvent数据类(agent_name, change_type, before, after, metrics) TaskMessage增加conversation_id: str | None支持多轮对话
- ConfigDrivenAgent:
- 接受 YAML 配置或数据库配置,自动组装:Prompt 模板 + LLM 参数 + Tool 绑定 + Memory 策略
- 内置 LLM 调用 + JSON 输出解析 + 降级兜底的标准流程
- 支持三种任务模式:
llm_generate(Prompt → LLM → 解析)、tool_call(调用指定 Tool)、custom(自定义 handler)
- PromptTemplate 迁移并增强:支持动态 section 组合、版本标记
- Standalone runner 改为自动发现:扫描
agent_configs/目录下的 YAML 文件,自动注册和启动
Patterns to follow: 现有 prompts/base_template.py 的 PromptSection 设计
Test scenarios:
- ConfigDrivenAgent 从 YAML 加载配置并正确组装
- llm_generate 模式:渲染 Prompt → 调用 Mock LLM → 解析 JSON 输出
- tool_call 模式:调用注册的 FunctionTool 并返回结果
- input_schema 校验:缺少必填字段时返回校验错误
- HandoffMessage 序列化/反序列化正确
- 自动发现:在 configs/ 目录放置 YAML 后 standalone 可自动加载
Verification: 通过 YAML 配置定义一个新 Agent,无需写 Python 代码即可运行
U3. Tool/Skill 插件系统
Goal: 实现 Tool 抽象基类、FunctionTool、AgentTool、ToolRegistry,支持工具注册、发现、组合。
Dependencies: U1
Files:
fischer-agentkit/src/agentkit/tools/__init__.pyfischer-agentkit/src/agentkit/tools/base.pyfischer-agentkit/src/agentkit/tools/function_tool.pyfischer-agentkit/src/agentkit/tools/agent_tool.pyfischer-agentkit/src/agentkit/tools/registry.pyfischer-agentkit/tests/unit/test_tool_registry.pyfischer-agentkit/tests/unit/test_function_tool.pyfischer-agentkit/tests/unit/test_agent_tool.pyfischer-agentkit/tests/integration/test_tool_composition.py
Approach:
Tool抽象基类:- 属性:
name,description,input_schema(JSON Schema),output_schema(JSON Schema),version,tags - 抽象方法:
async execute(**kwargs) -> dict - 生命周期钩子:
before_execute(),after_execute(),on_error()
- 属性:
FunctionTool:包装普通 Python 函数为 Tool,自动从函数签名推断 input_schemaAgentTool:包装另一个 Agent 为 Tool,输入/输出通过 mapping 适配,内部通过 Dispatcher 分发任务ToolRegistry:register(tool),unregister(name),get(name),list_tools(tag=None)- 支持版本管理:同一工具多版本共存,默认使用最新版
- 支持标签分类:
[citation, analysis, generation, optimization]
- 工具组合:
SequentialChain([tool_a, tool_b]):顺序执行,前一个输出作为后一个输入ParallelFanOut([tool_a, tool_b, tool_c]):并行执行,结果合并DynamicSelector(llm, tools):LLM 根据任务动态选择工具
Patterns to follow: 现有 LLMFactory 的注册-创建模式
Test scenarios:
- FunctionTool 从函数自动推断 schema 并执行
- AgentTool 分发任务到目标 Agent 并返回结果
- ToolRegistry 注册/发现/按标签过滤
- SequentialChain 顺序执行两个工具
- ParallelFanOut 并行执行三个工具
- DynamicSelector 根据 LLM 判断选择合适工具
- 工具版本管理:注册 v1 和 v2,默认返回 v2
Verification: 通过 ToolRegistry 注册 3 个 FunctionTool,Agent 可声明式绑定并调用
U4. 记忆系统
Goal: 实现三层记忆系统(Working/Episodic/Semantic)和混合检索器。
Dependencies: U1
Files:
fischer-agentkit/src/agentkit/memory/__init__.pyfischer-agentkit/src/agentkit/memory/base.pyfischer-agentkit/src/agentkit/memory/working.pyfischer-agentkit/src/agentkit/memory/episodic.pyfischer-agentkit/src/agentkit/memory/semantic.pyfischer-agentkit/src/agentkit/memory/retriever.pyfischer-agentkit/tests/unit/test_working_memory.pyfischer-agentkit/tests/unit/test_episodic_memory.pyfischer-agentkit/tests/unit/test_semantic_memory.pyfischer-agentkit/tests/unit/test_memory_retriever.py
Approach:
Memory抽象基类:async store(key, value, metadata),async retrieve(query, top_k),async delete(key)async search(query, top_k, filters)— 语义检索
WorkingMemory(Redis):- 以
agent:{name}:working_memory:{task_id}为 key 前缀 - 支持自动过期(TTL = 任务超时时间 × 2)
- 提供
get_context()方法,返回格式化的上下文字符串
- 以
EpisodicMemory(pgvector + PostgreSQL):- 表结构:
id, agent_name, task_type, input_summary, output_summary, outcome(success/fail), quality_score, reflection, embedding, created_at - 写入时自动生成 embedding(复用 Embedder 接口)
- 检索:向量语义 + 关键词 + 时间衰减 + RRF 融合
- 表结构:
SemanticMemory:- 适配器模式,对接 GEO 项目的
RAGService和GraphQuery - 提供统一的
search(query, knowledge_base_ids, top_k)接口
- 适配器模式,对接 GEO 项目的
MemoryRetriever(混合检索器):- 并行查询三层记忆,按权重融合排序
- 时间衰减:
score *= exp(-0.01 * age_hours) - 上下文窗口管理:总 token 不超过预算
Patterns to follow: 现有 HybridRetriever 的 RRF 融合排序模式
Test scenarios:
- WorkingMemory 存取任务上下文,TTL 过期后自动清除
- EpisodicMemory 写入任务经验并按语义检索相似案例
- EpisodicMemory 时间衰减:近期经验权重高于远期
- SemanticMemory 通过适配器调用 RAGService
- MemoryRetriever 混合检索三层记忆并按权重融合
- 上下文窗口管理:检索结果超过 token 预算时智能截断
Verification: Agent 执行任务后自动写入 EpisodicMemory,后续相似任务可检索到历史经验
U5. MCP Server 与 Client
Goal: 实现 MCP Server(暴露 Agent 能力)和 MCP Client(调用外部 MCP 工具),完成 MCPTool。
Dependencies: U3
Files:
fischer-agentkit/src/agentkit/mcp/__init__.pyfischer-agentkit/src/agentkit/mcp/server.pyfischer-agentkit/src/agentkit/mcp/client.pyfischer-agentkit/src/agentkit/mcp/transport.pyfischer-agentkit/src/agentkit/tools/mcp_tool.pyfischer-agentkit/tests/unit/test_mcp_server.pyfischer-agentkit/tests/unit/test_mcp_client.pyfischer-agentkit/tests/unit/test_mcp_tool.pyfischer-agentkit/tests/integration/test_mcp_roundtrip.py
Approach:
- MCP Server:
- 基于 FastAPI 实现,支持 Streamable HTTP 传输
- 端点:
/tools/list(列出可用工具)、/tools/call(调用工具)、/resources/read(读取资源) - 自动将 ToolRegistry 中注册的工具暴露为 MCP 工具
- 支持 SSE 流式响应
- MCP Client:
- 连接外部 MCP Server(HTTP/SSE)
- 自动发现远程工具并注册到本地 ToolRegistry
- 支持工具调用的流式响应
- MCPTool:
- 继承 Tool 基类,内部通过 MCP Client 调用远程工具
- 自动从 MCP Server 获取 input_schema
- Transport 层:
- 抽象
Transport接口(send_request,receive_response) - 实现
HTTPTransport(Streamable HTTP)和SSETransport(Server-Sent Events)
- 抽象
Patterns to follow: MCP 官方 Python SDK 的 Server/Client 模式
Test scenarios:
- MCP Server 启动后
/tools/list返回已注册工具列表 - MCP Client 连接 Server 并调用工具返回正确结果
- MCPTool 通过 Client 调用远程工具
- SSE 流式响应正确传输
- MCP Server 自动暴露 ToolRegistry 中的 FunctionTool
- 外部 MCP Server 的工具自动注册到本地 ToolRegistry
Verification: 启动 MCP Server,通过 MCP Client 调用 Agent 能力,端到端成功
U6. 自我进化引擎(Level 3)
Goal: 实现执行反思、DSPy 风格 Prompt 自动优化、策略调优、A/B 测试框架。
Dependencies: U1, U4
Files:
fischer-agentkit/src/agentkit/evolution/__init__.pyfischer-agentkit/src/agentkit/evolution/reflector.pyfischer-agentkit/src/agentkit/evolution/prompt_optimizer.pyfischer-agentkit/src/agentkit/evolution/strategy_tuner.pyfischer-agentkit/src/agentkit/evolution/ab_tester.pyfischer-agentkit/src/agentkit/evolution/evolution_store.pyfischer-agentkit/tests/unit/test_reflector.pyfischer-agentkit/tests/unit/test_prompt_optimizer.pyfischer-agentkit/tests/unit/test_strategy_tuner.pyfischer-agentkit/tests/unit/test_ab_tester.pyfischer-agentkit/tests/integration/test_evolution_loop.py
Approach:
Reflector(执行反思):- 每次任务完成后自动评估:成功/失败、质量评分(基于 output_schema 约束和业务指标)
- 提取模式:常见失败原因、高效策略、低效路径
- 生成反思总结存入 EpisodicMemory
PromptOptimizer(DSPy 风格编译器):Signature:定义input_fields -> output_fields的结构化签名Module:可组合的 Prompt 策略(ChainOfThought, ReAct, FewShot)Optimizer:BootstrapFewShot:从成功案例中自动构建 few-shot 示例MIPROv2:多目标 Prompt 优化(指令 + few-shot 联合优化)- 基于历史任务数据编译,产出优化后的 Prompt
StrategyTuner(策略调优):- 可调参数:temperature, tool_selection_weights, pipeline_path
- 基于 Bayesian Optimization 搜索最优参数组合
- 每次调优记录 before/after 指标
ABTester(A/B 测试框架):- 支持配置分流比例(如 80% 原版 / 20% 实验版)
- 自动收集实验组和对照组的效果指标
- 统计显著性检验(t-test),达到置信度后自动决策
EvolutionStore(进化日志):- 表结构:
id, agent_name, change_type(prompt/strategy/pipeline), before, after, ab_test_id, status(active/rolled_back), created_at - 支持回滚:
rollback(evolution_id)
- 表结构:
Patterns to follow: DSPy 的 Signature/Module/Optimizer 三层架构
Test scenarios:
- Reflector 评估成功任务生成正面反思
- Reflector 评估失败任务提取失败模式
- PromptOptimizer 从 10 个成功案例自动生成 few-shot 示例
- PromptOptimizer 优化后的 Prompt 在测试集上效果提升
- StrategyTuner 调整 temperature 后效果改善
- ABTester 80/20 分流,实验组效果显著后自动应用
- EvolutionStore 记录变更并支持回滚
Verification: Agent 执行 20 次任务后,PromptOptimizer 自动优化 Prompt,ABTest 验证效果提升后自动应用
U7. 多 Agent 协同增强
Goal: Pipeline Engine 支持并行执行、Handoff 机制、动态 Pipeline 组合。
Dependencies: U1, U2
Files:
fischer-agentkit/src/agentkit/orchestrator/__init__.pyfischer-agentkit/src/agentkit/orchestrator/pipeline_engine.pyfischer-agentkit/src/agentkit/orchestrator/pipeline_schema.pyfischer-agentkit/src/agentkit/orchestrator/pipeline_loader.pyfischer-agentkit/src/agentkit/orchestrator/handoff.pyfischer-agentkit/src/agentkit/orchestrator/dynamic_pipeline.pyfischer-agentkit/tests/unit/test_pipeline_parallel.pyfischer-agentkit/tests/unit/test_handoff.pyfischer-agentkit/tests/unit/test_dynamic_pipeline.py
Approach:
- Pipeline 并行执行:
- 拓扑排序后按依赖层级分组(同层无依赖)
- 同层内使用
asyncio.gather并行执行 - 并行 stage 的结果合并后传给下一层
- Handoff 机制:
HandoffManager管理转交请求- Agent 调用
self.handoff(target_agent, context, reason)发起转交 - 通过 Redis Pub/Sub
agent:{target}:handoff频道通知目标 Agent - 目标 Agent 接收后创建新任务,携带源 Agent 的上下文
- 源 Agent 的任务状态变为
HANDOFF
- 动态 Pipeline:
- 支持
sub_pipelinestage 类型:引用另一个 YAML Pipeline - 支持
conditional_pipeline:根据运行时条件选择子流程 - 支持
loop_pipeline:循环执行直到条件满足
- 支持
- 事件驱动替代轮询:
- Pipeline Engine 通过 Redis Pub/Sub 订阅
agent:{name}:result频道 - 任务完成后自动触发下一 stage,无需轮询
- Pipeline Engine 通过 Redis Pub/Sub 订阅
Patterns to follow: 现有 PipelineEngine 的 DAG 拓扑排序和变量解析
Test scenarios:
- 3 个无依赖 stage 并行执行,总耗时约等于最长单个 stage
- Handoff:Agent A 转交任务给 Agent B,B 接收并执行
- 动态 Pipeline:根据条件选择不同的子流程
- 子 Pipeline 嵌套执行并正确传递变量
- 事件驱动:任务完成后自动触发下一 stage,无轮询间隔
- 循环 Pipeline:条件满足后退出循环
Verification: 内容生产 Pipeline 的 deai_processing 和 geo_optimization 并行执行,总耗时减少约 40%
U8. GEO 业务系统适配与迁移
Goal: 将 GEO 项目的 8 个 Agent 迁移到新框架,业务 Service 注册为 Tool,实现完全解耦。
Dependencies: U1, U2, U3, U4, U5, U6, U7
Files:
geo/backend/app/agent_framework/agents/configs/citation_detector.yamlgeo/backend/app/agent_framework/agents/configs/content_generator.yamlgeo/backend/app/agent_framework/agents/configs/deai_agent.yamlgeo/backend/app/agent_framework/agents/configs/geo_optimizer.yamlgeo/backend/app/agent_framework/agents/configs/monitor.yamlgeo/backend/app/agent_framework/agents/configs/schema_advisor.yamlgeo/backend/app/agent_framework/agents/configs/competitor_analyzer.yamlgeo/backend/app/agent_framework/agents/configs/trend_agent.yamlgeo/backend/app/agent_framework/agents/custom_handlers/citation_handler.pygeo/backend/app/agent_framework/agents/custom_handlers/monitor_handler.pygeo/backend/app/agent_framework/tools/__init__.pygeo/backend/app/agent_framework/tools/citation_tools.pygeo/backend/app/agent_framework/tools/content_tools.pygeo/backend/app/agent_framework/tools/knowledge_tools.pygeo/backend/app/agent_framework/tools/monitor_tools.pygeo/backend/app/agent_framework/tools/schema_tools.pygeo/backend/app/agent_framework/tools/competitor_tools.pygeo/backend/app/agent_framework/tools/trend_tools.pygeo/backend/app/models/agent.py(新增表)geo/backend/requirements.txt(添加 fischer-agentkit 依赖)geo/backend/app/agent_framework/__init__.py(适配层)
Approach:
- 业务 Tool 注册:
CitationTools:execute_single_platform,get_or_create_task,calculate_next_query_atContentTools:retrieve_knowledge(包装 RAGService)MonitorTools:check_and_compare,generate_change_report,create_monitoring_recordSchemaTools:identify_missing_dimensions,match_templates,validate_json_ldCompetitorTools:analyze_competitorTrendTools:analyze_trends,get_hotspots
- Agent 配置迁移:
- LLM 驱动型 Agent(ContentGenerator, DeAI, GEOOptimizer, SchemaAdvisor)→ YAML 配置 +
llm_generate模式,零代码 - Service 代理型 Agent(CitationDetector, Monitor)→ YAML 配置 +
customhandler,仅保留业务逻辑方法 - CompetitorAnalyzer, TrendAgent → YAML 配置 +
tool_call模式
- LLM 驱动型 Agent(ContentGenerator, DeAI, GEOOptimizer, SchemaAdvisor)→ YAML 配置 +
- 数据库迁移:
- 新增
episodic_memories表 - 新增
evolution_logs表 - 新增
ab_test_configs表
- 新增
- 适配层:
geo/backend/app/agent_framework/保留为适配层,import fromagentkit- 现有 API 端点(
/agents/,/agents/tasks/)保持不变 - 现有 Pipeline YAML 保持兼容
Patterns to follow: 现有 Agent 的业务逻辑实现,迁移时保持行为一致
Test scenarios:
- 8 个 Agent 迁移后行为与原版一致(回归测试)
- 新增 Agent 只需 YAML 配置,无需写 Python 代码
- 业务 Tool 注册后 Agent 可通过 ToolRegistry 发现和调用
- EpisodicMemory 自动记录任务经验
- EvolutionStore 记录进化变更
- 现有 API 端点功能不变
- 现有 Pipeline YAML 正常执行
Verification: 运行现有 Agent 集成测试全部通过,新增一个 YAML-only Agent 成功执行任务
Scope Boundaries
In Scope
fischer-agentkit独立包的完整实现(Core, Tools, Memory, Evolution, Orchestrator, MCP)- GEO 项目 8 个 Agent 的迁移和适配
- MCP Server/Client 双向支持
- Level 3 自我进化(反思 + Prompt 优化 + 策略调优 + A/B 测试)
- Pipeline 并行执行、Handoff、动态 Pipeline
- 三层记忆系统
- 数据库迁移(新增表)
Deferred for Later
- A2A Protocol 支持(跨组织 Agent 协作,待 MCP 稳定后再评估)
- Agent 可视化编排 UI(前端拖拽式 Pipeline 编辑器)
- Agent 市场/共享机制(跨组织共享 Agent 配置和 Tool)
- 多租户隔离增强(Agent 级别的资源隔离和配额)
- Agent 安全沙箱(Tool 执行的权限控制和审计)
Outside This Project's Identity
- 通用 LLM Gateway(不属于 Agent 框架范畴)
- 替代现有 LLMFactory(保持现有 LLM 调用体系不变)
- 前端 Agent 管理界面重构(本次聚焦后端架构)
Risks & Dependencies
| Risk | Impact | Mitigation |
|---|---|---|
| MCP 协议规范变动(2025-2026 仍在演进) | MCPTool/MCPClient 需要跟进修改 | 抽象 Transport 层,协议变更只影响 Transport 实现 |
| DSPy 风格 Prompt 优化可能需要大量训练数据 | 优化效果不明显 | BootstrapFewShot 从少量数据开始,MIPROv2 需 50+ 案例时才启用 |
| 独立包与 GEO 项目的版本同步 | GEO 升级 agentkit 版本可能引入 breaking change | 语义化版本管理,GEO 项目 pin 版本,CI 自动测试兼容性 |
| Episodic Memory 数据量增长 | pgvector 查询性能下降 | 设置 TTL 自动清理、定期归档、HNSW 索引优化 |
| 迁移期间 8 个 Agent 行为不一致 | 业务回归 | 逐个迁移 + 回归测试,保留旧代码直到新代码验证通过 |
| Pipeline 并行执行引入并发问题 | 结果不一致 | 同层 stage 只读共享变量,写入隔离 |
System-Wide Impact
- 数据库:新增 3 张表(episodic_memories, evolution_logs, ab_test_configs),需 Alembic 迁移
- Redis:新增 Working Memory key 空间和 Handoff 频道,内存使用增加约 20%
- API:现有
/agents/端点保持兼容,新增/agents/{name}/evolution查看进化历史 - 部署:
fischer-agentkit需发布到私有 PyPI,GEO 项目 Dockerfile 需添加安装步骤 - 监控:新增进化相关 Prometheus 指标(evolution_attempts_total, evolution_success_rate, ab_test_decisions)
- 依赖:新增
mcpPython 包依赖(MCP SDK)
Sources & Research
- 现有 Agent 框架代码:
backend/app/agent_framework/全部文件 - 现有 RAG 服务:
backend/app/services/knowledge/rag_service.py,retriever.py,enhanced_rag.py - 现有 LLM 工厂:
backend/app/services/llm/factory.py,base.py - MCP 官方规范:Model Context Protocol (Anthropic, 2024-2025)
- DSPy 框架:Stanford NLP, Prompt 自动优化范式
- Google A2A Protocol:Agent-to-Agent HTTP 协议 (2025.4)
- LangGraph 0.2.x:StateGraph + Checkpoint 架构
- OpenAI Agents SDK 1.0:Handoff 模式
- Google ADK 0.1:Agent 组合模式(Sequential/Parallel/Loop)