26 KiB
26 KiB
查询管理接口
**本文档引用的文件** - [backend/app/api/queries.py](file://backend/app/api/queries.py) - [backend/app/schemas/query.py](file://backend/app/schemas/query.py) - [backend/app/schemas/citation.py](file://backend/app/schemas/citation.py) - [backend/app/models/query.py](file://backend/app/models/query.py) - [backend/app/models/query_task.py](file://backend/app/models/query_task.py) - [backend/app/models/citation_record.py](file://backend/app/models/citation_record.py) - [backend/app/models/user.py](file://backend/app/models/user.py) - [backend/app/services/query.py](file://backend/app/services/query.py) - [backend/app/services/citation.py](file://backend/app/services/citation.py) - [backend/app/workers/scheduler.py](file://backend/app/workers/scheduler.py) - [backend/app/workers/citation_engine.py](file://backend/app/workers/citation_engine.py) - [backend/app/workers/platforms/base.py](file://backend/app/workers/platforms/base.py) - [backend/app/workers/platforms/kimi.py](file://backend/app/workers/platforms/kimi.py) - [backend/app/workers/platforms/wenxin.py](file://backend/app/workers/platforms/wenxin.py) - [backend/app/api/citations.py](file://backend/app/api/citations.py) - [backend/app/main.py](file://backend/app/main.py) - [tests/test_queries.py](file://tests/test_queries.py) - [tests/test_business_flow.py](file://tests/test_business_flow.py)更新摘要
变更内容
- 更新run-now功能的API文档,反映其已合并到主查询API中
- 修正API路由结构,移除独立的run_now_router结构
- 更新架构图和依赖关系,体现新的API组织方式
- 补充run-now接口的详细使用说明和错误处理
目录
简介
本文件为"查询管理系统"的详细API文档,覆盖查询任务的创建、读取、更新、删除与执行全流程;详述查询任务的数据模型、字段定义与验证规则;文档化查询任务的状态管理、调度机制与执行监控;包含查询参数配置、定时任务设置与批量操作接口的使用建议;并提供查询任务生命周期管理的最佳实践与错误处理策略。
项目结构
后端采用FastAPI + SQLAlchemy异步ORM + PostgreSQL数据库,查询管理相关模块分布如下:
- API层:负责路由定义与请求/响应封装,包含主查询API和引用API
- Schema层:Pydantic模型,定义请求体与响应体的字段与校验
- Model层:SQLAlchemy ORM模型,定义数据库表结构与索引
- Service层:业务逻辑封装,处理权限、计数限制与时间计算
- Worker层:定时调度器与引用检测引擎,驱动查询任务执行
- 平台适配器:Kimi与文心一言平台的适配实现
graph TB
subgraph "API层"
QAPI["查询API<br/>backend/app/api/queries.py"]
CAPI["引用API<br/>backend/app/api/citations.py"]
end
subgraph "Schema层"
QS["查询Schema<br/>backend/app/schemas/query.py"]
CS["引用Schema<br/>backend/app/schemas/citation.py"]
end
subgraph "Model层"
MQ["查询模型<br/>backend/app/models/query.py"]
MT["查询任务模型<br/>backend/app/models/query_task.py"]
MR["引用记录模型<br/>backend/app/models/citation_record.py"]
MU["用户模型<br/>backend/app/models/user.py"]
end
subgraph "Service层"
SQ["查询服务<br/>backend/app/services/query.py"]
SC["引用服务<br/>backend/app/services/citation.py"]
end
subgraph "Worker层"
SCH["调度器<br/>backend/app/workers/scheduler.py"]
CE["引用引擎<br/>backend/app/workers/citation_engine.py"]
end
subgraph "平台适配器"
BASE["适配器基类<br/>backend/app/workers/platforms/base.py"]
KIMI["Kimi适配器<br/>backend/app/workers/platforms/kimi.py"]
WENXIN["文心一言适配器<br/>backend/app/workers/platforms/wenxin.py"]
end
QAPI --> SQ
CAPI --> SC
SQ --> MQ
SC --> MQ
SC --> MR
SC --> MT
SCH --> CE
CE --> KIMI
CE --> WENXIN
QS --> QAPI
CS --> QAPI
CS --> CAPI
MQ --> MU
图表来源
- backend/app/api/queries.py:1-109
- backend/app/api/citations.py:1-55
- backend/app/schemas/query.py:1-94
- backend/app/schemas/citation.py:1-52
- backend/app/models/query.py:1-55
- backend/app/models/query_task.py:1-39
- backend/app/models/citation_record.py:1-42
- backend/app/models/user.py:1-41
- backend/app/services/query.py:1-130
- backend/app/services/citation.py:1-429
- backend/app/workers/scheduler.py:1-95
- backend/app/workers/citation_engine.py:1-309
- backend/app/workers/platforms/base.py:1-18
- backend/app/workers/platforms/kimi.py:1-206
- backend/app/workers/platforms/wenxin.py:1-205
章节来源
核心组件
- 查询API:提供查询任务的增删改查与分页列表接口,包含run-now立即执行功能
- 引用API:提供查询历史、统计与导出接口
- 查询Schema:定义创建/更新请求体与响应体的字段与校验规则
- 引用Schema:定义引用记录、统计和run-now响应的数据结构
- 查询模型:定义数据库表结构、索引与关联关系
- 查询任务模型:记录每次平台执行的任务状态与时间戳
- 引用记录模型:保存每次查询的结果与统计信息
- 用户模型:限制用户的最大查询数量
- 查询服务:实现权限控制、计数限制与下次查询时间计算
- 引用服务:实现引用数据查询、统计、立即执行和导出功能
- 调度器:基于APScheduler的定时任务,周期性检查并执行到期查询
- 引用引擎:跨平台执行查询、品牌匹配、竞争品牌检测与结果持久化
- 平台适配器:Kimi与文心一言的自动化查询实现
章节来源
- backend/app/api/queries.py:15-109
- backend/app/api/citations.py:19-55
- backend/app/schemas/query.py:11-94
- backend/app/schemas/citation.py:7-52
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
- backend/app/models/user.py:11-41
- backend/app/services/query.py:12-130
- backend/app/services/citation.py:219-429
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/citation_engine.py:148-309
- backend/app/workers/platforms/base.py:4-18
架构总览
查询管理系统的整体工作流如下:
- 客户端通过查询API创建查询任务,服务端进行权限与参数校验,并计算下次查询时间
- 调度器定时扫描满足条件的查询任务,触发引用引擎执行
- 引用引擎遍历平台列表,调用平台适配器获取AI回复,执行品牌匹配与竞争品牌检测,生成引用记录
- 查询任务模型记录每次执行的状态与时间,便于监控与重试
- 引用API提供查询历史、统计与导出接口
- 新增:run-now功能允许用户立即执行查询任务,绕过正常调度机制
sequenceDiagram
participant Client as "客户端"
participant API as "查询API"
participant Service as "查询服务"
participant DB as "数据库"
participant Scheduler as "调度器"
participant Engine as "引用引擎"
participant Adapter as "平台适配器"
Client->>API : "POST /api/v1/queries/"
API->>Service : "create_query(...)"
Service->>DB : "插入查询记录并计算next_query_at"
DB-->>Service : "返回新查询"
Service-->>API : "返回查询"
API-->>Client : "201 Created"
Note over Scheduler,Engine : "定时触发"
Scheduler->>DB : "查询status='active'且next_query_at<=now()"
DB-->>Scheduler : "返回待执行查询集合"
Scheduler->>Engine : "execute_query(query)"
Engine->>Adapter : "逐平台查询(keyword)"
Adapter-->>Engine : "返回原始回复"
Engine->>DB : "写入引用记录与更新查询时间"
Note over Client,API : "立即执行"
Client->>API : "POST /api/v1/queries/{query_id}/run-now"
API->>Service : "trigger_query_now(...)"
Service->>DB : "创建QueryTask并立即执行"
DB-->>Service : "返回任务"
Service-->>API : "返回任务"
API-->>Client : "202 Accepted"
图表来源
- backend/app/api/queries.py:28-41
- backend/app/api/queries.py:90-109
- backend/app/services/query.py:45-81
- backend/app/services/citation.py:219-261
- backend/app/workers/scheduler.py:51-84
- backend/app/workers/citation_engine.py:159-234
- backend/app/workers/platforms/kimi.py:33-48
- backend/app/workers/platforms/wenxin.py:33-48
详细组件分析
数据模型与字段定义
- 查询模型(queries)
- 关键字段:id、user_id、keyword、target_brand、brand_aliases、platforms、frequency、status、last_queried_at、next_query_at、created_at、updated_at
- 约束与索引:外键约束、多字段索引(user_id、status、next_query_at)
- 关联:一对多关联至用户与引用记录、查询任务
- 查询任务模型(query_tasks)
- 关键字段:id、query_id、platform、status、error_message、scheduled_at、started_at、completed_at
- 约束与索引:外键约束、索引(status)
- 关联:多对一关联至查询
- 引用记录模型(citation_records)
- 关键字段:id、query_id、platform、cited、citation_position、citation_text、competitor_brands、raw_response、queried_at
- 约束与索引:外键约束、多字段索引(query_id、queried_at、platform)
- 关联:多对一关联至查询
- 用户模型(users)
- 关键字段:id、email、password_hash、name、plan、max_queries、is_active、created_at、updated_at
- 关联:一对多关联至查询与订阅
erDiagram
USERS {
uuid id PK
string email UK
string password_hash
string name
string plan
int max_queries
boolean is_active
}
QUERIES {
uuid id PK
uuid user_id FK
string keyword
string target_brand
jsonb brand_aliases
jsonb platforms
string frequency
string status
timestamp last_queried_at
timestamp next_query_at
}
QUERY_TASKS {
uuid id PK
uuid query_id FK
string platform
string status
text error_message
timestamp scheduled_at
timestamp started_at
timestamp completed_at
}
CITATION_RECORDS {
uuid id PK
uuid query_id FK
string platform
boolean cited
int citation_position
text citation_text
jsonb competitor_brands
text raw_response
timestamp queried_at
}
USERS ||--o{ QUERIES : "拥有"
QUERIES ||--o{ CITATION_RECORDS : "产生"
QUERIES ||--o{ QUERY_TASKS : "驱动"
图表来源
- backend/app/models/user.py:11-41
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
章节来源
- backend/app/models/user.py:11-41
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
查询任务状态管理与调度机制
- 状态流转
- 查询任务状态:pending → running → success 或 failed
- 查询状态:active、paused、disabled
- 调度机制
- 使用APScheduler的AsyncIOScheduler,每小时检查一次
- 条件:查询状态为active且next_query_at小于等于当前UTC时间
- 触发引用引擎执行,逐平台查询并生成引用记录
- 执行监控
- 记录scheduled_at、started_at、completed_at
- 失败时记录error_message
- 更新查询的last_queried_at与next_query_at
stateDiagram-v2
[*] --> 待调度
待调度 --> 已调度 : "被调度器发现"
已调度 --> 执行中 : "开始执行"
执行中 --> 成功 : "全部平台成功"
执行中 --> 失败 : "任一平台失败"
成功 --> 待调度 : "更新next_query_at"
失败 --> 待调度 : "更新next_query_at"
图表来源
- backend/app/workers/scheduler.py:51-84
- backend/app/workers/citation_engine.py:176-234
- backend/app/models/query_task.py:24-32
章节来源
查询参数配置与验证规则
- 支持平台:wenxin、kimi、tongyi、baidu_ai、yuanbao、qingyan(创建/更新时校验)
- 频率:daily、weekly(默认weekly)
- 状态:active、paused、disabled(仅更新时可修改)
- 字段长度:keyword(1~200)、target_brand(1~100)
- 平台列表不能为空,若为空则报错
- 频率必须在允许集合内,否则报错
- 状态必须在允许集合内,否则报错
flowchart TD
Start(["进入校验"]) --> CheckPlatforms["校验平台列表非空"]
CheckPlatforms --> PlatformsOK{"平台有效?"}
PlatformsOK --> |否| ErrPlatforms["抛出平台无效错误"]
PlatformsOK --> |是| CheckFrequency["校验频率"]
CheckFrequency --> FreqOK{"频率有效?"}
FreqOK --> |否| ErrFreq["抛出频率无效错误"]
FreqOK --> |是| CheckStatus["校验状态(如提供)"]
CheckStatus --> StatusOK{"状态有效?"}
StatusOK --> |否| ErrStatus["抛出状态无效错误"]
StatusOK --> |是| Done(["校验通过"])
图表来源
章节来源
API定义与使用示例
-
查询任务管理
- GET /api/v1/queries/
- 功能:分页列出当前用户的所有查询任务
- 查询参数:skip(>=0)、limit(1~100)
- 响应:包含items与total的列表响应
- POST /api/v1/queries/
- 功能:创建新的查询任务
- 请求体:keyword、target_brand、brand_aliases、platforms、frequency
- 响应:创建成功的查询任务详情
- 错误:超过用户最大查询数时返回403
- GET /api/v1/queries/{query_id}
- 功能:获取指定查询任务详情
- 响应:查询任务详情
- 错误:不存在或不属于当前用户返回404
- PUT /api/v1/queries/{query_id}
- 功能:更新查询任务
- 请求体:可选字段keyword、target_brand、brand_aliases、platforms、frequency、status
- 响应:更新后的查询任务详情
- 错误:不存在返回404
- DELETE /api/v1/queries/{query_id}
- 功能:删除查询任务
- 响应:204 No Content
- 错误:不存在返回404
- GET /api/v1/queries/
-
立即执行查询(已合并到主查询API)
- POST /api/v1/queries/{query_id}/run-now
- 功能:立即将某个查询加入执行队列
- 响应:包含task_id、status与消息的RunNowResponse
- 状态码:202 Accepted(任务已加入队列)
- 错误:查询不存在、不属于当前用户或查询状态不为active时返回404
- POST /api/v1/queries/{query_id}/run-now
-
引用数据与统计
- GET /api/v1/citations/
- 功能:分页查询引用记录,支持按query_id、platform、日期范围过滤
- 响应:包含items与total的列表响应
- GET /api/v1/citations/stats
- 功能:获取引用统计
- 响应:统计结果
- GET /api/v1/citations/
章节来源
- backend/app/api/queries.py:17-109
- backend/app/api/citations.py:22-55
- backend/app/schemas/citation.py:48-52
生命周期管理最佳实践
- 合理设置frequency:daily适合高频监控,weekly适合常规跟踪
- 控制platforms数量:平台越多,耗时越长,成本越高
- 使用status暂停:在维护或节假日可将查询置为paused避免执行
- 监控next_query_at:确保调度器能按时触发
- 引用记录归档:定期清理过期记录,保持查询性能
- 新增:合理使用run-now功能:仅在紧急情况下使用,避免过度消耗资源
错误处理策略
- 参数校验失败:返回422,提示具体字段问题
- 权限不足/查询不存在:返回404
- 超过用户最大查询数:返回403
- 平台适配器异常:记录error_message,状态标记为failed
- 调度器异常:日志记录错误并继续运行
- 新增:run-now功能错误处理:查询状态非active、无平台配置等情况返回404
章节来源
- backend/app/api/queries.py:32-39
- backend/app/api/queries.py:49-53
- backend/app/api/queries.py:64-69
- backend/app/api/queries.py:79-84
- backend/app/api/queries.py:96-103
- backend/app/api/citations.py:65-71
依赖分析
- 组件耦合
- API层依赖Service层;Service层依赖Model层;Worker层依赖Model层与平台适配器
- 引用引擎依赖平台适配器,平台适配器继承自基类
- 新增:查询API现在直接依赖引用服务的trigger_query_now功能
- 外部依赖
- APscheduler用于定时调度
- Playwright用于平台网页自动化
- SQLAlchemy异步ORM用于数据库访问
graph LR
APIQ["查询API"] --> SVCQ["查询服务"]
APIC["引用API"] --> SVCC["引用服务"]
APIQ --> SVCC
SVCQ --> MODELS["查询/任务/记录模型"]
SVCC --> MODELS
SVCC --> CE["引用引擎"]
CE --> ADP["平台适配器"]
ADP --> BASE["适配器基类"]
SCH["调度器"] --> CE
图表来源
- backend/app/api/queries.py:1-14
- backend/app/api/citations.py:1-19
- backend/app/services/query.py:1-10
- backend/app/services/citation.py:1-17
- backend/app/workers/citation_engine.py:148-157
- backend/app/workers/platforms/base.py:4-18
- backend/app/workers/scheduler.py:25-39
章节来源
- backend/app/workers/scheduler.py:13-19
- backend/app/workers/platforms/kimi.py:17-32
- backend/app/workers/platforms/wenxin.py:17-32
性能考虑
- 数据库索引:为常用查询字段建立索引,减少扫描开销
- 分页查询:API层提供skip/limit参数,避免一次性返回大量数据
- 异步I/O:使用异步数据库连接与平台适配器,提升并发能力
- 调度频率:每小时检查一次,可根据业务量调整
- 结果缓存:对频繁查询的平台响应可引入缓存(需结合业务场景)
- 新增:run-now功能的异步执行:使用asyncio.create_task避免阻塞主请求线程
故障排查指南
- 调度器未启动:确认应用生命周期钩子已注册并启动调度器
- 平台适配器异常:检查Playwright浏览器安装与页面选择器是否匹配
- 查询未执行:检查查询状态与next_query_at是否满足调度条件
- 引用记录缺失:确认引用引擎执行流程与数据库提交顺序
- 权限错误:确认用户max_queries限制与当前查询数量
- 新增:run-now功能问题排查:检查查询状态是否为active、平台配置是否正确、任务创建是否成功
章节来源
- backend/app/main.py:13-22
- backend/app/workers/platforms/kimi.py:21-32
- backend/app/workers/platforms/wenxin.py:21-32
- backend/app/workers/scheduler.py:51-84
- backend/app/services/query.py:45-81
- backend/app/services/citation.py:219-261
结论
本查询管理系统以清晰的分层架构实现了查询任务的全生命周期管理:从创建、校验、调度到执行与监控。通过严格的参数校验、灵活的状态管理与可靠的调度机制,系统能够稳定支撑多平台、多频率的查询需求。最新的架构变更将run-now功能直接集成到主查询API中,简化了API结构并提供了更便捷的立即执行能力。 建议在生产环境中结合业务量调整调度频率、优化平台适配器稳定性,并完善监控告警体系。
附录
类关系图(代码级)
classDiagram
class Query {
+uuid id
+uuid user_id
+string keyword
+string target_brand
+list brand_aliases
+list platforms
+string frequency
+string status
+datetime last_queried_at
+datetime next_query_at
+datetime created_at
+datetime updated_at
}
class QueryTask {
+uuid id
+uuid query_id
+string platform
+string status
+string error_message
+datetime scheduled_at
+datetime started_at
+datetime completed_at
}
class CitationRecord {
+uuid id
+uuid query_id
+string platform
+boolean cited
+int citation_position
+string citation_text
+list competitor_brands
+string raw_response
+datetime queried_at
}
class User {
+uuid id
+string email
+string password_hash
+string name
+string plan
+int max_queries
+boolean is_active
}
class CitationEngine {
+execute_query(query, db) list
+execute_single_platform(keyword, platform, target_brand, brand_aliases) dict
}
class QueryScheduler {
+start() void
+check_and_execute_queries() void
+shutdown() void
}
class KimiAdapter {
+query(keyword) str
+close() void
}
class WenxinAdapter {
+query(keyword) str
+close() void
}
class RunNowResponse {
+uuid task_id
+string status
+string message
}
Query "1" --> "*" CitationRecord : "产生"
Query "1" --> "*" QueryTask : "驱动"
User "1" --> "*" Query : "拥有"
CitationEngine --> Query : "读取"
CitationEngine --> CitationRecord : "写入"
CitationEngine --> QueryTask : "写入"
QueryScheduler --> CitationEngine : "触发"
CitationEngine --> KimiAdapter : "调用"
CitationEngine --> WenxinAdapter : "调用"
图表来源
- backend/app/models/query.py:11-55
- backend/app/models/query_task.py:11-39
- backend/app/models/citation_record.py:11-42
- backend/app/models/user.py:11-41
- backend/app/workers/citation_engine.py:148-309
- backend/app/workers/scheduler.py:25-95
- backend/app/workers/platforms/kimi.py:11-206
- backend/app/workers/platforms/wenxin.py:11-205
- backend/app/schemas/citation.py:48-52
单次查询执行序列图
sequenceDiagram
participant API as "查询API"
participant Service as "查询服务"
participant DB as "数据库"
participant Engine as "引用引擎"
participant Task as "查询任务模型"
participant Record as "引用记录模型"
API->>Service : "create_query(...)"
Service->>DB : "插入查询记录"
DB-->>Service : "返回查询"
Service-->>API : "返回查询"
Note over Engine,DB : "调度器触发"
Engine->>DB : "查询平台列表"
Engine->>Task : "获取或创建任务"
Task-->>Engine : "任务对象"
Engine->>Task : "状态=running"
Engine->>Engine : "调用平台适配器"
Engine->>Record : "创建引用记录"
Engine->>DB : "提交事务"
Note over API,DB : "run-now立即执行"
API->>Service : "trigger_query_now(...)"
Service->>DB : "创建QueryTask并立即执行"
Service->>Engine : "_execute_query_tasks(...)"
Engine->>Task : "状态=running"
Engine->>Engine : "调用平台适配器"
Engine->>Record : "创建引用记录"
Engine->>DB : "提交事务"
图表来源