# fischer-agentkit/tests/integration/test_goal_driven_scenario.py
#
# Source listing: 1067 lines, 35 KiB, Python.
#
# Note: this file contains Unicode characters (for example CJK text and
# punctuation) that might be confused with similar-looking ASCII characters.
# Review such characters carefully before editing.

"""U15: End-to-end enterprise scenario validation.

Validates end-to-end integration scenarios for seven Fischer AgentKit
capabilities:

- SC1: Goal-driven complex task (GoalPlanner → PlanExecutor → PlanChecker → ExperienceStore)
- SC2: Knowledge-base Q&A + system operation (MultiSourceRetriever + ShellTool)
- SC3: Workflow with human approval (WorkflowStore + approval node)
- SC4: Self-evolving experience accumulation (ExperienceStore → PitfallDetector → PathOptimizer)
- SC5: Parallel execution efficiency (PlanExecutor parallel groups)
- SC6: Skill registration and capability queries (SkillRegistry)
"""
from __future__ import annotations
import asyncio
import time
from datetime import datetime, timezone
from typing import Any
from unittest.mock import AsyncMock
import pytest
from agentkit.core.goal_planner import GoalPlanner
from agentkit.core.plan_checker import (
CheckResult,
CheckStatus,
PlanChecker,
QualityGate,
)
from agentkit.core.plan_executor import (
FailureAction,
PlanExecutionResult,
PlanExecutor,
StepExecutionResult,
)
from agentkit.core.plan_schema import (
ExecutionPlan,
PlanStep,
PlanStepStatus,
SkillGap,
SkillGapLevel,
)
from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus
from agentkit.evolution.experience_schema import TaskExperience
from agentkit.evolution.experience_store import InMemoryExperienceStore
from agentkit.evolution.path_optimizer import ExecutionPath, PathOptimizer
from agentkit.evolution.pitfall_detector import PitfallDetector, WarningLevel
from agentkit.memory.embedder import MockEmbedder
from agentkit.memory.knowledge_base import Document
from agentkit.memory.local_rag import InMemoryLocalRAGService
from agentkit.memory.multi_source_retriever import MultiSourceRetriever
from agentkit.orchestrator.workflow_schema import (
WorkflowDefinition,
WorkflowExecution,
WorkflowStage,
)
from agentkit.server.routes.workflows import WorkflowStore
from agentkit.skills.base import Skill, SkillConfig
from agentkit.skills.registry import SkillRegistry
from agentkit.skills.schema import CapabilityTag, DependencyDecl
# ── Fixtures ──────────────────────────────────────────────
class MockAgent:
    """Stand-in agent used to drive PlanExecutor in integration tests.

    ``execute`` sleeps briefly to simulate work and then returns a successful
    ``TaskResult`` carrying a fixed output payload.
    """

    def __init__(self, name: str = "mock_agent", output: dict | None = None):
        """Create a mock agent.

        Args:
            name: Agent name reported in every ``TaskResult``.
            output: Payload returned as ``output_data``. ``None`` selects a
                generic default payload; an explicitly passed empty dict is
                kept as-is.
        """
        self.name = name
        # Compare against None so an explicit empty payload is not replaced.
        self._output = (
            output if output is not None else {"result": "ok", "data": "mock output"}
        )

    async def execute(self, task_msg: TaskMessage) -> TaskResult:
        """Simulate a short unit of work and report the task as completed."""
        # Capture the start time before the simulated work so the reported
        # started_at/completed_at interval actually covers it.
        started_at = datetime.now(timezone.utc)
        await asyncio.sleep(0.01)  # Simulate work
        return TaskResult(
            task_id=task_msg.task_id,
            agent_name=self.name,
            status="completed",
            output_data=self._output,
            error_message=None,
            started_at=started_at,
            completed_at=datetime.now(timezone.utc),
        )
class MockAgentPool:
    """Minimal agent-pool double consumed by PlanExecutor in these tests.

    Lookups for names that were not registered fall back to a single shared
    default ``MockAgent``.
    """

    def __init__(self, agents: dict[str, MockAgent] | None = None):
        self._agents = agents or {}
        self._default_agent = MockAgent()

    def get_agent(self, name: str) -> MockAgent | None:
        """Return the agent registered as *name*, else the default agent."""
        fallback = self._default_agent
        return self._agents.get(name, fallback)

    async def create_agent_from_skill(self, skill_name: str) -> MockAgent | None:
        """Async lookup by skill name, with the same fallback as ``get_agent``."""
        fallback = self._default_agent
        return self._agents.get(skill_name, fallback)

    def list_agents(self) -> list[dict[str, Any]]:
        """Describe each registered agent, or one default entry when empty."""
        if not self._agents:
            return [
                {
                    "name": "default",
                    "agent_type": "mock",
                    "description": "Default mock agent",
                }
            ]
        descriptions = []
        for agent_name in self._agents:
            descriptions.append(
                {
                    "name": agent_name,
                    "agent_type": "mock",
                    "description": f"Mock agent for {agent_name}",
                }
            )
        return descriptions
@pytest.fixture
def mock_agent_pool():
    """Build a MockAgentPool whose agent names match GoalPlanner skill keywords.

    Each agent returns its own canned ``output_data`` payload.
    """
    payloads = {
        "web_search": {"result": "search results", "data": "SEO data"},
        "seo_analyzer": {"result": "analysis", "data": "SEO analysis"},
        "report_generator": {"result": "report", "data": "optimization report"},
        "data_analyzer": {"result": "analysis", "data": "data analysis"},
    }
    return MockAgentPool(
        {skill: MockAgent(skill, payload) for skill, payload in payloads.items()}
    )
@pytest.fixture
def experience_store():
    """Provide a fresh InMemoryExperienceStore for each test."""
    store = InMemoryExperienceStore()
    return store
@pytest.fixture
def mock_embedder():
    """Provide a MockEmbedder configured with a 64-dimensional output."""
    embedding_dim = 64
    return MockEmbedder(dimension=embedding_dim)
@pytest.fixture
def local_rag(mock_embedder):
    """Provide an InMemoryLocalRAGService backed by the mock embedder fixture."""
    service = InMemoryLocalRAGService(embedder=mock_embedder)
    return service
@pytest.fixture
def skill_registry():
    """Build a SkillRegistry pre-populated with five sample skills.

    Every skill uses its own name as ``agent_type`` and as its single tool,
    runs in ``tool_call`` mode, and carries the capability tags listed below.
    """
    # (name, capability tags, description), registered in this order.
    specs = (
        ("web_search", ["search", "web"], "Web search skill"),
        ("seo_analyzer", ["analysis", "seo"], "SEO analysis skill"),
        ("report_generator", ["generation", "report"], "Report generation skill"),
        ("data_analyzer", ["analysis", "data"], "Data analysis skill"),
        ("terminal_tool", ["terminal", "execution"], "Terminal execution skill"),
    )

    registry = SkillRegistry()
    for skill_name, tags, summary in specs:
        registry.register(
            Skill(
                config=SkillConfig(
                    name=skill_name,
                    agent_type=skill_name,
                    description=summary,
                    task_mode="tool_call",
                    capabilities=tags,
                    tools=[skill_name],
                )
            )
        )
    return registry
# ── SC1: Goal-Driven Complex Task ────────────────────────
@pytest.mark.asyncio
async def test_goal_driven_seo_analysis(mock_agent_pool, experience_store):
    """SC1: end-to-end goal-driven complex task.

    Scenario: "analyze competitors' SEO strategies and produce an
    optimization plan".

    - GoalPlanner decomposes the goal into a plan with parallel groups.
    - PlanExecutor runs the plan against mock agents.
    - PlanChecker checks each executed step and produces a review report.
    - The outcome is recorded in the ExperienceStore.
    - A later, similar query retrieves that experience.
    """
    # 1. Setup: planner plus the skills the mock pool can serve.
    available_skills = ["web_search", "seo_analyzer", "report_generator", "data_analyzer"]
    planner = GoalPlanner(max_parallel=5)

    # 2. Generate a plan for the SEO analysis goal.
    goal = "分析竞品 SEO 策略并生成优化方案"
    plan = await planner.generate_plan(
        goal=goal,
        context={},
        available_skills=available_skills,
    )

    # 3. Verify the plan structure.
    assert plan.goal == goal
    assert len(plan.steps) >= 2  # At least parallel steps + summary
    assert len(plan.parallel_groups) >= 1
    # GoalPlanner reportedly splits on a count pattern (e.g. "3 个竞品") or on
    # the conjunction "并". This goal only contains "并", so only require a
    # non-empty first group.
    first_group = plan.parallel_groups[0]
    assert len(first_group) >= 1

    # 4. Confirm the plan and execute it.
    plan.confirmed = True
    executor = PlanExecutor(
        agent_pool=mock_agent_pool,
        max_retries=1,
        step_timeout=10.0,
    )
    original_task = TaskMessage(
        task_id="test-seo-task",
        agent_name="goal_driven_agent",
        task_type="complex_analysis",
        priority=1,
        input_data={"goal": goal},
        callback_url=None,
        created_at=datetime.now(timezone.utc),
    )
    plan_result = await executor.execute(plan, original_task)
    # Guard against the per-step check below silently checking nothing.
    assert plan_result.step_results, "PlanExecutor produced no step results"

    # 5. PlanChecker verifies each executed step.
    checker = PlanChecker()
    for step in plan.steps:
        step_result = plan_result.step_results.get(step.step_id)
        if step_result is None:
            continue
        check_result = await checker.check_step(step, step_result)
        # Completed steps must receive a definitive verdict (PASS or FAIL).
        if step_result.status == PlanStepStatus.COMPLETED:
            assert check_result.status in (CheckStatus.PASS, CheckStatus.FAIL)

    # 6. Generate the review report.
    report = await checker.review_plan(
        plan, plan_result, task_type="seo_analysis", goal=goal
    )
    assert report.plan_id == plan.plan_id
    assert report.outcome in ("success", "partial", "failure")

    # 7. Record the outcome in the ExperienceStore.
    experience = TaskExperience(
        task_type="seo_analysis",
        goal=goal,
        steps_summary="; ".join(f"{s.name}: {s.status.value}" for s in plan.steps),
        outcome=report.outcome,
        duration_seconds=report.total_duration_ms / 1000,
        success_rate=report.success_rate,
        failure_reasons=report.failure_reasons,
        optimization_tips=report.optimization_tips,
    )
    exp_id = await experience_store.record_experience(experience)
    assert exp_id

    # 8. A similar follow-up query must retrieve the recorded experience.
    similar_results = await experience_store.search(
        query="竞品 SEO 分析",
        top_k=3,
        task_type="seo_analysis",
    )
    assert len(similar_results) >= 1
    assert similar_results[0].goal == goal
# ── SC2: Knowledge Base Q&A + System Operation ───────────
@pytest.mark.asyncio
async def test_knowledge_qa_with_system_operation(local_rag, mock_embedder):
    """SC2: knowledge-base Q&A across multiple sources.

    Two independent InMemoryLocalRAGService instances are registered with a
    MultiSourceRetriever as "internal_docs" and "external_kb". The test
    ingests documents into each and then checks:

    - a multi-source search returns scored results attributed to a known
      source;
    - a single-source search only returns results from that source;
    - both sources are listed and both services report healthy.

    The ``local_rag`` fixture is requested but not used by this test.
    """
    internal_rag = InMemoryLocalRAGService(embedder=mock_embedder)
    external_rag = InMemoryLocalRAGService(embedder=mock_embedder)

    retriever = MultiSourceRetriever()
    for source_name, service in (
        ("internal_docs", internal_rag),
        ("external_kb", external_rag),
    ):
        retriever.register_source(source_name, service)

    internal_docs = [
        Document(
            doc_id="doc-1",
            content="SEO 优化策略包括关键词研究、内容优化、外链建设和技术 SEO 四个核心方向。",
            title="SEO优化指南",
            source_id="internal",
            metadata={"format": "text", "department": "marketing"},
        ),
        Document(
            doc_id="doc-2",
            content="竞品分析需要关注对手的关键词排名、内容策略和外链来源。",
            title="竞品分析方法论",
            source_id="internal",
            metadata={"format": "text", "department": "strategy"},
        ),
    ]
    external_docs = [
        Document(
            doc_id="doc-3",
            content="技术 SEO 涵盖网站速度优化、结构化数据标记和移动端适配。",
            title="技术SEO手册",
            source_id="external",
            metadata={"format": "text", "source": "partner"},
        ),
    ]

    internal_ids = await internal_rag.ingest(internal_docs)
    external_ids = await external_rag.ingest(external_docs)
    assert len(internal_ids) == 2
    assert len(external_ids) == 1

    # Search across both sources: every hit needs content, provenance, a score.
    combined = await retriever.search(
        query="SEO 优化策略",
        top_k=5,
        sources=["internal_docs", "external_kb"],
    )
    assert len(combined) >= 1
    for hit in combined:
        assert hit.content
        assert hit.source_id
        assert hit.score > 0
        assert hit.source_name in ("internal_docs", "external_kb")

    # Restricting to one source must not leak hits from the other.
    internal_only = await retriever.search(
        query="竞品分析",
        top_k=3,
        sources=["internal_docs"],
    )
    assert len(internal_only) >= 1
    assert all(hit.source_name == "internal_docs" for hit in internal_only)

    registered = await retriever.list_all_sources()
    assert "internal_docs" in registered
    assert "external_kb" in registered

    assert await internal_rag.health_check() is True
    assert await external_rag.health_check() is True
# ── SC3: Workflow with Approval ──────────────────────────
@pytest.mark.asyncio
async def test_workflow_with_approval():
    """SC3: workflow with a human-approval stage.

    - Build a WorkflowDefinition whose middle stage has ``type="approval"``.
    - Run it with ``_execute_workflow`` while a background task waits for
      the execution to report ``paused`` and then approves the stage.
    - Verify the execution completes and every stage recorded a result.
    - Repeat with a second workflow and verify the approver details persist.
    """
    from agentkit.server.routes.workflows import _execute_workflow

    async def _run_with_approval(wf_store, workflow, execution, stage_name, approval):
        """Execute *workflow*, approving *stage_name* once the run pauses.

        A background task polls *wf_store* (up to 100 x 50 ms) for the
        execution to report ``paused``. It then writes *approval* into the
        stage results and sets the store's approval event for that stage,
        if one is registered. If the workflow execution raises, the approver
        task is cancelled so it does not keep polling in the background.
        """

        async def _approve_when_paused():
            for _ in range(100):
                await asyncio.sleep(0.05)
                current = wf_store.get_execution(execution.execution_id)
                if current and current.status == "paused":
                    break
            event_key = f"{execution.execution_id}:{stage_name}"
            if event_key in wf_store._approval_events:
                execution.stage_results[stage_name] = dict(approval)
                wf_store._approval_events[event_key].set()

        approver = asyncio.create_task(_approve_when_paused())
        try:
            await _execute_workflow(workflow, execution, variables={}, store=wf_store)
            await approver
        finally:
            # No-op when the approver has already finished.
            approver.cancel()

    # 1. Create the WorkflowStore.
    store = WorkflowStore()

    # 2. Define a workflow with an approval node between two skill stages.
    workflow = WorkflowDefinition(
        workflow_id="wf-approval-test",
        name="审批流程测试",
        stages=[
            WorkflowStage(
                name="data_collect",
                agent="data_collector",
                action="collect_data",
                type="skill",
            ),
            WorkflowStage(
                name="human_review",
                agent="reviewer",
                action="review_data",
                type="approval",
                config={"require_comment": True},
                depends_on=["data_collect"],
            ),
            WorkflowStage(
                name="generate_report",
                agent="report_generator",
                action="generate_report",
                type="skill",
                depends_on=["human_review"],
            ),
        ],
    )

    # 3. Save the workflow.
    saved = store.save(workflow)
    assert saved.workflow_id == "wf-approval-test"

    # 4. Create an execution.
    execution = store.create_execution(workflow.workflow_id)
    assert execution.status == "pending"
    assert execution.execution_id

    # 5. Run it, approving the review stage once execution pauses.
    # Short approval timeout for testing (presumably bounds the approval wait).
    workflow.stages[1].config["approval_timeout"] = 5
    await _run_with_approval(
        store,
        workflow,
        execution,
        "human_review",
        {
            "status": "approved",
            "approver": "test_user",
            "comment": "Auto-approved in test",
        },
    )

    # 6. The execution must have completed.
    updated = store.get_execution(execution.execution_id)
    assert updated is not None
    assert updated.status == "completed"

    # 7. Every stage recorded a result.
    assert "data_collect" in updated.stage_results
    assert "human_review" in updated.stage_results
    assert "generate_report" in updated.stage_results

    # 8. The approval stage was processed.
    approval_result = updated.stage_results["human_review"]
    assert approval_result.get("status") in ("approved", "completed")

    # 9. Second workflow: the approver details must be kept on the result.
    workflow2 = WorkflowDefinition(
        workflow_id="wf-manual-approval",
        name="手动审批流程",
        stages=[
            WorkflowStage(
                name="step1",
                agent="agent1",
                action="do_step1",
                type="skill",
            ),
            WorkflowStage(
                name="approval_step",
                agent="reviewer",
                action="approve",
                type="approval",
                config={"approval_timeout": 5},
                depends_on=["step1"],
            ),
            WorkflowStage(
                name="step2",
                agent="agent2",
                action="do_step2",
                type="skill",
                depends_on=["approval_step"],
            ),
        ],
    )
    store.save(workflow2)
    execution2 = store.create_execution(workflow2.workflow_id)
    await _run_with_approval(
        store,
        workflow2,
        execution2,
        "approval_step",
        {
            "status": "approved",
            "approver": "user",
            "comment": "LGTM",
        },
    )

    completed2 = store.get_execution(execution2.execution_id)
    assert completed2 is not None
    assert completed2.status == "completed"
    # The recorded approval must carry the approver details.
    assert completed2.stage_results["approval_step"]["status"] == "approved"
    assert completed2.stage_results["approval_step"]["approver"] == "user"
# ── SC4: Self-Evolution Experience Accumulation ──────────
@pytest.mark.asyncio
async def test_self_evolution_experience_accumulation(experience_store):
    """SC4: self-evolving experience accumulation.

    - Record a failed run, then run PitfallDetector on a similar plan.
    - Record a successful run that followed a better path.
    - Feed both paths to PathOptimizer and check that the faster, more
      reliable path becomes the recommended one.
    """
    # 1. Record a failure experience (API timeout).
    failure_exp = TaskExperience(
        task_type="api_integration",
        goal="调用第三方 API 获取数据",
        steps_summary="调用 API: failed; 数据解析: skipped",
        outcome="failure",
        duration_seconds=30.0,
        success_rate=0.0,
        failure_reasons=["API 调用超时", "连接被拒绝"],
        optimization_tips=["增加超时时间", "添加重试机制"],
    )
    exp_id = await experience_store.record_experience(failure_exp)
    assert exp_id

    # 2. Create a PitfallDetector with a low similarity threshold for testing.
    detector = PitfallDetector(
        experience_store=experience_store,
        similarity_threshold=0.1,
    )

    # 3. Check a plan whose single step resembles the failed one.
    similar_steps = [
        PlanStep(
            step_id="step-0",
            name="调用 API",
            description="调用第三方 API 获取数据",
            dependencies=[],
        ),
    ]
    warnings = await detector.check_pitfalls(
        task_type="api_integration",
        planned_steps=similar_steps,
    )
    # Whether a warning fires depends on keyword matching between step names
    # and the recorded failures, so only the result type is asserted here.
    assert isinstance(warnings, list)

    # 4. Record a successful experience that used a better path.
    success_exp = TaskExperience(
        task_type="api_integration",
        goal="调用第三方 API 获取数据(带重试)",
        steps_summary="调用 API (带重试): success; 数据解析: success",
        outcome="success",
        duration_seconds=15.0,
        success_rate=1.0,
        failure_reasons=[],
        optimization_tips=["重试机制有效"],
    )
    await experience_store.record_experience(success_exp)

    # 5. PathOptimizer evaluates candidate paths (low sample threshold for testing).
    optimizer = PathOptimizer(
        experience_store=experience_store,
        min_sample_count=1,
    )

    # First path: slow and unreliable.
    old_path = ExecutionPath(
        path_id="path-old",
        task_type="api_integration",
        steps=["调用 API", "数据解析"],
        total_duration=30.0,
        success_rate=0.3,
        sample_count=5,
        is_recommended=True,
    )
    result = await optimizer.evaluate_and_update("api_integration", old_path)
    assert result.updated  # No existing path, so this one becomes recommended

    # New path: faster and more reliable.
    new_path = ExecutionPath(
        path_id="path-new",
        task_type="api_integration",
        steps=["调用 API (带重试)", "数据解析"],
        total_duration=15.0,
        success_rate=0.95,
        sample_count=5,
    )
    result = await optimizer.evaluate_and_update("api_integration", new_path)
    assert result.updated  # Success rate improved significantly
    assert result.new_path.is_recommended
    assert result.new_path.success_rate > old_path.success_rate

    # 6. The new path is now the recommendation for this task type.
    recommended = optimizer.get_recommended_path("api_integration")
    assert recommended is not None
    assert recommended.path_id == "path-new"
    assert recommended.is_recommended is True
# ── SC5: Parallel Execution Efficiency ───────────────────
@pytest.mark.asyncio
async def test_parallel_execution_efficiency():
    """SC5: parallel execution efficiency.

    Three independent steps share one parallel group. Each is served by an
    agent that sleeps ``step_delay`` seconds. The plan must finish in under
    80% of the serial time (3 x step_delay), and all three steps must
    complete.
    """

    class SlowAgent:
        """Agent that sleeps for a fixed delay before reporting success."""

        def __init__(self, delay: float = 0.2):
            self.name = "slow_agent"
            self._delay = delay

        async def execute(self, task_msg: TaskMessage) -> TaskResult:
            # Capture the start time before sleeping so the reported interval
            # covers the simulated work.
            started_at = datetime.now(timezone.utc)
            await asyncio.sleep(self._delay)
            return TaskResult(
                task_id=task_msg.task_id,
                agent_name=self.name,
                status="completed",
                output_data={"result": "done"},
                error_message=None,
                started_at=started_at,
                completed_at=datetime.now(timezone.utc),
            )

    class SlowAgentPool:
        """Pool that hands out one shared SlowAgent for every lookup."""

        def __init__(self, delay: float = 0.2):
            self._agent = SlowAgent(delay)

        def get_agent(self, name: str) -> SlowAgent:
            return self._agent

        async def create_agent_from_skill(self, skill_name: str) -> SlowAgent:
            return self._agent

        def list_agents(self) -> list[dict[str, Any]]:
            return [{"name": "slow_agent", "agent_type": "mock", "description": "Slow mock"}]

    # 1. Build a plan with three independent steps in one parallel group.
    step_delay = 0.2
    pool = SlowAgentPool(delay=step_delay)
    step_ids = ["step-0", "step-1", "step-2"]
    plan = ExecutionPlan(
        goal="并行效率测试",
        steps=[
            PlanStep(
                step_id=step_id,
                name=f"并行任务{label}",
                description=f"独立并行任务{label}",
                dependencies=[],
                parallel_group=0,
                required_skills=["data_analyzer"],
            )
            for step_id, label in zip(step_ids, "ABC")
        ],
        parallel_groups=[list(step_ids)],
        confirmed=True,
    )

    # 2. Execute it with PlanExecutor and time the run.
    executor = PlanExecutor(
        agent_pool=pool,
        max_retries=0,
        step_timeout=5.0,
    )
    original_task = TaskMessage(
        task_id="parallel-test",
        agent_name="test",
        task_type="parallel",
        priority=1,
        input_data={"goal": "并行效率测试"},
        callback_url=None,
        created_at=datetime.now(timezone.utc),
    )
    start = time.monotonic()
    result = await executor.execute(plan, original_task)
    elapsed = time.monotonic() - start

    # 3. Three 0.2 s steps should take ~0.2 s in parallel, not ~0.6 s serially.
    serial_time = step_delay * 3
    assert elapsed < serial_time * 0.8, (
        f"Parallel execution took {elapsed:.2f}s, "
        f"expected less than {serial_time * 0.8:.2f}s (80% of serial {serial_time:.2f}s)"
    )

    # 4. All steps must have completed.
    completed_count = len(result.completed_steps)
    assert completed_count == 3, f"Expected 3 completed steps, got {completed_count}"
# ── SC6: Skill Registry Integration ──────────────────────
@pytest.mark.asyncio
async def test_skill_registry_integration(skill_registry):
    """SC6: skill registration and capability lookup.

    Uses the pre-populated registry fixture to exercise:

    - listing and capability-tag queries;
    - health checks;
    - registering a second version of an existing skill, plus version lookup;
    - health reporting for a skill that depends on an unregistered skill.
    """
    # The fixture registered five skills.
    assert len(skill_registry.list_skills()) >= 5

    # Capability lookups: seo_analyzer + data_analyzer carry "analysis";
    # web_search carries "search".
    assert len(skill_registry.query_by_capability("analysis")) >= 2
    assert len(skill_registry.query_by_capability("search")) >= 1

    # No skill declares dependencies yet, so every skill must be healthy.
    report = skill_registry.health_check()
    assert len(report) >= 5
    assert all(entry.healthy for entry in report)

    # Register web_search 2.0.0 next to the version from the fixture.
    skill_registry.register(
        Skill(
            config=SkillConfig(
                name="web_search",
                agent_type="web_search",
                version="2.0.0",
                description="Enhanced web search skill v2",
                task_mode="tool_call",
                capabilities=["search", "web", "advanced"],
                tools=["web_search"],
            )
        )
    )

    known_versions = skill_registry.get_versions("web_search")
    assert "1.0.0" in known_versions
    assert "2.0.0" in known_versions
    # An unversioned lookup resolves to the newest registration.
    assert skill_registry.get("web_search").version == "2.0.0"
    # An explicit version still resolves to the older one.
    assert skill_registry.get("web_search", version="1.0.0").version == "1.0.0"

    # Only the new version carries the "advanced" tag.
    advanced = skill_registry.query_by_capability("advanced")
    assert len(advanced) >= 1
    assert advanced[0].version == "2.0.0"

    # A skill depending on an unregistered skill must be reported unhealthy.
    skill_registry.register(
        Skill(
            config=SkillConfig(
                name="dependent_skill",
                agent_type="dependent",
                version="1.0.0",
                description="Skill with missing dependency",
                task_mode="tool_call",
                tools=["dependent_tool"],
                dependencies=[
                    {"name": "web_search", "type": "skill", "required": True},
                    {"name": "nonexistent_skill", "type": "skill", "required": True},
                ],
            )
        )
    )
    dependent_report = skill_registry.health_check("dependent_skill")
    assert len(dependent_report) == 1
    assert dependent_report[0].healthy is False
    assert "nonexistent_skill" in dependent_report[0].missing_dependencies
# ── Cross-Capability Integration ─────────────────────────
@pytest.mark.asyncio
async def test_goal_planner_with_experience_and_pitfall(experience_store):
    """Cross-capability: GoalPlanner + ExperienceStore + PitfallDetector.

    Steps:

    1. Seed the store with a failed competitive-analysis run.
    2. Plan a similar goal.
    3. Run pitfall detection over the planned steps.
    4. Check that the stored failure, with its optimization tips, is
       retrievable.
    """
    task_type = "competitive_analysis"

    await experience_store.record_experience(
        TaskExperience(
            task_type=task_type,
            goal="竞品调研分析",
            steps_summary="数据采集: failed; 数据分析: skipped; 报告生成: skipped",
            outcome="failure",
            duration_seconds=60.0,
            success_rate=0.0,
            failure_reasons=["数据源不可用", "API 限流"],
            optimization_tips=["使用缓存数据源", "添加限流处理"],
        )
    )

    plan = await GoalPlanner(max_parallel=5).generate_plan(
        goal="调研3个竞品的市场策略并生成对比报告",
        context={"task_type": task_type},
        available_skills=["web_search", "data_analyzer", "report_generator"],
    )
    assert len(plan.steps) >= 2  # Parallel steps + summary
    assert len(plan.parallel_groups) >= 1

    detector = PitfallDetector(
        experience_store=experience_store,
        similarity_threshold=0.1,
    )
    warnings = await detector.check_pitfalls(
        task_type=task_type,
        planned_steps=plan.steps,
    )
    # Whether any warning fires depends on keyword matching; this only checks
    # that the components integrate and yield a list.
    assert isinstance(warnings, list)

    hits = await experience_store.search(
        query="竞品调研分析",
        top_k=3,
        task_type=task_type,
    )
    assert len(hits) >= 1
    top_hit = hits[0]
    assert top_hit.outcome == "failure"
    assert len(top_hit.optimization_tips) > 0
@pytest.mark.asyncio
async def test_plan_checker_with_experience_store(experience_store):
    """Cross-capability: PlanChecker review results land in ExperienceStore.

    Builds a two-step plan with a fully successful execution result. Runs
    the per-step checks and the plan review through a PlanChecker wired to
    the store, then confirms the review was recorded and is searchable.
    """
    goal = "测试复盘经验写入"

    collect_step = PlanStep(
        step_id="step-0",
        name="数据采集",
        description="采集测试数据",
        dependencies=[],
    )
    analyze_step = PlanStep(
        step_id="step-1",
        name="数据分析",
        description="分析采集的数据",
        dependencies=["step-0"],
    )
    plan = ExecutionPlan(
        goal=goal,
        steps=[collect_step, analyze_step],
        parallel_groups=[["step-0"], ["step-1"]],
    )

    outcomes = {
        "step-0": StepExecutionResult(
            step_id="step-0",
            status=PlanStepStatus.COMPLETED,
            result={"data": "collected"},
            duration_ms=100.0,
        ),
        "step-1": StepExecutionResult(
            step_id="step-1",
            status=PlanStepStatus.COMPLETED,
            result={"analysis": "done"},
            duration_ms=200.0,
        ),
    }
    plan_result = PlanExecutionResult(
        plan_id=plan.plan_id,
        step_results=outcomes,
        status=TaskStatus.COMPLETED,
        total_duration_ms=300.0,
    )

    checker = PlanChecker(experience_store=experience_store)
    for step in plan.steps:
        await checker.check_step(step, plan_result.step_results[step.step_id])

    # review_plan is expected to record the experience (verified below).
    report = await checker.review_plan(
        plan, plan_result, task_type="test_task", goal=goal
    )
    assert report.outcome == "success"
    assert report.success_rate == 1.0

    found = await experience_store.search(query=goal, top_k=5)
    assert len(found) >= 1
    assert found[0].goal == goal
    assert found[0].outcome == "success"
@pytest.mark.asyncio
async def test_multi_source_rag_with_workflow(local_rag, mock_embedder):
    """Cross-capability: knowledge-base retrieval + workflow execution.

    1. Ingest a policy document into the local RAG service and query it.
    2. Define a workflow: query stage → approval stage → action stage.
    3. Execute it, approving the review stage once the run pauses, and
       verify that all three stages recorded results.
    """
    # 1. Ingest documents.
    docs = [
        Document(
            doc_id="policy-1",
            content="公司数据安全策略要求所有外部 API 调用必须经过审批。",
            title="数据安全策略",
            source_id="policy",
            metadata={"format": "text", "category": "security"},
        ),
    ]
    ids = await local_rag.ingest(docs)
    assert len(ids) == 1

    # 2. Query the knowledge base.
    results = await local_rag.query("外部 API 调用审批", top_k=3)
    assert len(results) >= 1
    assert "审批" in results[0].content

    # 3. Create a workflow that acts on the knowledge-base findings.
    store = WorkflowStore()
    workflow = WorkflowDefinition(
        workflow_id="wf-kb-integration",
        name="知识库集成工作流",
        stages=[
            WorkflowStage(
                name="query_kb",
                agent="kb_agent",
                action="query_knowledge_base",
                type="skill",
            ),
            WorkflowStage(
                name="review_findings",
                agent="reviewer",
                action="review",
                type="approval",
                depends_on=["query_kb"],
            ),
            WorkflowStage(
                name="execute_action",
                agent="executor",
                action="execute",
                type="skill",
                depends_on=["review_findings"],
            ),
        ],
    )
    saved = store.save(workflow)
    assert saved.workflow_id == "wf-kb-integration"

    # 4. Execute the workflow, approving the review stage once it pauses.
    execution = store.create_execution(workflow.workflow_id)
    from agentkit.server.routes.workflows import _execute_workflow

    # Short approval timeout for testing (presumably bounds the approval wait).
    # Assumes stage.config defaults to a mutable dict - TODO confirm.
    workflow.stages[1].config["approval_timeout"] = 5

    async def _approve_kb_review():
        """Poll until the execution pauses, then approve the review stage."""
        for _ in range(100):
            await asyncio.sleep(0.05)
            current = store.get_execution(execution.execution_id)
            if current and current.status == "paused":
                break
        event_key = f"{execution.execution_id}:review_findings"
        if event_key in store._approval_events:
            execution.stage_results["review_findings"] = {
                "status": "approved",
                "approver": "test_user",
                "comment": "Approved",
            }
            store._approval_events[event_key].set()

    approve_task = asyncio.create_task(_approve_kb_review())
    try:
        await _execute_workflow(workflow, execution, variables={}, store=store)
        await approve_task
    finally:
        # Stop the approver if execution raised; no-op once it has finished.
        approve_task.cancel()

    updated = store.get_execution(execution.execution_id)
    assert updated is not None
    assert updated.status == "completed"
    assert len(updated.stage_results) == 3