geo/.qoder/repowiki/zh/content/后端系统架构/后端系统架构.md

51 KiB
Raw Permalink Blame History

后端系统架构

**本文档引用的文件** - [backend/app/main.py](file://backend/app/main.py) - [backend/app/config.py](file://backend/app/config.py) - [backend/app/database.py](file://backend/app/database.py) - [backend/app/api/auth.py](file://backend/app/api/auth.py) - [backend/app/api/queries.py](file://backend/app/api/queries.py) - [backend/app/api/citations.py](file://backend/app/api/citations.py) - [backend/app/api/deps.py](file://backend/app/api/deps.py) - [backend/app/api/admin.py](file://backend/app/api/admin.py) - [backend/app/api/reports.py](file://backend/app/api/reports.py) - [backend/app/api/subscriptions.py](file://backend/app/api/subscriptions.py) - [backend/app/middleware/logging_middleware.py](file://backend/app/middleware/logging_middleware.py) - [backend/app/middleware/rate_limit.py](file://backend/app/middleware/rate_limit.py) - [backend/app/schemas/auth.py](file://backend/app/schemas/auth.py) - [backend/app/schemas/subscription.py](file://backend/app/schemas/subscription.py) - [backend/app/models/user.py](file://backend/app/models/user.py) - [backend/app/models/query.py](file://backend/app/models/query.py) - [backend/app/models/citation_record.py](file://backend/app/models/citation_record.py) - [backend/app/models/subscription.py](file://backend/app/models/subscription.py) - [backend/app/models/query_task.py](file://backend/app/models/query_task.py) - [backend/app/services/auth.py](file://backend/app/services/auth.py) - [backend/app/services/admin.py](file://backend/app/services/admin.py) - [backend/app/services/subscription.py](file://backend/app/services/subscription.py) - [backend/app/workers/scheduler.py](file://backend/app/workers/scheduler.py) - [backend/app/workers/citation_engine.py](file://backend/app/workers/citation_engine.py) - [backend/app/workers/llm_adapter.py](file://backend/app/workers/llm_adapter.py) - [backend/app/agent_framework/base.py](file://backend/app/agent_framework/base.py) - [backend/app/agent_framework/dispatcher.py](file://backend/app/agent_framework/dispatcher.py) - [backend/app/agent_framework/registry.py](file://backend/app/agent_framework/registry.py) - [backend/app/agent_framework/pipeline/engine.py](file://backend/app/agent_framework/pipeline/engine.py) - [backend/app/agent_framework/pipeline/loader.py](file://backend/app/agent_framework/pipeline/loader.py) - [backend/app/agent_framework/pipeline/schema.py](file://backend/app/agent_framework/pipeline/schema.py) - [backend/app/agent_framework/protocol.py](file://backend/app/agent_framework/protocol.py) - [backend/app/agent_framework/agents/content_generator_agent.py](file://backend/app/agent_framework/agents/content_generator_agent.py) - [backend/app/agent_framework/agents/geo_optimizer_agent.py](file://backend/app/agent_framework/agents/geo_optimizer_agent.py) - [backend/app/services/llm/factory.py](file://backend/app/services/llm/factory.py) - [backend/app/models/agent.py](file://backend/app/models/agent.py) - [backend/pipelines/content_production.yaml](file://backend/pipelines/content_production.yaml) - [backend/pipelines/diagnosis.yaml](file://backend/pipelines/diagnosis.yaml)

更新摘要

所做更改

  • 新增代理框架架构章节包含Agent基类、注册中心、任务分发器和工作流引擎
  • 新增LLM服务集成章节包含LLM工厂模式和多提供商支持
  • 新增工作器系统扩展章节包含LLM适配器和平台适配器
  • 新增分布式发布系统章节包含Pipeline编排和任务编排
  • 更新项目结构图反映新增的代理框架和LLM服务模块
  • 更新架构总览图展示新增的代理系统和LLM集成

目录

  1. 引言
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 详细组件分析
  6. 代理框架架构
  7. LLM服务集成
  8. 工作器系统扩展
  9. 分布式发布系统
  10. 中间件系统
  11. 管理员服务
  12. 订阅服务
  13. 报告服务
  14. 依赖关系分析
  15. 性能考量
  16. 故障排查指南
  17. 结论
  18. 附录

引言

本文件为 GEO 平台后端系统的架构文档,基于 FastAPI 构建,采用异步 SQLAlchemy ORM、APScheduler 定时任务与多平台适配器模式,实现查询词管理、引用检测与报告统计等功能。文档覆盖应用配置、中间件、路由组织、生命周期管理、数据库连接与 ORM、异步处理、认证与权限控制、API 设计与错误处理、系统监控与日志、性能优化策略,并给出架构决策的技术背景与权衡。

更新 本次更新显著扩展了系统架构新增代理框架架构、LLM服务集成、工作器系统扩展和分布式发布系统等核心组件形成了更加完整的企业级AI内容生产与管理平台。

项目结构

后端采用分层与功能域结合的组织方式:

  • 应用入口与生命周期app/main.py
  • 配置中心app/config.py
  • 数据库与依赖注入app/database.py
  • API 层app/api/ 下按功能模块划分auth、queries、citations、admin、reports、subscriptions、deps
  • 中间件层app/middleware/logging_middleware、rate_limit
  • 模型层app/models/SQLAlchemy ORM 映射)
  • 服务层app/services/business logic encapsulation
  • 代理框架app/agent_framework/Agent基类、注册中心、任务分发器、工作流引擎
  • LLM服务app/services/llm/LLM工厂、提供商适配器
  • 工作器与调度app/workers/APScheduler 调度器、引用检测引擎、平台适配器、LLM适配器
  • 流水线定义pipelines/YAML配置文件
  • 测试tests/pytest
graph TB
subgraph "应用入口"
MAIN["app/main.py"]
end
subgraph "配置与数据库"
CFG["app/config.py"]
DB["app/database.py"]
end
subgraph "中间件层"
LOGMW["middleware/logging_middleware.py"]
RATEMW["middleware/rate_limit.py"]
end
subgraph "API 层"
AUTH["api/auth.py"]
QUERIES["api/queries.py"]
CITATIONS["api/citations.py"]
ADMIN["api/admin.py"]
REPORTS["api/reports.py"]
SUBSCRIPTIONS["api/subscriptions.py"]
DEPS["api/deps.py"]
end
subgraph "代理框架"
AGENT_BASE["agent_framework/base.py"]
REGISTRY["agent_framework/registry.py"]
DISPATCHER["agent_framework/dispatcher.py"]
PIPELINE_ENGINE["agent_framework/pipeline/engine.py"]
PIPELINE_LOADER["agent_framework/pipeline/loader.py"]
PIPELINE_SCHEMA["agent_framework/pipeline/schema.py"]
PROTOCOL["agent_framework/protocol.py"]
END
subgraph "LLM服务"
LLM_FACTORY["services/llm/factory.py"]
OPENAI_PROVIDER["services/llm/openai_provider.py"]
DEEPSEEK_PROVIDER["services/llm/deepseek_provider.py"]
END
subgraph "模型与服务"
MODEL_USER["models/user.py"]
MODEL_QUERY["models/query.py"]
MODEL_CIT["models/citation_record.py"]
MODEL_SUB["models/subscription.py"]
MODEL_TASK["models/query_task.py"]
MODEL_AGENT["models/agent.py"]
SVC_AUTH["services/auth.py"]
SVC_ADMIN["services/admin.py"]
SVC_SUB["services/subscription.py"]
END
subgraph "工作器与调度"
SCHED["workers/scheduler.py"]
ENGINE["workers/citation_engine.py"]
LLM_ADAPTER["workers/llm_adapter.py"]
PLATFORMS["workers/platforms/"]
END
subgraph "流水线定义"
CONTENT_PRODUCTION["pipelines/content_production.yaml"]
DIAGNOSIS["pipelines/diagnosis.yaml"]
END
MAIN --> LOGMW
MAIN --> RATEMW
MAIN --> AUTH
MAIN --> QUERIES
MAIN --> CITATIONS
MAIN --> ADMIN
MAIN --> REPORTS
MAIN --> SUBSCRIPTIONS
AUTH --> SVC_AUTH
ADMIN --> SVC_ADMIN
SUBSCRIPTIONS --> SVC_SUB
AUTH --> DB
QUERIES --> DB
CITATIONS --> DB
ADMIN --> DB
REPORTS --> DB
SUBSCRIPTIONS --> DB
DEPS --> DB
SVC_AUTH --> CFG
SVC_ADMIN --> CFG
SVC_SUB --> CFG
SCHED --> DB
SCHED --> ENGINE
ENGINE --> MODEL_QUERY
ENGINE --> MODEL_CIT
LLM_ADAPTER --> CFG
REGISTRY --> DB
DISPATCHER --> DB
DISPATCHER --> AGENT_BASE
PIPELINE_ENGINE --> DISPATCHER
PIPELINE_LOADER --> PIPELINE_SCHEMA
LLM_FACTORY --> OPENAI_PROVIDER
LLM_FACTORY --> DEEPSEEK_PROVIDER

图表来源

章节来源

核心组件

  • 应用入口与生命周期:通过 lifespan 钩子在启动时初始化模型与调度器,在关闭时优雅停止。
  • 中间件:启用 CORS、限流中间件和日志中间件提供安全防护和性能监控。
  • 路由组织:按模块拆分,统一前缀与标签,便于 API 文档生成与维护。
  • 数据库:异步 SQLAlchemy 引擎与会话工厂,依赖注入式获取会话。
  • 认证与权限OAuth2 密码流 + JWT依赖注入解析当前用户未授权时抛出 401。
  • 引擎与调度APScheduler 定时扫描到期查询,调用 CitationEngine 执行并持久化结果。
  • 新增 代理框架基于Redis的消息队列支持Agent注册、发现、任务分发和工作流编排。
  • 新增 LLM服务统一的LLM提供商工厂支持OpenAI和DeepSeek等多种提供商。
  • 新增 分布式发布基于YAML的流水线编排系统支持复杂的内容生产工作流。

更新 新增代理框架、LLM服务集成和分布式发布系统显著增强了系统的智能化和自动化能力。

章节来源

架构总览

系统采用"API 层-中间件层-服务层-模型层-基础设施"的分层架构,配合异步 I/O 与定时任务实现高并发与可扩展的查询与检测能力。新增的代理框架通过Redis实现分布式任务调度LLM服务提供统一的AI能力接口分布式发布系统支持复杂的工作流编排。

graph TB
CLIENT["客户端/前端"]
FASTAPI["FastAPI 应用<br/>lifespan/CORS"]
MIDDLEWARE["中间件层<br/>限流/日志"]
ROUTER_AUTH["认证路由"]
ROUTER_QUERIES["查询路由"]
ROUTER_CIT["引用路由"]
ROUTER_ADMIN["管理员路由"]
ROUTER_REPORTS["报告路由"]
ROUTER_SUB["订阅路由"]
DEPS["依赖注入<br/>OAuth2/JWT 解析"]
SVC_AUTH["认证服务<br/>密码哈希/JWT"]
SVC_ADMIN["管理员服务<br/>用户管理/统计"]
SVC_SUB["订阅服务<br/>套餐管理/订阅流程"]
DB["异步数据库<br/>Session 工厂"]
SCHED["查询调度器<br/>APScheduler"]
ENGINE["引用检测引擎<br/>平台适配器"]
LLM_ADAPTER["LLM适配器<br/>DeepSeek API"]
AGENT_FRAMEWORK["代理框架<br/>Redis队列/Agent管理"]
REGISTRY["注册中心<br/>Agent注册/发现"]
DISPATCHER["任务分发器<br/>消息队列/状态管理"]
PIPELINE_ENGINE["工作流引擎<br/>YAML编排/DAG执行"]
LLM_SERVICE["LLM服务<br/>工厂模式/多提供商"]
CLIENT --> FASTAPI
FASTAPI --> MIDDLEWARE
MIDDLEWARE --> ROUTER_AUTH
MIDDLEWARE --> ROUTER_QUERIES
MIDDLEWARE --> ROUTER_CIT
MIDDLEWARE --> ROUTER_ADMIN
MIDDLEWARE --> ROUTER_REPORTS
MIDDLEWARE --> ROUTER_SUB
ROUTER_AUTH --> DEPS
ROUTER_QUERIES --> DEPS
ROUTER_CIT --> DEPS
ROUTER_ADMIN --> DEPS
ROUTER_REPORTS --> DEPS
ROUTER_SUB --> DEPS
ROUTER_AUTH --> SVC_AUTH
ROUTER_ADMIN --> SVC_ADMIN
ROUTER_SUB --> SVC_SUB
ROUTER_QUERIES --> DB
ROUTER_CIT --> DB
ROUTER_REPORTS --> DB
DEPS --> DB
SVC_AUTH --> DB
SVC_ADMIN --> DB
SVC_SUB --> DB
SCHED --> DB
SCHED --> ENGINE
ENGINE --> DB
LLM_ADAPTER --> DB
AGENT_FRAMEWORK --> REGISTRY
AGENT_FRAMEWORK --> DISPATCHER
AGENT_FRAMEWORK --> PIPELINE_ENGINE
DISPATCHER --> DB
REGISTRY --> DB
PIPELINE_ENGINE --> DB
LLM_SERVICE --> DB

图表来源

详细组件分析

应用入口与生命周期

  • 使用 lifespan 钩子在启动时导入模型并启动查询调度器;在关闭时优雅停止调度器与引擎资源。
  • 注册 CORS 中间件,允许前端跨域访问。
  • 统一注册认证、查询、引用、报告、订阅、管理员路由,并为"立即执行"路由复用同一前缀。
sequenceDiagram
participant Client as "客户端"
participant App as "FastAPI 应用"
participant Life as "lifespan"
participant Sched as "查询调度器"
Client->>App : 启动请求
App->>Life : 进入 lifespan
Life->>Sched : start()
App-->>Client : 200 OK
Note over App,Sched : 应用运行中
Client->>App : 关闭请求
App->>Life : 退出 lifespan
Life->>Sched : shutdown()
App-->>Client : 200 OK

图表来源

章节来源

配置与数据库

  • 配置项数据库连接、Redis、JWT 秘钥与过期时间、浏览器路径、平台 API Key 等。
  • 数据库:异步引擎、会话工厂、基础模型类;提供依赖注入函数以获取会话。
  • 会话行为:非自动提交/刷新/回滚,显式管理事务边界。
flowchart TD
Start(["应用启动"]) --> LoadCfg["加载配置"]
LoadCfg --> InitEngine["创建异步引擎"]
InitEngine --> InitSession["创建会话工厂"]
InitSession --> RegisterDep["注册 get_db 依赖"]
RegisterDep --> Ready(["就绪"])

图表来源

章节来源

认证系统

  • 登录:校验邮箱与密码,成功则签发 JWT。
  • 注册:检查邮箱唯一性,哈希密码后创建用户。
  • 当前用户:通过 OAuth2 密码流获取令牌,解码 JWT 提取用户 ID查询数据库返回当前用户。
  • 错误处理:未通过凭据验证时返回 401。
sequenceDiagram
participant Client as "客户端"
participant Auth as "认证路由"
participant Svc as "认证服务"
participant DB as "数据库"
Client->>Auth : POST /api/v1/auth/login
Auth->>Svc : authenticate_user(email, password)
Svc->>DB : 查询用户
DB-->>Svc : 用户对象
Svc-->>Auth : 用户或空
Auth-->>Client : {access_token, user} 或 401
Client->>Auth : GET /api/v1/auth/me (携带 Bearer Token)
Auth->>Svc : verify_token(token)
Svc-->>Auth : 载荷
Auth->>DB : 查询用户 by id
DB-->>Auth : 用户对象
Auth-->>Client : 用户信息

图表来源

章节来源

查询与引用 API

  • 查询 API支持分页、创建、读取、更新、删除均需当前用户权限。
  • 引用 API支持分页查询、统计、立即执行查询任务返回任务状态
  • 权限控制:所有路由依赖 get_current_user未通过验证返回 401。
sequenceDiagram
participant Client as "客户端"
participant Q as "查询路由"
participant D as "依赖注入"
participant S as "服务层"
participant DB as "数据库"
Client->>Q : POST /api/v1/queries/
Q->>D : get_current_user()
D-->>Q : 当前用户
Q->>S : create_query(...)
S->>DB : 写入
DB-->>S : 成功
S-->>Q : 查询对象
Q-->>Client : 201 + 查询对象

图表来源

章节来源

引擎与调度

  • 调度器:每小时扫描 queries 表中 status='active' 且 next_query_at <= now() 的记录,逐条执行。
  • 引擎:对每个平台执行查询,进行品牌匹配与竞争品牌检测,写入 citation_records并更新查询时间字段。
  • 平台适配器:抽象不同平台的查询接口,统一返回原始响应供匹配器处理。
flowchart TD
Tick["定时触发(每小时)"] --> Scan["扫描到期查询"]
Scan --> Found{"找到待执行查询?"}
Found -- 否 --> Wait["等待下一轮"]
Found -- 是 --> Exec["遍历查询平台执行"]
Exec --> Record["写入引用记录"]
Record --> Update["更新查询时间字段"]
Update --> Done["完成"]
Wait --> Tick

图表来源

章节来源

数据模型与关系

  • 用户:主键 UUID、邮箱唯一、密码哈希、计划与配额、活跃状态、活跃状态、时间戳。
  • 查询:外键用户、关键词、目标品牌、别名、平台集合、频率、状态与时间字段。
  • 引用记录:外键查询、平台、是否引用、位置、文本、竞争品牌、原始响应、时间戳。
  • 订阅:外键用户、套餐类型、状态、开始结束日期、金额、支付方式、时间戳。
  • 查询任务:外键查询、平台、状态、错误信息、调度时间、开始完成时间。
  • 新增 Agent注册表Agent元数据、状态、能力声明、心跳时间。
  • 新增 Agent任务表任务执行状态、输入输出数据、执行指标。
  • 新增 Agent任务日志表任务执行日志、进度跟踪、错误信息。
erDiagram
USERS {
uuid id PK
string email UK
string password_hash
string name
string plan
int max_queries
bool is_active
bool is_admin
timestamp created_at
timestamp updated_at
}
QUERIES {
uuid id PK
uuid user_id FK
string keyword
string target_brand
jsonb brand_aliases
jsonb platforms
string frequency
string status
timestamp last_queried_at
timestamp next_query_at
timestamp created_at
timestamp updated_at
}
CITATION_RECORDS {
uuid id PK
uuid query_id FK
string platform
bool cited
int citation_position
text citation_text
jsonb competitor_brands
text raw_response
timestamp queried_at
}
SUBSCRIPTIONS {
uuid id PK
uuid user_id FK
string plan
string status
date start_date
date end_date
numeric amount
string payment_method
timestamp created_at
}
QUERY_TASKS {
uuid id PK
uuid query_id FK
string platform
string status
text error_message
timestamp scheduled_at
timestamp started_at
timestamp completed_at
timestamp created_at
timestamp updated_at
}
AGENT_REGISTRY {
uuid id PK
string name UK
string display_name
string agent_type
string description
string version
string endpoint
string status
jsonb capabilities
timestamp last_heartbeat
timestamp created_at
timestamp updated_at
}
AGENT_TASKS {
uuid id PK
uuid agent_id FK
string task_type
string status
int priority
jsonb input_data
jsonb output_data
text error_message
uuid created_by
uuid organization_id FK
uuid project_id FK
timestamp scheduled_at
timestamp started_at
timestamp completed_at
timestamp created_at
}
AGENT_TASK_LOGS {
uuid id PK
uuid task_id FK
uuid agent_id FK
string log_level
text message
jsonb extra_metadata
timestamp created_at
}
USERS ||--o{ QUERIES : "拥有"
QUERIES ||--o{ CITATION_RECORDS : "产生"
USERS ||--o{ SUBSCRIPTIONS : "订阅"
QUERIES ||--o{ QUERY_TASKS : "包含"
AGENT_REGISTRY ||--o{ AGENT_TASKS : "执行"
AGENT_TASKS ||--o{ AGENT_TASK_LOGS : "记录"

图表来源

章节来源

代理框架架构

Agent基类与生命周期管理

代理框架的核心是BaseAgent基类它定义了所有Agent的标准生命周期和行为

  • 启动流程初始化Redis连接、注册到注册中心、启动心跳、开始监听任务队列
  • 任务执行异步监听Redis队列执行具体任务逻辑上报进度和结果
  • 状态管理维护Agent在线状态、忙碌状态支持优雅停机
  • 心跳机制:定期向注册中心上报心跳,保持活跃状态
sequenceDiagram
participant Agent as "Agent实例"
participant Redis as "Redis服务器"
participant Registry as "注册中心"
participant Dispatcher as "任务分发器"
Agent->>Agent : start()
Agent->>Redis : 连接Redis
Agent->>Registry : register(capabilities)
Registry-->>Agent : 注册成功
Agent->>Agent : 启动心跳循环
Agent->>Agent : 启动任务监听
loop 每30秒
Agent->>Registry : update_heartbeat()
end
Redis-->>Agent : 任务消息
Agent->>Agent : execute(task)
Agent->>Dispatcher : handle_result(result)
Agent->>Agent : report_progress(progress)

图表来源

注册中心与Agent发现

注册中心负责管理所有Agent的生命周期和状态

  • 注册流程Agent启动时向注册中心注册保存能力声明和端点信息
  • 状态维护实时更新Agent心跳时间超时自动标记为离线
  • 发现机制根据任务类型动态查找可用的Agent实例
  • 健康检查定期扫描超时的Agent并更新状态
flowchart TD
Start["Agent启动"] --> Connect["连接Redis"]
Connect --> Register["向注册中心注册"]
Register --> SaveInfo["保存Agent信息<br/>能力声明/端点/状态"]
SaveInfo --> Heartbeat["启动心跳循环"]
Heartbeat --> Monitor["监控Agent状态"]
Monitor --> Timeout{"心跳超时?"}
Timeout -- 是 --> MarkOffline["标记为离线"]
Timeout -- 否 --> Monitor
Monitor --> Discover["Agent发现"]
Discover --> Match{"任务类型匹配?"}
Match -- 是 --> Dispatch["分发任务"]
Match -- 否 --> Wait["等待其他Agent"]

图表来源

任务分发器与消息队列

任务分发器通过Redis实现Agent间的异步通信

  • 任务分发将TaskMessage推送到指定Agent的队列
  • 状态管理维护AgentTask表跟踪任务执行状态
  • 结果处理接收Agent返回的TaskResult更新数据库状态
  • 进度上报处理TaskProgress消息记录执行进度
sequenceDiagram
participant API as "API服务"
participant Dispatcher as "任务分发器"
participant Redis as "Redis队列"
participant Agent as "目标Agent"
participant DB as "数据库"
API->>Dispatcher : dispatch(task)
Dispatcher->>DB : 写入AgentTask记录
DB-->>Dispatcher : 确认写入
Dispatcher->>Redis : LPUSH agent : {name} : tasks
Redis-->>Agent : 任务消息
Agent->>Agent : execute(task)
Agent->>Dispatcher : handle_result(result)
Dispatcher->>DB : 更新任务状态
DB-->>API : 任务完成

图表来源

章节来源

LLM服务集成

LLM工厂模式与多提供商支持

LLM服务采用工厂模式统一管理不同的AI提供商

  • OpenAI提供商支持GPT系列模型提供标准的ChatCompletion接口
  • DeepSeek提供商支持DeepSeek系列模型提供高性能的推理能力
  • 统一接口所有提供商实现相同的LLMProvider接口支持透明切换
  • 配置管理通过环境变量和配置文件管理API密钥和模型参数
flowchart TD
Factory["LLM工厂"] --> OpenAI["OpenAI提供商"]
Factory --> DeepSeek["DeepSeek提供商"]
OpenAI --> GPT4["GPT-4模型"]
OpenAI --> GPT35["GPT-3.5模型"]
DeepSeek --> DeepSeekChat["DeepSeek-chat模型"]
DeepSeek --> DeepSeekCoder["DeepSeek-coder模型"]
Factory --> Config["配置管理"]
Config --> Env["环境变量"]
Config --> Settings["应用配置"]
Env --> Factory
Settings --> Factory

图表来源

LLM适配器与品牌引用检测

LLM适配器专门用于品牌引用检测任务集成DeepSeek API

  • 提示词工程:精心设计的提示词模板,确保准确的品牌识别
  • JSON输出解析标准化的JSON格式输出包含引用状态、置信度等信息
  • 错误处理:完善的异常处理和重试机制,确保服务稳定性
  • 模拟模式在禁用LLM时提供模拟结果保证系统可用性
sequenceDiagram
participant Engine as "引用检测引擎"
participant Adapter as "LLM适配器"
participant DeepSeek as "DeepSeek API"
Engine->>Adapter : query_brand_citation(keyword, brand, aliases)
Adapter->>Adapter : 构建提示词
Adapter->>DeepSeek : chat.completions.create
DeepSeek-->>Adapter : JSON响应
Adapter->>Adapter : 解析JSON输出
Adapter-->>Engine : CitationResult

图表来源

章节来源

工作器系统扩展

平台适配器与多平台支持

工作器系统扩展了原有的平台适配器支持更多AI平台

  • 现有平台Doubao、Kimi、Qingyan、Tiangong、Tongyi、Wenxin、Xinghuo
  • 搜索引擎:通用搜索平台适配器
  • 统一接口所有平台实现相同的BasePlatform接口
  • 配置管理通过配置文件管理平台API密钥和参数
graph TB
PlatformBase["平台基类<br/>BasePlatform"]
Doubao["Doubao平台<br/>doubao.py"]
Kimi["Kimi平台<br/>kimi.py"]
Qingyan["Qingyan平台<br/>qingyan.py"]
SearchEngine["搜索引擎<br/>search_engine.py"]
Tiangong["Tiangong平台<br/>tiangong.py"]
Tongyi["Tongyi平台<br/>tongyi.py"]
Wenxin["Wenxin平台<br/>wenxin.py"]
Xinghuo["Xinghuo平台<br/>xinghuo.py"]
PlatformBase --> Doubao
PlatformBase --> Kimi
PlatformBase --> Qingyan
PlatformBase --> SearchEngine
PlatformBase --> Tiangong
PlatformBase --> Tongyi
PlatformBase --> Wenxin
PlatformBase --> Xinghuo

图表来源

内容生成Agent与GEO优化Agent

新增的专业Agent实现了特定的AI内容生成功能

  • 内容生成Agent支持选题生成和文章生成集成RAG知识库检索
  • GEO优化Agent专门优化内容在AI搜索引擎中的可见性
  • 进度上报:实时上报任务执行进度,支持用户监控
  • 错误处理:完善的异常处理和重试机制
flowchart TD
ContentAgent["内容生成Agent"] --> Topics["选题生成"]
ContentAgent --> Article["文章生成"]
Topics --> RAG["RAG知识库检索"]
Article --> LLM["LLM调用"]
GEOAgent["GEO优化Agent"] --> Optimize["内容优化"]
Optimize --> LLM2["LLM调用"]
RAG --> LLM

图表来源

章节来源

分布式发布系统

Pipeline工作流编排

分布式发布系统基于YAML配置实现复杂的工作流编排

  • 内容生产流水线:从选题到发布的完整内容生产流程
  • 诊断分析流水线:引用检测与竞争分析的诊断流程
  • DAG执行:支持有向无环图的任务依赖关系
  • 变量解析:支持复杂的变量引用和上下文传递
graph TB
ContentProduction["内容生产流水线"] --> TopicSelection["选题选择"]
ContentProduction --> ContentGeneration["内容生成"]
ContentProduction --> DeAIProcessing["去AI化处理"]
ContentProduction --> GEOOptimization["GEO优化"]
ContentProduction --> RuleValidation["规则验证"]
TopicSelection --> ContentGeneration
ContentGeneration --> DeAIProcessing
DeAIProcessing --> GEOOptimization
GEOOptimization --> RuleValidation
Diagnosis["诊断分析流水线"] --> CitationDetection["引用检测"]
Diagnosis --> CompetitorAnalysis["竞争分析"]
CitationDetection --> CompetitorAnalysis

图表来源

工作流引擎与任务编排

工作流引擎负责执行复杂的任务编排逻辑:

  • 拓扑排序使用Kahn算法进行DAG拓扑排序
  • 条件执行:支持基于条件表达式的任务执行
  • 重试机制:支持任务级别的重试和超时控制
  • 进度跟踪:实时跟踪每个阶段的执行进度
sequenceDiagram
participant User as "用户"
participant Engine as "工作流引擎"
participant Dispatcher as "任务分发器"
participant Agent as "Agent实例"
User->>Engine : execute(pipeline, context)
Engine->>Engine : 拓扑排序
Engine->>Engine : 变量解析
Engine->>Dispatcher : 分发阶段任务
Dispatcher->>Agent : 任务消息
Agent->>Agent : 执行任务
Agent->>Dispatcher : 任务结果
Dispatcher->>Engine : 更新状态
Engine->>Engine : 下一阶段执行
Engine-->>User : 完整执行结果

图表来源

章节来源

中间件系统

限流中间件

限流中间件提供基于内存的请求限制功能,支持多种限流规则:

  • 认证接口限流针对登录、注册、忘记密码接口每分钟最多5次请求
  • 查询执行限流:针对/run-now接口每小时最多10次请求
  • 全局限流全局请求频率限制每分钟最多100次请求
  • 健康检查豁免/health、/docs、/openapi 路径不受限流限制
flowchart TD
Req["请求到达"] --> Health{"健康检查?"}
Health -- 是 --> Pass["直接通过"]
Health -- 否 --> AuthCheck{"认证接口?"}
AuthCheck -- 是 --> AuthKey["auth:{client_ip}"]
AuthCheck -- 否 --> RunCheck{"查询执行?"}
RunCheck -- 是 --> RunKey["query_run:{client_ip}"]
RunCheck -- 否 --> GlobalKey["global:{client_ip}"]
AuthKey --> CheckAuth["检查认证限流"]
RunKey --> CheckRun["检查查询限流"]
GlobalKey --> CheckGlobal["检查全局限流"]
CheckAuth --> AuthLimited{"超过限制?"}
CheckRun --> RunLimited{"超过限制?"}
CheckGlobal --> GlobalLimited{"超过限制?"}
AuthLimited -- 是 --> Block["返回429"]
RunLimited -- 是 --> Block
GlobalLimited -- 是 --> Block
AuthLimited -- 否 --> Pass
RunLimited -- 否 --> Pass
GlobalLimited -- 否 --> Pass
Pass --> Next["继续处理"]
Block --> End["结束"]
Next --> End

图表来源

日志中间件

日志中间件提供统一的请求日志记录功能:

  • 访问日志记录请求方法、URL、状态码、耗时、客户端IP
  • 性能监控:精确到毫秒的响应时间统计
  • 结构化日志:使用 geo.access 日志器,便于日志聚合和分析

章节来源

管理员服务

系统管理功能

管理员服务提供完整的系统管理能力:

  • 系统统计:统计总用户数、查询数、引用数、引用率、当日活跃用户
  • 用户管理:用户列表查询、详情查看、启用/禁用切换、套餐更新
  • 权限控制管理员专用路由非管理员访问返回403
sequenceDiagram
participant Admin as "管理员客户端"
participant AdminAPI as "管理员API"
participant AdminSvc as "管理员服务"
participant DB as "数据库"
Admin->>AdminAPI : GET /api/v1/admin/stats
AdminAPI->>AdminSvc : get_system_stats()
AdminSvc->>DB : 统计查询
DB-->>AdminSvc : 统计数据
AdminSvc-->>AdminAPI : {total_users, total_queries, ...}
AdminAPI-->>Admin : 系统统计信息
Admin->>AdminAPI : POST /api/v1/admin/users/{user_id}/toggle-active
AdminAPI->>AdminSvc : toggle_user_active(user_id)
AdminSvc->>DB : 更新用户状态
DB-->>AdminSvc : 更新结果
AdminSvc-->>AdminAPI : {id, is_active, message}
AdminAPI-->>Admin : 操作结果

图表来源

章节来源

订阅服务

套餐管理功能

订阅服务提供完整的套餐管理能力:

  • 套餐定义:免费版、入门版、专业版、企业版四个套餐
  • 功能特性基于套餐的差异化功能支持CSV导出、PDF报告、定时查询等
  • 订阅流程:创建订阅、取消订阅、历史记录查询
  • 用户配额:根据套餐更新用户最大查询次数
flowchart TD
Plan["套餐定义"] --> Features["功能特性映射"]
Features --> Free["免费版<br/>max_queries=5"]
Features --> Starter["入门版<br/>max_queries=20"]
Features --> Pro["专业版<br/>max_queries=100"]
Features --> Enterprise["企业版<br/>max_queries=500"]
User["用户"] --> Subscribe["订阅流程"]
Subscribe --> Create["创建订阅"]
Create --> UpdateUser["更新用户套餐"]
UpdateUser --> Active["激活状态"]
Active --> Cancel["取消订阅"]
Cancel --> Downgrade["降级到免费版"]

图表来源

章节来源

报告服务

导出功能

报告服务提供多种格式的数据导出能力:

  • CSV导出支持查询结果的CSV格式导出Stream响应
  • PDF报告支持生成PDF格式的详细报告
  • 权限控制:仅限当前用户访问自己的数据
  • 错误处理数据不存在时返回404错误
sequenceDiagram
participant Client as "客户端"
participant Reports as "报告API"
participant Service as "导出服务"
participant DB as "数据库"
Client->>Reports : GET /export/csv?query_id=...
Reports->>Service : export_citations_csv(query_id)
Service->>DB : 查询引用数据
DB-->>Service : 引用数据
Service-->>Reports : CSV内容
Reports-->>Client : StreamResponse CSV
Client->>Reports : GET /export/pdf?query_id=...
Reports->>Service : export_citations_pdf(query_id)
Service-->>Reports : PDF字节流
Reports-->>Client : Response PDF

图表来源

章节来源

依赖关系分析

  • 组件内聚API 路由与服务层职责清晰,模型仅负责映射。
  • 组件耦合API 依赖服务,服务依赖数据库与配置;调度器依赖引擎与数据库;引擎依赖平台适配器。
  • 新增 代理框架Agent依赖Redis和注册中心任务分发器依赖数据库和Redis。
  • 新增 LLM服务Agent依赖LLM工厂LLM工厂依赖具体的提供商实现。
  • 新增 工作流引擎:依赖任务分发器和管道加载器,支持复杂的任务编排。
  • 依赖注入:通过 FastAPI 依赖系统注入数据库会话与当前用户。
  • 循环依赖:未见明显循环依赖。
graph LR
MIDDLEWARE["middleware/*"] --> API_AUTH["api/auth.py"]
MIDDLEWARE --> API_ADMIN["api/admin.py"]
MIDDLEWARE --> API_SUB["api/subscriptions.py"]
MIDDLEWARE --> API_REPORTS["api/reports.py"]
API_AUTH --> SVC_AUTH["services/auth.py"]
API_ADMIN --> SVC_ADMIN["services/admin.py"]
API_SUB --> SVC_SUB["services/subscription.py"]
API_AUTH --> DEPS["api/deps.py"]
API_ADMIN --> DEPS
API_SUB --> DEPS
API_REPORTS --> DEPS
SVC_AUTH --> DB["database.py"]
SVC_ADMIN --> DB
SVC_SUB --> DB
DEPS --> DB
SCHED["workers/scheduler.py"] --> DB
SCHED --> ENGINE["workers/citation_engine.py"]
ENGINE --> MODELS["models/*.py"]
LLM_ADAPTER["workers/llm_adapter.py"] --> LLM_FACTORY["services/llm/factory.py"]
LLM_FACTORY --> PROVIDERS["services/llm/*"]
AGENT_FRAMEWORK["agent_framework/*"] --> REGISTRY["agent_framework/registry.py"]
AGENT_FRAMEWORK --> DISPATCHER["agent_framework/dispatcher.py"]
DISPATCHER --> MODELS["models/agent.py"]
PIPELINE_ENGINE["agent_framework/pipeline/engine.py"] --> DISPATCHER
PIPELINE_LOADER["agent_framework/pipeline/loader.py"] --> PIPELINE_SCHEMA["agent_framework/pipeline/schema.py"]

图表来源

章节来源

性能考量

  • 异步 I/O数据库与平台查询均采用异步提升并发吞吐。
  • 会话管理:显式事务边界,避免长事务占用连接池。
  • 定时任务APScheduler 异步调度,事件循环兼容处理,降低阻塞风险。
  • 索引优化:查询与引用表建立复合索引,加速过滤与排序。
  • 新增 Redis缓存代理框架使用Redis作为消息队列支持高并发任务分发。
  • 新增 LLM优化LLM调用采用异步模式支持批量处理和错误重试。
  • 新增 工作流优化DAG拓扑排序确保任务执行顺序避免死锁和循环依赖。
  • 缓存建议:可引入 Redis 缓存热点查询结果与用户会话信息(当前配置已准备)。
  • 日志采样:生产环境建议开启采样与结构化日志,避免高频日志影响性能。
  • 中间件性能:限流中间件使用内存存储,性能开销低;日志中间件仅记录必要信息。

更新 新增代理框架、LLM服务和工作流系统的性能考量包括Redis缓存、异步LLM调用和DAG执行优化。

故障排查指南

  • 认证失败:检查 JWT 秘钥、过期时间与前端令牌传递;确认 OAuth2 tokenUrl 与 Bearer 头正确。
  • 数据库连接:核对 DATABASE_URL确认容器网络可达查看连接池与超时配置。
  • 定时任务异常:关注调度器日志,检查查询状态与平台适配器可用性;确认 next_query_at 计算逻辑。
  • 引擎执行失败:查看平台适配器错误与原始响应;检查品牌匹配器与竞争品牌检测逻辑。
  • CORS 问题:确认前端域名与请求头是否在允许范围内。
  • 新增 代理框架问题检查Redis连接、Agent注册状态、任务队列是否正常。
  • 新增 LLM服务问题检查API密钥配置、提供商可用性、请求超时设置。
  • 新增 工作流执行问题检查YAML配置语法、依赖关系、变量引用是否正确。
  • 中间件问题检查限流规则配置确认健康检查路径是否被正确豁免验证日志中间件的logger配置。
  • 管理员权限:确认用户 is_admin 字段,检查管理员路由的权限验证逻辑。
  • 订阅状态:检查用户套餐与订阅状态的一致性,验证订阅历史记录的查询逻辑。

更新 新增代理框架、LLM服务和分布式发布系统的故障排查指导。

章节来源

结论

该架构以 FastAPI 为核心结合异步数据库、定时任务与多平台适配器形成高可用、可扩展的查询与引用检测系统。通过明确的分层与依赖注入系统具备良好的可测试性与可维护性。新增的代理框架、LLM服务集成、工作器系统扩展和分布式发布系统显著增强了系统的智能化、自动化和企业级服务能力。系统现已支持复杂的AI内容生产工作流、多提供商的LLM集成、分布式任务调度和实时进度监控为GEO平台的商业化运营奠定了坚实的技术基础。

更新 本次更新大幅扩展了系统功能新增代理框架、LLM服务集成、工作器系统扩展和分布式发布系统形成了更加完整的企业级AI内容生产与管理平台。

附录

  • API 设计原则:统一前缀与标签、明确响应模型、一致的状态码与错误消息。
  • 错误处理:在路由层捕获业务异常并转换为标准 HTTP 状态码;在依赖层统一 401 未授权。
  • 响应格式:遵循 Pydantic 模型序列化,确保前后端契约一致。
  • 架构决策背景:选择异步栈以提升 I/O 密集场景性能APScheduler 简化定时任务编排JWT 适合无状态认证场景;中间件系统提供安全防护和性能监控。
  • 新增功能背景代理框架满足分布式任务调度需求LLM服务满足AI能力集成需求工作器系统扩展满足多平台适配需求分布式发布系统满足复杂工作流编排需求。