"""Expert 运行时包装器单元测试""" from __future__ import annotations from unittest.mock import AsyncMock, MagicMock, patch import pytest from agentkit.core.config_driven import ConfigDrivenAgent from agentkit.core.handoff_transport import InProcessHandoffTransport from agentkit.core.shared_workspace import SharedWorkspace from agentkit.experts.config import ExpertConfig from agentkit.experts.expert import Expert # ── 辅助函数 ────────────────────────────────────────────── def _make_expert_config( name: str = "test_expert", agent_type: str = "expert", persona: str = "测试专家", thinking_style: str = "逻辑推理", bound_skills: list[str] | None = None, is_lead: bool = False, color: str = "#1890ff", avatar: str = "", **kwargs, ) -> ExpertConfig: """创建测试用 ExpertConfig 实例""" return ExpertConfig( name=name, agent_type=agent_type, persona=persona, thinking_style=thinking_style, bound_skills=bound_skills or ["skill_a"], is_lead=is_lead, color=color, avatar=avatar, task_mode="llm_generate", prompt={"identity": "测试"}, **kwargs, ) def _make_mock_agent() -> MagicMock: """创建 mock ConfigDrivenAgent""" agent = MagicMock(spec=ConfigDrivenAgent) agent.name = "test_expert" agent._prompt_template = None return agent def _make_mock_pool() -> AsyncMock: """创建 mock AgentPool""" pool = AsyncMock() pool.create_agent = AsyncMock(return_value=_make_mock_agent()) pool.remove_agent = AsyncMock() return pool # ── Expert 创建测试 ─────────────────────────────────────── class TestExpertCreate: """Expert.create 工厂方法测试""" @pytest.mark.asyncio async def test_create_expert_via_pool(self): """通过 AgentPool 创建 Expert""" config = _make_expert_config() pool = _make_mock_pool() expert = await Expert.create(config=config, pool=pool) pool.create_agent.assert_awaited_once_with(config) assert expert.config is config assert expert.agent is pool.create_agent.return_value assert expert.is_active is True assert expert.team_id is None @pytest.mark.asyncio async def test_create_with_handoff_transport(self): """创建 Expert 时传入 handoff_transport""" config = _make_expert_config() pool = _make_mock_pool() transport = MagicMock(spec=InProcessHandoffTransport) expert = await Expert.create( config=config, pool=pool, handoff_transport=transport ) assert expert._handoff_transport is transport @pytest.mark.asyncio async def test_create_with_workspace(self): """创建 Expert 时传入 workspace""" config = _make_expert_config() pool = _make_mock_pool() workspace = MagicMock(spec=SharedWorkspace) expert = await Expert.create( config=config, pool=pool, workspace=workspace ) assert expert._workspace is workspace @pytest.mark.asyncio async def test_create_with_team_context_injects_into_prompt(self): """team_context 注入到 Agent 的 prompt 中""" config = _make_expert_config() pool = _make_mock_pool() mock_agent = _make_mock_agent() # 模拟有 _prompt_template 的 Agent mock_template = MagicMock() mock_sections = MagicMock() mock_sections.context = "原始上下文" mock_template._sections = mock_sections mock_agent._prompt_template = mock_template pool.create_agent = AsyncMock(return_value=mock_agent) expert = await Expert.create( config=config, pool=pool, team_context="你是团队中的分析专家", ) # 验证 team_context 被注入到 context 段落 assert "你是团队中的分析专家" in mock_sections.context assert "原始上下文" in mock_sections.context @pytest.mark.asyncio async def test_create_with_team_context_no_existing_context(self): """team_context 注入到空 context 段落""" config = _make_expert_config() pool = _make_mock_pool() mock_agent = _make_mock_agent() mock_template = MagicMock() mock_sections = MagicMock() mock_sections.context = "" mock_template._sections = mock_sections mock_agent._prompt_template = mock_template pool.create_agent = AsyncMock(return_value=mock_agent) expert = await Expert.create( config=config, pool=pool, team_context="你是团队中的分析专家", ) assert mock_sections.context == "你是团队中的分析专家" # ── Expert.send_message 测试 ────────────────────────────── class TestExpertSendMessage: """Expert.send_message 消息广播测试""" @pytest.mark.asyncio async def test_send_message_broadcasts_to_transport(self): """send_message 通过 handoff_transport 广播消息""" config = _make_expert_config() agent = _make_mock_agent() transport = AsyncMock(spec=InProcessHandoffTransport) expert = Expert(config=config, agent=agent, handoff_transport=transport) await expert.send_message("team:chat", "你好团队") transport.send.assert_awaited_once() call_args = transport.send.call_args assert call_args[0][0] == "team:chat" message = call_args[0][1] assert message["expert_id"] == "test_expert" assert message["content"] == "你好团队" assert message["type"] == "chat" @pytest.mark.asyncio async def test_send_message_no_transport(self): """没有 handoff_transport 时 send_message 不报错(静默忽略)""" config = _make_expert_config() agent = _make_mock_agent() expert = Expert(config=config, agent=agent, handoff_transport=None) # 不应抛出异常 await expert.send_message("team:chat", "你好") @pytest.mark.asyncio async def test_send_message_long_content_stores_in_workspace(self): """长内容(>500 字符)存储到 SharedWorkspace,广播摘要""" config = _make_expert_config() agent = _make_mock_agent() transport = AsyncMock(spec=InProcessHandoffTransport) workspace = AsyncMock(spec=SharedWorkspace) expert = Expert( config=config, agent=agent, handoff_transport=transport, workspace=workspace, ) long_content = "x" * 600 summary = "长内容摘要" await expert.send_message("team:chat", long_content, summary=summary) # 验证 workspace.write 被调用存储完整内容 workspace.write.assert_awaited_once() write_call = workspace.write.call_args assert write_call[0][1] == long_content # value = 完整内容 assert write_call[0][2] == "test_expert" # agent_id # 验证 transport.send 广播的是摘要 transport.send.assert_awaited_once() message = transport.send.call_args[0][1] assert message["content"] == summary @pytest.mark.asyncio async def test_send_message_short_content_no_workspace(self): """短内容(<=500 字符)不存储到 workspace""" config = _make_expert_config() agent = _make_mock_agent() transport = AsyncMock(spec=InProcessHandoffTransport) workspace = AsyncMock(spec=SharedWorkspace) expert = Expert( config=config, agent=agent, handoff_transport=transport, workspace=workspace, ) await expert.send_message("team:chat", "短消息") workspace.write.assert_not_awaited() @pytest.mark.asyncio async def test_send_message_long_content_without_summary(self): """长内容没有 summary 时,广播截断内容(前500字符),但仍存储到 workspace 防止数据丢失""" config = _make_expert_config() agent = _make_mock_agent() transport = AsyncMock(spec=InProcessHandoffTransport) workspace = AsyncMock(spec=SharedWorkspace) expert = Expert( config=config, agent=agent, handoff_transport=transport, workspace=workspace, ) long_content = "x" * 600 await expert.send_message("team:chat", long_content) # 即使没有 summary,长内容也会存储到 workspace 防止数据丢失 workspace.write.assert_awaited_once() # 广播的是截断内容 transport.send.assert_awaited_once() message = transport.send.call_args[0][1] assert message["content"] == long_content[:500] # ── Expert.request_assist 测试 ──────────────────────────── class TestExpertRequestAssist: """Expert.request_assist 协助请求测试""" @pytest.mark.asyncio async def test_request_assist_sends_handoff_message(self): """request_assist 通过 handoff_transport 发送协助请求""" config = _make_expert_config() agent = _make_mock_agent() transport = AsyncMock(spec=InProcessHandoffTransport) expert = Expert(config=config, agent=agent, handoff_transport=transport) await expert.request_assist( target_expert="analyst", task="分析数据", reason="需要专业分析" ) transport.send.assert_awaited_once() call_args = transport.send.call_args assert call_args[0][0] == "expert:analyst:handoff" message = call_args[0][1] assert message["source_expert"] == "test_expert" assert message["target_expert"] == "analyst" assert message["task"] == "分析数据" assert message["reason"] == "需要专业分析" assert message["type"] == "assist_request" @pytest.mark.asyncio async def test_request_assist_raises_without_transport(self): """没有 handoff_transport 时 request_assist 抛出 RuntimeError""" config = _make_expert_config() agent = _make_mock_agent() expert = Expert(config=config, agent=agent, handoff_transport=None) with pytest.raises(RuntimeError, match="No handoff transport configured"): await expert.request_assist("analyst", "分析数据") # ── Expert.propose_plan_modification 测试 ───────────────── class TestExpertProposePlanModification: """Expert.propose_plan_modification 计划修改提案测试""" @pytest.mark.asyncio async def test_propose_plan_modification_sends_proposal(self): """propose_plan_modification 通过 handoff_transport 发送提案""" config = _make_expert_config() agent = _make_mock_agent() transport = AsyncMock(spec=InProcessHandoffTransport) expert = Expert(config=config, agent=agent, handoff_transport=transport) modification = {"action": "add_step", "step": "验证结果"} await expert.propose_plan_modification(plan_id="plan_1", modification=modification) transport.send.assert_awaited_once() call_args = transport.send.call_args assert call_args[0][0] == "team:plan_modifications" message = call_args[0][1] assert message["proposing_expert"] == "test_expert" assert message["plan_id"] == "plan_1" assert message["modification"] == modification assert message["type"] == "plan_modification_proposal" @pytest.mark.asyncio async def test_propose_plan_modification_raises_without_transport(self): """没有 handoff_transport 时 propose_plan_modification 抛出 RuntimeError""" config = _make_expert_config() agent = _make_mock_agent() expert = Expert(config=config, agent=agent, handoff_transport=None) with pytest.raises(RuntimeError, match="No handoff transport configured"): await expert.propose_plan_modification("plan_1", {"action": "add"}) # ── Expert.get_capabilities_summary 测试 ────────────────── class TestExpertGetCapabilitiesSummary: """Expert.get_capabilities_summary 能力摘要测试""" def test_returns_correct_dict(self): """返回正确的能力摘要字典""" config = _make_expert_config( name="analyst", persona="数据分析师", thinking_style="逻辑推理", bound_skills=["data_query", "chart_gen"], is_lead=True, color="#52c41a", avatar="C", ) agent = _make_mock_agent() expert = Expert(config=config, agent=agent) summary = expert.get_capabilities_summary() assert summary == { "name": "analyst", "persona": "数据分析师", "thinking_style": "逻辑推理", "bound_skills": ["data_query", "chart_gen"], "is_lead": True, "color": "#52c41a", "avatar": "C", } # ── Expert.destroy 测试 ─────────────────────────────────── class TestExpertDestroy: """Expert.destroy 销毁测试""" @pytest.mark.asyncio async def test_destroy_removes_agent_from_pool(self): """destroy 从 AgentPool 中移除 Agent""" config = _make_expert_config(name="removable_expert") agent = _make_mock_agent() pool = _make_mock_pool() expert = Expert(config=config, agent=agent) await expert.destroy(pool) pool.remove_agent.assert_awaited_once_with("removable_expert") @pytest.mark.asyncio async def test_is_active_after_destroy(self): """destroy 后 is_active 为 False""" config = _make_expert_config() agent = _make_mock_agent() pool = _make_mock_pool() expert = Expert(config=config, agent=agent) assert expert.is_active is True await expert.destroy(pool) assert expert.is_active is False # ── Expert.team_id 测试 ─────────────────────────────────── class TestExpertTeamId: """Expert.team_id 属性测试""" def test_team_id_defaults_to_none(self): """team_id 默认为 None""" config = _make_expert_config() agent = _make_mock_agent() expert = Expert(config=config, agent=agent) assert expert.team_id is None def test_team_id_setter(self): """team_id setter 正确设置值""" config = _make_expert_config() agent = _make_mock_agent() expert = Expert(config=config, agent=agent) expert.team_id = "team_alpha" assert expert.team_id == "team_alpha"