23 KiB
GEO 项目迁移至 AgentKit v2 Mode A 方案
1. 目标
将 GEO 项目从当前的旧框架 + import 混合模式迁移至 AgentKit v2 Mode A(HTTP API 模式)。
迁移完成后:
- AgentKit Server 独立部署,GEO 通过 HTTP API 调用
- LLM 调用统一由 AgentKit Server 的 LLM Gateway 管理
- 意图识别、ReAct 循环、质量检查、标准化输出全部在 AgentKit Server 内完成
- GEO 项目不再直接 import agentkit 内部类
2. 当前架构 vs 目标架构
当前架构(3 条调用链并存)
┌─────────────────────────────────────────────────────────┐
│ GEO Backend │
│ │
│ Chain A: API Route → TaskDispatcher → Redis → BaseAgent │
│ Chain B: Service → 直接实例化 Agent → 直接调用 execute() │
│ Chain C: Adapter → ConfigDrivenAgent → custom_handler │
│ │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ GEO 内部的旧框架(BaseAgent + Redis Queue + DB) │ │
│ │ + agentkit import(ConfigDrivenAgent + ToolRegistry)│ │
│ │ + LLMFactory(GEO 自己的 LLM 封装) │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────┘
目标架构(Mode A)
┌──────────────────────┐ HTTP API ┌──────────────────────────┐
│ GEO Backend │ ───────────────→ │ AgentKit Server │
│ │ │ │
│ API Routes │ POST /tasks │ Intent Router │
│ Services │ GET /tasks/{id} │ ReAct Engine │
│ Workers │ GET /llm/usage │ LLM Gateway │
│ │ │ Quality Gate │
│ 不再 import │ │ Output Standardizer │
│ agentkit 内部类 │ │ AgentPool │
│ │ │ SkillRegistry │
│ 只用 AgentKitClient │ │ ToolRegistry │
│ │ │ MCP Bridge │
└──────────────────────┘ └──────────────────────────┘
│
┌─────┴─────┐
│ LLM APIs │
└───────────┘
3. 需要改动的文件清单
3.1 必须改动(核心迁移)
| 文件 | 当前用法 | 改动内容 |
|---|---|---|
app/agent_framework/adapter.py |
import agentkit 内部类 | 改为只提供 get_agentkit_client() 和 submit_task_via_api() |
app/agent_framework/__init__.py |
导出大量 agentkit 类 | 精简导出,只暴露 AgentKitClient 相关 |
app/api/agents.py |
用旧 TaskDispatcher + TaskMessage |
改为调用 AgentKitClient.submit_task() |
app/services/content/content_generation_service.py |
用旧 TaskDispatcher + 轮询 |
改为调用 AgentKitClient.submit_task() |
app/services/citation/citation.py |
直接实例化 CitationDetectorAgent |
改为调用 AgentKitClient.submit_task() |
app/workers/scheduler.py |
直接实例化 CitationDetectorAgent |
改为调用 AgentKitClient.submit_task() |
3.2 需要迁移到 AgentKit Server 的代码
| 当前位置 | 功能 | 迁移目标 |
|---|---|---|
app/agent_framework/agents/custom_handlers/citation_handler.py |
引用检测业务逻辑 | AgentKit Server 的 Tool 或 custom_handler |
app/agent_framework/agents/custom_handlers/monitor_handler.py |
监控业务逻辑 | AgentKit Server 的 Tool 或 custom_handler |
app/agent_framework/agents/custom_handlers/schema_handler.py |
Schema 建议业务逻辑 | AgentKit Server 的 Tool 或 custom_handler |
app/agent_framework/tools/*.py(14 个 FunctionTool) |
业务 Tool 定义 | AgentKit Server 的 ToolRegistry |
app/agent_framework/agents/configs/*.yaml(8 个) |
Agent 配置 | AgentKit Server 的 SkillLoader 加载目录 |
3.3 可删除(迁移完成后)
| 文件/目录 | 原因 |
|---|---|
app/agent_framework/base.py |
旧 BaseAgent,被 AgentKit Server 取代 |
app/agent_framework/dispatcher.py |
旧 TaskDispatcher,被 AgentKit Server 取代 |
app/agent_framework/registry.py |
旧 AgentRegistry,被 AgentKit Server 取代 |
app/agent_framework/protocol.py |
旧协议类,被 agentkit.core.protocol 取代 |
app/agent_framework/exceptions.py |
旧异常类,被 agentkit.core.exceptions 取代 |
app/agent_framework/config_manager.py |
旧配置管理,被 SkillConfig 取代 |
app/agent_framework/standalone.py |
旧运行器,被 AgentKit Server 取代 |
app/agent_framework/pipeline/ |
旧 Pipeline,被 AgentKit Server 编排取代 |
app/agent_framework/agents/ 下的旧 Agent 类 |
被 YAML 配置 + Skill 取代 |
4. 分步迁移方案
Phase 1:部署 AgentKit Server + 配置迁移
目标:AgentKit Server 能独立运行,加载 GEO 的 8 个 Skill 配置和 14 个 Tool。
4.1.1 创建 AgentKit Server 启动配置
在 fischer-agentkit/ 项目中创建:
# configs/llm_config.yaml — LLM Provider 配置
providers:
deepseek:
api_key: "${DEEPSEEK_API_KEY}"
base_url: "https://api.deepseek.com/v1"
models:
deepseek-chat:
max_tokens: 64000
cost_per_1k_input: 0.00014
cost_per_1k_output: 0.00028
model_aliases:
default: "deepseek-chat"
fast: "deepseek-chat"
powerful: "deepseek-chat"
fallbacks:
deepseek-chat: []
4.1.2 迁移 YAML 配置为 SkillConfig
现有 8 个 YAML 无需修改即可加载(SkillConfig 向后兼容 AgentConfig)。
但建议为需要意图识别的 Skill 添加 intent 字段:
# content_generator.yaml — 增加的 v2 字段
intent:
keywords: ["生成内容", "写文章", "选题", "generate", "content"]
description: "用户需要生成SEO/GEO优化内容、推荐选题或撰写文章"
examples:
- "帮我写一篇关于AI的文章"
- "推荐一些选题"
execution_mode: react # 使用 ReAct 引擎
max_steps: 5
quality_gate:
required_fields: ["content"]
min_word_count: 500
max_retries: 1
4.1.3 迁移 14 个 FunctionTool 到 AgentKit Server
将 GEO 的 Tool 注册代码迁移为 AgentKit Server 的 Tool 插件。
方式 A(推荐):在 AgentKit Server 启动时注册 Tool
# fischer-agentkit/configs/geo_tools.py
"""GEO 项目的 Tool 注册 — 供 AgentKit Server 使用"""
from agentkit.tools.function_tool import FunctionTool
from agentkit.tools.registry import ToolRegistry
def register_geo_tools(registry: ToolRegistry) -> None:
"""注册 GEO 项目的所有 Tool"""
# --- Citation Tools ---
async def execute_single_platform(keyword: str, platform: str,
target_brand: str, brand_aliases: list[str] = None):
"""在单个 AI 平台执行引用检测"""
# 调用 GEO 的业务服务(通过 HTTP 调用 GEO Backend API)
from agentkit.tools.function_tool import FunctionTool
# ... 实现 ...
registry.register(FunctionTool(
name="execute_single_platform",
description="在单个AI平台执行引用检测",
func=execute_single_platform,
input_schema={...},
tags=["citation", "detection"],
))
# ... 注册其他 13 个 Tool ...
方式 B:custom_handler 保持为 custom 模式
3 个 custom_handler(citation/monitor/schema)因为涉及复杂的 DB 操作和多服务编排,
可以保持 execution_mode: custom,在 AgentKit Server 中注册为 custom_handler。
# fischer-agentkit/configs/geo_handlers.py
"""GEO 项目的 Custom Handler — 供 AgentKit Server 使用"""
async def handle_citation_task(task):
"""引用检测 handler — 通过 HTTP 调用 GEO Backend 的业务 API"""
import httpx
async with httpx.AsyncClient() as client:
if task.task_type == "citation_detect":
resp = await client.post(
"http://geo-backend:8000/internal/citation/detect",
json=task.input_data,
)
return resp.json()
elif task.task_type == "citation_detect_single":
resp = await client.post(
"http://geo-backend:8000/internal/citation/detect-single",
json=task.input_data,
)
return resp.json()
关键决策:custom_handler 需要 DB 访问。有两种方案:
- 方案 1(推荐):AgentKit Server 通过 HTTP 回调 GEO Backend 的内部 API 访问 DB
- 方案 2:AgentKit Server 直接连接 GEO 的数据库(耦合度高,不推荐)
4.1.4 创建 AgentKit Server 启动脚本
# fischer-agentkit/configs/geo_server.py
"""GEO 专用 AgentKit Server 启动配置"""
from agentkit.server.app import create_app
from agentkit.llm.gateway import LLMGateway
from agentkit.llm.config import LLMConfig
from agentkit.skills.loader import SkillLoader
from agentkit.skills.registry import SkillRegistry
from agentkit.tools.registry import ToolRegistry
from configs.geo_tools import register_geo_tools
from configs.geo_handlers import handle_citation_task, handle_monitor_task, handle_schema_task
def create_geo_app():
# 1. 初始化 LLM Gateway
llm_config = LLMConfig.from_yaml("configs/llm_config.yaml")
llm_gateway = LLMGateway(config=llm_config)
# 2. 初始化 Tool Registry
tool_registry = ToolRegistry()
register_geo_tools(tool_registry)
# 3. 初始化 Skill Registry
skill_registry = SkillRegistry()
loader = SkillLoader(skill_registry=skill_registry, tool_registry=tool_registry)
loader.load_from_directory("configs/skills") # 8 个 YAML
# 4. 创建 FastAPI App
app = create_app(
llm_gateway=llm_gateway,
skill_registry=skill_registry,
tool_registry=tool_registry,
)
return app
# 启动命令:
# uvicorn configs.geo_server:create_geo_app --factory --host 0.0.0.0 --port 8000
Phase 2:GEO Backend 改造
目标:GEO Backend 不再直接使用 agentkit 内部类,全部通过 AgentKitClient 调用。
4.2.1 改造 adapter.py
# app/agent_framework/adapter.py — Mode A 版本
"""GEO Agent 适配层 — Mode A(HTTP API)
所有 Agent 操作通过 AgentKit Server 的 HTTP API 完成。
GEO Backend 不再 import agentkit 内部类。
"""
import logging
import os
from agentkit.server.client import AgentKitClient
logger = logging.getLogger(__name__)
_AGENTKIT_CLIENT: AgentKitClient | None = None
def get_agentkit_client() -> AgentKitClient:
"""获取 AgentKit Server HTTP 客户端
环境变量:
AGENTKIT_SERVER_URL: AgentKit Server 地址,默认 http://localhost:8000
"""
global _AGENTKIT_CLIENT
if _AGENTKIT_CLIENT is None:
base_url = os.getenv("AGENTKIT_SERVER_URL", "http://localhost:8000")
_AGENTKIT_CLIENT = AgentKitClient(base_url=base_url)
logger.info(f"AgentKitClient initialized: {base_url}")
return _AGENTKIT_CLIENT
async def submit_task(
input_data: dict,
skill_name: str | None = None,
agent_name: str | None = None,
) -> dict:
"""提交任务到 AgentKit Server
Args:
input_data: 任务输入数据
skill_name: 指定 Skill 名称(可选,不指定则自动路由)
agent_name: 指定 Agent 名称(可选)
Returns:
标准化输出结果,包含 skill_name, data, metadata
"""
client = get_agentkit_client()
result = await client.submit_task(
input_data=input_data,
skill_name=skill_name,
agent_name=agent_name,
)
return result
async def get_task_status(task_id: str) -> dict:
"""查询任务状态"""
client = get_agentkit_client()
return await client.get_task_status(task_id)
async def get_llm_usage(agent_name: str | None = None) -> dict:
"""查询 LLM 用量统计"""
client = get_agentkit_client()
return await client.get_usage(agent_name=agent_name)
4.2.2 改造 API 路由(app/api/agents.py)
# 改造前:
from app.agent_framework.dispatcher import TaskDispatcher
from app.agent_framework.protocol import TaskMessage, TaskStatus
task = TaskMessage(...)
dispatcher = TaskDispatcher(settings.REDIS_URL)
await dispatcher.dispatch(task, ...)
# 改造后:
from app.agent_framework.adapter import submit_task, get_task_status, get_llm_usage
result = await submit_task(
input_data=body.input_data,
skill_name=body.agent_name, # agent_name 映射为 skill_name
)
4.2.3 改造 ContentGenerationService
# 改造前(三阶段轮询):
from app.agent_framework.dispatcher import TaskDispatcher
from app.agent_framework.protocol import TaskMessage
dispatcher = TaskDispatcher(settings.REDIS_URL)
task = TaskMessage(agent_name="content_generator", ...)
dispatched_id = await dispatcher.dispatch(task, ...)
result = await self._poll_task_result(dispatcher, dispatched_id, timeout=300)
# 改造后(单次调用,AgentKit Server 内部编排):
from app.agent_framework.adapter import submit_task
result = await submit_task(
input_data={
"target_keyword": keyword,
"brand_name": brand_name,
"target_platform": platform,
"word_count": word_count,
"content_style": content_style,
"run_deai": run_deai,
"run_geo": run_geo,
},
skill_name="content_generator",
)
content = result["data"]["content"]
注意:当前 content_generation_service 的三阶段(generate → de-AI → GEO optimize) 是通过 3 次独立的 TaskDispatcher.dispatch 实现的。 迁移到 Mode A 后,有两种方案:
方案 1(推荐):在 AgentKit Server 中创建一个
content_productionPipeline Skill, 内部编排 3 个子 Skill 的执行顺序。GEO 只需一次submit_task调用。方案 2(简单):GEO 仍然调用 3 次
submit_task,每次指定不同的 skill_name。 改动最小,但调用方仍需编排逻辑。
4.2.4 改造 Citation 和 Scheduler
# 改造前(直接实例化):
from app.agent_framework.agents import CitationDetectorAgent
agent = CitationDetectorAgent()
result = await agent.execute(task)
# 改造后:
from app.agent_framework.adapter import submit_task
result = await submit_task(
input_data={"keyword": keyword, "platform": platform, ...},
skill_name="citation_detector",
)
Phase 3:GEO Backend 内部 API(供 AgentKit Server 回调)
custom_handler 需要 DB 访问,AgentKit Server 通过 HTTP 回调 GEO Backend。
4.3.1 新增内部 API 路由
# app/api/internal.py — 仅供 AgentKit Server 内部调用
"""内部 API — 供 AgentKit Server 回调访问 GEO 业务逻辑"""
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
router = APIRouter(prefix="/internal", tags=["internal"])
@router.post("/citation/detect")
async def citation_detect(input_data: dict, db: AsyncSession = Depends(get_db)):
"""引用检测 — 供 AgentKit Server 的 citation_handler 回调"""
from app.services.citation.citation import CitationService
service = CitationService()
return await service.detect_full(input_data, db=db)
@router.post("/citation/detect-single")
async def citation_detect_single(input_data: dict, db: AsyncSession = Depends(get_db)):
"""单平台引用检测 — 供 AgentKit Server 回调"""
from app.services.citation.citation import CitationService
service = CitationService()
return await service.detect_single(input_data, db=db)
@router.post("/monitor/check")
async def monitor_check(input_data: dict, db: AsyncSession = Depends(get_db)):
"""品牌监控检查 — 供 AgentKit Server 的 monitor_handler 回调"""
from app.services.monitor.monitor_service import MonitorService
service = MonitorService()
return await service.check_and_compare(input_data, db=db)
@router.post("/schema/advise")
async def schema_advise(input_data: dict, db: AsyncSession = Depends(get_db)):
"""Schema 建议 — 供 AgentKit Server 的 schema_handler 回调"""
from app.services.schema.schema_service import SchemaService
service = SchemaService()
return await service.advise(input_data, db=db)
@router.post("/knowledge/search")
async def knowledge_search(input_data: dict, db: AsyncSession = Depends(get_db)):
"""知识库检索 — 供 AgentKit Server 的 retrieve_knowledge Tool 回调"""
from app.services.knowledge.rag_service import RAGService
service = RAGService()
results = await service.search(
session=db,
query=input_data["query"],
knowledge_base_ids=input_data.get("knowledge_base_ids", []),
top_k=input_data.get("top_k", 3),
)
return {"results": results}
安全:内部 API 应限制只允许 AgentKit Server 的 IP 访问,或使用内部认证 Token。
Phase 4:清理旧代码
迁移完成并验证后,删除以下文件/目录:
app/agent_framework/
├── base.py # 删除
├── dispatcher.py # 删除
├── registry.py # 删除
├── protocol.py # 删除
├── exceptions.py # 删除
├── config_manager.py # 删除
├── standalone.py # 删除
├── pipeline/ # 删除
└── agents/
├── __init__.py # 删除(旧 Agent 类导出)
├── base_agent.py # 删除
├── citation_detector.py # 删除
├── ...其他旧 Agent 类 # 删除
└── configs/ # 保留(已迁移到 AgentKit Server)
保留的文件:
app/agent_framework/
├── __init__.py # 精简,只导出 AgentKitClient 相关
├── adapter.py # Mode A 版本
└── tools/ # 保留(Tool 定义已迁移到 AgentKit Server,但可作为参考)
5. 部署架构
5.1 docker-compose 配置
# docker-compose.yml
version: "3.8"
services:
# GEO Backend
geo-backend:
build: ./geo/backend
ports:
- "8000:8000"
environment:
- AGENTKIT_SERVER_URL=http://agentkit-server:8001
- DATABASE_URL=postgresql+asyncpg://...
- REDIS_URL=redis://redis:6379/0
depends_on:
- agentkit-server
- postgres
- redis
# AgentKit Server
agentkit-server:
build: ./fischer-agentkit
command: uvicorn configs.geo_server:create_geo_app --factory --host 0.0.0.0 --port 8001
ports:
- "8001:8001"
environment:
- DEEPSEEK_API_KEY=${DEEPSEEK_API_KEY}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- GEO_BACKEND_URL=http://geo-backend:8000
volumes:
- ./fischer-agentkit/configs:/app/configs
depends_on:
- postgres
- redis
postgres:
image: pgvector/pg15:latest
ports:
- "5432:5432"
redis:
image: redis:7-alpine
ports:
- "6379:6379"
5.2 网络拓扑
┌──────────────┐
│ Frontend │
└──────┬───────┘
│
┌──────▼───────┐
│ GEO Backend │ :8000
│ (FastAPI) │
└──────┬───────┘
│ HTTP
┌──────▼───────┐
│ AgentKit Svr │ :8001
│ (FastAPI) │
└──────┬───────┘
┌────┼────┐
│ │ │
┌────▼┐ ┌▼───┐ ┌▼────┐
│Redis│ │ PG │ │ LLM │
└─────┘ └────┘ └─────┘
AgentKit Server ←→ GEO Backend:内部 API 回调(custom_handler 访问 DB)
GEO Backend ←→ AgentKit Server:HTTP API(submit_task / get_usage)
6. 迁移检查清单
Phase 1:AgentKit Server 部署
- 创建
configs/llm_config.yaml - 将 8 个 YAML 配置复制到
configs/skills/目录 - 为需要意图识别的 Skill 添加
intent字段 - 迁移 14 个 FunctionTool 到
configs/geo_tools.py - 迁移 3 个 custom_handler 到
configs/geo_handlers.py - 创建
configs/geo_server.py启动配置 - 验证 AgentKit Server 能独立启动并加载所有 Skill/Tool
- 验证
POST /api/v1/health返回 ok
Phase 2:GEO Backend 改造
- 改造
adapter.py为 Mode A 版本 - 改造
app/api/agents.py使用submit_task() - 改造
content_generation_service.py使用submit_task() - 改造
citation.py和scheduler.py使用submit_task() - 新增
app/api/internal.py内部 API - 配置
AGENTKIT_SERVER_URL环境变量 - 端到端测试:提交任务 → AgentKit 处理 → 返回结果
Phase 3:清理
- 删除旧框架文件(base.py, dispatcher.py, registry.py 等)
- 删除旧 Agent 类文件
- 更新
__init__.py导出 - 全量回归测试
7. 风险与缓解
| 风险 | 影响 | 缓解 |
|---|---|---|
| custom_handler 需要回调 GEO Backend | 增加网络延迟和故障点 | 内部 API 加超时+重试;AgentKit Server 和 GEO Backend 部署在同一网络 |
| 三阶段内容生成编排 | 调用方式变化 | 推荐 Pipeline Skill 方案,一次调用完成三阶段 |
| 旧代码删除导致其他模块 break | 运行时错误 | 逐文件删除,每次删除后跑全量测试 |
| AgentKit Server 单点故障 | 所有 Agent 功能不可用 | 部署多实例 + 负载均衡 |
| LLM API Key 安全 | 泄露风险 | AgentKit Server 环境变量注入,不写入代码或配置文件 |