geo/backend/app/services/distribution/publish_engine.py

109 lines
3.7 KiB
Python

import logging
import uuid
from datetime import datetime, timezone

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.content import Content
from app.models.distribution import DistributionSchedule
from app.services.distribution.formatter import ContentFormatter
from app.services.distribution.publishers import get_publisher
from app.services.distribution.publishers.base import PublishResult
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
class PublishEngine:
    """Publish a stored ``Content`` row to platforms and record the outcome.

    Each call to :meth:`publish_content` writes one ``DistributionSchedule``
    row whose ``platforms`` field holds one record per requested platform;
    :meth:`get_publish_status` reads those records back.
    """

    def __init__(self):
        self._formatter = ContentFormatter()

    @staticmethod
    def _as_uuid(value: "str | uuid.UUID"):
        """Parse *value* into a ``uuid.UUID`` when it is a string.

        Non-string values (e.g. an existing ``uuid.UUID``) are returned
        unchanged. A malformed string raises ``ValueError`` from ``uuid.UUID``.
        """
        return uuid.UUID(value) if isinstance(value, str) else value

    async def publish_content(
        self,
        content_id: str,
        platforms: list[str],
        db: AsyncSession,
        user_id: str,
        org_id: str,
    ) -> list[PublishResult]:
        """Publish one content item to every platform in *platforms*.

        Args:
            content_id: Id of the ``Content`` row to publish (UUID string).
            platforms: Platform names passed to ``get_publisher`` and to the
                formatter.
            db: Session used to load the content and to persist the
                ``DistributionSchedule`` record; it is committed here.
            user_id: Stored as ``created_by`` on the schedule record.
            org_id: Stored as ``organization_id`` (parsed to a UUID when given
                as a string).

        Returns:
            One ``PublishResult`` per entry of *platforms*, in the same order.
            A publisher that raises is reported as a result with
            ``success=False`` rather than aborting the remaining platforms.

        Raises:
            ValueError: If *content_id* is not a valid UUID string or no
                ``Content`` row has that id.
            Exception: Anything raised by ``get_publisher`` or the formatter
                propagates unchanged, before any platform has been contacted.
        """
        stmt = select(Content).where(Content.id == self._as_uuid(content_id))
        content = (await db.execute(stmt)).scalar_one_or_none()
        if content is None:
            raise ValueError(f"Content not found: {content_id}")

        # Resolve publishers and format the body for *all* platforms before
        # publishing to any of them. An unknown platform or a formatting error
        # then fails the call up front instead of aborting midway, after some
        # platforms were already published but before the record was saved.
        body = content.body or ""
        prepared = []
        for platform in platforms:
            publisher = get_publisher(platform)
            formatted = self._formatter.format_for_platform(body, platform)
            prepared.append((platform, publisher, formatted))

        results: list[PublishResult] = []
        platform_results: list[dict] = []
        for platform, publisher, formatted in prepared:
            try:
                pub_result = await publisher.publish(
                    title=content.title,
                    content=formatted,
                )
                # Built inside the ``try`` so a malformed publisher result is
                # still recorded as a failure for this platform only.
                record = {
                    "platform": platform,
                    "status": "published" if pub_result.success else "failed",
                    "article_id": pub_result.article_id,
                    "article_url": pub_result.article_url,
                    "error": pub_result.error,
                    # Timezone-aware UTC so the stored string is unambiguous.
                    "published_at": (
                        datetime.now(timezone.utc).isoformat()
                        if pub_result.success
                        else None
                    ),
                }
            except Exception as e:
                # Best-effort boundary: one platform failing must not stop the
                # others. ``logger.exception`` keeps the traceback.
                logger.exception("Publish to %s failed: %s", platform, e)
                pub_result = PublishResult(
                    success=False,
                    platform=platform,
                    error=str(e),
                )
                # Same keys as the success record so readers see one shape.
                record = {
                    "platform": platform,
                    "status": "failed",
                    "article_id": None,
                    "article_url": None,
                    "error": str(e),
                    "published_at": None,
                }
            # Appended exactly once per platform, whichever branch ran.
            results.append(pub_result)
            platform_results.append(record)

        # NOTE(review): the status is "partial" even when every platform
        # failed, and "published" when ``platforms`` is empty (``all([])`` is
        # True). Left as-is because the set of status values accepted
        # downstream is not visible here — confirm and tighten.
        # NOTE(review): ``org_id`` is coerced to a UUID but ``user_id`` is
        # stored as given — confirm the type of the ``created_by`` column.
        schedule = DistributionSchedule(
            organization_id=self._as_uuid(org_id),
            content_title=content.title,
            content_id=content.id,
            platforms=platform_results,
            status="published" if all(r.success for r in results) else "partial",
            created_by=user_id,
        )
        db.add(schedule)
        await db.commit()
        return results

    async def get_publish_status(
        self,
        content_id: str,
        db: AsyncSession,
    ) -> list[dict]:
        """Return the recorded per-platform publish entries for a content item.

        Args:
            content_id: Id of the content (UUID string).
            db: Session used to query ``DistributionSchedule`` rows.

        Returns:
            A flat list of dicts with keys ``platform``, ``status``,
            ``article_url`` and ``published_at``, taken from every schedule
            row for this content, ordered by ``created_at`` descending.

        Raises:
            ValueError: If *content_id* is not a valid UUID string.
        """
        stmt = (
            select(DistributionSchedule)
            .where(DistributionSchedule.content_id == self._as_uuid(content_id))
            .order_by(DistributionSchedule.created_at.desc())
        )
        schedules = (await db.execute(stmt)).scalars().all()

        status_list: list[dict] = []
        for schedule in schedules:
            # ``platforms`` may be unset on a row; treat that as no entries.
            for entry in schedule.platforms or []:
                status_list.append({
                    "platform": entry.get("platform", ""),
                    "status": entry.get("status", "unknown"),
                    "article_url": entry.get("article_url"),
                    "published_at": entry.get("published_at"),
                })
        return status_list