20 KiB
20 KiB
调度器设计
**本文档引用的文件** - [backend/app/workers/scheduler.py](file://backend/app/workers/scheduler.py) - [backend/app/workers/citation_engine.py](file://backend/app/workers/citation_engine.py) - [backend/app/models/query.py](file://backend/app/models/query.py) - [backend/app/models/query_task.py](file://backend/app/models/query_task.py) - [backend/app/main.py](file://backend/app/main.py) - [backend/app/database.py](file://backend/app/database.py) - [backend/app/config.py](file://backend/app/config.py) - [backend/app/workers/platforms/kimi.py](file://backend/app/workers/platforms/kimi.py) - [backend/app/workers/platforms/wenxin.py](file://backend/app/workers/platforms/wenxin.py) - [backend/app/workers/platforms/base.py](file://backend/app/workers/platforms/base.py) - [backend/app/api/citations.py](file://backend/app/api/citations.py) - [backend/app/services/query.py](file://backend/app/services/query.py) - [tests/test_scheduler.py](file://tests/test_scheduler.py)更新摘要
变更内容
- 新增分钟级轮询机制,每分钟检查并执行遗留的pending任务
- 新增
check_and_execute_pending_tasks()方法用于处理孤立的待处理任务 - 新增对QueryTask模型的完整支持,包括
scheduled_at字段的使用 - 增强系统弹性,提供双重检查机制以提高任务执行可靠性
目录
引言
本文件系统性阐述基于 APScheduler 的 AsyncIOScheduler 的调度器设计与实现,重点覆盖以下方面:
- 架构选择与初始化:如何通过 AsyncIOScheduler 实现与事件循环的无缝集成,并在启动时注册周期性任务。
- 双重触发器机制:使用小时级触发器每小时扫描并执行到期查询任务,同时使用分钟级触发器每分钟检查遗留的pending任务。
- 核心组件职责:QueryScheduler 类的设计模式、事件循环管理、异步任务包装策略。
- 启动与关闭流程:应用生命周期内调度器的启动与优雅停机,资源清理与并发安全。
- 配置参数与性能调优:数据库连接、触发器频率、重试与指数退避策略、平台适配器资源管理。
- 使用模式与最佳实践:手动触发查询、批量执行与错误隔离、日志与可观测性。
项目结构
调度器相关代码集中在后端 workers 子模块,配合 FastAPI 应用生命周期进行集成;数据库与模型位于 app/database 与 app/models;平台适配器位于 app/workers/platforms 下。
graph TB
subgraph "应用层"
FastAPI["FastAPI 应用<br/>lifespan 钩子"]
end
subgraph "工作器层"
Scheduler["QueryScheduler<br/>AsyncIOScheduler"]
Engine["CitationEngine<br/>品牌匹配/竞争品牌检测"]
Platforms["平台适配器<br/>Kimi/Wenxin"]
end
subgraph "数据层"
DB["AsyncSessionLocal<br/>异步会话工厂"]
Models["Query/QueryTask/CitationRecord<br/>SQLAlchemy 模型"]
end
FastAPI --> Scheduler
Scheduler --> DB
Scheduler --> Engine
Engine --> Platforms
DB --> Models
图表来源
- backend/app/main.py:13-22
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/database.py:6-28
- backend/app/models/query.py:11-55
章节来源
- backend/app/main.py:1-48
- backend/app/workers/scheduler.py:1-95
- backend/app/database.py:1-29
- backend/app/models/query.py:1-55
核心组件
- QueryScheduler:封装 AsyncIOScheduler,负责注册周期性任务、事件循环管理、任务执行入口与优雅停机。
- CitationEngine:核心业务引擎,负责品牌匹配、竞争品牌检测、平台适配器编排、任务状态持久化与下次查询时间计算。
- 平台适配器:KimiAdapter 与 WenxinAdapter,基于 Playwright 的自动化查询与结果稳定检测,具备指数退避与资源清理能力。
- 数据层:Query 模型及其索引,驱动调度器的查询筛选条件与频率控制;QueryTask 模型用于跟踪任务执行状态。
章节来源
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-38
架构总览
调度器采用"应用生命周期 + 异步调度器 + 引擎编排"的分层架构,现已增强为双重检查机制:
- 应用启动时通过 lifespan 钩子启动调度器。
- 调度器以每小时为周期扫描数据库,筛选到期的 active 查询任务。
- 同时以每分钟为周期检查遗留的 pending 查询任务,确保系统弹性。
- 对每个查询任务,CitationEngine 负责跨平台执行、结果解析与持久化。
- 平台适配器负责具体平台的网页自动化与稳定性检测。
- 应用关闭时,调度器优雅停机并释放平台适配器资源。
sequenceDiagram
participant App as "FastAPI 应用"
participant Life as "lifespan 钩子"
participant Sched as "QueryScheduler"
participant APS as "AsyncIOScheduler"
participant DB as "AsyncSessionLocal"
participant Eng as "CitationEngine"
participant Plat as "平台适配器"
App->>Life : "应用启动"
Life->>Sched : "start()"
Sched->>APS : "add_job(IntervalTrigger(hours=1))"
Sched->>APS : "add_job(IntervalTrigger(minutes=1))"
APS-->>Sched : "注册成功"
APS->>Sched : "定时回调 _run_check"
APS->>Sched : "定时回调 _run_pending_tasks_check"
Sched->>Sched : "事件循环包装"
Sched->>DB : "创建异步会话"
DB-->>Sched : "会话实例"
Sched->>DB : "查询 active 且到期的 Query"
DB-->>Sched : "查询结果集"
loop 对每个 Query
Sched->>Eng : "execute_query(query)"
Eng->>Plat : "query(keyword)"
Plat-->>Eng : "原始响应文本"
Eng-->>Sched : "CitationRecord 列表"
Sched->>DB : "更新 Query.next_query_at"
end
Sched->>DB : "查询 pending 且超时的 QueryTask"
DB-->>Sched : "遗留任务列表"
loop 对每个遗留任务
Sched->>Eng : "execute_single_platform(query.keyword, task.platform)"
Eng->>Plat : "query(keyword)"
Plat-->>Eng : "原始响应文本"
Eng-->>Sched : "CitationRecord 列表"
Sched->>DB : "更新 QueryTask 状态"
end
App->>Life : "应用关闭"
Life->>Sched : "shutdown()"
Sched->>APS : "shutdown(wait=False)"
Sched->>Eng : "close()"
图表来源
- backend/app/main.py:13-22
- backend/app/workers/scheduler.py:30-90
- backend/app/workers/citation_engine.py:159-234
- backend/app/workers/platforms/kimi.py:33-48
- backend/app/workers/platforms/wenxin.py:33-48
详细组件分析
QueryScheduler 组件
- 设计模式:职责分离 + 生命周期集成。将调度逻辑与业务执行解耦,通过 CitationEngine 承载核心业务。
- 初始化与启动:构造函数创建 AsyncIOScheduler 与 CitationEngine;start() 注册每小时触发的任务和每分钟触发的遗留任务检查,替换同名任务以避免重复。
- 事件循环管理:_run_check() 和 _run_pending_tasks_check() 在无运行中事件循环时使用 asyncio.run() 启动新事件循环,否则通过 loop.create_task() 将异步任务调度到当前事件循环,保证并发安全与可观察性。
- 任务执行:check_and_execute_queries() 以异步会话查询到期的 active 查询,逐条调用 _execute_single_query(),捕获异常并继续处理其他任务;check_and_execute_pending_tasks() 处理超过1分钟仍未执行的遗留任务。
- 关闭流程:shutdown() 调用 APScheduler 的 shutdown(wait=False) 与 CitationEngine.close(),确保平台资源释放。
classDiagram
class QueryScheduler {
+start() void
+shutdown() async
-_run_check() void
+check_and_execute_queries() async
-_execute_single_query(query, db) async
-_run_pending_tasks_check() void
+check_and_execute_pending_tasks() async
-scheduler AsyncIOScheduler
-engine CitationEngine
}
class CitationEngine {
+execute_query(query, db) async
+execute_single_platform(keyword, platform, target_brand, brand_aliases) async
+close() async
-_get_or_create_task(db, query_id, platform) async
-_calculate_next_query_at(frequency) datetime
-platforms dict
-matcher BrandMatcher
-competitor_detector CompetitorDetector
}
QueryScheduler --> CitationEngine : "依赖"
图表来源
章节来源
CitationEngine 组件
- 品牌匹配:BrandMatcher 支持精确、别名与模糊匹配,返回置信度、匹配类型与上下文片段。
- 竞争品牌检测:CompetitorDetector 从预定义类别中识别竞争品牌。
- 平台编排:遍历 Query.platforms,对每个平台执行查询与检测,创建/更新 QueryTask,写入 CitationRecord,并更新 Query 的时间字段。
- 错误处理:平台失败时记录错误并生成一条 cited=False 的占位记录,保证数据一致性。
- 资源管理:close() 关闭各平台适配器,统一异常处理。
flowchart TD
Start(["开始执行查询"]) --> InitMatcher["初始化 BrandMatcher"]
InitMatcher --> IteratePlatforms{"遍历平台"}
IteratePlatforms --> |是| GetOrCreateTask["获取或创建 QueryTask"]
GetOrCreateTask --> SetRunning["设置任务状态为 running"]
SetRunning --> CallPlatform["调用平台适配器查询"]
CallPlatform --> ParseResult["品牌匹配与竞争品牌检测"]
ParseResult --> CreateRecord["创建 CitationRecord"]
CreateRecord --> SetSuccess["设置任务状态为 success"]
SetSuccess --> UpdateQuery["更新 Query 时间字段"]
UpdateQuery --> IteratePlatforms
IteratePlatforms --> |否| End(["结束"])
图表来源
章节来源
- backend/app/workers/citation_engine.py:19-100
- backend/app/workers/citation_engine.py:122-146
- backend/app/workers/citation_engine.py:148-309
平台适配器组件
- KimiAdapter 与 WenxinAdapter 均继承 BasePlatformAdapter,实现 query() 与 close()。
- 自动化流程:确保浏览器启动 → 新建上下文与页面 → 定位输入框 → 填充关键词 → 提交查询 → 等待回复稳定 → 返回文本。
- 稳定性检测:_wait_for_response_stable() 检测文本连续多次一致后判定稳定,超时则返回当前文本。
- 重试与指数退避:query() 内部最多三次尝试,失败时按 2^attempt 秒退避。
- 资源清理:close() 关闭浏览器与 Playwright 实例。
classDiagram
class BasePlatformAdapter {
<<abstract>>
+platform_name : str
+platform_url : str
+query(keyword) async*
+close() async
}
class KimiAdapter {
+query(keyword) async
+close() async
-_ensure_browser() async
-_do_query(keyword) async
-_wait_for_response_stable(page, timeout) async
}
class WenxinAdapter {
+query(keyword) async
+close() async
-_ensure_browser() async
-_do_query(keyword) async
-_wait_for_response_stable(page, timeout) async
}
KimiAdapter --|> BasePlatformAdapter
WenxinAdapter --|> BasePlatformAdapter
图表来源
- backend/app/workers/platforms/base.py:4-18
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
章节来源
- backend/app/workers/platforms/kimi.py:21-48
- backend/app/workers/platforms/kimi.py:126-197
- backend/app/workers/platforms/wenxin.py:21-48
- backend/app/workers/platforms/wenxin.py:124-195
数据模型与触发条件
- Query 模型包含用户外键、关键词、目标品牌、别名、平台列表、频率、状态与时间戳字段,并建立多处索引以优化查询。
- QueryTask 模型用于跟踪任务执行状态,包含 scheduled_at 字段用于标识任务计划执行时间。
- 触发条件:调度器按每小时扫描 status='active' 且 next_query_at <= now() 的记录,按每分钟扫描 status='pending' 且 scheduled_at <= one_minute_ago 的遗留任务。
章节来源
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-38
- backend/app/workers/scheduler.py:57-62
- backend/app/workers/scheduler.py:107-112
应用生命周期与手动触发
- 应用启动:lifespan 钩子在应用启动时调用 query_scheduler.start(),在关闭时调用 shutdown()。
- 手动触发:/api/v1/queries/{query_id}/run-now 接口调用服务层 trigger_query_now,将指定查询立即加入执行队列(由 CitationEngine 编排)。
章节来源
依赖关系分析
- QueryScheduler 依赖 AsyncIOScheduler、AsyncSessionLocal 与 CitationEngine。
- CitationEngine 依赖平台适配器集合、BrandMatcher、CompetitorDetector,并与数据库交互。
- 平台适配器依赖 Playwright,需在运行环境中安装对应浏览器。
- 应用通过 lifespan 钩子与调度器耦合,确保生命周期内资源正确管理。
graph LR
S["QueryScheduler"] --> A["AsyncIOScheduler"]
S --> E["CitationEngine"]
E --> P1["KimiAdapter"]
E --> P2["WenxinAdapter"]
E --> D["AsyncSessionLocal"]
D --> M["Query/QueryTask/CitationRecord"]
图表来源
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/database.py:6-28
- backend/app/models/query.py:11-55
章节来源
- backend/app/workers/scheduler.py:13-20
- backend/app/workers/citation_engine.py:148-157
- backend/app/database.py:1-29
性能考虑
- 触发器频率:默认每小时一次主检查,每分钟一次遗留任务检查,可根据业务负载调整;过短可能导致数据库压力与平台限流风险。
- 数据库索引:Query 和 QueryTask 模型已建立多处索引,建议结合 EXPLAIN 分析查询计划,避免全表扫描。
- 异步并发:调度器在事件循环中调度异步任务,避免阻塞;平台适配器内部使用 Playwright,注意浏览器资源占用与并发上限。
- 重试与退避:平台适配器内置最多三次重试与指数退避,降低瞬时失败影响。
- 资源清理:关闭时调用 shutdown(wait=False) 与 CitationEngine.close(),确保浏览器与数据库连接及时释放。
- 系统弹性:分钟级轮询机制提供冗余检查,确保即使主调度器出现问题,遗留任务也能得到处理。
章节来源
- backend/app/workers/scheduler.py:32-38
- backend/app/models/query.py:50-54
- backend/app/models/query_task.py:36-38
- backend/app/workers/platforms/kimi.py:33-48
- backend/app/workers/platforms/wenxin.py:33-48
- backend/app/workers/citation_engine.py:302-309
故障排查指南
- 调度器未启动:确认 lifespan 钩子已正确导入与调用 start()/shutdown()。
- 无事件循环:_run_check() 和 _run_pending_tasks_check() 已处理无运行中事件循环的情况,若仍报错,检查事件循环状态与线程模型。
- 数据库连接失败:检查 DATABASE_URL 配置与网络连通性。
- 平台适配器异常:Playwright 未安装或浏览器不可用时会抛出明确异常;按提示运行安装命令。
- 查询失败:CitationEngine 会在平台失败时记录错误并生成占位记录,便于后续重试与审计。
- 遗留任务堆积:检查 QueryTask 表中 status='pending' 且 scheduled_at 超过1分钟的任务,确认主调度器是否正常工作。
- 资源泄漏:确保关闭流程调用 shutdown() 与 close(),避免浏览器与数据库连接泄露。
章节来源
- backend/app/main.py:13-22
- backend/app/workers/scheduler.py:44-49
- backend/app/config.py
- backend/app/workers/platforms/kimi.py:23-31
- backend/app/workers/platforms/wenxin.py:23-31
- backend/app/workers/citation_engine.py:211-227
结论
该调度器以 AsyncIOScheduler 为核心,结合 FastAPI 生命周期管理,实现了高可用、可扩展的定时查询任务体系。通过双重检查机制(每小时主检查 + 每分钟遗留任务检查),系统提供了更强的弹性与可靠性。通过 CitationEngine 的平台编排与品牌匹配能力,以及平台适配器的稳定性保障,系统能够在异步环境下高效、可靠地执行跨平台查询任务。建议在生产环境关注触发频率、数据库索引与平台限流策略,并完善监控与告警机制。
附录
- 配置参数
- 数据库连接:DATABASE_URL(来自 Settings)
- Redis 连接:REDIS_URL(来自 Settings)
- JWT 密钥与过期:JWT_SECRET、JWT_EXPIRE_HOURS(来自 Settings)
- Playwright 浏览器路径:PLAYWRIGHT_BROWSERS_PATH(来自 Settings)
- 使用模式与最佳实践
- 启动与关闭:通过 lifespan 钩子自动管理调度器生命周期。
- 手动触发:调用 /api/v1/queries/{query_id}/run-now 接口将查询立即加入执行队列。
- 错误隔离:平台失败不影响整体调度,CitationEngine 记录占位记录并继续处理其他任务。
- 性能调优:根据业务负载调整触发器频率、数据库索引与平台并发上限。
- 遗留任务处理:系统自动处理超过1分钟的遗留 pending 任务,确保任务最终被执行。
章节来源