geo/backend/tests/test_scheduler.py

143 lines
4.5 KiB
Python

import asyncio
import sys
import threading
from datetime import datetime, timezone
from pathlib import Path

import pytest

# Make the backend root importable so the lazy ``from app...`` imports in the
# tests resolve. The root is derived from this file's own location (this file
# is expected to live in ``<backend>/tests/``) instead of a machine-specific
# absolute path, so the suite also runs on other checkouts and in CI.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

# Module-level event loop and the thread that hosts it; both are assigned by
# start_scheduler_in_background() and are meant to keep the scheduler active
# while the tests run.
_loop = None
_loop_thread = None
def start_scheduler_in_background():
    """Start ``query_scheduler`` on a dedicated event loop in a daemon thread.

    Does nothing when ``query_scheduler.scheduler.running`` is already true.
    Otherwise a fresh event loop is created inside a background thread, the
    scheduler is started from a coroutine running on that loop, and the loop
    is then left running so it stays available for the rest of the test
    session. The call blocks for at most two seconds while waiting for the
    start attempt to finish.

    Raises:
        RuntimeError: if ``query_scheduler.start()`` raised in the background
            thread; the original exception is chained as the cause.
    """
    global _loop, _loop_thread

    from app.workers.scheduler import query_scheduler

    if query_scheduler.scheduler.running:
        return

    started = threading.Event()
    startup_error = None

    async def start_scheduler_async():
        # Runs as a task on the background loop, i.e. while that loop is
        # already running.
        nonlocal startup_error
        try:
            query_scheduler.start()
        except Exception as exc:  # handed over to the calling thread below
            startup_error = exc
            # No point keeping a loop alive for a scheduler that never started.
            asyncio.get_running_loop().stop()
        finally:
            started.set()

    def run_loop():
        global _loop
        loop = asyncio.new_event_loop()
        _loop = loop
        asyncio.set_event_loop(loop)
        loop.create_task(start_scheduler_async())
        try:
            # run_forever() (rather than run_until_complete) keeps the loop
            # alive after start() returns; the daemon thread ends with the
            # interpreter.
            loop.run_forever()
        finally:
            loop.close()

    _loop_thread = threading.Thread(target=run_loop, daemon=True)
    _loop_thread.start()

    # Wait for the start attempt instead of joining the thread, which now
    # never finishes on success. A timeout is tolerated silently.
    started.wait(timeout=2)
    if startup_error is not None:
        raise RuntimeError(
            "query_scheduler failed to start in the background thread"
        ) from startup_error
@pytest.fixture(scope="module", autouse=True)
def scheduler_running():
    """Module-scoped, auto-used fixture that starts the scheduler before the tests."""
    start_scheduler_in_background()
    # Nothing is handed to the tests and there is no teardown step; the
    # fixture exists only for its setup side effect.
    yield
class TestScheduler:
    """Tests for the jobs registered on the periodic query scheduler."""

    @staticmethod
    def _scheduler():
        """Return ``query_scheduler.scheduler``, importing the module lazily."""
        from app.workers.scheduler import query_scheduler

        return query_scheduler.scheduler

    def test_scheduler_job_registered(self):
        """Both expected job ids are present among the registered jobs."""
        registered_ids = {job.id for job in self._scheduler().get_jobs()}

        # The due-query check must be registered.
        assert "check_queries" in registered_ids
        # The pending-task check must be registered.
        assert "check_pending_tasks" in registered_ids

    def test_job_can_be_triggered_manually(self):
        """A registered job can be looked up and triggered on demand."""
        from app.workers.scheduler import run_job_now

        # The job must be retrievable and carry a callable target.
        target = self._scheduler().get_job("check_queries")
        assert target is not None
        assert callable(target.func)

        # run_job_now reports success with True.
        assert run_job_now("check_queries") is True

    def test_run_job_now_returns_false_for_unknown_job(self):
        """Triggering a job id that does not exist returns False."""
        from app.workers.scheduler import run_job_now

        assert run_job_now("nonexistent_job") is False

    def test_job_id_is_unique(self):
        """No two registered jobs share an id."""
        ids = [job.id for job in self._scheduler().get_jobs()]
        assert len(set(ids)) == len(ids)

    def test_job_next_run_time_is_valid(self):
        """Every job has a next run time that lies in the future."""
        registered = self._scheduler().get_jobs()
        reference = datetime.now(timezone.utc)

        for job in registered:
            scheduled = job.next_run_time
            assert scheduled is not None

            # Naive timestamps are treated as UTC before comparing.
            if scheduled.tzinfo is not None:
                aware = scheduled
            else:
                aware = scheduled.replace(tzinfo=timezone.utc)
            assert aware > reference

    def test_job_stores_metadata(self):
        """Every job exposes a non-None id and name."""
        for job in self._scheduler().get_jobs():
            for attribute in ("id", "name"):
                assert hasattr(job, attribute)
            assert job.id is not None
            assert job.name is not None

    def test_scheduler_has_two_jobs(self):
        """Exactly two jobs are registered."""
        assert len(self._scheduler().get_jobs()) == 2

    def test_check_queries_job_config(self):
        """The check_queries job exists and has the expected display name."""
        entry = self._scheduler().get_job("check_queries")
        assert entry is not None
        assert entry.name == "检查并执行到期的查询任务"

    def test_check_pending_tasks_job_config(self):
        """The check_pending_tasks job exists and has the expected display name."""
        entry = self._scheduler().get_job("check_pending_tasks")
        assert entry is not None
        assert entry.name == "检查并执行遗留的pending查询任务"

    def test_scheduler_instance_is_singleton(self):
        """Two imports of ``query_scheduler`` resolve to the same object."""
        from app.workers.scheduler import query_scheduler as first_ref
        from app.workers.scheduler import query_scheduler as second_ref

        assert first_ref is second_ref