25 KiB
任务调度系统
**本文档引用的文件** - [backend/app/workers/scheduler.py](file://backend/app/workers/scheduler.py) - [backend/app/workers/citation_engine.py](file://backend/app/workers/citation_engine.py) - [backend/app/workers/platforms/kimi.py](file://backend/app/workers/platforms/kimi.py) - [backend/app/workers/platforms/wenxin.py](file://backend/app/workers/platforms/wenxin.py) - [backend/app/workers/platforms/base.py](file://backend/app/workers/platforms/base.py) - [backend/app/models/query.py](file://backend/app/models/query.py) - [backend/app/models/query_task.py](file://backend/app/models/query_task.py) - [backend/app/models/citation_record.py](file://backend/app/models/citation_record.py) - [backend/app/services/query.py](file://backend/app/services/query.py) - [backend/app/api/queries.py](file://backend/app/api/queries.py) - [backend/app/database.py](file://backend/app/database.py) - [backend/app/config.py](file://backend/app/config.py) - [backend/app/main.py](file://backend/app/main.py) - [tests/test_scheduler.py](file://tests/test_scheduler.py) - [tests/test_queries.py](file://tests/test_queries.py)更新摘要
所做更改
- 新增了遗留任务检查机制的详细说明,包括每分钟检查 pending 任务的兜底逻辑
- 完善了调度器测试用例的文档,包括启动/关闭测试、查询筛选测试和频率计算测试
- 增强了性能优化策略部分,增加了遗留任务处理和资源管理的说明
- 更新了故障排查指南,增加了遗留任务状态异常的处理方法
- 完善了调度器设计的详细分析,包括双调度器模式和事件循环兼容性
目录
引言
本文件面向任务调度系统的技术与非技术读者,系统性阐述基于 APscheduler 的异步任务调度架构,涵盖调度器配置、任务队列管理、并发控制机制;详述查询任务的生命周期(创建、状态跟踪、执行监控、错误恢复);文档化异步任务处理流程(分发、优先级与资源管理);给出性能优化策略、监控指标与故障处理机制;并提供配置项、扩展方法与调试技巧。
更新 本次更新完善了调度器设计细节,新增了遗留任务检查机制和详细的测试用例说明。
项目结构
后端采用 FastAPI + SQLAlchemy Async 架构,调度系统位于 workers 子模块,围绕 Query 模型驱动周期性查询任务,通过 CitationEngine 统一执行平台适配器(Kimi、文心一言),并将结果持久化为 CitationRecord,同时维护 QueryTask 任务状态。
graph TB
subgraph "应用入口"
MAIN["app/main.py<br/>生命周期管理"]
END
subgraph "调度层"
SCHED["workers/scheduler.py<br/>QueryScheduler<br/>双调度器模式"]
END
subgraph "业务逻辑"
CE["workers/citation_engine.py<br/>CitationEngine"]
SVC["services/query.py<br/>查询服务"]
END
subgraph "模型与存储"
Q["models/query.py<br/>查询模型"]
QT["models/query_task.py<br/>任务模型"]
CR["models/citation_record.py<br/>引用记录模型"]
DB["database.py<br/>异步会话"]
END
subgraph "平台适配"
BASE["workers/platforms/base.py<br/>适配器基类"]
KIMI["workers/platforms/kimi.py<br/>Kimi适配器"]
WENXIN["workers/platforms/wenxin.py<br/>文心一言适配器"]
END
MAIN --> SCHED
SCHED --> CE
CE --> KIMI
CE --> WENXIN
CE --> Q
CE --> QT
CE --> CR
SVC --> Q
SVC --> DB
SCHED --> DB
CE --> DB
图表来源
- backend/app/main.py:13-22
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
- backend/app/services/query.py:12-130
- backend/app/database.py:1-29
- backend/app/workers/platforms/base.py:4-18
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
章节来源
- backend/app/main.py:13-22
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
- backend/app/services/query.py:12-130
- backend/app/database.py:1-29
- backend/app/workers/platforms/base.py:4-18
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
核心组件
- 调度器:基于 APscheduler 的 AsyncIOScheduler,采用双调度器模式,定时扫描待执行查询并触发执行,同时每分钟检查遗留的 pending 任务。
- 引擎:CitationEngine 负责跨平台查询、品牌匹配、竞争品牌检测、任务状态更新与结果落库。
- 平台适配器:KimiAdapter、WenxinAdapter 基于 Playwright 实现网页交互与响应抽取。
- 数据模型:Query、QueryTask、CitationRecord 支撑任务生命周期与结果存储。
- 服务与API:查询服务与查询 API 路由负责用户侧的查询管理与频率控制。
- 数据库:SQLAlchemy Async Engine + Session,统一事务与连接管理。
更新 新增了遗留任务检查机制,通过双调度器模式提高系统的容错性和可靠性。
章节来源
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
- backend/app/services/query.py:12-130
- backend/app/database.py:1-29
架构总览
调度系统以"定时扫描 + 异步执行 + 平台适配 + 结果落库"为主线,通过 Query 的状态与时间字段驱动执行节奏,QueryTask 记录每次平台执行的状态,CitationRecord 记录最终检测结果。新增的遗留任务检查机制提供了额外的容错保护。
sequenceDiagram
participant Timer as "调度器<br/>AsyncIOScheduler"
participant Scheduler as "QueryScheduler"
participant DB as "数据库<br/>AsyncSession"
participant Engine as "CitationEngine"
participant Platform as "平台适配器<br/>Kimi/Wenxin"
Timer->>Scheduler : "每小时触发"
Scheduler->>DB : "查询 active 且 next_query_at<=now 的 Query"
Scheduler->>Engine : "逐条执行 execute_query(query)"
Engine->>DB : "获取/创建 QueryTask 并置为 running"
Engine->>Platform : "调用 query(keyword)"
Platform-->>Engine : "返回原始响应文本"
Engine->>Engine : "品牌匹配/竞争品牌检测"
Engine->>DB : "写入 CitationRecord"
Engine->>DB : "更新 QueryTask 为 success/fail"
Engine->>DB : "更新 Query 的 last_queried_at/next_query_at"
Engine-->>Scheduler : "返回本次批次记录"
Note over Timer,Scheduler : 额外的遗留任务检查
Timer->>Scheduler : "每分钟触发"
Scheduler->>DB : "查询 pending 且 scheduled_at<=1分钟前的 QueryTask"
Scheduler->>Engine : "重新执行遗留任务"
Engine->>DB : "更新 QueryTask 状态并写入结果"
图表来源
- backend/app/workers/scheduler.py:30-90
- backend/app/workers/scheduler.py:95-172
- backend/app/workers/citation_engine.py:159-234
- backend/app/models/query.py:24-31
- backend/app/models/query_task.py:24-32
- backend/app/models/citation_record.py:24-29
详细组件分析
调度器:QueryScheduler
- 启动与注册:使用 AsyncIOScheduler 注册两个定时任务,每小时检查到期查询任务,每分钟检查遗留的 pending 任务,replace_existing=true 确保重复启动不冲突。
- 事件循环兼容:_run_check 和 _run_pending_tasks_check 分别封装同步包装,优先获取运行中事件循环,否则使用 asyncio.run 启动新事件循环,保证在不同运行环境下均可执行。
- 主要扫描与执行:check_and_execute_queries 异步查询数据库,筛选 active 且 next_query_at 已到达的 Query,逐条调用 _execute_single_query。
- 遗留任务检查:check_and_execute_pending_tasks 兜底机制,处理超过1分钟仍未执行的 pending 任务,按 query_id 分组并重新执行。
- 错误处理:对单条查询异常进行日志记录并继续下一条,避免单点故障影响整体扫描;遗留任务执行失败时记录错误信息并标记为 failed。
- 关闭流程:shutdown 调用 scheduler.shutdown(wait=False) 与 engine.close(),确保资源释放。
flowchart TD
Start(["启动调度器"]) --> AddJobs["注册两个定时任务<br/>每小时检查到期任务<br/>每分钟检查遗留任务"]
AddJobs --> StartSched["启动 AsyncIOScheduler"]
StartSched --> HourlyLoop["每小时触发"]
HourlyLoop --> Scan["查询数据库<br/>筛选到期的 Query"]
Scan --> HasQ{"是否有待执行查询?"}
HasQ -- 否 --> MinuteLoop["等待下一分钟"]
HasQ -- 是 --> ExecOne["逐条执行 _execute_single_query"]
ExecOne --> NextQ["继续下一条"]
NextQ --> HasQ
MinuteLoop --> PendingCheck["每分钟检查<br/>遗留的 pending 任务"]
PendingCheck --> HasPending{"是否有遗留任务?"}
HasPending -- 否 --> HourlyLoop
HasPending -- 是 --> ReExec["重新执行遗留任务"]
ReExec --> UpdateStatus["更新任务状态并写入结果"]
UpdateStatus --> HasPending
图表来源
章节来源
引擎:CitationEngine
- 单查询执行:execute_query 接收 Query 与 AsyncSession,创建 BrandMatcher,遍历 Query.platforms,逐平台执行。
- 任务状态管理:_get_or_create_task 获取或创建 QueryTask,执行前置为 running,成功置为 success,失败置为 failed 并记录错误信息。
- 结果落库:构造 CitationRecord 写入数据库,包含 cited、confidence、position、citation_text、competitor_brands、raw_response 等字段。
- 时间推进:执行完成后更新 Query.last_queried_at 与 next_query_at,next_query_at 基于 frequency 映射为天数增量。
- 平台适配:execute_single_platform 通过平台映射调用对应 Adapter.query,再进行品牌匹配与竞争品牌检测。
- 资源关闭:close 遍历适配器并逐一关闭,捕获异常仅告警。
classDiagram
class CitationEngine {
+execute_query(query, db) CitationRecord[]
+execute_single_platform(keyword, platform, target_brand, brand_aliases) dict
+close() void
-_get_or_create_task(db, query_id, platform) QueryTask
-_calculate_next_query_at(frequency) datetime
}
class BrandMatcher {
+match(text) dict
-_extract_candidates(text) list
-_extract_position_and_context(text, keyword) tuple
}
class CompetitorDetector {
+detect(text, target_brand) list
}
CitationEngine --> BrandMatcher : "使用"
CitationEngine --> CompetitorDetector : "使用"
图表来源
章节来源
平台适配器:KimiAdapter 与 WenxinAdapter
- 基类约束:BasePlatformAdapter 定义 platform_name、platform_url 与抽象 query 方法,close 可选。
- 浏览器生命周期:_ensure_browser 确保 Playwright 与 Chromium 启动,若未安装则抛出可读错误提示。
- 查询流程:query 带重试(最多3次,指数退避),_do_query 完成页面导航、输入关键词、提交、等待回复稳定。
- 回复稳定检测:_wait_for_response_stable 检测消息容器,连续多次文本一致视为稳定,超时返回当前文本。
- 资源回收:close 关闭 browser 与 playwright,避免资源泄漏。
sequenceDiagram
participant CE as "CitationEngine"
participant AD as "Kimi/WenxinAdapter"
participant PW as "Playwright"
participant PG as "目标页面"
CE->>AD : "query(keyword)"
AD->>AD : "_ensure_browser()"
AD->>PW : "启动/获取浏览器"
AD->>PG : "goto(platform_url)"
AD->>PG : "查找输入框/填入关键词/提交"
AD->>AD : "_wait_for_response_stable()"
AD-->>CE : "返回原始响应文本"
图表来源
- backend/app/workers/platforms/kimi.py:33-125
- backend/app/workers/platforms/wenxin.py:33-124
- backend/app/workers/platforms/base.py:4-18
章节来源
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
- backend/app/workers/platforms/base.py:4-18
数据模型与任务队列
- Query:用户维度的查询任务,包含关键词、目标品牌、别名、平台列表、频率、状态、时间戳等;索引覆盖 user_id、status、next_query_at。
- QueryTask:单次查询在各平台上的执行记录,状态包括 pending/running/success/failed,带 scheduled_at/started_at/completed_at。
- CitationRecord:每次平台查询的结果记录,包含 cited、confidence、position、citation_text、competitor_brands、raw_response、queried_at。
- 关系:Query 一对多关联 CitationRecord 与 QueryTask;QueryTask 外键级联删除。
erDiagram
QUERIES {
uuid id PK
uuid user_id FK
string keyword
string target_brand
jsonb brand_aliases
jsonb platforms
string frequency
string status
timestamp last_queried_at
timestamp next_query_at
timestamp created_at
timestamp updated_at
}
QUERY_TASKS {
uuid id PK
uuid query_id FK
string platform
string status
text error_message
timestamp scheduled_at
timestamp started_at
timestamp completed_at
}
CITATION_RECORDS {
uuid id PK
uuid query_id FK
string platform
boolean cited
integer citation_position
text citation_text
jsonb competitor_brands
text raw_response
timestamp queried_at
}
QUERIES ||--o{ QUERY_TASKS : "包含"
QUERIES ||--o{ CITATION_RECORDS : "产生"
图表来源
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
章节来源
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
查询服务与API
- 服务层:提供查询的增删改查、数量限制校验、频率变更时 next_query_at 重新计算。
- API 层:提供查询列表、创建、获取、更新、删除接口,配合权限与分页参数。
章节来源
依赖分析
- 组件耦合
- QueryScheduler 依赖 CitationEngine、数据库会话与 Query 模型。
- CitationEngine 依赖 Query、QueryTask、CitationRecord、平台适配器。
- 平台适配器依赖 Playwright,受环境与网络影响较大。
- 外部依赖
- APscheduler:异步调度框架。
- SQLAlchemy Async:异步 ORM。
- Playwright:浏览器自动化。
- 潜在环路
- 当前模块间为单向依赖,无明显循环导入。
graph LR
S["scheduler.py"] --> E["citation_engine.py"]
E --> P1["kimi.py"]
E --> P2["wenxin.py"]
E --> M1["query.py"]
E --> M2["query_task.py"]
E --> M3["citation_record.py"]
S --> D["database.py"]
E --> D
API["api/queries.py"] --> SVC["services/query.py"]
SVC --> D
图表来源
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
- backend/app/database.py:1-29
- backend/app/api/queries.py:15-86
- backend/app/services/query.py:12-130
性能考虑
- 调度频率与并发
- 当前调度器采用双调度器模式:每小时扫描到期查询,每分钟检查遗留任务,适合低至中等并发场景;如需更高吞吐,可考虑缩短周期或引入多进程/多实例。
- 数据库访问
- 扫描查询使用 UTC 时间比较,建议在数据库层面为 next_query_at 建立高效索引,减少全表扫描。
- 异步执行
- CitationEngine 逐平台串行执行,平台间可并行化(例如 asyncio.gather),但需注意平台限流与资源占用。
- 平台稳定性
- 平台适配器内置重试与等待稳定机制,建议结合指数退避与超时上限,避免长时间阻塞。
- 资源管理
- 浏览器与 Playwright 生命周期严格管理,关闭时序正确,避免内存与句柄泄漏。
- 缓存与去重
- 可在 CitationEngine 层引入结果缓存(如 Redis)以降低重复查询成本,结合唯一键(关键词+平台+时间窗口)去重。
- 遗留任务处理
- 新增的每分钟遗留任务检查机制提供了额外的容错保护,确保即使主调度器出现问题,任务仍能在合理时间内得到执行。
更新 新增了遗留任务处理机制的性能考虑,提高了系统的整体可靠性。
故障排查指南
- 调度器未启动
- 检查 lifespan 中是否调用 start(),以及是否在生产环境正确部署。
- 查询未被执行
- 核查 Query.status 是否为 active,next_query_at 是否已到达;确认数据库时区与 UTC 一致性。
- 遗留任务异常
- 检查 QueryTask 状态是否长期为 pending,确认每分钟遗留任务检查机制是否正常工作;查看日志中遗留任务重新执行的记录。
- 平台适配器异常
- Playwright 未安装:参考适配器错误提示运行安装命令;网络超时:调整等待稳定阈值与超时参数。
- 任务状态异常
- QueryTask 状态长期为 running:检查数据库事务提交与异常捕获路径,确保异常分支也能更新状态。
- 结果缺失
- 确认 CitationRecord 写入逻辑与 QueryTask 成功分支;失败分支也会写入一条 cited=False 的记录作为占位。
更新 新增了遗留任务相关的故障排查指导。
章节来源
- backend/app/workers/scheduler.py:42-90
- backend/app/workers/scheduler.py:95-172
- backend/app/workers/citation_engine.py:175-234
- backend/app/workers/platforms/kimi.py:21-48
- backend/app/workers/platforms/wenxin.py:21-48
结论
该调度系统以轻量、清晰的模块划分实现了"定时扫描 + 异步执行 + 平台适配 + 结果落库"的完整闭环。通过 Query/QueryTask/CitationRecord 的三层状态与数据模型,系统具备良好的可观测性与可扩展性。新增的双调度器模式和遗留任务检查机制进一步提高了系统的可靠性和容错能力。建议在高并发场景下引入并行化与缓存策略,并持续完善监控与告警体系。
更新 本次更新完善了调度器设计细节,增强了系统的容错性和可靠性。
附录
配置选项
- 数据库连接:DATABASE_URL(来自配置类 Settings)
- 日志与中间件:FastAPI CORS 配置(允许本地前端跨域)
- 运行时生命周期:lifespan 在应用启动时启动调度器,在关闭时优雅退出
章节来源
扩展方法
- 新增平台适配器
- 继承 BasePlatformAdapter,实现 query 与可选 close;在 CitationEngine.platforms 映射中注册。
- 调整调度策略
- 修改调度器触发间隔或引入多调度器实例;在 Query 上增加优先级字段以实现差异化执行。
- 结果聚合与报表
- 基于 CitationRecord 与 QueryTask 构建统计视图,输出趋势与失败率报表。
章节来源
- backend/app/workers/platforms/base.py:4-18
- backend/app/workers/citation_engine.py:152-157
- backend/app/models/citation_record.py:11-42
- backend/app/models/query_task.py:11-39
调试技巧
- 启用数据库回显:在数据库引擎创建时开启 echo(当前为关闭,便于生产环境降噪)
- 逐步验证:先验证调度器扫描逻辑,再验证单平台适配器,最后验证 CitationEngine 整体流程
- 单元测试:利用测试夹具模拟 Query 对象,验证 API 与服务层行为
- 调度器测试:使用专门的测试用例验证调度器启动/关闭、查询筛选和频率计算功能
更新 新增了调度器测试相关的调试技巧。
章节来源
调度器测试详细说明
启动/关闭测试
验证调度器能够正确启动和关闭,包括:
- 调度作业的注册和命名验证
- 引擎资源的正确关闭
- 作业重复启动的安全性
查询任务筛选测试
验证调度器能够正确筛选待执行的查询任务:
- active 状态且 next_query_at 已到达的任务会被执行
- 未来时间的任务不会被错误执行
- paused 状态的任务不会被执行
频率计算测试
验证频率映射的正确性:
- daily 频率:next_query_at 增加 1 天
- weekly 频率:next_query_at 增加 7 天
- 默认频率:next_query_at 增加 7 天
新增 详细说明了调度器测试用例的设计和验证要点。
章节来源