"""Marketplace E2E 集成测试 - 多 Agent 市场架构端到端流程""" from __future__ import annotations import pytest from unittest.mock import AsyncMock, MagicMock from agentkit.chat.skill_routing import CostAwareRouter, SkillRoutingResult from agentkit.org.context import OrganizationContext, AgentProfile from agentkit.quality.alignment import AlignmentGuard, AlignmentConfig, CascadeAlert, ConstraintInjector from agentkit.marketplace.auction import AuctionHouse, Bid, AuctionResult from agentkit.marketplace.wealth import WealthTracker # --------------------------------------------------------------------------- # Fixtures # --------------------------------------------------------------------------- @pytest.fixture def mock_llm_gateway(): """Mock LLMGateway for CostAwareRouter Layer 1 classification.""" gw = AsyncMock() response = MagicMock() response.content = '{"complexity": 0.5}' gw.chat = AsyncMock(return_value=response) return gw @pytest.fixture def mock_skill_registry(): """Mock SkillRegistry with no skills by default.""" registry = MagicMock() registry.list_skills.return_value = [] registry.get.side_effect = KeyError("not found") return registry @pytest.fixture def mock_intent_router(): """Mock IntentRouter that returns no match by default.""" router = AsyncMock() router.route = AsyncMock(return_value=None) return router # --------------------------------------------------------------------------- # Test 1: Simple chat routes to default agent (Layer 0) # --------------------------------------------------------------------------- class TestSimpleChatRoutesToDefault: """简单对话走 Layer 0 规则匹配,路由到默认 Agent""" @pytest.mark.asyncio async def test_greeting_routes_to_default(self, mock_skill_registry, mock_intent_router): router = CostAwareRouter(llm_gateway=None, org_context=None) result = await router.route( content="你好", skill_registry=mock_skill_registry, intent_router=mock_intent_router, default_tools=[], default_system_prompt="You are helpful", default_model="default", 
default_agent_name="default", ) assert result.match_method == "greeting" assert result.agent_name == "default" assert result.complexity == 0.0 assert result.matched is False @pytest.mark.asyncio async def test_chat_mode_routes_to_default(self, mock_skill_registry, mock_intent_router): router = CostAwareRouter(llm_gateway=None, org_context=None) result = await router.route( content="谢谢", skill_registry=mock_skill_registry, intent_router=mock_intent_router, default_tools=[], default_system_prompt="You are helpful", default_model="default", default_agent_name="default", ) assert result.match_method == "chat_mode" assert result.agent_name == "default" assert result.complexity == 0.0 # --------------------------------------------------------------------------- # Test 2: Complex task routes via capability matching # --------------------------------------------------------------------------- class TestCapabilityMatching: """高复杂度任务通过 OrganizationContext 能力匹配路由""" @pytest.mark.asyncio async def test_complex_task_routes_via_capability(self, mock_llm_gateway, mock_skill_registry, mock_intent_router): # Set up LLM to return high complexity high_response = MagicMock() high_response.content = '{"complexity": 0.9}' mock_llm_gateway.chat = AsyncMock(return_value=high_response) # Set up org_context with a capable agent org_context = OrganizationContext() org_context.register_agent(AgentProfile( name="research_agent", agent_type="react", capabilities=["research", "analysis"], skills=["research"], )) # Mock find_best_agent to return the research agent org_context.find_best_agent = MagicMock( return_value=org_context.get_agent_profile("research_agent") ) router = CostAwareRouter( llm_gateway=mock_llm_gateway, org_context=org_context, ) result = await router.route( content="请对市场趋势进行深度分析并给出投资建议", skill_registry=mock_skill_registry, intent_router=mock_intent_router, default_tools=[], default_system_prompt="You are helpful", default_model="default", default_agent_name="default", ) assert 
result.matched is True assert result.match_method == "capability" assert result.agent_name == "research_agent" assert result.complexity >= 0.7 # --------------------------------------------------------------------------- # Test 3: Alignment guard detects cascade risk # --------------------------------------------------------------------------- class TestAlignmentCascadeDetection: """AlignmentGuard 检测级联故障风险""" def test_cascade_alert_on_excessive_interactions(self): config = AlignmentConfig(cascade_max_interactions=3) guard = AlignmentGuard(config=config) # Record interactions below threshold for _ in range(3): alert = guard.record_interaction("session-1") assert alert is None # Next interaction should trigger alert alert = guard.record_interaction("session-1") assert alert is not None assert isinstance(alert, CascadeAlert) assert alert.alert_type == "interaction_limit" assert alert.current_value == 4 assert alert.threshold == 3 def test_cascade_alert_on_loop_depth(self): config = AlignmentConfig(cascade_max_depth=2) guard = AlignmentGuard(config=config) # Depth within threshold alert = guard.record_loop_depth("session-1", 2) assert alert is None # Depth exceeds threshold alert = guard.record_loop_depth("session-1", 3) assert alert is not None assert alert.alert_type == "loop_depth" assert alert.current_value == 3 assert alert.threshold == 2 def test_reset_session_clears_counts(self): config = AlignmentConfig(cascade_max_interactions=2) guard = AlignmentGuard(config=config) guard.record_interaction("session-1") guard.record_interaction("session-1") guard.record_interaction("session-1") # triggers alert assert guard.get_interaction_count("session-1") == 3 guard.reset_session("session-1") assert guard.get_interaction_count("session-1") == 0 # --------------------------------------------------------------------------- # Test 4: Transparency TRACE mode returns execution trace # --------------------------------------------------------------------------- class 
TestTransparencyTraceMode: """TRACE 透明度模式返回执行追踪""" @pytest.mark.asyncio async def test_trace_mode_populates_execution_trace(self, mock_skill_registry, mock_intent_router): router = CostAwareRouter(llm_gateway=None, org_context=None) result = await router.route( content="你好", skill_registry=mock_skill_registry, intent_router=mock_intent_router, default_tools=[], default_system_prompt="You are helpful", default_model="default", default_agent_name="default", transparency="TRACE", ) assert result.transparency_level == "TRACE" assert len(result.execution_trace) > 0 assert result.execution_trace[0]["layer"] == 0 @pytest.mark.asyncio async def test_silent_mode_no_trace(self, mock_skill_registry, mock_intent_router): router = CostAwareRouter(llm_gateway=None, org_context=None) result = await router.route( content="你好", skill_registry=mock_skill_registry, intent_router=mock_intent_router, default_tools=[], default_system_prompt="You are helpful", default_model="default", default_agent_name="default", transparency="SILENT", ) assert result.transparency_level == "SILENT" assert result.execution_trace == [] # --------------------------------------------------------------------------- # Test 5: Auction mode routes via auction # --------------------------------------------------------------------------- class TestAuctionMode: """拍卖模式通过 AuctionHouse 选择 Agent""" @pytest.mark.asyncio async def test_auction_selects_best_bidder(self): wealth_tracker = WealthTracker(initial_wealth=100.0) wealth_tracker.reward("agent_a", 50.0) # agent_a is richer auction_house = AuctionHouse(wealth_tracker=wealth_tracker) bids = [ Bid( agent_name="agent_a", architecture="react", estimated_steps=3, estimated_cost=0.5, confidence=0.9, payment_offer=1.0, capabilities=["research"], ), Bid( agent_name="agent_b", architecture="rewoo", estimated_steps=5, estimated_cost=0.8, confidence=0.7, payment_offer=0.5, capabilities=["research"], ), ] result = await auction_house.run_auction("research task", bids) assert 
result.winner is not None assert result.winner.agent_name == "agent_a" assert result.total_bidders == 2 @pytest.mark.asyncio async def test_auction_no_bidders(self): auction_house = AuctionHouse() result = await auction_house.run_auction("task", []) assert result.winner is None assert result.total_bidders == 0 @pytest.mark.asyncio async def test_bankrupt_agent_excluded(self): wealth_tracker = WealthTracker(initial_wealth=-150.0) auction_house = AuctionHouse(wealth_tracker=wealth_tracker) bids = [ Bid( agent_name="bankrupt_agent", architecture="react", estimated_steps=1, estimated_cost=0.1, confidence=0.9, payment_offer=1.0, ), ] result = await auction_house.run_auction("task", bids) assert result.winner is None assert "bankrupt" in result.selection_reason.lower() # --------------------------------------------------------------------------- # Test 6: Constraint injection works end-to-end # --------------------------------------------------------------------------- class TestConstraintInjection: """约束注入端到端测试""" def test_inject_constraints_into_input_data(self): config = AlignmentConfig(constraints=["不得泄露用户隐私", "禁止生成有害内容"]) guard = AlignmentGuard(config=config) input_data = {"content": "请帮我写一篇文章"} injected = guard.inject_constraints(input_data) assert "alignment_constraints" in injected assert "不得泄露用户隐私" in injected["alignment_constraints"] assert "禁止生成有害内容" in injected["alignment_constraints"] # Original data preserved assert injected["content"] == "请帮我写一篇文章" def test_inject_does_not_mutate_original(self): config = AlignmentConfig(constraints=["constraint_1"]) guard = AlignmentGuard(config=config) input_data = {"key": "value"} injected = guard.inject_constraints(input_data) assert "alignment_constraints" not in input_data assert "alignment_constraints" in injected # --------------------------------------------------------------------------- # Test 7: OrganizationContext builds from AgentPool # 
# ---------------------------------------------------------------------------


class TestOrganizationContextFromAgentPool:
    """Building an OrganizationContext from an AgentPool, and capability lookup."""

    def test_build_from_agent_pool_with_skills(self):
        # Fake AgentPool exposing two agents.
        agent_pool = MagicMock()
        agent_pool.list_agents.return_value = [
            {"name": "writer", "agent_type": "react"},
            {"name": "analyst", "agent_type": "plan_exec"},
        ]

        # Fake SkillRegistry: "writer" resolves to a skill, "analyst" raises KeyError.
        skill_registry = MagicMock()
        writer_skill = MagicMock()
        writer_config = MagicMock()
        writer_config.capabilities = [MagicMock(tag="writing"), MagicMock(tag="creative")]
        writer_config.execution_mode = "react"
        writer_config.llm = {"model": "gpt-4"}
        writer_config.max_concurrency = 2
        writer_skill.config = writer_config

        def get_skill(name):
            if name == "writer":
                return writer_skill
            raise KeyError(name)

        skill_registry.get = MagicMock(side_effect=get_skill)

        org_context = OrganizationContext.from_agent_pool(
            agent_pool=agent_pool,
            skill_registry=skill_registry,
        )

        profiles = org_context.list_agents()
        assert len(profiles) == 2

        # The writer profile is populated from its skill config.
        writer_profile = org_context.get_agent_profile("writer")
        assert writer_profile is not None
        assert writer_profile.agent_type == "react"
        assert "writing" in writer_profile.capabilities
        assert "creative" in writer_profile.capabilities
        assert writer_profile.model == "gpt-4"
        assert writer_profile.max_concurrency == 2

        analyst_profile = org_context.get_agent_profile("analyst")
        assert analyst_profile is not None
        assert analyst_profile.agent_type == "plan_exec"
        # No skill found for the analyst -> default values.
        assert analyst_profile.capabilities == []
        assert analyst_profile.model == "default"

    def test_build_from_empty_agent_pool(self):
        agent_pool = MagicMock()
        agent_pool.list_agents.return_value = []
        skill_registry = MagicMock()

        org_context = OrganizationContext.from_agent_pool(
            agent_pool=agent_pool,
            skill_registry=skill_registry,
        )

        assert org_context.list_agents() == []

    def test_find_best_agent_by_capability(self):
        org_context = OrganizationContext()
        org_context.register_agent(AgentProfile(
            name="researcher",
            agent_type="react",
            capabilities=["research", "analysis"],
            skills=["research"],
            current_load=0,
        ))
        org_context.register_agent(AgentProfile(
            name="writer",
            agent_type="react",
            capabilities=["writing", "creative"],
            skills=["writing"],
            current_load=2,
        ))

        # Single required capability.
        best = org_context.find_best_agent(["research"])
        assert best is not None
        assert best.name == "researcher"

        # Both research and analysis required.
        best = org_context.find_best_agent(["research", "analysis"])
        assert best is not None
        assert best.name == "researcher"

        # No agent advertises the requested capability.
        best = org_context.find_best_agent(["coding"])
        assert best is None


# ---------------------------------------------------------------------------
# Test 8: Full pipeline: Chat → Router → Agent → AlignmentGuard
# ---------------------------------------------------------------------------


class TestFullPipeline:
    """Full pipeline: user message -> CostAwareRouter -> skill matching -> constraint injection -> alignment check.

    The routing steps use the module-level ``mock_skill_registry`` and
    ``mock_intent_router`` fixtures rather than bare mocks, so that "no skill"
    and "no intent match" are stated explicitly. A bare ``AsyncMock()`` intent
    router's ``route`` resolves to a truthy mock object, and a bare registry's
    ``get`` returns a mock instead of raising. With those, the routing outcome
    would depend on the router's internal layer order rather than on the
    test's setup.
    """

    @pytest.mark.asyncio
    async def test_full_pipeline_greeting(self, mock_skill_registry, mock_intent_router):
        """A simple greeting runs through the whole pipeline."""
        # Setup
        org_context = OrganizationContext()
        alignment_config = AlignmentConfig(
            constraints=["不得包含敏感信息"],
            cascade_max_interactions=10,
        )
        guard = AlignmentGuard(config=alignment_config)
        router = CostAwareRouter(llm_gateway=None, org_context=org_context)

        # Step 1: Route the message
        result = await router.route(
            content="你好",
            skill_registry=mock_skill_registry,
            intent_router=mock_intent_router,
            default_tools=[],
            default_system_prompt="You are helpful",
            default_model="default",
            default_agent_name="default",
        )
        assert result.match_method == "greeting"
        assert result.agent_name == "default"

        # Step 2: Inject constraints
        input_data = {"content": result.clean_content}
        injected = guard.inject_constraints(input_data)
        assert "alignment_constraints" in injected

        # Step 3: Check alignment on simulated output
        output = {"result": "你好!有什么我可以帮助你的吗?"}
        check_result = await guard.check_output(output)
        assert check_result.passed is True

        # Step 4: Record interaction (well under the limit of 10, so no cascade)
        alert = guard.record_interaction("session-1")
        assert alert is None

    @pytest.mark.asyncio
    async def test_full_pipeline_with_constraint_violation(self):
        """Output that violates a constraint is detected."""
        alignment_config = AlignmentConfig(
            constraints=["password", "secret_key"],
        )
        guard = AlignmentGuard(config=alignment_config)

        # Output containing a constraint keyword
        output = {"result": "Your password is 123456"}
        check_result = await guard.check_output(output)

        assert check_result.passed is False
        assert len(check_result.violations) > 0
        assert check_result.checked_by == "rule"

    @pytest.mark.asyncio
    async def test_full_pipeline_complex_task_with_alignment(self, mock_skill_registry, mock_intent_router):
        """A complex task runs the full pipeline: routing -> capability match -> constraint injection -> alignment check."""
        # LLM gateway that reports a high complexity score
        mock_llm = AsyncMock()
        high_response = MagicMock()
        high_response.content = '{"complexity": 0.85}'
        mock_llm.chat = AsyncMock(return_value=high_response)

        # Org context with one capable agent; find_best_agent is pinned to it
        org_context = OrganizationContext()
        org_context.register_agent(AgentProfile(
            name="analyst",
            agent_type="react",
            capabilities=["analysis", "market_research"],
            skills=["market_analysis"],
            current_load=0,
        ))
        org_context.find_best_agent = MagicMock(
            return_value=org_context.get_agent_profile("analyst")
        )

        alignment_config = AlignmentConfig(
            constraints=["不得提供具体投资建议"],
            cascade_max_interactions=5,
        )
        # NOTE(review): the guard shares mock_llm, whose canned reply is a
        # complexity payload, not an alignment verdict — confirm check_output
        # does not depend on that reply.
        guard = AlignmentGuard(config=alignment_config, llm_gateway=mock_llm)
        router = CostAwareRouter(
            llm_gateway=mock_llm,
            org_context=org_context,
        )

        # Step 1: Route complex task
        result = await router.route(
            content="请分析当前AI行业的市场趋势",
            skill_registry=mock_skill_registry,
            intent_router=mock_intent_router,
            default_tools=[],
            default_system_prompt="You are a market analyst",
            default_model="default",
            default_agent_name="default",
            transparency="TRACE",
        )
        assert result.matched is True
        assert result.match_method == "capability"
        assert result.agent_name == "analyst"
        assert result.complexity >= 0.7
        assert len(result.execution_trace) > 0

        # Step 2: Inject constraints
        input_data = {"content": result.clean_content}
        injected = guard.inject_constraints(input_data)
        assert "alignment_constraints" in injected

        # Step 3: Simulate agent output and check alignment
        safe_output = {"result": "AI行业目前呈现稳步增长趋势,主要驱动力来自大模型技术的突破。"}
        check_result = await guard.check_output(safe_output)
        assert check_result.passed is True

        # Step 4: Record interaction
        alert = guard.record_interaction("session-complex")
        assert alert is None  # Under threshold

    @pytest.mark.asyncio
    async def test_full_pipeline_cascade_alert(self):
        """Cascade-failure detection fires inside the full pipeline."""
        alignment_config = AlignmentConfig(
            cascade_max_interactions=2,
        )
        guard = AlignmentGuard(config=alignment_config)

        # Two interactions reach the limit; the third exceeds it.
        guard.record_interaction("session-cascade")
        guard.record_interaction("session-cascade")
        alert = guard.record_interaction("session-cascade")

        assert alert is not None
        assert alert.alert_type == "interaction_limit"
        assert alert.current_value == 3