feat(cli): U6 CLI 多 Agent 入口 + 辩论 Rich 渲染

- 新增 _execute_team_cli() 处理 @team 前缀,运行 ExpertTeam 流水线
- Rich 事件渲染:team_formed/plan_update/phase_*/debate_*/team_synthesis
- 干预循环使用 select.select() 非阻塞轮询 stdin(Unix-only,ponytail 标注)
- 支持 /debate 手动触发辩论、/stop 终止团队、纯文本作为上下文注入
- 扩展 _print_help() 增加 Multi-Agent 与 Interventions 说明
- 新增 12 个单元测试覆盖路由、帮助文档、函数返回值、干预基础设施
This commit is contained in:
chiguyong 2026-06-24 13:03:57 +08:00
parent 49b483b933
commit b86100a0a1
2 changed files with 454 additions and 0 deletions

View File

@ -255,6 +255,26 @@ async def _chat_async(
rprint(f"[yellow]Unknown command: {cmd}[/yellow]") rprint(f"[yellow]Unknown command: {cmd}[/yellow]")
continue continue
# @team prefix: intercept before normal chat pipeline
if user_input.strip().lower().startswith("@team"):
from agentkit.experts.registry import ExpertTemplateRegistry
from agentkit.core.agent_pool import AgentPool
cli_registry = ExpertTemplateRegistry()
cli_pool = AgentPool(
llm_gateway=gateway,
skill_registry=skill_registry,
tool_registry=tool_registry,
)
handled = await _execute_team_cli(
user_input=user_input,
gateway=gateway,
agent_pool=cli_pool,
template_registry=cli_registry,
)
if handled:
continue
conversation_had_messages = True conversation_had_messages = True
# Generate task_id for this user message and emit task.created to EQ (if enabled) # Generate task_id for this user message and emit task.created to EQ (if enabled)
@ -505,6 +525,240 @@ def _resolve_default_model(server_config: "ServerConfig") -> str:
return "default" return "default"
async def _execute_team_cli(
user_input: str,
gateway: "LLMGateway",
agent_pool: "AgentPool",
template_registry: "ExpertTemplateRegistry",
) -> bool:
"""Handle @team prefix in CLI — run ExpertTeam pipeline with live Rich rendering.
Returns True if the input was handled (matched @team), False otherwise.
"""
import select
import sys
from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.router import ExpertTeamRouter
from agentkit.experts.team import ExpertTeam
router = ExpertTeamRouter(template_registry=template_registry)
routing = router.resolve(user_input)
if not routing.matched:
return False
# No task content → show usage
task = routing.task_content.strip() if routing.task_content else ""
if not task or task == user_input.strip():
rprint(
Panel(
"[bold]@team 用法[/bold]\n\n"
" [magenta]@team <task>[/magenta] — 专家团协作\n"
" [dim]@team:dev_team <task>[/dim] — 使用 dev_team 模板\n"
" [dim]@team:expert1,expert2 <task>[/dim] — 指定专家\n\n"
"请提供任务描述。",
title="[yellow]缺少任务[/yellow]",
border_style="yellow",
)
)
return True
expert_configs = router.resolve_expert_configs(routing.specified_experts)
if not expert_configs:
rprint(
f"[red]无法解析专家配置: {routing.specified_experts}[/red]"
)
return True
team = ExpertTeam(pool=agent_pool, template_registry=template_registry)
# Mutable state captured by the event handler closure
synthesis_emitted = {"value": False}
async def _event_handler(message: dict) -> None:
"""Render orchestration events with Rich (best-effort, never raises)."""
try:
etype = message.get("type", "")
if etype == "team_formed":
experts = message.get("experts", [])
lead = message.get("lead_expert", "")
lines = [
f"{e.get('name', '?')}{' (Lead)' if e.get('is_lead') else ''} "
f"{e.get('persona', '')}"
for e in experts
]
rprint(
Panel(
"\n".join(lines) or " (no experts)",
title=f"[bold]团队组建[/bold] (Lead: {lead})",
border_style="cyan",
)
)
elif etype == "plan_update":
phases = message.get("plan_phases", [])
icon_map = {"completed": ("", "green"), "in_progress": ("", "blue"), "failed": ("", "red")}
lines = []
for ph in phases:
status = ph.get("status", "pending")
icon, color = icon_map.get(status, ("", "dim"))
lines.append(
f" [{color}]{icon}[/{color}] {ph.get('name', '?')}{ph.get('assigned_expert', '?')}"
)
if message.get("debate_inserted"):
lines.append("\n [magenta]+ 辩论阶段已插入[/magenta]")
if message.get("stopped_by_user"):
lines.append("\n [red]! 用户终止执行[/red]")
rprint(
Panel(
"\n".join(lines) or " (no phases)",
title="[bold]执行计划[/bold]",
border_style="cyan",
)
)
elif etype == "phase_started":
rprint(
f"\n[bold blue]▶ {message.get('phase_name', '?')}[/bold blue] "
f"{message.get('assigned_expert', '?')}"
)
elif etype == "phase_completed":
summary = message.get("result_summary", "")
rprint(f" [green]✓ {message.get('phase_name', '?')}[/green]: {summary[:120]}")
elif etype == "phase_failed":
rprint(
f" [red]✗ {message.get('phase_name', '?')}[/red]: {message.get('error', '')}"
)
elif etype == "debate_started":
rprint(
Panel(
f"[bold]主题:[/bold] {message.get('topic', '')}\n"
f"[bold]参与者:[/bold] {', '.join(message.get('participants', []))}",
title=f"[bold]辩论开始[/bold] (最多 {message.get('max_rounds', 0)} 轮)",
border_style="magenta",
)
)
elif etype == "expert_argument":
rprint(
Panel(
Markdown(message.get("content", "")),
title=f"[bold]{message.get('expert_name', '?')}[/bold] "
f"(Round {message.get('round', 0)})",
border_style="blue",
)
)
elif etype == "debate_round_summary":
rprint(
Panel(
Markdown(message.get("content", "")),
title=f"[bold]{message.get('moderator_name', '?')}[/bold] "
f"(Round {message.get('round', 0)} 总结)",
border_style="cyan",
)
)
elif etype == "debate_resolved":
decision = message.get("decision", "inconclusive")
color = {
"accepted": "green",
"rejected": "red",
"compromise": "yellow",
}.get(decision, "magenta")
rprint(
Panel(
f"[bold]裁决:[/bold] [{color}]{decision}[/{color}]\n"
f"[bold]结论:[/bold] {message.get('conclusion', '')}\n"
f"[bold]理由:[/bold] {message.get('rationale', '')}",
title="[bold]辩论结束[/bold]",
border_style="magenta",
)
)
elif etype == "team_synthesis":
synthesis_emitted["value"] = True
rprint(
Panel(
Markdown(message.get("content", "")),
title="[bold]团队综合结果[/bold]",
border_style="green",
)
)
elif etype == "team_dissolved":
rprint("[dim]团队已解散[/dim]")
elif etype == "user_intervention":
pass # User typed it themselves
# Other events (expert_step, expert_result, expert_joined, etc.) are not rendered
except Exception:
pass # Rendering is best-effort; never break orchestration
team.handoff_transport.register_handler(team.team_channel, _event_handler)
lead_config = expert_configs[0]
member_configs = expert_configs[1:]
try:
await team.create_team(lead_config=lead_config, member_configs=member_configs)
# Wire gateway into experts (safety: ensure each agent has the gateway)
for expert in team.experts:
if hasattr(expert, "agent") and hasattr(expert.agent, "_llm_gateway"):
if expert.agent._llm_gateway is None:
expert.agent._llm_gateway = gateway
orchestrator = TeamOrchestrator(team)
exec_task = asyncio.create_task(orchestrator.execute(task))
# ponytail: select() on stdin is Unix-only; Windows would need msvcrt.
# Ceiling: non-interactive stdin (redirected/piped) raises OSError → fall back to sleep.
# Upgrade path: use prompt_toolkit's async input for cross-platform support.
while not exec_task.done():
try:
readable, _, _ = select.select([sys.stdin], [], [], 0.5)
except (OSError, ValueError):
# stdin not selectable (e.g., redirected) — just wait for exec
await asyncio.sleep(0.5)
continue
if readable:
try:
line = sys.stdin.readline()
except Exception:
line = ""
if not line:
break # EOF
line = line.strip()
if not line:
continue
# U4: send intervention to team (broadcasts + enqueues for orchestrator)
await team.add_user_intervention(line)
rprint(f"[dim]已发送干预: {line[:60]}[/dim]")
result = await exec_task
# Fallback: if team_synthesis wasn't emitted, print final result
if not synthesis_emitted["value"]:
res = result.get("result") if isinstance(result, dict) else None
content = ""
if isinstance(res, dict):
content = res.get("content", str(res))
elif res is not None:
content = str(res)
if content:
rprint(
Panel(
Markdown(content),
title="[bold]团队结果[/bold]",
border_style="green",
)
)
except Exception as e:
rprint(f"[red]团队执行错误: {e}[/red]")
finally:
try:
await team.dissolve()
except Exception:
pass
return True
def _print_help() -> None: def _print_help() -> None:
"""Print chat command help.""" """Print chat command help."""
rprint( rprint(
@ -514,6 +768,14 @@ def _print_help() -> None:
" [cyan]/clear[/cyan] — Clear conversation (new session)\n" " [cyan]/clear[/cyan] — Clear conversation (new session)\n"
" [cyan]/model <name>[/cyan] — Switch LLM model\n" " [cyan]/model <name>[/cyan] — Switch LLM model\n"
" [cyan]/quit[/cyan] — Exit chat\n\n" " [cyan]/quit[/cyan] — Exit chat\n\n"
"[bold]Multi-Agent[/bold]\n\n"
" [magenta]@team <task>[/magenta] — 专家团协作Lead 分解 + 专家并行 + 辩论)\n"
" [dim]@team:dev_team <task>[/dim] — 使用 dev_team 模板\n"
" [dim]@team:expert1,expert2 <task>[/dim] — 指定专家\n\n"
"[bold]Interventions (during @team)[/bold]\n\n"
" [magenta]/debate <topic>[/magenta] — 手动发起辩论\n"
" [cyan]/stop[/cyan] — 终止团队执行\n"
" 其他文本 — 补充上下文给 Lead\n\n"
"[bold]Tips[/bold]\n\n" "[bold]Tips[/bold]\n\n"
" • Multi-line input: end a line with [cyan]\\[/cyan] to continue\n" " • Multi-line input: end a line with [cyan]\\[/cyan] to continue\n"
" • Your conversation is stored in memory for the session", " • Your conversation is stored in memory for the session",

View File

@ -0,0 +1,192 @@
"""CLI 多 Agent 入口 + 辩论支持单元测试 (U6)"""
from __future__ import annotations
import io
from unittest.mock import MagicMock, patch
import pytest
from rich.console import Console
from agentkit.experts.router import ExpertTeamRouter
from agentkit.experts.team import ExpertTeam
# ---------------------------------------------------------------------------
# @team 前缀路由测试
# ---------------------------------------------------------------------------
class TestTeamPrefixRouting:
"""@team 前缀路由测试"""
def test_team_prefix_matched(self):
"""@team 前缀被 ExpertTeamRouter 识别"""
router = ExpertTeamRouter()
result = router.resolve("@team 开发用户登录功能")
assert result.matched is True
assert result.task_content == "开发用户登录功能"
def test_team_prefix_with_template(self):
"""@team:dev_team 模板被识别"""
router = ExpertTeamRouter()
result = router.resolve("@team:dev_team 开发API")
assert result.matched is True
assert result.task_content == "开发API"
def test_non_team_input_not_matched(self):
"""非 @team 输入不被匹配"""
router = ExpertTeamRouter()
result = router.resolve("你好")
assert result.matched is False
def test_team_prefix_alone_matched(self):
"""@team 单独出现也被匹配task_content 回退为完整输入)"""
router = ExpertTeamRouter()
result = router.resolve("@team")
assert result.matched is True
# ---------------------------------------------------------------------------
# _print_help 文档测试
# ---------------------------------------------------------------------------
class TestPrintHelp:
"""_print_help 包含 @team 文档测试"""
def test_help_includes_team_docs(self):
"""帮助文本包含 @team 说明"""
from agentkit.cli.chat import _print_help
captured = io.StringIO()
console = Console(file=captured, width=120)
with patch(
"agentkit.cli.chat.rprint",
side_effect=lambda *a, **kw: console.print(*a, **kw),
):
_print_help()
text = captured.getvalue()
assert "@team" in text
assert "/debate" in text
assert "/stop" in text
assert "专家团" in text
def test_help_includes_intervention_section(self):
"""帮助文本包含干预说明"""
from agentkit.cli.chat import _print_help
captured = io.StringIO()
console = Console(file=captured, width=120)
with patch(
"agentkit.cli.chat.rprint",
side_effect=lambda *a, **kw: console.print(*a, **kw),
):
_print_help()
text = captured.getvalue()
assert "Interventions" in text or "干预" in text
# ---------------------------------------------------------------------------
# _execute_team_cli 函数测试
# ---------------------------------------------------------------------------
class TestExecuteTeamCli:
"""_execute_team_cli 函数测试"""
@pytest.mark.asyncio
async def test_returns_false_for_non_team_input(self):
"""非 @team 输入返回 False"""
from agentkit.cli.chat import _execute_team_cli
gateway = MagicMock()
pool = MagicMock()
registry = MagicMock()
result = await _execute_team_cli("你好", gateway, pool, registry)
assert result is False
@pytest.mark.asyncio
async def test_returns_true_for_team_without_task(self):
"""@team 无任务描述返回 True已处理提示用法"""
from agentkit.cli.chat import _execute_team_cli
gateway = MagicMock()
pool = MagicMock()
registry = MagicMock()
with patch.object(ExpertTeamRouter, "resolve") as mock_resolve:
mock_result = MagicMock()
mock_result.matched = True
mock_result.task_content = ""
mock_resolve.return_value = mock_result
result = await _execute_team_cli("@team", gateway, pool, registry)
assert result is True
@pytest.mark.asyncio
async def test_returns_true_when_experts_unresolvable(self):
"""@team 有任务但无法解析专家时返回 True错误提示"""
from agentkit.cli.chat import _execute_team_cli
gateway = MagicMock()
pool = MagicMock()
registry = MagicMock()
with (
patch.object(ExpertTeamRouter, "resolve") as mock_resolve,
patch.object(ExpertTeamRouter, "resolve_expert_configs") as mock_configs,
):
mock_result = MagicMock()
mock_result.matched = True
mock_result.task_content = "开发功能"
mock_result.specified_experts = ["nonexistent"]
mock_resolve.return_value = mock_result
mock_configs.return_value = []
result = await _execute_team_cli("@team:nonexistent 开发功能", gateway, pool, registry)
assert result is True
# ---------------------------------------------------------------------------
# 干预命令支持测试
# ---------------------------------------------------------------------------
class TestInterventionSupport:
"""干预命令基础设施测试"""
def test_team_has_broadcast_user_message(self):
"""ExpertTeam 有 broadcast_user_message 方法(干预广播基础)"""
assert hasattr(ExpertTeam, "broadcast_user_message")
def test_help_lists_debate_command(self):
"""帮助文本列出 /debate 命令"""
from agentkit.cli.chat import _print_help
captured = io.StringIO()
console = Console(file=captured, width=120)
with patch(
"agentkit.cli.chat.rprint",
side_effect=lambda *a, **kw: console.print(*a, **kw),
):
_print_help()
text = captured.getvalue()
assert "/debate" in text
assert "辩论" in text
def test_help_lists_stop_command(self):
"""帮助文本列出 /stop 命令"""
from agentkit.cli.chat import _print_help
captured = io.StringIO()
console = Console(file=captured, width=120)
with patch(
"agentkit.cli.chat.rprint",
side_effect=lambda *a, **kw: console.print(*a, **kw),
):
_print_help()
text = captured.getvalue()
assert "/stop" in text
assert "终止" in text