"""U15: End-to-end enterprise scenario validation.

End-to-end integration scenarios for the Fischer AgentKit capabilities:

- SC1: Goal-driven complex task
  (GoalPlanner -> PlanExecutor -> PlanChecker -> ExperienceStore)
- SC2: Knowledge-base Q&A + system operation (MultiSourceRetriever; the
  ShellTool half of the scenario is not exercised in this module)
- SC3: Workflow with human approval (WorkflowStore + approval node)
- SC4: Self-evolving experience accumulation
  (ExperienceStore -> PitfallDetector -> PathOptimizer)
- SC5: Parallel execution efficiency (PlanExecutor parallel groups)
- SC6: Skill registration and capability query (SkillRegistry)
"""

from __future__ import annotations

import asyncio
import time
from datetime import datetime, timezone
from typing import Any
from unittest.mock import AsyncMock

import pytest

from agentkit.core.goal_planner import GoalPlanner
from agentkit.core.plan_checker import (
    CheckResult,
    CheckStatus,
    PlanChecker,
    QualityGate,
)
from agentkit.core.plan_executor import (
    FailureAction,
    PlanExecutionResult,
    PlanExecutor,
    StepExecutionResult,
)
from agentkit.core.plan_schema import (
    ExecutionPlan,
    PlanStep,
    PlanStepStatus,
    SkillGap,
    SkillGapLevel,
)
from agentkit.core.protocol import TaskMessage, TaskResult, TaskStatus
from agentkit.evolution.experience_schema import TaskExperience
from agentkit.evolution.experience_store import InMemoryExperienceStore
from agentkit.evolution.path_optimizer import ExecutionPath, PathOptimizer
from agentkit.evolution.pitfall_detector import PitfallDetector, WarningLevel
from agentkit.memory.embedder import MockEmbedder
from agentkit.memory.knowledge_base import Document
from agentkit.memory.local_rag import InMemoryLocalRAGService
from agentkit.memory.multi_source_retriever import MultiSourceRetriever
from agentkit.orchestrator.workflow_schema import (
    WorkflowDefinition,
    WorkflowExecution,
    WorkflowStage,
)
from agentkit.server.routes.workflows import WorkflowStore, _execute_workflow
from agentkit.skills.base import Skill, SkillConfig
from agentkit.skills.registry import SkillRegistry
from agentkit.skills.schema import CapabilityTag, DependencyDecl


# ── Fixtures ──────────────────────────────────────────────


class MockAgent:
    """Mock Agent for PlanExecutor integration tests.

    ``execute`` sleeps briefly and returns a completed ``TaskResult`` whose
    ``output_data`` is the dict given at construction (or a default payload).
    """

    def __init__(self, name: str = "mock_agent", output: dict | None = None):
        self.name = name
        self._output = output or {"result": "ok", "data": "mock output"}

    async def execute(self, task_msg: TaskMessage) -> TaskResult:
        await asyncio.sleep(0.01)  # Simulate work
        return TaskResult(
            task_id=task_msg.task_id,
            agent_name=self.name,
            status="completed",
            output_data=self._output,
            error_message=None,
            started_at=datetime.now(timezone.utc),
            completed_at=datetime.now(timezone.utc),
        )


class MockAgentPool:
    """Mock AgentPool for PlanExecutor integration tests.

    Lookups by name fall back to a shared default ``MockAgent`` when the
    requested name was not registered.
    """

    def __init__(self, agents: dict[str, MockAgent] | None = None):
        self._agents = agents or {}
        self._default_agent = MockAgent()

    def get_agent(self, name: str) -> MockAgent | None:
        return self._agents.get(name, self._default_agent)

    async def create_agent_from_skill(self, skill_name: str) -> MockAgent | None:
        return self._agents.get(skill_name, self._default_agent)

    def list_agents(self) -> list[dict[str, Any]]:
        agents_info = [
            {
                "name": name,
                "agent_type": "mock",
                "description": f"Mock agent for {name}",
            }
            for name in self._agents
        ]
        # Always advertise at least one agent so callers never see an empty pool.
        if not agents_info:
            agents_info.append(
                {
                    "name": "default",
                    "agent_type": "mock",
                    "description": "Default mock agent",
                }
            )
        return agents_info


@pytest.fixture
def mock_agent_pool():
    """Create a MockAgentPool with skills matching GoalPlanner keywords"""
    agents = {
        "web_search": MockAgent("web_search", {"result": "search results", "data": "SEO data"}),
        "seo_analyzer": MockAgent("seo_analyzer", {"result": "analysis", "data": "SEO analysis"}),
        "report_generator": MockAgent(
            "report_generator", {"result": "report", "data": "optimization report"}
        ),
        "data_analyzer": MockAgent("data_analyzer", {"result": "analysis", "data": "data analysis"}),
    }
    return MockAgentPool(agents)


@pytest.fixture
def experience_store():
    """Create an InMemoryExperienceStore"""
    return InMemoryExperienceStore()


@pytest.fixture
def mock_embedder():
    """Create a MockEmbedder"""
    return MockEmbedder(dimension=64)


@pytest.fixture
def local_rag(mock_embedder):
    """Create an InMemoryLocalRAGService"""
    return InMemoryLocalRAGService(embedder=mock_embedder)


@pytest.fixture
def skill_registry():
    """Create a SkillRegistry with sample skills"""
    registry = SkillRegistry()

    # Register skills with capabilities
    skills_data = [
        {
            "name": "web_search",
            "capabilities": ["search", "web"],
            "description": "Web search skill",
        },
        {
            "name": "seo_analyzer",
            "capabilities": ["analysis", "seo"],
            "description": "SEO analysis skill",
        },
        {
            "name": "report_generator",
            "capabilities": ["generation", "report"],
            "description": "Report generation skill",
        },
        {
            "name": "data_analyzer",
            "capabilities": ["analysis", "data"],
            "description": "Data analysis skill",
        },
        {
            "name": "terminal_tool",
            "capabilities": ["terminal", "execution"],
            "description": "Terminal execution skill",
        },
    ]

    for skill_data in skills_data:
        config = SkillConfig(
            name=skill_data["name"],
            agent_type=skill_data["name"],
            description=skill_data["description"],
            task_mode="tool_call",
            capabilities=skill_data["capabilities"],
            tools=[skill_data["name"]],
        )
        skill = Skill(config=config)
        registry.register(skill)

    return registry


# ── Helpers ──────────────────────────────────────────────


async def _approve_when_paused(
    store: WorkflowStore,
    execution: WorkflowExecution,
    stage_name: str,
    approval: dict[str, Any],
) -> None:
    """Approve *stage_name* of *execution* once the run reports ``paused``.

    Polls ``store`` every 0.05 s for at most 100 polls (about 5 s). Whether or
    not the pause was observed, it then looks up the approval event keyed
    ``"<execution_id>:<stage_name>"``; if one exists, *approval* is written to
    ``execution.stage_results[stage_name]`` and the event is set so the
    waiting approval stage can continue.
    """
    for _ in range(100):
        await asyncio.sleep(0.05)
        current = store.get_execution(execution.execution_id)
        if current and current.status == "paused":
            break

    # NOTE: this reaches into WorkflowStore's private event registry; these
    # tests drive approval through it directly.
    event_key = f"{execution.execution_id}:{stage_name}"
    if event_key in store._approval_events:
        execution.stage_results[stage_name] = approval
        store._approval_events[event_key].set()


async def _execute_with_approval(
    workflow: WorkflowDefinition,
    execution: WorkflowExecution,
    store: WorkflowStore,
    stage_name: str,
    approval: dict[str, Any],
) -> None:
    """Run *workflow* to completion, approving *stage_name* when it pauses.

    The approver runs as a concurrent task. If ``_execute_workflow`` raises,
    the approver task is cancelled instead of being left pending, and the
    original exception propagates.
    """
    approve_task = asyncio.create_task(
        _approve_when_paused(store, execution, stage_name, approval)
    )
    try:
        await _execute_workflow(workflow, execution, variables={}, store=store)
    except BaseException:
        approve_task.cancel()
        raise
    await approve_task


# ── SC1: Goal-Driven Complex Task ────────────────────────


@pytest.mark.asyncio
async def test_goal_driven_seo_analysis(mock_agent_pool, experience_store):
    """SC1: End-to-end goal-driven complex task.

    Scenario goal: "分析竞品 SEO 策略并生成优化方案" (analyse competitors' SEO
    strategy and produce an optimisation plan).

    - GoalPlanner decomposes the goal (parallel competitor research ->
      data analysis -> plan generation)
    - PlanExecutor executes the plan
    - PlanChecker checks each step's output and reviews the whole plan
    - The review is recorded in ExperienceStore
    - A search for a similar task retrieves that recorded experience
    """
    # 1. Setup: Create GoalPlanner with available skills
    available_skills = ["web_search", "seo_analyzer", "report_generator", "data_analyzer"]
    planner = GoalPlanner(max_parallel=5)

    # 2. Generate plan for SEO analysis goal
    goal = "分析竞品 SEO 策略并生成优化方案"
    plan = await planner.generate_plan(
        goal=goal,
        context={},
        available_skills=available_skills,
    )

    # 3. Verify plan structure
    assert plan.goal == goal
    assert len(plan.steps) >= 2  # At least parallel steps + summary
    assert len(plan.parallel_groups) >= 1

    # Verify a parallel group exists for the competitor analyses.
    # GoalPlanner detects a count such as "3 个竞品" ("3 competitors") or the
    # "并" ("and") conjunction in the goal text.
    first_group = plan.parallel_groups[0]
    assert len(first_group) >= 1  # At least one step in first group

    # 4. Confirm plan and execute
    plan.confirmed = True
    executor = PlanExecutor(
        agent_pool=mock_agent_pool,
        max_retries=1,
        step_timeout=10.0,
    )
    original_task = TaskMessage(
        task_id="test-seo-task",
        agent_name="goal_driven_agent",
        task_type="complex_analysis",
        priority=1,
        input_data={"goal": goal},
        callback_url=None,
        created_at=datetime.now(timezone.utc),
    )
    plan_result = await executor.execute(plan, original_task)

    # 5. Verify PlanChecker validates results
    checker = PlanChecker()
    for step in plan.steps:
        step_result = plan_result.step_results.get(step.step_id)
        if step_result:
            check_result = await checker.check_step(step, step_result)
            # For completed steps, only require a definitive PASS/FAIL verdict;
            # this does not assert that the mock output passes the quality check.
            if step_result.status == PlanStepStatus.COMPLETED:
                assert check_result.status in (CheckStatus.PASS, CheckStatus.FAIL)

    # 6. Generate review report
    report = await checker.review_plan(
        plan, plan_result, task_type="seo_analysis", goal=goal
    )
    assert report.plan_id == plan.plan_id
    assert report.outcome in ("success", "partial", "failure")

    # 7. Record experience to ExperienceStore
    experience = TaskExperience(
        task_type="seo_analysis",
        goal=goal,
        steps_summary="; ".join(f"{s.name}: {s.status.value}" for s in plan.steps),
        outcome=report.outcome,
        duration_seconds=report.total_duration_ms / 1000,
        success_rate=report.success_rate,
        failure_reasons=report.failure_reasons,
        optimization_tips=report.optimization_tips,
    )
    exp_id = await experience_store.record_experience(experience)
    assert exp_id

    # 8. Search for a similar task - the recorded experience must be retrieved
    similar_results = await experience_store.search(
        query="竞品 SEO 分析",
        top_k=3,
        task_type="seo_analysis",
    )
    assert len(similar_results) >= 1
    assert similar_results[0].goal == goal


# ── SC2: Knowledge Base Q&A + System Operation ───────────


@pytest.mark.asyncio
async def test_knowledge_qa_with_system_operation(local_rag, mock_embedder):
    """SC2: Knowledge-base Q&A (retrieval half of the scenario).

    - MultiSourceRetriever searches across several knowledge sources
    - InMemoryLocalRAGService ingests and retrieves documents
    - Retrieval results carry source attribution

    The ``local_rag`` fixture is requested but not used; this test builds
    its own two RAG services.
    """
    # 1. Setup: Create MultiSourceRetriever with two InMemoryLocalRAGService sources
    rag_service_1 = InMemoryLocalRAGService(embedder=mock_embedder)
    rag_service_2 = InMemoryLocalRAGService(embedder=mock_embedder)

    retriever = MultiSourceRetriever()
    retriever.register_source("internal_docs", rag_service_1)
    retriever.register_source("external_kb", rag_service_2)

    # 2. Ingest test documents into both sources
    docs_source_1 = [
        Document(
            doc_id="doc-1",
            content="SEO 优化策略包括关键词研究、内容优化、外链建设和技术 SEO 四个核心方向。",
            title="SEO优化指南",
            source_id="internal",
            metadata={"format": "text", "department": "marketing"},
        ),
        Document(
            doc_id="doc-2",
            content="竞品分析需要关注对手的关键词排名、内容策略和外链来源。",
            title="竞品分析方法论",
            source_id="internal",
            metadata={"format": "text", "department": "strategy"},
        ),
    ]
    docs_source_2 = [
        Document(
            doc_id="doc-3",
            content="技术 SEO 涵盖网站速度优化、结构化数据标记和移动端适配。",
            title="技术SEO手册",
            source_id="external",
            metadata={"format": "text", "source": "partner"},
        ),
    ]

    ids_1 = await rag_service_1.ingest(docs_source_1)
    ids_2 = await rag_service_2.ingest(docs_source_2)
    assert len(ids_1) == 2
    assert len(ids_2) == 1

    # 3. Query with specified sources
    results = await retriever.search(
        query="SEO 优化策略",
        top_k=5,
        sources=["internal_docs", "external_kb"],
    )

    # 4. Verify results include source attribution
    assert len(results) >= 1
    for result in results:
        assert result.content  # Has content
        assert result.source_id  # Has source attribution
        assert result.score > 0  # Has relevance score
        assert result.source_name in ("internal_docs", "external_kb")

    # 5. Query from single source only
    single_source_results = await retriever.search(
        query="竞品分析",
        top_k=3,
        sources=["internal_docs"],
    )
    assert len(single_source_results) >= 1
    assert all(r.source_name == "internal_docs" for r in single_source_results)

    # 6. Verify list_all_sources
    all_sources = await retriever.list_all_sources()
    assert "internal_docs" in all_sources
    assert "external_kb" in all_sources

    # 7. Verify health check
    assert await rag_service_1.health_check() is True
    assert await rag_service_2.health_check() is True


# ── SC3: Workflow with Approval ──────────────────────────


@pytest.mark.asyncio
async def test_workflow_with_approval():
    """SC3: Workflow with a human approval node.

    - Build a WorkflowDefinition that contains an approval stage
    - Execute it; the run pauses at the approval stage
    - Approve; execution resumes
    - Verify the run completes
    """
    # 1. Create WorkflowStore
    store = WorkflowStore()

    # 2. Create WorkflowDefinition with approval node
    workflow = WorkflowDefinition(
        workflow_id="wf-approval-test",
        name="审批流程测试",
        stages=[
            WorkflowStage(
                name="data_collect",
                agent="data_collector",
                action="collect_data",
                type="skill",
            ),
            WorkflowStage(
                name="human_review",
                agent="reviewer",
                action="review_data",
                type="approval",
                config={"require_comment": True},
                depends_on=["data_collect"],
            ),
            WorkflowStage(
                name="generate_report",
                agent="report_generator",
                action="generate_report",
                type="skill",
                depends_on=["human_review"],
            ),
        ],
    )

    # 3. Save workflow
    saved = store.save(workflow)
    assert saved.workflow_id == "wf-approval-test"

    # 4. Create execution
    execution = store.create_execution(workflow.workflow_id)
    assert execution.status == "pending"
    assert execution.execution_id

    # 5. Execute workflow; the approval stage waits until the helper approves it.
    # Use a short approval timeout for testing
    workflow.stages[1].config["approval_timeout"] = 5
    await _execute_with_approval(
        workflow,
        execution,
        store,
        "human_review",
        {
            "status": "approved",
            "approver": "test_user",
            "comment": "Auto-approved in test",
        },
    )

    # 6. Verify execution completed
    updated = store.get_execution(execution.execution_id)
    assert updated is not None
    assert updated.status == "completed"

    # 7. Verify stage results
    assert "data_collect" in updated.stage_results
    assert "human_review" in updated.stage_results
    assert "generate_report" in updated.stage_results

    # 8. Verify approval stage was processed
    approval_result = updated.stage_results["human_review"]
    assert approval_result.get("status") in ("approved", "completed")

    # 9. Second workflow whose approval timeout is set in the stage definition
    workflow2 = WorkflowDefinition(
        workflow_id="wf-manual-approval",
        name="手动审批流程",
        stages=[
            WorkflowStage(
                name="step1",
                agent="agent1",
                action="do_step1",
                type="skill",
            ),
            WorkflowStage(
                name="approval_step",
                agent="reviewer",
                action="approve",
                type="approval",
                config={"approval_timeout": 5},
                depends_on=["step1"],
            ),
            WorkflowStage(
                name="step2",
                agent="agent2",
                action="do_step2",
                type="skill",
                depends_on=["approval_step"],
            ),
        ],
    )
    store.save(workflow2)
    execution2 = store.create_execution(workflow2.workflow_id)

    await _execute_with_approval(
        workflow2,
        execution2,
        store,
        "approval_step",
        {
            "status": "approved",
            "approver": "user",
            "comment": "LGTM",
        },
    )

    updated2 = store.get_execution(execution2.execution_id)
    assert updated2 is not None
    assert updated2.status == "completed"

    # Verify approval was recorded on the completed execution
    assert updated2.stage_results["approval_step"]["status"] == "approved"
    assert updated2.stage_results["approval_step"]["approver"] == "user"


# ── SC4: Self-Evolution Experience Accumulation ──────────


@pytest.mark.asyncio
async def test_self_evolution_experience_accumulation(experience_store):
    """SC4: Self-evolving experience accumulation.

    - Record a failed task's experience
    - Check a similar plan with PitfallDetector
    - Evaluate a better path; PathOptimizer updates the recommended path
    """
    # 1. Record a failure experience (API timeout)
    failure_exp = TaskExperience(
        task_type="api_integration",
        goal="调用第三方 API 获取数据",
        steps_summary="调用 API: failed; 数据解析: skipped",
        outcome="failure",
        duration_seconds=30.0,
        success_rate=0.0,
        failure_reasons=["API 调用超时", "连接被拒绝"],
        optimization_tips=["增加超时时间", "添加重试机制"],
    )
    exp_id = await experience_store.record_experience(failure_exp)
    assert exp_id

    # 2. Create PitfallDetector
    detector = PitfallDetector(
        experience_store=experience_store,
        similarity_threshold=0.1,  # Low threshold for testing
    )

    # 3. Check pitfalls for a similar task, using a PlanStep that resembles
    # the failed step.
    similar_steps = [
        PlanStep(
            step_id="step-0",
            name="调用 API",
            description="调用第三方 API 获取数据",
            dependencies=[],
        ),
    ]
    pitfall_warnings = await detector.check_pitfalls(
        task_type="api_integration",
        planned_steps=similar_steps,
    )
    # Whether a warning fires depends on keyword matching between step names
    # and historical failure stats, so only the result type is pinned here.
    assert isinstance(pitfall_warnings, list)

    # 4. Record a successful experience with better path
    success_exp = TaskExperience(
        task_type="api_integration",
        goal="调用第三方 API 获取数据(带重试)",
        steps_summary="调用 API (带重试): success; 数据解析: success",
        outcome="success",
        duration_seconds=15.0,
        success_rate=1.0,
        failure_reasons=[],
        optimization_tips=["重试机制有效"],
    )
    await experience_store.record_experience(success_exp)

    # 5. PathOptimizer evaluates new path
    optimizer = PathOptimizer(
        experience_store=experience_store,
        min_sample_count=1,  # Low threshold for testing
    )

    # First path: slow and unreliable
    old_path = ExecutionPath(
        path_id="path-old",
        task_type="api_integration",
        steps=["调用 API", "数据解析"],
        total_duration=30.0,
        success_rate=0.3,
        sample_count=5,
        is_recommended=True,
    )
    result = await optimizer.evaluate_and_update("api_integration", old_path)
    assert result.updated  # No existing path -> set as recommended

    # New path: faster and more reliable
    new_path = ExecutionPath(
        path_id="path-new",
        task_type="api_integration",
        steps=["调用 API (带重试)", "数据解析"],
        total_duration=15.0,
        success_rate=0.95,
        sample_count=5,
    )
    result = await optimizer.evaluate_and_update("api_integration", new_path)
    assert result.updated  # Success rate significantly improved
    assert result.new_path.is_recommended
    assert result.new_path.success_rate > old_path.success_rate

    # 6. Verify new path is recommended for similar tasks
    recommended = optimizer.get_recommended_path("api_integration")
    assert recommended is not None
    assert recommended.path_id == "path-new"
    assert recommended.is_recommended is True


# ── SC5: Parallel Execution Efficiency ───────────────────


@pytest.mark.asyncio
async def test_parallel_execution_efficiency():
    """SC5: Parallel execution efficiency.

    - Build an ExecutionPlan with 3 independent steps in one parallel group
    - Each step takes about 0.2 s
    - Verify total wall time is well below the serial total
    """

    class SlowAgent:
        """Agent that takes a fixed time to execute"""

        def __init__(self, delay: float = 0.2):
            self.name = "slow_agent"
            self._delay = delay

        async def execute(self, task_msg: TaskMessage) -> TaskResult:
            await asyncio.sleep(self._delay)
            return TaskResult(
                task_id=task_msg.task_id,
                agent_name=self.name,
                status="completed",
                output_data={"result": "done"},
                error_message=None,
                started_at=datetime.now(timezone.utc),
                completed_at=datetime.now(timezone.utc),
            )

    class SlowAgentPool:
        """Pool that hands out one shared SlowAgent for every lookup."""

        def __init__(self, delay: float = 0.2):
            self._agent = SlowAgent(delay)
            self._delay = delay

        def get_agent(self, name: str) -> SlowAgent:
            return self._agent

        async def create_agent_from_skill(self, skill_name: str) -> SlowAgent:
            return self._agent

        def list_agents(self) -> list[dict[str, Any]]:
            return [{"name": "slow_agent", "agent_type": "mock", "description": "Slow mock"}]

    # 1. Create ExecutionPlan with 3 parallel steps
    step_delay = 0.2
    pool = SlowAgentPool(delay=step_delay)

    plan = ExecutionPlan(
        goal="并行效率测试",
        steps=[
            PlanStep(
                step_id="step-0",
                name="并行任务A",
                description="独立并行任务A",
                dependencies=[],
                parallel_group=0,
                required_skills=["data_analyzer"],
            ),
            PlanStep(
                step_id="step-1",
                name="并行任务B",
                description="独立并行任务B",
                dependencies=[],
                parallel_group=0,
                required_skills=["data_analyzer"],
            ),
            PlanStep(
                step_id="step-2",
                name="并行任务C",
                description="独立并行任务C",
                dependencies=[],
                parallel_group=0,
                required_skills=["data_analyzer"],
            ),
        ],
        parallel_groups=[["step-0", "step-1", "step-2"]],
        confirmed=True,
    )

    # 2. Execute with PlanExecutor
    executor = PlanExecutor(
        agent_pool=pool,
        max_retries=0,
        step_timeout=5.0,
    )
    original_task = TaskMessage(
        task_id="parallel-test",
        agent_name="test",
        task_type="parallel",
        priority=1,
        input_data={"goal": "并行效率测试"},
        callback_url=None,
        created_at=datetime.now(timezone.utc),
    )

    start = time.monotonic()
    result = await executor.execute(plan, original_task)
    elapsed = time.monotonic() - start

    # 3. Verify parallel execution efficiency:
    # 3 steps of 0.2 s each should take ~0.2 s in parallel, not ~0.6 s serially.
    serial_time = step_delay * 3
    assert elapsed < serial_time * 0.8, (
        f"Parallel execution took {elapsed:.2f}s, "
        f"expected less than {serial_time * 0.8:.2f}s (80% of serial {serial_time:.2f}s)"
    )

    # 4. Verify all steps completed
    completed_count = len(result.completed_steps)
    assert completed_count == 3, f"Expected 3 completed steps, got {completed_count}"


# ── SC6: Skill Registry Integration ──────────────────────


@pytest.mark.asyncio
async def test_skill_registry_integration(skill_registry):
    """SC6: Skill registration and capability query.

    - Skills registered with capability tags
    - Query by capability tag
    - Health check
    - Version management
    """
    # 1. Verify skills are registered
    all_skills = skill_registry.list_skills()
    assert len(all_skills) >= 5

    # 2. Query by capability tag
    analysis_skills = skill_registry.query_by_capability("analysis")
    assert len(analysis_skills) >= 2  # seo_analyzer + data_analyzer

    search_skills = skill_registry.query_by_capability("search")
    assert len(search_skills) >= 1  # web_search

    # 3. Verify health check
    health_results = skill_registry.health_check()
    assert len(health_results) >= 5
    # All skills should be healthy (no required dependencies missing)
    for result in health_results:
        assert result.healthy

    # 4. Register new version of existing skill
    new_config = SkillConfig(
        name="web_search",
        agent_type="web_search",
        version="2.0.0",
        description="Enhanced web search skill v2",
        task_mode="tool_call",
        capabilities=["search", "web", "advanced"],
        tools=["web_search"],
    )
    new_skill = Skill(config=new_config)
    skill_registry.register(new_skill)

    # 5. Verify version history
    versions = skill_registry.get_versions("web_search")
    assert "1.0.0" in versions
    assert "2.0.0" in versions

    # Default should point to latest version
    current = skill_registry.get("web_search")
    assert current.version == "2.0.0"

    # Can still get specific version
    v1 = skill_registry.get("web_search", version="1.0.0")
    assert v1.version == "1.0.0"

    # 6. Verify capability query includes new version
    advanced_skills = skill_registry.query_by_capability("advanced")
    assert len(advanced_skills) >= 1
    assert advanced_skills[0].version == "2.0.0"

    # 7. Test dependency health check:
    # register a skill with a missing dependency
    dep_config = SkillConfig(
        name="dependent_skill",
        agent_type="dependent",
        version="1.0.0",
        description="Skill with missing dependency",
        task_mode="tool_call",
        tools=["dependent_tool"],
        dependencies=[
            {"name": "web_search", "type": "skill", "required": True},
            {"name": "nonexistent_skill", "type": "skill", "required": True},
        ],
    )
    dep_skill = Skill(config=dep_config)
    skill_registry.register(dep_skill)

    dep_health = skill_registry.health_check("dependent_skill")
    assert len(dep_health) == 1
    assert dep_health[0].healthy is False
    assert "nonexistent_skill" in dep_health[0].missing_dependencies


# ── Cross-Capability Integration ─────────────────────────


@pytest.mark.asyncio
async def test_goal_planner_with_experience_and_pitfall(experience_store):
    """Cross-capability: GoalPlanner + ExperienceStore + PitfallDetector.

    Checks that goal-driven planning cooperates with the self-evolution parts:
    1. GoalPlanner generates a plan
    2. PitfallDetector checks it against historical pitfalls
    3. ExperienceStore provides relevant past experience
    """
    # 1. Record historical failure experience
    failure_exp = TaskExperience(
        task_type="competitive_analysis",
        goal="竞品调研分析",
        steps_summary="数据采集: failed; 数据分析: skipped; 报告生成: skipped",
        outcome="failure",
        duration_seconds=60.0,
        success_rate=0.0,
        failure_reasons=["数据源不可用", "API 限流"],
        optimization_tips=["使用缓存数据源", "添加限流处理"],
    )
    await experience_store.record_experience(failure_exp)

    # 2. GoalPlanner generates plan for similar task
    planner = GoalPlanner(max_parallel=5)
    plan = await planner.generate_plan(
        goal="调研3个竞品的市场策略并生成对比报告",
        context={"task_type": "competitive_analysis"},
        available_skills=["web_search", "data_analyzer", "report_generator"],
    )
    assert len(plan.steps) >= 2  # Parallel steps + summary
    assert len(plan.parallel_groups) >= 1

    # 3. PitfallDetector checks for pitfalls
    detector = PitfallDetector(
        experience_store=experience_store,
        similarity_threshold=0.1,
    )
    pitfall_warnings = await detector.check_pitfalls(
        task_type="competitive_analysis",
        planned_steps=plan.steps,
    )
    # Warnings may or may not be generated depending on keyword matching;
    # the point is that the integration runs without errors.
    assert isinstance(pitfall_warnings, list)

    # 4. Search for relevant experience before execution
    relevant = await experience_store.search(
        query="竞品调研分析",
        top_k=3,
        task_type="competitive_analysis",
    )
    assert len(relevant) >= 1
    assert relevant[0].outcome == "failure"
    assert len(relevant[0].optimization_tips) > 0


@pytest.mark.asyncio
async def test_plan_checker_with_experience_store(experience_store):
    """Cross-capability: PlanChecker + ExperienceStore.

    Checks that a plan review is written to the experience store:
    1. PlanChecker produces a review report
    2. The review is recorded in ExperienceStore
    3. The experience can be retrieved afterwards
    """
    # 1. Create a plan and execution result
    plan = ExecutionPlan(
        goal="测试复盘经验写入",
        steps=[
            PlanStep(
                step_id="step-0",
                name="数据采集",
                description="采集测试数据",
                dependencies=[],
            ),
            PlanStep(
                step_id="step-1",
                name="数据分析",
                description="分析采集的数据",
                dependencies=["step-0"],
            ),
        ],
        parallel_groups=[["step-0"], ["step-1"]],
    )

    plan_result = PlanExecutionResult(
        plan_id=plan.plan_id,
        step_results={
            "step-0": StepExecutionResult(
                step_id="step-0",
                status=PlanStepStatus.COMPLETED,
                result={"data": "collected"},
                duration_ms=100.0,
            ),
            "step-1": StepExecutionResult(
                step_id="step-1",
                status=PlanStepStatus.COMPLETED,
                result={"analysis": "done"},
                duration_ms=200.0,
            ),
        },
        status=TaskStatus.COMPLETED,
        total_duration_ms=300.0,
    )

    # 2. PlanChecker with ExperienceStore
    checker = PlanChecker(experience_store=experience_store)

    # Check each step
    for step in plan.steps:
        step_result = plan_result.step_results[step.step_id]
        await checker.check_step(step, step_result)

    # 3. Review plan - should write experience
    report = await checker.review_plan(
        plan, plan_result, task_type="test_task", goal="测试复盘经验写入"
    )
    assert report.outcome == "success"
    assert report.success_rate == 1.0

    # 4. Verify experience was recorded
    experiences = await experience_store.search(
        query="测试复盘经验写入",
        top_k=5,
    )
    assert len(experiences) >= 1
    assert experiences[0].goal == "测试复盘经验写入"
    assert experiences[0].outcome == "success"


@pytest.mark.asyncio
async def test_multi_source_rag_with_workflow(local_rag, mock_embedder):
    """Cross-capability: RAG knowledge base + Workflow.

    1. Ingest a document into the knowledge base
    2. Query the knowledge base
    3. Create and run a workflow (with an approval stage) that models acting
       on the knowledge-base findings
    """
    # 1. Ingest documents
    docs = [
        Document(
            doc_id="policy-1",
            content="公司数据安全策略要求所有外部 API 调用必须经过审批。",
            title="数据安全策略",
            source_id="policy",
            metadata={"format": "text", "category": "security"},
        ),
    ]
    ids = await local_rag.ingest(docs)
    assert len(ids) == 1

    # 2. Query the knowledge base
    results = await local_rag.query("外部 API 调用审批", top_k=3)
    assert len(results) >= 1
    assert "审批" in results[0].content

    # 3. Create workflow that references knowledge base findings
    store = WorkflowStore()
    workflow = WorkflowDefinition(
        workflow_id="wf-kb-integration",
        name="知识库集成工作流",
        stages=[
            WorkflowStage(
                name="query_kb",
                agent="kb_agent",
                action="query_knowledge_base",
                type="skill",
            ),
            WorkflowStage(
                name="review_findings",
                agent="reviewer",
                action="review",
                type="approval",
                depends_on=["query_kb"],
            ),
            WorkflowStage(
                name="execute_action",
                agent="executor",
                action="execute",
                type="skill",
                depends_on=["review_findings"],
            ),
        ],
    )
    saved = store.save(workflow)
    assert saved.workflow_id == "wf-kb-integration"

    # 4. Execute workflow with a short approval timeout, approving on pause
    execution = store.create_execution(workflow.workflow_id)
    workflow.stages[1].config["approval_timeout"] = 5
    await _execute_with_approval(
        workflow,
        execution,
        store,
        "review_findings",
        {
            "status": "approved",
            "approver": "test_user",
            "comment": "Approved",
        },
    )

    updated = store.get_execution(execution.execution_id)
    assert updated is not None
    assert updated.status == "completed"
    assert len(updated.stage_results) == 3