"""TeamPlan / SubTask / PlanPhase 数据模型单元测试 (hub-and-spoke + 流水线模式)""" from __future__ import annotations import pytest from agentkit.experts.plan import ( CollaborationContract, MergeStrategy, PhaseStatus, PhaseType, PlanPhase, PlanStatus, SubTask, SubTaskStatus, TeamPlan, ) # ── 辅助函数 ────────────────────────────────────────────── def _make_subtask( id: str = "subtask_1", description: str = "分析数据", assigned_expert: str = "analyst", status: SubTaskStatus = SubTaskStatus.PENDING, result: dict | None = None, ) -> SubTask: """创建测试用 SubTask 实例""" return SubTask( id=id, description=description, assigned_expert=assigned_expert, status=status, result=result, ) def _make_valid_plan() -> TeamPlan: """创建一个有效的 hub-and-spoke 执行计划""" subtasks = [ _make_subtask(id="s1", description="分析需求", assigned_expert="analyst"), _make_subtask(id="s2", description="设计架构", assigned_expert="architect"), _make_subtask(id="s3", description="编写代码", assigned_expert="coder"), ] return TeamPlan( id="plan_001", task="实现用户登录功能", subtasks=subtasks, status=PlanStatus.DRAFT, lead_expert="architect", ) # ── MergeStrategy 测试 ──────────────────────────────────── class TestMergeStrategy: """MergeStrategy 枚举测试""" def test_only_best_strategy_exists(self): """hub-and-spoke 模式仅保留 BEST 策略""" assert MergeStrategy.BEST == "best" # 确保只有 BEST 一个值 assert len(list(MergeStrategy)) == 1 def test_no_vote_or_fusion(self): """VOTE 和 FUSION 已被移除""" assert not hasattr(MergeStrategy, "VOTE") assert not hasattr(MergeStrategy, "FUSION") # ── PlanStatus 测试 ─────────────────────────────────────── class TestPlanStatus: """PlanStatus 枚举测试""" def test_statuses_exist(self): """必要的计划状态都存在""" assert PlanStatus.DRAFT == "draft" assert PlanStatus.EXECUTING == "executing" assert PlanStatus.COMPLETED == "completed" assert PlanStatus.FAILED == "failed" assert PlanStatus.FALLBACK == "fallback" def test_no_confirmed_status(self): """CONFIRMED 状态已被移除(hub-and-spoke 无需确认阶段)""" assert not hasattr(PlanStatus, "CONFIRMED") # ── SubTaskStatus 测试 
class TestSubTaskStatus:
    """Tests for the SubTaskStatus enum."""

    def test_statuses_exist(self):
        """Every subtask status exists and compares equal to its string value."""
        expected = (
            (SubTaskStatus.PENDING, "pending"),
            (SubTaskStatus.RUNNING, "running"),
            (SubTaskStatus.COMPLETED, "completed"),
            (SubTaskStatus.FAILED, "failed"),
        )
        for member, value in expected:
            assert member == value


# ── SubTask tests ─────────────────────────────────────────


class TestSubTask:
    """Tests for the SubTask data model."""

    def test_creation_with_all_fields(self):
        """A SubTask built with every field set exposes those values unchanged."""
        created = SubTask(
            id="subtask_a",
            description="竞品分析",
            assigned_expert="analyst",
            status=SubTaskStatus.RUNNING,
            result={"report": "竞品分析报告"},
        )
        observed = (
            created.id,
            created.description,
            created.assigned_expert,
            created.status,
            created.result,
        )
        assert observed == (
            "subtask_a",
            "竞品分析",
            "analyst",
            SubTaskStatus.RUNNING,
            {"report": "竞品分析报告"},
        )

    def test_default_values(self):
        """Defaults: a non-empty generated id, PENDING status, empty expert, no result."""
        bare = SubTask(description="测试任务")
        assert bare.id is not None
        assert len(bare.id) > 0
        assert bare.description == "测试任务"
        assert bare.assigned_expert == ""
        assert bare.status == SubTaskStatus.PENDING
        assert bare.result is None

    def test_to_dict_from_dict_roundtrip(self):
        """to_dict followed by from_dict reproduces every SubTask field."""
        source = SubTask(
            id="roundtrip_subtask",
            description="往返测试",
            assigned_expert="tester",
            status=SubTaskStatus.COMPLETED,
            result={"key": "value"},
        )
        clone = SubTask.from_dict(source.to_dict())
        for field in ("id", "description", "assigned_expert", "status", "result"):
            assert getattr(clone, field) == getattr(source, field)

    def test_to_dict_structure(self):
        """to_dict returns the expected keys and values (status as its string value)."""
        payload = _make_subtask(
            id="struct_test",
            description="结构测试",
            assigned_expert="dev",
            status=SubTaskStatus.RUNNING,
            result={"output": "data"},
        ).to_dict()
        expected = {
            "id": "struct_test",
            "description": "结构测试",
            "assigned_expert": "dev",
            "status": "running",
            "result": {"output": "data"},
        }
        for key, value in expected.items():
            assert payload[key] == value


# ── TeamPlan tests ────────────────────────────────────────


class TestTeamPlan:
    """Tests for the TeamPlan data model (subtask side)."""

    def test_creation(self):
        """The helper-built plan carries the fields it was constructed with."""
        plan = _make_valid_plan()
        assert (plan.id, plan.task) == ("plan_001", "实现用户登录功能")
        assert len(plan.subtasks) == 3
        assert plan.status == PlanStatus.DRAFT
        assert plan.lead_expert == "architect"

    def test_default_values(self):
        """Defaults: a generated id, no subtasks, DRAFT status, empty lead expert."""
        plan = TeamPlan(task="测试任务")
        assert plan.id is not None
        assert plan.task == "测试任务"
        assert plan.subtasks == []
        assert plan.status == PlanStatus.DRAFT
        assert plan.lead_expert == ""

    def test_to_dict_from_dict_roundtrip(self):
        """to_dict followed by from_dict reproduces the plan and its subtasks."""
        original = _make_valid_plan()
        clone = TeamPlan.from_dict(original.to_dict())

        for field in ("id", "task", "status", "lead_expert"):
            assert getattr(clone, field) == getattr(original, field)
        assert len(clone.subtasks) == len(original.subtasks)

        for before, after in zip(original.subtasks, clone.subtasks):
            for field in ("id", "description", "assigned_expert", "status"):
                assert getattr(after, field) == getattr(before, field)

    def test_get_subtask_by_id(self):
        """get_subtask returns the subtask with the requested id."""
        found = _make_valid_plan().get_subtask("s2")
        assert found is not None
        assert found.id == "s2"
        assert found.description == "设计架构"

    def test_get_subtask_with_nonexistent_id_returns_none(self):
        """get_subtask returns None for an unknown id."""
        plan = _make_valid_plan()
        missing = plan.get_subtask("nonexistent")
        assert missing is None

    def test_update_subtask_status(self):
        """update_subtask_status sets both the status and the result."""
        plan = _make_valid_plan()
        plan.update_subtask_status("s1", SubTaskStatus.COMPLETED, {"output": "分析完成"})

        updated = plan.get_subtask("s1")
        assert updated is not None
        assert updated.status == SubTaskStatus.COMPLETED
        assert updated.result == {"output": "分析完成"}

    def test_update_subtask_status_without_result(self):
        """Omitting result on update_subtask_status keeps the previously stored result."""
        plan = _make_valid_plan()
        plan.update_subtask_status("s1", SubTaskStatus.COMPLETED, {"output": "done"})
        plan.update_subtask_status("s1", SubTaskStatus.RUNNING)

        updated = plan.get_subtask("s1")
        assert updated is not None
        assert updated.status == SubTaskStatus.RUNNING
        assert updated.result == {"output": "done"}

    def test_completed_subtasks_property(self):
        """completed_subtasks lists only the subtasks marked COMPLETED."""
        plan = _make_valid_plan()
        outcomes = (
            ("s1", SubTaskStatus.COMPLETED),
            ("s2", SubTaskStatus.COMPLETED),
            ("s3", SubTaskStatus.FAILED),
        )
        for task_id, outcome in outcomes:
            plan.update_subtask_status(task_id, outcome)

        done = plan.completed_subtasks
        assert len(done) == 2
        assert {item.id for item in done} == {"s1", "s2"}

    def test_failed_subtasks_property(self):
        """failed_subtasks lists only the subtasks marked FAILED."""
        plan = _make_valid_plan()
        outcomes = (
            ("s1", SubTaskStatus.COMPLETED),
            ("s2", SubTaskStatus.FAILED),
            ("s3", SubTaskStatus.FAILED),
        )
        for task_id, outcome in outcomes:
            plan.update_subtask_status(task_id, outcome)

        broken = plan.failed_subtasks
        assert len(broken) == 2
        assert {item.id for item in broken} == {"s2", "s3"}

    def test_all_done_property_when_all_completed(self):
        """all_done is True once every subtask is COMPLETED."""
        plan = _make_valid_plan()
        for item in plan.subtasks:
            plan.update_subtask_status(item.id, SubTaskStatus.COMPLETED)
        assert plan.all_done is True

    def test_all_done_property_when_some_failed(self):
        """all_done is True when every subtask is either COMPLETED or FAILED."""
        plan = _make_valid_plan()
        outcomes = (
            ("s1", SubTaskStatus.COMPLETED),
            ("s2", SubTaskStatus.FAILED),
            ("s3", SubTaskStatus.COMPLETED),
        )
        for task_id, outcome in outcomes:
            plan.update_subtask_status(task_id, outcome)
        assert plan.all_done is True

    def test_all_done_property_when_pending(self):
        """all_done is False while any subtask is still pending."""
        plan = _make_valid_plan()
        # Only s1 is finished; s2 and s3 keep their PENDING status.
        plan.update_subtask_status("s1", SubTaskStatus.COMPLETED)
        assert plan.all_done is False

    def test_all_done_property_empty_plan(self):
        """all_done is True for a plan with no subtasks (vacuous truth)."""
        empty = TeamPlan(task="空计划")
        assert empty.all_done is True


# ── PlanPhase tests (pipeline mode) ───────────────────────
def _make_phase(
    id: str = "phase_1",
    name: str = "规划",
    assigned_expert: str = "tech_lead",
    task_description: str = "分析需求并制定计划",
    depends_on: list[str] | None = None,
    status: PhaseStatus = PhaseStatus.PENDING,
    result: dict | None = None,
) -> PlanPhase:
    """Build a PlanPhase for tests; a missing or empty depends_on becomes a new empty list."""
    dependencies = depends_on if depends_on else []
    fields = {
        "id": id,
        "name": name,
        "assigned_expert": assigned_expert,
        "task_description": task_description,
        "depends_on": dependencies,
        "status": status,
        "result": result,
    }
    return PlanPhase(**fields)


def _make_pipeline_plan() -> TeamPlan:
    """Build a pipeline-mode plan with five phases.

    Structure:
        Layer 0: [planning (p1)]                 no dependencies
        Layer 1: [frontend (p2), backend (p3)]   depend on p1
        Layer 2: [QA (p4)]                       depends on p2 + p3
        Layer 3: [review (p5)]                   depends on p4
    """
    specs = (
        ("p1", "规划", "tech_lead", []),
        ("p2", "前端", "frontend_engineer", ["p1"]),
        ("p3", "后端", "backend_engineer", ["p1"]),
        ("p4", "QA", "qa_engineer", ["p2", "p3"]),
        ("p5", "评审", "code_reviewer", ["p4"]),
    )
    stages = [
        _make_phase(id=phase_id, name=label, assigned_expert=expert, depends_on=deps)
        for phase_id, label, expert, deps in specs
    ]
    return TeamPlan(
        id="pipeline_001",
        task="开发用户登录功能",
        phases=stages,
        status=PlanStatus.DRAFT,
        lead_expert="tech_lead",
    )


def _phase_ids(layer) -> list[str]:
    """Return the ids of the phases in ``layer``, in iteration order."""
    return [phase.id for phase in layer]


class TestPhaseStatus:
    """Tests for the PhaseStatus enum."""

    def test_statuses_exist(self):
        """Every phase status exists; note that RUNNING compares equal to "in_progress"."""
        expected = (
            (PhaseStatus.PENDING, "pending"),
            (PhaseStatus.RUNNING, "in_progress"),
            (PhaseStatus.COMPLETED, "completed"),
            (PhaseStatus.FAILED, "failed"),
        )
        for member, value in expected:
            assert member == value


class TestPhaseType:
    """Tests for the PhaseType enum."""

    def test_types_exist(self):
        """Both phase types exist and compare equal to their string values."""
        assert PhaseType.EXECUTION == "execution"
        assert PhaseType.DEBATE == "debate"

    def test_only_two_types(self):
        """EXECUTION and DEBATE are the only two members."""
        members = list(PhaseType)
        assert len(members) == 2


class TestPlanPhase:
    """Tests for the PlanPhase data model."""

    def test_creation_with_all_fields(self):
        """A PlanPhase built with every field set exposes those values unchanged."""
        created = PlanPhase(
            id="phase_a",
            name="前端开发",
            assigned_expert="frontend_engineer",
            task_description="实现登录页面",
            depends_on=["phase_planning"],
            status=PhaseStatus.RUNNING,
            result={"component": "LoginForm"},
        )
        observed = (
            created.id,
            created.name,
            created.assigned_expert,
            created.task_description,
            created.depends_on,
            created.status,
            created.result,
        )
        assert observed == (
            "phase_a",
            "前端开发",
            "frontend_engineer",
            "实现登录页面",
            ["phase_planning"],
            PhaseStatus.RUNNING,
            {"component": "LoginForm"},
        )

    def test_default_values(self):
        """Defaults: a non-empty generated id, empty depends_on, PENDING status, no result."""
        bare = PlanPhase(name="测试阶段")
        assert bare.id is not None
        assert len(bare.id) > 0
        assert bare.name == "测试阶段"
        assert bare.assigned_expert == ""
        assert bare.task_description == ""
        assert bare.depends_on == []
        assert bare.status == PhaseStatus.PENDING
        assert bare.result is None

    def test_to_dict_from_dict_roundtrip(self):
        """to_dict followed by from_dict reproduces a phase, except for ``result``.

        Note: to_dict() serializes result to a string for frontend compatibility,
        so the roundtrip is lossy for the result field (dict -> string).
        """
        source = _make_phase(
            id="roundtrip_phase",
            name="往返测试",
            assigned_expert="tester",
            task_description="测试序列化",
            depends_on=["dep1", "dep2"],
            status=PhaseStatus.COMPLETED,
            result={"content": "value"},
        )
        clone = PlanPhase.from_dict(source.to_dict())

        lossless_fields = (
            "id",
            "name",
            "assigned_expert",
            "task_description",
            "depends_on",
            "status",
        )
        for field in lossless_fields:
            assert getattr(clone, field) == getattr(source, field)
        # The dict result was written out as a string, and from_dict reads that string back.
        assert clone.result == "value"

    def test_to_dict_structure(self):
        """to_dict returns the expected keys and values for a phase."""
        payload = _make_phase(
            id="struct_test",
            name="结构测试",
            assigned_expert="dev",
            task_description="测试结构",
            depends_on=["d1"],
            status=PhaseStatus.RUNNING,
            result={"content": "phase output data"},
        ).to_dict()
        expected = {
            "id": "struct_test",
            "name": "结构测试",
            "assigned_expert": "dev",
            "task_description": "测试结构",
            "depends_on": ["d1"],
            "status": "in_progress",
            # result is serialized to a string to match the frontend
            # ITeamPlanPhase.result type.
            "result": "phase output data",
        }
        for key, value in expected.items():
            assert payload[key] == value

    def test_default_phase_type_is_execution(self):
        """phase_type defaults to EXECUTION and debate_config to None."""
        bare = PlanPhase(name="测试阶段")
        assert bare.phase_type == PhaseType.EXECUTION
        assert bare.debate_config is None

    def test_debate_phase_creation(self):
        """A DEBATE phase keeps the debate_config it was given."""
        config = {
            "topic": "前端框架选型:React vs Vue",
            "participants": ["frontend_engineer", "tech_lead"],
            "max_rounds": 2,
        }
        debate = PlanPhase(
            name="框架选型辩论",
            assigned_expert="tech_lead",
            task_description="就前端框架选型进行辩论",
            phase_type=PhaseType.DEBATE,
            debate_config=config,
        )
        assert debate.phase_type == PhaseType.DEBATE
        assert debate.debate_config == config
        assert debate.debate_config["topic"] == "前端框架选型:React vs Vue"
        assert debate.debate_config["participants"] == ["frontend_engineer", "tech_lead"]
        assert debate.debate_config["max_rounds"] == 2

    def test_debate_phase_serialization_roundtrip(self):
        """A DEBATE phase keeps its type and config through to_dict / from_dict."""
        config = {
            "topic": "微服务 vs 单体",
            "participants": ["backend_engineer", "tech_lead"],
            "max_rounds": 3,
        }
        debate = PlanPhase(
            id="debate_1",
            name="架构辩论",
            assigned_expert="tech_lead",
            task_description="架构选型辩论",
            phase_type=PhaseType.DEBATE,
            debate_config=config,
        )

        payload = debate.to_dict()
        assert payload["phase_type"] == "debate"
        assert payload["debate_config"] == config

        clone = PlanPhase.from_dict(payload)
        assert clone.phase_type == PhaseType.DEBATE
        assert clone.debate_config == config
        assert clone.debate_config["topic"] == "微服务 vs 单体"

    def test_backward_compatibility_no_phase_type(self):
        """Backward compatibility: a legacy dict without phase_type loads as EXECUTION."""
        legacy = {
            "id": "old_phase",
            "name": "旧阶段",
            "assigned_expert": "dev",
            "task_description": "旧任务",
            "depends_on": [],
            "status": "pending",
            "result": None,
        }
        loaded = PlanPhase.from_dict(legacy)
        assert loaded.phase_type == PhaseType.EXECUTION
        assert loaded.debate_config is None

    def test_debate_config_none_for_execution(self):
        """An EXECUTION phase has debate_config None, both on the object and in to_dict."""
        executing = PlanPhase(name="执行阶段", phase_type=PhaseType.EXECUTION)
        assert executing.debate_config is None

        payload = executing.to_dict()
        assert payload["phase_type"] == "execution"
        assert payload["debate_config"] is None

    def test_default_collaboration_contracts_empty(self):
        """collaboration_contracts defaults to an empty list, also in to_dict."""
        bare = PlanPhase(name="测试阶段")
        assert bare.collaboration_contracts == []
        assert bare.to_dict()["collaboration_contracts"] == []

    def test_plan_phase_with_contracts(self):
        """collaboration_contracts on a PlanPhase serialize and deserialize correctly."""
        delivered = CollaborationContract(
            from_expert="backend",
            to_expert="frontend",
            content_description="API 定义",
            status="delivered",
        )
        # No status given here, so the restored contract is expected to be "pending".
        unstarted = CollaborationContract(
            from_expert="tech_lead",
            to_expert="backend",
            content_description="数据模型",
        )
        phase = PlanPhase(
            id="contract_phase",
            name="后端开发",
            assigned_expert="backend_engineer",
            task_description="实现 API",
            collaboration_contracts=[delivered, unstarted],
        )

        payload = phase.to_dict()
        assert len(payload["collaboration_contracts"]) == 2
        first_entry = payload["collaboration_contracts"][0]
        assert first_entry["from_expert"] == "backend"
        assert first_entry["to_expert"] == "frontend"
        assert first_entry["content_description"] == "API 定义"
        assert first_entry["status"] == "delivered"

        # Round-trip through from_dict.
        clone = PlanPhase.from_dict(payload)
        assert len(clone.collaboration_contracts) == 2
        head = clone.collaboration_contracts[0]
        tail = clone.collaboration_contracts[1]
        assert head.from_expert == "backend"
        assert head.to_expert == "frontend"
        assert head.content_description == "API 定义"
        assert head.status == "delivered"
        assert tail.from_expert == "tech_lead"
        assert tail.status == "pending"

    def test_plan_phase_empty_contracts(self):
        """An explicitly empty contract list survives the round trip."""
        solo = PlanPhase(
            id="empty_contract_phase",
            name="独立阶段",
            assigned_expert="solo_expert",
            collaboration_contracts=[],
        )
        payload = solo.to_dict()
        assert payload["collaboration_contracts"] == []

        clone = PlanPhase.from_dict(payload)
        assert clone.collaboration_contracts == []

    def test_backward_compatibility_no_contracts_field(self):
        """Backward compatibility: a legacy dict without collaboration_contracts loads as []."""
        legacy = {
            "id": "old_phase",
            "name": "旧阶段",
            "assigned_expert": "dev",
            "task_description": "旧任务",
            "depends_on": [],
            "status": "pending",
            "result": None,
        }
        loaded = PlanPhase.from_dict(legacy)
        assert loaded.collaboration_contracts == []


class TestCollaborationContract:
    """Tests for the CollaborationContract data model."""

    def test_default_values(self):
        """Defaults: empty string fields and status "pending"."""
        blank = CollaborationContract()
        assert blank.from_expert == ""
        assert blank.to_expert == ""
        assert blank.content_description == ""
        assert blank.status == "pending"

    def test_creation_with_all_fields(self):
        """A contract built with every field set exposes those values unchanged."""
        contract = CollaborationContract(
            from_expert="backend",
            to_expert="frontend",
            content_description="API 定义",
            status="delivered",
        )
        observed = (
            contract.from_expert,
            contract.to_expert,
            contract.content_description,
            contract.status,
        )
        assert observed == ("backend", "frontend", "API 定义", "delivered")

    def test_collaboration_contract_serialization(self):
        """to_dict yields exactly the four fields and from_dict restores them."""
        contract = CollaborationContract(
            from_expert="tech_lead",
            to_expert="qa_engineer",
            content_description="测试用例规范",
            status="received",
        )
        payload = contract.to_dict()
        assert payload == {
            "from_expert": "tech_lead",
            "to_expert": "qa_engineer",
            "content_description": "测试用例规范",
            "status": "received",
        }

        clone = CollaborationContract.from_dict(payload)
        for field in ("from_expert", "to_expert", "content_description", "status"):
            assert getattr(clone, field) == getattr(contract, field)

    def test_from_dict_missing_fields_uses_defaults(self):
        """from_dict fills missing fields with their defaults."""
        partial = CollaborationContract.from_dict({"from_expert": "backend"})
        assert partial.from_expert == "backend"
        assert partial.to_expert == ""
        assert partial.content_description == ""
        assert partial.status == "pending"

    def test_from_dict_empty_dict(self):
        """from_dict on an empty dict returns a contract with all defaults."""
        blank = CollaborationContract.from_dict({})
        assert blank.from_expert == ""
        assert blank.to_expert == ""
        assert blank.content_description == ""
        assert blank.status == "pending"


class TestTeamPlanPhases:
    """Tests for TeamPlan in pipeline mode (phases)."""

    def test_pipeline_plan_creation(self):
        """The helper-built pipeline plan carries the fields it was constructed with."""
        plan = _make_pipeline_plan()
        assert (plan.id, plan.task) == ("pipeline_001", "开发用户登录功能")
        assert len(plan.phases) == 5
        assert plan.lead_expert == "tech_lead"

    def test_get_phase_by_id(self):
        """get_phase returns the phase with the requested id."""
        found = _make_pipeline_plan().get_phase("p3")
        assert found is not None
        assert found.id == "p3"
        assert found.name == "后端"

    def test_get_phase_nonexistent_returns_none(self):
        """get_phase returns None for an unknown id."""
        plan = _make_pipeline_plan()
        missing = plan.get_phase("nonexistent")
        assert missing is None

    def test_update_phase_status(self):
        """update_phase_status sets both the status and the result."""
        plan = _make_pipeline_plan()
        plan.update_phase_status("p1", PhaseStatus.COMPLETED, {"plan": "设计文档"})

        updated = plan.get_phase("p1")
        assert updated is not None
        assert updated.status == PhaseStatus.COMPLETED
        assert updated.result == {"plan": "设计文档"}

    def test_update_phase_status_without_result(self):
        """Omitting result on update_phase_status keeps the previously stored result."""
        plan = _make_pipeline_plan()
        plan.update_phase_status("p1", PhaseStatus.COMPLETED, {"plan": "文档"})
        plan.update_phase_status("p1", PhaseStatus.RUNNING)

        updated = plan.get_phase("p1")
        assert updated is not None
        assert updated.status == PhaseStatus.RUNNING
        assert updated.result == {"plan": "文档"}

    def test_completed_phases_property(self):
        """completed_phases lists only the phases marked COMPLETED."""
        plan = _make_pipeline_plan()
        outcomes = (
            ("p1", PhaseStatus.COMPLETED),
            ("p2", PhaseStatus.COMPLETED),
            ("p3", PhaseStatus.FAILED),
        )
        for phase_id, outcome in outcomes:
            plan.update_phase_status(phase_id, outcome)

        done = plan.completed_phases
        assert len(done) == 2
        assert {item.id for item in done} == {"p1", "p2"}

    def test_failed_phases_property(self):
        """failed_phases lists only the phases marked FAILED."""
        plan = _make_pipeline_plan()
        outcomes = (
            ("p1", PhaseStatus.COMPLETED),
            ("p2", PhaseStatus.FAILED),
            ("p3", PhaseStatus.FAILED),
        )
        for phase_id, outcome in outcomes:
            plan.update_phase_status(phase_id, outcome)

        broken = plan.failed_phases
        assert len(broken) == 2
        assert {item.id for item in broken} == {"p2", "p3"}

    def test_all_phases_done_when_all_completed(self):
        """all_phases_done is True once every phase is COMPLETED."""
        plan = _make_pipeline_plan()
        for item in plan.phases:
            plan.update_phase_status(item.id, PhaseStatus.COMPLETED)
        assert plan.all_phases_done is True

    def test_all_phases_done_when_some_failed(self):
        """all_phases_done is True when every phase is either COMPLETED or FAILED."""
        plan = _make_pipeline_plan()
        outcomes = (
            ("p1", PhaseStatus.COMPLETED),
            ("p2", PhaseStatus.FAILED),
            ("p3", PhaseStatus.FAILED),
            ("p4", PhaseStatus.COMPLETED),
            ("p5", PhaseStatus.COMPLETED),
        )
        for phase_id, outcome in outcomes:
            plan.update_phase_status(phase_id, outcome)
        assert plan.all_phases_done is True

    def test_all_phases_done_when_pending(self):
        """all_phases_done is False while any phase is still pending."""
        plan = _make_pipeline_plan()
        # Only p1 is finished; p2 through p5 keep their PENDING status.
        plan.update_phase_status("p1", PhaseStatus.COMPLETED)
        assert plan.all_phases_done is False

    def test_all_phases_done_empty_plan(self):
        """all_phases_done is True for a plan with no phases."""
        empty = TeamPlan(task="空计划")
        assert empty.all_phases_done is True


class TestTopologicalSort:
    """Tests for TeamPlan.topological_sort."""

    def test_linear_dependency(self):
        """Linear chain A -> B -> C sorts into [[A], [B], [C]]."""
        chain = [
            _make_phase(id="a", name="A", depends_on=[]),
            _make_phase(id="b", name="B", depends_on=["a"]),
            _make_phase(id="c", name="C", depends_on=["b"]),
        ]
        layers = TeamPlan(task="线性任务", phases=chain).topological_sort()
        assert len(layers) == 3
        assert _phase_ids(layers[0]) == ["a"]
        assert _phase_ids(layers[1]) == ["b"]
        assert _phase_ids(layers[2]) == ["c"]

    def test_parallel_phases(self):
        """Diamond A -> {B, C} -> D sorts into [[A], [B, C], [D]]."""
        diamond = [
            _make_phase(id="a", name="A", depends_on=[]),
            _make_phase(id="b", name="B", depends_on=["a"]),
            _make_phase(id="c", name="C", depends_on=["a"]),
            _make_phase(id="d", name="D", depends_on=["b", "c"]),
        ]
        layers = TeamPlan(task="并行任务", phases=diamond).topological_sort()
        assert len(layers) == 3
        assert _phase_ids(layers[0]) == ["a"]
        # Order within a layer is not asserted, only membership.
        assert {ph.id for ph in layers[1]} == {"b", "c"}
        assert _phase_ids(layers[2]) == ["d"]

    def test_full_pipeline_structure(self):
        """Full pipeline: planning -> {frontend, backend} -> QA -> review gives four layers."""
        layers = _make_pipeline_plan().topological_sort()
        assert len(layers) == 4
        assert _phase_ids(layers[0]) == ["p1"]
        assert {ph.id for ph in layers[1]} == {"p2", "p3"}
        assert _phase_ids(layers[2]) == ["p4"]
        assert _phase_ids(layers[3]) == ["p5"]

    def test_no_dependencies(self):
        """Phases without dependencies all land in a single layer."""
        independent = [
            _make_phase(id="a", name="A", depends_on=[]),
            _make_phase(id="b", name="B", depends_on=[]),
            _make_phase(id="c", name="C", depends_on=[]),
        ]
        layers = TeamPlan(task="无依赖任务", phases=independent).topological_sort()
        assert len(layers) == 1
        assert {ph.id for ph in layers[0]} == {"a", "b", "c"}

    def test_empty_phases(self):
        """A plan without phases sorts to an empty list."""
        empty = TeamPlan(task="空计划")
        assert empty.topological_sort() == []

    def test_circular_dependency_raises(self):
        """A cycle A -> B -> A raises ValueError."""
        cyclic = TeamPlan(
            task="循环依赖任务",
            phases=[
                _make_phase(id="a", name="A", depends_on=["b"]),
                _make_phase(id="b", name="B", depends_on=["a"]),
            ],
        )
        with pytest.raises(ValueError, match="Circular dependency"):
            cyclic.topological_sort()

    def test_self_dependency_raises(self):
        """A phase depending on itself (A -> A) raises ValueError."""
        self_loop = TeamPlan(
            task="自循环任务",
            phases=[_make_phase(id="a", name="A", depends_on=["a"])],
        )
        with pytest.raises(ValueError, match="Circular dependency"):
            self_loop.topological_sort()

    def test_invalid_dependency_reference_raises(self):
        """Depending on an id that is not in the plan raises ValueError."""
        dangling = TeamPlan(
            task="无效依赖任务",
            phases=[_make_phase(id="a", name="A", depends_on=["nonexistent"])],
        )
        with pytest.raises(ValueError, match="non-existent phase"):
            dangling.topological_sort()

    def test_mixed_execution_and_debate_phases(self):
        """Topological sort over a mix of EXECUTION and DEBATE phases.

        Structure:
            Layer 0: [planning (p1)]                 EXECUTION
            Layer 1: [frontend (p2), backend (p3)]   EXECUTION, depend on p1
            Layer 2: [architecture debate (d1)]      DEBATE, depends on p2 + p3
            Layer 3: [QA (p4)]                       EXECUTION, depends on d1
        """
        debate = PlanPhase(
            id="d1",
            name="架构辩论",
            assigned_expert="tech_lead",
            depends_on=["p2", "p3"],
            phase_type=PhaseType.DEBATE,
            debate_config={
                "topic": "前后端接口设计",
                "participants": ["frontend", "backend"],
                "max_rounds": 2,
            },
        )
        plan = TeamPlan(
            task="混合模式任务",
            phases=[
                PlanPhase(id="p1", name="规划", assigned_expert="tech_lead", depends_on=[]),
                PlanPhase(id="p2", name="前端", assigned_expert="frontend", depends_on=["p1"]),
                PlanPhase(id="p3", name="后端", assigned_expert="backend", depends_on=["p1"]),
                debate,
                PlanPhase(id="p4", name="QA", assigned_expert="qa", depends_on=["d1"]),
            ],
        )

        layers = plan.topological_sort()
        assert len(layers) == 4
        assert _phase_ids(layers[0]) == ["p1"]
        assert {ph.id for ph in layers[1]} == {"p2", "p3"}
        assert _phase_ids(layers[2]) == ["d1"]
        assert _phase_ids(layers[3]) == ["p4"]

        # The debate phase must still be typed and configured as a debate.
        looked_up = plan.get_phase("d1")
        assert looked_up is not None
        assert looked_up.phase_type == PhaseType.DEBATE
        assert looked_up.debate_config is not None


class TestGetReadyPhases:
    """Tests for TeamPlan.get_ready_phases."""

    def test_initial_ready_phases(self):
        """Initially only the phase without dependencies is ready."""
        ready = _make_pipeline_plan().get_ready_phases()
        assert len(ready) == 1
        assert ready[0].id == "p1"

    def test_ready_after_first_layer_completes(self):
        """Once the first layer completes, the second-layer phases become ready."""
        plan = _make_pipeline_plan()
        plan.update_phase_status("p1", PhaseStatus.COMPLETED)

        ready = plan.get_ready_phases()
        assert len(ready) == 2
        assert {item.id for item in ready} == {"p2", "p3"}

    def test_ready_after_partial_completion(self):
        """With p1 and p2 completed but p3 still pending, p4 is not ready yet."""
        plan = _make_pipeline_plan()
        for phase_id in ("p1", "p2"):
            plan.update_phase_status(phase_id, PhaseStatus.COMPLETED)

        # p4 depends on p2 AND p3, so only the still-pending p3 is ready.
        ready = plan.get_ready_phases()
        assert len(ready) == 1
        assert ready[0].id == "p3"

    def test_ready_excludes_running_phases(self):
        """A running phase is not reported as ready."""
        plan = _make_pipeline_plan()
        plan.update_phase_status("p1", PhaseStatus.RUNNING)

        ready = plan.get_ready_phases()
        assert len(ready) == 0

    def test_ready_excludes_completed_phases(self):
        """Completed phases are not reported as ready; only p4 remains."""
        plan = _make_pipeline_plan()
        for phase_id in ("p1", "p2", "p3"):
            plan.update_phase_status(phase_id, PhaseStatus.COMPLETED)

        ready = plan.get_ready_phases()
        assert len(ready) == 1
        assert ready[0].id == "p4"


class TestTeamPlanPhasesSerialization:
    """Tests for serializing the phases of a TeamPlan."""

    def test_to_dict_includes_phases(self):
        """to_dict includes a "phases" entry with every phase."""
        payload = _make_pipeline_plan().to_dict()
        assert "phases" in payload
        assert len(payload["phases"]) == 5

        first_entry = payload["phases"][0]
        assert first_entry["id"] == "p1"
        assert first_entry["depends_on"] == []

    def test_from_dict_restores_phases(self):
        """from_dict restores every phase written by to_dict."""
        original = _make_pipeline_plan()
        clone = TeamPlan.from_dict(original.to_dict())
        assert len(clone.phases) == len(original.phases)

        compared_fields = (
            "id",
            "name",
            "assigned_expert",
            "task_description",
            "depends_on",
            "status",
        )
        for before, after in zip(original.phases, clone.phases):
            for field in compared_fields:
                assert getattr(after, field) == getattr(before, field)

    def test_both_subtasks_and_phases_coexist(self):
        """A plan can carry subtasks and phases at the same time."""
        mixed = TeamPlan(
            task="混合模式",
            subtasks=[SubTask(id="s1", description="子任务1")],
            phases=[_make_phase(id="p1", name="阶段1")],
        )
        payload = mixed.to_dict()
        assert len(payload["subtasks"]) == 1
        assert len(payload["phases"]) == 1
        assert payload["subtasks"][0]["id"] == "s1"
        assert payload["phases"][0]["id"] == "p1"