geo/docs/plans/2026-06-04-010-refactor-uni...

40 KiB
Raw Permalink Blame History

title type status date
refactor: GEO Agent Framework — 统一架构重构为独立项目 refactor active 2026-06-04

Summary

将 GEO 项目的 8 个 Agent 从代码重复的独立类重构为独立 Python 包(fischer-agentkit),采用统一 Agent Core + 配置驱动 + 可插拔 Tool/Skill/Memory 架构。新增 MCP 协议支持、三层记忆系统Working/Episodic/Semantic、Level 3 自我进化闭环(经验积累 → Prompt 自动优化 → 策略调整)、多 Agent 协同增强并行执行、Handoff、动态 Pipeline并实现 Agent 与业务系统的完全解耦。

Problem Frame

当前 GEO 项目的 Agent 体系存在以下结构性问题:

  1. 代码重复8 个 Agent 的 execute() 方法包含完全相同的计时、try/except、TaskResult 构建逻辑(~250 行模板代码),每新增一个 Agent 需复制 ~150 行
  2. 架构耦合Agent 直接依赖业务 ServiceCitationService、MonitorService 等),无法独立部署和复用
  3. 能力缺失无记忆系统每次执行无状态、无自我进化Prompt 硬编码不可调优)、无技能插件(能力静态绑定)、无 MCP 支持
  4. 编排局限Pipeline 仅支持串行 DAG、无 Agent 间 Handoff、无动态路由
  5. 配置僵化AgentType 硬编码枚举、Prompt 与 Agent 紧耦合、新增 Agent 需改 5+ 文件

Requirements

Agent Core 统一架构

R1. Agent 框架必须作为独立 Python 包(fischer-agentkit),可独立安装、版本管理、跨项目复用 R2. 所有 Agent 共享统一的 BaseAgent 生命周期start → listen → execute → reflect → evolve → stop子类只需实现 handle_task() 返回业务数据 R3. Agent 定义支持三种模式Python 声明式(@task 装饰器、YAML 配置驱动(零代码)、混合模式 R4. AgentType 从硬编码枚举改为动态注册,新增 Agent 类型无需修改框架代码 R5. Agent 的 input/output 支持 JSON Schema 声明与校验

Tool/Skill 插件系统

R6. 实现 Tool 抽象基类,支持 FunctionTool(函数工具)、AgentToolAgent 即工具)、MCPToolMCP 协议工具)三种类型 R7. 实现 ToolRegistry,支持工具的注册、发现、版本管理、标签分类 R8. 实现 MCP Server将现有 Agent 能力暴露为 MCP 工具供外部调用 R9. 实现 MCP ClientAgent 可调用外部 MCP 工具服务器 R10. 工具支持组合:顺序链、并行扇出/扇入、动态选择

记忆系统

R11. 实现 Working MemoryRedis-based存储当前任务的上下文和中间状态生命周期为单次任务 R12. 实现 Episodic Memory向量+关系数据库),记录每次任务的输入/输出/效果/反思,支持语义检索相似历史案例 R13. 实现 Semantic Memory复用现有 RAG 知识库,所有 Agent 均可通过统一接口检索知识 R14. 记忆检索采用混合策略:向量语义 + 关键词 + 知识图谱 + 时间衰减 + RRF 融合排序

自我进化Level 3

R15. 实现经验积累:每次任务执行后自动记录成功/失败模式、效果指标、反思总结 R16. 实现 Prompt 自动优化:基于 DSPy 风格的编译器,从任务结果中自动生成/优化 Prompt 指令和 few-shot 示例 R17. 实现策略调整:根据历史效果数据自动调整 Agent 参数temperature、tool 选择权重、Pipeline 路径) R18. 进化变更必须经过 A/B 测试验证后才可生效,支持回滚

多 Agent 协同与业务编排

R19. Pipeline Engine 支持同层并行执行(无依赖的 stages 使用 asyncio.gather 并行) R20. 实现 Handoff 机制Agent 可在运行时将任务转交给另一个 Agent携带上下文 R21. 支持动态 Pipeline运行时根据条件选择子流程、嵌套 Pipeline R22. Agent 间通信支持事件驱动Redis Pub/Sub替代轮询等待

Agent 与业务解耦

R23. Agent 框架不依赖任何 GEO 业务代码Service、Model、Repository通过 Tool 接口调用业务能力 R24. 业务系统通过 Tool 注册将自身能力暴露给 AgentAgent 通过 ToolRegistry 发现和调用 R25. Agent 配置Prompt、Tool 绑定、Memory 策略)存储在数据库,支持热更新


Key Technical Decisions

KTD1. 独立 Python 包架构fischer-agentkit 作为独立包发布到私有 PyPIGEO 项目通过 pip install 引入。包结构遵循 src/ layout支持独立测试和版本管理。理由解耦 Agent 基础设施与业务代码,支持跨项目复用。

KTD2. 统一 Agent Core 的 execute 模板上移将计时、try/except、TaskResult 构建、进度上报等公共逻辑全部上移到 BaseAgent.execute(),子类只需实现 handle_task(task) -> dict。理由:消除 8 个 Agent 中 ~250 行重复代码。

KTD3. Tool 抽象三层架构FunctionTool(进程内函数调用)→ MCPTool(跨进程 MCP 协议调用)→ AgentTool(将 Agent 包装为 Tool。理由覆盖从简单到复杂的所有工具场景MCP 作为标准协议确保生态兼容性。

KTD4. MCP 双向支持:同时实现 MCP Server暴露 Agent 能力)和 MCP Client调用外部工具。传输层优先支持 Streamable HTTP2025 标准),兼容 SSE。理由MCP 是 2025 年工具协议事实标准,双向支持最大化生态连接能力。

KTD5. 三层记忆架构Working MemoryRedis单任务生命周期→ Episodic Memorypgvector + PostgreSQL任务经验→ Semantic Memory复用现有 RAG + 知识图谱)。理由:不同记忆类型有不同生命周期和检索模式,分层实现职责清晰。

KTD6. Level 3 进化采用 DSPy 风格编译器:定义 Signature(输入/输出 schemaModule(可组合 Prompt 策略)→ Optimizer(自动优化),从任务结果中自动构建 few-shot 示例和优化指令。理由DSPy 是目前最成熟的 Prompt 自动优化范式,其编译器模式与 Agent 的 execute-reflect-evolve 生命周期天然契合。

KTD7. 配置驱动的 Agent 定义Agent 的元数据、Prompt、Tool 绑定、Memory 策略均通过 YAML/数据库配置,运行时由 ConfigDrivenAgent 自动组装。理由:将新增 Agent 从写 150 行代码降为 10-20 行配置。

KTD8. Pipeline 并行执行:将拓扑排序后的 stages 按依赖层级分组,同层内使用 asyncio.gather 并行执行。理由:引用检测和趋势分析等无依赖任务应并行,当前串行执行浪费时间。

KTD9. Handoff 基于 Redis Pub/SubAgent 通过 agent:{name}:handoff 频道发送转交请求,目标 Agent 监听并接管。理由:与现有 Redis Queue 架构一致,无需引入新组件。


High-Level Technical Design

整体架构

flowchart TB
    subgraph FischerAgentKit["fischer-agentkit (独立包)"]
        direction TB
        Core["Agent Core"]
        Tools["Tool System"]
        Memory["Memory System"]
        Evolution["Evolution Engine"]
        Orchestrator["Orchestrator"]
        MCP["MCP Layer"]

        subgraph Core
            BaseAgent["BaseAgent<br/>生命周期管理"]
            ConfigDriven["ConfigDrivenAgent<br/>配置驱动"]
            Registry["AgentRegistry<br/>注册发现"]
            Dispatcher["TaskDispatcher<br/>任务调度"]
            Protocol["Protocol<br/>通信协议"]
        end

        subgraph Tools
            ToolRegistry["ToolRegistry<br/>注册发现"]
            FunctionTool["FunctionTool<br/>函数工具"]
            MCPTool["MCPTool<br/>MCP工具"]
            AgentTool["AgentTool<br/>Agent即工具"]
        end

        subgraph Memory
            Working["Working Memory<br/>Redis"]
            Episodic["Episodic Memory<br/>pgvector+PG"]
            Semantic["Semantic Memory<br/>RAG+Graph"]
            Retriever["MemoryRetriever<br/>混合检索"]
        end

        subgraph Evolution
            Reflector["Reflector<br/>执行反思"]
            PromptOptimizer["PromptOptimizer<br/>DSPy编译器"]
            StrategyTuner["StrategyTuner<br/>策略调优"]
            ABTester["ABTester<br/>A/B测试"]
        end

        subgraph Orchestrator
            PipelineEngine["PipelineEngine<br/>DAG+并行"]
            Handoff["Handoff<br/>任务转交"]
            DynamicPipeline["DynamicPipeline<br/>运行时组合"]
        end

        subgraph MCP
            MCPServer["MCP Server<br/>暴露能力"]
            MCPClient["MCP Client<br/>调用外部"]
        end
    end

    subgraph GEOBusiness["GEO 业务系统"]
        Services["业务 Services"]
        Models["数据 Models"]
        Repos["Repositories"]
    end

    Core --> Tools
    Core --> Memory
    Core --> Evolution
    Core --> Orchestrator
    Tools --> MCP
    Services -.->|"注册为 FunctionTool"| ToolRegistry
    Semantic -.->|"复用"| Services

Agent 生命周期

stateDiagram-v2
    [*] --> Offline
    Offline --> Online: start()
    Online --> Listening: listen_for_tasks()
    Listening --> Executing: receive_task()
    Executing --> Reflecting: handle_task() 完成
    Reflecting --> Evolving: reflect() 有改进建议
    Reflecting --> Listening: reflect() 无改进
    Evolving --> Listening: evolve() 完成
    Executing --> Listening: handle_task() 异常
    Online --> Offline: stop()
    Listening --> Offline: stop()

    state Executing {
        [*] --> LoadMemory: 加载记忆
        LoadMemory --> PlanTask: 规划任务
        PlanTask --> RunTools: 执行工具
        RunTools --> StoreMemory: 存储记忆
        StoreMemory --> BuildResult: 构建结果
        BuildResult --> [*]
    }

    state Reflecting {
        [*] --> EvaluateOutcome: 评估结果
        EvaluateOutcome --> ExtractPatterns: 提取模式
        ExtractPatterns --> GenerateInsights: 生成洞察
        GenerateInsights --> [*]
    }

    state Evolving {
        [*] --> OptimizePrompt: 优化Prompt
        OptimizePrompt --> TuneStrategy: 调优策略
        TuneStrategy --> ABTest: A/B测试
        ABTest --> ApplyOrRollback: 应用或回滚
        ApplyOrRollback --> [*]
    }

Tool 系统架构

flowchart LR
    subgraph Agent["Agent"]
        Executor["Task Executor"]
    end

    subgraph ToolRegistry["ToolRegistry"]
        FT["FunctionTool<br/>name, description<br/>input_schema, output_schema<br/>execute(**kwargs)"]
        MT["MCPTool<br/>server_url, tool_name<br/>input_schema, output_schema<br/>execute(**kwargs)"]
        AT["AgentTool<br/>agent_name<br/>input_mapping, output_mapping<br/>execute(**kwargs)"]
    end

    subgraph MCPServer["MCP Server"]
        MCPEndpoints["/tools/list<br/>/tools/call<br/>/resources/read"]
    end

    subgraph ExternalMCP["External MCP Servers"]
        ExtTool1["File System"]
        ExtTool2["GitHub"]
        ExtTool3["Postgres"]
    end

    Executor -->|"select & call"| ToolRegistry
    FT -->|"进程内调用"| BusinessLogic["业务函数"]
    MT -->|"HTTP/SSE"| MCPServer
    MT -->|"HTTP/SSE"| ExternalMCP
    AT -->|"dispatch"| TargetAgent["目标 Agent"]
    MCPServer -->|"暴露"| AgentCapabilities["Agent 能力"]

记忆系统数据流

flowchart TB
    Task["新任务到达"] --> WM["Working Memory<br/>加载当前任务上下文"]
    WM --> EM["Episodic Memory<br/>检索相似历史案例"]
    EM --> SM["Semantic Memory<br/>检索知识库"]
    SM --> Context["组装上下文"]
    Context --> Execute["执行任务"]
    Execute --> WM_Write["写入 Working Memory<br/>中间状态"]
    WM_Write --> Execute
    Execute --> EM_Write["写入 Episodic Memory<br/>任务经验"]
    Execute --> Reflect["反思评估"]
    Reflect --> EM_Update["更新 Episodic Memory<br/>反思结果"]

    subgraph 检索策略
        VS["向量语义检索<br/>权重 0.5"]
        KS["关键词精确匹配<br/>权重 0.2"]
        GT["知识图谱关联<br/>权重 0.3"]
        RRF["RRF 融合排序"]
        TD["时间衰减"]

        VS --> RRF
        KS --> RRF
        GT --> RRF
        RRF --> TD
    end

进化闭环

flowchart TB
    Execute["任务执行"] --> Result["任务结果"]
    Result --> Evaluate["效果评估<br/>成功/失败/质量分"]
    Evaluate --> Pattern["模式提取<br/>成功模式/失败模式"]
    Pattern --> Optimize["优化生成"]

    Optimize --> PromptOpt["Prompt 优化<br/>DSPy 编译器"]
    Optimize --> StrategyOpt["策略调优<br/>参数/工具权重"]
    Optimize --> PipelineOpt["Pipeline 优化<br/>路径选择"]

    PromptOpt --> ABTest["A/B 测试"]
    StrategyOpt --> ABTest
    PipelineOpt --> ABTest

    ABTest -->|"效果提升"| Apply["应用变更"]
    ABTest -->|"效果下降"| Rollback["回滚"]
    ABTest -->|"不确定"| Extend["延长测试"]

    Apply --> Record["记录进化日志"]
    Rollback --> Record
    Record --> Execute

Output Structure

fischer-agentkit/                          # 独立 Python 包
├── src/
│   └── agentkit/
│       ├── __init__.py
│       ├── core/
│       │   ├── __init__.py
│       │   ├── base.py                    # BaseAgent 生命周期
│       │   ├── config_driven.py           # ConfigDrivenAgent 配置驱动
│       │   ├── registry.py               # AgentRegistry 注册发现
│       │   ├── dispatcher.py             # TaskDispatcher 任务调度
│       │   ├── protocol.py               # 通信协议与数据结构
│       │   ├── exceptions.py             # 异常体系
│       │   └── standalone.py             # 独立启动器(自动发现)
│       ├── tools/
│       │   ├── __init__.py
│       │   ├── base.py                   # Tool 抽象基类
│       │   ├── function_tool.py          # FunctionTool
│       │   ├── agent_tool.py             # AgentTool
│       │   ├── mcp_tool.py              # MCPTool
│       │   └── registry.py              # ToolRegistry
│       ├── memory/
│       │   ├── __init__.py
│       │   ├── base.py                   # Memory 抽象基类
│       │   ├── working.py               # Working Memory (Redis)
│       │   ├── episodic.py              # Episodic Memory (pgvector+PG)
│       │   ├── semantic.py              # Semantic Memory (RAG+Graph)
│       │   └── retriever.py             # 混合检索器
│       ├── evolution/
│       │   ├── __init__.py
│       │   ├── reflector.py             # 执行反思
│       │   ├── prompt_optimizer.py       # DSPy 风格 Prompt 优化器
│       │   ├── strategy_tuner.py         # 策略调优
│       │   ├── ab_tester.py             # A/B 测试框架
│       │   └── evolution_store.py        # 进化日志存储
│       ├── orchestrator/
│       │   ├── __init__.py
│       │   ├── pipeline_engine.py        # DAG + 并行 Pipeline
│       │   ├── pipeline_schema.py        # Pipeline 数据模型
│       │   ├── pipeline_loader.py        # YAML 加载器
│       │   ├── handoff.py               # Handoff 机制
│       │   └── dynamic_pipeline.py       # 动态 Pipeline 组合
│       ├── mcp/
│       │   ├── __init__.py
│       │   ├── server.py                # MCP Server
│       │   ├── client.py                # MCP Client
│       │   └── transport.py             # 传输层 (HTTP/SSE)
│       └── prompts/
│           ├── __init__.py
│           ├── template.py              # PromptTemplate
│           └── section.py               # PromptSection
├── tests/
│   ├── unit/
│   │   ├── test_base_agent.py
│   │   ├── test_config_driven.py
│   │   ├── test_tool_registry.py
│   │   ├── test_function_tool.py
│   │   ├── test_mcp_tool.py
│   │   ├── test_agent_tool.py
│   │   ├── test_working_memory.py
│   │   ├── test_episodic_memory.py
│   │   ├── test_semantic_memory.py
│   │   ├── test_memory_retriever.py
│   │   ├── test_reflector.py
│   │   ├── test_prompt_optimizer.py
│   │   ├── test_strategy_tuner.py
│   │   ├── test_ab_tester.py
│   │   ├── test_pipeline_parallel.py
│   │   ├── test_handoff.py
│   │   ├── test_dynamic_pipeline.py
│   │   ├── test_mcp_server.py
│   │   └── test_mcp_client.py
│   └── integration/
│       ├── test_agent_lifecycle.py
│       ├── test_tool_composition.py
│       ├── test_evolution_loop.py
│       └── test_mcp_roundtrip.py
├── pyproject.toml
└── README.md

geo/backend/                               # GEO 业务系统(改造后)
├── app/
│   ├── agent_framework/                   # 保留为适配层
│   │   ├── __init__.py
│   │   ├── agents/                        # 改为配置驱动
│   │   │   ├── __init__.py
│   │   │   ├── configs/                   # Agent YAML 配置
│   │   │   │   ├── citation_detector.yaml
│   │   │   │   ├── content_generator.yaml
│   │   │   │   ├── deai_agent.yaml
│   │   │   │   ├── geo_optimizer.yaml
│   │   │   │   ├── monitor.yaml
│   │   │   │   ├── schema_advisor.yaml
│   │   │   │   ├── competitor_analyzer.yaml
│   │   │   │   └── trend_agent.yaml
│   │   │   └── custom_handlers/           # 仅复杂 Agent 需自定义 handler
│   │   │       ├── citation_handler.py
│   │   │       └── monitor_handler.py
│   │   ├── tools/                         # 业务 Tool 注册
│   │   │   ├── __init__.py
│   │   │   ├── citation_tools.py
│   │   │   ├── content_tools.py
│   │   │   ├── knowledge_tools.py
│   │   │   ├── monitor_tools.py
│   │   │   └── schema_tools.py
│   │   └── prompts/                       # Prompt 模板(保留,可热更新)
│   ├── models/
│   │   └── agent.py                       # 新增 evolution_logs, episodic_memories 表
│   └── ...

Implementation Units

U1. 独立包脚手架与 BaseAgent 重构

Goal: 创建 fischer-agentkit 独立包,重构 BaseAgent 将 execute 模板代码上移,子类只需实现 handle_task()

Dependencies:

Files:

  • fischer-agentkit/src/agentkit/__init__.py
  • fischer-agentkit/src/agentkit/core/__init__.py
  • fischer-agentkit/src/agentkit/core/base.py
  • fischer-agentkit/src/agentkit/core/protocol.py
  • fischer-agentkit/src/agentkit/core/exceptions.py
  • fischer-agentkit/src/agentkit/core/registry.py
  • fischer-agentkit/src/agentkit/core/dispatcher.py
  • fischer-agentkit/pyproject.toml
  • fischer-agentkit/tests/unit/test_base_agent.py

Approach:

  1. 创建 fischer-agentkit 包,使用 src/ layoutpyproject.toml 声明依赖(redis[hiredis], pydantic>=2.0, sqlalchemy[asyncio]>=2.0
  2. 从 GEO 项目迁移 protocol.pyAgentCapability, TaskMessage, TaskResult, TaskProgress, TaskStatus, AgentStatus移除 AgentType 硬编码枚举,改为动态注册
  3. 重构 BaseAgent
    • execute() 方法变为 final非抽象包含完整的计时、try/except、TaskResult 构建、进度上报
    • 新增抽象方法 handle_task(task) -> dict,子类只需返回 output_data
    • 新增生命周期钩子:on_task_start(), on_task_complete(), on_task_failed()
    • 新增 tools: list[Tool] 属性默认空U3 实现)
    • 新增 memory: Memory 属性默认空U4 实现)
  4. 迁移 registry.py, dispatcher.py, exceptions.py,去除 GEO 业务依赖
  5. AgentRegistry.get_available_agent() 增加负载均衡策略(轮询/最少任务)

Patterns to follow: 现有 backend/app/agent_framework/base.py 的双模式Redis/本地)设计

Test scenarios:

  • BaseAgent 子类实现 handle_task 返回 dictexecute 自动包装为 TaskResult
  • handle_task 抛异常时 execute 自动构建 FAILED TaskResult
  • on_task_start/complete/failed 钩子按序调用
  • AgentRegistry 动态注册新 AgentType 不报错
  • AgentRegistry.get_available_agent 轮询策略返回不同 Agent

Verification: fischer-agentkit 可独立 pip install -e .,单元测试全部通过


U2. Protocol 扩展与配置驱动 Agent

Goal: 扩展 Protocol 支持 JSON Schema 校验、Handoff 消息;实现 ConfigDrivenAgent支持 YAML/数据库配置驱动 Agent 定义。

Dependencies: U1

Files:

  • fischer-agentkit/src/agentkit/core/protocol.py(扩展)
  • fischer-agentkit/src/agentkit/core/config_driven.py
  • fischer-agentkit/src/agentkit/core/standalone.py
  • fischer-agentkit/src/agentkit/prompts/template.py
  • fischer-agentkit/src/agentkit/prompts/section.py
  • fischer-agentkit/tests/unit/test_config_driven.py
  • fischer-agentkit/tests/unit/test_protocol.py

Approach:

  1. Protocol 扩展:
    • AgentCapability 增加 input_schema: dict | Noneoutput_schema: dict | NoneJSON Schema
    • 新增 HandoffMessage 数据类source_agent, target_agent, task_context, reason
    • 新增 EvolutionEvent 数据类agent_name, change_type, before, after, metrics
    • TaskMessage 增加 conversation_id: str | None 支持多轮对话
  2. ConfigDrivenAgent
    • 接受 YAML 配置或数据库配置自动组装Prompt 模板 + LLM 参数 + Tool 绑定 + Memory 策略
    • 内置 LLM 调用 + JSON 输出解析 + 降级兜底的标准流程
    • 支持三种任务模式:llm_generatePrompt → LLM → 解析)、tool_call(调用指定 Toolcustom(自定义 handler
  3. PromptTemplate 迁移并增强:支持动态 section 组合、版本标记
  4. Standalone runner 改为自动发现:扫描 agent_configs/ 目录下的 YAML 文件,自动注册和启动

Patterns to follow: 现有 prompts/base_template.py 的 PromptSection 设计

Test scenarios:

  • ConfigDrivenAgent 从 YAML 加载配置并正确组装
  • llm_generate 模式:渲染 Prompt → 调用 Mock LLM → 解析 JSON 输出
  • tool_call 模式:调用注册的 FunctionTool 并返回结果
  • input_schema 校验:缺少必填字段时返回校验错误
  • HandoffMessage 序列化/反序列化正确
  • 自动发现:在 configs/ 目录放置 YAML 后 standalone 可自动加载

Verification: 通过 YAML 配置定义一个新 Agent无需写 Python 代码即可运行


U3. Tool/Skill 插件系统

Goal: 实现 Tool 抽象基类、FunctionTool、AgentTool、ToolRegistry支持工具注册、发现、组合。

Dependencies: U1

Files:

  • fischer-agentkit/src/agentkit/tools/__init__.py
  • fischer-agentkit/src/agentkit/tools/base.py
  • fischer-agentkit/src/agentkit/tools/function_tool.py
  • fischer-agentkit/src/agentkit/tools/agent_tool.py
  • fischer-agentkit/src/agentkit/tools/registry.py
  • fischer-agentkit/tests/unit/test_tool_registry.py
  • fischer-agentkit/tests/unit/test_function_tool.py
  • fischer-agentkit/tests/unit/test_agent_tool.py
  • fischer-agentkit/tests/integration/test_tool_composition.py

Approach:

  1. Tool 抽象基类:
    • 属性:name, description, input_schemaJSON Schema, output_schemaJSON Schema, version, tags
    • 抽象方法:async execute(**kwargs) -> dict
    • 生命周期钩子:before_execute(), after_execute(), on_error()
  2. FunctionTool:包装普通 Python 函数为 Tool自动从函数签名推断 input_schema
  3. AgentTool:包装另一个 Agent 为 Tool输入/输出通过 mapping 适配,内部通过 Dispatcher 分发任务
  4. ToolRegistry
    • register(tool), unregister(name), get(name), list_tools(tag=None)
    • 支持版本管理:同一工具多版本共存,默认使用最新版
    • 支持标签分类:[citation, analysis, generation, optimization]
  5. 工具组合:
    • SequentialChain([tool_a, tool_b]):顺序执行,前一个输出作为后一个输入
    • ParallelFanOut([tool_a, tool_b, tool_c]):并行执行,结果合并
    • DynamicSelector(llm, tools)LLM 根据任务动态选择工具

Patterns to follow: 现有 LLMFactory 的注册-创建模式

Test scenarios:

  • FunctionTool 从函数自动推断 schema 并执行
  • AgentTool 分发任务到目标 Agent 并返回结果
  • ToolRegistry 注册/发现/按标签过滤
  • SequentialChain 顺序执行两个工具
  • ParallelFanOut 并行执行三个工具
  • DynamicSelector 根据 LLM 判断选择合适工具
  • 工具版本管理:注册 v1 和 v2默认返回 v2

Verification: 通过 ToolRegistry 注册 3 个 FunctionToolAgent 可声明式绑定并调用


U4. 记忆系统

Goal: 实现三层记忆系统Working/Episodic/Semantic和混合检索器。

Dependencies: U1

Files:

  • fischer-agentkit/src/agentkit/memory/__init__.py
  • fischer-agentkit/src/agentkit/memory/base.py
  • fischer-agentkit/src/agentkit/memory/working.py
  • fischer-agentkit/src/agentkit/memory/episodic.py
  • fischer-agentkit/src/agentkit/memory/semantic.py
  • fischer-agentkit/src/agentkit/memory/retriever.py
  • fischer-agentkit/tests/unit/test_working_memory.py
  • fischer-agentkit/tests/unit/test_episodic_memory.py
  • fischer-agentkit/tests/unit/test_semantic_memory.py
  • fischer-agentkit/tests/unit/test_memory_retriever.py

Approach:

  1. Memory 抽象基类:
    • async store(key, value, metadata), async retrieve(query, top_k), async delete(key)
    • async search(query, top_k, filters) — 语义检索
  2. WorkingMemoryRedis
    • agent:{name}:working_memory:{task_id} 为 key 前缀
    • 支持自动过期TTL = 任务超时时间 × 2
    • 提供 get_context() 方法,返回格式化的上下文字符串
  3. EpisodicMemorypgvector + PostgreSQL
    • 表结构:id, agent_name, task_type, input_summary, output_summary, outcome(success/fail), quality_score, reflection, embedding, created_at
    • 写入时自动生成 embedding复用 Embedder 接口)
    • 检索:向量语义 + 关键词 + 时间衰减 + RRF 融合
  4. SemanticMemory
    • 适配器模式,对接 GEO 项目的 RAGServiceGraphQuery
    • 提供统一的 search(query, knowledge_base_ids, top_k) 接口
  5. MemoryRetriever(混合检索器):
    • 并行查询三层记忆,按权重融合排序
    • 时间衰减:score *= exp(-0.01 * age_hours)
    • 上下文窗口管理:总 token 不超过预算

Patterns to follow: 现有 HybridRetriever 的 RRF 融合排序模式

Test scenarios:

  • WorkingMemory 存取任务上下文TTL 过期后自动清除
  • EpisodicMemory 写入任务经验并按语义检索相似案例
  • EpisodicMemory 时间衰减:近期经验权重高于远期
  • SemanticMemory 通过适配器调用 RAGService
  • MemoryRetriever 混合检索三层记忆并按权重融合
  • 上下文窗口管理:检索结果超过 token 预算时智能截断

Verification: Agent 执行任务后自动写入 EpisodicMemory后续相似任务可检索到历史经验


U5. MCP Server 与 Client

Goal: 实现 MCP Server暴露 Agent 能力)和 MCP Client调用外部 MCP 工具),完成 MCPTool。

Dependencies: U3

Files:

  • fischer-agentkit/src/agentkit/mcp/__init__.py
  • fischer-agentkit/src/agentkit/mcp/server.py
  • fischer-agentkit/src/agentkit/mcp/client.py
  • fischer-agentkit/src/agentkit/mcp/transport.py
  • fischer-agentkit/src/agentkit/tools/mcp_tool.py
  • fischer-agentkit/tests/unit/test_mcp_server.py
  • fischer-agentkit/tests/unit/test_mcp_client.py
  • fischer-agentkit/tests/unit/test_mcp_tool.py
  • fischer-agentkit/tests/integration/test_mcp_roundtrip.py

Approach:

  1. MCP Server
    • 基于 FastAPI 实现,支持 Streamable HTTP 传输
    • 端点:/tools/list(列出可用工具)、/tools/call(调用工具)、/resources/read(读取资源)
    • 自动将 ToolRegistry 中注册的工具暴露为 MCP 工具
    • 支持 SSE 流式响应
  2. MCP Client
    • 连接外部 MCP ServerHTTP/SSE
    • 自动发现远程工具并注册到本地 ToolRegistry
    • 支持工具调用的流式响应
  3. MCPTool
    • 继承 Tool 基类,内部通过 MCP Client 调用远程工具
    • 自动从 MCP Server 获取 input_schema
  4. Transport 层:
    • 抽象 Transport 接口(send_request, receive_response
    • 实现 HTTPTransportStreamable HTTPSSETransportServer-Sent Events

Patterns to follow: MCP 官方 Python SDK 的 Server/Client 模式

Test scenarios:

  • MCP Server 启动后 /tools/list 返回已注册工具列表
  • MCP Client 连接 Server 并调用工具返回正确结果
  • MCPTool 通过 Client 调用远程工具
  • SSE 流式响应正确传输
  • MCP Server 自动暴露 ToolRegistry 中的 FunctionTool
  • 外部 MCP Server 的工具自动注册到本地 ToolRegistry

Verification: 启动 MCP Server通过 MCP Client 调用 Agent 能力,端到端成功


U6. 自我进化引擎Level 3

Goal: 实现执行反思、DSPy 风格 Prompt 自动优化、策略调优、A/B 测试框架。

Dependencies: U1, U4

Files:

  • fischer-agentkit/src/agentkit/evolution/__init__.py
  • fischer-agentkit/src/agentkit/evolution/reflector.py
  • fischer-agentkit/src/agentkit/evolution/prompt_optimizer.py
  • fischer-agentkit/src/agentkit/evolution/strategy_tuner.py
  • fischer-agentkit/src/agentkit/evolution/ab_tester.py
  • fischer-agentkit/src/agentkit/evolution/evolution_store.py
  • fischer-agentkit/tests/unit/test_reflector.py
  • fischer-agentkit/tests/unit/test_prompt_optimizer.py
  • fischer-agentkit/tests/unit/test_strategy_tuner.py
  • fischer-agentkit/tests/unit/test_ab_tester.py
  • fischer-agentkit/tests/integration/test_evolution_loop.py

Approach:

  1. Reflector(执行反思):
    • 每次任务完成后自动评估:成功/失败、质量评分(基于 output_schema 约束和业务指标)
    • 提取模式:常见失败原因、高效策略、低效路径
    • 生成反思总结存入 EpisodicMemory
  2. PromptOptimizerDSPy 风格编译器):
    • Signature:定义 input_fields -> output_fields 的结构化签名
    • Module:可组合的 Prompt 策略ChainOfThought, ReAct, FewShot
    • Optimizer
      • BootstrapFewShot:从成功案例中自动构建 few-shot 示例
      • MIPROv2:多目标 Prompt 优化(指令 + few-shot 联合优化)
      • 基于历史任务数据编译,产出优化后的 Prompt
  3. StrategyTuner(策略调优):
    • 可调参数temperature, tool_selection_weights, pipeline_path
    • 基于 Bayesian Optimization 搜索最优参数组合
    • 每次调优记录 before/after 指标
  4. ABTesterA/B 测试框架):
    • 支持配置分流比例(如 80% 原版 / 20% 实验版)
    • 自动收集实验组和对照组的效果指标
    • 统计显著性检验t-test达到置信度后自动决策
  5. EvolutionStore(进化日志):
    • 表结构:id, agent_name, change_type(prompt/strategy/pipeline), before, after, ab_test_id, status(active/rolled_back), created_at
    • 支持回滚:rollback(evolution_id)

Patterns to follow: DSPy 的 Signature/Module/Optimizer 三层架构

Test scenarios:

  • Reflector 评估成功任务生成正面反思
  • Reflector 评估失败任务提取失败模式
  • PromptOptimizer 从 10 个成功案例自动生成 few-shot 示例
  • PromptOptimizer 优化后的 Prompt 在测试集上效果提升
  • StrategyTuner 调整 temperature 后效果改善
  • ABTester 80/20 分流,实验组效果显著后自动应用
  • EvolutionStore 记录变更并支持回滚

Verification: Agent 执行 20 次任务后PromptOptimizer 自动优化 PromptABTest 验证效果提升后自动应用


U7. 多 Agent 协同增强

Goal: Pipeline Engine 支持并行执行、Handoff 机制、动态 Pipeline 组合。

Dependencies: U1, U2

Files:

  • fischer-agentkit/src/agentkit/orchestrator/__init__.py
  • fischer-agentkit/src/agentkit/orchestrator/pipeline_engine.py
  • fischer-agentkit/src/agentkit/orchestrator/pipeline_schema.py
  • fischer-agentkit/src/agentkit/orchestrator/pipeline_loader.py
  • fischer-agentkit/src/agentkit/orchestrator/handoff.py
  • fischer-agentkit/src/agentkit/orchestrator/dynamic_pipeline.py
  • fischer-agentkit/tests/unit/test_pipeline_parallel.py
  • fischer-agentkit/tests/unit/test_handoff.py
  • fischer-agentkit/tests/unit/test_dynamic_pipeline.py

Approach:

  1. Pipeline 并行执行:
    • 拓扑排序后按依赖层级分组(同层无依赖)
    • 同层内使用 asyncio.gather 并行执行
    • 并行 stage 的结果合并后传给下一层
  2. Handoff 机制:
    • HandoffManager 管理转交请求
    • Agent 调用 self.handoff(target_agent, context, reason) 发起转交
    • 通过 Redis Pub/Sub agent:{target}:handoff 频道通知目标 Agent
    • 目标 Agent 接收后创建新任务,携带源 Agent 的上下文
    • 源 Agent 的任务状态变为 HANDOFF
  3. 动态 Pipeline
    • 支持 sub_pipeline stage 类型:引用另一个 YAML Pipeline
    • 支持 conditional_pipeline:根据运行时条件选择子流程
    • 支持 loop_pipeline:循环执行直到条件满足
  4. 事件驱动替代轮询:
    • Pipeline Engine 通过 Redis Pub/Sub 订阅 agent:{name}:result 频道
    • 任务完成后自动触发下一 stage无需轮询

Patterns to follow: 现有 PipelineEngine 的 DAG 拓扑排序和变量解析

Test scenarios:

  • 3 个无依赖 stage 并行执行,总耗时约等于最长单个 stage
  • HandoffAgent A 转交任务给 Agent BB 接收并执行
  • 动态 Pipeline根据条件选择不同的子流程
  • 子 Pipeline 嵌套执行并正确传递变量
  • 事件驱动:任务完成后自动触发下一 stage无轮询间隔
  • 循环 Pipeline条件满足后退出循环

Verification: 内容生产 Pipeline 的 deai_processing 和 geo_optimization 并行执行,总耗时减少约 40%


U8. GEO 业务系统适配与迁移

Goal: 将 GEO 项目的 8 个 Agent 迁移到新框架,业务 Service 注册为 Tool实现完全解耦。

Dependencies: U1, U2, U3, U4, U5, U6, U7

Files:

  • geo/backend/app/agent_framework/agents/configs/citation_detector.yaml
  • geo/backend/app/agent_framework/agents/configs/content_generator.yaml
  • geo/backend/app/agent_framework/agents/configs/deai_agent.yaml
  • geo/backend/app/agent_framework/agents/configs/geo_optimizer.yaml
  • geo/backend/app/agent_framework/agents/configs/monitor.yaml
  • geo/backend/app/agent_framework/agents/configs/schema_advisor.yaml
  • geo/backend/app/agent_framework/agents/configs/competitor_analyzer.yaml
  • geo/backend/app/agent_framework/agents/configs/trend_agent.yaml
  • geo/backend/app/agent_framework/agents/custom_handlers/citation_handler.py
  • geo/backend/app/agent_framework/agents/custom_handlers/monitor_handler.py
  • geo/backend/app/agent_framework/tools/__init__.py
  • geo/backend/app/agent_framework/tools/citation_tools.py
  • geo/backend/app/agent_framework/tools/content_tools.py
  • geo/backend/app/agent_framework/tools/knowledge_tools.py
  • geo/backend/app/agent_framework/tools/monitor_tools.py
  • geo/backend/app/agent_framework/tools/schema_tools.py
  • geo/backend/app/agent_framework/tools/competitor_tools.py
  • geo/backend/app/agent_framework/tools/trend_tools.py
  • geo/backend/app/models/agent.py(新增表)
  • geo/backend/requirements.txt(添加 fischer-agentkit 依赖)
  • geo/backend/app/agent_framework/__init__.py(适配层)

Approach:

  1. 业务 Tool 注册:
    • CitationToolsexecute_single_platform, get_or_create_task, calculate_next_query_at
    • ContentToolsretrieve_knowledge(包装 RAGService
    • MonitorToolscheck_and_compare, generate_change_report, create_monitoring_record
    • SchemaToolsidentify_missing_dimensions, match_templates, validate_json_ld
    • CompetitorToolsanalyze_competitor
    • TrendToolsanalyze_trends, get_hotspots
  2. Agent 配置迁移:
    • LLM 驱动型 AgentContentGenerator, DeAI, GEOOptimizer, SchemaAdvisor→ YAML 配置 + llm_generate 模式,零代码
    • Service 代理型 AgentCitationDetector, Monitor→ YAML 配置 + custom handler仅保留业务逻辑方法
    • CompetitorAnalyzer, TrendAgent → YAML 配置 + tool_call 模式
  3. 数据库迁移:
    • 新增 episodic_memories
    • 新增 evolution_logs
    • 新增 ab_test_configs
  4. 适配层:
    • geo/backend/app/agent_framework/ 保留为适配层import from agentkit
    • 现有 API 端点(/agents/, /agents/tasks/)保持不变
    • 现有 Pipeline YAML 保持兼容

Patterns to follow: 现有 Agent 的业务逻辑实现,迁移时保持行为一致

Test scenarios:

  • 8 个 Agent 迁移后行为与原版一致(回归测试)
  • 新增 Agent 只需 YAML 配置,无需写 Python 代码
  • 业务 Tool 注册后 Agent 可通过 ToolRegistry 发现和调用
  • EpisodicMemory 自动记录任务经验
  • EvolutionStore 记录进化变更
  • 现有 API 端点功能不变
  • 现有 Pipeline YAML 正常执行

Verification: 运行现有 Agent 集成测试全部通过,新增一个 YAML-only Agent 成功执行任务


Scope Boundaries

In Scope

  • fischer-agentkit 独立包的完整实现Core, Tools, Memory, Evolution, Orchestrator, MCP
  • GEO 项目 8 个 Agent 的迁移和适配
  • MCP Server/Client 双向支持
  • Level 3 自我进化(反思 + Prompt 优化 + 策略调优 + A/B 测试)
  • Pipeline 并行执行、Handoff、动态 Pipeline
  • 三层记忆系统
  • 数据库迁移(新增表)

Deferred for Later

  • A2A Protocol 支持(跨组织 Agent 协作,待 MCP 稳定后再评估)
  • Agent 可视化编排 UI前端拖拽式 Pipeline 编辑器)
  • Agent 市场/共享机制(跨组织共享 Agent 配置和 Tool
  • 多租户隔离增强Agent 级别的资源隔离和配额)
  • Agent 安全沙箱Tool 执行的权限控制和审计)

Outside This Project's Identity

  • 通用 LLM Gateway不属于 Agent 框架范畴)
  • 替代现有 LLMFactory保持现有 LLM 调用体系不变)
  • 前端 Agent 管理界面重构(本次聚焦后端架构)

Risks & Dependencies

Risk Impact Mitigation
MCP 协议规范变动2025-2026 仍在演进) MCPTool/MCPClient 需要跟进修改 抽象 Transport 层,协议变更只影响 Transport 实现
DSPy 风格 Prompt 优化可能需要大量训练数据 优化效果不明显 BootstrapFewShot 从少量数据开始MIPROv2 需 50+ 案例时才启用
独立包与 GEO 项目的版本同步 GEO 升级 agentkit 版本可能引入 breaking change 语义化版本管理GEO 项目 pin 版本CI 自动测试兼容性
Episodic Memory 数据量增长 pgvector 查询性能下降 设置 TTL 自动清理、定期归档、HNSW 索引优化
迁移期间 8 个 Agent 行为不一致 业务回归 逐个迁移 + 回归测试,保留旧代码直到新代码验证通过
Pipeline 并行执行引入并发问题 结果不一致 同层 stage 只读共享变量,写入隔离

System-Wide Impact

  • 数据库:新增 3 张表episodic_memories, evolution_logs, ab_test_configs需 Alembic 迁移
  • Redis:新增 Working Memory key 空间和 Handoff 频道,内存使用增加约 20%
  • API:现有 /agents/ 端点保持兼容,新增 /agents/{name}/evolution 查看进化历史
  • 部署fischer-agentkit 需发布到私有 PyPIGEO 项目 Dockerfile 需添加安装步骤
  • 监控:新增进化相关 Prometheus 指标evolution_attempts_total, evolution_success_rate, ab_test_decisions
  • 依赖:新增 mcp Python 包依赖MCP SDK

Sources & Research

  • 现有 Agent 框架代码:backend/app/agent_framework/ 全部文件
  • 现有 RAG 服务:backend/app/services/knowledge/rag_service.py, retriever.py, enhanced_rag.py
  • 现有 LLM 工厂:backend/app/services/llm/factory.py, base.py
  • MCP 官方规范Model Context Protocol (Anthropic, 2024-2025)
  • DSPy 框架Stanford NLP, Prompt 自动优化范式
  • Google A2A ProtocolAgent-to-Agent HTTP 协议 (2025.4)
  • LangGraph 0.2.xStateGraph + Checkpoint 架构
  • OpenAI Agents SDK 1.0Handoff 模式
  • Google ADK 0.1Agent 组合模式Sequential/Parallel/Loop