"""Integration tests for the detection task lifecycle flow. Covers: create detection task → toggle active state → verify → delete. """ import uuid import pytest from sqlalchemy import select from app.models.brand import Brand from app.models.detection_task import DetectionTask from tests.fixtures.auth import _make_user, _to_uuid class TestDetectionFlow: """Test complete detection task lifecycle.""" @pytest.mark.asyncio async def test_detection_task_lifecycle(self, async_session, test_user): """Test complete detection task lifecycle: create → deactivate → reactivate → delete.""" # 1. Create brand first brand = Brand( user_id=_to_uuid(test_user.id), name="Detection Brand", platforms=["wenxin"], ) async_session.add(brand) await async_session.commit() await async_session.refresh(brand) # 2. Create detection task task = DetectionTask( user_id=_to_uuid(test_user.id), brand_id=brand.id, name="Daily detection", frequency="daily", engines=["wenxin"], queries=["AI搜索优化"], is_active=True, ) async_session.add(task) await async_session.commit() await async_session.refresh(task) assert task.is_active is True assert task.frequency == "daily" # 3. Deactivate task (simulate pause) task.is_active = False await async_session.commit() await async_session.refresh(task) assert task.is_active is False # 4. Reactivate task task.is_active = True await async_session.commit() await async_session.refresh(task) assert task.is_active is True # 5. Delete task await async_session.delete(task) await async_session.commit() result = await async_session.execute( select(DetectionTask).where(DetectionTask.id == task.id) ) assert result.scalar_one_or_none() is None @pytest.mark.asyncio async def test_detection_task_belongs_to_brand(self, async_session, test_user): """Test that detection tasks are correctly associated with brands.""" brand = Brand( user_id=_to_uuid(test_user.id), name="Task Brand", platforms=["wenxin"], ) async_session.add(brand) await async_session.commit() await async_session.refresh(brand) task = DetectionTask( user_id=_to_uuid(test_user.id), brand_id=brand.id, name="Brand detection", frequency="weekly", engines=["wenxin", "kimi"], queries=["品牌检测"], is_active=True, ) async_session.add(task) await async_session.commit() result = await async_session.execute( select(DetectionTask).where(DetectionTask.brand_id == brand.id) ) tasks = result.scalars().all() assert len(tasks) == 1 assert tasks[0].brand_id == brand.id assert tasks[0].engines == ["wenxin", "kimi"] @pytest.mark.asyncio async def test_detection_task_user_isolation(self, async_session, test_user): """Test that detection tasks are isolated per user.""" other_user = _make_user(email="other-detect@example.com") async_session.add(other_user) await async_session.commit() brand1 = Brand( user_id=_to_uuid(test_user.id), name="User1 Detect Brand", platforms=["wenxin"], ) brand2 = Brand( user_id=_to_uuid(other_user.id), name="User2 Detect Brand", platforms=["kimi"], ) async_session.add(brand1) async_session.add(brand2) await async_session.commit() await async_session.refresh(brand1) await async_session.refresh(brand2) task1 = DetectionTask( user_id=_to_uuid(test_user.id), brand_id=brand1.id, name="User1 Task", frequency="daily", engines=["wenxin"], queries=["query1"], is_active=True, ) task2 = DetectionTask( user_id=_to_uuid(other_user.id), brand_id=brand2.id, name="User2 Task", frequency="weekly", engines=["kimi"], queries=["query2"], is_active=True, ) async_session.add(task1) async_session.add(task2) await async_session.commit() # test_user should only see their own tasks result = await async_session.execute( select(DetectionTask).where(DetectionTask.user_id == _to_uuid(test_user.id)) ) user_tasks = result.scalars().all() assert len(user_tasks) == 1 assert user_tasks[0].name == "User1 Task"