"""Chat command — interactive terminal chat with an Agent. Runs a lightweight in-process server and opens a REPL-style chat session. No external server or Docker needed. Usage: agentkit chat # Start chatting (auto-onboard if no config) agentkit chat --model deepseek/deepseek-chat # Use specific model """ from __future__ import annotations import asyncio import os import typer from rich import print as rprint from rich.panel import Panel from rich.prompt import Prompt from rich.markdown import Markdown from rich.live import Live from rich.text import Text def chat( model: str = typer.Option( "default", "--model", "-m", help="LLM model to use (e.g. deepseek/deepseek-chat)" ), agent_name: str = typer.Option("default", "--agent", "-a", help="Agent name to chat with"), config: str | None = typer.Option(None, "--config", "-c", help="Path to agentkit.yaml"), system_prompt: str | None = typer.Option( None, "--system-prompt", "-s", help="Custom system prompt" ), no_stream: bool = typer.Option(False, "--no-stream", help="Disable token streaming"), event_queue: bool = typer.Option( False, "--event-queue", help="Enable EventQueue for structured event emission (for future extensions)", ), ): """Start an interactive chat session with an Agent.""" asyncio.run(_chat_async(model, agent_name, config, system_prompt, no_stream, event_queue)) async def _chat_async( model: str, agent_name: str, config_arg: str | None, system_prompt: str | None, no_stream: bool, enable_event_queue: bool = False, ) -> None: """Async implementation of the chat command.""" from agentkit.cli.onboarding import run_onboarding from agentkit.server.config import find_config_path # ── Onboarding check ────────────────────────────────────────── config_path = find_config_path(config_arg) if config_path is None: config_path = run_onboarding(config_arg=config_arg) if config_path is None: rprint("[red]Onboarding cancelled. Cannot start chat without configuration.[/red]") raise typer.Exit(code=1) # ── Load config ─────────────────────────────────────────────── rprint(f"[dim]Loading config from {config_path}[/dim]") from agentkit.server.config import load_config_with_dotenv server_config = load_config_with_dotenv(config_path) # ── Build in-process components ─────────────────────────────── from agentkit.session.manager import SessionManager from agentkit.session.store import InMemorySessionStore from agentkit.session.models import MessageRole from agentkit.core.react import ReActEngine from agentkit.tools.base import Tool from agentkit.memory.profile import MemoryStore from agentkit.tools.memory_tool import MemoryTool from agentkit.tools.shell import ShellTool from agentkit.tools.web_search import WebSearchTool from agentkit.tools.web_crawl import WebCrawlTool # Build LLM Gateway gateway = _build_gateway(server_config) # Initialize memory store memory_store = MemoryStore() memory_store.ensure_defaults() memory_snapshot = memory_store.load_all() # Create session session_manager = SessionManager(store=InMemorySessionStore()) session = await session_manager.create_session(agent_name=agent_name) # Initialize EventQueue if --event-queue flag is set (lightweight, optional) # The EQ is a side-channel for structured event emission; CLI output format is unchanged. eq = None _emit = None # type: ignore[assignment] if enable_event_queue: import uuid as _uuid from agentkit.core.event_queue import EventQueue from agentkit.core.protocol import ( Event, SessionEventType, TaskEventType, TurnEventType, ) eq = EventQueue() # Map ReAct engine event_type strings to TurnEventType constants for EQ emission _CLI_REACT_EVENT_MAP: dict[str, str] = { "thinking": TurnEventType.THINKING, "tool_call": TurnEventType.TOOL_CALL, "tool_result": TurnEventType.TOOL_RESULT, "final_answer": TurnEventType.FINAL_ANSWER, } async def _emit( # type: ignore[no-redef] event_type: str, task_id: str, session_id: str, data: dict | None = None, ) -> None: """Emit an event to the EQ. Best-effort: never raises.""" try: await eq.emit( # type: ignore[union-attr] Event.create( event_type=event_type, task_id=task_id, session_id=session_id, data=data or {}, ) ) except (asyncio.QueueFull, RuntimeError, ConnectionError): pass # EQ is best-effort; never break CLI flow # Emit session.started event await _emit( SessionEventType.SESSION_STARTED, task_id="", session_id=session.session_id, data={"agent_name": agent_name, "model": model}, ) # Build tools list — all available tools for chat mode search_api_keys = _extract_search_keys(server_config) tools: list[Tool] = [ MemoryTool(memory_store=memory_store), ShellTool(working_dir=os.getcwd()), WebSearchTool(**search_api_keys), WebCrawlTool(), ] # ── Load skills ──────────────────────────────────────────── from agentkit.tools.registry import ToolRegistry from agentkit.skills.registry import SkillRegistry from agentkit.skills.loader import SkillLoader tool_registry = ToolRegistry() for tool in tools: tool_registry.register(tool) skill_registry = SkillRegistry() if server_config.skill_paths: loader = SkillLoader(skill_registry=skill_registry, tool_registry=tool_registry) for skill_path in server_config.skill_paths: from pathlib import Path as _P p = _P(skill_path) if p.is_dir(): loaded = loader.load_from_directory(str(p)) if loaded: rprint(f"[dim]Loaded {len(loaded)} skills from {p}[/dim]") elif p.is_file() and p.suffix in (".yaml", ".yml"): try: loader.load_from_file(str(p)) except (ValueError, TypeError, KeyError, OSError, RuntimeError): pass # Build system prompt — inject memory into system prompt base_prompt = system_prompt or ( "你是一个有帮助的AI助手。请记住我们对话的上下文,并在后续对话中引用之前的内容。回答要清晰简洁,请使用中文回复。" ) effective_system_prompt = memory_store.build_system_prompt(memory_snapshot, base_prompt) # Resolve agent display name from SOUL.md agent_display_name = memory_store.get_file("soul").read_section("身份") or agent_name # Extract just the name (first line after "我是") for prefix in ["我是", "我叫", "我的名字是"]: if prefix in agent_display_name: name_part = agent_display_name.split(prefix, 1)[1].strip() # Take first meaningful token (before comma, period, etc.) for sep in [",", "。", "、", ",", ".", " "]: if sep in name_part: name_part = name_part.split(sep)[0] break agent_display_name = name_part break # ── Welcome banner ──────────────────────────────────────────── effective_model = model if model != "default" else _resolve_default_model(server_config) rprint( Panel( f"[bold]AgentKit Chat[/bold]\n\n" f" Model: [cyan]{effective_model}[/cyan]\n" f" Agent: [cyan]{agent_display_name}[/cyan]\n" f" Session: [dim]{session.session_id[:8]}...[/dim]\n\n" f" Type your message and press Enter.\n" f" [dim]/help[/dim] — Show commands\n" f" [dim]/clear[/dim] — Clear conversation\n" f" [dim]/model [/dim] — Switch model\n" f" [dim]/quit[/dim] — Exit chat", title="AgentKit", border_style="bright_blue", ) ) # ── Chat loop ───────────────────────────────────────────────── react_engine = ReActEngine(llm_gateway=gateway) current_model = effective_model conversation_had_messages = False while True: try: user_input = Prompt.ask("\n[bold green]You[/bold green]") except (EOFError, KeyboardInterrupt): rprint("\n[dim]Goodbye![/dim]") break if not user_input.strip(): continue # Handle commands if user_input.startswith("/"): cmd = user_input.strip().lower() if cmd in ("/quit", "/q", "/exit"): rprint("[dim]Goodbye![/dim]") break elif cmd == "/help": _print_help() continue elif cmd == "/clear": # Create a new session (memory files persist) session = await session_manager.create_session(agent_name=agent_name) rprint("[dim]Conversation cleared. New session started.[/dim]") continue elif cmd.startswith("/model "): current_model = cmd.split(" ", 1)[1].strip() rprint(f"[dim]Switched to model: {current_model}[/dim]") continue else: rprint(f"[yellow]Unknown command: {cmd}[/yellow]") continue # @team prefix: intercept before normal chat pipeline if user_input.strip().lower().startswith("@team"): from agentkit.experts.registry import ExpertTemplateRegistry from agentkit.core.agent_pool import AgentPool cli_registry = ExpertTemplateRegistry() cli_pool = AgentPool( llm_gateway=gateway, skill_registry=skill_registry, tool_registry=tool_registry, ) handled = await _execute_team_cli( user_input=user_input, gateway=gateway, agent_pool=cli_pool, template_registry=cli_registry, ) if handled: continue conversation_had_messages = True # Generate task_id for this user message and emit task.created to EQ (if enabled) cli_task_id = str(_uuid.uuid4()) if enable_event_queue else "" if _emit is not None: await _emit( TaskEventType.TASK_CREATED, task_id=cli_task_id, session_id=session.session_id, data={"message": user_input}, ) # Append user message to session await session_manager.append_message( session_id=session.session_id, role=MessageRole.USER, content=user_input, ) # Get full conversation history (includes all previous turns) chat_messages = await session_manager.get_chat_messages(session.session_id) # ── Skill routing ───────────────────────────────────────── from agentkit.chat.skill_routing import resolve_skill_routing routing = await resolve_skill_routing( content=user_input, skill_registry=skill_registry, default_tools=tools, default_system_prompt=effective_system_prompt, default_model=current_model, default_agent_name=agent_name, session_id=session.session_id, ) if routing.matched: rprint( f"[dim]Skill: {routing.skill_name} ({routing.match_method}, {int(routing.match_confidence * 100)}%)[/dim]" ) # Emit task.started to EQ (if enabled) if _emit is not None: await _emit( TaskEventType.TASK_STARTED, task_id=cli_task_id, session_id=session.session_id, data={"agent_name": routing.skill_name or agent_name}, ) exec_system_prompt = routing.system_prompt exec_tools = routing.tools exec_model = routing.model # Print Agent label before streaming rprint(f"\n[bold blue]{agent_display_name}[/bold blue]: ", end="") # Execute Agent try: if no_stream: # Non-streaming mode result = await react_engine.execute( messages=chat_messages, tools=exec_tools, model=exec_model, agent_name=routing.skill_name or agent_name, system_prompt=exec_system_prompt, ) output = result.output if hasattr(result, "output") else str(result) rprint(output) await session_manager.append_message( session_id=session.session_id, role=MessageRole.ASSISTANT, content=output, agent_name=agent_name, ) # Emit turn.final_answer and task.completed to EQ (if enabled) if _emit is not None: await _emit( TurnEventType.FINAL_ANSWER, task_id=cli_task_id, session_id=session.session_id, data={"output": output}, ) await _emit( TaskEventType.TASK_COMPLETED, task_id=cli_task_id, session_id=session.session_id, data={"output": output}, ) else: # Streaming mode — Live displays under the "Agent:" label full_content = "" with Live( Text(""), refresh_per_second=15, vertical_overflow="visible", transient=False, # Keep final output on screen ) as live: async for event in react_engine.execute_stream( messages=chat_messages, tools=exec_tools, model=exec_model, agent_name=routing.skill_name or agent_name, system_prompt=exec_system_prompt, ): if event.event_type == "token": token = event.data.get("content", "") full_content += token live.update(Text(full_content)) elif event.event_type == "final_answer": # Use final_answer output (may differ slightly from accumulated tokens) full_content = event.data.get("output", full_content) live.update(Markdown(full_content)) elif event.event_type == "tool_call": tool_name = event.data.get("tool_name", "unknown") live.update(Text(f"[calling tool: {tool_name}...]")) elif event.event_type == "tool_result": # After tool result, show accumulated content again if full_content: live.update(Text(full_content)) # Emit turn events to EQ (if enabled) # Maps ReAct event types to TurnEventType constants if _emit is not None: _turn_type = _CLI_REACT_EVENT_MAP.get(event.event_type) if _turn_type is not None: await _emit( _turn_type, task_id=cli_task_id, session_id=session.session_id, data=event.data, ) # Live already displayed the final content, no need to rprint again await session_manager.append_message( session_id=session.session_id, role=MessageRole.ASSISTANT, content=full_content, agent_name=agent_name, ) # Emit task.completed to EQ (if enabled) if _emit is not None: await _emit( TaskEventType.TASK_COMPLETED, task_id=cli_task_id, session_id=session.session_id, data={"output": full_content}, ) except asyncio.CancelledError: raise except Exception as e: # noqa: BLE001 — CLI main loop top-level; must not crash on user-facing errors rprint(f"\n[red]Error: {e}[/red]") # Emit task.failed to EQ (if enabled) if _emit is not None: await _emit( TaskEventType.TASK_FAILED, task_id=cli_task_id, session_id=session.session_id, data={"error": str(e)}, ) # ── Session end: generate daily log ──────────────────────────── if conversation_had_messages: try: messages = await session_manager.get_messages(session.session_id) if messages: # Build a brief summary of the conversation summary_parts = [] for msg in messages[-10:]: # Last 10 messages role = msg.role.value if hasattr(msg.role, "value") else str(msg.role) summary_parts.append(f"{role}: {msg.content[:100]}") summary = "\n".join(summary_parts) daily = memory_store.get_file("daily") existing = daily.read() new_entry = f"## 会话摘要\n{summary}" if existing: daily.write(f"{existing}\n\n{new_entry}") else: daily.write(new_entry) # Archive old daily logs memory_store.archive_old_dailies(keep_days=2) except (ConnectionError, OSError, asyncio.TimeoutError, ValueError, KeyError, RuntimeError): pass # Daily log generation is best-effort # Close EventQueue if it was enabled (emit session.ended and close) if eq is not None and _emit is not None: try: await _emit( SessionEventType.SESSION_ENDED, task_id="", session_id=session.session_id, data={}, ) except (asyncio.QueueFull, RuntimeError, ConnectionError): pass eq.close() # ruff: noqa: F821 — string annotations resolved at runtime via from __future__ import annotations def _extract_search_keys(server_config: "ServerConfig") -> dict[str, str]: """Extract search API keys from server config environment.""" return { "tavily_api_key": os.environ.get("TAVILY_API_KEY"), "serper_api_key": os.environ.get("SERPER_API_KEY"), } def _build_gateway(server_config: "ServerConfig") -> "LLMGateway": """Build LLMGateway from ServerConfig, reusing shared _create_provider.""" from agentkit.llm.gateway import LLMGateway from agentkit.server.app import _create_provider gateway = LLMGateway(config=server_config.llm_config) for name, pconf in server_config.llm_config.providers.items(): if not pconf.api_key: continue try: provider = _create_provider(name, pconf) gateway.register_provider(name, provider) except (ValueError, TypeError, KeyError, RuntimeError, ConnectionError, OSError) as e: import logging logging.getLogger(__name__).warning(f"Failed to register LLM provider '{name}': {e}") return gateway def _resolve_default_model(server_config: "ServerConfig") -> str: """Resolve the default model from config.""" if ( server_config.llm_config.model_aliases and "default" in server_config.llm_config.model_aliases ): return server_config.llm_config.model_aliases["default"] # Fallback: first provider's first model for name, pconf in server_config.llm_config.providers.items(): if pconf.api_key and pconf.models: first_model = list(pconf.models.keys())[0] return f"{name}/{first_model}" return "default" def _render_collaboration_contracts(contracts: list[dict]) -> None: """Render collaboration contracts as a Panel (U6).""" if not contracts: return lines = [ f" [blue]{c.get('from_expert', '?')}[/blue] → " f"[magenta]{c.get('to_expert', '?')}[/magenta]: " f"{c.get('content_description', '')} " f"[dim]({c.get('status', 'pending')})[/dim]" for c in contracts ] rprint( Panel( "\n".join(lines), title="[bold]协作契约[/bold]", border_style="cyan", ) ) def _render_pm_collaboration_event(message: dict) -> bool: """Render PM collaboration events (U6). Handles 4 event types: collaboration_contract_defined, collaboration_notice, review_result, risk_flagged. Returns True if the event type was handled. Best-effort: never raises on missing/malformed data. """ etype = message.get("type", "") try: if etype == "collaboration_contract_defined": # ponytail: 此事件当前由后端 plan_update 携带契约(未独立广播), # 保留渲染逻辑以备未来独立事件,不删除以避免破坏测试 _render_collaboration_contracts(message.get("contracts", [])) return True elif etype == "collaboration_notice": from_e = message.get("from_expert", "?") to_e = message.get("to_expert", "?") content = message.get("content_description", "") rprint(f" [blue]{from_e}[/blue] [dim]→[/dim] [magenta]{to_e}[/magenta]: {content}") return True elif etype == "review_result": passed = bool(message.get("passed", False)) feedback = message.get("feedback", "") phase_name = message.get("phase_name", "?") expert = message.get("expert", "?") rework_count = message.get("rework_count", 0) color = "green" if passed else "red" status_text = "验收通过" if passed else "验收未通过" lines = [ f"[bold]阶段:[/bold] {phase_name} ({expert})", f"[bold]结果:[/bold] [{color}]{status_text}[/{color}]", ] if rework_count: lines.append(f"[bold]返工次数:[/bold] {rework_count}") if feedback: lines.append(f"[bold]反馈:[/bold] {feedback}") rprint( Panel( "\n".join(lines), title=f"[bold]{'✓' if passed else '✗'} 验收结果[/bold]", border_style=color, ) ) return True elif etype == "risk_flagged": expert = message.get("expert", "?") risk_desc = message.get("risk_description", "") phase_name = message.get("phase_name", "?") rprint( Panel( f"[bold]专家:[/bold] {expert}\n" f"[bold]阶段:[/bold] {phase_name}\n" f"[bold]风险:[/bold] {risk_desc}", title="[bold]⚠ 风险标记[/bold]", border_style="yellow", ) ) return True except asyncio.CancelledError: raise except Exception as e: # noqa: BLE001 — best-effort rendering; must not break orchestration # ponytail: best-effort 渲染不中断编排,但记录日志便于调试 import logging logging.getLogger(__name__).debug(f"PM collaboration render error: {e}") return False async def _execute_team_cli( user_input: str, gateway: "LLMGateway", agent_pool: "AgentPool", template_registry: "ExpertTemplateRegistry", ) -> bool: """Handle @team prefix in CLI — run ExpertTeam pipeline with live Rich rendering. Returns True if the input was handled (matched @team), False otherwise. """ import select import sys from agentkit.experts.orchestrator import TeamOrchestrator from agentkit.experts.router import ExpertTeamRouter from agentkit.experts.team import ExpertTeam router = ExpertTeamRouter(template_registry=template_registry) routing = router.resolve(user_input) if not routing.matched: return False # No task content → show usage task = routing.task_content.strip() if routing.task_content else "" if not task or task == user_input.strip(): rprint( Panel( "[bold]@team 用法[/bold]\n\n" " [magenta]@team [/magenta] — 专家团协作\n" " [dim]@team:dev_team [/dim] — 使用 dev_team 模板\n" " [dim]@team:expert1,expert2 [/dim] — 指定专家\n\n" "请提供任务描述。", title="[yellow]缺少任务[/yellow]", border_style="yellow", ) ) return True expert_configs = router.resolve_expert_configs(routing.specified_experts) if not expert_configs: rprint(f"[red]无法解析专家配置: {routing.specified_experts}[/red]") return True team = ExpertTeam(pool=agent_pool, template_registry=template_registry) # Mutable state captured by the event handler closure synthesis_emitted = {"value": False} async def _event_handler(message: dict) -> None: """Render orchestration events with Rich (best-effort, never raises).""" try: # U6: PM collaboration events (collaboration_contract_defined, # collaboration_notice, review_result, risk_flagged) if _render_pm_collaboration_event(message): return etype = message.get("type", "") if etype == "team_formed": experts = message.get("experts", []) lead = message.get("lead_expert", "") lines = [ f" • {e.get('name', '?')}{' (Lead)' if e.get('is_lead') else ''} " f"— {e.get('persona', '')}" for e in experts ] rprint( Panel( "\n".join(lines) or " (no experts)", title=f"[bold]团队组建[/bold] (Lead: {lead})", border_style="cyan", ) ) elif etype == "plan_update": phases = message.get("plan_phases", []) icon_map = { "completed": ("✓", "green"), "in_progress": ("▶", "blue"), "failed": ("✗", "red"), } lines = [] for ph in phases: status = ph.get("status", "pending") icon, color = icon_map.get(status, ("○", "dim")) lines.append( f" [{color}]{icon}[/{color}] {ph.get('name', '?')} → {ph.get('assigned_expert', '?')}" ) if message.get("debate_inserted"): lines.append("\n [magenta]+ 辩论阶段已插入[/magenta]") if message.get("stopped_by_user"): lines.append("\n [red]! 用户终止执行[/red]") rprint( Panel( "\n".join(lines) or " (no phases)", title="[bold]执行计划[/bold]", border_style="cyan", ) ) # U6: render collaboration contracts embedded in phases all_contracts: list[dict] = [] for ph in phases: all_contracts.extend(ph.get("collaboration_contracts", [])) if all_contracts: _render_collaboration_contracts(all_contracts) elif etype == "phase_started": rprint( f"\n[bold blue]▶ {message.get('phase_name', '?')}[/bold blue] " f"→ {message.get('assigned_expert', '?')}" ) elif etype == "phase_completed": summary = message.get("result_summary", "") rprint(f" [green]✓ {message.get('phase_name', '?')}[/green]: {summary[:120]}") elif etype == "phase_failed": rprint( f" [red]✗ {message.get('phase_name', '?')}[/red]: {message.get('error', '')}" ) elif etype == "debate_started": rprint( Panel( f"[bold]主题:[/bold] {message.get('topic', '')}\n" f"[bold]参与者:[/bold] {', '.join(message.get('participants', []))}", title=f"[bold]辩论开始[/bold] (最多 {message.get('max_rounds', 0)} 轮)", border_style="magenta", ) ) elif etype == "expert_argument": rprint( Panel( Markdown(message.get("content", "")), title=f"[bold]{message.get('expert_name', '?')}[/bold] " f"(Round {message.get('round', 0)})", border_style="blue", ) ) elif etype == "debate_round_summary": rprint( Panel( Markdown(message.get("content", "")), title=f"[bold]{message.get('moderator_name', '?')}[/bold] " f"(Round {message.get('round', 0)} 总结)", border_style="cyan", ) ) elif etype == "debate_resolved": decision = message.get("decision", "inconclusive") color = { "accepted": "green", "rejected": "red", "compromise": "yellow", }.get(decision, "magenta") rprint( Panel( f"[bold]裁决:[/bold] [{color}]{decision}[/{color}]\n" f"[bold]结论:[/bold] {message.get('conclusion', '')}\n" f"[bold]理由:[/bold] {message.get('rationale', '')}", title="[bold]辩论结束[/bold]", border_style="magenta", ) ) elif etype == "team_synthesis": synthesis_emitted["value"] = True rprint( Panel( Markdown(message.get("content", "")), title="[bold]团队综合结果[/bold]", border_style="green", ) ) elif etype == "team_dissolved": rprint("[dim]团队已解散[/dim]") elif etype == "user_intervention": pass # User typed it themselves # Other events (expert_step, expert_result, expert_joined, etc.) are not rendered except asyncio.CancelledError: raise except Exception: # noqa: BLE001 — best-effort rendering; must not break orchestration pass # Rendering is best-effort; never break orchestration team.handoff_transport.register_handler(team.team_channel, _event_handler) lead_config = expert_configs[0] member_configs = expert_configs[1:] try: await team.create_team(lead_config=lead_config, member_configs=member_configs) # Wire gateway into experts (safety: ensure each agent has the gateway) for expert in team.experts: if hasattr(expert, "agent") and hasattr(expert.agent, "_llm_gateway"): if expert.agent._llm_gateway is None: expert.agent._llm_gateway = gateway orchestrator = TeamOrchestrator(team) exec_task = asyncio.create_task(orchestrator.execute(task)) # ponytail: select() on stdin is Unix-only; Windows would need msvcrt. # Ceiling: non-interactive stdin (redirected/piped) raises OSError → fall back to sleep. # Upgrade path: use prompt_toolkit's async input for cross-platform support. while not exec_task.done(): try: readable, _, _ = select.select([sys.stdin], [], [], 0.5) except (OSError, ValueError): # stdin not selectable (e.g., redirected) — just wait for exec await asyncio.sleep(0.5) continue if readable: try: line = sys.stdin.readline() except (OSError, ValueError, RuntimeError): line = "" if not line: break # EOF line = line.strip() if not line: continue # U4: send intervention to team (broadcasts + enqueues for orchestrator) await team.add_user_intervention(line) rprint(f"[dim]已发送干预: {line[:60]}[/dim]") result = await exec_task # Fallback: if team_synthesis wasn't emitted, print final result if not synthesis_emitted["value"]: res = result.get("result") if isinstance(result, dict) else None content = "" if isinstance(res, dict): content = res.get("content", str(res)) elif res is not None: content = str(res) if content: rprint( Panel( Markdown(content), title="[bold]团队结果[/bold]", border_style="green", ) ) except asyncio.CancelledError: raise except Exception as e: # noqa: BLE001 — team execution top-level; must report errors to user without crash rprint(f"[red]团队执行错误: {e}[/red]") finally: try: await team.dissolve() except (RuntimeError, asyncio.TimeoutError, ConnectionError): pass return True def _print_help() -> None: """Print chat command help.""" rprint( Panel( "[bold]Chat Commands[/bold]\n\n" " [cyan]/help[/cyan] — Show this help\n" " [cyan]/clear[/cyan] — Clear conversation (new session)\n" " [cyan]/model [/cyan] — Switch LLM model\n" " [cyan]/quit[/cyan] — Exit chat\n\n" "[bold]Multi-Agent[/bold]\n\n" " [magenta]@team [/magenta] — 专家团协作(项目经理模式:Lead 制定计划 + 协作契约 + 验收 + 辩论)\n" " [dim]@team:dev_team [/dim] — 使用 dev_team 模板\n" " [dim]@team:expert1,expert2 [/dim] — 指定专家\n\n" "[bold]PM Collaboration Events (during @team)[/bold]\n\n" " [cyan]协作契约[/cyan] — Lead 制定计划时定义专家间协作关系\n" " [cyan]协作通知[/cyan] — 专家完成后按契约通知相关专家\n" " [cyan]验收结果[/cyan] — Lead 验收阶段输出(通过/返工/失败)\n" " [cyan]风险标记[/cyan] — 专家标记执行中的风险\n\n" "[bold]Interventions (during @team)[/bold]\n\n" " [magenta]/debate [/magenta] — 手动发起辩论\n" " [cyan]/stop[/cyan] — 终止团队执行\n" " 其他文本 — 补充上下文给 Lead\n\n" "[bold]Tips[/bold]\n\n" " • Multi-line input: end a line with [cyan]\\[/cyan] to continue\n" " • Your conversation is stored in memory for the session", border_style="dim", ) )