From 68b079f8cbf69335b8f934fba704427c0e5019a1 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Thu, 4 Jun 2026 22:26:39 +0800 Subject: [PATCH] feat: add core monetization, detection, and content distribution integration tests --- .../test_content_distribution_flow.py | 152 ++++++++++++++++++ .../test_core_monetization_flow.py | 143 ++++++++++++++++ .../test_integration/test_detection_flow.py | 151 +++++++++++++++++ 3 files changed, 446 insertions(+) create mode 100644 backend/tests/test_integration/test_content_distribution_flow.py create mode 100644 backend/tests/test_integration/test_core_monetization_flow.py create mode 100644 backend/tests/test_integration/test_detection_flow.py diff --git a/backend/tests/test_integration/test_content_distribution_flow.py b/backend/tests/test_integration/test_content_distribution_flow.py new file mode 100644 index 0000000..e35e0ea --- /dev/null +++ b/backend/tests/test_integration/test_content_distribution_flow.py @@ -0,0 +1,152 @@ +"""Integration tests for the content generation and distribution flow. + +Covers: organization → content creation → distribution schedule → status tracking. +""" +import uuid + +import pytest +from sqlalchemy import select + +from app.models.organization import Organization +from app.models.content import Content +from app.models.distribution import DistributionSchedule +from tests.fixtures.auth import _to_uuid + + +class TestContentDistributionFlow: + """Test content creation through distribution.""" + + @pytest.mark.asyncio + async def test_content_to_distribution_flow(self, async_session, test_user): + """Test content creation through distribution scheduling.""" + # 1. Create organization (required FK for Content and DistributionSchedule) + org = Organization( + name="Test Org", + slug=f"test-org-{uuid.uuid4().hex[:8]}", + plan="free", + ) + async_session.add(org) + await async_session.commit() + await async_session.refresh(org) + + # 2. Create content under the organization + content = Content( + organization_id=org.id, + title="GEO优化文章", + body="这是一篇关于AI搜索优化的文章正文。", + content_type="article", + status="draft", + target_platforms=["wenxin", "kimi"], + created_by=test_user.id, + ) + async_session.add(content) + await async_session.commit() + await async_session.refresh(content) + + # 3. Create distribution schedule for the content + schedule = DistributionSchedule( + organization_id=org.id, + content_title=content.title, + content_id=content.id, + platforms=[ + {"platform": "wenxin", "platform_name": "文心一言", "status": "pending"}, + {"platform": "kimi", "platform_name": "Kimi", "status": "pending"}, + ], + status="pending", + created_by=test_user.id, + ) + async_session.add(schedule) + await async_session.commit() + await async_session.refresh(schedule) + + # 4. Verify the chain: org → content → distribution + assert content.organization_id == org.id + assert schedule.content_id == content.id + assert schedule.organization_id == org.id + assert len(schedule.platforms) == 2 + + @pytest.mark.asyncio + async def test_content_status_transitions(self, async_session, test_user): + """Test content status transitions: draft → published.""" + org = Organization( + name="Status Org", + slug=f"status-org-{uuid.uuid4().hex[:8]}", + plan="free", + ) + async_session.add(org) + await async_session.commit() + await async_session.refresh(org) + + content = Content( + organization_id=org.id, + title="Status Test Article", + body="Content body", + content_type="article", + status="draft", + created_by=test_user.id, + ) + async_session.add(content) + await async_session.commit() + await async_session.refresh(content) + assert content.status == "draft" + + # Transition to published + content.status = "published" + await async_session.commit() + await async_session.refresh(content) + assert content.status == "published" + + @pytest.mark.asyncio + async def test_distribution_schedule_status_tracking(self, async_session, test_user): + """Test distribution schedule status tracking: pending → completed.""" + org = Organization( + name="Dist Org", + slug=f"dist-org-{uuid.uuid4().hex[:8]}", + plan="free", + ) + async_session.add(org) + await async_session.commit() + await async_session.refresh(org) + + content = Content( + organization_id=org.id, + title="Distribution Test", + body="Body text", + content_type="article", + status="published", + created_by=test_user.id, + ) + async_session.add(content) + await async_session.commit() + await async_session.refresh(content) + + schedule = DistributionSchedule( + organization_id=org.id, + content_title=content.title, + content_id=content.id, + platforms=[ + {"platform": "wenxin", "status": "pending"}, + ], + status="pending", + created_by=test_user.id, + ) + async_session.add(schedule) + await async_session.commit() + await async_session.refresh(schedule) + assert schedule.status == "pending" + + # Mark as completed + schedule.status = "completed" + await async_session.commit() + await async_session.refresh(schedule) + assert schedule.status == "completed" + + # Verify it's still linked to the content + result = await async_session.execute( + select(DistributionSchedule).where( + DistributionSchedule.content_id == content.id + ) + ) + schedules = result.scalars().all() + assert len(schedules) == 1 + assert schedules[0].status == "completed" diff --git a/backend/tests/test_integration/test_core_monetization_flow.py b/backend/tests/test_integration/test_core_monetization_flow.py new file mode 100644 index 0000000..72c85d1 --- /dev/null +++ b/backend/tests/test_integration/test_core_monetization_flow.py @@ -0,0 +1,143 @@ +"""Integration tests for the core monetization closed-loop flow. + +Covers: brand creation → query → citation detection → diagnosis → attribution monitoring. +""" +import uuid +from datetime import datetime, timezone + +import pytest +from sqlalchemy import select + +from app.models.brand import Brand +from app.models.query import Query as QueryModel +from app.models.citation_record import CitationRecord +from app.models.diagnosis_record import DiagnosisRecord +from app.models.attribution_record import AttributionRecord +from tests.fixtures.auth import _make_user, _to_uuid + + +class TestCoreMonetizationFlow: + """Test the complete monetization closed loop from brand creation to monitoring.""" + + @pytest.mark.asyncio + async def test_full_monetization_closed_loop(self, async_session, test_user): + """Test the complete monetization closed loop: brand → query → citation → diagnosis → attribution.""" + # 1. Create brand + brand = Brand( + user_id=_to_uuid(test_user.id), + name="MonoBrand", + aliases=["MonoBrand", "MB"], + platforms=["wenxin"], + frequency="weekly", + ) + async_session.add(brand) + await async_session.commit() + await async_session.refresh(brand) + + # 2. Create query for the brand + query = QueryModel( + user_id=test_user.id, + keyword="AI搜索优化", + target_brand="MonoBrand", + brand_aliases=["MonoBrand", "MB"], + platforms=["wenxin"], + frequency="weekly", + status="active", + ) + async_session.add(query) + await async_session.commit() + await async_session.refresh(query) + + # 3. Create citation record (simulating detection result) + citation = CitationRecord( + query_id=query.id, + platform="wenxin", + cited=True, + citation_position=1, + citation_text="MonoBrand is a leading AI company...", + competitor_brands=[], + raw_response="{}", + confidence=0.85, + match_type="exact", + queried_at=datetime.now(timezone.utc), + ) + async_session.add(citation) + await async_session.commit() + + # 4. Verify data flows correctly — query has citations + result = await async_session.execute( + select(CitationRecord).where(CitationRecord.query_id == query.id) + ) + citations = result.scalars().all() + assert len(citations) == 1 + assert citations[0].cited is True + + # 5. Create diagnosis record (simulating GEO diagnosis) + diagnosis = DiagnosisRecord( + brand_id=brand.id, + user_id=_to_uuid(test_user.id), + diagnosis_type="geo", + status="completed", + overall_score=72.5, + result_json={"overall_score": 72.5, "dimensions": {}}, + ) + async_session.add(diagnosis) + await async_session.commit() + await async_session.refresh(diagnosis) + + # 6. Create attribution record (monitoring) + attribution = AttributionRecord( + user_id=test_user.id, + brand_id=brand.id, + baseline_score=72.5, + attribution_window_days=28, + status="tracking", + ) + async_session.add(attribution) + await async_session.commit() + await async_session.refresh(attribution) + + # 7. Verify the complete chain: brand → query → citation, brand → diagnosis, brand → attribution + assert brand.id is not None + assert citation.query_id == query.id + assert diagnosis.brand_id == brand.id + assert attribution.brand_id == brand.id + assert attribution.baseline_score == 72.5 + + @pytest.mark.asyncio + async def test_brand_not_found_returns_none(self, async_session, test_user): + """Test that querying a non-existent brand returns None.""" + fake_id = uuid.uuid4() + result = await async_session.execute( + select(Brand).where(Brand.id == fake_id) + ) + assert result.scalar_one_or_none() is None + + @pytest.mark.asyncio + async def test_user_data_isolation(self, async_session, test_user): + """Test that users cannot see each other's brands.""" + other_user = _make_user(email="other@example.com") + async_session.add(other_user) + await async_session.commit() + + brand1 = Brand( + user_id=_to_uuid(test_user.id), + name="User1 Brand", + platforms=["wenxin"], + ) + brand2 = Brand( + user_id=_to_uuid(other_user.id), + name="User2 Brand", + platforms=["kimi"], + ) + async_session.add(brand1) + async_session.add(brand2) + await async_session.commit() + + # User1 should only see their own brand + result = await async_session.execute( + select(Brand).where(Brand.user_id == _to_uuid(test_user.id)) + ) + user1_brands = result.scalars().all() + assert len(user1_brands) == 1 + assert user1_brands[0].name == "User1 Brand" diff --git a/backend/tests/test_integration/test_detection_flow.py b/backend/tests/test_integration/test_detection_flow.py new file mode 100644 index 0000000..dac2c91 --- /dev/null +++ b/backend/tests/test_integration/test_detection_flow.py @@ -0,0 +1,151 @@ +"""Integration tests for the detection task lifecycle flow. + +Covers: create detection task → toggle active state → verify → delete. +""" +import uuid + +import pytest +from sqlalchemy import select + +from app.models.brand import Brand +from app.models.detection_task import DetectionTask +from tests.fixtures.auth import _make_user, _to_uuid + + +class TestDetectionFlow: + """Test complete detection task lifecycle.""" + + @pytest.mark.asyncio + async def test_detection_task_lifecycle(self, async_session, test_user): + """Test complete detection task lifecycle: create → deactivate → reactivate → delete.""" + # 1. Create brand first + brand = Brand( + user_id=_to_uuid(test_user.id), + name="Detection Brand", + platforms=["wenxin"], + ) + async_session.add(brand) + await async_session.commit() + await async_session.refresh(brand) + + # 2. Create detection task + task = DetectionTask( + user_id=_to_uuid(test_user.id), + brand_id=brand.id, + name="Daily detection", + frequency="daily", + engines=["wenxin"], + queries=["AI搜索优化"], + is_active=True, + ) + async_session.add(task) + await async_session.commit() + await async_session.refresh(task) + assert task.is_active is True + assert task.frequency == "daily" + + # 3. Deactivate task (simulate pause) + task.is_active = False + await async_session.commit() + await async_session.refresh(task) + assert task.is_active is False + + # 4. Reactivate task + task.is_active = True + await async_session.commit() + await async_session.refresh(task) + assert task.is_active is True + + # 5. Delete task + await async_session.delete(task) + await async_session.commit() + + result = await async_session.execute( + select(DetectionTask).where(DetectionTask.id == task.id) + ) + assert result.scalar_one_or_none() is None + + @pytest.mark.asyncio + async def test_detection_task_belongs_to_brand(self, async_session, test_user): + """Test that detection tasks are correctly associated with brands.""" + brand = Brand( + user_id=_to_uuid(test_user.id), + name="Task Brand", + platforms=["wenxin"], + ) + async_session.add(brand) + await async_session.commit() + await async_session.refresh(brand) + + task = DetectionTask( + user_id=_to_uuid(test_user.id), + brand_id=brand.id, + name="Brand detection", + frequency="weekly", + engines=["wenxin", "kimi"], + queries=["品牌检测"], + is_active=True, + ) + async_session.add(task) + await async_session.commit() + + result = await async_session.execute( + select(DetectionTask).where(DetectionTask.brand_id == brand.id) + ) + tasks = result.scalars().all() + assert len(tasks) == 1 + assert tasks[0].brand_id == brand.id + assert tasks[0].engines == ["wenxin", "kimi"] + + @pytest.mark.asyncio + async def test_detection_task_user_isolation(self, async_session, test_user): + """Test that detection tasks are isolated per user.""" + other_user = _make_user(email="other-detect@example.com") + async_session.add(other_user) + await async_session.commit() + + brand1 = Brand( + user_id=_to_uuid(test_user.id), + name="User1 Detect Brand", + platforms=["wenxin"], + ) + brand2 = Brand( + user_id=_to_uuid(other_user.id), + name="User2 Detect Brand", + platforms=["kimi"], + ) + async_session.add(brand1) + async_session.add(brand2) + await async_session.commit() + await async_session.refresh(brand1) + await async_session.refresh(brand2) + + task1 = DetectionTask( + user_id=_to_uuid(test_user.id), + brand_id=brand1.id, + name="User1 Task", + frequency="daily", + engines=["wenxin"], + queries=["query1"], + is_active=True, + ) + task2 = DetectionTask( + user_id=_to_uuid(other_user.id), + brand_id=brand2.id, + name="User2 Task", + frequency="weekly", + engines=["kimi"], + queries=["query2"], + is_active=True, + ) + async_session.add(task1) + async_session.add(task2) + await async_session.commit() + + # test_user should only see their own tasks + result = await async_session.execute( + select(DetectionTask).where(DetectionTask.user_id == _to_uuid(test_user.id)) + ) + user_tasks = result.scalars().all() + assert len(user_tasks) == 1 + assert user_tasks[0].name == "User1 Task"