# geo/backend/app/services/schema/schema_advisor_service.py
#
# 350 lines
# 13 KiB
# Python
# Raw Blame History
#
# This file contains ambiguous Unicode characters
#
# This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import uuid
from datetime import datetime, timezone
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.schema_suggestion import SchemaSuggestion
from app.services.llm import LLMFactory, LLMError
from app.prompts.schema_advisor import SCHEMA_ADVISOR_TEMPLATE
from app.utils.json_extractor import extract_json
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Blank schema.org JSON-LD skeletons, keyed by schema type.
# Empty strings / lists are placeholders for values to be filled in later.
SCHEMA_TEMPLATES = {
    "Organization": {
        "@context": "https://schema.org",
        "@type": "Organization",
        "name": "",
        "description": "",
        "url": "",
        "logo": "",
        "sameAs": [],
        "contactPoint": {
            "@type": "ContactPoint",
            "contactType": "customer service",
            "telephone": "",
        },
    },
    "Product": {
        "@context": "https://schema.org",
        "@type": "Product",
        "name": "",
        "description": "",
        "brand": {"@type": "Brand", "name": ""},
        "offers": {
            "@type": "Offer",
            "priceCurrency": "CNY",
            "availability": "https://schema.org/InStock",
        },
    },
    "FAQPage": {
        "@context": "https://schema.org",
        "@type": "FAQPage",
        "mainEntity": [
            {
                "@type": "Question",
                "name": "",
                "acceptedAnswer": {
                    "@type": "Answer",
                    "text": "",
                },
            }
        ],
    },
    "Article": {
        "@context": "https://schema.org",
        "@type": "Article",
        "headline": "",
        "description": "",
        "author": {"@type": "Organization", "name": ""},
        "datePublished": "",
        "image": "",
    },
    "LocalBusiness": {
        "@context": "https://schema.org",
        "@type": "LocalBusiness",
        "name": "",
        "address": {
            "@type": "PostalAddress",
            "streetAddress": "",
            "addressLocality": "",
            "addressRegion": "",
            "postalCode": "",
            "addressCountry": "CN",
        },
        "geo": {
            "@type": "GeoCoordinates",
            "latitude": "",
            "longitude": "",
        },
        "telephone": "",
        "openingHours": "",
    },
}

# Diagnosis dimension name -> schema types suggested when that dimension
# scores low. Insertion order matters: it is the iteration order used when
# every known dimension has to be enumerated.
DIMENSION_SCHEMA_MAP = {
    "schema_marketing": ["Organization", "LocalBusiness"],
    "entity_clarity": ["Organization", "Product"],
    "citation_readiness": ["FAQPage", "Article"],
    "brand_visibility": ["Organization", "Product"],
    "local_seo": ["LocalBusiness"],
}

# Exclusive upper bounds (score percentage) for each priority label;
# a percentage at or above the "medium" bound is treated as low priority.
PRIORITY_THRESHOLD = {"high": 30.0, "medium": 60.0}

# Implementation difficulty label per schema type.
DIFFICULTY_MAP = {
    "Organization": "easy",
    "Product": "medium",
    "FAQPage": "medium",
    "Article": "easy",
    "LocalBusiness": "hard",
}
class SchemaAdvisorService:
    """Generate, validate, persist and query schema.org JSON-LD suggestions.

    The service turns low-scoring diagnosis dimensions into JSON-LD template
    suggestions, asks the default LLM provider to fill them in, validates the
    result and stores one ``SchemaSuggestion`` row per schema type.
    """

    # Sort rank per priority label; unknown labels sort alongside "medium".
    _PRIORITY_RANK = {"high": 0, "medium": 1, "low": 2}

    # Human-readable expected impact per schema type (runtime strings).
    _IMPACT_DESCRIPTIONS = {
        "Organization": "增强品牌实体识别提升AI搜索引擎对品牌的理解和引用概率",
        "Product": "提升产品在搜索结果中的富摘要展示,增加点击率和引用率",
        "FAQPage": "增加FAQ富摘要展示机会提升在AI回答中的直接引用概率",
        "Article": "优化文章内容的结构化表达提升AI搜索引擎的内容理解和引用",
        "LocalBusiness": "增强本地搜索可见性,提升地理位置相关查询的引用率",
    }

    async def generate_suggestions(
        self,
        db: AsyncSession,
        brand_id: uuid.UUID,
        diagnosis_data: dict,
        brand_info: dict,
        target_url: str | None = None,
        focus_dimensions: list[str] | None = None,
    ) -> list[SchemaSuggestion]:
        """Create and persist schema suggestions for a brand.

        Args:
            db: Async session used to add, commit and refresh the new rows.
            brand_id: Brand the suggestions belong to.
            diagnosis_data: Diagnosis result; read via the ``"dimensions"``
                and ``"overall_score"`` keys.
            brand_info: Brand attributes; ``"name"``, ``"website"`` and
                ``"industry"`` are passed to the LLM prompt.
            target_url: Optional URL stored on every suggestion.
            focus_dimensions: When given (and non-empty), only these
                dimensions are considered.

        Returns:
            The persisted suggestions, ordered high -> medium -> low priority.
        """
        missing_dimensions = self._identify_missing_dimensions(diagnosis_data, focus_dimensions)
        matched = self.match_templates(missing_dimensions)
        filled = await self.fill_template_with_llm(matched, brand_info)

        suggestions = []
        for item in filled:
            # A failed LLM fill leaves json_ld_filled as None; validating an
            # empty dict records that as a validation error on the row.
            validation = self.validate_json_ld(item.get("json_ld_filled") or {})
            suggestion = SchemaSuggestion(
                brand_id=brand_id,
                schema_type=item["schema_type"],
                target_url=target_url,
                json_ld_template=item["json_ld_template"],
                json_ld_filled=item.get("json_ld_filled"),
                priority=item["priority"],
                status="pending",
                diagnosis_dimensions=item.get("diagnosis_dimensions"),
                implementation_difficulty=DIFFICULTY_MAP.get(item["schema_type"], "medium"),
                estimated_impact=item.get("estimated_impact"),
                validation_errors=None if validation["is_valid"] else {"errors": validation["errors"]},
            )
            db.add(suggestion)
            suggestions.append(suggestion)

        await db.commit()
        for suggestion in suggestions:
            await db.refresh(suggestion)
        return self.prioritize_suggestions(suggestions)

    @staticmethod
    def _priority_for(percentage: float) -> str:
        """Map a score percentage to a priority label using PRIORITY_THRESHOLD."""
        if percentage < PRIORITY_THRESHOLD["high"]:
            return "high"
        if percentage < PRIORITY_THRESHOLD["medium"]:
            return "medium"
        return "low"

    def match_templates(self, missing_dimensions: list[dict]) -> list[dict]:
        """Pick one JSON-LD template per schema type for the given dimensions.

        Args:
            missing_dimensions: Entries with ``"dimension"``,
                ``"current_score"``, ``"max_score"`` and ``"percentage"`` keys.

        Returns:
            One dict per distinct schema type with ``"schema_type"``,
            ``"priority"``, ``"diagnosis_dimensions"`` and a deep copy of the
            template under ``"json_ld_template"``. When several dimensions map
            to the same schema type, the first dimension encountered wins.
        """
        # Function-scope import, hoisted out of the loop where it used to sit.
        import copy

        matched = []
        seen_types = set()
        for dim in missing_dimensions:
            for schema_type in DIMENSION_SCHEMA_MAP.get(dim["dimension"], []):
                if schema_type in seen_types:
                    continue
                seen_types.add(schema_type)

                template = SCHEMA_TEMPLATES.get(schema_type)
                if not template:
                    continue

                matched.append({
                    "schema_type": schema_type,
                    "priority": self._priority_for(dim["percentage"]),
                    "diagnosis_dimensions": {
                        "dimension": dim["dimension"],
                        "current_score": dim["current_score"],
                        "max_score": dim["max_score"],
                        "percentage": dim["percentage"],
                    },
                    # Deep copy so callers can never mutate the shared template.
                    "json_ld_template": copy.deepcopy(template),
                })
        return matched

    async def fill_template_with_llm(self, matched: list[dict], brand_info: dict) -> list[dict]:
        """Ask the default LLM provider to produce filled JSON-LD for each match.

        Each item is updated in place with ``"json_ld_filled"`` (parsed JSON,
        or ``None`` when the LLM call or JSON parsing fails) and
        ``"estimated_impact"``. Failures are logged and do not abort the batch.

        Args:
            matched: Items as produced by ``match_templates``.
            brand_info: Brand attributes used as prompt variables.

        Returns:
            The same item dicts, in input order.
        """
        if not matched:
            # Nothing to fill: do not create a provider for no work.
            return []

        provider = LLMFactory.get_default()
        results = []
        for item in matched:
            schema_type = item["schema_type"]
            diagnosis = item.get("diagnosis_dimensions") or {}
            try:
                variables = {
                    "brand_name": brand_info.get("name", ""),
                    "brand_website": brand_info.get("website", ""),
                    "brand_industry": brand_info.get("industry", ""),
                    "schema_type": schema_type,
                    "diagnosis_data": json.dumps(diagnosis, ensure_ascii=False),
                    "existing_schemas": "",
                }
                messages = SCHEMA_ADVISOR_TEMPLATE.render(variables)
                response = await provider.chat(
                    messages,
                    temperature=0.3,
                    max_tokens=2048,
                )
                filled = json.loads(extract_json(response.content))
            except (LLMError, ValueError) as e:
                # json.JSONDecodeError is a ValueError subclass, so malformed
                # LLM output is covered here as well.
                logger.warning("LLM填充Schema %s 失败: %s", schema_type, e)
                filled = None

            item["json_ld_filled"] = filled
            # The impact text does not depend on whether the fill succeeded.
            item["estimated_impact"] = self._generate_impact_description(
                schema_type, diagnosis.get("dimension", "")
            )
            results.append(item)
        return results

    def validate_json_ld(self, json_ld: dict) -> dict:
        """Run basic structural checks on a JSON-LD object.

        Args:
            json_ld: Candidate JSON-LD. Empty / ``None`` and non-dict values
                are reported as invalid rather than raising.

        Returns:
            ``{"is_valid": bool, "errors": list[str], "warnings": list[str]}``.
            Missing ``@context`` / ``@type`` and non-serializable content are
            errors; a non-standard ``@context`` or a ``@type`` outside
            ``SCHEMA_TEMPLATES`` are warnings only.
        """
        if not json_ld:
            return {"is_valid": False, "errors": ["JSON-LD为空"], "warnings": []}
        if not isinstance(json_ld, dict):
            # A str/list would otherwise get substring/element checks below.
            return {"is_valid": False, "errors": ["JSON-LD必须是对象"], "warnings": []}

        errors = []
        warnings = []

        if "@context" not in json_ld:
            errors.append("缺少@context字段")
        elif json_ld["@context"] != "https://schema.org":
            warnings.append(f"@context值非标准: {json_ld['@context']}")

        if "@type" not in json_ld:
            errors.append("缺少@type字段")
        else:
            declared = json_ld["@type"]
            # JSON-LD permits @type to be an array; a list is unhashable and
            # must not be used directly in a dict membership test.
            declared_types = declared if isinstance(declared, (list, tuple)) else [declared]
            if not declared_types or any(
                not isinstance(t, str) or t not in SCHEMA_TEMPLATES for t in declared_types
            ):
                warnings.append(f"@type非推荐类型: {declared}")

        try:
            json.dumps(json_ld)
        except (TypeError, ValueError) as e:
            # TypeError: unsupported value type; ValueError: circular reference.
            errors.append(f"JSON序列化失败: {e}")

        return {
            "is_valid": not errors,
            "errors": errors,
            "warnings": warnings,
        }

    def prioritize_suggestions(self, suggestions: list[SchemaSuggestion]) -> list[SchemaSuggestion]:
        """Return a new list ordered high -> medium -> low priority.

        The sort is stable, so items with equal priority keep their input
        order. Unrecognized priority values are ranked like ``"medium"``.
        """
        rank = self._PRIORITY_RANK
        return sorted(suggestions, key=lambda s: rank.get(s.priority, 1))

    async def get_suggestions(
        self,
        db: AsyncSession,
        brand_id: uuid.UUID,
        status_filter: str | None = None,
        schema_type: str | None = None,
        skip: int = 0,
        limit: int = 20,
    ) -> tuple[list[SchemaSuggestion], int]:
        """Fetch one page of a brand's suggestions plus the total match count.

        Args:
            db: Async session used for both queries.
            brand_id: Brand whose suggestions are listed.
            status_filter: Optional exact match on ``status``.
            schema_type: Optional exact match on ``schema_type``.
            skip: Number of rows to skip.
            limit: Maximum number of rows to return.

        Returns:
            ``(suggestions, total)``. The page is selected newest-first by
            ``created_at`` and then re-ordered by priority within the page.
        """
        conditions = [SchemaSuggestion.brand_id == brand_id]
        if status_filter:
            conditions.append(SchemaSuggestion.status == status_filter)
        if schema_type:
            conditions.append(SchemaSuggestion.schema_type == schema_type)

        count_stmt = select(func.count()).select_from(SchemaSuggestion).where(*conditions)
        total = (await db.execute(count_stmt)).scalar_one()

        stmt = (
            select(SchemaSuggestion)
            .where(*conditions)
            .order_by(SchemaSuggestion.created_at.desc())
            .offset(skip)
            .limit(limit)
        )
        result = await db.execute(stmt)
        suggestions = list(result.scalars().all())
        return self.prioritize_suggestions(suggestions), total

    async def get_suggestion_by_id(
        self,
        db: AsyncSession,
        suggestion_id: uuid.UUID,
    ) -> SchemaSuggestion | None:
        """Return the suggestion with the given id, or ``None`` if absent."""
        stmt = select(SchemaSuggestion).where(SchemaSuggestion.id == suggestion_id)
        result = await db.execute(stmt)
        return result.scalar_one_or_none()

    async def update_status(
        self,
        db: AsyncSession,
        suggestion_id: uuid.UUID,
        new_status: str,
    ) -> SchemaSuggestion | None:
        """Set a suggestion's status and commit.

        Returns:
            The refreshed suggestion, or ``None`` when no row has that id
            (in which case nothing is committed).
        """
        # Reuse the single-row lookup instead of duplicating the query.
        suggestion = await self.get_suggestion_by_id(db, suggestion_id)
        if suggestion is None:
            return None
        suggestion.status = new_status
        await db.commit()
        await db.refresh(suggestion)
        return suggestion

    def _identify_missing_dimensions(
        self,
        diagnosis_data: dict,
        focus_dimensions: list[str] | None = None,
    ) -> list[dict]:
        """List known dimensions scoring below 80% of their maximum.

        Args:
            diagnosis_data: Diagnosis result. ``"dimensions"`` maps a
                dimension name to either a bare score (max assumed 100) or a
                dict with ``"score"`` / ``"max_score"``.
            focus_dimensions: When given (and non-empty), restricts the result
                to these dimension names.

        Returns:
            Dicts with ``"dimension"``, ``"current_score"``, ``"max_score"``
            and ``"percentage"``. If no dimension qualifies and the overall
            score is below 80 (or absent), every known dimension is returned
            with a zero score.
        """
        if not diagnosis_data:
            return []

        dimensions = []
        dimension_scores = diagnosis_data.get("dimensions") or {}
        for dim_name, dim_info in dimension_scores.items():
            if dim_name not in DIMENSION_SCHEMA_MAP:
                continue
            if focus_dimensions and dim_name not in focus_dimensions:
                continue

            if isinstance(dim_info, dict):
                score = dim_info.get("score", 0)
                max_score = dim_info.get("max_score", 100)
            else:
                score, max_score = dim_info, 100
            # An explicit None is treated like a missing key.
            if score is None:
                score = 0
            if max_score is None:
                max_score = 100

            try:
                percentage = (score / max_score * 100) if max_score > 0 else 0
                current_score = round(score, 2)
                rounded_percentage = round(percentage, 2)
            except TypeError:
                # Non-numeric score data: skip this dimension, keep the rest.
                logger.warning("跳过得分数据无效的诊断维度: %s", dim_name)
                continue

            if percentage < 80:
                dimensions.append({
                    "dimension": dim_name,
                    "current_score": current_score,
                    "max_score": max_score,
                    "percentage": rounded_percentage,
                })

        if not dimensions:
            # NOTE(review): this fallback also fires when per-dimension scores
            # exist and are all >= 80% but "overall_score" is absent (treated
            # as 0) — confirm that is intended.
            overall = diagnosis_data.get("overall_score") or 0
            if overall < 80:
                for dim_name in DIMENSION_SCHEMA_MAP:
                    if focus_dimensions and dim_name not in focus_dimensions:
                        continue
                    dimensions.append({
                        "dimension": dim_name,
                        "current_score": 0,
                        "max_score": 100,
                        "percentage": 0,
                    })
        return dimensions

    def _generate_impact_description(self, schema_type: str, dimension: str) -> str:
        """Return the expected-impact text for a schema type.

        Falls back to a generic sentence mentioning ``dimension`` when the
        schema type has no dedicated description.
        """
        return self._IMPACT_DESCRIPTIONS.get(schema_type, f"提升{dimension}维度的得分和AI引用率")