geo/docs/02-模块说明/pipeline-orchestration.md

5.2 KiB
Raw Blame History

内容生成Pipeline编排逻辑

概述

本文档描述内容生成Pipeline的完整编排逻辑。

Pipeline架构

用户请求 → PipelineEngine.execute()
           │
           ├─ 1. 加载Pipeline定义 (YAML)
           │
           ├─ 2. 构建执行上下文
           │
           ├─ 3. 拓扑排序确定执行顺序
           │
           └─ 4. 逐阶段执行
               │
               ├─ Stage 1 (无依赖)
               ├─ Stage 2 (依赖Stage 1)
               ├─ Stage 3 (依赖Stage 1, 2)
               └─ ...

Pipeline定义结构

Pipeline通过YAML文件定义位于 backend/pipelines/ 目录:

name: content_production
version: "1.0"
description: 内容生产Pipeline

variables:
  brand_name: ""
  keywords: ""
  platform: "zhihu"

stages:
  - name: topic_selection
    agent: content_generator
    action: select_topic
    inputs:
      brand: "${brand_name}"
      keywords: "${keywords}"
    outputs: [selected_topic, topic_score]

  - name: content_generation
    agent: content_generator
    action: generate
    depends_on: [topic_selection]
    inputs:
      topic: "${stages.topic_selection.outputs.selected_topic}"
      brand: "${brand_name}"
    outputs: [content]

  - name: deai_processing
    agent: deai_agent
    action: humanize
    depends_on: [content_generation]
    inputs:
      content: "${stages.content_generation.outputs.content}"
    outputs: [natural_content]

  - name: seo_optimization
    agent: geo_optimizer
    action: optimize
    depends_on: [deai_processing]
    inputs:
      content: "${stages.deai_processing.outputs.natural_content}"
      keywords: "${keywords}"
    outputs: [optimized_content]

核心组件

PipelineEngine

位置:backend/app/agent_framework/pipeline/engine.py

方法 说明
execute(pipeline, context) 执行完整Pipeline
_execute_stage(stage, exec_context, stages_context) 执行单个阶段
_dispatch_and_wait(stage, inputs) 分发任务并等待结果
_topological_sort(stages) 拓扑排序
_resolve_variables(template, context) 解析变量引用

PipelineLoader

位置:backend/app/agent_framework/pipeline/loader.py

方法 说明
load(pipeline_name) 从YAML加载Pipeline
load_from_yaml(yaml_content) 从字符串加载
validate_dag(stages) 验证DAG无环
resolve_variables(template, context) 解析${...}变量

PipelineSchema

位置:backend/app/agent_framework/pipeline/schema.py

说明
Pipeline Pipeline定义
PipelineStage 单个阶段定义
StageResult 阶段执行结果
PipelineResult Pipeline执行结果
StageStatus 阶段状态枚举

变量解析

支持 ${var.path} 格式的变量引用:

格式 说明
${brand_name} 全局变量
${stages.step1.outputs.result} 上游阶段输出
${stages.step1.outputs.result.substring(0,10)} 字符串截取

条件执行

Stage支持 condition 字段进行条件执行:

- name: optional_stage
  agent: some_agent
  action: do_something
  condition: "${enable_optional} == true"

支持的比较操作:

  • ${var} - 变量存在且非空
  • ${var} == 'value' - 等于
  • ${var} != 'value' - 不等于

重试机制

每个Stage支持 retry_count 配置:

- name: unstable_stage
  agent: some_agent
  retry_count: 3  # 失败后重试3次

错误处理

配置 说明
continue_on_failure: false 阶段失败则Pipeline失败默认
continue_on_failure: true 阶段失败仍继续下游

内容生成服务层Pipeline

除了Agent Pipeline还有服务层的内容处理Pipeline

位置:backend/app/services/content/content_pipeline.py

用户内容 → RuleValidator (规则校验)
         ├─ 高严重问题 → 中断Pipeline
         └─ 通过 → SensitiveFilter (敏感词过滤)
                  └─ SEOOptimizer (SEO优化)
                     └─ HTMLGenerator (HTML生成)
                        └─ 输出 (html/markdown/plain)

ContentPipeline类

class ContentPipeline:
    async def run(self, request: dict) -> PipelineResponse:
        """
        request = {
            "content": "原始内容",
            "title": "标题",
            "platform": "目标平台",
            "optimize_for": ["validation", "sensitive", "seo"],
            "output_formats": ["html", "markdown", "plain"]
        }
        """

阶段说明

阶段 组件 说明
validation RuleValidator 校验标题长度、内容长度、AI模式检测
sensitive_filter SensitiveFilter 敏感词过滤替换
seo_optimization SEOOptimizer 关键词密度调整、位置检查
html_generation HTMLGenerator 生成HTML/ Markdown/纯文本

Dry-Run模式

当PipelineEngine初始化时未传入dispatcher进入dry-run模式

# Dry-run模式测试/开发)
engine = PipelineEngine(dispatcher=None)
result = await engine.execute(pipeline, context)
# 返回模拟结果用于测试Pipeline定义

生产环境若触发dry-run会记录ERROR级别日志。