From 1075598ebf831ecbff9052ba936b649f5a2340f6 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Thu, 18 Jun 2026 01:28:18 +0800 Subject: [PATCH] =?UTF-8?q?feat(experts):=E6=81=A2=E5=A4=8D=20plan.py=20?= =?UTF-8?q?=E9=98=B6=E6=AE=B5=E4=BE=9D=E8=B5=96=E5=9B=BE=20(PlanPhase=20+?= =?UTF-8?q?=20topological=5Fsort)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 PhaseStatus 枚举 (PENDING/RUNNING/COMPLETED/FAILED) - 新增 PlanPhase 数据类 (id/name/assigned_expert/task_description/depends_on/status/result) - TeamPlan 新增 phases 字段及配套方法: get_phase/update_phase_status/topological_sort/get_ready_phases - topological_sort 使用 Kahn 算法返回执行层 (list[list[PlanPhase]]),检测循环依赖 - 保留 SubTask/MergeStrategy 向后兼容 - 新增 54 个单元测试覆盖线性/并行/循环依赖、无效引用、就绪阶段、序列化 --- src/agentkit/experts/plan.py | 228 +++++++++++++++-- tests/unit/experts/test_plan.py | 429 +++++++++++++++++++++++++++++++- 2 files changed, 632 insertions(+), 25 deletions(-) diff --git a/src/agentkit/experts/plan.py b/src/agentkit/experts/plan.py index 4c1a440..2bea4a2 100644 --- a/src/agentkit/experts/plan.py +++ b/src/agentkit/experts/plan.py @@ -1,11 +1,13 @@ -"""Expert Team 计划数据模型 - hub-and-spoke 模式 +"""Expert Team 计划数据模型 - 流水线模式 -简化为 Lead Expert + 并行 Task 模式: -- Lead Expert 接收任务并分解为子任务 -- 子任务并行执行(Task 深度=1,不能再 spawn Task) -- Lead Expert 汇总结果(BEST 策略) +支持两种模式: +1. hub-and-spoke(向后兼容):Lead Expert 分解为并行子任务(SubTask) +2. pipeline(新):Lead Expert 分解为阶段(PlanPhase),阶段间有依赖关系 -移除了阶段依赖图、VOTE/FUSION 合并策略、跨阶段状态共享。 +流水线模式核心: +- Lead Expert 分解任务为阶段(phases),每个阶段有 assigned_expert 和 depends_on +- 执行时按依赖拓扑排序,无依赖的阶段并行,有依赖的串行 +- 阶段间数据通过 SharedWorkspace 传递 """ from __future__ import annotations @@ -19,8 +21,7 @@ from typing import Any class MergeStrategy(str, enum.Enum): """合并策略 - Lead Expert 用于选择最佳结果 - hub-and-spoke 模式下仅保留 BEST 策略: - Lead Expert 从所有子任务结果中选择或综合出最佳结果。 + 仅保留 BEST 策略:Lead Expert 从所有结果中选择或综合出最佳结果。 """ BEST = "best" @@ -37,7 +38,16 @@ class PlanStatus(str, enum.Enum): class SubTaskStatus(str, enum.Enum): - """子任务状态""" + """子任务状态(hub-and-spoke 模式)""" + + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + + +class PhaseStatus(str, enum.Enum): + """阶段状态(流水线模式)""" PENDING = "pending" RUNNING = "running" @@ -47,15 +57,7 @@ class SubTaskStatus(str, enum.Enum): @dataclass class SubTask: - """Lead Expert 分解出的子任务 - - 在 hub-and-spoke 模式中,Lead Expert 将原始任务分解为多个子任务, - 每个子任务由一个 Expert 并行执行。子任务之间无依赖关系、无通信。 - - 约束: - - Task 深度=1(子任务不能再 spawn 子任务) - - 子任务之间无通信 - - Lead Expert 持有所有状态 + """Lead Expert 分解出的子任务(hub-and-spoke 模式,向后兼容) Attributes: id: 子任务标识符 @@ -94,17 +96,71 @@ class SubTask: @dataclass -class TeamPlan: - """Expert Team hub-and-spoke 执行计划 +class PlanPhase: + """流水线模式中的执行阶段 - Lead Expert 持有此计划,包含分解的子任务列表。 - 与旧版 CollaborationPlan 不同,此计划无阶段依赖图, - 所有子任务并行执行,由 Lead Expert 统一汇总。 + Lead Expert 将任务分解为多个阶段,阶段间通过 depends_on 建立依赖关系。 + 执行时按拓扑排序,同层无依赖阶段并行,层间串行。 + + Attributes: + id: 阶段标识符(用于 depends_on 引用) + name: 阶段名称(如"规划"、"前端"、"后端"、"QA"、"评审") + assigned_expert: 分配的 Expert 名称 + task_description: 阶段任务描述 + depends_on: 前置阶段 ID 列表(空列表表示无依赖) + status: 当前状态 + result: 阶段输出结果 + """ + + id: str = field(default_factory=lambda: str(uuid.uuid4())) + name: str = "" + assigned_expert: str = "" + task_description: str = "" + depends_on: list[str] = field(default_factory=list) + status: PhaseStatus = PhaseStatus.PENDING + result: dict[str, Any] | None = None + + def to_dict(self) -> dict[str, Any]: + """序列化为字典""" + return { + "id": self.id, + "name": self.name, + "assigned_expert": self.assigned_expert, + "task_description": self.task_description, + "depends_on": list(self.depends_on), + "status": self.status.value, + "result": self.result, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> PlanPhase: + """从字典创建 PlanPhase""" + return cls( + id=data.get("id", str(uuid.uuid4())), + name=data.get("name", ""), + assigned_expert=data.get("assigned_expert", ""), + task_description=data.get("task_description", ""), + depends_on=list(data.get("depends_on", [])), + status=PhaseStatus(data.get("status", PhaseStatus.PENDING.value)), + result=data.get("result"), + ) + + +@dataclass +class TeamPlan: + """Expert Team 执行计划 + + 支持两种模式: + 1. hub-and-spoke(向后兼容):使用 subtasks,并行执行无依赖 + 2. pipeline(新):使用 phases,按依赖拓扑排序执行 + + 新代码应优先使用 phases 字段。subtasks 保留用于向后兼容。 Attributes: id: 计划标识符 task: 原始任务描述 - subtasks: 子任务列表(并行执行,无依赖关系) + subtasks: 子任务列表(hub-and-spoke 模式,向后兼容) + phases: 阶段列表(流水线模式) status: 计划状态 lead_expert: 主导 Expert 名称 """ @@ -112,6 +168,7 @@ class TeamPlan: id: str = field(default_factory=lambda: str(uuid.uuid4())) task: str = "" subtasks: list[SubTask] = field(default_factory=list) + phases: list[PlanPhase] = field(default_factory=list) status: PlanStatus = PlanStatus.DRAFT lead_expert: str = "" @@ -121,6 +178,7 @@ class TeamPlan: "id": self.id, "task": self.task, "subtasks": [st.to_dict() for st in self.subtasks], + "phases": [ph.to_dict() for ph in self.phases], "status": self.status.value, "lead_expert": self.lead_expert, } @@ -129,14 +187,18 @@ class TeamPlan: def from_dict(cls, data: dict[str, Any]) -> TeamPlan: """从字典创建 TeamPlan""" subtasks = [SubTask.from_dict(st) for st in data.get("subtasks", [])] + phases = [PlanPhase.from_dict(ph) for ph in data.get("phases", [])] return cls( id=data.get("id", str(uuid.uuid4())), task=data.get("task", ""), subtasks=subtasks, + phases=phases, status=PlanStatus(data.get("status", PlanStatus.DRAFT.value)), lead_expert=data.get("lead_expert", ""), ) + # ── SubTask 方法(hub-and-spoke 模式,向后兼容)── + def get_subtask(self, subtask_id: str) -> SubTask | None: """根据 ID 获取子任务,不存在则返回 None""" for st in self.subtasks: @@ -170,3 +232,121 @@ class TeamPlan: return all( st.status in (SubTaskStatus.COMPLETED, SubTaskStatus.FAILED) for st in self.subtasks ) + + # ── PlanPhase 方法(流水线模式)── + + def get_phase(self, phase_id: str) -> PlanPhase | None: + """根据 ID 获取阶段,不存在则返回 None""" + for ph in self.phases: + if ph.id == phase_id: + return ph + return None + + def update_phase_status( + self, phase_id: str, status: PhaseStatus, result: dict[str, Any] | None = None + ) -> None: + """更新阶段状态和可选的结果""" + ph = self.get_phase(phase_id) + if ph is not None: + ph.status = status + if result is not None: + ph.result = result + + @property + def completed_phases(self) -> list[PlanPhase]: + """已完成的阶段列表""" + return [ph for ph in self.phases if ph.status == PhaseStatus.COMPLETED] + + @property + def failed_phases(self) -> list[PlanPhase]: + """失败的阶段列表""" + return [ph for ph in self.phases if ph.status == PhaseStatus.FAILED] + + @property + def all_phases_done(self) -> bool: + """所有阶段是否都已完成(成功或失败)""" + return all( + ph.status in (PhaseStatus.COMPLETED, PhaseStatus.FAILED) for ph in self.phases + ) + + def get_ready_phases(self) -> list[PlanPhase]: + """返回当前可执行的阶段(状态为 PENDING 且所有依赖已完成) + + Returns: + 可执行的阶段列表 + """ + completed_ids = {ph.id for ph in self.completed_phases} + ready = [] + for ph in self.phases: + if ph.status != PhaseStatus.PENDING: + continue + # Check if all dependencies are completed + if all(dep_id in completed_ids for dep_id in ph.depends_on): + ready.append(ph) + return ready + + def topological_sort(self) -> list[list[PlanPhase]]: + """按依赖关系拓扑排序阶段,返回执行层列表 + + 同层阶段无依赖关系,可并行执行;层间有依赖,需串行等待。 + + Returns: + 执行层列表,每层包含可并行执行的阶段 + + Raises: + ValueError: 如果存在循环依赖 + """ + if not self.phases: + return [] + + # Build dependency graph + phase_map = {ph.id: ph for ph in self.phases} + all_ids = set(phase_map.keys()) + + # Validate depends_on references + for ph in self.phases: + for dep_id in ph.depends_on: + if dep_id not in all_ids: + raise ValueError( + f"Phase '{ph.id}' ({ph.name}) depends on non-existent phase '{dep_id}'" + ) + + # Kahn's algorithm for topological sort + # Compute in-degree (number of dependencies) for each phase + in_degree: dict[str, int] = {ph.id: len(ph.depends_on) for ph in self.phases} + + # Build reverse dependency map (which phases depend on this one) + dependents: dict[str, list[str]] = {ph.id: [] for ph in self.phases} + for ph in self.phases: + for dep_id in ph.depends_on: + dependents[dep_id].append(ph.id) + + layers: list[list[PlanPhase]] = [] + processed: set[str] = set() + + while len(processed) < len(self.phases): + # Find all phases with in_degree 0 that haven't been processed + current_layer_ids = [ + ph_id + for ph_id in in_degree + if ph_id not in processed and in_degree[ph_id] == 0 + ] + + if not current_layer_ids: + # No progress — cycle detected + remaining = [ph_id for ph_id in in_degree if ph_id not in processed] + raise ValueError( + f"Circular dependency detected among phases: {remaining}" + ) + + # Add current layer + current_layer = [phase_map[ph_id] for ph_id in current_layer_ids] + layers.append(current_layer) + + # Mark as processed and reduce in_degree for dependents + for ph_id in current_layer_ids: + processed.add(ph_id) + for dep_id in dependents[ph_id]: + in_degree[dep_id] -= 1 + + return layers diff --git a/tests/unit/experts/test_plan.py b/tests/unit/experts/test_plan.py index f75bccb..069f195 100644 --- a/tests/unit/experts/test_plan.py +++ b/tests/unit/experts/test_plan.py @@ -1,9 +1,13 @@ -"""TeamPlan / SubTask 数据模型单元测试 (hub-and-spoke 模式)""" +"""TeamPlan / SubTask / PlanPhase 数据模型单元测试 (hub-and-spoke + 流水线模式)""" from __future__ import annotations +import pytest + from agentkit.experts.plan import ( MergeStrategy, + PhaseStatus, + PlanPhase, PlanStatus, SubTask, SubTaskStatus, @@ -286,3 +290,426 @@ class TestTeamPlan: """all_done 当没有子任务时返回 True(vacuous truth)""" plan = TeamPlan(task="空计划") assert plan.all_done is True + + +# ── PlanPhase 测试(流水线模式)────────────────────────── + + +def _make_phase( + id: str = "phase_1", + name: str = "规划", + assigned_expert: str = "tech_lead", + task_description: str = "分析需求并制定计划", + depends_on: list[str] | None = None, + status: PhaseStatus = PhaseStatus.PENDING, + result: dict | None = None, +) -> PlanPhase: + """创建测试用 PlanPhase 实例""" + return PlanPhase( + id=id, + name=name, + assigned_expert=assigned_expert, + task_description=task_description, + depends_on=depends_on or [], + status=status, + result=result, + ) + + +def _make_pipeline_plan() -> TeamPlan: + """创建一个流水线模式的执行计划 + + 结构: + Layer 0: [规划] (无依赖) + Layer 1: [前端, 后端] (依赖规划) + Layer 2: [QA] (依赖前端+后端) + Layer 3: [评审] (依赖QA) + """ + phases = [ + _make_phase(id="p1", name="规划", assigned_expert="tech_lead", depends_on=[]), + _make_phase( + id="p2", name="前端", assigned_expert="frontend_engineer", depends_on=["p1"] + ), + _make_phase( + id="p3", name="后端", assigned_expert="backend_engineer", depends_on=["p1"] + ), + _make_phase(id="p4", name="QA", assigned_expert="qa_engineer", depends_on=["p2", "p3"]), + _make_phase(id="p5", name="评审", assigned_expert="code_reviewer", depends_on=["p4"]), + ] + return TeamPlan( + id="pipeline_001", + task="开发用户登录功能", + phases=phases, + status=PlanStatus.DRAFT, + lead_expert="tech_lead", + ) + + +class TestPhaseStatus: + """PhaseStatus 枚举测试""" + + def test_statuses_exist(self): + """阶段状态都存在""" + assert PhaseStatus.PENDING == "pending" + assert PhaseStatus.RUNNING == "running" + assert PhaseStatus.COMPLETED == "completed" + assert PhaseStatus.FAILED == "failed" + + +class TestPlanPhase: + """PlanPhase 数据模型测试""" + + def test_creation_with_all_fields(self): + """创建 PlanPhase 并设置所有字段""" + phase = PlanPhase( + id="phase_a", + name="前端开发", + assigned_expert="frontend_engineer", + task_description="实现登录页面", + depends_on=["phase_planning"], + status=PhaseStatus.RUNNING, + result={"component": "LoginForm"}, + ) + assert phase.id == "phase_a" + assert phase.name == "前端开发" + assert phase.assigned_expert == "frontend_engineer" + assert phase.task_description == "实现登录页面" + assert phase.depends_on == ["phase_planning"] + assert phase.status == PhaseStatus.RUNNING + assert phase.result == {"component": "LoginForm"} + + def test_default_values(self): + """默认值:自动生成 id,空 depends_on,PENDING 状态""" + phase = PlanPhase(name="测试阶段") + assert phase.id is not None + assert len(phase.id) > 0 + assert phase.name == "测试阶段" + assert phase.assigned_expert == "" + assert phase.task_description == "" + assert phase.depends_on == [] + assert phase.status == PhaseStatus.PENDING + assert phase.result is None + + def test_to_dict_from_dict_roundtrip(self): + """to_dict / from_dict 往返序列化""" + phase = _make_phase( + id="roundtrip_phase", + name="往返测试", + assigned_expert="tester", + task_description="测试序列化", + depends_on=["dep1", "dep2"], + status=PhaseStatus.COMPLETED, + result={"key": "value"}, + ) + d = phase.to_dict() + restored = PlanPhase.from_dict(d) + + assert restored.id == phase.id + assert restored.name == phase.name + assert restored.assigned_expert == phase.assigned_expert + assert restored.task_description == phase.task_description + assert restored.depends_on == phase.depends_on + assert restored.status == phase.status + assert restored.result == phase.result + + def test_to_dict_structure(self): + """to_dict 返回正确的字典结构""" + phase = _make_phase( + id="struct_test", + name="结构测试", + assigned_expert="dev", + task_description="测试结构", + depends_on=["d1"], + status=PhaseStatus.RUNNING, + result={"output": "data"}, + ) + d = phase.to_dict() + assert d["id"] == "struct_test" + assert d["name"] == "结构测试" + assert d["assigned_expert"] == "dev" + assert d["task_description"] == "测试结构" + assert d["depends_on"] == ["d1"] + assert d["status"] == "running" + assert d["result"] == {"output": "data"} + + +class TestTeamPlanPhases: + """TeamPlan 流水线模式(phases)测试""" + + def test_pipeline_plan_creation(self): + """创建流水线模式的 TeamPlan""" + plan = _make_pipeline_plan() + assert plan.id == "pipeline_001" + assert plan.task == "开发用户登录功能" + assert len(plan.phases) == 5 + assert plan.lead_expert == "tech_lead" + + def test_get_phase_by_id(self): + """get_phase 根据 ID 获取阶段""" + plan = _make_pipeline_plan() + ph = plan.get_phase("p3") + assert ph is not None + assert ph.id == "p3" + assert ph.name == "后端" + + def test_get_phase_nonexistent_returns_none(self): + """get_phase 对不存在的 ID 返回 None""" + plan = _make_pipeline_plan() + assert plan.get_phase("nonexistent") is None + + def test_update_phase_status(self): + """update_phase_status 更新阶段状态和结果""" + plan = _make_pipeline_plan() + plan.update_phase_status("p1", PhaseStatus.COMPLETED, {"plan": "设计文档"}) + ph = plan.get_phase("p1") + assert ph is not None + assert ph.status == PhaseStatus.COMPLETED + assert ph.result == {"plan": "设计文档"} + + def test_update_phase_status_without_result(self): + """update_phase_status 不传 result 时不覆盖已有 result""" + plan = _make_pipeline_plan() + plan.update_phase_status("p1", PhaseStatus.COMPLETED, {"plan": "文档"}) + plan.update_phase_status("p1", PhaseStatus.RUNNING) + ph = plan.get_phase("p1") + assert ph is not None + assert ph.status == PhaseStatus.RUNNING + assert ph.result == {"plan": "文档"} + + def test_completed_phases_property(self): + """completed_phases 返回已完成的阶段""" + plan = _make_pipeline_plan() + plan.update_phase_status("p1", PhaseStatus.COMPLETED) + plan.update_phase_status("p2", PhaseStatus.COMPLETED) + plan.update_phase_status("p3", PhaseStatus.FAILED) + + completed = plan.completed_phases + assert len(completed) == 2 + assert {ph.id for ph in completed} == {"p1", "p2"} + + def test_failed_phases_property(self): + """failed_phases 返回失败的阶段""" + plan = _make_pipeline_plan() + plan.update_phase_status("p1", PhaseStatus.COMPLETED) + plan.update_phase_status("p2", PhaseStatus.FAILED) + plan.update_phase_status("p3", PhaseStatus.FAILED) + + failed = plan.failed_phases + assert len(failed) == 2 + assert {ph.id for ph in failed} == {"p2", "p3"} + + def test_all_phases_done_when_all_completed(self): + """all_phases_done 当所有阶段完成时返回 True""" + plan = _make_pipeline_plan() + for ph in plan.phases: + plan.update_phase_status(ph.id, PhaseStatus.COMPLETED) + assert plan.all_phases_done is True + + def test_all_phases_done_when_some_failed(self): + """all_phases_done 当所有阶段完成或失败时返回 True""" + plan = _make_pipeline_plan() + plan.update_phase_status("p1", PhaseStatus.COMPLETED) + plan.update_phase_status("p2", PhaseStatus.FAILED) + plan.update_phase_status("p3", PhaseStatus.FAILED) + plan.update_phase_status("p4", PhaseStatus.COMPLETED) + plan.update_phase_status("p5", PhaseStatus.COMPLETED) + assert plan.all_phases_done is True + + def test_all_phases_done_when_pending(self): + """all_phases_done 当有阶段未完成时返回 False""" + plan = _make_pipeline_plan() + plan.update_phase_status("p1", PhaseStatus.COMPLETED) + # p2-p5 still pending + assert plan.all_phases_done is False + + def test_all_phases_done_empty_plan(self): + """all_phases_done 当没有阶段时返回 True""" + plan = TeamPlan(task="空计划") + assert plan.all_phases_done is True + + +class TestTopologicalSort: + """topological_sort 拓扑排序测试""" + + def test_linear_dependency(self): + """线性依赖:A→B→C 排序为 [[A], [B], [C]]""" + plan = TeamPlan( + task="线性任务", + phases=[ + _make_phase(id="a", name="A", depends_on=[]), + _make_phase(id="b", name="B", depends_on=["a"]), + _make_phase(id="c", name="C", depends_on=["b"]), + ], + ) + layers = plan.topological_sort() + assert len(layers) == 3 + assert [ph.id for ph in layers[0]] == ["a"] + assert [ph.id for ph in layers[1]] == ["b"] + assert [ph.id for ph in layers[2]] == ["c"] + + def test_parallel_phases(self): + """并行阶段:A→{B,C}→D 排序为 [[A], [B,C], [D]]""" + plan = TeamPlan( + task="并行任务", + phases=[ + _make_phase(id="a", name="A", depends_on=[]), + _make_phase(id="b", name="B", depends_on=["a"]), + _make_phase(id="c", name="C", depends_on=["a"]), + _make_phase(id="d", name="D", depends_on=["b", "c"]), + ], + ) + layers = plan.topological_sort() + assert len(layers) == 3 + assert [ph.id for ph in layers[0]] == ["a"] + assert set(ph.id for ph in layers[1]) == {"b", "c"} + assert [ph.id for ph in layers[2]] == ["d"] + + def test_full_pipeline_structure(self): + """完整流水线结构:规划→{前端,后端}→QA→评审""" + plan = _make_pipeline_plan() + layers = plan.topological_sort() + assert len(layers) == 4 + assert [ph.id for ph in layers[0]] == ["p1"] + assert set(ph.id for ph in layers[1]) == {"p2", "p3"} + assert [ph.id for ph in layers[2]] == ["p4"] + assert [ph.id for ph in layers[3]] == ["p5"] + + def test_no_dependencies(self): + """无依赖:所有阶段在同一层""" + plan = TeamPlan( + task="无依赖任务", + phases=[ + _make_phase(id="a", name="A", depends_on=[]), + _make_phase(id="b", name="B", depends_on=[]), + _make_phase(id="c", name="C", depends_on=[]), + ], + ) + layers = plan.topological_sort() + assert len(layers) == 1 + assert set(ph.id for ph in layers[0]) == {"a", "b", "c"} + + def test_empty_phases(self): + """空阶段列表返回空列表""" + plan = TeamPlan(task="空计划") + assert plan.topological_sort() == [] + + def test_circular_dependency_raises(self): + """循环依赖 A→B→A 抛出 ValueError""" + plan = TeamPlan( + task="循环依赖任务", + phases=[ + _make_phase(id="a", name="A", depends_on=["b"]), + _make_phase(id="b", name="B", depends_on=["a"]), + ], + ) + with pytest.raises(ValueError, match="Circular dependency"): + plan.topological_sort() + + def test_self_dependency_raises(self): + """自循环依赖 A→A 抛出 ValueError""" + plan = TeamPlan( + task="自循环任务", + phases=[ + _make_phase(id="a", name="A", depends_on=["a"]), + ], + ) + with pytest.raises(ValueError, match="Circular dependency"): + plan.topological_sort() + + def test_invalid_dependency_reference_raises(self): + """依赖不存在的阶段 ID 抛出 ValueError""" + plan = TeamPlan( + task="无效依赖任务", + phases=[ + _make_phase(id="a", name="A", depends_on=["nonexistent"]), + ], + ) + with pytest.raises(ValueError, match="non-existent phase"): + plan.topological_sort() + + +class TestGetReadyPhases: + """get_ready_phases 就绪阶段测试""" + + def test_initial_ready_phases(self): + """初始时只有无依赖的阶段就绪""" + plan = _make_pipeline_plan() + ready = plan.get_ready_phases() + assert len(ready) == 1 + assert ready[0].id == "p1" + + def test_ready_after_first_layer_completes(self): + """第一层完成后,第二层阶段就绪""" + plan = _make_pipeline_plan() + plan.update_phase_status("p1", PhaseStatus.COMPLETED) + ready = plan.get_ready_phases() + assert len(ready) == 2 + assert {ph.id for ph in ready} == {"p2", "p3"} + + def test_ready_after_partial_completion(self): + """部分完成:p1 完成后 p2 完成,p3 仍 pending → p4 不就绪""" + plan = _make_pipeline_plan() + plan.update_phase_status("p1", PhaseStatus.COMPLETED) + plan.update_phase_status("p2", PhaseStatus.COMPLETED) + # p3 still pending, so p4 (depends on p2 AND p3) is not ready + ready = plan.get_ready_phases() + assert len(ready) == 1 + assert ready[0].id == "p3" + + def test_ready_excludes_running_phases(self): + """运行中的阶段不被视为就绪""" + plan = _make_pipeline_plan() + plan.update_phase_status("p1", PhaseStatus.RUNNING) + ready = plan.get_ready_phases() + assert len(ready) == 0 + + def test_ready_excludes_completed_phases(self): + """已完成的阶段不被视为就绪""" + plan = _make_pipeline_plan() + plan.update_phase_status("p1", PhaseStatus.COMPLETED) + plan.update_phase_status("p2", PhaseStatus.COMPLETED) + plan.update_phase_status("p3", PhaseStatus.COMPLETED) + ready = plan.get_ready_phases() + assert len(ready) == 1 + assert ready[0].id == "p4" + + +class TestTeamPlanPhasesSerialization: + """TeamPlan phases 序列化测试""" + + def test_to_dict_includes_phases(self): + """to_dict 包含 phases 字段""" + plan = _make_pipeline_plan() + d = plan.to_dict() + assert "phases" in d + assert len(d["phases"]) == 5 + assert d["phases"][0]["id"] == "p1" + assert d["phases"][0]["depends_on"] == [] + + def test_from_dict_restores_phases(self): + """from_dict 恢复 phases""" + plan = _make_pipeline_plan() + d = plan.to_dict() + restored = TeamPlan.from_dict(d) + + assert len(restored.phases) == len(plan.phases) + for original, restored_ph in zip(plan.phases, restored.phases): + assert restored_ph.id == original.id + assert restored_ph.name == original.name + assert restored_ph.assigned_expert == original.assigned_expert + assert restored_ph.task_description == original.task_description + assert restored_ph.depends_on == original.depends_on + assert restored_ph.status == original.status + + def test_both_subtasks_and_phases_coexist(self): + """subtasks 和 phases 可以共存""" + plan = TeamPlan( + task="混合模式", + subtasks=[SubTask(id="s1", description="子任务1")], + phases=[_make_phase(id="p1", name="阶段1")], + ) + d = plan.to_dict() + assert len(d["subtasks"]) == 1 + assert len(d["phases"]) == 1 + assert d["subtasks"][0]["id"] == "s1" + assert d["phases"][0]["id"] == "p1"