20 KiB
20 KiB
查询执行流程
**本文档引用的文件** - [backend/app/workers/scheduler.py](file://backend/app/workers/scheduler.py) - [backend/app/workers/citation_engine.py](file://backend/app/workers/citation_engine.py) - [backend/app/workers/platforms/kimi.py](file://backend/app/workers/platforms/kimi.py) - [backend/app/workers/platforms/wenxin.py](file://backend/app/workers/platforms/wenxin.py) - [backend/app/models/query.py](file://backend/app/models/query.py) - [backend/app/models/query_task.py](file://backend/app/models/query_task.py) - [backend/app/models/citation_record.py](file://backend/app/models/citation_record.py) - [backend/app/services/citation.py](file://backend/app/services/citation.py) - [backend/app/api/citations.py](file://backend/app/api/citations.py) - [backend/app/main.py](file://backend/app/main.py) - [backend/app/database.py](file://backend/app/database.py) - [backend/app/config.py](file://backend/app/config.py) - [tests/test_queries.py](file://tests/test_queries.py)目录
简介
本文件系统性梳理“查询执行流程”的完整生命周期,覆盖从任务检查、数据库事务处理、到异常处理与状态更新的全过程。重点解析以下内容:
- 定时调度器如何筛选到期查询并触发执行
- CitationEngine 的单查询执行过程,包括平台适配器调用、品牌匹配、竞争品牌检测与结果记录
- 数据模型之间的状态流转与事务边界
- 错误隔离与恢复策略
- 性能监控指标建议与调试技巧
项目结构
后端采用分层架构:
- API 层:FastAPI 路由与依赖注入
- 服务层:业务逻辑封装(查询 CRUD、引用统计、立即执行)
- 工作器层:调度器与引用检测引擎,平台适配器
- 模型层:SQLAlchemy ORM 映射
- 配置与数据库:连接池与环境变量
graph TB
subgraph "API 层"
API_Q["queries.py<br/>查询接口"]
API_C["citations.py<br/>引用接口"]
end
subgraph "服务层"
Svc_Query["services/query.py<br/>查询 CRUD"]
Svc_Citation["services/citation.py<br/>引用统计/立即执行"]
end
subgraph "工作器层"
Sch["workers/scheduler.py<br/>定时调度器"]
Eng["workers/citation_engine.py<br/>引用检测引擎"]
Plat_K["workers/platforms/kimi.py<br/>Kimi 适配器"]
Plat_W["workers/platforms/wenxin.py<br/>文心一言适配器"]
end
subgraph "模型层"
M_Query["models/query.py"]
M_Task["models/query_task.py"]
M_Record["models/citation_record.py"]
end
subgraph "基础设施"
DB["database.py<br/>AsyncSessionLocal"]
CFG["config.py<br/>Settings"]
APP["main.py<br/>lifespan 启停"]
end
API_Q --> Svc_Query
API_C --> Svc_Citation
Svc_Citation --> Sch
Sch --> Eng
Eng --> M_Query
Eng --> M_Task
Eng --> M_Record
Eng --> Plat_K
Eng --> Plat_W
DB -.-> M_Query
DB -.-> M_Task
DB -.-> M_Record
APP --> Sch
CFG --> DB
图表来源
- backend/app/main.py:13-22
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
- backend/app/database.py:1-29
- backend/app/config.py:1-17
章节来源
核心组件
- 定时调度器:每小时扫描到期查询,逐条执行
- 引用检测引擎:负责品牌匹配、竞争品牌检测、平台适配器调用与记录写入
- 平台适配器:Kimi 与文心一言,基于 Playwright 的网页自动化
- 数据模型:Query、QueryTask、CitationRecord,支撑状态与结果持久化
- 服务与 API:查询 CRUD、引用统计、立即执行接口
章节来源
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
架构总览
查询执行的总体时序如下:
sequenceDiagram
participant Timer as "调度器"
participant DB as "数据库会话"
participant Engine as "引用检测引擎"
participant Task as "QueryTask"
participant Record as "CitationRecord"
participant Platform as "平台适配器"
Timer->>DB : 查询状态=active 且 next_query_at<=now()
Timer->>Engine : execute_query(query, db)
Engine->>Task : 获取或创建任务记录
Engine->>Task : 状态=running,写入started_at
Engine->>Platform : execute_single_platform(keyword, platform,...)
Platform-->>Engine : 原始回复文本
Engine->>Engine : 品牌匹配/竞争品牌检测
Engine->>Record : 写入引用记录
Engine->>Task : 状态=success,写入completed_at
Engine->>DB : 更新Query.next_query_at
Engine-->>Timer : 返回记录列表
图表来源
- backend/app/workers/scheduler.py:51-84
- backend/app/workers/citation_engine.py:159-234
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
- backend/app/workers/platforms/kimi.py:33-48
- backend/app/workers/platforms/wenxin.py:33-48
详细组件分析
定时调度器与任务检查
- 触发周期:每小时一次
- 条件筛选:查询状态为 active 且 next_query_at 小于等于当前 UTC 时间
- 批量执行策略:逐条执行,单条失败不影响后续
- 事件循环兼容:若无运行中事件循环则新建事件循环执行
flowchart TD
Start(["定时触发"]) --> BuildStmt["构建查询语句<br/>status='active' 且 next_query_at <= now()"]
BuildStmt --> Fetch["查询结果集"]
Fetch --> HasItems{"是否有待执行项?"}
HasItems -- 否 --> End(["结束"])
HasItems -- 是 --> Loop["逐条执行 _execute_single_query"]
Loop --> NextItem["下一项"]
NextItem --> Loop
Loop --> End
图表来源
章节来源
check_and_execute_queries 方法详解
- 查询状态检查:仅处理 active 且到期的查询
- 批量执行策略:串行逐条执行,异常被捕获并记录,避免中断整体流程
- 错误隔离机制:单条查询异常不影响其他查询;记录错误信息到 QueryTask,并生成一条 cited=False 的占位记录
flowchart TD
Enter(["进入 check_and_execute_queries"]) --> AcquireDB["获取数据库会话"]
AcquireDB --> BuildQuery["构建查询:active 且到期"]
BuildQuery --> ExecQuery["执行查询并获取结果"]
ExecQuery --> Count{"结果数量>0 ?"}
Count -- 否 --> Exit(["退出"])
Count -- 是 --> ForEach["遍历每个查询"]
ForEach --> TryExec["try: _execute_single_query"]
TryExec --> OnErr["except: 记录错误并 continue"]
OnErr --> NextQ["下一个查询"]
TryExec --> NextQ
NextQ --> Done{"全部处理完?"}
Done -- 否 --> ForEach
Done -- 是 --> Exit
图表来源
章节来源
单个查询执行流程(CitationEngine)
- 初始化:创建 BrandMatcher,准备平台映射
- 任务管理:为每个平台获取或创建 QueryTask,状态切换至 running
- 平台执行:调用 execute_single_platform,内部通过适配器查询平台并返回原始回复
- 结果处理:品牌匹配与竞争品牌检测,构造 CitationRecord
- 状态更新:成功则状态切换为 success,失败则状态切换为 failed,并写入错误信息
- 查询更新:更新 Query 的 last_queried_at 与 next_query_at
sequenceDiagram
participant Sch as "调度器"
participant Eng as "CitationEngine"
participant DB as "数据库"
participant Task as "QueryTask"
participant Plat as "平台适配器"
participant Rec as "CitationRecord"
Sch->>Eng : execute_query(query, db)
Eng->>DB : 查询/创建 QueryTask
Eng->>Task : 设置状态=running
Eng->>Plat : execute_single_platform(keyword, platform,...)
Plat-->>Eng : 原始回复
Eng->>Eng : 品牌匹配/竞争品牌检测
Eng->>Rec : 创建并写入记录
Eng->>Task : 设置状态=success 或 failed
Eng->>DB : 更新 Query.next_query_at
Eng-->>Sch : 返回记录列表
图表来源
- backend/app/workers/citation_engine.py:159-234
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
章节来源
平台适配器(Kimi / 文心一言)
- 自动化流程:确保浏览器启动 → 新建上下文 → 导航到平台 → 定位输入框 → 填充关键词 → 提交 → 等待回复稳定
- 稳定性保障:等待回复文本连续 N 次一致才视为稳定,超时返回当前文本
- 重试机制:最多 3 次尝试,指数退避
- 资源管理:统一关闭页面与上下文,异常时也进行清理
flowchart TD
Start(["开始 query"]) --> Ensure["确保浏览器启动"]
Ensure --> NewCtx["新建上下文/页面"]
NewCtx --> Navigate["导航到平台URL"]
Navigate --> Locate["定位输入框多选择器"]
Locate --> Fill["填充关键词"]
Fill --> Submit["提交按钮或回车"]
Submit --> WaitStable["等待回复稳定多次检测"]
WaitStable --> Return["返回原始回复文本"]
Ensure --> |失败| Raise["抛出异常"]
Navigate --> |失败| Raise
Locate --> |失败| Raise
WaitStable --> |超时| Return
图表来源
章节来源
数据模型与状态转换
- Query:查询主表,包含关键词、目标品牌、平台列表、频率、状态与时间戳
- QueryTask:按平台拆分的任务,记录状态、错误信息与时间点
- CitationRecord:每次查询的结果记录,包含是否引用、位置、文本、竞争品牌与原始回复
- 状态机(QueryTask):pending → running → success 或 failed
stateDiagram-v2
[*] --> pending
pending --> running : "开始执行"
running --> success : "平台返回成功"
running --> failed : "平台异常/超时"
success --> [*]
failed --> [*]
图表来源
章节来源
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
API 与服务集成
- 立即执行接口:/api/v1/queries/{query_id}/run-now,校验所有权与状态,为每个平台创建 pending 任务
- 引用统计接口:支持按查询、平台、时间范围过滤,计算引用率、平均位置、按平台统计与趋势
- 查询 CRUD 接口:创建时根据频率计算 next_query_at,更新时可重新计算
sequenceDiagram
participant Client as "客户端"
participant API as "citations.py"
participant Svc as "services.citation"
participant DB as "数据库"
participant Task as "QueryTask"
Client->>API : POST /queries/{id}/run-now
API->>Svc : trigger_query_now(db, user_id, query_id)
Svc->>DB : 校验查询归属与状态
Svc->>Task : 为每个平台创建 pending 任务
Svc-->>API : 返回首个任务
API-->>Client : 202 + 任务信息
图表来源
章节来源
依赖分析
- 组件耦合
- 调度器依赖 CitationEngine 与数据库会话
- 引用检测引擎依赖平台适配器与数据模型
- 平台适配器依赖 Playwright,受环境变量控制
- 外部依赖
- 数据库:PostgreSQL(异步驱动)
- 调度:APScheduler(异步调度器)
- 浏览器:Playwright(Chromium)
graph LR
Sch["调度器"] --> Eng["引用检测引擎"]
Eng --> K["Kimi 适配器"]
Eng --> W["文心一言适配器"]
Eng --> DB["数据库会话"]
DB --> Q["Query"]
DB --> T["QueryTask"]
DB --> R["CitationRecord"]
APP["应用生命周期"] --> Sch
CFG["配置"] --> DB
CFG --> K
CFG --> W
图表来源
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
- backend/app/database.py:1-29
- backend/app/config.py:1-17
- backend/app/main.py:13-22
章节来源
性能考虑
- 批量策略
- 调度器按小时扫描,逐条执行,避免一次性大量并发请求
- 平台适配器内置重试与指数退避,降低瞬时失败影响
- 数据库事务
- 每个平台执行前后均进行 commit,保证状态一致性
- 使用索引优化查询:queries(status, next_query_at)、query_tasks(status)
- 资源管理
- Playwright 上下文与页面在 finally 中关闭,防止资源泄漏
- 引擎关闭时逐个适配器关闭,避免阻塞
- 监控建议
- 指标:每小时到期查询数、成功/失败率、平均响应时间、平台成功率
- 日志:调度器扫描日志、平台适配器重试与超时告警
- 健康检查:/health 接口与数据库连接池状态
[本节为通用性能指导,无需特定文件来源]
故障排查指南
- 调度器未执行
- 检查应用生命周期是否正确启动与关闭调度器
- 确认时区与 UTC 时间比较逻辑
- 查询未被执行
- 核对查询状态与 next_query_at 是否满足条件
- 检查数据库索引与查询语句
- 平台适配器失败
- Playwright 未安装:根据日志提示安装 Chromium
- 页面选择器失效:适配器内存在多选择器回退策略
- 超时:等待回复稳定机制会返回当前文本,属预期行为
- 引用记录缺失
- 确认异常分支是否生成 cited=False 的占位记录
- 检查 QueryTask 状态是否被正确更新
- 立即执行无效
- 校验查询归属与状态,确认平台列表非空
章节来源
- backend/app/main.py:13-22
- backend/app/workers/scheduler.py:51-84
- backend/app/workers/platforms/kimi.py:21-48
- backend/app/workers/platforms/wenxin.py:21-48
- backend/app/services/citation.py:204-234
结论
该系统通过“定时调度 + 平台适配器 + 引用检测引擎”的组合,实现了高可靠、可扩展的查询执行链路。其关键优势在于:
- 明确的状态机与事务边界,确保数据一致性
- 平台适配器的稳定性与容错设计,提升整体鲁棒性
- 清晰的错误隔离与日志输出,便于问题定位与恢复
[本节为总结性内容,无需特定文件来源]
附录
关键流程时序图(端到端)
sequenceDiagram
participant User as "用户"
participant API as "API"
participant Sch as "调度器"
participant Eng as "引擎"
participant Plat as "平台"
participant DB as "数据库"
User->>API : 触发/等待查询
Sch->>DB : 查询到期的 active 查询
Sch->>Eng : 执行查询
Eng->>DB : 创建/更新 QueryTask
Eng->>Plat : 平台查询
Plat-->>Eng : 返回回复
Eng->>DB : 写入 CitationRecord
Eng->>DB : 更新 Query.next_query_at
API-->>User : 返回结果/状态
图表来源
- backend/app/workers/scheduler.py:51-84
- backend/app/workers/citation_engine.py:159-234
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
数据模型 ER 图
erDiagram
QUERIES {
uuid id PK
uuid user_id FK
string keyword
string target_brand
jsonb brand_aliases
jsonb platforms
string frequency
string status
timestamp last_queried_at
timestamp next_query_at
timestamp created_at
timestamp updated_at
}
QUERY_TASKS {
uuid id PK
uuid query_id FK
string platform
string status
text error_message
timestamp scheduled_at
timestamp started_at
timestamp completed_at
}
CITATION_RECORDS {
uuid id PK
uuid query_id FK
string platform
boolean cited
integer citation_position
text citation_text
jsonb competitor_brands
text raw_response
timestamp queried_at
}
QUERIES ||--o{ QUERY_TASKS : "拥有"
QUERIES ||--o{ CITATION_RECORDS : "拥有"
图表来源
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
测试参考
- 查询创建与权限限制:参考测试用例对权限错误的断言
- 查询列表与更新:验证分页与字段更新逻辑
- 查询删除与不存在场景:验证 404 行为
章节来源