# geo/backend/app/api/diagnosis.py (151 lines, 4.6 KiB, Python)

"""诊断API端点 - 提供SEO和GEO诊断功能"""
import logging
import uuid
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.api.deps import get_current_user
from app.database import get_db
from app.models.user import User
from app.models.brand import Brand
from app.services.seo_diagnosis import SEODiagnosisService
from app.services.geo_diagnosis import GEODiagnosisService, GEODiagnosisInput
logger = logging.getLogger(__name__)
router = APIRouter()
@router.get("/seo/{brand_id}")
async def get_seo_diagnosis(
    brand_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the SEO diagnosis result for a brand.

    The diagnosis covers five dimensions:

    - Technical SEO (25 points)
    - On-page SEO (20 points)
    - Content quality (20 points)
    - Backlink analysis (15 points)
    - User experience (20 points)

    Args:
        brand_id: Identifier of the brand to diagnose.
        current_user: Authenticated user; the brand lookup is restricted to
            this user's brands.
        db: Async database session used for the brand lookup.

    Returns:
        Whatever ``result.to_dict()`` produces for the SEO diagnosis result.

    Raises:
        HTTPException: 404 (from ``_get_brand_or_404``) when the brand is not
            found for this user; 500 when anything in the diagnosis step
            raises.
    """
    # The lookup stays outside the try block so its 404 is not rewritten
    # into a 500 by the broad handler below.
    brand = await _get_brand_or_404(brand_id, current_user, db)

    try:
        # NOTE(review): diagnose() receives no brand-specific input here, so
        # the result does not appear to depend on the brand — confirm whether
        # this is a placeholder.
        service = SEODiagnosisService()
        result = service.diagnose()
        logger.info(f"SEO诊断完成: brand_id={brand_id}, brand={brand.name}, score={result.overall_score}")
        return result.to_dict()
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(f"SEO诊断失败: brand_id={brand_id}, error={str(e)}")
        # Chain the original error so the root cause stays attached as
        # __cause__ while the client only sees the generic message.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="SEO诊断服务异常,请稍后重试",
        ) from e
@router.get("/geo/{brand_id}")
async def get_geo_diagnosis(
    brand_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the GEO diagnosis result for a brand.

    The diagnosis covers six dimensions:

    - Content extractability (20 points)
    - Entity clarity (15 points)
    - E-E-A-T signals (20 points)
    - Schema markup (15 points)
    - Topical authority (15 points)
    - Citation readiness (15 points)

    Args:
        brand_id: Identifier of the brand to diagnose.
        current_user: Authenticated user; the brand lookup is restricted to
            this user's brands.
        db: Async database session used for the brand lookup.

    Returns:
        Whatever ``result.to_dict()`` produces for the GEO diagnosis result.

    Raises:
        HTTPException: 404 (from ``_get_brand_or_404``) when the brand is not
            found for this user; 500 when anything in the diagnosis step
            raises.
    """
    # The lookup stays outside the try block so its 404 is not rewritten
    # into a 500 by the broad handler below.
    brand = await _get_brand_or_404(brand_id, current_user, db)

    try:
        # NOTE(review): GEODiagnosisInput() is built with no arguments, so no
        # brand data reaches the service here — confirm whether this is a
        # placeholder.
        input_data = GEODiagnosisInput()
        service = GEODiagnosisService()
        result = service.diagnose(input_data)
        logger.info(f"GEO诊断完成: brand_id={brand_id}, brand={brand.name}, score={result.overall_score}")
        return result.to_dict()
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(f"GEO诊断失败: brand_id={brand_id}, error={str(e)}")
        # Chain the original error so the root cause stays attached as
        # __cause__ while the client only sees the generic message.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="GEO诊断服务异常,请稍后重试",
        ) from e
@router.get("/combined/{brand_id}")
async def get_combined_diagnosis(
    brand_id: uuid.UUID,
    current_user: User = Depends(get_current_user),
    db: AsyncSession = Depends(get_db),
):
    """Return the combined SEO + GEO diagnosis result for a brand.

    Runs both diagnoses and reports each score together with their
    arithmetic mean.

    Args:
        brand_id: Identifier of the brand to diagnose.
        current_user: Authenticated user; the brand lookup is restricted to
            this user's brands.
        db: Async database session used for the brand lookup.

    Returns:
        A dict with the keys ``seo_score``, ``geo_score``,
        ``combined_score`` (mean of the two scores rounded to two decimal
        places), ``seo_diagnosis`` and ``geo_diagnosis`` (the ``to_dict()``
        output of each result).

    Raises:
        HTTPException: 404 (from ``_get_brand_or_404``) when the brand is not
            found for this user; 500 when anything in the diagnosis step
            raises.
    """
    # The lookup stays outside the try block so its 404 is not rewritten
    # into a 500 by the broad handler below.
    brand = await _get_brand_or_404(brand_id, current_user, db)

    try:
        # NOTE(review): neither service receives brand-specific input here —
        # confirm whether this is a placeholder.
        seo_service = SEODiagnosisService()
        seo_result = seo_service.diagnose()

        geo_service = GEODiagnosisService()
        geo_result = geo_service.diagnose(GEODiagnosisInput())

        # Equal-weight average of the two overall scores.
        combined_score = round((seo_result.overall_score + geo_result.overall_score) / 2, 2)

        logger.info(
            f"综合诊断完成: brand_id={brand_id}, brand={brand.name}, "
            f"seo_score={seo_result.overall_score}, "
            f"geo_score={geo_result.overall_score}, "
            f"combined_score={combined_score}"
        )

        return {
            "seo_score": seo_result.overall_score,
            "geo_score": geo_result.overall_score,
            "combined_score": combined_score,
            "seo_diagnosis": seo_result.to_dict(),
            "geo_diagnosis": geo_result.to_dict(),
        }
    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(f"综合诊断失败: brand_id={brand_id}, error={str(e)}")
        # Chain the original error so the root cause stays attached as
        # __cause__ while the client only sees the generic message.
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="综合诊断服务异常,请稍后重试",
        ) from e
async def _get_brand_or_404(
    brand_id: uuid.UUID,
    current_user: User,
    db: AsyncSession,
) -> Brand:
    """Fetch a brand owned by the current user or raise a 404.

    Args:
        brand_id: Identifier of the brand to load.
        current_user: User whose ``id`` must match the brand's ``user_id``.
        db: Async database session used to run the query.

    Returns:
        The matching ``Brand`` row.

    Raises:
        HTTPException: 404 when no brand matches both ``brand_id`` and the
            current user's id.
    """
    # Filtering on user_id as well as id means a brand owned by someone else
    # is reported exactly like a missing one.
    stmt = select(Brand).where(Brand.id == brand_id, Brand.user_id == current_user.id)
    result = await db.execute(stmt)
    brand = result.scalar_one_or_none()

    # Compare against None explicitly: absence is signalled by None, and a
    # truthiness test would misfire if the model ever defined
    # __bool__/__len__.
    if brand is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="品牌不存在",
        )
    return brand