geo/.qoder/repowiki/zh/content/后端系统架构/引用检测引擎.md

36 KiB
Raw Permalink Blame History

引用检测引擎

**本文档引用的文件** - [backend/app/workers/citation_engine.py](file://backend/app/workers/citation_engine.py) - [backend/app/workers/platforms/base.py](file://backend/app/workers/platforms/base.py) - [backend/app/workers/platforms/kimi.py](file://backend/app/workers/platforms/kimi.py) - [backend/app/workers/platforms/wenxin.py](file://backend/app/workers/platforms/wenxin.py) - [backend/app/workers/platforms/tongyi.py](file://backend/app/workers/platforms/tongyi.py) - [backend/app/workers/platforms/doubao.py](file://backend/app/workers/platforms/doubao.py) - [backend/app/workers/platforms/qingyan.py](file://backend/app/workers/platforms/qingyan.py) - [backend/app/workers/platforms/tiangong.py](file://backend/app/workers/platforms/tiangong.py) - [backend/app/workers/platforms/xinghuo.py](file://backend/app/workers/platforms/xinghuo.py) - [backend/app/workers/platforms/search_engine.py](file://backend/app/workers/platforms/search_engine.py) - [backend/app/workers/scheduler.py](file://backend/app/workers/scheduler.py) - [backend/app/services/citation.py](file://backend/app/services/citation.py) - [backend/app/api/citations.py](file://backend/app/api/citations.py) - [backend/app/models/citation_record.py](file://backend/app/models/citation_record.py) - [backend/app/models/query.py](file://backend/app/models/query.py) - [backend/app/models/query_task.py](file://backend/app/models/query_task.py) - [backend/app/config.py](file://backend/app/config.py) - [backend/app/main.py](file://backend/app/main.py) - [tests/test_citation_engine.py](file://tests/test_citation_engine.py) - [backend/requirements.txt](file://backend/requirements.txt)

更新摘要

变更内容

  • 新增7个搜索引擎适配器通义千问、豆包、智谱清言、天工、讯飞星火
  • 改进错误处理机制,实现指数退避重试策略
  • 新增字符清理机制防止PostgreSQL插入失败
  • 新增异步任务执行功能,支持立即查询执行
  • 优化搜索引擎模式,统一平台查询策略

目录

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 详细组件分析
  6. 依赖分析
  7. 性能考虑
  8. 故障排查指南
  9. 结论
  10. 附录

简介

本技术文档面向"GEO引用检测引擎"系统性阐述引用检测算法的实现原理与工程实践涵盖文本预处理、品牌识别与上下文分析详解BrandMatcher类的设计与实现精确匹配、别名匹配、模糊匹配策略与正则规则解析CompetitorDetector的竞争品牌识别机制竞争关系定义、相似度计算与过滤规则说明置信度评分体系评分算法、阈值与结果排序梳理异步处理与并发控制策略并提供错误处理、日志记录与性能监控建议以及与AI平台的集成接口与数据流转过程。

更新 新增搜索引擎适配器支持,改进错误处理机制,新增异步任务执行功能。

项目结构

后端采用FastAPI + SQLAlchemy异步ORM + APScheduler定时任务的分层架构

  • API层提供引用数据查询、统计与立即执行接口
  • 服务层:封装数据访问与聚合统计逻辑
  • 工作器层:引用检测引擎、平台适配器、调度器
  • 模型层:查询、任务、引用记录等持久化模型
  • 配置与入口应用生命周期管理、CORS、路由注册
graph TB
subgraph "API层"
API_C["API: 引用数据<br/>citations.py"]
API_Q["API: 查询管理<br/>queries.py"]
end
subgraph "服务层"
SVC["服务: 引用数据<br/>services/citation.py"]
SVC_Q["服务: 查询管理<br/>services/query.py"]
end
subgraph "工作器层"
ENG["引擎: 引用检测<br/>workers/citation_engine.py"]
SCH["调度器<br/>workers/scheduler.py"]
ADP_BASE["适配器基类<br/>workers/platforms/base.py"]
ADP_KIMI["适配器: Kimi<br/>workers/platforms/kimi.py"]
ADP_WENXIN["适配器: 文心一言<br/>workers/platforms/wenxin.py"]
ADP_TONGYI["适配器: 通义千问<br/>workers/platforms/tongyi.py"]
ADP_DOUBAO["适配器: 豆包<br/>workers/platforms/doubao.py"]
ADP_QINGYAN["适配器: 智谱清言<br/>workers/platforms/qingyan.py"]
ADP_TIANGONG["适配器: 天工<br/>workers/platforms/tiangong.py"]
ADP_XINGHUO["适配器: 讯飞星火<br/>workers/platforms/xinghuo.py"]
SEARCH_ENGINE["搜索引擎模块<br/>workers/platforms/search_engine.py"]
end
subgraph "模型层"
M_QUERY["模型: 查询<br/>models/query.py"]
M_TASK["模型: 任务<br/>models/query_task.py"]
M_CIT["模型: 引用记录<br/>models/citation_record.py"]
end
subgraph "配置与入口"
CFG["配置<br/>config.py"]
MAIN["入口<br/>main.py"]
end
API_C --> SVC
API_Q --> SVC_Q
SVC --> ENG
SVC_Q --> ENG
ENG --> ADP_KIMI
ENG --> ADP_WENXIN
ENG --> ADP_TONGYI
ENG --> ADP_DOUBAO
ENG --> ADP_QINGYAN
ENG --> ADP_TIANGONG
ENG --> ADP_XINGHUO
ENG --> SEARCH_ENGINE
SCH --> ENG
ENG --> M_CIT
SVC --> M_CIT
API_C --> M_CIT
ENG --> M_QUERY
ENG --> M_TASK
MAIN --> SCH
MAIN --> API_C
MAIN --> API_Q
CFG --> MAIN

图表来源

章节来源

核心组件

  • 引用检测引擎:负责跨平台查询、品牌匹配、竞争品牌识别与结果落库
  • 品牌匹配器BrandMatcher精确匹配、别名匹配、模糊匹配三阶段策略
  • 竞争品牌检测器CompetitorDetector基于预定义品牌类别集合进行竞争关系识别
  • 平台适配器Kimi、文心一言、通义千问、豆包、智谱清言、天工、讯飞星火适配器封装Playwright自动化查询流程
  • 搜索引擎模块提供DuckDuckGo和Wikipedia搜索功能作为备用查询源
  • 定时调度器基于APScheduler的周期性任务调度
  • 服务与API提供查询历史、统计与立即执行能力

更新 新增7个搜索引擎适配器改进错误处理机制新增异步任务执行功能。

章节来源

架构总览

引擎通过API触发或定时调度器自动触发调用平台适配器获取AI回复随后由BrandMatcher进行品牌识别CompetitorDetector识别竞争品牌最终将结果写入数据库并返回。

sequenceDiagram
participant Client as "客户端"
participant API as "API : 引用数据"
participant Svc as "服务 : 引用数据"
participant Eng as "引擎 : 引用检测"
participant Plat as "平台适配器"
participant Search as "搜索引擎模块"
participant DB as "数据库"
Client->>API : GET /api/v1/citations/stats
API->>Svc : 统计查询
Svc->>DB : 查询引用记录
DB-->>Svc : 结果集
Svc-->>API : 统计数据
API-->>Client : 响应
Client->>API : POST /api/v1/queries/{id}/run-now
API->>Svc : 触发立即查询
Svc-->>API : 任务ID
API-->>Client : 202 Accepted
Note over Eng,DB : 定时调度器每小时检查到期查询
Eng->>Plat : query(keyword)
Note over Plat,Search : 搜索引擎模式通过fetch_search_content获取真实内容
Plat->>Search : fetch_search_content(keyword)
Search-->>Plat : 搜索结果摘要
Plat-->>Eng : 原始回复文本
Eng->>Eng : BrandMatcher.match(text)
Eng->>Eng : CompetitorDetector.detect(text, target)
Eng->>DB : 写入CitationRecord
DB-->>Eng : 确认

图表来源

详细组件分析

BrandMatcher品牌匹配器

  • 设计要点
    • 三阶段匹配策略:精确匹配 > 别名匹配 > 模糊匹配
    • 上下文提取:定位首次出现段落位置与截取片段
    • 置信度评分:精确=1.0,别名=0.9,模糊=基于相似度四舍五入
  • 实现细节
    • 精确匹配:直接包含判断
    • 别名匹配:遍历别名列表
    • 模糊匹配对候选词按非文字字符拆分且长度≥2使用序列相似度比较阈值>0.4
    • 候选词提取:支持中文与英文混合场景
    • 上下文截取按换行切分段落取首个命中段落前200字符
  • 复杂度分析
    • 候选词提取O(n)n为文本长度
    • 模糊匹配O(m*k)m为候选词数量k为别名/目标词数量
    • 总体近似线性,受文本长度与候选词规模影响
flowchart TD
Start(["进入 match(text)"]) --> Empty{"text为空"}
Empty --> |是| RetEmpty["返回未命中"]
Empty --> |否| Exact["精确匹配 target_brand"]
Exact --> |命中| PosCtx["提取段落位置与上下文"] --> RetExact["返回精确匹配结果"]
Exact --> |未命中| Alias["遍历别名进行匹配"]
Alias --> |命中| PosCtx2["提取段落位置与上下文"] --> RetAlias["返回别名匹配结果"]
Alias --> |未命中| Fuzzy["提取候选词并计算相似度"]
Fuzzy --> Best{"最佳相似度>0.4"}
Best --> |是| PosCtx3["提取段落位置与上下文"] --> RetFuzzy["返回模糊匹配结果"]
Best --> |否| RetEmpty

图表来源

章节来源

CompetitorDetector竞争品牌检测器

  • 设计要点
    • 基于预定义品牌类别集合(如保险、金融、科技)
    • 排除目标品牌,返回去重后的竞争品牌列表
  • 实现细节
    • 遍历类别与品牌,进行包含判断
    • 使用集合去重,最终排序输出
  • 复杂度分析
    • O(C*B)C为类别数B为类别内品牌数
flowchart TD
Start2(["进入 detect(text, target)"]) --> Empty2{"text为空"}
Empty2 --> |是| RetEmpty2["返回空列表"]
Empty2 --> |否| LoopCat["遍历已知品牌类别"]
LoopCat --> LoopBrand["遍历类别内品牌"]
LoopBrand --> Filter{"品牌==target"}
Filter --> |是| NextBrand["跳过"]
Filter --> |否| Contain{"品牌出现在text中"}
Contain --> |是| Add["加入集合"]
Contain --> |否| NextBrand
NextBrand --> LoopBrand
LoopBrand --> Done["排序并返回列表"]

图表来源

章节来源

CitationEngine引用检测引擎

  • 职责
    • 组织跨平台查询与检测流程
    • 维护查询任务状态与时间字段
    • 将结果持久化为引用记录
    • 实施字符清理机制,防止数据库插入失败
  • 关键流程
    • 初始化平台适配器与匹配器
    • 遍历平台列表,执行单平台查询与检测
    • 写入CitationRecord更新Query时间字段
    • 异常处理:记录失败状态与错误信息
  • 并发与异步
    • 引擎方法为异步,平台适配器亦为异步
    • 平台间顺序执行(当前实现),可扩展为并行执行以提升吞吐
  • 字符清理
    • 新增_sanitize_raw_response函数,移除无效控制字符
    • 防止PostgreSQL UTF-8插入失败
sequenceDiagram
participant Q as "Query"
participant E as "CitationEngine"
participant P as "平台适配器"
participant M as "BrandMatcher"
participant C as "CompetitorDetector"
participant R as "CitationRecord"
E->>E : 创建BrandMatcher(target, aliases)
loop 遍历平台
E->>P : query(keyword)
P-->>E : 原始回复文本
E->>E : _sanitize_raw_response(raw_response)
E->>M : match(sanitized_text)
M-->>E : 匹配结果
E->>C : detect(sanitized_text, target)
C-->>E : 竞争品牌
E->>R : 写入记录
end
E->>Q : 更新last_queried_at/next_query_at

图表来源

章节来源

平台适配器Kimi、文心一言与新增搜索引擎适配器

  • 抽象基类
    • BasePlatformAdapter定义平台名称、URL与抽象query/close方法
  • 具体实现
    • 传统适配器Kimi、文心一言启动Playwright与无头浏览器导航至平台URL
    • 搜索引擎适配器通义千问、豆包、智谱清言、天工、讯飞星火通过fetch_search_content获取真实内容
    • 自动查找输入框、填充关键词、提交查询(回车或点击发送)
    • 等待回复稳定(连续多次检测文本不变),返回最新回复
    • 失败重试与指数退避,超时警告
    • 资源清理关闭page/context/browser与playwright实例
  • 错误处理
    • 无法找到输入框、页面超时、最终失败抛出运行时异常
    • 日志记录每次重试与最终失败原因
    • 指数退避重试机制最多3次尝试
classDiagram
class BasePlatformAdapter {
+string platform_name
+string platform_url
+query(keyword) str
+close() void
}
class KimiAdapter {
+platform_name = "kimi"
+platform_url = "https : //kimi.moonshot.cn"
+query(keyword) str
+close() void
}
class WenxinAdapter {
+platform_name = "wenxin"
+platform_url = "https : //yiyan.baidu.com"
+query(keyword) str
+close() void
}
class TongyiAdapter {
+platform_name = "tongyi"
+platform_url = "https : //tongyi.aliyun.com/qianwen"
+query(keyword) str
+close() void
}
class DoubaoAdapter {
+platform_name = "doubao"
+platform_url = "https : //www.doubao.com/"
+query(keyword) str
+close() void
}
class QingyanAdapter {
+platform_name = "qingyan"
+platform_url = "https : //chatglm.cn/"
+query(keyword) str
+close() void
}
class TiangongAdapter {
+platform_name = "tiangong"
+platform_url = "https : //www.tiangong.cn/"
+query(keyword) str
+close() void
}
class XinghuoAdapter {
+platform_name = "xinghuo"
+platform_url = "https : //xinghuo.xfyun.cn/"
+query(keyword) str
+close() void
}
KimiAdapter --|> BasePlatformAdapter
WenxinAdapter --|> BasePlatformAdapter
TongyiAdapter --|> BasePlatformAdapter
DoubaoAdapter --|> BasePlatformAdapter
QingyanAdapter --|> BasePlatformAdapter
TiangongAdapter --|> BasePlatformAdapter
XinghuoAdapter --|> BasePlatformAdapter

图表来源

章节来源

搜索引擎模块fetch_search_content

  • 功能
    • 提供统一的搜索引擎接口支持DuckDuckGo和Wikipedia搜索
    • 作为备用查询源,当平台适配器无法正常工作时使用
  • 实现细节
    • DuckDuckGo HTML搜索无需API Key自动回退到Wikipedia
    • Wikipedia API搜索稳定可靠返回百科内容摘要
    • HTML内容清理去除标签和实体保留可读文本
    • 字符串清理:移除引用标记和多余空白
  • 错误处理
    • 搜索失败时自动回退到Wikipedia
    • 所有搜索源均失败时抛出运行时异常

章节来源

定时调度器QueryScheduler

  • 职责
    • 每小时扫描到期查询status=active且next_query_at<=now
    • 逐条执行CitationEngine.execute_query
    • 应用生命周期内启动与关闭
    • 兜底检查每分钟检查遗留的pending任务
  • 并发控制
    • 使用AsyncIOScheduler与事件循环
    • 检查与执行过程均为异步,避免阻塞
flowchart TD
StartS(["启动调度器"]) --> AddJob["添加每小时任务"]
AddJob --> AddJob2["添加每分钟任务"]
AddJob2 --> Loop["每小时执行检查"]
Loop --> Select["查询到期的Query"]
Select --> Exec["逐条执行execute_query"]
Exec --> Loop
Loop --> PendingCheck["每分钟检查遗留任务"]
PendingCheck --> ExecPending["执行遗留任务"]
ExecPending --> Loop

图表来源

章节来源

服务与API引用数据与统计

  • API
    • 列表查询支持按query_id、平台、时间范围筛选分页
    • 统计接口:总查询次数、引用次数、引用率、平均位置、按平台统计、趋势
    • 立即执行将查询任务加入队列返回任务ID
  • 服务
    • 权限校验:仅允许用户访问自己的查询数据
    • 统计聚合使用SQL聚合函数计算各项指标
    • CSV导出按查询导出引用记录CSV
    • 异步任务执行使用asyncio.create_task后台执行查询
sequenceDiagram
participant Client as "客户端"
participant API as "API : 引用数据"
participant Svc as "服务 : 引用数据"
participant DB as "数据库"
Client->>API : GET /api/v1/citations/?query_id=...
API->>Svc : get_citations(...)
Svc->>DB : 查询引用记录(带权限校验)
DB-->>Svc : 记录列表与总数
Svc-->>API : 返回数据
API-->>Client : 响应
Client->>API : GET /api/v1/citations/stats?query_id=...
API->>Svc : get_citation_stats(...)
Svc->>DB : 聚合统计
DB-->>Svc : 统计结果
Svc-->>API : 返回统计
API-->>Client : 响应
Client->>API : POST /api/v1/queries/{id}/run-now
API->>Svc : trigger_query_now
Svc->>DB : 创建QueryTask
Svc->>Svc : asyncio.create_task执行查询
Svc-->>API : 202 Accepted + task_id
API-->>Client : 响应

图表来源

章节来源

数据模型:查询、任务与引用记录

  • Query查询词、目标品牌、别名、平台、频率、状态与时间字段
  • QueryTask查询任务记录平台、状态、错误信息与时间戳
  • CitationRecord引用记录包含是否引用、引用位置、引用文本、竞争品牌、原始响应与查询时间
erDiagram
QUERIES {
uuid id PK
uuid user_id FK
string keyword
string target_brand
json brand_aliases
json platforms
string frequency
string status
timestamp last_queried_at
timestamp next_query_at
}
QUERY_TASKS {
uuid id PK
uuid query_id FK
string platform
string status
text error_message
timestamp scheduled_at
timestamp started_at
timestamp completed_at
}
CITATION_RECORDS {
uuid id PK
uuid query_id FK
string platform
boolean cited
int citation_position
text citation_text
json competitor_brands
text raw_response
timestamp queried_at
}
QUERIES ||--o{ QUERY_TASKS : "包含"
QUERIES ||--o{ CITATION_RECORDS : "包含"

图表来源

章节来源

依赖分析

  • 外部依赖
    • Web框架FastAPI、Uvicorn
    • ORM与迁移SQLAlchemy、Alembic
    • 任务调度APScheduler
    • 浏览器自动化Playwright
    • HTTP客户端httpx
    • 缓存Redis配置项存在
    • 配置pydantic-settings
  • 内部模块耦合
    • API依赖服务层服务层依赖模型与引擎引擎依赖适配器与模型
    • 调度器独立于API通过引擎执行查询
    • 适配器与引擎解耦,便于扩展新平台
graph LR
FastAPI --> API
API --> Service
Service --> Engine
Engine --> AdapterBase
Engine --> Models
Engine --> SearchEngine
Scheduler --> Engine
AdapterKimi --> AdapterBase
AdapterWenxin --> AdapterBase
AdapterTongyi --> AdapterBase
AdapterDoubao --> AdapterBase
AdapterQingyan --> AdapterBase
AdapterTiangong --> AdapterBase
AdapterXinghuo --> AdapterBase
SearchEngine --> Engine
Config --> Main
Main --> Scheduler
Main --> API

图表来源

章节来源

性能考虑

  • 品牌匹配
    • 候选词提取与模糊匹配复杂度与文本长度、候选词数量相关,建议对长文本分段处理或限制最大长度
    • 模糊匹配阈值可按业务需求调整,平衡召回与精度
  • 平台适配器
    • Playwright初始化成本较高适配器内部维护浏览器实例避免重复启动
    • 等待回复稳定的轮询间隔与超时可按平台特性微调
    • 新增 搜索引擎适配器避免了浏览器自动化成本,提升了整体性能
  • 引擎与调度
    • 当前平台遍历为串行,可扩展为并发执行以提升吞吐(注意平台限流与稳定性)
    • 定时任务每小时检查一次,可根据业务需要调整频率
    • 新增 异步任务执行机制使用asyncio.create_task提升响应速度
  • 数据库
    • 引用记录、查询与任务均建立索引,统计查询使用聚合函数,建议定期分析与更新统计信息
    • 新增 字符清理机制,避免无效字符导致的数据库插入失败

故障排查指南

章节来源

结论

该引擎以清晰的分层设计实现了从AI平台抓取、品牌识别、竞争品牌检测到结果落库与统计展示的完整链路。BrandMatcher与CompetitorDetector提供了稳健的文本处理与识别能力平台适配器封装了复杂的浏览器自动化流程搜索引擎适配器提供了更稳定的替代方案调度器保障了周期性任务的可靠执行。新增的异步任务执行机制和字符清理机制进一步提升了系统的稳定性和性能。建议后续在并发执行、阈值调优与平台扩展方面持续优化。

附录

算法优化建议

  • 品牌匹配
    • 引入编辑距离或音近相似度算法,进一步提升模糊匹配鲁棒性
    • 对候选词进行词干化或拼音首字母预处理,减少误判
  • 并发与限流
    • 平台查询改为并发执行,并引入令牌桶限流防止被封禁
    • 新增 搜索引擎适配器天然支持并发,可优先考虑使用
  • 缓存与降噪
    • 对热点关键词与平台响应进行缓存,降低重复请求
    • 上下文截取长度可动态调整,兼顾性能与准确性
    • 新增 字符清理机制可进一步优化,减少无效字符传输

自定义扩展指南

与AI平台的集成接口与数据流转

  • 接口职责
    • 平台适配器统一query接口屏蔽平台差异
    • 引擎:组织流程、落库、异常处理
    • API/服务:对外提供查询与统计能力
    • 新增 搜索引擎模块:提供备用查询源
  • 数据流
    • 用户发起查询 → API/服务 → 引擎 → 平台适配器 → AI平台/搜索引擎 → 引擎 → 数据库 → API/服务 → 前端展示
    • 新增 搜索引擎模式通过fetch_search_content获取真实内容
  • 监控与日志

章节来源