feat(configs): add GEO AgentKit Server configuration

- llm_config.yaml: DeepSeek + OpenAI-compatible providers with env var substitution
- skills/ (8 YAML): citation_detector, content_generator, deai_agent, geo_optimizer,
  monitor, schema_advisor, competitor_analyzer, trend_agent
  - Added intent fields for content_generator, competitor_analyzer, trend_agent
  - Added quality_gate fields for content_generator, deai_agent, geo_optimizer
  - Updated custom_handler paths to configs.geo_handlers
- geo_tools.py: 14 FunctionTools calling GEO Backend via HTTP
- geo_handlers.py: 3 custom handlers (citation/monitor/schema) calling /internal/ API
- geo_server.py: FastAPI factory with LLM Gateway, Tool Registry, Skill Registry
This commit is contained in:
chiguyong 2026-06-05 23:25:14 +08:00
parent 47a848fbcb
commit 669ca604e5
13 changed files with 1243 additions and 0 deletions

1
configs/__init__.py Normal file
View File

@ -0,0 +1 @@
"""GEO AgentKit Server 配置包"""

85
configs/geo_handlers.py Normal file
View File

@ -0,0 +1,85 @@
"""GEO 项目的 Custom Handler — 供 AgentKit Server 使用
所有 Handler 通过 HTTP 回调 GEO Backend /internal/ 端点不直接访问 DB
"""
import logging
import os
import httpx
from agentkit.core.protocol import TaskMessage
logger = logging.getLogger(__name__)
GEO_BACKEND_URL = os.getenv("GEO_BACKEND_URL", "http://localhost:8000")
INTERNAL_API_TOKEN = os.getenv("INTERNAL_API_TOKEN", "")
def _internal_headers() -> dict:
"""获取内部 API 请求头"""
headers = {"Content-Type": "application/json"}
if INTERNAL_API_TOKEN:
headers["X-Internal-Token"] = INTERNAL_API_TOKEN
return headers
async def handle_citation_task(task: TaskMessage) -> dict:
"""引用检测任务 — 通过 HTTP 回调 GEO Backend
task_type 路由
- citation_detect: POST /internal/citation/detect
- citation_detect_single: POST /internal/citation/detect-single
"""
if task.task_type == "citation_detect":
return await _call_internal("/internal/citation/detect", task.input_data)
elif task.task_type == "citation_detect_single":
return await _call_internal("/internal/citation/detect-single", task.input_data)
else:
raise ValueError(f"Unsupported task type: {task.task_type}")
async def handle_monitor_task(task: TaskMessage) -> dict:
"""效果追踪任务 — 通过 HTTP 回调 GEO Backend
task_type 路由
- monitor_track: POST /internal/monitor/track
- monitor_check_single: POST /internal/monitor/check-single
"""
if task.task_type == "monitor_track":
return await _call_internal("/internal/monitor/track", task.input_data)
elif task.task_type == "monitor_check_single":
return await _call_internal("/internal/monitor/check-single", task.input_data)
else:
raise ValueError(f"Unsupported task type: {task.task_type}")
async def handle_schema_task(task: TaskMessage) -> dict:
"""Schema 建议任务 — 通过 HTTP 回调 GEO Backend
task_type 路由
- schema_advise: POST /internal/schema/advise
"""
if task.task_type == "schema_advise":
return await _call_internal("/internal/schema/advise", task.input_data)
else:
raise ValueError(f"Unsupported task type: {task.task_type}")
async def _call_internal(path: str, input_data: dict) -> dict:
"""调用 GEO Backend 内部 API"""
try:
async with httpx.AsyncClient(timeout=300.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}{path}",
json=input_data,
headers=_internal_headers(),
)
resp.raise_for_status()
return resp.json()
except httpx.HTTPStatusError as e:
logger.error(f"HTTP error calling {path}: {e.response.status_code} {e.response.text[:500]}")
return {"error": f"HTTP {e.response.status_code}", "detail": e.response.text[:500]}
except Exception as e:
logger.error(f"Error calling {path}: {e}")
return {"error": str(e)}

111
configs/geo_server.py Normal file
View File

@ -0,0 +1,111 @@
"""GEO AgentKit Server 启动入口
工厂函数 create_geo_app() 初始化 LLM GatewayTool RegistrySkill Registry
然后创建 FastAPI 应用
使用方式
uvicorn configs.geo_server:create_geo_app --factory --host 0.0.0.0 --port 8001
"""
import logging
import os
from agentkit.core.agent_pool import AgentPool
from agentkit.llm.config import LLMConfig
from agentkit.llm.gateway import LLMGateway
from agentkit.llm.providers.openai import OpenAICompatibleProvider
from agentkit.quality.gate import QualityGate
from agentkit.quality.output import OutputStandardizer
from agentkit.router.intent import IntentRouter
from agentkit.server.app import create_app
from agentkit.skills.loader import SkillLoader
from agentkit.skills.registry import SkillRegistry
from agentkit.tools.registry import ToolRegistry
logger = logging.getLogger(__name__)
# ─── 配置路径 ───
CONFIGS_DIR = os.path.dirname(os.path.abspath(__file__))
LLM_CONFIG_PATH = os.path.join(CONFIGS_DIR, "llm_config.yaml")
SKILLS_DIR = os.path.join(CONFIGS_DIR, "skills")
def _substitute_env_vars(config_path: str) -> dict:
"""加载 YAML 配置并替换 ${VAR} 环境变量"""
import yaml
with open(config_path, encoding="utf-8") as f:
raw = f.read()
# 递归替换 ${VAR_NAME} 和 ${VAR_NAME:-default} 格式
import re
def _replace_env(match):
var_expr = match.group(1)
if ":-" in var_expr:
var_name, default = var_expr.split(":-", 1)
return os.getenv(var_name, default)
return os.getenv(var_expr, match.group(0))
resolved = re.sub(r"\$\{([^}]+)\}", _replace_env, raw)
return yaml.safe_load(resolved)
def _init_llm_gateway() -> LLMGateway:
"""初始化 LLM Gateway 并注册 Provider"""
config_data = _substitute_env_vars(LLM_CONFIG_PATH)
config = LLMConfig.from_dict(config_data)
gateway = LLMGateway(config)
for provider_name, pconf in config.providers.items():
if not pconf.api_key:
logger.warning(f"Skipping provider '{provider_name}': no API key")
continue
models = list(pconf.models.keys()) if pconf.models else []
default_model = models[0] if models else "gpt-4o-mini"
provider = OpenAICompatibleProvider(
api_key=pconf.api_key,
base_url=pconf.base_url,
default_model=default_model,
)
gateway.register_provider(provider_name, provider)
logger.info(f"Provider '{provider_name}' registered with model '{default_model}'")
return gateway
def _init_tool_registry() -> ToolRegistry:
"""初始化 Tool Registry 并注册 GEO Tools"""
registry = ToolRegistry()
from configs.geo_tools import register_geo_tools
register_geo_tools(registry)
return registry
def _init_skill_registry(tool_registry: ToolRegistry) -> SkillRegistry:
"""初始化 Skill Registry 并从 configs/skills/ 目录加载"""
registry = SkillRegistry()
loader = SkillLoader(registry, tool_registry)
skills = loader.load_from_directory(SKILLS_DIR)
logger.info(f"Loaded {len(skills)} skills from {SKILLS_DIR}")
return registry
def create_geo_app() -> "FastAPI":
"""GEO AgentKit Server FastAPI 工厂函数"""
llm_gateway = _init_llm_gateway()
tool_registry = _init_tool_registry()
skill_registry = _init_skill_registry(tool_registry)
app = create_app(
llm_gateway=llm_gateway,
skill_registry=skill_registry,
tool_registry=tool_registry,
)
app.title = "GEO AgentKit Server"
logger.info(f"GEO AgentKit Server initialized: {len(skill_registry.list_skills())} skills, "
f"{len(tool_registry.list_tools())} tools")
return app

465
configs/geo_tools.py Normal file
View File

@ -0,0 +1,465 @@
"""GEO 项目的 Tool 注册 — 供 AgentKit Server 使用
所有 Tool 通过 HTTP 调用 GEO Backend 的业务 API不直接 import GEO 服务类
"""
import logging
import os
from typing import Any
import httpx
from agentkit.tools.function_tool import FunctionTool
from agentkit.tools.registry import ToolRegistry
logger = logging.getLogger(__name__)
GEO_BACKEND_URL = os.getenv("GEO_BACKEND_URL", "http://localhost:8000")
INTERNAL_API_TOKEN = os.getenv("INTERNAL_API_TOKEN", "")
def _internal_headers() -> dict:
"""获取内部 API 请求头"""
headers = {"Content-Type": "application/json"}
if INTERNAL_API_TOKEN:
headers["X-Internal-Token"] = INTERNAL_API_TOKEN
return headers
# ─── Citation Tools ───
async def execute_single_platform(
keyword: str,
platform: str,
target_brand: str,
brand_aliases: list[str] | None = None,
) -> dict:
"""在单个 AI 平台执行引用检测"""
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}/api/v1/ai-engines/execute-single-platform",
json={
"keyword": keyword,
"platform": platform,
"target_brand": target_brand,
"brand_aliases": brand_aliases or [],
},
)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"execute_single_platform 失败: {e}")
return {"error": str(e), "keyword": keyword, "platform": platform}
async def get_or_create_task(query_id: str, platform: str) -> dict:
"""获取或创建查询任务 — 通过内部 API"""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}/internal/citation/get-or-create-task",
json={"query_id": query_id, "platform": platform},
headers=_internal_headers(),
)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"get_or_create_task 失败: {e}")
return {"error": str(e), "query_id": query_id, "platform": platform}
# ─── Content Tools ───
async def retrieve_knowledge(
knowledge_base_ids: list[str],
query: str,
top_k: int = 5,
) -> dict:
"""从知识库检索相关内容 — 通过内部 API"""
if not knowledge_base_ids or not query:
return {"content": "暂无相关知识库内容", "sources": []}
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}/internal/knowledge/search",
json={"query": query, "knowledge_base_ids": knowledge_base_ids, "top_k": top_k},
headers=_internal_headers(),
)
resp.raise_for_status()
data = resp.json()
results = data.get("results", [])
if results:
content_parts = []
sources = []
for r in results:
title = r.get("document_title", "未知")
content_parts.append(f"[来源: {title}]\n{r.get('content', '')}")
sources.append(title)
return {"content": "\n\n---\n\n".join(content_parts), "sources": sources}
return {"content": "暂无相关知识库内容", "sources": []}
except Exception as e:
logger.warning(f"retrieve_knowledge 失败: {e}")
return {"content": "暂无相关知识库内容", "sources": []}
# ─── Monitor Tools ───
async def monitor_check_and_compare(record_id: str) -> dict:
"""检测并对比监测记录的变化 — 通过内部 API"""
try:
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}/internal/monitor/check",
json={"record_id": record_id},
headers=_internal_headers(),
)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"monitor_check_and_compare 失败: {e}")
return {"error": str(e), "record_id": record_id}
async def monitor_generate_report(record_id: str) -> dict:
"""生成监测变化报告 — 通过内部 API"""
try:
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}/internal/monitor/generate-report",
json={"record_id": record_id},
headers=_internal_headers(),
)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"monitor_generate_report 失败: {e}")
return {"error": str(e), "record_id": record_id}
async def monitor_create_record(
brand_id: str,
query_keywords: str | None = None,
platform: str | None = None,
check_interval_hours: int = 24,
) -> dict:
"""创建监测记录 — 通过内部 API"""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}/internal/monitor/create-record",
json={
"brand_id": brand_id,
"query_keywords": query_keywords,
"platform": platform,
"check_interval_hours": check_interval_hours,
},
headers=_internal_headers(),
)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"monitor_create_record 失败: {e}")
return {"error": str(e), "brand_id": brand_id}
# ─── Schema Tools ───
SCHEMA_TEMPLATES = {
"Organization": {
"@context": "https://schema.org", "@type": "Organization",
"name": "", "description": "", "url": "", "logo": "", "sameAs": [],
},
"Product": {
"@context": "https://schema.org", "@type": "Product",
"name": "", "description": "",
"brand": {"@type": "Brand", "name": ""},
},
"FAQPage": {
"@context": "https://schema.org", "@type": "FAQPage",
"mainEntity": [{"@type": "Question", "name": "", "acceptedAnswer": {"@type": "Answer", "text": ""}}],
},
"Article": {
"@context": "https://schema.org", "@type": "Article",
"headline": "", "description": "", "author": {"@type": "Organization", "name": ""},
},
"LocalBusiness": {
"@context": "https://schema.org", "@type": "LocalBusiness",
"name": "", "address": {"@type": "PostalAddress"},
},
}
DIMENSION_SCHEMA_MAP = {
"schema_marketing": ["Organization", "LocalBusiness"],
"entity_clarity": ["Organization", "Product"],
"citation_readiness": ["FAQPage", "Article"],
"brand_visibility": ["Organization", "Product"],
"local_seo": ["LocalBusiness"],
}
async def fill_schema_with_llm(
schema_type: str,
brand_info: dict | None = None,
diagnosis_dimensions: dict | None = None,
) -> dict:
"""使用 LLM 填充 Schema JSON-LD 模板 — 通过 GEO Backend 内部 API"""
try:
async with httpx.AsyncClient(timeout=60.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}/internal/schema/advise",
json={
"schema_type": schema_type,
"brand_info": brand_info or {},
"diagnosis_dimensions": diagnosis_dimensions or {},
},
headers=_internal_headers(),
)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"fill_schema_with_llm 失败: {e}")
return {"error": str(e), "schema_type": schema_type}
async def identify_missing_dimensions(
diagnosis_data: dict,
focus_dimensions: list[str] | None = None,
) -> dict:
"""识别 Schema 缺失维度"""
dimensions = []
dimension_scores = diagnosis_data.get("dimensions", {})
for dim_name, dim_info in dimension_scores.items():
if dim_name not in DIMENSION_SCHEMA_MAP:
continue
if focus_dimensions and dim_name not in focus_dimensions:
continue
score = dim_info.get("score", 0) if isinstance(dim_info, dict) else dim_info
max_score = dim_info.get("max_score", 100) if isinstance(dim_info, dict) else 100
percentage = (score / max_score * 100) if max_score > 0 else 0
if percentage < 80:
dimensions.append({
"dimension": dim_name,
"current_score": round(score, 2),
"max_score": max_score,
"percentage": round(percentage, 2),
})
return {"missing_dimensions": dimensions}
# ─── Competitor Tools ───
async def competitor_analyze(
brand_id: str,
analysis_types: list[str] | None = None,
period_days: int = 30,
) -> dict:
"""执行竞品策略分析 — 通过 GEO Backend API"""
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}/api/v1/competitor/analyze",
json={
"brand_id": brand_id,
"analysis_types": analysis_types,
"period_days": period_days,
},
)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"competitor_analyze 失败: {e}")
return {"error": str(e), "brand_id": brand_id}
async def competitor_gap_analysis(
brand_id: str,
period_days: int = 30,
) -> dict:
"""执行竞品差距分析 — 通过 GEO Backend API"""
return await competitor_analyze(
brand_id=brand_id,
analysis_types=["citation_gap", "platform_coverage", "query_overlap"],
period_days=period_days,
)
# ─── Trend Tools ───
async def trend_insight(
brand_id: str,
days: int = 30,
platforms: list[str] | None = None,
keywords: list[str] | None = None,
) -> dict:
"""执行趋势洞察分析 — 通过 GEO Backend API"""
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}/api/v1/trends/insight",
json={
"brand_id": brand_id,
"days": days,
"platforms": platforms,
"keywords": keywords,
},
)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"trend_insight 失败: {e}")
return {"error": str(e), "brand_id": brand_id}
async def trend_hotspot(
brand_id: str,
days: int = 30,
) -> dict:
"""检测引用量突增的热点话题 — 通过 GEO Backend API"""
try:
async with httpx.AsyncClient(timeout=120.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}/api/v1/trends/hotspot",
json={"brand_id": brand_id, "days": days},
)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"trend_hotspot 失败: {e}")
return {"error": str(e), "brand_id": brand_id}
# ─── Knowledge Tools ───
async def search_knowledge(
query: str,
knowledge_base_ids: list[str],
top_k: int = 5,
) -> dict:
"""从知识库检索相关内容 — 通过内部 API"""
return await retrieve_knowledge(
knowledge_base_ids=knowledge_base_ids,
query=query,
top_k=top_k,
)
async def detect_ai_patterns(content: str, platform_id: str) -> dict:
"""检测内容中的 AI 生成模式 — 通过 GEO Backend API"""
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.post(
f"{GEO_BACKEND_URL}/api/v1/ai-engines/detect-ai-patterns",
json={"content": content, "platform_id": platform_id},
)
resp.raise_for_status()
return resp.json()
except Exception as e:
logger.error(f"detect_ai_patterns 失败: {e}")
return {"error": str(e), "patterns": [], "count": 0}
# ─── Registration ───
def register_geo_tools(registry: ToolRegistry) -> None:
"""注册 GEO 项目的所有 Tool"""
# Citation
registry.register(FunctionTool(
name="execute_single_platform",
description="在单个AI平台执行引用检测",
func=execute_single_platform,
tags=["citation", "detection"],
))
registry.register(FunctionTool(
name="get_or_create_task",
description="获取或创建引用检测的查询任务",
func=get_or_create_task,
tags=["citation", "task"],
))
# Content
registry.register(FunctionTool(
name="retrieve_knowledge",
description="从知识库检索相关内容",
func=retrieve_knowledge,
tags=["content", "rag", "knowledge"],
))
# Monitor
registry.register(FunctionTool(
name="monitor_check_and_compare",
description="检测并对比监测记录的变化",
func=monitor_check_and_compare,
tags=["monitor", "tracking"],
))
registry.register(FunctionTool(
name="monitor_generate_report",
description="生成监测变化报告",
func=monitor_generate_report,
tags=["monitor", "report"],
))
registry.register(FunctionTool(
name="monitor_create_record",
description="创建新的监测记录",
func=monitor_create_record,
tags=["monitor", "record"],
))
# Schema
registry.register(FunctionTool(
name="fill_schema_with_llm",
description="使用LLM填充Schema JSON-LD模板",
func=fill_schema_with_llm,
tags=["schema", "llm"],
))
registry.register(FunctionTool(
name="identify_missing_dimensions",
description="识别Schema缺失维度",
func=identify_missing_dimensions,
tags=["schema", "diagnosis"],
))
# Competitor
registry.register(FunctionTool(
name="competitor_analyze",
description="执行竞品策略分析",
func=competitor_analyze,
tags=["competitor", "analysis"],
))
registry.register(FunctionTool(
name="competitor_gap_analysis",
description="执行竞品差距分析",
func=competitor_gap_analysis,
tags=["competitor", "gap"],
))
# Trend
registry.register(FunctionTool(
name="trend_insight",
description="分析品牌引用趋势",
func=trend_insight,
tags=["trend", "insight"],
))
registry.register(FunctionTool(
name="trend_hotspot",
description="检测引用量突增的热点话题",
func=trend_hotspot,
tags=["trend", "hotspot"],
))
# Knowledge
registry.register(FunctionTool(
name="search_knowledge",
description="从知识库检索相关内容",
func=search_knowledge,
tags=["knowledge", "rag"],
))
registry.register(FunctionTool(
name="detect_ai_patterns",
description="检测内容中的AI生成模式",
func=detect_ai_patterns,
tags=["knowledge", "deai"],
))
logger.info(f"GEO tools registered: {len(registry.list_all_tools())} tools")

30
configs/llm_config.yaml Normal file
View File

@ -0,0 +1,30 @@
# LLM Provider 配置 — AgentKit Server 使用
# 环境变量替换:${VAR_NAME} 在启动时由 LLMConfig.from_yaml() 处理
providers:
deepseek:
api_key: "${DEEPSEEK_API_KEY}"
base_url: "https://api.deepseek.com/v1"
models:
deepseek-chat:
max_tokens: 64000
cost_per_1k_input: 0.00014
cost_per_1k_output: 0.00028
openai:
api_key: "${OPENAI_API_KEY}"
base_url: "${OPENAI_BASE_URL:-https://coding.dashscope.aliyuncs.com/v1}"
models:
qwen3-coder-plus:
max_tokens: 64000
cost_per_1k_input: 0.00014
cost_per_1k_output: 0.00028
model_aliases:
default: "deepseek/deepseek-chat"
fast: "deepseek/deepseek-chat"
powerful: "deepseek/deepseek-chat"
fallbacks:
deepseek/deepseek-chat:
- "openai/qwen3-coder-plus"

View File

@ -0,0 +1,56 @@
name: citation_detector
agent_type: citation_detection
version: "1.0.0"
description: "AI平台引用检测Agent检测目标品牌在各AI平台回答中的引用情况"
task_mode: custom
supported_tasks:
- citation_detect
- citation_detect_single
max_concurrency: 3
custom_handler: "configs.geo_handlers.handle_citation_task"
input_schema:
type: object
properties:
query_id:
type: string
description: 查询IDcitation_detect模式
keyword:
type: string
description: 关键词citation_detect_single模式
platform:
type: string
description: 平台名称citation_detect_single模式
target_brand:
type: string
description: 目标品牌citation_detect_single模式
brand_aliases:
type: array
items:
type: string
description: 品牌别名列表
output_schema:
type: object
properties:
query_id:
type: string
keyword:
type: string
total_records:
type: integer
cited_count:
type: integer
records:
type: array
tools:
- execute_single_platform
- get_or_create_task
memory:
working:
enabled: true
episodic:
enabled: true
track_success: true

View File

@ -0,0 +1,56 @@
name: competitor_analyzer
agent_type: competitor_analysis
version: "1.0.0"
description: "竞品策略分析Agent对比品牌与竞品的引用数据识别差距领域发现机会点生成策略建议"
task_mode: tool_call
supported_tasks:
- competitor_analyze
- competitor_gap_analysis
max_concurrency: 2
intent:
keywords: ["竞品", "对比", "竞争", "competitor", "gap", "分析"]
description: "用户需要分析竞品策略、对比品牌差距或发现竞争机会"
examples:
- "分析我的竞品策略"
- "对比我和竞品的差距"
- "竞品分析"
input_schema:
type: object
required:
- brand_id
properties:
brand_id:
type: string
description: 品牌ID
analysis_types:
type: array
items:
type: string
description: 分析类型列表
period_days:
type: integer
description: 分析周期(天)
default: 30
output_schema:
type: object
properties:
brand_id:
type: string
analysis:
type: object
recommendations:
type: array
tools:
- competitor_analyze
- competitor_gap_analysis
memory:
working:
enabled: true
episodic:
enabled: true
track_success: true

View File

@ -0,0 +1,110 @@
name: content_generator
agent_type: content_generation
version: "1.0.0"
description: "AI内容生成Agent支持选题推荐和文章生成可结合知识库RAG检索"
task_mode: llm_generate
supported_tasks:
- generate_topics
- generate_article
max_concurrency: 2
intent:
keywords: ["生成内容", "写文章", "选题", "generate", "content", "创作"]
description: "用户需要生成SEO/GEO优化内容、推荐选题或撰写文章"
examples:
- "帮我写一篇关于AI的文章"
- "推荐一些选题"
- "生成关于品牌的内容"
input_schema:
type: object
required:
- target_keyword
properties:
target_keyword:
type: string
description: 目标关键词
brand_name:
type: string
description: 品牌名称
brand_description:
type: string
description: 品牌描述
target_platform:
type: string
description: 目标平台
default: "通用"
knowledge_base_ids:
type: array
items:
type: string
description: 知识库ID列表用于RAG检索
topic_title:
type: string
description: 选题标题generate_article时使用
word_count:
type: integer
description: 目标字数
default: 2000
content_style:
type: string
description: 内容风格
default: "专业严谨"
content_angle:
type: string
description: 内容角度
model:
type: string
description: 指定LLM模型
output_schema:
type: object
properties:
topics:
type: array
description: 选题列表
content:
type: string
description: 生成的文章内容
word_count:
type: integer
usage:
type: object
prompt:
identity: "你是一个专业的内容生成助手擅长为品牌创作高质量的SEO/GEO优化内容"
context: "品牌需要通过优质内容提升在AI搜索引擎中的可见性和引用率"
instructions: |
根据用户提供的关键词、品牌信息和知识库内容,生成符合要求的内容。
- generate_topics: 生成选题列表,每个选题包含 title、reason、keywords 字段
- generate_article: 生成完整文章,确保内容专业、结构清晰、关键词自然融入
constraints: |
- 内容必须原创,避免抄袭
- 关键词密度适中,不要堆砌
- 文章结构清晰,段落分明
- 数据和引用需标注来源
output_format: "以 JSON 格式输出generate_topics 返回 {topics: [{title, reason, keywords}]}generate_article 返回 {content, word_count}"
examples: ""
llm:
model: "deepseek"
temperature: 0.7
max_tokens: 4000
tools:
- retrieve_knowledge
quality_gate:
required_fields: ["content"]
min_word_count: 500
max_retries: 1
memory:
working:
enabled: true
episodic:
enabled: true
track_success: true
semantic:
enabled: true
knowledge_base_ids_field: "knowledge_base_ids"

View File

@ -0,0 +1,81 @@
name: deai_agent
agent_type: deai_processing
version: "1.1.0"
description: "内容去AI化Agent消除AI生成特征使文章更自然流畅"
task_mode: llm_generate
supported_tasks:
- deai_process
max_concurrency: 2
input_schema:
type: object
required:
- content
properties:
content:
type: string
description: 待处理的文章内容
platform:
type: string
description: 目标平台ID如 zhihu, wechat
style:
type: string
description: 目标风格
default: "自然流畅"
preserve_structure:
type: boolean
description: 是否保留原有结构
default: true
output_schema:
type: object
properties:
content:
type: string
description: 处理后的内容
original_word_count:
type: integer
processed_word_count:
type: integer
usage:
type: object
detected_ai_patterns:
type: array
prompt:
identity: "你是一个专业的内容改写专家擅长将AI生成的文本改写为自然、人类化的表达"
context: "平台对AI生成内容的检测越来越严格需要将内容改写为更自然的风格"
instructions: |
对提供的文章内容进行去AI化处理
1. 替换AI常用表达如"总之"、"综上所述"、"首先其次最后"等)
2. 增加口语化表达和个人观点
3. 调整句式结构,避免过于工整的排比
4. 保留核心信息和数据
5. 如有平台特定要求,遵循平台规则
constraints: |
- 保留原文的核心信息和数据
- 不要改变文章的主题和立场
- 保持专业性的同时增加自然感
- 如指定平台,需符合该平台的内容规范
output_format: "返回处理后的完整文章内容"
examples: ""
llm:
model: "deepseek"
temperature: 0.9
max_tokens: 8000
tools:
- detect_ai_patterns
quality_gate:
required_fields: ["content"]
min_word_count: 200
max_retries: 1
memory:
working:
enabled: true
episodic:
enabled: true
track_success: true

View File

@ -0,0 +1,83 @@
name: geo_optimizer
agent_type: geo_optimization
version: "1.0.0"
description: "GEO/SEO内容优化Agent提升内容在AI搜索引擎中的可见性和引用率"
task_mode: llm_generate
supported_tasks:
- geo_optimize
max_concurrency: 2
input_schema:
type: object
required:
- content
- target_keywords
properties:
content:
type: string
description: 待优化文章
target_keywords:
type: array
items:
type: string
description: 目标关键词列表
target_platform:
type: string
description: 目标平台
default: "通用"
optimization_level:
type: string
enum: [light, moderate, aggressive]
description: 优化级别
default: "moderate"
output_schema:
type: object
properties:
optimized_content:
type: string
seo_score:
type: number
changes:
type: array
items:
type: string
usage:
type: object
prompt:
identity: "你是一个GEO/SEO优化专家擅长优化内容以提升在AI搜索引擎中的可见性"
context: "品牌需要通过内容优化提升在AI搜索结果中的引用率和排名"
instructions: |
对提供的文章进行GEO/SEO优化
1. 自然融入目标关键词
2. 优化标题和段落结构
3. 增加结构化数据标记建议
4. 提升内容的权威性和引用价值
5. 根据optimization_level调整优化力度
constraints: |
- 优化后的内容必须保持原意
- 关键词融入要自然,避免堆砌
- 保持文章可读性
- 不要添加虚假信息
output_format: "以 JSON 格式输出: {optimized_content: string, seo_score: number, changes: [string]}"
examples: ""
llm:
model: "deepseek"
temperature: 0.5
max_tokens: 8000
tools: []
quality_gate:
required_fields: ["optimized_content"]
min_word_count: 200
max_retries: 1
memory:
working:
enabled: true
episodic:
enabled: true
track_success: true

View File

@ -0,0 +1,55 @@
name: monitor
agent_type: performance_tracker
version: "1.0.0"
description: "效果追踪Agent监测品牌引用量、情感、排名变化生成变化报告"
task_mode: custom
supported_tasks:
- monitor_track
- monitor_check_single
max_concurrency: 3
custom_handler: "configs.geo_handlers.handle_monitor_task"
input_schema:
type: object
required:
- brand_id
properties:
brand_id:
type: string
description: 品牌ID
keyword:
type: string
description: 关键词monitor_check_single模式
platform:
type: string
description: 平台名称monitor_check_single模式
check_interval_hours:
type: integer
description: 检测间隔小时数
default: 24
output_schema:
type: object
properties:
brand_id:
type: string
brand_name:
type: string
total_queries:
type: integer
checked_records:
type: integer
reports:
type: array
tools:
- monitor_check_and_compare
- monitor_generate_report
- monitor_create_record
memory:
working:
enabled: true
episodic:
enabled: true
track_success: true

View File

@ -0,0 +1,49 @@
name: schema_advisor
agent_type: schema_advisor
version: "1.0.0"
description: "Schema优化建议Agent识别Schema缺失维度生成JSON-LD结构化数据建议"
task_mode: custom
supported_tasks:
- schema_advise
max_concurrency: 2
custom_handler: "configs.geo_handlers.handle_schema_task"
input_schema:
type: object
required:
- brand_id
properties:
brand_id:
type: string
description: 品牌ID
diagnosis_data:
type: object
description: 诊断数据
brand_info:
type: object
description: 品牌信息
focus_dimensions:
type: array
items:
type: string
description: 重点关注维度
output_schema:
type: object
properties:
brand_id:
type: string
suggestions:
type: array
total:
type: integer
tools:
- fill_schema_with_llm
memory:
working:
enabled: true
episodic:
enabled: true
track_success: true

View File

@ -0,0 +1,61 @@
name: trend_agent
agent_type: trend_analysis
version: "1.0.0"
description: "趋势洞察Agent分析品牌引用趋势、识别热点话题、推断变化原因并生成建议"
task_mode: tool_call
supported_tasks:
- trend_insight
- trend_hotspot
max_concurrency: 2
intent:
keywords: ["趋势", "热点", "洞察", "trend", "hotspot", "insight"]
description: "用户需要分析品牌趋势、识别热点话题或获取行业洞察"
examples:
- "分析品牌趋势"
- "最近的热点话题是什么"
- "趋势洞察"
input_schema:
type: object
required:
- brand_id
properties:
brand_id:
type: string
description: 品牌ID
days:
type: integer
description: 分析天数
default: 30
platforms:
type: array
items:
type: string
description: 平台列表
keywords:
type: array
items:
type: string
description: 关键词列表
output_schema:
type: object
properties:
brand_id:
type: string
trends:
type: array
hotspots:
type: array
tools:
- trend_insight
- trend_hotspot
memory:
working:
enabled: true
episodic:
enabled: true
track_success: true