fischer-agentkit/tests/unit/experts/test_plan.py

722 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Unit tests for the TeamPlan / SubTask / PlanPhase data models (hub-and-spoke + pipeline modes)."""
from __future__ import annotations
import pytest
from agentkit.experts.plan import (
MergeStrategy,
PhaseStatus,
PlanPhase,
PlanStatus,
SubTask,
SubTaskStatus,
TeamPlan,
)
# ── Helpers ───────────────────────────────────────────────
def _make_subtask(
    id: str = "subtask_1",
    description: str = "分析数据",
    assigned_expert: str = "analyst",
    status: SubTaskStatus = SubTaskStatus.PENDING,
    result: dict | None = None,
) -> SubTask:
    """Build a SubTask for tests, forwarding every argument to the constructor.

    With no arguments this yields a PENDING subtask assigned to "analyst"
    that has no result yet.
    """
    # `id` shadows the builtin, but callers pass it by keyword, so the name stays.
    constructor_kwargs = {
        "id": id,
        "description": description,
        "assigned_expert": assigned_expert,
        "status": status,
        "result": result,
    }
    return SubTask(**constructor_kwargs)
def _make_valid_plan() -> TeamPlan:
    """Build a valid hub-and-spoke plan with three subtasks ("s1", "s2", "s3").

    The plan is in DRAFT status and led by "architect"; each subtask keeps the
    default PENDING status supplied by ``_make_subtask``.
    """
    # (subtask id, description, assigned expert)
    subtask_specs = (
        ("s1", "分析需求", "analyst"),
        ("s2", "设计架构", "architect"),
        ("s3", "编写代码", "coder"),
    )
    plan_subtasks = [
        _make_subtask(id=subtask_id, description=text, assigned_expert=expert)
        for subtask_id, text, expert in subtask_specs
    ]
    return TeamPlan(
        id="plan_001",
        task="实现用户登录功能",
        subtasks=plan_subtasks,
        status=PlanStatus.DRAFT,
        lead_expert="architect",
    )
# ── MergeStrategy tests ───────────────────────────────────
class TestMergeStrategy:
    """Tests for the MergeStrategy enum."""

    def test_only_best_strategy_exists(self):
        """Hub-and-spoke mode keeps only the BEST strategy."""
        assert MergeStrategy.BEST == "best"
        # BEST must be the sole member of the enum.
        all_members = list(MergeStrategy)
        assert len(all_members) == 1

    def test_no_vote_or_fusion(self):
        """VOTE and FUSION have been removed."""
        for removed_name in ("VOTE", "FUSION"):
            assert not hasattr(MergeStrategy, removed_name)
# ── PlanStatus tests ──────────────────────────────────────
class TestPlanStatus:
    """Tests for the PlanStatus enum."""

    def test_statuses_exist(self):
        """Every required plan status exists and compares equal to its string value."""
        expected_values = {
            "DRAFT": "draft",
            "EXECUTING": "executing",
            "COMPLETED": "completed",
            "FAILED": "failed",
            "FALLBACK": "fallback",
        }
        for member_name, value in expected_values.items():
            assert getattr(PlanStatus, member_name) == value

    def test_no_confirmed_status(self):
        """CONFIRMED has been removed (hub-and-spoke needs no confirmation stage)."""
        assert not hasattr(PlanStatus, "CONFIRMED")
# ── SubTaskStatus tests ───────────────────────────────────
class TestSubTaskStatus:
    """Tests for the SubTaskStatus enum."""

    def test_statuses_exist(self):
        """Every subtask status exists and compares equal to its string value."""
        expected_values = {
            "PENDING": "pending",
            "RUNNING": "running",
            "COMPLETED": "completed",
            "FAILED": "failed",
        }
        for member_name, value in expected_values.items():
            assert getattr(SubTaskStatus, member_name) == value
# ── SubTask tests ─────────────────────────────────────────
class TestSubTask:
    """Tests for the SubTask data model."""

    def test_creation_with_all_fields(self):
        """A SubTask built with every field set exposes those values unchanged."""
        task = SubTask(
            id="subtask_a",
            description="竞品分析",
            assigned_expert="analyst",
            status=SubTaskStatus.RUNNING,
            result={"report": "竞品分析报告"},
        )

        assert task.id == "subtask_a"
        assert task.description == "竞品分析"
        assert task.assigned_expert == "analyst"
        assert task.status == SubTaskStatus.RUNNING
        assert task.result == {"report": "竞品分析报告"}

    def test_default_values(self):
        """Defaults: an auto-generated id, PENDING status, no expert and no result."""
        task = SubTask(description="测试任务")

        # The id is generated, so only its presence can be checked.
        assert task.id is not None
        assert len(task.id) > 0
        assert task.description == "测试任务"
        assert task.assigned_expert == ""
        assert task.status == SubTaskStatus.PENDING
        assert task.result is None

    def test_to_dict_from_dict_roundtrip(self):
        """to_dict followed by from_dict reproduces every field."""
        source = SubTask(
            id="roundtrip_subtask",
            description="往返测试",
            assigned_expert="tester",
            status=SubTaskStatus.COMPLETED,
            result={"key": "value"},
        )

        rebuilt = SubTask.from_dict(source.to_dict())

        for field_name in ("id", "description", "assigned_expert", "status", "result"):
            assert getattr(rebuilt, field_name) == getattr(source, field_name)

    def test_to_dict_structure(self):
        """to_dict returns the expected keys, with status as its string value."""
        payload = _make_subtask(
            id="struct_test",
            description="结构测试",
            assigned_expert="dev",
            status=SubTaskStatus.RUNNING,
            result={"output": "data"},
        ).to_dict()

        expected_entries = {
            "id": "struct_test",
            "description": "结构测试",
            "assigned_expert": "dev",
            "status": "running",
            "result": {"output": "data"},
        }
        for key, value in expected_entries.items():
            assert payload[key] == value
# ── TeamPlan tests ────────────────────────────────────────
class TestTeamPlan:
    """Tests for the TeamPlan data model (subtask / hub-and-spoke side)."""

    def test_creation(self):
        """A TeamPlan keeps the values it was constructed with."""
        team_plan = _make_valid_plan()

        assert team_plan.id == "plan_001"
        assert team_plan.task == "实现用户登录功能"
        assert len(team_plan.subtasks) == 3
        assert team_plan.status == PlanStatus.DRAFT
        assert team_plan.lead_expert == "architect"

    def test_default_values(self):
        """Defaults: an auto-generated id, an empty subtask list, DRAFT status."""
        team_plan = TeamPlan(task="测试任务")

        assert team_plan.id is not None
        assert team_plan.task == "测试任务"
        assert team_plan.subtasks == []
        assert team_plan.status == PlanStatus.DRAFT
        assert team_plan.lead_expert == ""

    def test_to_dict_from_dict_roundtrip(self):
        """to_dict followed by from_dict reproduces the plan and its subtasks."""
        source = _make_valid_plan()

        rebuilt = TeamPlan.from_dict(source.to_dict())

        assert rebuilt.id == source.id
        assert rebuilt.task == source.task
        assert len(rebuilt.subtasks) == len(source.subtasks)
        assert rebuilt.status == source.status
        assert rebuilt.lead_expert == source.lead_expert
        # Lengths were checked above, so zip pairs every subtask.
        for before, after in zip(source.subtasks, rebuilt.subtasks):
            for field_name in ("id", "description", "assigned_expert", "status"):
                assert getattr(after, field_name) == getattr(before, field_name)

    def test_get_subtask_by_id(self):
        """get_subtask returns the subtask with the requested id."""
        found = _make_valid_plan().get_subtask("s2")

        assert found is not None
        assert found.id == "s2"
        assert found.description == "设计架构"

    def test_get_subtask_with_nonexistent_id_returns_none(self):
        """get_subtask returns None for an unknown id."""
        team_plan = _make_valid_plan()

        assert team_plan.get_subtask("nonexistent") is None

    def test_update_subtask_status(self):
        """update_subtask_status sets both the status and the result."""
        team_plan = _make_valid_plan()

        team_plan.update_subtask_status("s1", SubTaskStatus.COMPLETED, {"output": "分析完成"})

        updated = team_plan.get_subtask("s1")
        assert updated is not None
        assert updated.status == SubTaskStatus.COMPLETED
        assert updated.result == {"output": "分析完成"}

    def test_update_subtask_status_without_result(self):
        """Omitting result in update_subtask_status keeps the existing result."""
        team_plan = _make_valid_plan()
        team_plan.update_subtask_status("s1", SubTaskStatus.COMPLETED, {"output": "done"})

        # Second update passes no result; the earlier one must survive.
        team_plan.update_subtask_status("s1", SubTaskStatus.RUNNING)

        updated = team_plan.get_subtask("s1")
        assert updated is not None
        assert updated.status == SubTaskStatus.RUNNING
        assert updated.result == {"output": "done"}

    def test_completed_subtasks_property(self):
        """completed_subtasks returns only the completed subtasks."""
        team_plan = _make_valid_plan()
        outcomes = {
            "s1": SubTaskStatus.COMPLETED,
            "s2": SubTaskStatus.COMPLETED,
            "s3": SubTaskStatus.FAILED,
        }
        for subtask_id, outcome in outcomes.items():
            team_plan.update_subtask_status(subtask_id, outcome)

        done = team_plan.completed_subtasks

        assert len(done) == 2
        assert {item.id for item in done} == {"s1", "s2"}

    def test_failed_subtasks_property(self):
        """failed_subtasks returns only the failed subtasks."""
        team_plan = _make_valid_plan()
        outcomes = {
            "s1": SubTaskStatus.COMPLETED,
            "s2": SubTaskStatus.FAILED,
            "s3": SubTaskStatus.FAILED,
        }
        for subtask_id, outcome in outcomes.items():
            team_plan.update_subtask_status(subtask_id, outcome)

        broken = team_plan.failed_subtasks

        assert len(broken) == 2
        assert {item.id for item in broken} == {"s2", "s3"}

    def test_all_done_property_when_all_completed(self):
        """all_done is True once every subtask is completed."""
        team_plan = _make_valid_plan()
        for item in team_plan.subtasks:
            team_plan.update_subtask_status(item.id, SubTaskStatus.COMPLETED)

        assert team_plan.all_done is True

    def test_all_done_property_when_some_failed(self):
        """all_done is True when every subtask is either completed or failed."""
        team_plan = _make_valid_plan()
        outcomes = {
            "s1": SubTaskStatus.COMPLETED,
            "s2": SubTaskStatus.FAILED,
            "s3": SubTaskStatus.COMPLETED,
        }
        for subtask_id, outcome in outcomes.items():
            team_plan.update_subtask_status(subtask_id, outcome)

        assert team_plan.all_done is True

    def test_all_done_property_when_pending(self):
        """all_done is False while any subtask is still unfinished."""
        team_plan = _make_valid_plan()

        # Only s1 finishes; s2 and s3 stay pending.
        team_plan.update_subtask_status("s1", SubTaskStatus.COMPLETED)

        assert team_plan.all_done is False

    def test_all_done_property_empty_plan(self):
        """all_done is True for a plan with no subtasks (vacuous truth)."""
        empty_plan = TeamPlan(task="空计划")

        assert empty_plan.all_done is True
# ── PlanPhase tests (pipeline mode) ───────────────────────
def _make_phase(
    id: str = "phase_1",
    name: str = "规划",
    assigned_expert: str = "tech_lead",
    task_description: str = "分析需求并制定计划",
    depends_on: list[str] | None = None,
    status: PhaseStatus = PhaseStatus.PENDING,
    result: dict | None = None,
) -> PlanPhase:
    """Build a PlanPhase for tests, forwarding every argument to the constructor.

    A missing (or empty) ``depends_on`` becomes a fresh empty list, so phases
    never share a dependency list by accident.
    """
    dependencies = depends_on if depends_on else []
    constructor_kwargs = {
        "id": id,
        "name": name,
        "assigned_expert": assigned_expert,
        "task_description": task_description,
        "depends_on": dependencies,
        "status": status,
        "result": result,
    }
    return PlanPhase(**constructor_kwargs)
def _make_pipeline_plan() -> TeamPlan:
    """Build a pipeline-mode plan with five phases.

    Dependency layout:
        Layer 0: [p1 planning]            (no dependencies)
        Layer 1: [p2 frontend, p3 backend] (depend on p1)
        Layer 2: [p4 QA]                  (depends on p2 and p3)
        Layer 3: [p5 review]              (depends on p4)
    """
    # (phase id, name, assigned expert, ids this phase depends on)
    phase_specs = (
        ("p1", "规划", "tech_lead", []),
        ("p2", "前端", "frontend_engineer", ["p1"]),
        ("p3", "后端", "backend_engineer", ["p1"]),
        ("p4", "QA", "qa_engineer", ["p2", "p3"]),
        ("p5", "评审", "code_reviewer", ["p4"]),
    )
    pipeline_phases = [
        _make_phase(id=phase_id, name=label, assigned_expert=expert, depends_on=deps)
        for phase_id, label, expert, deps in phase_specs
    ]
    return TeamPlan(
        id="pipeline_001",
        task="开发用户登录功能",
        phases=pipeline_phases,
        status=PlanStatus.DRAFT,
        lead_expert="tech_lead",
    )
class TestPhaseStatus:
    """Tests for the PhaseStatus enum."""

    def test_statuses_exist(self):
        """Every phase status exists and compares equal to its string value."""
        # Note that RUNNING is spelled "in_progress" on the wire.
        expected_values = {
            "PENDING": "pending",
            "RUNNING": "in_progress",
            "COMPLETED": "completed",
            "FAILED": "failed",
        }
        for member_name, value in expected_values.items():
            assert getattr(PhaseStatus, member_name) == value
class TestPlanPhase:
    """Tests for the PlanPhase data model."""

    def test_creation_with_all_fields(self):
        """A PlanPhase built with every field set exposes those values unchanged."""
        stage = PlanPhase(
            id="phase_a",
            name="前端开发",
            assigned_expert="frontend_engineer",
            task_description="实现登录页面",
            depends_on=["phase_planning"],
            status=PhaseStatus.RUNNING,
            result={"component": "LoginForm"},
        )

        assert stage.id == "phase_a"
        assert stage.name == "前端开发"
        assert stage.assigned_expert == "frontend_engineer"
        assert stage.task_description == "实现登录页面"
        assert stage.depends_on == ["phase_planning"]
        assert stage.status == PhaseStatus.RUNNING
        assert stage.result == {"component": "LoginForm"}

    def test_default_values(self):
        """Defaults: an auto-generated id, empty depends_on, PENDING status."""
        stage = PlanPhase(name="测试阶段")

        # The id is generated, so only its presence can be checked.
        assert stage.id is not None
        assert len(stage.id) > 0
        assert stage.name == "测试阶段"
        assert stage.assigned_expert == ""
        assert stage.task_description == ""
        assert stage.depends_on == []
        assert stage.status == PhaseStatus.PENDING
        assert stage.result is None

    def test_to_dict_from_dict_roundtrip(self):
        """to_dict followed by from_dict reproduces every field except result.

        Note: to_dict() serializes result to a string for frontend
        compatibility, so the roundtrip is lossy for that field (dict -> str).
        """
        source = _make_phase(
            id="roundtrip_phase",
            name="往返测试",
            assigned_expert="tester",
            task_description="测试序列化",
            depends_on=["dep1", "dep2"],
            status=PhaseStatus.COMPLETED,
            result={"content": "value"},
        )

        rebuilt = PlanPhase.from_dict(source.to_dict())

        lossless_fields = (
            "id",
            "name",
            "assigned_expert",
            "task_description",
            "depends_on",
            "status",
        )
        for field_name in lossless_fields:
            assert getattr(rebuilt, field_name) == getattr(source, field_name)
        # result went out as a string, and from_dict reads it back as a string.
        assert rebuilt.result == "value"

    def test_to_dict_structure(self):
        """to_dict returns the expected keys, with status and result as strings."""
        payload = _make_phase(
            id="struct_test",
            name="结构测试",
            assigned_expert="dev",
            task_description="测试结构",
            depends_on=["d1"],
            status=PhaseStatus.RUNNING,
            result={"content": "phase output data"},
        ).to_dict()

        expected_entries = {
            "id": "struct_test",
            "name": "结构测试",
            "assigned_expert": "dev",
            "task_description": "测试结构",
            "depends_on": ["d1"],
            "status": "in_progress",
            # result is a string to match the frontend ITeamPlanPhase.result type.
            "result": "phase output data",
        }
        for key, value in expected_entries.items():
            assert payload[key] == value
class TestTeamPlanPhases:
    """Tests for TeamPlan in pipeline mode (phases)."""

    def test_pipeline_plan_creation(self):
        """A pipeline-mode TeamPlan keeps the values it was constructed with."""
        pipeline = _make_pipeline_plan()

        assert pipeline.id == "pipeline_001"
        assert pipeline.task == "开发用户登录功能"
        assert len(pipeline.phases) == 5
        assert pipeline.lead_expert == "tech_lead"

    def test_get_phase_by_id(self):
        """get_phase returns the phase with the requested id."""
        found = _make_pipeline_plan().get_phase("p3")

        assert found is not None
        assert found.id == "p3"
        assert found.name == "后端"

    def test_get_phase_nonexistent_returns_none(self):
        """get_phase returns None for an unknown id."""
        pipeline = _make_pipeline_plan()

        assert pipeline.get_phase("nonexistent") is None

    def test_update_phase_status(self):
        """update_phase_status sets both the status and the result."""
        pipeline = _make_pipeline_plan()

        pipeline.update_phase_status("p1", PhaseStatus.COMPLETED, {"plan": "设计文档"})

        updated = pipeline.get_phase("p1")
        assert updated is not None
        assert updated.status == PhaseStatus.COMPLETED
        assert updated.result == {"plan": "设计文档"}

    def test_update_phase_status_without_result(self):
        """Omitting result in update_phase_status keeps the existing result."""
        pipeline = _make_pipeline_plan()
        pipeline.update_phase_status("p1", PhaseStatus.COMPLETED, {"plan": "文档"})

        # Second update passes no result; the earlier one must survive.
        pipeline.update_phase_status("p1", PhaseStatus.RUNNING)

        updated = pipeline.get_phase("p1")
        assert updated is not None
        assert updated.status == PhaseStatus.RUNNING
        assert updated.result == {"plan": "文档"}

    def test_completed_phases_property(self):
        """completed_phases returns only the completed phases."""
        pipeline = _make_pipeline_plan()
        outcomes = {
            "p1": PhaseStatus.COMPLETED,
            "p2": PhaseStatus.COMPLETED,
            "p3": PhaseStatus.FAILED,
        }
        for phase_id, outcome in outcomes.items():
            pipeline.update_phase_status(phase_id, outcome)

        done = pipeline.completed_phases

        assert len(done) == 2
        assert {stage.id for stage in done} == {"p1", "p2"}

    def test_failed_phases_property(self):
        """failed_phases returns only the failed phases."""
        pipeline = _make_pipeline_plan()
        outcomes = {
            "p1": PhaseStatus.COMPLETED,
            "p2": PhaseStatus.FAILED,
            "p3": PhaseStatus.FAILED,
        }
        for phase_id, outcome in outcomes.items():
            pipeline.update_phase_status(phase_id, outcome)

        broken = pipeline.failed_phases

        assert len(broken) == 2
        assert {stage.id for stage in broken} == {"p2", "p3"}

    def test_all_phases_done_when_all_completed(self):
        """all_phases_done is True once every phase is completed."""
        pipeline = _make_pipeline_plan()
        for stage in pipeline.phases:
            pipeline.update_phase_status(stage.id, PhaseStatus.COMPLETED)

        assert pipeline.all_phases_done is True

    def test_all_phases_done_when_some_failed(self):
        """all_phases_done is True when every phase is either completed or failed."""
        pipeline = _make_pipeline_plan()
        outcomes = {
            "p1": PhaseStatus.COMPLETED,
            "p2": PhaseStatus.FAILED,
            "p3": PhaseStatus.FAILED,
            "p4": PhaseStatus.COMPLETED,
            "p5": PhaseStatus.COMPLETED,
        }
        for phase_id, outcome in outcomes.items():
            pipeline.update_phase_status(phase_id, outcome)

        assert pipeline.all_phases_done is True

    def test_all_phases_done_when_pending(self):
        """all_phases_done is False while any phase is still unfinished."""
        pipeline = _make_pipeline_plan()

        # Only p1 finishes; p2 through p5 stay pending.
        pipeline.update_phase_status("p1", PhaseStatus.COMPLETED)

        assert pipeline.all_phases_done is False

    def test_all_phases_done_empty_plan(self):
        """all_phases_done is True for a plan with no phases."""
        empty_plan = TeamPlan(task="空计划")

        assert empty_plan.all_phases_done is True
class TestTopologicalSort:
    """Tests for TeamPlan.topological_sort."""

    @staticmethod
    def _ids_by_layer(layers):
        """Return the phase ids of each layer, keeping layer and phase order."""
        return [[stage.id for stage in layer] for layer in layers]

    def test_linear_dependency(self):
        """A linear chain A->B->C sorts into [[A], [B], [C]]."""
        chain = (("a", "A", []), ("b", "B", ["a"]), ("c", "C", ["b"]))
        plan = TeamPlan(
            task="线性任务",
            phases=[
                _make_phase(id=phase_id, name=label, depends_on=deps)
                for phase_id, label, deps in chain
            ],
        )

        layers = plan.topological_sort()

        assert len(layers) == 3
        ids = self._ids_by_layer(layers)
        assert ids[0] == ["a"]
        assert ids[1] == ["b"]
        assert ids[2] == ["c"]

    def test_parallel_phases(self):
        """A diamond A->{B,C}->D sorts into [[A], [B, C], [D]]."""
        diamond = (
            ("a", "A", []),
            ("b", "B", ["a"]),
            ("c", "C", ["a"]),
            ("d", "D", ["b", "c"]),
        )
        plan = TeamPlan(
            task="并行任务",
            phases=[
                _make_phase(id=phase_id, name=label, depends_on=deps)
                for phase_id, label, deps in diamond
            ],
        )

        layers = plan.topological_sort()

        assert len(layers) == 3
        ids = self._ids_by_layer(layers)
        assert ids[0] == ["a"]
        # Order inside a layer is not asserted, hence the set comparison.
        assert set(ids[1]) == {"b", "c"}
        assert ids[2] == ["d"]

    def test_full_pipeline_structure(self):
        """The full pipeline sorts as planning -> {frontend, backend} -> QA -> review."""
        layers = _make_pipeline_plan().topological_sort()

        assert len(layers) == 4
        ids = self._ids_by_layer(layers)
        assert ids[0] == ["p1"]
        assert set(ids[1]) == {"p2", "p3"}
        assert ids[2] == ["p4"]
        assert ids[3] == ["p5"]

    def test_no_dependencies(self):
        """Phases without dependencies all land in a single layer."""
        independent = (("a", "A"), ("b", "B"), ("c", "C"))
        plan = TeamPlan(
            task="无依赖任务",
            phases=[
                _make_phase(id=phase_id, name=label, depends_on=[])
                for phase_id, label in independent
            ],
        )

        layers = plan.topological_sort()

        assert len(layers) == 1
        assert {stage.id for stage in layers[0]} == {"a", "b", "c"}

    def test_empty_phases(self):
        """An empty phase list sorts to an empty list."""
        empty_plan = TeamPlan(task="空计划")

        assert empty_plan.topological_sort() == []

    def test_circular_dependency_raises(self):
        """A cycle A->B->A raises ValueError."""
        cyclic_phases = [
            _make_phase(id="a", name="A", depends_on=["b"]),
            _make_phase(id="b", name="B", depends_on=["a"]),
        ]
        plan = TeamPlan(task="循环依赖任务", phases=cyclic_phases)

        with pytest.raises(ValueError, match="Circular dependency"):
            plan.topological_sort()

    def test_self_dependency_raises(self):
        """A phase that depends on itself (A->A) raises ValueError."""
        self_referencing = [_make_phase(id="a", name="A", depends_on=["a"])]
        plan = TeamPlan(task="自循环任务", phases=self_referencing)

        with pytest.raises(ValueError, match="Circular dependency"):
            plan.topological_sort()

    def test_invalid_dependency_reference_raises(self):
        """Depending on a phase id that does not exist raises ValueError."""
        dangling = [_make_phase(id="a", name="A", depends_on=["nonexistent"])]
        plan = TeamPlan(task="无效依赖任务", phases=dangling)

        with pytest.raises(ValueError, match="non-existent phase"):
            plan.topological_sort()
class TestGetReadyPhases:
    """Tests for TeamPlan.get_ready_phases."""

    def test_initial_ready_phases(self):
        """Initially only the phase without dependencies is ready."""
        runnable = _make_pipeline_plan().get_ready_phases()

        assert len(runnable) == 1
        assert runnable[0].id == "p1"

    def test_ready_after_first_layer_completes(self):
        """Once the first layer completes, the second-layer phases become ready."""
        pipeline = _make_pipeline_plan()
        pipeline.update_phase_status("p1", PhaseStatus.COMPLETED)

        runnable = pipeline.get_ready_phases()

        assert len(runnable) == 2
        assert {stage.id for stage in runnable} == {"p2", "p3"}

    def test_ready_after_partial_completion(self):
        """With p1 and p2 done but p3 pending, p4 is not ready yet."""
        pipeline = _make_pipeline_plan()
        for finished_id in ("p1", "p2"):
            pipeline.update_phase_status(finished_id, PhaseStatus.COMPLETED)

        # p4 depends on both p2 and p3, so only the still-pending p3 is ready.
        runnable = pipeline.get_ready_phases()

        assert len(runnable) == 1
        assert runnable[0].id == "p3"

    def test_ready_excludes_running_phases(self):
        """A running phase is not reported as ready."""
        pipeline = _make_pipeline_plan()
        pipeline.update_phase_status("p1", PhaseStatus.RUNNING)

        runnable = pipeline.get_ready_phases()

        assert len(runnable) == 0

    def test_ready_excludes_completed_phases(self):
        """Completed phases are not reported as ready."""
        pipeline = _make_pipeline_plan()
        for finished_id in ("p1", "p2", "p3"):
            pipeline.update_phase_status(finished_id, PhaseStatus.COMPLETED)

        runnable = pipeline.get_ready_phases()

        assert len(runnable) == 1
        assert runnable[0].id == "p4"
class TestTeamPlanPhasesSerialization:
    """Tests for serializing the phases of a TeamPlan."""

    def test_to_dict_includes_phases(self):
        """to_dict includes a "phases" entry holding every phase."""
        payload = _make_pipeline_plan().to_dict()

        assert "phases" in payload
        serialized_phases = payload["phases"]
        assert len(serialized_phases) == 5
        first_phase = serialized_phases[0]
        assert first_phase["id"] == "p1"
        assert first_phase["depends_on"] == []

    def test_from_dict_restores_phases(self):
        """from_dict restores every phase produced by to_dict."""
        source = _make_pipeline_plan()

        rebuilt = TeamPlan.from_dict(source.to_dict())

        assert len(rebuilt.phases) == len(source.phases)
        compared_fields = (
            "id",
            "name",
            "assigned_expert",
            "task_description",
            "depends_on",
            "status",
        )
        # Lengths were checked above, so zip pairs every phase.
        for before, after in zip(source.phases, rebuilt.phases):
            for field_name in compared_fields:
                assert getattr(after, field_name) == getattr(before, field_name)

    def test_both_subtasks_and_phases_coexist(self):
        """A plan may carry subtasks and phases at the same time."""
        mixed_plan = TeamPlan(
            task="混合模式",
            subtasks=[SubTask(id="s1", description="子任务1")],
            phases=[_make_phase(id="p1", name="阶段1")],
        )

        payload = mixed_plan.to_dict()

        assert len(payload["subtasks"]) == 1
        assert len(payload["phases"]) == 1
        assert payload["subtasks"][0]["id"] == "s1"
        assert payload["phases"][0]["id"] == "p1"