5.2 KiB
5.2 KiB
内容生成Pipeline编排逻辑
概述
本文档描述内容生成Pipeline的完整编排逻辑。
Pipeline架构
用户请求 → PipelineEngine.execute()
│
├─ 1. 加载Pipeline定义 (YAML)
│
├─ 2. 构建执行上下文
│
├─ 3. 拓扑排序确定执行顺序
│
└─ 4. 逐阶段执行
│
├─ Stage 1 (无依赖)
├─ Stage 2 (依赖Stage 1)
├─ Stage 3 (依赖Stage 1, 2)
└─ ...
Pipeline定义结构
Pipeline通过YAML文件定义,位于 backend/pipelines/ 目录:
name: content_production
version: "1.0"
description: 内容生产Pipeline
variables:
brand_name: ""
keywords: ""
platform: "zhihu"
stages:
- name: topic_selection
agent: content_generator
action: select_topic
inputs:
brand: "${brand_name}"
keywords: "${keywords}"
outputs: [selected_topic, topic_score]
- name: content_generation
agent: content_generator
action: generate
depends_on: [topic_selection]
inputs:
topic: "${stages.topic_selection.outputs.selected_topic}"
brand: "${brand_name}"
outputs: [content]
- name: deai_processing
agent: deai_agent
action: humanize
depends_on: [content_generation]
inputs:
content: "${stages.content_generation.outputs.content}"
outputs: [natural_content]
- name: seo_optimization
agent: geo_optimizer
action: optimize
depends_on: [deai_processing]
inputs:
content: "${stages.deai_processing.outputs.natural_content}"
keywords: "${keywords}"
outputs: [optimized_content]
核心组件
PipelineEngine
位置:backend/app/agent_framework/pipeline/engine.py
| 方法 | 说明 |
|---|---|
| execute(pipeline, context) | 执行完整Pipeline |
| _execute_stage(stage, exec_context, stages_context) | 执行单个阶段 |
| _dispatch_and_wait(stage, inputs) | 分发任务并等待结果 |
| _topological_sort(stages) | 拓扑排序 |
| _resolve_variables(template, context) | 解析变量引用 |
PipelineLoader
位置:backend/app/agent_framework/pipeline/loader.py
| 方法 | 说明 |
|---|---|
| load(pipeline_name) | 从YAML加载Pipeline |
| load_from_yaml(yaml_content) | 从字符串加载 |
| validate_dag(stages) | 验证DAG无环 |
| resolve_variables(template, context) | 解析${...}变量 |
PipelineSchema
位置:backend/app/agent_framework/pipeline/schema.py
| 类 | 说明 |
|---|---|
| Pipeline | Pipeline定义 |
| PipelineStage | 单个阶段定义 |
| StageResult | 阶段执行结果 |
| PipelineResult | Pipeline执行结果 |
| StageStatus | 阶段状态枚举 |
变量解析
支持 ${var.path} 格式的变量引用:
| 格式 | 说明 |
|---|---|
${brand_name} |
全局变量 |
${stages.step1.outputs.result} |
上游阶段输出 |
${stages.step1.outputs.result.substring(0,10)} |
字符串截取 |
条件执行
Stage支持 condition 字段进行条件执行:
- name: optional_stage
agent: some_agent
action: do_something
condition: "${enable_optional} == true"
支持的比较操作:
${var}- 变量存在且非空${var} == 'value'- 等于${var} != 'value'- 不等于
重试机制
每个Stage支持 retry_count 配置:
- name: unstable_stage
agent: some_agent
retry_count: 3 # 失败后重试3次
错误处理
| 配置 | 说明 |
|---|---|
| continue_on_failure: false | 阶段失败则Pipeline失败(默认) |
| continue_on_failure: true | 阶段失败仍继续下游 |
内容生成服务层Pipeline
除了Agent Pipeline,还有服务层的内容处理Pipeline:
位置:backend/app/services/content/content_pipeline.py
用户内容 → RuleValidator (规则校验)
├─ 高严重问题 → 中断Pipeline
└─ 通过 → SensitiveFilter (敏感词过滤)
└─ SEOOptimizer (SEO优化)
└─ HTMLGenerator (HTML生成)
└─ 输出 (html/markdown/plain)
ContentPipeline类
class ContentPipeline:
async def run(self, request: dict) -> PipelineResponse:
"""
request = {
"content": "原始内容",
"title": "标题",
"platform": "目标平台",
"optimize_for": ["validation", "sensitive", "seo"],
"output_formats": ["html", "markdown", "plain"]
}
"""
阶段说明
| 阶段 | 组件 | 说明 |
|---|---|---|
| validation | RuleValidator | 校验标题长度、内容长度、AI模式检测 |
| sensitive_filter | SensitiveFilter | 敏感词过滤替换 |
| seo_optimization | SEOOptimizer | 关键词密度调整、位置检查 |
| html_generation | HTMLGenerator | 生成HTML/ Markdown/纯文本 |
Dry-Run模式
当PipelineEngine初始化时未传入dispatcher,进入dry-run模式:
# Dry-run模式(测试/开发)
engine = PipelineEngine(dispatcher=None)
result = await engine.execute(pipeline, context)
# 返回模拟结果用于测试Pipeline定义
生产环境若触发dry-run会记录ERROR级别日志。