geo/.qoder/repowiki/zh/content/任务调度系统/查询执行流程.md

20 KiB
Raw Permalink Blame History

查询执行流程

**本文档引用的文件** - [backend/app/workers/scheduler.py](file://backend/app/workers/scheduler.py) - [backend/app/workers/citation_engine.py](file://backend/app/workers/citation_engine.py) - [backend/app/workers/platforms/kimi.py](file://backend/app/workers/platforms/kimi.py) - [backend/app/workers/platforms/wenxin.py](file://backend/app/workers/platforms/wenxin.py) - [backend/app/models/query.py](file://backend/app/models/query.py) - [backend/app/models/query_task.py](file://backend/app/models/query_task.py) - [backend/app/models/citation_record.py](file://backend/app/models/citation_record.py) - [backend/app/services/citation.py](file://backend/app/services/citation.py) - [backend/app/api/citations.py](file://backend/app/api/citations.py) - [backend/app/main.py](file://backend/app/main.py) - [backend/app/database.py](file://backend/app/database.py) - [backend/app/config.py](file://backend/app/config.py) - [tests/test_queries.py](file://tests/test_queries.py)

目录

  1. 简介
  2. 项目结构
  3. 核心组件
  4. 架构总览
  5. 详细组件分析
  6. 依赖分析
  7. 性能考虑
  8. 故障排查指南
  9. 结论
  10. 附录

简介

本文件系统性梳理“查询执行流程”的完整生命周期,覆盖从任务检查、数据库事务处理、到异常处理与状态更新的全过程。重点解析以下内容:

  • 定时调度器如何筛选到期查询并触发执行
  • CitationEngine 的单查询执行过程,包括平台适配器调用、品牌匹配、竞争品牌检测与结果记录
  • 数据模型之间的状态流转与事务边界
  • 错误隔离与恢复策略
  • 性能监控指标建议与调试技巧

项目结构

后端采用分层架构:

  • API 层FastAPI 路由与依赖注入
  • 服务层:业务逻辑封装(查询 CRUD、引用统计、立即执行
  • 工作器层:调度器与引用检测引擎,平台适配器
  • 模型层SQLAlchemy ORM 映射
  • 配置与数据库:连接池与环境变量
graph TB
subgraph "API 层"
API_Q["queries.py<br/>查询接口"]
API_C["citations.py<br/>引用接口"]
end
subgraph "服务层"
Svc_Query["services/query.py<br/>查询 CRUD"]
Svc_Citation["services/citation.py<br/>引用统计/立即执行"]
end
subgraph "工作器层"
Sch["workers/scheduler.py<br/>定时调度器"]
Eng["workers/citation_engine.py<br/>引用检测引擎"]
Plat_K["workers/platforms/kimi.py<br/>Kimi 适配器"]
Plat_W["workers/platforms/wenxin.py<br/>文心一言适配器"]
end
subgraph "模型层"
M_Query["models/query.py"]
M_Task["models/query_task.py"]
M_Record["models/citation_record.py"]
end
subgraph "基础设施"
DB["database.py<br/>AsyncSessionLocal"]
CFG["config.py<br/>Settings"]
APP["main.py<br/>lifespan 启停"]
end
API_Q --> Svc_Query
API_C --> Svc_Citation
Svc_Citation --> Sch
Sch --> Eng
Eng --> M_Query
Eng --> M_Task
Eng --> M_Record
Eng --> Plat_K
Eng --> Plat_W
DB -.-> M_Query
DB -.-> M_Task
DB -.-> M_Record
APP --> Sch
CFG --> DB

图表来源

章节来源

核心组件

  • 定时调度器:每小时扫描到期查询,逐条执行
  • 引用检测引擎:负责品牌匹配、竞争品牌检测、平台适配器调用与记录写入
  • 平台适配器Kimi 与文心一言,基于 Playwright 的网页自动化
  • 数据模型Query、QueryTask、CitationRecord支撑状态与结果持久化
  • 服务与 API查询 CRUD、引用统计、立即执行接口

章节来源

架构总览

查询执行的总体时序如下:

sequenceDiagram
participant Timer as "调度器"
participant DB as "数据库会话"
participant Engine as "引用检测引擎"
participant Task as "QueryTask"
participant Record as "CitationRecord"
participant Platform as "平台适配器"
Timer->>DB : 查询状态=active 且 next_query_at<=now()
Timer->>Engine : execute_query(query, db)
Engine->>Task : 获取或创建任务记录
Engine->>Task : 状态=running写入started_at
Engine->>Platform : execute_single_platform(keyword, platform,...)
Platform-->>Engine : 原始回复文本
Engine->>Engine : 品牌匹配/竞争品牌检测
Engine->>Record : 写入引用记录
Engine->>Task : 状态=success写入completed_at
Engine->>DB : 更新Query.next_query_at
Engine-->>Timer : 返回记录列表

图表来源

详细组件分析

定时调度器与任务检查

  • 触发周期:每小时一次
  • 条件筛选:查询状态为 active 且 next_query_at 小于等于当前 UTC 时间
  • 批量执行策略:逐条执行,单条失败不影响后续
  • 事件循环兼容:若无运行中事件循环则新建事件循环执行
flowchart TD
Start(["定时触发"]) --> BuildStmt["构建查询语句<br/>status='active' 且 next_query_at <= now()"]
BuildStmt --> Fetch["查询结果集"]
Fetch --> HasItems{"是否有待执行项?"}
HasItems -- 否 --> End(["结束"])
HasItems -- 是 --> Loop["逐条执行 _execute_single_query"]
Loop --> NextItem["下一项"]
NextItem --> Loop
Loop --> End

图表来源

章节来源

check_and_execute_queries 方法详解

  • 查询状态检查:仅处理 active 且到期的查询
  • 批量执行策略:串行逐条执行,异常被捕获并记录,避免中断整体流程
  • 错误隔离机制:单条查询异常不影响其他查询;记录错误信息到 QueryTask并生成一条 cited=False 的占位记录
flowchart TD
Enter(["进入 check_and_execute_queries"]) --> AcquireDB["获取数据库会话"]
AcquireDB --> BuildQuery["构建查询active 且到期"]
BuildQuery --> ExecQuery["执行查询并获取结果"]
ExecQuery --> Count{"结果数量>0 ?"}
Count -- 否 --> Exit(["退出"])
Count -- 是 --> ForEach["遍历每个查询"]
ForEach --> TryExec["try: _execute_single_query"]
TryExec --> OnErr["except: 记录错误并 continue"]
OnErr --> NextQ["下一个查询"]
TryExec --> NextQ
NextQ --> Done{"全部处理完?"}
Done -- 否 --> ForEach
Done -- 是 --> Exit

图表来源

章节来源

单个查询执行流程CitationEngine

  • 初始化:创建 BrandMatcher准备平台映射
  • 任务管理:为每个平台获取或创建 QueryTask状态切换至 running
  • 平台执行:调用 execute_single_platform内部通过适配器查询平台并返回原始回复
  • 结果处理:品牌匹配与竞争品牌检测,构造 CitationRecord
  • 状态更新:成功则状态切换为 success失败则状态切换为 failed并写入错误信息
  • 查询更新:更新 Query 的 last_queried_at 与 next_query_at
sequenceDiagram
participant Sch as "调度器"
participant Eng as "CitationEngine"
participant DB as "数据库"
participant Task as "QueryTask"
participant Plat as "平台适配器"
participant Rec as "CitationRecord"
Sch->>Eng : execute_query(query, db)
Eng->>DB : 查询/创建 QueryTask
Eng->>Task : 设置状态=running
Eng->>Plat : execute_single_platform(keyword, platform,...)
Plat-->>Eng : 原始回复
Eng->>Eng : 品牌匹配/竞争品牌检测
Eng->>Rec : 创建并写入记录
Eng->>Task : 设置状态=success 或 failed
Eng->>DB : 更新 Query.next_query_at
Eng-->>Sch : 返回记录列表

图表来源

章节来源

平台适配器Kimi / 文心一言)

  • 自动化流程:确保浏览器启动 → 新建上下文 → 导航到平台 → 定位输入框 → 填充关键词 → 提交 → 等待回复稳定
  • 稳定性保障:等待回复文本连续 N 次一致才视为稳定,超时返回当前文本
  • 重试机制:最多 3 次尝试,指数退避
  • 资源管理:统一关闭页面与上下文,异常时也进行清理
flowchart TD
Start(["开始 query"]) --> Ensure["确保浏览器启动"]
Ensure --> NewCtx["新建上下文/页面"]
NewCtx --> Navigate["导航到平台URL"]
Navigate --> Locate["定位输入框多选择器"]
Locate --> Fill["填充关键词"]
Fill --> Submit["提交按钮或回车"]
Submit --> WaitStable["等待回复稳定多次检测"]
WaitStable --> Return["返回原始回复文本"]
Ensure --> |失败| Raise["抛出异常"]
Navigate --> |失败| Raise
Locate --> |失败| Raise
WaitStable --> |超时| Return

图表来源

章节来源

数据模型与状态转换

  • Query查询主表包含关键词、目标品牌、平台列表、频率、状态与时间戳
  • QueryTask按平台拆分的任务记录状态、错误信息与时间点
  • CitationRecord每次查询的结果记录包含是否引用、位置、文本、竞争品牌与原始回复
  • 状态机QueryTaskpending → running → success 或 failed
stateDiagram-v2
[*] --> pending
pending --> running : "开始执行"
running --> success : "平台返回成功"
running --> failed : "平台异常/超时"
success --> [*]
failed --> [*]

图表来源

章节来源

API 与服务集成

  • 立即执行接口:/api/v1/queries/{query_id}/run-now校验所有权与状态为每个平台创建 pending 任务
  • 引用统计接口:支持按查询、平台、时间范围过滤,计算引用率、平均位置、按平台统计与趋势
  • 查询 CRUD 接口:创建时根据频率计算 next_query_at更新时可重新计算
sequenceDiagram
participant Client as "客户端"
participant API as "citations.py"
participant Svc as "services.citation"
participant DB as "数据库"
participant Task as "QueryTask"
Client->>API : POST /queries/{id}/run-now
API->>Svc : trigger_query_now(db, user_id, query_id)
Svc->>DB : 校验查询归属与状态
Svc->>Task : 为每个平台创建 pending 任务
Svc-->>API : 返回首个任务
API-->>Client : 202 + 任务信息

图表来源

章节来源

依赖分析

  • 组件耦合
    • 调度器依赖 CitationEngine 与数据库会话
    • 引用检测引擎依赖平台适配器与数据模型
    • 平台适配器依赖 Playwright受环境变量控制
  • 外部依赖
    • 数据库PostgreSQL异步驱动
    • 调度APScheduler异步调度器
    • 浏览器PlaywrightChromium
graph LR
Sch["调度器"] --> Eng["引用检测引擎"]
Eng --> K["Kimi 适配器"]
Eng --> W["文心一言适配器"]
Eng --> DB["数据库会话"]
DB --> Q["Query"]
DB --> T["QueryTask"]
DB --> R["CitationRecord"]
APP["应用生命周期"] --> Sch
CFG["配置"] --> DB
CFG --> K
CFG --> W

图表来源

章节来源

性能考虑

  • 批量策略
    • 调度器按小时扫描,逐条执行,避免一次性大量并发请求
    • 平台适配器内置重试与指数退避,降低瞬时失败影响
  • 数据库事务
    • 每个平台执行前后均进行 commit保证状态一致性
    • 使用索引优化查询queries(status, next_query_at)、query_tasks(status)
  • 资源管理
    • Playwright 上下文与页面在 finally 中关闭,防止资源泄漏
    • 引擎关闭时逐个适配器关闭,避免阻塞
  • 监控建议
    • 指标:每小时到期查询数、成功/失败率、平均响应时间、平台成功率
    • 日志:调度器扫描日志、平台适配器重试与超时告警
    • 健康检查:/health 接口与数据库连接池状态

[本节为通用性能指导,无需特定文件来源]

故障排查指南

  • 调度器未执行
    • 检查应用生命周期是否正确启动与关闭调度器
    • 确认时区与 UTC 时间比较逻辑
  • 查询未被执行
    • 核对查询状态与 next_query_at 是否满足条件
    • 检查数据库索引与查询语句
  • 平台适配器失败
    • Playwright 未安装:根据日志提示安装 Chromium
    • 页面选择器失效:适配器内存在多选择器回退策略
    • 超时:等待回复稳定机制会返回当前文本,属预期行为
  • 引用记录缺失
    • 确认异常分支是否生成 cited=False 的占位记录
    • 检查 QueryTask 状态是否被正确更新
  • 立即执行无效
    • 校验查询归属与状态,确认平台列表非空

章节来源

结论

该系统通过“定时调度 + 平台适配器 + 引用检测引擎”的组合,实现了高可靠、可扩展的查询执行链路。其关键优势在于:

  • 明确的状态机与事务边界,确保数据一致性
  • 平台适配器的稳定性与容错设计,提升整体鲁棒性
  • 清晰的错误隔离与日志输出,便于问题定位与恢复

[本节为总结性内容,无需特定文件来源]

附录

关键流程时序图(端到端)

sequenceDiagram
participant User as "用户"
participant API as "API"
participant Sch as "调度器"
participant Eng as "引擎"
participant Plat as "平台"
participant DB as "数据库"
User->>API : 触发/等待查询
Sch->>DB : 查询到期的 active 查询
Sch->>Eng : 执行查询
Eng->>DB : 创建/更新 QueryTask
Eng->>Plat : 平台查询
Plat-->>Eng : 返回回复
Eng->>DB : 写入 CitationRecord
Eng->>DB : 更新 Query.next_query_at
API-->>User : 返回结果/状态

图表来源

数据模型 ER 图

erDiagram
QUERIES {
uuid id PK
uuid user_id FK
string keyword
string target_brand
jsonb brand_aliases
jsonb platforms
string frequency
string status
timestamp last_queried_at
timestamp next_query_at
timestamp created_at
timestamp updated_at
}
QUERY_TASKS {
uuid id PK
uuid query_id FK
string platform
string status
text error_message
timestamp scheduled_at
timestamp started_at
timestamp completed_at
}
CITATION_RECORDS {
uuid id PK
uuid query_id FK
string platform
boolean cited
integer citation_position
text citation_text
jsonb competitor_brands
text raw_response
timestamp queried_at
}
QUERIES ||--o{ QUERY_TASKS : "拥有"
QUERIES ||--o{ CITATION_RECORDS : "拥有"

图表来源

测试参考

  • 查询创建与权限限制:参考测试用例对权限错误的断言
  • 查询列表与更新:验证分页与字段更新逻辑
  • 查询删除与不存在场景:验证 404 行为

章节来源