feat(server): U6 新增 _execute_team_collab 集成 @team 流水线到 WebSocket

This commit is contained in:
chiguyong 2026-06-18 02:08:29 +08:00
parent ee6d16345c
commit 1e818b507d
2 changed files with 640 additions and 0 deletions

View File

@ -307,6 +307,148 @@ async def _execute_board_meeting(
return True
async def _execute_team_collab(
websocket: WebSocket,
session_id: str,
content: str,
sm: SessionManager,
) -> bool:
"""Intercept @team prefix and execute a pipeline team collaboration.
Returns True if the input was handled as a team collaboration (caller should return),
False if the input should continue through the normal chat pipeline.
Flow:
1. Resolve @team routing via ExpertTeamRouter
2. Create ExpertTeam with lead + member configs
3. Register handoff_transport handler to relay events to WebSocket
4. Execute TeamOrchestrator (pipeline mode)
5. Send final synthesis as final_answer
6. Persist user task + final result to session history
"""
from agentkit.experts.router import ExpertTeamRouter
from agentkit.experts.team import ExpertTeam
from agentkit.experts.orchestrator import TeamOrchestrator
app_state = websocket.app.state
# Resolve ExpertTemplateRegistry from app.state (loaded at startup)
template_registry = getattr(app_state, "expert_template_registry", None)
if template_registry is None:
from agentkit.experts.registry import ExpertTemplateRegistry
template_registry = ExpertTemplateRegistry()
team_router = ExpertTeamRouter(template_registry=template_registry)
routing_result = team_router.resolve(content)
if not routing_result.matched:
return False # Not a @team input, continue normal pipeline
if not routing_result.task_content:
await websocket.send_json(
{"type": "error", "data": {"message": "团队任务需要一个描述,例如:@team 开发用户登录功能"}}
)
return True
# Resolve expert configs from specified experts or default dev_team template
expert_configs = team_router.resolve_expert_configs(routing_result.specified_experts)
if not expert_configs:
await websocket.send_json(
{"type": "error", "data": {"message": "无法解析团队成员,请检查专家名称或模板配置"}}
)
return True
# Split configs: first is lead, rest are members (V2 verification)
lead_config = expert_configs[0]
member_configs = expert_configs[1:] if len(expert_configs) > 1 else []
# Create ExpertTeam
team = ExpertTeam(
pool=app_state.agent_pool,
template_registry=template_registry,
)
# Register handoff_transport handler to relay team events to WebSocket
async def _relay_team_event(message: dict) -> None:
msg_type = message.get("type")
if not msg_type:
return
# Strip internal fields, keep only event data
event_data = {k: v for k, v in message.items() if k != "type"}
await emit_team_event(websocket, msg_type, event_data)
team.handoff_transport.register_handler(team.team_channel, _relay_team_event)
# Append user task to session history
await sm.append_message(
session_id=session_id,
role=MessageRole.USER,
content=content,
)
try:
await team.create_team(lead_config=lead_config, member_configs=member_configs)
orchestrator = TeamOrchestrator(team=team)
result = await orchestrator.execute(routing_result.task_content)
except Exception as e:
logger.error(f"Team collaboration failed for session {session_id}: {e}", exc_info=True)
await websocket.send_json(
{"type": "error", "data": {"message": f"团队协作执行失败: {str(e)[:200]}"}}
)
try:
await team.dissolve()
except Exception:
pass
return True
finally:
# Always remove handler to avoid leaks
try:
team.handoff_transport._handlers.pop(team.team_channel, None)
except Exception:
pass
# Build final answer text from synthesis result
final_result = result.get("result") or {}
final_content = final_result.get("content", "") if isinstance(final_result, dict) else str(final_result)
if not final_content:
# Fallback: use phase results if synthesis is empty
phase_results = result.get("phase_results") or {}
if phase_results:
parts = []
for phase_id, pr in phase_results.items():
if isinstance(pr, dict) and "content" in pr:
parts.append(f"### {pr.get('phase_name', phase_id)}\n\n{pr['content']}")
final_content = "\n\n".join(parts) if parts else "团队执行完成,但未生成最终结果。"
else:
final_content = "团队执行完成,但未生成最终结果。"
await websocket.send_json(
{
"type": "final_answer",
"content": final_content,
"is_final": True,
}
)
# Persist final synthesis as assistant message
await sm.append_message(
session_id=session_id,
role=MessageRole.ASSISTANT,
content=final_content,
agent_name="team_collab",
)
# Dissolve the team to release expert agents
try:
await team.dissolve()
except Exception as e:
logger.warning(f"Team dissolve failed: {e}")
return True
def _session_to_response(session) -> SessionResponse:
return SessionResponse(
session_id=session.session_id,
@ -637,6 +779,9 @@ async def _handle_chat_message(
Board Meeting mode: @board prefix is intercepted before RequestPreprocessor
and routed to BoardOrchestrator for multi-round group discussion.
Team Collaboration mode: @team prefix is intercepted before RequestPreprocessor
and routed to TeamOrchestrator for pipeline-based expert collaboration.
"""
from agentkit.chat.request_preprocessor import RequestPreprocessor
@ -644,6 +789,10 @@ async def _handle_chat_message(
if await _execute_board_meeting(websocket, session_id, content, sm):
return
# Team Collaboration mode: intercept @team prefix before any other preprocessing
if await _execute_team_collab(websocket, session_id, content, sm):
return
# Resolve Agent first (needed for default tools/prompt)
pool = websocket.app.state.agent_pool
session = await sm.get_session(session_id)

View File

@ -0,0 +1,491 @@
"""Tests for _execute_team_collab in chat.py (U6).
Tests cover:
- @team prefix triggers team collaboration (returns True)
- Non-@team input does not trigger (returns False)
- @team without task content sends error
- Team events are relayed to WebSocket via emit_team_event
- final_answer is sent after execution
- User message and final result are persisted to session history
- Team is dissolved after execution
- Execution failure sends error and dissolves team
- @team and @board do not interfere with each other
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from agentkit.experts.config import ExpertConfig, ExpertTemplate
from agentkit.experts.registry import ExpertTemplateRegistry
from agentkit.server.routes.chat import _execute_team_collab
from agentkit.session.manager import SessionManager
from agentkit.session.models import MessageRole
from agentkit.session.store import InMemorySessionStore
# ── 辅助函数 ──────────────────────────────────────────────
def _make_expert_template(
name: str,
persona: str = "测试专家",
is_lead: bool = False,
) -> ExpertTemplate:
"""创建测试用 ExpertTemplate"""
config = ExpertConfig(
name=name,
agent_type="expert",
persona=persona,
thinking_style="analytical",
bound_skills=[],
is_lead=is_lead,
task_mode="llm_generate",
prompt={"identity": persona},
)
return ExpertTemplate(
name=name,
config=config,
is_builtin=True,
description=f"{name} 模板",
)
def _make_registry_with_dev_team() -> ExpertTemplateRegistry:
"""创建包含 dev_team 模板和成员模板的注册中心"""
registry = ExpertTemplateRegistry()
registry.register(_make_expert_template("tech_lead", persona="技术负责人"))
registry.register(_make_expert_template("frontend_engineer", persona="前端工程师"))
registry.register(_make_expert_template("backend_engineer", persona="后端工程师"))
# dev_team 模板bound_skills 存储成员列表)
registry.register(
ExpertTemplate(
name="dev_team",
config=ExpertConfig(
name="dev_team",
agent_type="expert",
persona="编程团队",
thinking_style="流水线",
bound_skills=["tech_lead", "frontend_engineer", "backend_engineer"],
task_mode="llm_generate",
prompt={"identity": "Dev Team"},
),
is_builtin=True,
description="编程团队模板",
)
)
return registry
class FakeWebSocket:
"""Minimal WebSocket fake for testing."""
def __init__(self) -> None:
self.sent: list[dict] = []
self.app = MagicMock()
self.app.state.agent_pool = MagicMock()
self.app.state.expert_template_registry = None # Will be set per-test
async def send_json(self, data: dict) -> None:
self.sent.append(data)
def _make_mock_team() -> MagicMock:
"""创建 mock ExpertTeam 实例"""
mock_team = MagicMock()
mock_team.team_channel = "team:test"
mock_team.handoff_transport = MagicMock()
mock_team.handoff_transport.register_handler = MagicMock()
mock_team.handoff_transport._handlers = {}
mock_team.create_team = AsyncMock()
mock_team.dissolve = AsyncMock()
return mock_team
def _make_mock_orchestrator(result: dict) -> MagicMock:
"""创建 mock TeamOrchestrator 实例"""
mock_orch = MagicMock()
mock_orch.execute = AsyncMock(return_value=result)
return mock_orch
@pytest.fixture
async def session_manager() -> SessionManager:
sm = SessionManager(store=InMemorySessionStore())
session = await sm.create_session(agent_name="test-agent")
sm._test_session_id = session.session_id
return sm
@pytest.fixture
def websocket() -> FakeWebSocket:
ws = FakeWebSocket()
ws.app.state.expert_template_registry = _make_registry_with_dev_team()
return ws
@pytest.fixture
def mock_orchestrator_result() -> dict:
"""Mock result from TeamOrchestrator.execute()"""
return {
"status": "completed",
"result": {"content": "## 团队最终结果\n\n用户登录功能已实现"},
"phase_results": {
"phase-1": {"content": "规划完成", "phase_name": "规划"},
"phase-2": {"content": "前端实现完成", "phase_name": "前端"},
},
"plan": MagicMock(),
}
# ── 路由匹配测试 ──────────────────────────────────────────
class TestTeamCollabRouting:
"""_execute_team_collab 路由匹配测试"""
@pytest.mark.asyncio
async def test_team_prefix_triggers_collab(
self, websocket, session_manager, mock_orchestrator_result
):
"""@team 前缀触发团队协作,返回 True"""
session_id = session_manager._test_session_id
with patch(
"agentkit.experts.team.ExpertTeam"
) as mock_team_cls, patch(
"agentkit.experts.orchestrator.TeamOrchestrator"
) as mock_orch_cls:
mock_team_cls.return_value = _make_mock_team()
mock_orch_cls.return_value = _make_mock_orchestrator(mock_orchestrator_result)
result = await _execute_team_collab(
websocket, session_id, "@team 开发用户登录功能", session_manager
)
assert result is True
@pytest.mark.asyncio
async def test_non_team_input_does_not_trigger(self, websocket, session_manager):
"""非 @team 输入不触发,返回 False"""
session_id = session_manager._test_session_id
result = await _execute_team_collab(
websocket, session_id, "普通问题", session_manager
)
assert result is False
@pytest.mark.asyncio
async def test_board_prefix_does_not_trigger_team(
self, websocket, session_manager
):
"""@board 前缀不触发 @team 协作"""
session_id = session_manager._test_session_id
result = await _execute_team_collab(
websocket, session_id, "@board 讨论主题", session_manager
)
assert result is False
@pytest.mark.asyncio
async def test_team_with_dev_team_template(
self, websocket, session_manager, mock_orchestrator_result
):
"""@team:dev_team 触发团队协作"""
session_id = session_manager._test_session_id
with patch(
"agentkit.experts.team.ExpertTeam"
) as mock_team_cls, patch(
"agentkit.experts.orchestrator.TeamOrchestrator"
) as mock_orch_cls:
mock_team_cls.return_value = _make_mock_team()
mock_orch_cls.return_value = _make_mock_orchestrator(mock_orchestrator_result)
result = await _execute_team_collab(
websocket, session_id, "@team:dev_team 开发功能", session_manager
)
assert result is True
# ── 错误处理测试 ──────────────────────────────────────────
class TestTeamCollabErrorHandling:
"""_execute_team_collab 错误处理测试"""
@pytest.mark.asyncio
async def test_team_without_task_sends_error(
self, websocket, session_manager
):
"""@team 无任务内容时发送错误(通过 mock router 模拟空 task_content"""
session_id = session_manager._test_session_id
# Mock ExpertTeamRouter to return empty task_content
mock_routing_result = MagicMock()
mock_routing_result.matched = True
mock_routing_result.task_content = "" # 空 task_content
mock_routing_result.specified_experts = ["tech_lead"]
with patch(
"agentkit.experts.router.ExpertTeamRouter"
) as mock_router_cls:
mock_router = MagicMock()
mock_router.resolve = MagicMock(return_value=mock_routing_result)
mock_router_cls.return_value = mock_router
result = await _execute_team_collab(
websocket, session_id, "@team", session_manager
)
assert result is True
# 应该发送了错误消息
assert any(
msg.get("type") == "error" and "描述" in msg.get("data", {}).get("message", "")
for msg in websocket.sent
)
@pytest.mark.asyncio
async def test_team_execution_failure_sends_error(
self, websocket, session_manager
):
"""团队执行失败时发送错误并清理"""
session_id = session_manager._test_session_id
with patch(
"agentkit.experts.team.ExpertTeam"
) as mock_team_cls, patch(
"agentkit.experts.orchestrator.TeamOrchestrator"
) as mock_orch_cls:
mock_team = _make_mock_team()
mock_team_cls.return_value = mock_team
mock_orch_cls.return_value = _make_mock_orchestrator_result_failing()
result = await _execute_team_collab(
websocket, session_id, "@team 开发功能", session_manager
)
assert result is True
# 应该发送了错误消息
assert any(
msg.get("type") == "error"
and "团队协作执行失败" in msg.get("data", {}).get("message", "")
for msg in websocket.sent
)
# 应该调用了 dissolve 清理
mock_team.dissolve.assert_called()
def _make_mock_orchestrator_result_failing() -> MagicMock:
"""创建 mock TeamOrchestrator that raises an exception"""
mock_orch = MagicMock()
mock_orch.execute = AsyncMock(side_effect=RuntimeError("LLM 不可用"))
return mock_orch
# ── 事件中继与持久化测试 ──────────────────────────────────
class TestTeamCollabEventRelay:
"""_execute_team_collab 事件中继与持久化测试"""
@pytest.mark.asyncio
async def test_final_answer_sent_after_execution(
self, websocket, session_manager, mock_orchestrator_result
):
"""执行完成后发送 final_answer"""
session_id = session_manager._test_session_id
with patch(
"agentkit.experts.team.ExpertTeam"
) as mock_team_cls, patch(
"agentkit.experts.orchestrator.TeamOrchestrator"
) as mock_orch_cls:
mock_team_cls.return_value = _make_mock_team()
mock_orch_cls.return_value = _make_mock_orchestrator(mock_orchestrator_result)
await _execute_team_collab(
websocket, session_id, "@team 开发登录功能", session_manager
)
final_msgs = [msg for msg in websocket.sent if msg.get("type") == "final_answer"]
assert len(final_msgs) == 1
assert "团队最终结果" in final_msgs[0]["content"]
assert final_msgs[0]["is_final"] is True
@pytest.mark.asyncio
async def test_user_message_persisted(
self, websocket, session_manager, mock_orchestrator_result
):
"""用户消息持久化到会话历史"""
session_id = session_manager._test_session_id
with patch(
"agentkit.experts.team.ExpertTeam"
) as mock_team_cls, patch(
"agentkit.experts.orchestrator.TeamOrchestrator"
) as mock_orch_cls:
mock_team_cls.return_value = _make_mock_team()
mock_orch_cls.return_value = _make_mock_orchestrator(mock_orchestrator_result)
await _execute_team_collab(
websocket, session_id, "@team 开发登录功能", session_manager
)
messages = await session_manager.get_messages(session_id)
user_msgs = [m for m in messages if m.role == MessageRole.USER]
assert len(user_msgs) == 1
assert "@team 开发登录功能" in user_msgs[0].content
@pytest.mark.asyncio
async def test_final_result_persisted(
self, websocket, session_manager, mock_orchestrator_result
):
"""最终结果持久化为 assistant 消息"""
session_id = session_manager._test_session_id
with patch(
"agentkit.experts.team.ExpertTeam"
) as mock_team_cls, patch(
"agentkit.experts.orchestrator.TeamOrchestrator"
) as mock_orch_cls:
mock_team_cls.return_value = _make_mock_team()
mock_orch_cls.return_value = _make_mock_orchestrator(mock_orchestrator_result)
await _execute_team_collab(
websocket, session_id, "@team 开发登录功能", session_manager
)
messages = await session_manager.get_messages(session_id)
assistant_msgs = [m for m in messages if m.role == MessageRole.ASSISTANT]
assert len(assistant_msgs) == 1
assert "团队最终结果" in assistant_msgs[0].content
assert assistant_msgs[0].agent_name == "team_collab"
@pytest.mark.asyncio
async def test_team_dissolved_after_execution(
self, websocket, session_manager, mock_orchestrator_result
):
"""执行后 team.dissolve() 被调用"""
session_id = session_manager._test_session_id
with patch(
"agentkit.experts.team.ExpertTeam"
) as mock_team_cls, patch(
"agentkit.experts.orchestrator.TeamOrchestrator"
) as mock_orch_cls:
mock_team = _make_mock_team()
mock_team_cls.return_value = mock_team
mock_orch_cls.return_value = _make_mock_orchestrator(mock_orchestrator_result)
await _execute_team_collab(
websocket, session_id, "@team 开发登录功能", session_manager
)
mock_team.dissolve.assert_called_once()
@pytest.mark.asyncio
async def test_handoff_handler_registered(
self, websocket, session_manager, mock_orchestrator_result
):
"""handoff_transport handler 被注册用于事件中继"""
session_id = session_manager._test_session_id
with patch(
"agentkit.experts.team.ExpertTeam"
) as mock_team_cls, patch(
"agentkit.experts.orchestrator.TeamOrchestrator"
) as mock_orch_cls:
mock_team = _make_mock_team()
mock_team.team_channel = "team:test-channel"
mock_team_cls.return_value = mock_team
mock_orch_cls.return_value = _make_mock_orchestrator(mock_orchestrator_result)
await _execute_team_collab(
websocket, session_id, "@team 开发登录功能", session_manager
)
mock_team.handoff_transport.register_handler.assert_called_once()
call_args = mock_team.handoff_transport.register_handler.call_args
assert call_args[0][0] == "team:test-channel"
@pytest.mark.asyncio
async def test_create_team_called_with_lead_and_members(
self, websocket, session_manager, mock_orchestrator_result
):
"""create_team 以 lead_config 和 member_configs 调用"""
session_id = session_manager._test_session_id
with patch(
"agentkit.experts.team.ExpertTeam"
) as mock_team_cls, patch(
"agentkit.experts.orchestrator.TeamOrchestrator"
) as mock_orch_cls:
mock_team = _make_mock_team()
mock_team_cls.return_value = mock_team
mock_orch_cls.return_value = _make_mock_orchestrator(mock_orchestrator_result)
await _execute_team_collab(
websocket, session_id, "@team:dev_team 开发功能", session_manager
)
mock_team.create_team.assert_called_once()
call_kwargs = mock_team.create_team.call_args.kwargs
# lead_config 应该是第一个专家tech_lead
assert call_kwargs["lead_config"].name == "tech_lead"
# member_configs 应该包含其余专家
member_names = [c.name for c in call_kwargs["member_configs"]]
assert "frontend_engineer" in member_names
assert "backend_engineer" in member_names
@pytest.mark.asyncio
async def test_empty_synthesis_falls_back_to_phase_results(
self, websocket, session_manager
):
"""synthesis 结果为空时 fallback 到 phase_results"""
session_id = session_manager._test_session_id
empty_result = {
"status": "completed",
"result": {"content": ""}, # 空的 synthesis
"phase_results": {
"phase-1": {"content": "阶段1结果", "phase_name": "规划"},
"phase-2": {"content": "阶段2结果", "phase_name": "执行"},
},
"plan": MagicMock(),
}
with patch(
"agentkit.experts.team.ExpertTeam"
) as mock_team_cls, patch(
"agentkit.experts.orchestrator.TeamOrchestrator"
) as mock_orch_cls:
mock_team_cls.return_value = _make_mock_team()
mock_orch_cls.return_value = _make_mock_orchestrator(empty_result)
await _execute_team_collab(
websocket, session_id, "@team 开发功能", session_manager
)
final_msgs = [msg for msg in websocket.sent if msg.get("type") == "final_answer"]
assert len(final_msgs) == 1
# 应该包含 phase_results 的内容
assert "阶段1结果" in final_msgs[0]["content"] or "阶段2结果" in final_msgs[0]["content"]
@pytest.mark.asyncio
async def test_completely_empty_result(
self, websocket, session_manager
):
"""result 和 phase_results 都为空时发送默认消息"""
session_id = session_manager._test_session_id
empty_result = {
"status": "completed",
"result": {},
"phase_results": {},
"plan": MagicMock(),
}
with patch(
"agentkit.experts.team.ExpertTeam"
) as mock_team_cls, patch(
"agentkit.experts.orchestrator.TeamOrchestrator"
) as mock_orch_cls:
mock_team_cls.return_value = _make_mock_team()
mock_orch_cls.return_value = _make_mock_orchestrator(empty_result)
await _execute_team_collab(
websocket, session_id, "@team 开发功能", session_manager
)
final_msgs = [msg for msg in websocket.sent if msg.get("type") == "final_answer"]
assert len(final_msgs) == 1
assert "未生成最终结果" in final_msgs[0]["content"]