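# Content Generation Pipeline Orchestration

## Overview

This document describes the end-to-end orchestration logic of the content generation Pipeline.

## Pipeline Architecture
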
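```
User request → PipelineEngine.execute()
  ├─ 1. Load the Pipeline definition (YAML)
  ├─ 2. Build the execution context
  ├─ 3. Topologically sort the stages to determine execution order
  └─ 4. Execute the stages one by one
       ├─ Stage 1 (no dependencies)
       ├─ Stage 2 (depends on Stage 1)
       ├─ Stage 3 (depends on Stages 1 and 2)
       └─ ...
```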
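
Step 3 orders the stages so that each one runs only after every stage listed in its `depends_on`. A minimal sketch of one way to do this, assuming Kahn's algorithm over plain dict stages; this is not necessarily how `_topological_sort` is implemented, and the function name `topological_sort` is hypothetical:

```python
from collections import deque


def topological_sort(stages: list[dict]) -> list[str]:
    """Return stage names ordered so each stage follows all of its depends_on entries."""
    names = [stage["name"] for stage in stages]
    # number of dependencies each stage is still waiting on
    waiting = {stage["name"]: len(stage.get("depends_on", [])) for stage in stages}
    # reverse edges: dependency name -> stages that depend on it
    dependents: dict[str, list[str]] = {name: [] for name in names}
    for stage in stages:
        for dep in stage.get("depends_on", []):
            if dep not in dependents:
                raise ValueError(f"{stage['name']!r} depends on unknown stage {dep!r}")
            dependents[dep].append(stage["name"])

    # stages with no dependencies can start immediately, in declaration order
    ready = deque(name for name in names if waiting[name] == 0)
    order: list[str] = []
    while ready:
        name = ready.popleft()
        order.append(name)
        for child in dependents[name]:
            waiting[child] -= 1
            if waiting[child] == 0:
                ready.append(child)

    if len(order) != len(names):  # leftover stages are in (or downstream of) a cycle
        raise ValueError("depends_on contains a cycle")
    return order
```

Ties are broken by declaration order here, so independent stages keep the order they have in the YAML file; the real engine may order them differently.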
## Pipeline Definition Structure

Pipelines are defined in YAML files located in the `backend/pipelines/` directory:

```yaml
name: content_production
version: "1.0"
description: Content production pipeline
variables:
  brand_name: ""
  keywords: ""
  platform: "zhihu"
stages:
  - name: topic_selection
    agent: content_generator
    action: select_topic
    inputs:
      brand: "${brand_name}"
      keywords: "${keywords}"
    outputs: [selected_topic, topic_score]
  - name: content_generation
    agent: content_generator
    action: generate
    depends_on: [topic_selection]
    inputs:
      topic: "${stages.topic_selection.outputs.selected_topic}"
      brand: "${brand_name}"
    outputs: [content]
  - name: deai_processing
    agent: deai_agent
    action: humanize
    depends_on: [content_generation]
    inputs:
      content: "${stages.content_generation.outputs.content}"
    outputs: [natural_content]
  - name: seo_optimization
    agent: geo_optimizer
    action: optimize
    depends_on: [deai_processing]
    inputs:
      content: "${stages.deai_processing.outputs.natural_content}"
      keywords: "${keywords}"
    outputs: [optimized_content]
```
## Core Components
### PipelineEngine
Location: `backend/app/agent_framework/pipeline/engine.py`

| Method | Description |
|------|------|
| `execute(pipeline, context)` | Executes the complete Pipeline |
| `_execute_stage(stage, exec_context, stages_context)` | Executes a single stage |
| `_dispatch_and_wait(stage, inputs)` | Dispatches the task and waits for the result |
| `_topological_sort(stages)` | Topologically sorts the stages |
| `_resolve_variables(template, context)` | Resolves variable references |
### PipelineLoader
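Location: `backend/app/agent_framework/pipeline/loader.py`

| Method | Description |
|------|------|
| `load(pipeline_name)` | Loads a Pipeline from its YAML file |
| `load_from_yaml(yaml_content)` | Loads a Pipeline from a YAML string |
| `validate_dag(stages)` | Verifies that the stage dependency graph is a DAG (no cycles) |
| `resolve_variables(template, context)` | Resolves `${...}` variables |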
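
DAG validation has to reject definitions whose `depends_on` links form a cycle. As an illustrative sketch (assumed behavior; the real `validate_dag` may check different things and word its errors differently), a three-color depth-first search can detect a cycle and report its path. The function name `check_acyclic` is hypothetical.

```python
def check_acyclic(stages: list[dict]) -> None:
    """Raise ValueError on an unknown dependency or a depends_on cycle."""
    deps = {stage["name"]: list(stage.get("depends_on", [])) for stage in stages}
    for name, parents in deps.items():
        for parent in parents:
            if parent not in deps:
                raise ValueError(f"{name!r} depends on unknown stage {parent!r}")

    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited / on the current DFS path / fully explored
    color = dict.fromkeys(deps, WHITE)

    def visit(name: str, path: list[str]) -> None:
        color[name] = GRAY
        for parent in deps[name]:
            if color[parent] == GRAY:
                # reached a stage already on the current path: that is a cycle
                cycle = path[path.index(parent):] + [parent]
                raise ValueError("cycle: " + " -> ".join(cycle))
            if color[parent] == WHITE:
                visit(parent, path + [parent])
        color[name] = BLACK

    for name in deps:
        if color[name] == WHITE:
            visit(name, [name])
```

Unlike a plain "did the topological sort consume every node" check, the DFS variant can name the stages involved, which makes a broken YAML file easier to fix.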
### PipelineSchema
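Location: `backend/app/agent_framework/pipeline/schema.py`

| Class | Description |
|----|------|
| `Pipeline` | Pipeline definition |
| `PipelineStage` | Definition of a single stage |
| `StageResult` | Result of a stage execution |
| `PipelineResult` | Result of a Pipeline execution |
| `StageStatus` | Stage status enum |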
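
For orientation, here is a hypothetical shape for these classes as Python dataclasses. Only the stage fields that appear in this document's YAML examples (`name`, `agent`, `action`, `depends_on`, `inputs`, `outputs`, `condition`, `retry_count`, `continue_on_failure`) and the `continue_on_failure: false` default come from the source; the result fields, the enum members, and the other defaults are assumptions, and the real `schema.py` may use Pydantic models or different names.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional


class StageStatus(str, Enum):
    """Hypothetical status values."""
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    SKIPPED = "skipped"  # e.g. the stage's condition evaluated to false


@dataclass
class PipelineStage:
    name: str
    agent: str
    action: Optional[str] = None  # optional here because the retry example omits it
    depends_on: list[str] = field(default_factory=list)
    inputs: dict[str, Any] = field(default_factory=dict)
    outputs: list[str] = field(default_factory=list)
    condition: Optional[str] = None
    retry_count: int = 0  # assumed default: no retries
    continue_on_failure: bool = False  # documented default


@dataclass
class Pipeline:
    name: str
    version: str
    stages: list[PipelineStage]
    description: str = ""
    variables: dict[str, Any] = field(default_factory=dict)


@dataclass
class StageResult:
    stage_name: str
    status: StageStatus
    outputs: dict[str, Any] = field(default_factory=dict)
    error: Optional[str] = None
    attempts: int = 1


@dataclass
class PipelineResult:
    pipeline_name: str
    success: bool
    stage_results: dict[str, StageResult] = field(default_factory=dict)
```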
## Variable Resolution

Variable references use the `${var.path}` syntax:

| Syntax | Description |
|------|------|
| `${brand_name}` | Global variable |
| `${stages.step1.outputs.result}` | Output of an upstream stage |
| `${stages.step1.outputs.result.substring(0,10)}` | Substring of a value |
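
A minimal resolver for the three forms above might look like the following. It is a sketch that assumes nested-dict contexts and treats `substring(start,end)` as a half-open range like a Python slice; the real resolver may preserve non-string types, handle missing keys differently, or support more functions. This sketch always returns a string and raises `KeyError` for an unknown path. The name `resolve_template` is hypothetical.

```python
import re
from typing import Any

# any ${...} reference; the body may not contain "}"
_VAR = re.compile(r"\$\{([^}]+)\}")
# a path followed by a trailing .substring(start,end) call
_SUBSTRING = re.compile(r"^(.*)\.substring\((\d+)\s*,\s*(\d+)\)$")


def _lookup(path: str, context: dict[str, Any]) -> Any:
    """Walk a dotted path such as stages.step1.outputs.result through nested dicts."""
    value: Any = context
    for part in path.split("."):
        value = value[part]  # KeyError if the path does not exist
    return value


def resolve_template(template: str, context: dict[str, Any]) -> str:
    """Replace every ${...} reference in template with its value from context."""

    def replace(match: re.Match) -> str:
        expr = match.group(1).strip()
        sub = _SUBSTRING.match(expr)
        if sub:
            text = str(_lookup(sub.group(1), context))
            # end index is exclusive, as in a Python slice
            return text[int(sub.group(2)):int(sub.group(3))]
        return str(_lookup(expr, context))

    return _VAR.sub(replace, template)
```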
## Conditional Execution

A stage can be run conditionally by setting its `condition` field:

```yaml
- name: optional_stage
  agent: some_agent
  action: do_something
  condition: "${enable_optional} == true"
```
Supported condition forms:
- `${var}`: the variable exists and is non-empty
- `${var} == 'value'`: equals
- `${var} != 'value'`: does not equal
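
The three condition forms can be evaluated with a small parser. This sketch assumes conditions reference top-level variables only, that the right-hand literal may be quoted or bare (as in the `== true` example above), and that booleans compare as lowercase `true`/`false`; `evaluate_condition` is a hypothetical name, not the engine's API.

```python
import re
from typing import Any

# "${var}", optionally followed by == or != and a literal
_CONDITION = re.compile(r"^\$\{([^}]+)\}\s*(?:(==|!=)\s*(.+))?$")


def _as_text(value: Any) -> str:
    # render booleans YAML-style so that "== true" matches Python True
    if isinstance(value, bool):
        return "true" if value else "false"
    return "" if value is None else str(value)


def evaluate_condition(condition: str, variables: dict[str, Any]) -> bool:
    match = _CONDITION.match(condition.strip())
    if not match:
        raise ValueError(f"unsupported condition: {condition!r}")
    name, op, literal = match.groups()
    value = variables.get(name.strip())
    if op is None:
        # bare ${var}: the variable must exist and be non-empty
        return value not in (None, "", [], {})
    # accept 'value', "value", or a bare value on the right-hand side
    expected = literal.strip().strip("'\"")
    actual = _as_text(value)
    return actual == expected if op == "==" else actual != expected
```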
## Retry Mechanism

Each stage supports a `retry_count` setting:

```yaml
- name: unstable_stage
  agent: some_agent
  retry_count: 3  # retry up to 3 times after a failure
```
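
Reading `retry_count: 3` as "retry up to 3 times after a failure" means up to 4 attempts in total. A sketch of that policy around an async stage call follows; `run_with_retry` and its `delay_seconds` parameter are hypothetical, and the real engine may treat only some exceptions as retryable.

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def run_with_retry(
    attempt: Callable[[], Awaitable[T]],
    retry_count: int = 0,
    delay_seconds: float = 0.0,
) -> T:
    """Await attempt(); if it raises, try again up to retry_count more times."""
    if retry_count < 0:
        raise ValueError("retry_count must be >= 0")
    last_error: Optional[Exception] = None
    for attempt_no in range(retry_count + 1):  # 1 initial try + retry_count retries
        try:
            return await attempt()
        except Exception as exc:  # a real engine might retry only transient errors
            last_error = exc
            if attempt_no < retry_count and delay_seconds > 0:
                await asyncio.sleep(delay_seconds)
    assert last_error is not None
    raise last_error  # every attempt failed: surface the last error
```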
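## Error Handling

| Setting | Description |
|------|------|
| `continue_on_failure: false` | A failed stage fails the whole Pipeline (default) |
| `continue_on_failure: true` | Downstream stages keep running after this stage fails |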
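
The effect of `continue_on_failure` on a sequential run can be sketched as follows. The function name `run_stages`, the `"not_run"`/`"completed"` status strings, and catching every exception are assumptions for illustration; only the `false` default comes from the table above.

```python
from typing import Any, Callable


def run_stages(
    stages: list[dict], run_stage: Callable[[dict], Any]
) -> tuple[str, dict[str, str]]:
    """Run stages in order; return (pipeline_status, per-stage status)."""
    statuses = {stage["name"]: "not_run" for stage in stages}
    for stage in stages:
        try:
            run_stage(stage)
            statuses[stage["name"]] = "success"
        except Exception:
            statuses[stage["name"]] = "failed"
            # continue_on_failure defaults to False: stop the whole pipeline
            if not stage.get("continue_on_failure", False):
                return "failed", statuses
    return "completed", statuses
```

A stage downstream of a failed one may then receive missing inputs, so stages that follow a `continue_on_failure: true` stage need to tolerate absent upstream outputs.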
## Service-Layer Content Pipeline

Besides the Agent Pipeline, there is also a service-layer content processing Pipeline.

Location: `backend/app/services/content/content_pipeline.py`

```
User content → RuleValidator (rule validation)
  ├─ High-severity issue → abort the Pipeline
  └─ Passed → SensitiveFilter (sensitive-word filtering)
       └─ SEOOptimizer (SEO optimization)
            └─ HTMLGenerator (HTML generation)
                 └─ Output (html/markdown/plain)
```
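
The short-circuit in the diagram (a high-severity validation issue stops everything; otherwise each step transforms the content in turn) can be sketched generically. `Issue`, `ChainResult`, `run_chain`, and the severity strings are hypothetical stand-ins, not the classes in `content_pipeline.py`.

```python
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class Issue:
    severity: str  # hypothetical levels, e.g. "low" / "medium" / "high"
    message: str


@dataclass
class ChainResult:
    aborted: bool
    content: str
    issues: list[Issue] = field(default_factory=list)


def run_chain(
    content: str,
    validate: Callable[[str], list[Issue]],
    steps: list[Callable[[str], str]],
) -> ChainResult:
    """Validate first; abort on a high-severity issue, otherwise apply steps in order."""
    issues = validate(content)
    if any(issue.severity == "high" for issue in issues):
        # nothing downstream runs; the original content is returned untouched
        return ChainResult(aborted=True, content=content, issues=issues)
    for step in steps:  # e.g. sensitive filter, then SEO, then HTML generation
        content = step(content)
    return ChainResult(aborted=False, content=content, issues=issues)
```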
### ContentPipeline Class
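```python
from __future__ import annotations  # PipelineResponse is defined elsewhere (not shown)


class ContentPipeline:
    async def run(self, request: dict) -> PipelineResponse:
        """Run the service-layer content pipeline.

        request = {
            "content": "raw content",
            "title": "title",
            "platform": "target platform",
            "optimize_for": ["validation", "sensitive", "seo"],
            "output_formats": ["html", "markdown", "plain"],
        }
        """
        ...
```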
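### Stage Descriptions

| Stage | Component | Description |
|------|------|------|
| validation | RuleValidator | Checks title length and content length; detects AI-style writing patterns |
| sensitive_filter | SensitiveFilter | Filters and replaces sensitive words |
| seo_optimization | SEOOptimizer | Adjusts keyword density and checks keyword placement |
| html_generation | HTMLGenerator | Generates HTML, Markdown, and plain-text output |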
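
As an illustration of the "filter and replace" step, here is a simple regex-based sensitive-word masker. This is a hypothetical sketch: the actual `SensitiveFilter` may use a different matching algorithm (for example a trie or DFA for large word lists) and a different replacement strategy, and `mask_sensitive` is a made-up name.

```python
import re


def mask_sensitive(text: str, words: list[str], mask: str = "*") -> tuple[str, list[str]]:
    """Replace each listed word with mask characters; return (new_text, matched_words)."""
    words = [w for w in words if w]  # an empty pattern would match everywhere
    if not words:
        return text, []
    # try longer words first so "cheap pills" wins over "cheap"
    alternatives = sorted(words, key=len, reverse=True)
    pattern = re.compile("|".join(re.escape(w) for w in alternatives))
    matched: list[str] = []

    def replace(m: re.Match) -> str:
        matched.append(m.group(0))
        return mask * len(m.group(0))  # keep the original length

    return pattern.sub(replace, text), matched
```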
## Dry-Run Mode

If PipelineEngine is initialized without a dispatcher, it runs in dry-run mode:

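```python
# Dry-run mode (testing/development): no dispatcher is passed
engine = PipelineEngine(dispatcher=None)
result = await engine.execute(pipeline, context)  # must run inside an async context
# result contains mock outputs, useful for testing the Pipeline definition
```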
If dry-run mode is triggered in production, an ERROR-level log message is recorded.
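
The dry-run rule can be sketched with a toy engine. `MiniEngine`, the `<dry-run:...>` placeholder format, and the `APP_ENV` environment variable used to detect production are all assumptions for illustration; the real `PipelineEngine` decides for itself when and where to log.

```python
import asyncio
import logging
import os
from typing import Any, Awaitable, Callable, Optional

logger = logging.getLogger("pipeline")

Dispatcher = Callable[[dict, dict], Awaitable[dict]]


class MiniEngine:
    """Toy engine: with no dispatcher it runs in dry-run mode and returns mock outputs."""

    def __init__(self, dispatcher: Optional[Dispatcher] = None) -> None:
        self.dispatcher = dispatcher
        self.dry_run = dispatcher is None
        # hypothetical production check via an APP_ENV environment variable
        if self.dry_run and os.getenv("APP_ENV") == "production":
            logger.error("pipeline engine is in dry-run mode in production")

    async def run_stage(self, stage: dict, inputs: dict) -> dict[str, Any]:
        if self.dispatcher is None:
            # placeholder for every declared output; no agent is called
            return {key: f"<dry-run:{stage['name']}.{key}>" for key in stage.get("outputs", [])}
        return await self.dispatcher(stage, inputs)


if __name__ == "__main__":
    engine = MiniEngine()  # no dispatcher, so dry-run
    stage = {"name": "content_generation", "outputs": ["content"]}
    print(asyncio.run(engine.run_stage(stage, {})))
```

Because dry-run still produces a value for every declared output, downstream `${stages...}` references resolve, so a definition with a typo in a stage name or output key fails fast without calling any agent.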