feat(experts):恢复 plan.py 阶段依赖图 (PlanPhase + topological_sort)

- 新增 PhaseStatus 枚举 (PENDING/RUNNING/COMPLETED/FAILED)
- 新增 PlanPhase 数据类 (id/name/assigned_expert/task_description/depends_on/status/result)
- TeamPlan 新增 phases 字段及配套方法: get_phase/update_phase_status/topological_sort/get_ready_phases
- topological_sort 使用 Kahn 算法返回执行层 (list[list[PlanPhase]]),检测循环依赖
- 保留 SubTask/MergeStrategy 向后兼容
- 新增 54 个单元测试覆盖线性/并行/循环依赖、无效引用、就绪阶段、序列化
This commit is contained in:
chiguyong 2026-06-18 01:28:18 +08:00
parent 28ca5b6001
commit 1075598ebf
2 changed files with 632 additions and 25 deletions

View File

@ -1,11 +1,13 @@
"""Expert Team 计划数据模型 - hub-and-spoke 模式
"""Expert Team 计划数据模型 - 流水线模式
简化为 Lead Expert + 并行 Task 模式
- Lead Expert 接收任务并分解为子任务
- 子任务并行执行Task 深度=1不能再 spawn Task
- Lead Expert 汇总结果BEST 策略
支持两种模式
1. hub-and-spoke向后兼容Lead Expert 分解为并行子任务SubTask
2. pipelineLead Expert 分解为阶段PlanPhase阶段间有依赖关系
移除了阶段依赖图VOTE/FUSION 合并策略跨阶段状态共享
流水线模式核心
- Lead Expert 分解任务为阶段phases每个阶段有 assigned_expert depends_on
- 执行时按依赖拓扑排序无依赖的阶段并行有依赖的串行
- 阶段间数据通过 SharedWorkspace 传递
"""
from __future__ import annotations
@ -19,8 +21,7 @@ from typing import Any
class MergeStrategy(str, enum.Enum):
"""合并策略 - Lead Expert 用于选择最佳结果
hub-and-spoke 模式下仅保留 BEST 策略
Lead Expert 从所有子任务结果中选择或综合出最佳结果
仅保留 BEST 策略Lead Expert 从所有结果中选择或综合出最佳结果
"""
BEST = "best"
@ -37,7 +38,16 @@ class PlanStatus(str, enum.Enum):
class SubTaskStatus(str, enum.Enum):
"""子任务状态"""
"""子任务状态hub-and-spoke 模式)"""
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
class PhaseStatus(str, enum.Enum):
"""阶段状态(流水线模式)"""
PENDING = "pending"
RUNNING = "running"
@ -47,15 +57,7 @@ class SubTaskStatus(str, enum.Enum):
@dataclass
class SubTask:
"""Lead Expert 分解出的子任务
hub-and-spoke 模式中Lead Expert 将原始任务分解为多个子任务
每个子任务由一个 Expert 并行执行子任务之间无依赖关系无通信
约束
- Task 深度=1子任务不能再 spawn 子任务
- 子任务之间无通信
- Lead Expert 持有所有状态
"""Lead Expert 分解出的子任务hub-and-spoke 模式,向后兼容)
Attributes:
id: 子任务标识符
@ -94,17 +96,71 @@ class SubTask:
@dataclass
class TeamPlan:
"""Expert Team hub-and-spoke 执行计划
class PlanPhase:
"""流水线模式中的执行阶段
Lead Expert 持有此计划包含分解的子任务列表
与旧版 CollaborationPlan 不同此计划无阶段依赖图
所有子任务并行执行 Lead Expert 统一汇总
Lead Expert 将任务分解为多个阶段阶段间通过 depends_on 建立依赖关系
执行时按拓扑排序同层无依赖阶段并行层间串行
Attributes:
id: 阶段标识符用于 depends_on 引用
name: 阶段名称"规划""前端""后端""QA""评审"
assigned_expert: 分配的 Expert 名称
task_description: 阶段任务描述
depends_on: 前置阶段 ID 列表空列表表示无依赖
status: 当前状态
result: 阶段输出结果
"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
name: str = ""
assigned_expert: str = ""
task_description: str = ""
depends_on: list[str] = field(default_factory=list)
status: PhaseStatus = PhaseStatus.PENDING
result: dict[str, Any] | None = None
def to_dict(self) -> dict[str, Any]:
"""序列化为字典"""
return {
"id": self.id,
"name": self.name,
"assigned_expert": self.assigned_expert,
"task_description": self.task_description,
"depends_on": list(self.depends_on),
"status": self.status.value,
"result": self.result,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> PlanPhase:
"""从字典创建 PlanPhase"""
return cls(
id=data.get("id", str(uuid.uuid4())),
name=data.get("name", ""),
assigned_expert=data.get("assigned_expert", ""),
task_description=data.get("task_description", ""),
depends_on=list(data.get("depends_on", [])),
status=PhaseStatus(data.get("status", PhaseStatus.PENDING.value)),
result=data.get("result"),
)
@dataclass
class TeamPlan:
"""Expert Team 执行计划
支持两种模式
1. hub-and-spoke向后兼容使用 subtasks并行执行无依赖
2. pipeline使用 phases按依赖拓扑排序执行
新代码应优先使用 phases 字段subtasks 保留用于向后兼容
Attributes:
id: 计划标识符
task: 原始任务描述
subtasks: 子任务列表并行执行无依赖关系
subtasks: 子任务列表hub-and-spoke 模式向后兼容
phases: 阶段列表流水线模式
status: 计划状态
lead_expert: 主导 Expert 名称
"""
@ -112,6 +168,7 @@ class TeamPlan:
id: str = field(default_factory=lambda: str(uuid.uuid4()))
task: str = ""
subtasks: list[SubTask] = field(default_factory=list)
phases: list[PlanPhase] = field(default_factory=list)
status: PlanStatus = PlanStatus.DRAFT
lead_expert: str = ""
@ -121,6 +178,7 @@ class TeamPlan:
"id": self.id,
"task": self.task,
"subtasks": [st.to_dict() for st in self.subtasks],
"phases": [ph.to_dict() for ph in self.phases],
"status": self.status.value,
"lead_expert": self.lead_expert,
}
@ -129,14 +187,18 @@ class TeamPlan:
def from_dict(cls, data: dict[str, Any]) -> TeamPlan:
"""从字典创建 TeamPlan"""
subtasks = [SubTask.from_dict(st) for st in data.get("subtasks", [])]
phases = [PlanPhase.from_dict(ph) for ph in data.get("phases", [])]
return cls(
id=data.get("id", str(uuid.uuid4())),
task=data.get("task", ""),
subtasks=subtasks,
phases=phases,
status=PlanStatus(data.get("status", PlanStatus.DRAFT.value)),
lead_expert=data.get("lead_expert", ""),
)
# ── SubTask 方法hub-and-spoke 模式,向后兼容)──
def get_subtask(self, subtask_id: str) -> SubTask | None:
"""根据 ID 获取子任务,不存在则返回 None"""
for st in self.subtasks:
@ -170,3 +232,121 @@ class TeamPlan:
return all(
st.status in (SubTaskStatus.COMPLETED, SubTaskStatus.FAILED) for st in self.subtasks
)
# ── PlanPhase 方法(流水线模式)──
def get_phase(self, phase_id: str) -> PlanPhase | None:
"""根据 ID 获取阶段,不存在则返回 None"""
for ph in self.phases:
if ph.id == phase_id:
return ph
return None
def update_phase_status(
self, phase_id: str, status: PhaseStatus, result: dict[str, Any] | None = None
) -> None:
"""更新阶段状态和可选的结果"""
ph = self.get_phase(phase_id)
if ph is not None:
ph.status = status
if result is not None:
ph.result = result
@property
def completed_phases(self) -> list[PlanPhase]:
"""已完成的阶段列表"""
return [ph for ph in self.phases if ph.status == PhaseStatus.COMPLETED]
@property
def failed_phases(self) -> list[PlanPhase]:
"""失败的阶段列表"""
return [ph for ph in self.phases if ph.status == PhaseStatus.FAILED]
@property
def all_phases_done(self) -> bool:
"""所有阶段是否都已完成(成功或失败)"""
return all(
ph.status in (PhaseStatus.COMPLETED, PhaseStatus.FAILED) for ph in self.phases
)
def get_ready_phases(self) -> list[PlanPhase]:
"""返回当前可执行的阶段(状态为 PENDING 且所有依赖已完成)
Returns:
可执行的阶段列表
"""
completed_ids = {ph.id for ph in self.completed_phases}
ready = []
for ph in self.phases:
if ph.status != PhaseStatus.PENDING:
continue
# Check if all dependencies are completed
if all(dep_id in completed_ids for dep_id in ph.depends_on):
ready.append(ph)
return ready
def topological_sort(self) -> list[list[PlanPhase]]:
"""按依赖关系拓扑排序阶段,返回执行层列表
同层阶段无依赖关系可并行执行层间有依赖需串行等待
Returns:
执行层列表每层包含可并行执行的阶段
Raises:
ValueError: 如果存在循环依赖
"""
if not self.phases:
return []
# Build dependency graph
phase_map = {ph.id: ph for ph in self.phases}
all_ids = set(phase_map.keys())
# Validate depends_on references
for ph in self.phases:
for dep_id in ph.depends_on:
if dep_id not in all_ids:
raise ValueError(
f"Phase '{ph.id}' ({ph.name}) depends on non-existent phase '{dep_id}'"
)
# Kahn's algorithm for topological sort
# Compute in-degree (number of dependencies) for each phase
in_degree: dict[str, int] = {ph.id: len(ph.depends_on) for ph in self.phases}
# Build reverse dependency map (which phases depend on this one)
dependents: dict[str, list[str]] = {ph.id: [] for ph in self.phases}
for ph in self.phases:
for dep_id in ph.depends_on:
dependents[dep_id].append(ph.id)
layers: list[list[PlanPhase]] = []
processed: set[str] = set()
while len(processed) < len(self.phases):
# Find all phases with in_degree 0 that haven't been processed
current_layer_ids = [
ph_id
for ph_id in in_degree
if ph_id not in processed and in_degree[ph_id] == 0
]
if not current_layer_ids:
# No progress — cycle detected
remaining = [ph_id for ph_id in in_degree if ph_id not in processed]
raise ValueError(
f"Circular dependency detected among phases: {remaining}"
)
# Add current layer
current_layer = [phase_map[ph_id] for ph_id in current_layer_ids]
layers.append(current_layer)
# Mark as processed and reduce in_degree for dependents
for ph_id in current_layer_ids:
processed.add(ph_id)
for dep_id in dependents[ph_id]:
in_degree[dep_id] -= 1
return layers

View File

@ -1,9 +1,13 @@
"""TeamPlan / SubTask 数据模型单元测试 (hub-and-spoke 模式)"""
"""TeamPlan / SubTask / PlanPhase 数据模型单元测试 (hub-and-spoke + 流水线模式)"""
from __future__ import annotations
import pytest
from agentkit.experts.plan import (
MergeStrategy,
PhaseStatus,
PlanPhase,
PlanStatus,
SubTask,
SubTaskStatus,
@ -286,3 +290,426 @@ class TestTeamPlan:
"""all_done 当没有子任务时返回 Truevacuous truth"""
plan = TeamPlan(task="空计划")
assert plan.all_done is True
# ── PlanPhase 测试(流水线模式)──────────────────────────
def _make_phase(
id: str = "phase_1",
name: str = "规划",
assigned_expert: str = "tech_lead",
task_description: str = "分析需求并制定计划",
depends_on: list[str] | None = None,
status: PhaseStatus = PhaseStatus.PENDING,
result: dict | None = None,
) -> PlanPhase:
"""创建测试用 PlanPhase 实例"""
return PlanPhase(
id=id,
name=name,
assigned_expert=assigned_expert,
task_description=task_description,
depends_on=depends_on or [],
status=status,
result=result,
)
def _make_pipeline_plan() -> TeamPlan:
"""创建一个流水线模式的执行计划
结构:
Layer 0: [规划] (无依赖)
Layer 1: [前端, 后端] (依赖规划)
Layer 2: [QA] (依赖前端+后端)
Layer 3: [评审] (依赖QA)
"""
phases = [
_make_phase(id="p1", name="规划", assigned_expert="tech_lead", depends_on=[]),
_make_phase(
id="p2", name="前端", assigned_expert="frontend_engineer", depends_on=["p1"]
),
_make_phase(
id="p3", name="后端", assigned_expert="backend_engineer", depends_on=["p1"]
),
_make_phase(id="p4", name="QA", assigned_expert="qa_engineer", depends_on=["p2", "p3"]),
_make_phase(id="p5", name="评审", assigned_expert="code_reviewer", depends_on=["p4"]),
]
return TeamPlan(
id="pipeline_001",
task="开发用户登录功能",
phases=phases,
status=PlanStatus.DRAFT,
lead_expert="tech_lead",
)
class TestPhaseStatus:
"""PhaseStatus 枚举测试"""
def test_statuses_exist(self):
"""阶段状态都存在"""
assert PhaseStatus.PENDING == "pending"
assert PhaseStatus.RUNNING == "running"
assert PhaseStatus.COMPLETED == "completed"
assert PhaseStatus.FAILED == "failed"
class TestPlanPhase:
"""PlanPhase 数据模型测试"""
def test_creation_with_all_fields(self):
"""创建 PlanPhase 并设置所有字段"""
phase = PlanPhase(
id="phase_a",
name="前端开发",
assigned_expert="frontend_engineer",
task_description="实现登录页面",
depends_on=["phase_planning"],
status=PhaseStatus.RUNNING,
result={"component": "LoginForm"},
)
assert phase.id == "phase_a"
assert phase.name == "前端开发"
assert phase.assigned_expert == "frontend_engineer"
assert phase.task_description == "实现登录页面"
assert phase.depends_on == ["phase_planning"]
assert phase.status == PhaseStatus.RUNNING
assert phase.result == {"component": "LoginForm"}
def test_default_values(self):
"""默认值:自动生成 id空 depends_onPENDING 状态"""
phase = PlanPhase(name="测试阶段")
assert phase.id is not None
assert len(phase.id) > 0
assert phase.name == "测试阶段"
assert phase.assigned_expert == ""
assert phase.task_description == ""
assert phase.depends_on == []
assert phase.status == PhaseStatus.PENDING
assert phase.result is None
def test_to_dict_from_dict_roundtrip(self):
"""to_dict / from_dict 往返序列化"""
phase = _make_phase(
id="roundtrip_phase",
name="往返测试",
assigned_expert="tester",
task_description="测试序列化",
depends_on=["dep1", "dep2"],
status=PhaseStatus.COMPLETED,
result={"key": "value"},
)
d = phase.to_dict()
restored = PlanPhase.from_dict(d)
assert restored.id == phase.id
assert restored.name == phase.name
assert restored.assigned_expert == phase.assigned_expert
assert restored.task_description == phase.task_description
assert restored.depends_on == phase.depends_on
assert restored.status == phase.status
assert restored.result == phase.result
def test_to_dict_structure(self):
"""to_dict 返回正确的字典结构"""
phase = _make_phase(
id="struct_test",
name="结构测试",
assigned_expert="dev",
task_description="测试结构",
depends_on=["d1"],
status=PhaseStatus.RUNNING,
result={"output": "data"},
)
d = phase.to_dict()
assert d["id"] == "struct_test"
assert d["name"] == "结构测试"
assert d["assigned_expert"] == "dev"
assert d["task_description"] == "测试结构"
assert d["depends_on"] == ["d1"]
assert d["status"] == "running"
assert d["result"] == {"output": "data"}
class TestTeamPlanPhases:
"""TeamPlan 流水线模式phases测试"""
def test_pipeline_plan_creation(self):
"""创建流水线模式的 TeamPlan"""
plan = _make_pipeline_plan()
assert plan.id == "pipeline_001"
assert plan.task == "开发用户登录功能"
assert len(plan.phases) == 5
assert plan.lead_expert == "tech_lead"
def test_get_phase_by_id(self):
"""get_phase 根据 ID 获取阶段"""
plan = _make_pipeline_plan()
ph = plan.get_phase("p3")
assert ph is not None
assert ph.id == "p3"
assert ph.name == "后端"
def test_get_phase_nonexistent_returns_none(self):
"""get_phase 对不存在的 ID 返回 None"""
plan = _make_pipeline_plan()
assert plan.get_phase("nonexistent") is None
def test_update_phase_status(self):
"""update_phase_status 更新阶段状态和结果"""
plan = _make_pipeline_plan()
plan.update_phase_status("p1", PhaseStatus.COMPLETED, {"plan": "设计文档"})
ph = plan.get_phase("p1")
assert ph is not None
assert ph.status == PhaseStatus.COMPLETED
assert ph.result == {"plan": "设计文档"}
def test_update_phase_status_without_result(self):
"""update_phase_status 不传 result 时不覆盖已有 result"""
plan = _make_pipeline_plan()
plan.update_phase_status("p1", PhaseStatus.COMPLETED, {"plan": "文档"})
plan.update_phase_status("p1", PhaseStatus.RUNNING)
ph = plan.get_phase("p1")
assert ph is not None
assert ph.status == PhaseStatus.RUNNING
assert ph.result == {"plan": "文档"}
def test_completed_phases_property(self):
"""completed_phases 返回已完成的阶段"""
plan = _make_pipeline_plan()
plan.update_phase_status("p1", PhaseStatus.COMPLETED)
plan.update_phase_status("p2", PhaseStatus.COMPLETED)
plan.update_phase_status("p3", PhaseStatus.FAILED)
completed = plan.completed_phases
assert len(completed) == 2
assert {ph.id for ph in completed} == {"p1", "p2"}
def test_failed_phases_property(self):
"""failed_phases 返回失败的阶段"""
plan = _make_pipeline_plan()
plan.update_phase_status("p1", PhaseStatus.COMPLETED)
plan.update_phase_status("p2", PhaseStatus.FAILED)
plan.update_phase_status("p3", PhaseStatus.FAILED)
failed = plan.failed_phases
assert len(failed) == 2
assert {ph.id for ph in failed} == {"p2", "p3"}
def test_all_phases_done_when_all_completed(self):
"""all_phases_done 当所有阶段完成时返回 True"""
plan = _make_pipeline_plan()
for ph in plan.phases:
plan.update_phase_status(ph.id, PhaseStatus.COMPLETED)
assert plan.all_phases_done is True
def test_all_phases_done_when_some_failed(self):
"""all_phases_done 当所有阶段完成或失败时返回 True"""
plan = _make_pipeline_plan()
plan.update_phase_status("p1", PhaseStatus.COMPLETED)
plan.update_phase_status("p2", PhaseStatus.FAILED)
plan.update_phase_status("p3", PhaseStatus.FAILED)
plan.update_phase_status("p4", PhaseStatus.COMPLETED)
plan.update_phase_status("p5", PhaseStatus.COMPLETED)
assert plan.all_phases_done is True
def test_all_phases_done_when_pending(self):
"""all_phases_done 当有阶段未完成时返回 False"""
plan = _make_pipeline_plan()
plan.update_phase_status("p1", PhaseStatus.COMPLETED)
# p2-p5 still pending
assert plan.all_phases_done is False
def test_all_phases_done_empty_plan(self):
"""all_phases_done 当没有阶段时返回 True"""
plan = TeamPlan(task="空计划")
assert plan.all_phases_done is True
class TestTopologicalSort:
"""topological_sort 拓扑排序测试"""
def test_linear_dependency(self):
"""线性依赖A→B→C 排序为 [[A], [B], [C]]"""
plan = TeamPlan(
task="线性任务",
phases=[
_make_phase(id="a", name="A", depends_on=[]),
_make_phase(id="b", name="B", depends_on=["a"]),
_make_phase(id="c", name="C", depends_on=["b"]),
],
)
layers = plan.topological_sort()
assert len(layers) == 3
assert [ph.id for ph in layers[0]] == ["a"]
assert [ph.id for ph in layers[1]] == ["b"]
assert [ph.id for ph in layers[2]] == ["c"]
def test_parallel_phases(self):
"""并行阶段A→{B,C}→D 排序为 [[A], [B,C], [D]]"""
plan = TeamPlan(
task="并行任务",
phases=[
_make_phase(id="a", name="A", depends_on=[]),
_make_phase(id="b", name="B", depends_on=["a"]),
_make_phase(id="c", name="C", depends_on=["a"]),
_make_phase(id="d", name="D", depends_on=["b", "c"]),
],
)
layers = plan.topological_sort()
assert len(layers) == 3
assert [ph.id for ph in layers[0]] == ["a"]
assert set(ph.id for ph in layers[1]) == {"b", "c"}
assert [ph.id for ph in layers[2]] == ["d"]
def test_full_pipeline_structure(self):
"""完整流水线结构:规划→{前端,后端}→QA→评审"""
plan = _make_pipeline_plan()
layers = plan.topological_sort()
assert len(layers) == 4
assert [ph.id for ph in layers[0]] == ["p1"]
assert set(ph.id for ph in layers[1]) == {"p2", "p3"}
assert [ph.id for ph in layers[2]] == ["p4"]
assert [ph.id for ph in layers[3]] == ["p5"]
def test_no_dependencies(self):
"""无依赖:所有阶段在同一层"""
plan = TeamPlan(
task="无依赖任务",
phases=[
_make_phase(id="a", name="A", depends_on=[]),
_make_phase(id="b", name="B", depends_on=[]),
_make_phase(id="c", name="C", depends_on=[]),
],
)
layers = plan.topological_sort()
assert len(layers) == 1
assert set(ph.id for ph in layers[0]) == {"a", "b", "c"}
def test_empty_phases(self):
"""空阶段列表返回空列表"""
plan = TeamPlan(task="空计划")
assert plan.topological_sort() == []
def test_circular_dependency_raises(self):
"""循环依赖 A→B→A 抛出 ValueError"""
plan = TeamPlan(
task="循环依赖任务",
phases=[
_make_phase(id="a", name="A", depends_on=["b"]),
_make_phase(id="b", name="B", depends_on=["a"]),
],
)
with pytest.raises(ValueError, match="Circular dependency"):
plan.topological_sort()
def test_self_dependency_raises(self):
"""自循环依赖 A→A 抛出 ValueError"""
plan = TeamPlan(
task="自循环任务",
phases=[
_make_phase(id="a", name="A", depends_on=["a"]),
],
)
with pytest.raises(ValueError, match="Circular dependency"):
plan.topological_sort()
def test_invalid_dependency_reference_raises(self):
"""依赖不存在的阶段 ID 抛出 ValueError"""
plan = TeamPlan(
task="无效依赖任务",
phases=[
_make_phase(id="a", name="A", depends_on=["nonexistent"]),
],
)
with pytest.raises(ValueError, match="non-existent phase"):
plan.topological_sort()
class TestGetReadyPhases:
"""get_ready_phases 就绪阶段测试"""
def test_initial_ready_phases(self):
"""初始时只有无依赖的阶段就绪"""
plan = _make_pipeline_plan()
ready = plan.get_ready_phases()
assert len(ready) == 1
assert ready[0].id == "p1"
def test_ready_after_first_layer_completes(self):
"""第一层完成后,第二层阶段就绪"""
plan = _make_pipeline_plan()
plan.update_phase_status("p1", PhaseStatus.COMPLETED)
ready = plan.get_ready_phases()
assert len(ready) == 2
assert {ph.id for ph in ready} == {"p2", "p3"}
def test_ready_after_partial_completion(self):
"""部分完成p1 完成后 p2 完成p3 仍 pending → p4 不就绪"""
plan = _make_pipeline_plan()
plan.update_phase_status("p1", PhaseStatus.COMPLETED)
plan.update_phase_status("p2", PhaseStatus.COMPLETED)
# p3 still pending, so p4 (depends on p2 AND p3) is not ready
ready = plan.get_ready_phases()
assert len(ready) == 1
assert ready[0].id == "p3"
def test_ready_excludes_running_phases(self):
"""运行中的阶段不被视为就绪"""
plan = _make_pipeline_plan()
plan.update_phase_status("p1", PhaseStatus.RUNNING)
ready = plan.get_ready_phases()
assert len(ready) == 0
def test_ready_excludes_completed_phases(self):
"""已完成的阶段不被视为就绪"""
plan = _make_pipeline_plan()
plan.update_phase_status("p1", PhaseStatus.COMPLETED)
plan.update_phase_status("p2", PhaseStatus.COMPLETED)
plan.update_phase_status("p3", PhaseStatus.COMPLETED)
ready = plan.get_ready_phases()
assert len(ready) == 1
assert ready[0].id == "p4"
class TestTeamPlanPhasesSerialization:
"""TeamPlan phases 序列化测试"""
def test_to_dict_includes_phases(self):
"""to_dict 包含 phases 字段"""
plan = _make_pipeline_plan()
d = plan.to_dict()
assert "phases" in d
assert len(d["phases"]) == 5
assert d["phases"][0]["id"] == "p1"
assert d["phases"][0]["depends_on"] == []
def test_from_dict_restores_phases(self):
"""from_dict 恢复 phases"""
plan = _make_pipeline_plan()
d = plan.to_dict()
restored = TeamPlan.from_dict(d)
assert len(restored.phases) == len(plan.phases)
for original, restored_ph in zip(plan.phases, restored.phases):
assert restored_ph.id == original.id
assert restored_ph.name == original.name
assert restored_ph.assigned_expert == original.assigned_expert
assert restored_ph.task_description == original.task_description
assert restored_ph.depends_on == original.depends_on
assert restored_ph.status == original.status
def test_both_subtasks_and_phases_coexist(self):
"""subtasks 和 phases 可以共存"""
plan = TeamPlan(
task="混合模式",
subtasks=[SubTask(id="s1", description="子任务1")],
phases=[_make_phase(id="p1", name="阶段1")],
)
d = plan.to_dict()
assert len(d["subtasks"]) == 1
assert len(d["phases"]) == 1
assert d["subtasks"][0]["id"] == "s1"
assert d["phases"][0]["id"] == "p1"