geo/backend/app/services/citation/citation_pattern.py

381 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import re
from dataclasses import dataclass, field
from typing import Any
from app.services.ai_engine.base import AIQueryResult
@dataclass
class CitationPattern:
    """A single citation pattern measured across a batch of AI query results."""

    # Pattern family. The analyzers in this module emit "content_structure",
    # "authority_signal" and "citation_format".
    pattern_type: str
    # Identifier of the pattern inside its family, e.g. "faq_format".
    pattern_name: str
    # Fraction of analyzed results that showed the pattern (count / total).
    frequency: float
    # Confidence attached to the detection rule; the analyzers in this module
    # report 0.0 when the pattern was not observed at all.
    confidence: float
    # Human-readable summary of what was detected.
    description: str
    # Free-form supporting data (e.g. "count" and "total"). default_factory
    # gives every instance its own dict.
    details: dict[str, Any] = field(default_factory=dict)
@dataclass
class PatternAnalysisReport:
    """Aggregated outcome of one citation-pattern analysis run."""

    # Brand and query the analysis was run for.
    brand_id: str
    query: str
    # Number of query results that were analyzed.
    total_results: int
    # Every detected pattern, across all pattern families.
    patterns: list[CitationPattern]
    # Per-family summaries: "<pattern_name>_frequency" entries plus, when any
    # frequency is positive, a key naming the most frequent pattern.
    content_structure_insights: dict[str, Any]
    authority_signal_insights: dict[str, Any]
    citation_format_insights: dict[str, Any]
    # Per-engine statistics keyed by engine name.
    engine_preferences: dict[str, Any]
    # Human-readable optimisation suggestions; a fresh list per instance.
    recommendations: list[str] = field(default_factory=list)
# Signals that a response contains FAQ-style content. A response is treated
# as FAQ-formatted when any one of these matches.
_FAQ_PATTERNS = [
    # English "Q: ... A:" pair; DOTALL lets the question span several lines.
    re.compile(r"Q:\s*.+?\s*A:\s*", re.DOTALL | re.IGNORECASE),
    # Chinese question/answer pair. Chinese text normally uses the fullwidth
    # colon (U+FF1A), so accept it alongside the ASCII colon. It is written as
    # an escape so the two look-alike characters cannot be confused in review.
    re.compile(r"问题[:\uff1a]\s*.+?\s*回答[:\uff1a]\s*", re.DOTALL),
    # Explicit section headings ("frequently asked questions" / "FAQ").
    re.compile(r"常见问题", re.IGNORECASE),
    re.compile(r"FAQ", re.IGNORECASE),
]
# List items at the start of a line: numbered ("1. ") or bulleted ("- ", "* ").
_LIST_PATTERNS = [
    re.compile(r"(?:^|\n)\s*\d+\.\s+", flags=re.MULTILINE),
    re.compile(r"(?:^|\n)\s*[-*]\s+", flags=re.MULTILINE),
]

# Tabular content: a pipe-delimited row with at least two cells, or two runs
# of four or more dashes separated by whitespace.
_TABLE_PATTERNS = [
    re.compile(r"\|.+\|.+\|", flags=re.MULTILINE),
    re.compile(r"-{4,}\s+-{4,}", flags=re.MULTILINE),
]

# Quoted material: a line starting with "> ", or text wrapped in straight or
# curly double quotes.
_QUOTE_PATTERNS = [
    re.compile(r"^\s*>\s+", flags=re.MULTILINE),
    re.compile(r"[\"\u201c].+?[\"\u201d]"),
]
# Data-backed claims: percentages, research vocabulary, attribution phrases
# and four-digit years starting with 19 or 20.
_DATA_CITATION_PATTERNS = [
    re.compile(r"\d+%", flags=re.MULTILINE),
    re.compile(r"(?:study|research|survey|report|data|statistics)", flags=re.IGNORECASE),
    re.compile(r"(?:according to|based on|shown by|indicates?|reveals?)", flags=re.IGNORECASE),
    re.compile(r"\b(?:19|20)\d{2}\b"),
]

# Expert references: academic titles, expert roles followed by a word, and a
# fixed set of university names introduced by "from" / "at".
_EXPERT_CITATION_PATTERNS = [
    re.compile(r"(?:Dr\.|Professor|Prof\.)\s+\w+", flags=re.IGNORECASE),
    re.compile(r"(?:expert|analyst|researcher|scientist)\s+\w+", flags=re.IGNORECASE),
    re.compile(
        r"(?:from|at)\s+(?:Harvard|Stanford|MIT|Oxford|Cambridge|Yale|Princeton)",
        flags=re.IGNORECASE,
    ),
]

# Certification and compliance marks, plus generic certification wording.
_CERTIFICATION_PATTERNS = [
    re.compile(
        r"(?:ISO\s*\d+|FDA\s+approved|CE\s+certif|UL\s+listed|SOC\s*2|HIPAA|GDPR)",
        flags=re.IGNORECASE,
    ),
    re.compile(r"(?:certified|certification|compliant|accredited)", flags=re.IGNORECASE),
]
# Comparative wording between the brand and something else.
_COMPARISON_PATTERNS = [
    # "vs" is guarded by ASCII-letter lookarounds so it no longer fires inside
    # ordinary words such as "EVs" or "TVs". \b is deliberately not used:
    # CJK characters count as word characters, so \b would reject "vs"
    # written directly between Chinese brand names.
    re.compile(
        r"(?:compared?\s+to|(?<![A-Za-z])vs\.?(?![A-Za-z])|versus|while\s+.+\s*,\s*.+)",
        re.IGNORECASE,
    ),
    # Chinese comparison words (better than / compared with / contrast /
    # not as good as / surpass).
    re.compile(r"(?:优于|相比|对比|不如|胜过)"),
]
# Wording that attributes a statement to a source. The verb pattern has no
# word-boundary anchors, so it also matches inside longer words.
_DIRECT_CITATION_PATTERNS = [
    re.compile(
        r"(?:states?|claims?|says?|mentions?|notes?|reports?|announces?)",
        flags=re.IGNORECASE,
    ),
    re.compile(
        r"(?:according to|as stated by|as reported by)",
        flags=re.IGNORECASE,
    ),
]
# Content-structure rules: (pattern name, regexes, confidence, description).
# A response matches a rule when any one of its regexes matches.
_CONTENT_RULES: list[tuple[str, list[re.Pattern[str]], float, str]] = [
    (
        "faq_format",
        _FAQ_PATTERNS,
        0.8,
        "FAQ format detected in AI responses",
    ),
    (
        "list_format",
        _LIST_PATTERNS,
        0.8,
        "List format detected in AI responses",
    ),
    (
        "table_format",
        _TABLE_PATTERNS,
        0.8,
        "Table format detected in AI responses",
    ),
    (
        "quote_block",
        _QUOTE_PATTERNS,
        0.7,
        "Quote block detected in AI responses",
    ),
]

# Authority-signal rules:
# (pattern name, regexes, confidence, description, threshold).
# The threshold is the minimum number of the rule's regexes that must match a
# response for that response to count.
_AUTHORITY_RULES: list[tuple[str, list[re.Pattern[str]], float, str, int]] = [
    (
        "data_citation",
        _DATA_CITATION_PATTERNS,
        0.85,
        "Data citation signals detected in AI responses",
        2,
    ),
    (
        "expert_citation",
        _EXPERT_CITATION_PATTERNS,
        0.8,
        "Expert citation signals detected in AI responses",
        1,
    ),
    (
        "certification_mark",
        _CERTIFICATION_PATTERNS,
        0.9,
        "Certification marks detected in AI responses",
        1,
    ),
]
# Recommendation rules: (pattern type, pattern name, threshold, message).
# The message is recommended when the named pattern's frequency is below the
# threshold.
_RECOMMENDATION_RULES: list[tuple[str, str, float, str]] = [
    (
        "content_structure",
        "faq_format",
        0.3,
        "Consider adding FAQ sections to improve AI citation probability",
    ),
    (
        "content_structure",
        "list_format",
        0.3,
        "Use structured lists to make content more extractable by AI engines",
    ),
    (
        "authority_signal",
        "data_citation",
        0.3,
        "Include data citations and statistics to increase authority signals",
    ),
    (
        "authority_signal",
        "expert_citation",
        0.3,
        "Add expert quotes and references to strengthen E-E-A-T signals",
    ),
    (
        "citation_format",
        "direct_citation",
        0.2,
        "Optimize content for direct citation by AI engines",
    ),
]
def _matches_any(text: str, patterns: list[re.Pattern[str]]) -> bool:
    """Return True when at least one of *patterns* is found anywhere in *text*.

    An empty *patterns* list yields False.
    """
    return any(p.search(text) is not None for p in patterns)
def _count_matches(text: str, patterns: list[re.Pattern[str]]) -> int:
    """Return how many of *patterns* are found in *text*.

    Each pattern contributes at most 1, no matter how many times it occurs, so
    the result ranges from 0 to ``len(patterns)``.
    """
    return sum(1 for p in patterns if p.search(text))
def _build_type_insights(
    patterns: list[CitationPattern],
    pattern_type: str,
    top_key: str,
) -> dict[str, Any]:
    """Summarize the patterns of one family as a flat insights dict.

    Args:
        patterns: Patterns of any family; only those whose ``pattern_type``
            equals *pattern_type* are used.
        pattern_type: Family to summarize.
        top_key: Key under which the name of the most frequent pattern is
            stored.

    Returns:
        A dict with one ``"<pattern_name>_frequency"`` entry per selected
        pattern. *top_key* is added only when the highest frequency is greater
        than zero; on ties the earliest pattern in *patterns* wins.
    """
    selected = [p for p in patterns if p.pattern_type == pattern_type]
    insights: dict[str, Any] = {
        f"{p.pattern_name}_frequency": p.frequency for p in selected
    }
    # default=None covers the case where no pattern of this family exists.
    best = max(selected, key=lambda p: p.frequency, default=None)
    if best is not None and best.frequency > 0:
        insights[top_key] = best.pattern_name
    return insights
class ContentStructureAnalyzer:
    """Measure how often AI responses use the layouts in ``_CONTENT_RULES``."""

    def analyze(self, results: list[AIQueryResult]) -> list[CitationPattern]:
        """Return one ``content_structure`` pattern per content rule.

        A result counts at most once per rule, however many of the rule's
        regexes match its ``raw_response``. ``frequency`` is the share of
        results that matched; ``confidence`` is the rule's configured value,
        or 0.0 when no result matched.

        Args:
            results: Query results to inspect; only ``raw_response`` is read.

        Returns:
            Patterns in ``_CONTENT_RULES`` order. For empty input the
            zero-valued placeholders from ``_empty_patterns`` are returned.
        """
        if not results:
            return self._empty_patterns()

        hits: dict[str, int] = {name: 0 for name, _, _, _ in _CONTENT_RULES}
        for result in results:
            for name, regexes, _, _ in _CONTENT_RULES:
                if _matches_any(result.raw_response, regexes):
                    hits[name] += 1

        total = len(results)
        return [
            CitationPattern(
                pattern_type="content_structure",
                pattern_name=name,
                frequency=hits[name] / total,
                confidence=conf if hits[name] > 0 else 0.0,
                description=desc,
                details={"count": hits[name], "total": total},
            )
            for name, _, conf, desc in _CONTENT_RULES
        ]

    def _empty_patterns(self) -> list[CitationPattern]:
        """Return a zero-frequency placeholder for every content rule.

        Placeholders carry an empty description and empty details.
        """
        return [
            CitationPattern(
                pattern_type="content_structure",
                pattern_name=name,
                frequency=0.0,
                confidence=0.0,
                description="",
                details={},
            )
            for name, _, _, _ in _CONTENT_RULES
        ]
class AuthoritySignalAnalyzer:
    """Measure how often AI responses carry the signals in ``_AUTHORITY_RULES``."""

    def analyze(self, results: list[AIQueryResult]) -> list[CitationPattern]:
        """Return one ``authority_signal`` pattern per authority rule.

        For each result and rule, the number of the rule's regexes that match
        ``raw_response`` is compared with the rule's threshold; the result
        counts for the rule when the threshold is reached.

        Args:
            results: Query results to inspect; only ``raw_response`` is read.

        Returns:
            Patterns in ``_AUTHORITY_RULES`` order. ``details`` holds
            ``count`` (qualifying results), ``total`` (all results) and
            ``match_count`` (matched regexes summed over qualifying results).
            For empty input the placeholders from ``_empty_patterns`` are
            returned.
        """
        if not results:
            return self._empty_patterns()

        qualifying: dict[str, int] = {name: 0 for name, _, _, _, _ in _AUTHORITY_RULES}
        matched: dict[str, int] = {name: 0 for name, _, _, _, _ in _AUTHORITY_RULES}
        for result in results:
            for name, regexes, _, _, threshold in _AUTHORITY_RULES:
                match_count = _count_matches(result.raw_response, regexes)
                if match_count >= threshold:
                    qualifying[name] += 1
                    # NOTE(review): the original indentation was not visible;
                    # the match total is accumulated only for results that
                    # reach the threshold. This only matters for rules with a
                    # threshold above 1 - confirm against the repository.
                    matched[name] += match_count

        total = len(results)
        return [
            CitationPattern(
                pattern_type="authority_signal",
                pattern_name=name,
                frequency=qualifying[name] / total,
                confidence=conf if qualifying[name] > 0 else 0.0,
                description=desc,
                details={
                    "count": qualifying[name],
                    "total": total,
                    "match_count": matched[name],
                },
            )
            for name, _, conf, desc, _ in _AUTHORITY_RULES
        ]

    def _empty_patterns(self) -> list[CitationPattern]:
        """Return a zero-frequency placeholder for every authority rule.

        Placeholders carry an empty description and empty details.
        """
        return [
            CitationPattern(
                pattern_type="authority_signal",
                pattern_name=name,
                frequency=0.0,
                confidence=0.0,
                description="",
                details={},
            )
            for name, _, _, _, _ in _AUTHORITY_RULES
        ]
class CitationFormatAnalyzer:
    """Classify how the brand is cited: directly, indirectly or by comparison."""

    def analyze(self, results: list[AIQueryResult]) -> list[CitationPattern]:
        """Return the direct / indirect / comparison citation patterns.

        Only results with ``has_brand_citation`` set are classified, each into
        exactly one bucket, checked in this order:

        1. comparison - ``raw_response`` matches a comparison regex and the
           result also has a competitor citation;
        2. direct - ``brand_context`` is non-empty and matches a
           direct-citation regex;
        3. indirect - every other brand-citing result.

        Frequencies are relative to all results, cited or not.

        Args:
            results: Query results to classify.

        Returns:
            Three ``citation_format`` patterns in the order direct, indirect,
            comparison. For empty input the placeholders from
            ``_empty_patterns`` are returned.
        """
        if not results:
            return self._empty_patterns()

        direct_count = 0
        indirect_count = 0
        comparison_count = 0
        for r in results:
            if not r.has_brand_citation:
                continue
            # NOTE(review): the original indentation was not visible; the
            # trailing ``else`` is read as the fallback of this if/elif chain,
            # so uncited results are never counted as indirect.
            if _matches_any(r.raw_response, _COMPARISON_PATTERNS) and r.has_competitor_citation:
                comparison_count += 1
            elif r.brand_context and _matches_any(r.brand_context, _DIRECT_CITATION_PATTERNS):
                direct_count += 1
            else:
                indirect_count += 1

        total = len(results)
        # (pattern name, count, confidence when observed, description)
        buckets = (
            ("direct_citation", direct_count, 0.9, "Direct citation format detected"),
            ("indirect_citation", indirect_count, 0.7, "Indirect citation format detected"),
            ("comparison_citation", comparison_count, 0.85, "Comparison citation format detected"),
        )
        return [
            CitationPattern(
                pattern_type="citation_format",
                pattern_name=name,
                frequency=count / total,
                confidence=conf if count > 0 else 0.0,
                description=desc,
                details={"count": count, "total": total},
            )
            for name, count, conf, desc in buckets
        ]

    def _empty_patterns(self) -> list[CitationPattern]:
        """Return zero-frequency placeholders for the three citation formats.

        Placeholders carry an empty description and empty details.
        """
        return [
            CitationPattern(
                pattern_type="citation_format",
                pattern_name=name,
                frequency=0.0,
                confidence=0.0,
                description="",
                details={},
            )
            for name in ("direct_citation", "indirect_citation", "comparison_citation")
        ]
class EnginePreferenceAnalyzer:
    """Summarize citation behaviour separately for each AI engine."""

    def analyze(self, results: list[AIQueryResult]) -> dict[str, dict[str, Any]]:
        """Return per-engine citation statistics.

        Results are grouped by ``engine_type.value``. Running counters are
        kept per engine rather than the result objects themselves.

        Args:
            results: Query results from one or more engines.

        Returns:
            A dict keyed by engine name, in first-seen order. Each value has:

            * ``citation_rate`` - share of the engine's results that cite the
              brand;
            * ``avg_citation_position`` - mean ``position`` over the
              citations of brand-citing results, or -1 when there are none;
            * ``format_preferences`` - share of the engine's results matching
              the FAQ, list and table regexes (keys ``faq``, ``list``,
              ``table``).

            An empty dict is returned for empty input.
        """
        if not results:
            return {}

        format_rules = (
            ("faq", _FAQ_PATTERNS),
            ("list", _LIST_PATTERNS),
            ("table", _TABLE_PATTERNS),
        )

        per_engine: dict[str, dict[str, Any]] = {}
        for r in results:
            engine_name = r.engine_type.value
            stats = per_engine.get(engine_name)
            if stats is None:
                stats = per_engine[engine_name] = {
                    "total": 0,
                    "cited": 0,
                    "positions": [],
                    "format_hits": {fmt: 0 for fmt, _ in format_rules},
                }

            stats["total"] += 1
            if r.has_brand_citation:
                stats["cited"] += 1
                stats["positions"].extend(c.position for c in r.citations)

            # NOTE(review): the original indentation was not visible; format
            # detection is applied to every result, cited or not, because the
            # shares below are divided by the engine's total result count.
            # Confirm against the repository.
            for fmt, regexes in format_rules:
                if _matches_any(r.raw_response, regexes):
                    stats["format_hits"][fmt] += 1

        prefs: dict[str, dict[str, Any]] = {}
        for engine_name, stats in per_engine.items():
            # total is at least 1: an entry exists only once a result was seen.
            total = stats["total"]
            positions = stats["positions"]
            prefs[engine_name] = {
                "citation_rate": stats["cited"] / total,
                "avg_citation_position": sum(positions) / len(positions) if positions else -1,
                "format_preferences": {
                    fmt: count / total for fmt, count in stats["format_hits"].items()
                },
            }
        return prefs
class CitationPatternEngine:
    """Run all citation analyzers and assemble a ``PatternAnalysisReport``."""

    def __init__(self) -> None:
        """Create one instance of each analyzer used by ``analyze``."""
        self.content_analyzer = ContentStructureAnalyzer()
        self.authority_analyzer = AuthoritySignalAnalyzer()
        self.format_analyzer = CitationFormatAnalyzer()
        self.engine_analyzer = EnginePreferenceAnalyzer()

    def analyze(
        self,
        results: list[AIQueryResult],
        brand_id: str,
        query: str,
    ) -> PatternAnalysisReport:
        """Analyze *results* and return the combined report.

        Args:
            results: Query results to analyze.
            brand_id: Copied unchanged into the report.
            query: Copied unchanged into the report.

        Returns:
            A report whose ``patterns`` list holds the content-structure,
            authority-signal and citation-format patterns, in that order.
            For empty *results* the report has no patterns, empty insights
            and no recommendations.
        """
        if not results:
            return PatternAnalysisReport(
                brand_id=brand_id,
                query=query,
                total_results=0,
                patterns=[],
                content_structure_insights={},
                authority_signal_insights={},
                citation_format_insights={},
                engine_preferences={},
                recommendations=[],
            )

        patterns: list[CitationPattern] = [
            *self.content_analyzer.analyze(results),
            *self.authority_analyzer.analyze(results),
            *self.format_analyzer.analyze(results),
        ]
        engine_prefs = self.engine_analyzer.analyze(results)
        recommendations = self._generate_recommendations(patterns, engine_prefs)

        return PatternAnalysisReport(
            brand_id=brand_id,
            query=query,
            total_results=len(results),
            patterns=patterns,
            content_structure_insights=_build_type_insights(
                patterns, "content_structure", "dominant_format"
            ),
            authority_signal_insights=_build_type_insights(
                patterns, "authority_signal", "strongest_signal"
            ),
            citation_format_insights=_build_type_insights(
                patterns, "citation_format", "primary_format"
            ),
            engine_preferences=engine_prefs,
            recommendations=recommendations,
        )

    def _generate_recommendations(
        self,
        patterns: list[CitationPattern],
        engine_prefs: dict[str, dict[str, Any]],
    ) -> list[str]:
        """Turn weak patterns and low per-engine citation rates into advice.

        A rule from ``_RECOMMENDATION_RULES`` fires when its pattern is
        present in *patterns* with a frequency below the rule's threshold.
        After those, one message is added for every engine whose
        ``citation_rate`` is below 0.3.

        Args:
            patterns: Detected patterns of all families.
            engine_prefs: Per-engine statistics; each value must contain
                ``citation_rate``.

        Returns:
            Recommendation messages: rule messages in rule order, followed by
            engine messages in *engine_prefs* order.
        """
        recommendations: list[str] = []
        by_key = {(p.pattern_type, p.pattern_name): p for p in patterns}

        for ptype, pname, threshold, message in _RECOMMENDATION_RULES:
            pattern = by_key.get((ptype, pname))
            # Explicit None check: a missing pattern yields no recommendation.
            if pattern is not None and pattern.frequency < threshold:
                recommendations.append(message)

        for engine_name, prefs in engine_prefs.items():
            if prefs["citation_rate"] < 0.3:
                recommendations.append(
                    f"Low citation rate on {engine_name} ({prefs['citation_rate']:.0%}), "
                    f"consider optimizing content for this engine"
                )
        return recommendations