"""Tests for GoalPlanner — 目标分析与计划生成""" import pytest from agentkit.core.goal_planner import GoalPlanner from agentkit.core.orchestrator import Orchestrator, SubTask from agentkit.core.plan_schema import ( ExecutionPlan, PlanStep, PlanStepStatus, SkillGap, SkillGapLevel, ) from agentkit.core.protocol import TaskMessage, TaskStatus from datetime import datetime, timezone # --- Plan Schema Tests --- class TestPlanStep: """PlanStep 数据类测试""" def test_default_values(self): step = PlanStep( step_id="s1", name="Test Step", description="A test step", ) assert step.dependencies == [] assert step.parallel_group == 0 assert step.required_skills == [] assert step.input_data == {} assert step.status == PlanStepStatus.PENDING assert step.result is None assert step.error is None def test_to_dict(self): step = PlanStep( step_id="s1", name="Test", description="Desc", dependencies=["s0"], parallel_group=1, required_skills=["web_search"], ) d = step.to_dict() assert d["step_id"] == "s1" assert d["dependencies"] == ["s0"] assert d["parallel_group"] == 1 assert d["required_skills"] == ["web_search"] assert d["status"] == "pending" class TestExecutionPlan: """ExecutionPlan 数据类测试""" def test_default_values(self): plan = ExecutionPlan(goal="test") assert plan.goal == "test" assert plan.steps == [] assert plan.parallel_groups == [] assert plan.skill_gaps == [] assert plan.confirmed is False def test_has_skill_gaps(self): plan = ExecutionPlan( goal="test", skill_gaps=[ SkillGap(step_name="s1", required_skill="x", level=SkillGapLevel.LOW), ], ) assert plan.has_skill_gaps is False plan.skill_gaps.append( SkillGap(step_name="s2", required_skill="y", level=SkillGapLevel.HIGH), ) assert plan.has_skill_gaps is True def test_get_step(self): step = PlanStep(step_id="s1", name="A", description="A step") plan = ExecutionPlan(goal="test", steps=[step]) assert plan.get_step("s1") is step assert plan.get_step("nonexistent") is None def test_to_readable(self): plan = ExecutionPlan( plan_id="p1", goal="调研竞品", steps=[ PlanStep(step_id="s0", name="调研 A", description="调研竞品 A", parallel_group=0, required_skills=["web_search"]), PlanStep(step_id="s1", name="调研 B", description="调研竞品 B", parallel_group=0, required_skills=["web_search"]), PlanStep(step_id="s2", name="汇总", description="汇总报告", dependencies=["s0", "s1"], parallel_group=1, required_skills=["report_generator"]), ], parallel_groups=[["s0", "s1"], ["s2"]], ) readable = plan.to_readable() assert "调研竞品" in readable assert "并行组 1" in readable assert "s0" in readable assert "web_search" in readable def test_to_dict(self): plan = ExecutionPlan( plan_id="p1", goal="test", steps=[PlanStep(step_id="s0", name="A", description="A")], parallel_groups=[["s0"]], ) d = plan.to_dict() assert d["plan_id"] == "p1" assert len(d["steps"]) == 1 assert d["parallel_groups"] == [["s0"]] # --- GoalPlanner Tests --- class TestGoalPlannerSimpleGoal: """简单目标 → 单步计划""" @pytest.mark.asyncio async def test_simple_goal_generates_single_step(self): planner = GoalPlanner() plan = await planner.generate_plan( goal="查询今天的天气", available_skills=["web_search"], ) assert len(plan.steps) == 1 assert plan.steps[0].parallel_group == 0 assert len(plan.parallel_groups) == 1 @pytest.mark.asyncio async def test_simple_goal_with_matching_skill(self): planner = GoalPlanner() plan = await planner.generate_plan( goal="搜索最新的 AI 新闻", available_skills=["web_search"], ) assert "web_search" in plan.steps[0].required_skills @pytest.mark.asyncio async def test_simple_goal_no_matching_skill(self): planner = GoalPlanner() plan = await planner.generate_plan( goal="执行某个未知操作", available_skills=["web_search"], ) # 单步任务,无匹配 Skill assert len(plan.steps) == 1 assert plan.steps[0].required_skills == [] class TestGoalPlannerParallelGoal: """复杂目标(并列结构)→ 多步并行计划""" @pytest.mark.asyncio async def test_parallel_competitor_research(self): """AE1: 3 个竞品调研自动识别为并行步骤""" planner = GoalPlanner() plan = await planner.generate_plan( goal="调研 3 个竞品 SEO 策略并生成对比报告", available_skills=["web_search", "seo_analyzer", "report_generator"], ) # 应有 4 个步骤:3 个并行调研 + 1 个汇总 assert len(plan.steps) == 4 # 前 3 个步骤无依赖,应在同一并行组 parallel_steps = [s for s in plan.steps if not s.dependencies] assert len(parallel_steps) == 3 # 汇总步骤依赖前 3 个 summary_step = [s for s in plan.steps if s.dependencies] assert len(summary_step) == 1 assert len(summary_step[0].dependencies) == 3 # 并行组:第一组 3 个并行,第二组 1 个汇总 assert len(plan.parallel_groups) == 2 assert len(plan.parallel_groups[0]) == 3 assert len(plan.parallel_groups[1]) == 1 @pytest.mark.asyncio async def test_parallel_items_with_dunhao(self): """顿号分隔的并列项""" planner = GoalPlanner() plan = await planner.generate_plan( goal="调研竞品A、竞品B、竞品C的市场策略", available_skills=["web_search", "report_generator"], ) assert len(plan.steps) == 4 # 3 个并行 + 1 个汇总 parallel_steps = [s for s in plan.steps if not s.dependencies] assert len(parallel_steps) == 3 @pytest.mark.asyncio async def test_parallel_steps_have_correct_group(self): planner = GoalPlanner() plan = await planner.generate_plan( goal="调研 3 个竞品 SEO 策略并生成对比报告", available_skills=["web_search", "seo_analyzer", "report_generator"], ) # 并行步骤的 parallel_group 应为 0 for step in plan.steps[:3]: assert step.parallel_group == 0 # 汇总步骤的 parallel_group 应为 1 assert plan.steps[3].parallel_group == 1 class TestGoalPlannerSequentialGoal: """顺序目标 → 顺序步骤""" @pytest.mark.asyncio async def test_sequential_goal_with_bing(self): """'并' 连接的顺序步骤""" planner = GoalPlanner() plan = await planner.generate_plan( goal="调研市场趋势并生成分析报告", available_skills=["web_search", "report_generator"], ) assert len(plan.steps) == 2 # 第二步依赖第一步 assert plan.steps[1].dependencies == [plan.steps[0].step_id] @pytest.mark.asyncio async def test_sequential_goal_with_arrow(self): """箭头分隔的顺序步骤""" planner = GoalPlanner() plan = await planner.generate_plan( goal="收集数据→分析数据→生成报告", available_skills=["web_search", "data_analyzer", "report_generator"], ) assert len(plan.steps) == 3 assert plan.steps[0].dependencies == [] assert plan.steps[1].dependencies == [plan.steps[0].step_id] assert plan.steps[2].dependencies == [plan.steps[1].step_id] class TestGoalPlannerSkillGaps: """能力缺口识别""" @pytest.mark.asyncio async def test_missing_skill_creates_gap(self): """无可用 Skill 的目标 → 计划标注能力缺口""" planner = GoalPlanner() plan = await planner.generate_plan( goal="调研 3 个竞品 SEO 策略并生成对比报告", available_skills=[], # 无可用 Skill ) assert plan.has_skill_gaps is True assert len(plan.skill_gaps) > 0 @pytest.mark.asyncio async def test_no_gaps_when_skills_available(self): """所有 Skill 可用时无缺口""" planner = GoalPlanner() plan = await planner.generate_plan( goal="搜索最新的 AI 新闻", available_skills=["web_search"], ) # web_search 匹配,不应有 HIGH 级别缺口 high_gaps = [g for g in plan.skill_gaps if g.level == SkillGapLevel.HIGH] assert len(high_gaps) == 0 @pytest.mark.asyncio async def test_gap_includes_suggestion(self): planner = GoalPlanner() plan = await planner.generate_plan( goal="调研 3 个竞品 SEO 策略并生成对比报告", available_skills=[], ) for gap in plan.skill_gaps: if gap.level == SkillGapLevel.HIGH: assert gap.suggestion != "" class TestGoalPlannerUpdatePlan: """用户修改计划""" def test_remove_step(self): planner = GoalPlanner() plan = ExecutionPlan( goal="test", steps=[ PlanStep(step_id="s0", name="A", description="A"), PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]), PlanStep(step_id="s2", name="C", description="C", dependencies=["s1"]), ], parallel_groups=[["s0"], ["s1"], ["s2"]], ) updated = planner.update_plan_from_feedback(plan, {"remove_steps": ["s1"]}) assert len(updated.steps) == 2 # s2 的依赖应被清理 assert "s1" not in updated.steps[1].dependencies def test_update_step(self): planner = GoalPlanner() plan = ExecutionPlan( goal="test", steps=[ PlanStep(step_id="s0", name="A", description="A"), ], parallel_groups=[["s0"]], ) updated = planner.update_plan_from_feedback( plan, {"update_steps": {"s0": {"name": "Updated A", "description": "Updated description"}}}, ) assert updated.steps[0].name == "Updated A" assert updated.steps[0].description == "Updated description" def test_add_step(self): planner = GoalPlanner() plan = ExecutionPlan( goal="test", steps=[ PlanStep(step_id="s0", name="A", description="A"), ], parallel_groups=[["s0"]], ) updated = planner.update_plan_from_feedback( plan, {"add_steps": [{"name": "New Step", "description": "A new step", "dependencies": ["s0"]}]}, ) assert len(updated.steps) == 2 assert updated.steps[1].name == "New Step" def test_update_resets_confirmed(self): planner = GoalPlanner() plan = ExecutionPlan( goal="test", confirmed=True, steps=[PlanStep(step_id="s0", name="A", description="A")], parallel_groups=[["s0"]], ) updated = planner.update_plan_from_feedback(plan, {"update_steps": {"s0": {"name": "B"}}}) assert updated.confirmed is False def test_update_rebuilds_parallel_groups(self): planner = GoalPlanner() plan = ExecutionPlan( goal="test", steps=[ PlanStep(step_id="s0", name="A", description="A"), PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]), ], parallel_groups=[["s0"], ["s1"]], ) # 添加一个与 s0 并行的步骤 updated = planner.update_plan_from_feedback( plan, {"add_steps": [{"name": "C", "description": "Parallel to A", "dependencies": []}]}, ) # 新步骤应与 s0 在同一并行组 assert len(updated.parallel_groups[0]) == 2 class TestGoalPlannerValidatePlan: """计划验证""" def test_valid_plan(self): planner = GoalPlanner() plan = ExecutionPlan( goal="test", steps=[ PlanStep(step_id="s0", name="A", description="A"), PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]), ], parallel_groups=[["s0"], ["s1"]], ) errors = planner.validate_plan(plan) assert errors == [] def test_invalid_dependency(self): planner = GoalPlanner() plan = ExecutionPlan( goal="test", steps=[ PlanStep(step_id="s0", name="A", description="A", dependencies=["nonexistent"]), ], parallel_groups=[["s0"]], ) errors = planner.validate_plan(plan) assert len(errors) > 0 assert any("nonexistent" in e for e in errors) def test_circular_dependency(self): planner = GoalPlanner() plan = ExecutionPlan( goal="test", steps=[ PlanStep(step_id="s0", name="A", description="A", dependencies=["s1"]), PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]), ], parallel_groups=[["s0", "s1"]], ) errors = planner.validate_plan(plan) assert any("循环依赖" in e for e in errors) def test_ungrouped_steps(self): planner = GoalPlanner() plan = ExecutionPlan( goal="test", steps=[ PlanStep(step_id="s0", name="A", description="A"), PlanStep(step_id="s1", name="B", description="B"), ], parallel_groups=[["s0"]], # s1 未分组 ) errors = planner.validate_plan(plan) assert any("未分配" in e for e in errors) class TestGoalPlannerWithOrchestrator: """GoalPlanner 与 Orchestrator 集成""" @pytest.mark.asyncio async def test_orchestrator_with_goal_planner(self): """Orchestrator 使用 GoalPlanner 分解任务""" planner = GoalPlanner() class MockAgent: def __init__(self, name): self.name = name self.agent_type = "mock" async def execute(self, task): from agentkit.core.protocol import TaskResult now = datetime.now(timezone.utc) return TaskResult( task_id=task.task_id, agent_name=self.name, status=TaskStatus.COMPLETED, output_data={"result": f"from {self.name}"}, error_message=None, started_at=now, completed_at=now, ) class MockPool: def get_agent(self, name): return MockAgent(name) def list_agents(self): return [ {"name": "web_search", "agent_type": "search", "description": "Web search agent"}, {"name": "report_generator", "agent_type": "report", "description": "Report generator"}, ] pool = MockPool() orchestrator = Orchestrator(agent_pool=pool, goal_planner=planner) task = TaskMessage( task_id="t1", agent_name="web_search", task_type="research", priority=1, input_data={"query": "调研 3 个竞品 SEO 策略并生成对比报告"}, callback_url=None, created_at=datetime.now(timezone.utc), ) plan = await orchestrator._decompose_task(task) # GoalPlanner 应将任务分解为 4 个子任务 assert len(plan.subtasks) == 4 # 前 3 个无依赖 assert len(plan.subtasks[0].depends_on) == 0 assert len(plan.subtasks[1].depends_on) == 0 assert len(plan.subtasks[2].depends_on) == 0 # 第 4 个依赖前 3 个 assert len(plan.subtasks[3].depends_on) == 3 @pytest.mark.asyncio async def test_orchestrator_without_goal_planner_backward_compat(self): """无 GoalPlanner 时 Orchestrator 保持原有行为""" class MockPool: def get_agent(self, name): return None def list_agents(self): return [] pool = MockPool() orchestrator = Orchestrator(agent_pool=pool) task = TaskMessage( task_id="t1", agent_name="worker1", task_type="test", priority=1, input_data={"query": "test"}, callback_url=None, created_at=datetime.now(timezone.utc), ) plan = await orchestrator._decompose_task(task) # Fallback: 单个子任务 assert len(plan.subtasks) == 1 assert plan.subtasks[0].task_id.startswith(plan.plan_id) class TestGoalPlannerLLMRefinement: """LLM 细化计划""" @pytest.mark.asyncio async def test_llm_refinement_called_when_needed(self): """初始方案不够精确时调用 LLM 细化""" class MockLLMGateway: async def chat(self, messages, model): import json return type("Response", (), { "content": json.dumps([ {"name": "调研竞品 A", "description": "使用搜索引擎调研竞品 A 的 SEO 策略,包括关键词排名和外链分析", "dependencies": [], "required_skills": ["web_search"]}, {"name": "调研竞品 B", "description": "使用搜索引擎调研竞品 B 的 SEO 策略,包括关键词排名和外链分析", "dependencies": [], "required_skills": ["web_search"]}, {"name": "调研竞品 C", "description": "使用搜索引擎调研竞品 C 的 SEO 策略,包括关键词排名和外链分析", "dependencies": [], "required_skills": ["web_search"]}, {"name": "生成对比报告", "description": "汇总三个竞品的 SEO 策略数据,生成结构化对比分析报告", "dependencies": [0, 1, 2], "required_skills": ["report_generator"]}, ]), })() # 使用无可用 Skill 的场景触发 LLM 细化 planner = GoalPlanner(llm_gateway=MockLLMGateway()) plan = await planner.generate_plan( goal="调研 3 个竞品 SEO 策略并生成对比报告", available_skills=[], # 无可用 Skill,触发 LLM 细化 ) # LLM 细化后步骤描述应更详细 assert len(plan.steps) == 4 assert plan.metadata.get("refined_by_llm") is True for step in plan.steps: assert len(step.description) >= 20 @pytest.mark.asyncio async def test_llm_failure_falls_back_to_initial(self): """LLM 细化失败时回退到初始方案""" class FailingLLMGateway: async def chat(self, messages, model): raise RuntimeError("LLM unavailable") planner = GoalPlanner(llm_gateway=FailingLLMGateway()) plan = await planner.generate_plan( goal="调研 3 个竞品 SEO 策略并生成对比报告", available_skills=["web_search"], ) # 应回退到规则生成的初始方案 assert len(plan.steps) == 4 assert plan.metadata.get("refined_by_llm") is None class TestGoalPlannerBuildParallelGroups: """并行组构建""" def test_simple_parallel(self): planner = GoalPlanner() steps = [ PlanStep(step_id="s0", name="A", description="A"), PlanStep(step_id="s1", name="B", description="B"), PlanStep(step_id="s2", name="C", description="C", dependencies=["s0", "s1"]), ] groups = planner._build_parallel_groups(steps) assert len(groups) == 2 assert set(groups[0]) == {"s0", "s1"} assert groups[1] == ["s2"] def test_sequential_chain(self): planner = GoalPlanner() steps = [ PlanStep(step_id="s0", name="A", description="A"), PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]), PlanStep(step_id="s2", name="C", description="C", dependencies=["s1"]), ] groups = planner._build_parallel_groups(steps) assert len(groups) == 3 assert groups[0] == ["s0"] assert groups[1] == ["s1"] assert groups[2] == ["s2"] def test_max_parallel_limit(self): planner = GoalPlanner(max_parallel=2) steps = [ PlanStep(step_id="s0", name="A", description="A"), PlanStep(step_id="s1", name="B", description="B"), PlanStep(step_id="s2", name="C", description="C"), ] groups = planner._build_parallel_groups(steps) # 最多 2 个并行 assert len(groups[0]) <= 2 def test_circular_dependency_handling(self): planner = GoalPlanner() steps = [ PlanStep(step_id="s0", name="A", description="A", dependencies=["s1"]), PlanStep(step_id="s1", name="B", description="B", dependencies=["s0"]), ] groups = planner._build_parallel_groups(steps) # 循环依赖时将剩余步骤放入一组 assert len(groups) == 1 assert set(groups[0]) == {"s0", "s1"}