fischer-agentkit/tests/unit/test_goal_planner.py

590 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""Tests for GoalPlanner — 目标分析与计划生成"""
import pytest
from agentkit.core.goal_planner import GoalPlanner
from agentkit.core.orchestrator import Orchestrator, SubTask
from agentkit.core.plan_schema import (
ExecutionPlan,
PlanStep,
PlanStepStatus,
SkillGap,
SkillGapLevel,
)
from agentkit.core.protocol import TaskMessage, TaskStatus
from datetime import datetime, timezone
# --- Plan Schema Tests ---
class TestPlanStep:
    """Tests for the PlanStep data class."""

    def test_default_values(self):
        """A step built from only its identifying fields exposes neutral defaults."""
        plan_step = PlanStep(step_id="s1", name="Test Step", description="A test step")

        assert plan_step.dependencies == []
        assert plan_step.parallel_group == 0
        assert plan_step.required_skills == []
        assert plan_step.input_data == {}
        assert plan_step.status == PlanStepStatus.PENDING
        assert plan_step.result is None
        assert plan_step.error is None

    def test_to_dict(self):
        """to_dict() carries the explicitly supplied fields and the status value."""
        plan_step = PlanStep(
            step_id="s1",
            name="Test",
            description="Desc",
            dependencies=["s0"],
            parallel_group=1,
            required_skills=["web_search"],
        )

        serialized = plan_step.to_dict()

        assert serialized["step_id"] == "s1"
        assert serialized["dependencies"] == ["s0"]
        assert serialized["parallel_group"] == 1
        assert serialized["required_skills"] == ["web_search"]
        assert serialized["status"] == "pending"
class TestExecutionPlan:
    """Tests for the ExecutionPlan data class."""

    def test_default_values(self):
        """A plan created with only a goal has empty collections and is unconfirmed."""
        execution_plan = ExecutionPlan(goal="test")

        assert execution_plan.goal == "test"
        assert execution_plan.steps == []
        assert execution_plan.parallel_groups == []
        assert execution_plan.skill_gaps == []
        assert execution_plan.confirmed is False

    def test_has_skill_gaps(self):
        """has_skill_gaps stays False with only a LOW gap and turns True once a HIGH gap is added."""
        low_gap = SkillGap(step_name="s1", required_skill="x", level=SkillGapLevel.LOW)
        execution_plan = ExecutionPlan(goal="test", skill_gaps=[low_gap])
        assert execution_plan.has_skill_gaps is False

        high_gap = SkillGap(step_name="s2", required_skill="y", level=SkillGapLevel.HIGH)
        execution_plan.skill_gaps.append(high_gap)
        assert execution_plan.has_skill_gaps is True

    def test_get_step(self):
        """get_step() returns the very same step object, or None for an unknown id."""
        only_step = PlanStep(step_id="s1", name="A", description="A step")
        execution_plan = ExecutionPlan(goal="test", steps=[only_step])

        assert execution_plan.get_step("s1") is only_step
        assert execution_plan.get_step("nonexistent") is None

    def test_to_readable(self):
        """to_readable() mentions the goal, a parallel-group label, step ids and skills."""
        research_a = PlanStep(
            step_id="s0",
            name="调研 A",
            description="调研竞品 A",
            parallel_group=0,
            required_skills=["web_search"],
        )
        research_b = PlanStep(
            step_id="s1",
            name="调研 B",
            description="调研竞品 B",
            parallel_group=0,
            required_skills=["web_search"],
        )
        summary = PlanStep(
            step_id="s2",
            name="汇总",
            description="汇总报告",
            dependencies=["s0", "s1"],
            parallel_group=1,
            required_skills=["report_generator"],
        )
        execution_plan = ExecutionPlan(
            plan_id="p1",
            goal="调研竞品",
            steps=[research_a, research_b, summary],
            parallel_groups=[["s0", "s1"], ["s2"]],
        )

        rendered = execution_plan.to_readable()

        assert "调研竞品" in rendered
        assert "并行组 1" in rendered
        assert "s0" in rendered
        assert "web_search" in rendered

    def test_to_dict(self):
        """to_dict() exposes the plan id, the serialized steps and the parallel groups."""
        single_step = PlanStep(step_id="s0", name="A", description="A")
        execution_plan = ExecutionPlan(
            plan_id="p1",
            goal="test",
            steps=[single_step],
            parallel_groups=[["s0"]],
        )

        serialized = execution_plan.to_dict()

        assert serialized["plan_id"] == "p1"
        assert len(serialized["steps"]) == 1
        assert serialized["parallel_groups"] == [["s0"]]
# --- GoalPlanner Tests ---
class TestGoalPlannerSimpleGoal:
    """Simple goal -> single-step plan."""

    @pytest.mark.asyncio
    async def test_simple_goal_generates_single_step(self):
        """A simple goal produces exactly one step in exactly one parallel group."""
        goal_planner = GoalPlanner()

        generated = await goal_planner.generate_plan(
            goal="查询今天的天气",
            available_skills=["web_search"],
        )

        assert len(generated.steps) == 1
        assert generated.steps[0].parallel_group == 0
        assert len(generated.parallel_groups) == 1

    @pytest.mark.asyncio
    async def test_simple_goal_with_matching_skill(self):
        """The single step lists web_search as a required skill for a search goal."""
        goal_planner = GoalPlanner()

        generated = await goal_planner.generate_plan(
            goal="搜索最新的 AI 新闻",
            available_skills=["web_search"],
        )

        first_step = generated.steps[0]
        assert "web_search" in first_step.required_skills

    @pytest.mark.asyncio
    async def test_simple_goal_no_matching_skill(self):
        """A goal that matches no skill still yields one step, with no required skills."""
        goal_planner = GoalPlanner()

        generated = await goal_planner.generate_plan(
            goal="执行某个未知操作",
            available_skills=["web_search"],
        )

        # Single-step task with no matching skill.
        assert len(generated.steps) == 1
        assert generated.steps[0].required_skills == []
class TestGoalPlannerParallelGoal:
    """Complex goal (parallel structure) -> multi-step parallel plan."""

    @pytest.mark.asyncio
    async def test_parallel_competitor_research(self):
        """AE1: researching 3 competitors is automatically recognised as parallel steps."""
        goal_planner = GoalPlanner()
        generated = await goal_planner.generate_plan(
            goal="调研 3 个竞品 SEO 策略并生成对比报告",
            available_skills=["web_search", "seo_analyzer", "report_generator"],
        )

        # Expect 4 steps: 3 parallel research steps + 1 summary step.
        assert len(generated.steps) == 4

        # The research steps carry no dependencies.
        independent = [step for step in generated.steps if not step.dependencies]
        assert len(independent) == 3

        # The summary step depends on all 3 research steps.
        dependent = [step for step in generated.steps if step.dependencies]
        assert len(dependent) == 1
        assert len(dependent[0].dependencies) == 3

        # Parallel groups: the first holds the 3 research steps, the second the summary.
        assert [len(group) for group in generated.parallel_groups] == [3, 1]

    @pytest.mark.asyncio
    async def test_parallel_items_with_dunhao(self):
        """Parallel items separated by the enumeration comma (dunhao)."""
        goal_planner = GoalPlanner()
        generated = await goal_planner.generate_plan(
            goal="调研竞品A、竞品B、竞品C的市场策略",
            available_skills=["web_search", "report_generator"],
        )

        # 3 parallel steps + 1 summary step.
        assert len(generated.steps) == 4
        independent = [step for step in generated.steps if not step.dependencies]
        assert len(independent) == 3

    @pytest.mark.asyncio
    async def test_parallel_steps_have_correct_group(self):
        """Research steps sit in parallel group 0 and the summary step in group 1."""
        goal_planner = GoalPlanner()
        generated = await goal_planner.generate_plan(
            goal="调研 3 个竞品 SEO 策略并生成对比报告",
            available_skills=["web_search", "seo_analyzer", "report_generator"],
        )

        # The parallel steps should have parallel_group 0.
        assert all(step.parallel_group == 0 for step in generated.steps[:3])
        # The summary step should have parallel_group 1.
        assert generated.steps[3].parallel_group == 1
class TestGoalPlannerSequentialGoal:
    """Sequential goal -> sequential steps."""

    @pytest.mark.asyncio
    async def test_sequential_goal_with_bing(self):
        """Sequential steps joined by a connective (presumably '并', per the test name and goal)."""
        goal_planner = GoalPlanner()
        generated = await goal_planner.generate_plan(
            goal="调研市场趋势并生成分析报告",
            available_skills=["web_search", "report_generator"],
        )

        assert len(generated.steps) == 2
        # The second step depends on the first one.
        first_step_id = generated.steps[0].step_id
        assert generated.steps[1].dependencies == [first_step_id]

    @pytest.mark.asyncio
    async def test_sequential_goal_with_arrow(self):
        """Sequential steps separated by arrows."""
        goal_planner = GoalPlanner()
        generated = await goal_planner.generate_plan(
            goal="收集数据→分析数据→生成报告",
            available_skills=["web_search", "data_analyzer", "report_generator"],
        )

        assert len(generated.steps) == 3
        collect, analyze, report = generated.steps
        # Each step depends only on its immediate predecessor.
        assert collect.dependencies == []
        assert analyze.dependencies == [collect.step_id]
        assert report.dependencies == [analyze.step_id]
class TestGoalPlannerSkillGaps:
    """Skill gap detection."""

    @pytest.mark.asyncio
    async def test_missing_skill_creates_gap(self):
        """A goal planned with no available skills yields a plan annotated with skill gaps."""
        goal_planner = GoalPlanner()
        generated = await goal_planner.generate_plan(
            goal="调研 3 个竞品 SEO 策略并生成对比报告",
            available_skills=[],  # no skills available
        )

        assert generated.has_skill_gaps is True
        assert len(generated.skill_gaps) > 0

    @pytest.mark.asyncio
    async def test_no_gaps_when_skills_available(self):
        """No HIGH-level gap is reported when the needed skill is available."""
        goal_planner = GoalPlanner()
        generated = await goal_planner.generate_plan(
            goal="搜索最新的 AI 新闻",
            available_skills=["web_search"],
        )

        # web_search matches, so there should be no HIGH-level gap.
        assert not any(gap.level == SkillGapLevel.HIGH for gap in generated.skill_gaps)

    @pytest.mark.asyncio
    async def test_gap_includes_suggestion(self):
        """Every HIGH-level gap carries a non-empty suggestion."""
        goal_planner = GoalPlanner()
        generated = await goal_planner.generate_plan(
            goal="调研 3 个竞品 SEO 策略并生成对比报告",
            available_skills=[],
        )

        severe_gaps = (gap for gap in generated.skill_gaps if gap.level == SkillGapLevel.HIGH)
        for gap in severe_gaps:
            assert gap.suggestion != ""
class TestGoalPlannerUpdatePlan:
    """User modifications applied to an existing plan."""

    def test_remove_step(self):
        """Removing a step drops it and clears references to it from later steps."""
        goal_planner = GoalPlanner()
        chain = [
            PlanStep(step_id="s0", name="A", description="A"),
            PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]),
            PlanStep(step_id="s2", name="C", description="C", dependencies=["s1"]),
        ]
        original = ExecutionPlan(
            goal="test",
            steps=chain,
            parallel_groups=[["s0"], ["s1"], ["s2"]],
        )

        feedback = {"remove_steps": ["s1"]}
        revised = goal_planner.update_plan_from_feedback(original, feedback)

        assert len(revised.steps) == 2
        # The dependency on the removed s1 should be cleaned up in the second remaining step.
        assert "s1" not in revised.steps[1].dependencies

    def test_update_step(self):
        """update_steps overwrites the named fields of the targeted step."""
        goal_planner = GoalPlanner()
        original = ExecutionPlan(
            goal="test",
            steps=[PlanStep(step_id="s0", name="A", description="A")],
            parallel_groups=[["s0"]],
        )

        feedback = {
            "update_steps": {
                "s0": {"name": "Updated A", "description": "Updated description"},
            },
        }
        revised = goal_planner.update_plan_from_feedback(original, feedback)

        edited_step = revised.steps[0]
        assert edited_step.name == "Updated A"
        assert edited_step.description == "Updated description"

    def test_add_step(self):
        """add_steps appends a new step after the existing ones."""
        goal_planner = GoalPlanner()
        original = ExecutionPlan(
            goal="test",
            steps=[PlanStep(step_id="s0", name="A", description="A")],
            parallel_groups=[["s0"]],
        )

        feedback = {
            "add_steps": [
                {"name": "New Step", "description": "A new step", "dependencies": ["s0"]},
            ],
        }
        revised = goal_planner.update_plan_from_feedback(original, feedback)

        assert len(revised.steps) == 2
        assert revised.steps[1].name == "New Step"

    def test_update_resets_confirmed(self):
        """Any modification resets a previously confirmed plan to unconfirmed."""
        goal_planner = GoalPlanner()
        original = ExecutionPlan(
            goal="test",
            confirmed=True,
            steps=[PlanStep(step_id="s0", name="A", description="A")],
            parallel_groups=[["s0"]],
        )

        feedback = {"update_steps": {"s0": {"name": "B"}}}
        revised = goal_planner.update_plan_from_feedback(original, feedback)

        assert revised.confirmed is False

    def test_update_rebuilds_parallel_groups(self):
        """Parallel groups are recomputed after a step is added."""
        goal_planner = GoalPlanner()
        original = ExecutionPlan(
            goal="test",
            steps=[
                PlanStep(step_id="s0", name="A", description="A"),
                PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]),
            ],
            parallel_groups=[["s0"], ["s1"]],
        )

        # Add a step that can run in parallel with s0.
        feedback = {
            "add_steps": [
                {"name": "C", "description": "Parallel to A", "dependencies": []},
            ],
        }
        revised = goal_planner.update_plan_from_feedback(original, feedback)

        # The new step should land in the same parallel group as s0.
        assert len(revised.parallel_groups[0]) == 2
class TestGoalPlannerValidatePlan:
    """Plan validation."""

    def test_valid_plan(self):
        """A well-formed two-step chain produces no validation errors."""
        goal_planner = GoalPlanner()
        candidate = ExecutionPlan(
            goal="test",
            steps=[
                PlanStep(step_id="s0", name="A", description="A"),
                PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]),
            ],
            parallel_groups=[["s0"], ["s1"]],
        )

        problems = goal_planner.validate_plan(candidate)

        assert problems == []

    def test_invalid_dependency(self):
        """A dependency on an unknown step id is reported, naming the missing id."""
        goal_planner = GoalPlanner()
        dangling = PlanStep(
            step_id="s0",
            name="A",
            description="A",
            dependencies=["nonexistent"],
        )
        candidate = ExecutionPlan(goal="test", steps=[dangling], parallel_groups=[["s0"]])

        problems = goal_planner.validate_plan(candidate)

        assert len(problems) > 0
        assert any("nonexistent" in message for message in problems)

    def test_circular_dependency(self):
        """Two steps depending on each other are reported as a circular dependency."""
        goal_planner = GoalPlanner()
        candidate = ExecutionPlan(
            goal="test",
            steps=[
                PlanStep(step_id="s0", name="A", description="A", dependencies=["s1"]),
                PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]),
            ],
            parallel_groups=[["s0", "s1"]],
        )

        problems = goal_planner.validate_plan(candidate)

        assert any("循环依赖" in message for message in problems)

    def test_ungrouped_steps(self):
        """A step missing from every parallel group is reported as unassigned."""
        goal_planner = GoalPlanner()
        candidate = ExecutionPlan(
            goal="test",
            steps=[
                PlanStep(step_id="s0", name="A", description="A"),
                PlanStep(step_id="s1", name="B", description="B"),
            ],
            parallel_groups=[["s0"]],  # s1 is not assigned to any group
        )

        problems = goal_planner.validate_plan(candidate)

        assert any("未分配" in message for message in problems)
class TestGoalPlannerWithOrchestrator:
    """Integration of GoalPlanner with Orchestrator."""

    @pytest.mark.asyncio
    async def test_orchestrator_with_goal_planner(self):
        """The Orchestrator uses the GoalPlanner to decompose a task."""
        goal_planner = GoalPlanner()

        class MockAgent:
            """Stand-in agent whose execute() returns a completed TaskResult."""

            def __init__(self, name):
                self.name = name
                self.agent_type = "mock"

            async def execute(self, task):
                from agentkit.core.protocol import TaskResult

                # One timestamp is reused for both the start and completion time.
                timestamp = datetime.now(timezone.utc)
                return TaskResult(
                    task_id=task.task_id,
                    agent_name=self.name,
                    status=TaskStatus.COMPLETED,
                    output_data={"result": f"from {self.name}"},
                    error_message=None,
                    started_at=timestamp,
                    completed_at=timestamp,
                )

        class MockPool:
            """Stand-in agent pool advertising two agents."""

            def get_agent(self, name):
                return MockAgent(name)

            def list_agents(self):
                search_agent = {
                    "name": "web_search",
                    "agent_type": "search",
                    "description": "Web search agent",
                }
                report_agent = {
                    "name": "report_generator",
                    "agent_type": "report",
                    "description": "Report generator",
                }
                return [search_agent, report_agent]

        orchestrator = Orchestrator(agent_pool=MockPool(), goal_planner=goal_planner)
        incoming = TaskMessage(
            task_id="t1",
            agent_name="web_search",
            task_type="research",
            priority=1,
            input_data={"query": "调研 3 个竞品 SEO 策略并生成对比报告"},
            callback_url=None,
            created_at=datetime.now(timezone.utc),
        )

        decomposed = await orchestrator._decompose_task(incoming)

        # The GoalPlanner should split the task into 4 subtasks.
        assert len(decomposed.subtasks) == 4
        # The first 3 subtasks have no dependencies.
        for subtask in decomposed.subtasks[:3]:
            assert len(subtask.depends_on) == 0
        # The 4th subtask depends on the first 3.
        assert len(decomposed.subtasks[3].depends_on) == 3

    @pytest.mark.asyncio
    async def test_orchestrator_without_goal_planner_backward_compat(self):
        """Without a GoalPlanner the Orchestrator keeps its original behaviour."""

        class MockPool:
            """Stand-in agent pool that knows no agents."""

            def get_agent(self, name):
                return None

            def list_agents(self):
                return []

        orchestrator = Orchestrator(agent_pool=MockPool())
        incoming = TaskMessage(
            task_id="t1",
            agent_name="worker1",
            task_type="test",
            priority=1,
            input_data={"query": "test"},
            callback_url=None,
            created_at=datetime.now(timezone.utc),
        )

        decomposed = await orchestrator._decompose_task(incoming)

        # Fallback: a single subtask whose id is prefixed by the plan id.
        assert len(decomposed.subtasks) == 1
        sole_subtask = decomposed.subtasks[0]
        assert sole_subtask.task_id.startswith(decomposed.plan_id)
class TestGoalPlannerLLMRefinement:
    """LLM-based plan refinement."""

    @pytest.mark.asyncio
    async def test_llm_refinement_called_when_needed(self):
        """The LLM is asked to refine the plan when the initial plan is not precise enough."""

        class MockLLMGateway:
            """Gateway stub whose chat() answers with a fixed JSON list of four steps."""

            async def chat(self, messages, model):
                import json

                refined_steps = [
                    {"name": "调研竞品 A", "description": "使用搜索引擎调研竞品 A 的 SEO 策略,包括关键词排名和外链分析", "dependencies": [], "required_skills": ["web_search"]},
                    {"name": "调研竞品 B", "description": "使用搜索引擎调研竞品 B 的 SEO 策略,包括关键词排名和外链分析", "dependencies": [], "required_skills": ["web_search"]},
                    {"name": "调研竞品 C", "description": "使用搜索引擎调研竞品 C 的 SEO 策略,包括关键词排名和外链分析", "dependencies": [], "required_skills": ["web_search"]},
                    {"name": "生成对比报告", "description": "汇总三个竞品的 SEO 策略数据,生成结构化对比分析报告", "dependencies": [0, 1, 2], "required_skills": ["report_generator"]},
                ]
                # Ad-hoc response object exposing only a `content` attribute.
                response_cls = type("Response", (), {"content": json.dumps(refined_steps)})
                return response_cls()

        # Planning with no available skills is the scenario used to trigger LLM refinement.
        goal_planner = GoalPlanner(llm_gateway=MockLLMGateway())
        generated = await goal_planner.generate_plan(
            goal="调研 3 个竞品 SEO 策略并生成对比报告",
            available_skills=[],  # no skills available, which triggers LLM refinement
        )

        # After LLM refinement the step descriptions should be more detailed.
        assert len(generated.steps) == 4
        assert generated.metadata.get("refined_by_llm") is True
        assert all(len(step.description) >= 20 for step in generated.steps)

    @pytest.mark.asyncio
    async def test_llm_failure_falls_back_to_initial(self):
        """When LLM refinement fails the planner falls back to the initial plan."""

        class FailingLLMGateway:
            """Gateway stub whose chat() always raises."""

            async def chat(self, messages, model):
                raise RuntimeError("LLM unavailable")

        goal_planner = GoalPlanner(llm_gateway=FailingLLMGateway())
        generated = await goal_planner.generate_plan(
            goal="调研 3 个竞品 SEO 策略并生成对比报告",
            available_skills=["web_search"],
        )

        # Should fall back to the rule-generated initial plan.
        assert len(generated.steps) == 4
        assert generated.metadata.get("refined_by_llm") is None
class TestGoalPlannerBuildParallelGroups:
    """Construction of parallel groups."""

    def test_simple_parallel(self):
        """Two independent steps share the first group; their dependant forms the second."""
        goal_planner = GoalPlanner()
        fan_in = [
            PlanStep(step_id="s0", name="A", description="A"),
            PlanStep(step_id="s1", name="B", description="B"),
            PlanStep(step_id="s2", name="C", description="C", dependencies=["s0", "s1"]),
        ]

        grouped = goal_planner._build_parallel_groups(fan_in)

        assert len(grouped) == 2
        # Order inside the first group is not pinned, hence the set comparison.
        assert set(grouped[0]) == {"s0", "s1"}
        assert grouped[1] == ["s2"]

    def test_sequential_chain(self):
        """A strict chain yields one single-step group per step, in order."""
        goal_planner = GoalPlanner()
        chain = [
            PlanStep(step_id="s0", name="A", description="A"),
            PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]),
            PlanStep(step_id="s2", name="C", description="C", dependencies=["s1"]),
        ]

        grouped = goal_planner._build_parallel_groups(chain)

        assert len(grouped) == 3
        assert list(grouped) == [["s0"], ["s1"], ["s2"]]

    def test_max_parallel_limit(self):
        """With max_parallel=2 the first group holds at most two steps."""
        goal_planner = GoalPlanner(max_parallel=2)
        independent = [
            PlanStep(step_id="s0", name="A", description="A"),
            PlanStep(step_id="s1", name="B", description="B"),
            PlanStep(step_id="s2", name="C", description="C"),
        ]

        grouped = goal_planner._build_parallel_groups(independent)

        # At most 2 steps run in parallel.
        assert len(grouped[0]) <= 2

    def test_circular_dependency_handling(self):
        """Mutually dependent steps are placed together in a single group."""
        goal_planner = GoalPlanner()
        cycle = [
            PlanStep(step_id="s0", name="A", description="A", dependencies=["s1"]),
            PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]),
        ]

        grouped = goal_planner._build_parallel_groups(cycle)

        # On a circular dependency the remaining steps are put into one group.
        assert len(grouped) == 1
        assert set(grouped[0]) == {"s0", "s1"}