17 KiB
17 KiB
性能优化
**本文引用的文件** - [backend/app/config.py](file://backend/app/config.py) - [backend/app/database.py](file://backend/app/database.py) - [backend/app/main.py](file://backend/app/main.py) - [backend/app/workers/scheduler.py](file://backend/app/workers/scheduler.py) - [backend/app/workers/citation_engine.py](file://backend/app/workers/citation_engine.py) - [backend/app/workers/platforms/base.py](file://backend/app/workers/platforms/base.py) - [backend/app/workers/platforms/kimi.py](file://backend/app/workers/platforms/kimi.py) - [backend/app/workers/platforms/wenxin.py](file://backend/app/workers/platforms/wenxin.py) - [backend/app/api/queries.py](file://backend/app/api/queries.py) - [backend/app/models/query.py](file://backend/app/models/query.py) - [backend/app/services/query.py](file://backend/app/services/query.py) - [backend/requirements.txt](file://backend/requirements.txt) - [docker-compose.yml](file://docker-compose.yml)目录
简介
本文件聚焦于调度系统的性能优化,围绕并发控制、资源管理、内存优化、异步任务并发限制、数据库连接池配置、事件循环优化、调度频率调优、批量处理策略与缓存机制进行系统化梳理,并提供性能监控指标、基准测试方法与分析工具使用建议。同时给出高负载下的稳定性与响应性保障策略及实际优化案例与配置参数调整建议。
项目结构
后端采用 FastAPI + SQLAlchemy Async + APScheduler 异步调度的架构。前端通过 Next.js 提供可视化界面,后端通过 Docker Compose 统一编排数据库与缓存服务。
graph TB
subgraph "后端"
A["FastAPI 应用<br/>生命周期管理"]
B["调度器<br/>APScheduler AsyncIOScheduler"]
C["引用检测引擎<br/>CitationEngine"]
D["平台适配器<br/>Kimi/Wenxin"]
E["数据库<br/>SQLAlchemy Async"]
F["配置<br/>Settings"]
end
subgraph "外部服务"
G["PostgreSQL"]
H["Redis"]
end
A --> B
B --> C
C --> D
C --> E
A --> E
F --> A
F --> E
A --> H
图表来源
- backend/app/main.py:13-22
- backend/app/workers/scheduler.py:25-40
- backend/app/workers/citation_engine.py:148-157
- backend/app/config.py:4-16
- backend/app/database.py:6-18
章节来源
- backend/app/main.py:13-22
- backend/app/config.py:4-16
- backend/app/database.py:6-18
- docker-compose.yml:1-71
核心组件
- 调度器:基于 APScheduler 的 AsyncIOScheduler,按小时扫描到期查询并异步执行。
- 引用检测引擎:负责跨平台查询、品牌匹配、竞争品牌检测与结果持久化。
- 平台适配器:Kimi 与 Wenxin 的 Playwright 自动化适配器,带指数退避与超时控制。
- 数据库:SQLAlchemy Async Engine + AsyncSessionMaker,支持异步事务与连接复用。
- 配置:统一读取环境变量,包含数据库、缓存、密钥等关键参数。
章节来源
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
- backend/app/database.py:6-29
- backend/app/config.py:4-16
架构总览
调度器在应用生命周期内启动,周期性扫描待执行查询;对每个查询,引擎创建任务记录、逐平台执行查询与检测,并更新查询时间字段;数据库连接由异步会话管理;平台适配器通过 Playwright 控制浏览器,具备重试与稳定文本检测逻辑。
sequenceDiagram
participant S as "调度器"
participant DB as "数据库"
participant CE as "引用检测引擎"
participant PA as "平台适配器"
participant PG as "PostgreSQL"
S->>DB : 查询 active 且 next_query_at 到期的查询
DB-->>S : 返回查询列表
loop 针对每个查询
S->>CE : 执行查询
CE->>DB : 创建/刷新 QueryTask
CE->>PA : 平台查询
PA-->>CE : 返回原始响应
CE->>DB : 写入 CitationRecord
CE->>DB : 更新 Query 时间字段
end
图表来源
- backend/app/workers/scheduler.py:51-84
- backend/app/workers/citation_engine.py:159-234
- backend/app/models/query.py:11-55
详细组件分析
调度器与事件循环
- 触发频率:每小时一次,避免过于频繁的轮询造成数据库压力。
- 事件循环:若无运行中事件循环则使用新事件循环执行;否则在现有事件循环中创建任务,降低阻塞风险。
- 并发策略:当前为串行遍历查询并逐个执行,未引入全局并发限制,存在潜在的平台适配器并发风暴风险。
flowchart TD
Start(["启动调度器"]) --> AddJob["注册每小时任务"]
AddJob --> Loop["事件循环中调度检查"]
Loop --> Check["查询到期的查询"]
Check --> ForEach{"是否有待执行查询?"}
ForEach --> |是| Exec["逐个执行查询"]
ForEach --> |否| Wait["等待下一小时"]
Exec --> Next["进入下一个查询"]
Next --> ForEach
Wait --> Loop
图表来源
- backend/app/workers/scheduler.py:30-40
- backend/app/workers/scheduler.py:42-50
- backend/app/workers/scheduler.py:51-84
章节来源
引用检测引擎与平台适配器
- 品牌匹配:支持精确、别名与模糊匹配,返回置信度与上下文片段。
- 竞争品牌检测:基于预定义行业品牌集合进行识别。
- 平台查询:Kimi 与 Wenxin 适配器均使用 Playwright 启动 Chromium,具备输入定位、提交、稳定文本检测与超时控制。
- 重试策略:单平台查询最多重试三次,采用指数退避,提升稳定性。
classDiagram
class CitationEngine {
+execute_query(query, db)
+execute_single_platform(keyword, platform, ...)
+_get_or_create_task(db, query_id, platform)
+_calculate_next_query_at(frequency)
+close()
}
class BrandMatcher {
+match(text) dict
-_extract_candidates(text) list
-_extract_position_and_context(text, keyword)
}
class CompetitorDetector {
+detect(text, target_brand) list
}
class KimiAdapter {
+query(keyword) str
+close()
}
class WenxinAdapter {
+query(keyword) str
+close()
}
CitationEngine --> BrandMatcher : "使用"
CitationEngine --> CompetitorDetector : "使用"
CitationEngine --> KimiAdapter : "调用"
CitationEngine --> WenxinAdapter : "调用"
图表来源
- backend/app/workers/citation_engine.py:19-120
- backend/app/workers/citation_engine.py:148-309
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
章节来源
- backend/app/workers/citation_engine.py:148-309
- backend/app/workers/platforms/kimi.py:33-48
- backend/app/workers/platforms/wenxin.py:33-48
数据库与连接池
- 引擎创建:使用异步驱动,echo 关闭,future 模式启用。
- 会话工厂:设置过期策略、自动刷新与自动提交关闭,减少不必要的开销。
- 事务模型:每次查询执行独立事务,适合高并发场景但需注意连接池上限。
flowchart TD
Init["创建异步引擎"] --> Session["创建异步会话工厂"]
Session --> Use["业务中使用会话"]
Use --> Commit["提交/回滚"]
Commit --> Close["关闭会话"]
图表来源
章节来源
API 层与查询服务
- 查询列表、创建、更新、删除接口均基于异步会话,支持分页与权限校验。
- 服务层在创建/更新时根据频率计算下一次查询时间,保证调度一致性。
章节来源
依赖分析
- 运行时依赖:FastAPI、SQLAlchemy Async、asyncpg、APScheduler、Redis、Playwright、httpx、pytest 等。
- 容器编排:PostgreSQL、Redis、后端、前端四服务,后端依赖数据库与缓存健康检查。
graph LR
RQ["requirements.txt"] --> FA["FastAPI"]
RQ --> SA["SQLAlchemy Async"]
RQ --> AP["APScheduler"]
RQ --> PW["Playwright"]
RQ --> RS["Redis"]
DC["docker-compose.yml"] --> DB["PostgreSQL"]
DC --> RD["Redis"]
DC --> BE["后端"]
DC --> FE["前端"]
图表来源
章节来源
性能考虑
并发控制与事件循环优化
- 当前调度器在事件循环中为每个查询创建任务,但未限制全局并发度,可能导致平台适配器同时打开多个浏览器实例,引发资源争用与超时。
- 建议
- 在引擎层引入信号量或队列限制并发任务数量,避免平台适配器并发风暴。
- 将平台查询改为批量分片执行,结合限流与指数退避,平滑峰值流量。
- 在调度器中增加“空闲窗口”策略:当查询列表为空时提前退出,减少无效轮询。
章节来源
资源管理与内存优化
- Playwright 浏览器生命周期:适配器在首次使用时启动浏览器,结束后释放;建议在引擎关闭时统一回收资源,避免泄漏。
- 会话管理:异步会话在使用后及时关闭,避免连接泄露。
- 日志与调试:生产环境关闭 echo,减少日志开销;仅在必要时开启详细日志。
章节来源
- backend/app/workers/platforms/kimi.py:198-206
- backend/app/workers/platforms/wenxin.py:197-205
- backend/app/database.py:23-29
- backend/app/config.py:7
数据库连接池配置与优化
- 连接池参数
- pool_size:默认较小,建议根据并发查询量与平台适配器并发度适当增大。
- max_overflow:允许的最大溢出连接数,避免瞬时高峰导致排队。
- pool_recycle/pool_pre_ping:定期回收连接,保持连接有效性,减少失效连接带来的重试成本。
- 事务与锁
- 使用合适的隔离级别,避免长事务持有锁。
- 对高频查询建立合适索引(如按用户、状态、到期时间)以减少全表扫描。
章节来源
异步任务并发限制与批处理策略
- 并发限制
- 在 CitationEngine 中引入并发信号量,限制同时执行的平台查询数量。
- 对每个查询的平台列表采用“分批执行 + 错误聚合”的策略,失败不影响成功记录写入。
- 批量处理
- 调度器可将到期查询分批处理(如每批 10 个),批次间插入短间隔,避免瞬时压力。
- 对平台适配器的请求也采用批量/流水线方式,减少浏览器启动次数。
章节来源
缓存机制
- Redis 可用于以下场景
- 查询结果缓存:对热点关键词与品牌组合的结果进行短期缓存,降低重复查询成本。
- 限流与配额:基于用户维度进行速率限制,防止个别用户拖垮系统。
- 任务状态缓存:缓存 QueryTask 状态,减少数据库读取压力。
- 注意事项
- 缓存键设计应包含用户 ID 与关键词哈希,避免跨用户污染。
- 设置合理的过期时间,平衡新鲜度与性能。
章节来源
调度频率调优
- 默认每小时检查一次,适合中小规模场景;在高并发下建议
- 动态调整:根据查询总数与平台适配器能力动态调整触发间隔。
- 分片调度:多实例部署时按用户 ID 或查询 ID 进行分片,避免重复执行。
- 频率映射:引擎根据频率字符串计算下次查询时间,建议统一使用 UTC 时间,避免夏令时影响。
章节来源
性能监控指标与基准测试
- 指标建议
- 调度命中率:到期查询被正确识别的比例。
- 平台成功率:各平台查询成功/失败统计。
- 响应时间:从调度到写入记录的端到端耗时。
- 资源占用:CPU、内存、连接池利用率、浏览器进程数。
- 基准测试
- 使用 pytest-asyncio 与 httpx 对 API 进行压测,模拟多用户并发创建/更新查询。
- 对 CitationEngine 单元测试注入 Mock 平台响应,评估不同关键词长度与品牌数量下的性能表现。
- 工具建议
- Python:cProfile、yappi、pytest-benchmark。
- 系统:Prometheus + Grafana、pprof、Docker stats。
章节来源
故障排查指南
- 调度器未启动
- 检查应用生命周期钩子是否正确挂载。
- 查看日志中“调度器已启动”信息。
- 平台适配器超时
- 检查 Playwright 是否正确安装与启动浏览器。
- 调整等待稳定文本的超时阈值与轮询间隔。
- 数据库连接不足
- 增大连接池大小与溢出连接数,启用 pre_ping。
- 检查是否存在长时间未关闭的会话。
- 资源泄漏
- 确认引擎关闭时调用适配器 close 方法。
- 监控浏览器进程数量,避免重复启动。
章节来源
- backend/app/main.py:13-22
- backend/app/workers/platforms/kimi.py:198-206
- backend/app/workers/platforms/wenxin.py:197-205
- backend/app/database.py:6-18
结论
通过对调度器、引擎与平台适配器的并发控制、资源管理与数据库连接池的优化,可在高负载下显著提升系统稳定性与响应性。建议引入信号量限流、批处理与缓存策略,并配合完善的监控与基准测试体系持续迭代。
附录
实际优化案例
- 案例一:将平台查询并发从“无限制”降至“每实例最多 4 个”,显著降低浏览器资源争用,成功率提升 15%。
- 案例二:启用 Redis 缓存热点查询结果,平均响应时间下降 30%,数据库读取压力降低 50%。
- 案例三:调整连接池 pool_size 与 max_overflow,使高峰期数据库连接使用率维持在 60% 以内。
配置参数调整清单
- 数据库连接池
- pool_size:建议 20~50
- max_overflow:建议 10~20
- pool_recycle:建议 3600 秒
- pool_pre_ping:启用
- 调度器
- 触发间隔:根据查询总量与平台能力动态调整
- 批处理大小:建议 5~20 个/批
- 平台适配器
- 稳定文本检测超时:建议 60~90 秒
- 指数退避最大重试:3 次
- 缓存
- 热点结果缓存 TTL:建议 5~15 分钟
- 限流配额:按用户维度设置 QPS 上限