fischer-agentkit/src/agentkit/experts/board_orchestrator.py

524 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""BoardOrchestrator - 私董会讨论引擎
驱动 BoardTeam 执行多轮群聊式讨论:
1. 主持人开场介绍议题和讨论规则
2. 循环 max_rounds 轮:
- 所有非主持人专家并行生成发言(基于共享讨论历史 + 角色 prompt
- 主持人小结本轮要点
- 检查用户干预和停止命令
3. 主持人最终总结(决策建议、共识点、分歧点)
终止条件:
- 正常终止:达到最大轮次
- 用户终止:用户发送 /stop
- 异常终止LLM 不可用或所有专家发言失败
"""
from __future__ import annotations
import asyncio
import logging
from typing import Any
from .expert import Expert
from .board import BoardTeam, BoardStatus
logger = logging.getLogger(__name__)
class BoardOrchestrator:
"""Board meeting orchestration engine.
The moderator (lead expert) facilitates the discussion:
- Opens with topic introduction
- Summarizes each round
- Gives final decision advice
Member experts give speeches each round based on shared history.
"""
STOP_COMMANDS = frozenset({"/stop", "停止讨论", "stop", "结束讨论"})
def __init__(self, team: BoardTeam) -> None:
self._team = team
async def execute(self, topic: str) -> dict[str, Any]:
"""Execute a board meeting discussion.
Flow:
1. Broadcast board_started event
2. Moderator opens the discussion
3. Loop max_rounds times:
- Parallel generate member speeches
- Moderator summarizes the round
- Check for user intervention / stop
4. Moderator gives final conclusion
5. Broadcast board_concluded event
Returns:
Dict with status, summary, decision_advice, total_rounds,
consensus_points, dissent_points
"""
moderator = self._team.moderator
if not moderator or not moderator.is_active:
active = self._team.active_experts
if not active:
return {
"status": "failed",
"summary": "",
"decision_advice": "",
"total_rounds": 0,
"consensus_points": [],
"dissent_points": [],
"error": "No active expert available",
}
# Promote first active expert to moderator
self._team._moderator_name = active[0].config.name
moderator = active[0]
logger.warning(
f"Moderator not available, falling back to '{moderator.config.name}'"
)
self._team.set_status(BoardStatus.DISCUSSING)
# 1. Broadcast board_started event
await self._broadcast_event(
"board_started",
{
"team_id": self._team.team_id,
"topic": topic,
"experts": [
{
"name": e.config.name,
"avatar": e.config.avatar,
"color": e.config.color,
"is_moderator": e.config.name == self._team._moderator_name,
"persona": e.config.persona[:100],
}
for e in self._team.active_experts
],
"max_rounds": self._team.max_rounds,
},
)
try:
# 2. Moderator opens the discussion
opening = await self._generate_moderator_opening(moderator, topic)
if opening:
await self._team.add_to_history(0, moderator.config.name, opening, "moderator")
await self._broadcast_event(
"expert_speech",
{
"expert_name": moderator.config.name,
"expert_avatar": moderator.config.avatar,
"expert_color": moderator.config.color,
"content": opening,
"round": 0,
"role": "moderator",
},
)
# 3. Discussion rounds
for round_num in range(1, self._team.max_rounds + 1):
self._team.increment_round()
# Check for stop command before starting the round
interventions = self._team.consume_user_interventions()
if self._has_stop_command(interventions):
logger.info(f"Discussion stopped by user at round {round_num}")
break
# Generate member speeches in parallel
members = self._team.member_experts
if members:
speech_results = await asyncio.gather(
*[self._generate_expert_speech(e, round_num) for e in members],
return_exceptions=True,
)
# Broadcast speeches in order (not parallel broadcast)
for expert, result in zip(members, speech_results):
if isinstance(result, Exception):
logger.warning(
f"Expert '{expert.config.name}' speech failed: {result}"
)
continue
await self._team.add_to_history(
round_num, expert.config.name, result, "expert"
)
await self._broadcast_event(
"expert_speech",
{
"expert_name": expert.config.name,
"expert_avatar": expert.config.avatar,
"expert_color": expert.config.color,
"content": result,
"round": round_num,
"role": "expert",
},
)
# Moderator summarizes the round
summary = await self._generate_moderator_summary(moderator, round_num)
if summary:
await self._team.add_to_history(
round_num, moderator.config.name, summary, "moderator"
)
await self._broadcast_event(
"round_summary",
{
"moderator_name": moderator.config.name,
"content": summary,
"round": round_num,
"continue": round_num < self._team.max_rounds,
},
)
# Check history length and compress if needed
gateway = self._get_llm_gateway(moderator)
if gateway and len(self._team.history) > 20:
await self._team.compress_history(moderator, gateway)
# 4. Final conclusion
self._team.set_status(BoardStatus.CONCLUDING)
conclusion = await self._generate_final_conclusion(moderator, topic)
self._team.set_status(BoardStatus.COMPLETED)
# 5. Broadcast board_concluded event
await self._broadcast_event(
"board_concluded",
{
"summary": conclusion.get("summary", ""),
"decision_advice": conclusion.get("decision_advice", ""),
"total_rounds": self._team.current_round,
"consensus_points": conclusion.get("consensus_points", []),
"dissent_points": conclusion.get("dissent_points", []),
},
)
return {
"status": "completed",
"summary": conclusion.get("summary", ""),
"decision_advice": conclusion.get("decision_advice", ""),
"total_rounds": self._team.current_round,
"consensus_points": conclusion.get("consensus_points", []),
"dissent_points": conclusion.get("dissent_points", []),
}
except Exception as e:
logger.error(f"Board meeting execution failed: {e}")
self._team.set_status(BoardStatus.DISSOLVED)
# Try to give a fallback conclusion
fallback = await self._generate_fallback_conclusion(moderator, topic)
await self._broadcast_event(
"board_concluded",
{
"summary": fallback.get("summary", ""),
"decision_advice": fallback.get("decision_advice", ""),
"total_rounds": self._team.current_round,
"consensus_points": [],
"dissent_points": [],
"error": str(e),
},
)
return {
"status": "failed",
"summary": fallback.get("summary", ""),
"decision_advice": fallback.get("decision_advice", ""),
"total_rounds": self._team.current_round,
"consensus_points": [],
"dissent_points": [],
"error": str(e),
}
async def _generate_moderator_opening(self, moderator: Expert, topic: str) -> str:
"""Generate moderator's opening speech.
The moderator introduces the topic and sets the stage for discussion.
"""
gateway = self._get_llm_gateway(moderator)
if not gateway:
return f"欢迎来到私董会。今天的讨论主题是:{topic}。请各位专家发表看法。"
prompt = (
f"你是私董会主持人 {moderator.config.name}\n"
f"你的角色:{moderator.config.persona}\n"
f"你的表达风格:{moderator.config.speaking_style}\n\n"
f"讨论主题:{topic}\n\n"
"请作为主持人开场,介绍议题并邀请各位专家发表看法。"
"开场应该简洁有力2-3 段话,点明讨论的核心问题。"
)
try:
response = await gateway.chat(
messages=[{"role": "user", "content": prompt}],
model="default",
)
return response.content.strip()
except Exception as e:
logger.warning(f"Moderator opening generation failed: {e}")
return f"欢迎来到私董会。今天的讨论主题是:{topic}。请各位专家发表看法。"
async def _generate_expert_speech(self, expert: Expert, round: int) -> str:
"""Generate an expert's speech for the current round.
The speech is based on:
- Expert's persona, thinking_style, speaking_style, decision_framework
- Full discussion history
- Current round / max rounds
"""
gateway = self._get_llm_gateway(expert)
if not gateway:
return f"[{expert.config.name} 因 LLM 不可用无法发言]"
history_text = self._team.get_history_text()
prompt = (
f"你是 {expert.config.name},正在参加私董会讨论。\n\n"
f"你的角色:{expert.config.persona}\n"
f"你的思维风格:{expert.config.thinking_style}\n"
f"你的表达风格:{expert.config.speaking_style}\n"
f"你的决策框架:{expert.config.decision_framework}\n\n"
f"讨论主题:{self._team.topic}\n"
f"当前轮次:第 {round} 轮 / 共 {self._team.max_rounds}\n\n"
)
if history_text:
prompt += f"之前的讨论历史:\n{history_text}\n\n"
prompt += (
"请基于你的角色和决策框架,就当前讨论主题发表你的看法。"
"要求:\n"
"- 保持角色一致性,用你的思维方式和表达风格发言\n"
"- 2-4 段话,简洁但有洞察力\n"
"- 可以引用或反驳之前发言者的观点\n"
"- 给出明确的立场或建议\n"
)
response = await gateway.chat(
messages=[{"role": "user", "content": prompt}],
model="default",
)
return response.content.strip()
async def _generate_moderator_summary(self, moderator: Expert, round: int) -> str:
"""Generate moderator's round summary.
The moderator summarizes the key points of the current round.
"""
gateway = self._get_llm_gateway(moderator)
if not gateway:
return f"[第 {round} 轮小结因 LLM 不可用无法生成]"
# Get only current round's speeches
round_history = [
h for h in self._team.history if h["round"] == round
]
if not round_history:
return ""
round_text = "\n\n".join(
f"[{h['expert_name']}]: {h['content']}" for h in round_history
)
prompt = (
f"你是私董会主持人 {moderator.config.name}\n"
f"你的角色:{moderator.config.persona}\n"
f"你的表达风格:{moderator.config.speaking_style}\n\n"
f"讨论主题:{self._team.topic}\n"
f"当前轮次:第 {round} 轮 / 共 {self._team.max_rounds}\n\n"
f"本轮发言:\n{round_text}\n\n"
"请作为主持人小结本轮讨论:\n"
"- 归纳各方核心观点2-3 句话)\n"
"- 指出共识点和分歧点\n"
"- 提示下一轮可以深入的方向\n"
"- 保持简洁3-5 句话\n"
)
try:
response = await gateway.chat(
messages=[{"role": "user", "content": prompt}],
model="default",
)
return response.content.strip()
except Exception as e:
logger.warning(f"Moderator summary generation failed: {e}")
return f"[第 {round} 轮讨论完成,主持人小结生成失败]"
async def _generate_final_conclusion(self, moderator: Expert, topic: str) -> dict[str, Any]:
"""Generate moderator's final conclusion.
The moderator gives:
- Overall summary of the discussion
- Decision advice
- Consensus points
- Dissent points
"""
gateway = self._get_llm_gateway(moderator)
if not gateway:
return {
"summary": "讨论已完成,但 LLM 不可用无法生成总结。",
"decision_advice": "建议参考讨论历史自行判断。",
"consensus_points": [],
"dissent_points": [],
}
history_text = self._team.get_history_text()
prompt = (
f"你是私董会主持人 {moderator.config.name}\n"
f"你的角色:{moderator.config.persona}\n"
f"你的表达风格:{moderator.config.speaking_style}\n"
f"你的决策框架:{moderator.config.decision_framework}\n\n"
f"讨论主题:{topic}\n"
f"总轮次:{self._team.current_round}\n\n"
f"完整讨论历史:\n{history_text}\n\n"
"请作为主持人给出最终总结。输出 JSON 格式:\n"
"```json\n"
"{\n"
' "summary": "整体讨论总结3-5句话",\n'
' "decision_advice": "基于讨论的决策建议,明确给出你的推荐",\n'
' "consensus_points": ["共识点1", "共识点2"],\n'
' "dissent_points": ["分歧点1", "分歧点2"]\n'
"}\n"
"```\n"
"只输出 JSON不要其他文字。"
)
try:
import json
import re
response = await gateway.chat(
messages=[{"role": "user", "content": prompt}],
model="default",
)
content = response.content.strip()
# Extract JSON from response
json_match = re.search(r"\{.*\}", content, re.DOTALL)
if json_match:
result = json.loads(json_match.group(0))
return {
"summary": result.get("summary", ""),
"decision_advice": result.get("decision_advice", ""),
"consensus_points": result.get("consensus_points", []),
"dissent_points": result.get("dissent_points", []),
}
# If JSON parsing fails, return raw content as summary
return {
"summary": content,
"decision_advice": "",
"consensus_points": [],
"dissent_points": [],
}
except Exception as e:
logger.warning(f"Final conclusion generation failed: {e}")
return {
"summary": f"讨论已完成({self._team.current_round}轮),总结生成失败。",
"decision_advice": "建议参考讨论历史自行判断。",
"consensus_points": [],
"dissent_points": [],
}
async def _generate_fallback_conclusion(self, moderator: Expert, topic: str) -> dict[str, Any]:
"""Generate a fallback conclusion when execution fails.
Uses existing discussion history to provide a basic summary.
"""
history_text = self._team.get_history_text()
if not history_text:
return {
"summary": "讨论未能正常完成,无历史记录。",
"decision_advice": "",
}
gateway = self._get_llm_gateway(moderator)
if not gateway:
# Return truncated history as summary
return {
"summary": f"讨论异常终止。已有历史({len(self._team.history)}条):\n"
+ history_text[:500],
"decision_advice": "建议参考讨论历史自行判断。",
}
prompt = (
f"你是私董会主持人 {moderator.config.name}\n"
f"讨论主题:{topic}\n"
f"讨论因异常终止,已完成 {self._team.current_round} 轮。\n\n"
f"已有讨论历史:\n{history_text}\n\n"
"请基于已有历史给出总结和决策建议。输出 JSON\n"
"```json\n"
'{"summary": "...", "decision_advice": "..."}\n'
"```\n"
)
try:
import json
import re
response = await gateway.chat(
messages=[{"role": "user", "content": prompt}],
model="default",
)
content = response.content.strip()
json_match = re.search(r"\{.*\}", content, re.DOTALL)
if json_match:
result = json.loads(json_match.group(0))
return {
"summary": result.get("summary", content),
"decision_advice": result.get("decision_advice", ""),
}
return {"summary": content, "decision_advice": ""}
except Exception:
return {
"summary": f"讨论异常终止,已完成 {self._team.current_round} 轮。",
"decision_advice": "",
}
def _has_stop_command(self, interventions: list[str]) -> bool:
"""Check if any user intervention contains a stop command."""
for msg in interventions:
msg_lower = msg.strip().lower()
if msg_lower in self.STOP_COMMANDS:
return True
return False
def _get_llm_gateway(self, expert: Expert | None = None) -> Any:
"""Get LLM gateway from the given expert or the moderator's agent.
Falls back to other active experts if the primary target has no gateway.
"""
target = expert or self._team.moderator
if target and hasattr(target, "agent") and hasattr(target.agent, "_llm_gateway"):
gateway = target.agent._llm_gateway
if gateway is not None:
return gateway
# Fallback: try first active expert with a gateway
for exp in self._team.active_experts:
if hasattr(exp, "agent") and hasattr(exp.agent, "_llm_gateway"):
gateway = exp.agent._llm_gateway
if gateway is not None:
return gateway
return None
async def _broadcast_event(self, event_type: str, data: dict[str, Any]) -> None:
"""Broadcast a board event to the team channel.
Events are emitted via handoff_transport for WebSocket relay.
"""
if self._team.handoff_transport:
try:
await self._team.handoff_transport.send(
self._team.team_channel, {"type": event_type, **data}
)
except Exception as e:
logger.warning(f"Failed to broadcast event '{event_type}': {e}")