feat(experts): U4 用户干预通道 + 手动辩论触发

建立 @team 执行期间的用户干预通道,支持 /stop、/debate <topic>、
普通文本追加上下文。

ExpertTeam (src/agentkit/experts/team.py):
- 新增 _interventions: asyncio.Queue (maxsize=64) 干预队列
- add_user_intervention(msg): 广播 + 入队
- consume_user_interventions(): 排空并返回待处理干预
- broadcast_user_message 现在同时入队干预队列

TeamOrchestrator (src/agentkit/experts/orchestrator.py):
- 新增 _user_context: list[str] 累积普通文本干预
- 新增 _process_interventions(lead, plan) 在每层执行前调用:
  * /stop → 终止执行,广播 plan_update(stopped_by_user)
  * /debate <topic> → 动态插入 DEBATE phase(受 MAX_DEBATES 限制)
  * 普通文本 → 累积到 _user_context
- _synthesize_results 将 _user_context 追加到 synthesis prompt

WS 路由 (src/agentkit/server/routes/chat.py):
- 模块级 _active_teams dict 跟踪每个 session 的活跃团队
- _execute_team_collab 执行前注册、finally 注销
- WS 消息循环:若 session 有活跃团队,message 路由为干预而非新任务
- 新增 team_intervention_ack 确认消息

测试:tests/unit/experts/test_team_intervention.py(20 测试),
覆盖队列基础、/stop、/debate、普通文本、混合消息、synthesis 影响。
同步更新 test_orchestrator_debate.py 的干预通道兼容性测试
(U4 已实现 consume_user_interventions)。

全部 418 experts 测试 + 325 server 测试通过。
This commit is contained in:
chiguyong 2026-06-24 12:17:09 +08:00
parent ac26d417b3
commit c831e925b6
5 changed files with 680 additions and 8 deletions

View File

@ -65,6 +65,9 @@ class TeamOrchestrator:
self._temp_agents: dict[str, str] = {}
# Count of auto-inserted debate phases (bounded by MAX_DEBATES)
self._debate_count = 0
# U4: User context accumulated from plain-text interventions.
# Appended to Lead's synthesis prompt so user guidance influences result.
self._user_context: list[str] = []
async def execute(self, task: str) -> dict[str, Any]:
"""Execute a task in pipeline mode.
@ -174,6 +177,14 @@ class TeamOrchestrator:
if not ready:
continue
# U4: Process user interventions at phase boundary.
# /stop → terminate execution; /debate <topic> → insert DEBATE;
# plain text → accumulate as user context for Lead synthesis.
stop_requested = await self._process_interventions(lead, plan)
if stop_requested:
logger.info("Execution stopped by user intervention")
break
# Execute all phases in this layer in parallel
results = await asyncio.gather(
*[self._execute_phase(ph, plan) for ph in ready],
@ -972,6 +983,87 @@ class TeamOrchestrator:
return True
return False
# ── U4: User intervention processing at phase boundaries ──────────
async def _process_interventions(
self, lead: Expert, plan: TeamPlan
) -> bool:
"""Process pending user interventions at a phase boundary.
Handles three intervention kinds:
- ``/stop`` (or aliases) returns True to signal termination
- ``/debate <topic>`` dynamically inserts a DEBATE phase
(bounded by MAX_DEBATES); the debate depends on the most recently
completed phase so it runs before remaining pending phases
- plain text accumulated in ``_user_context`` for Lead synthesis
Returns:
True if execution should stop, False to continue.
"""
interventions = self._consume_team_interventions()
if not interventions:
return False
for msg in interventions:
stripped = msg.strip()
if not stripped:
continue
lower = stripped.lower()
# /stop → terminate
if lower in self.STOP_COMMANDS:
await self._broadcast_event(
"plan_update",
{
"plan_id": plan.id,
"plan_phases": [p.to_dict() for p in plan.phases],
"stopped_by_user": True,
},
)
return True
# /debate <topic> → insert DEBATE phase
if lower.startswith("/debate"):
topic = stripped[len("/debate"):].strip()
if not topic:
continue
if self._debate_count >= self.MAX_DEBATES:
logger.info(
f"Max debates ({self.MAX_DEBATES}) reached, "
"ignoring /debate intervention"
)
continue
participants = [
e.config.name
for e in self._team.active_experts
if e.config.name != lead.config.name
]
if not participants:
continue
# Anchor the debate on the most recently completed phase
# so it runs before remaining pending phases. If none
# completed yet, the debate has no deps and runs immediately.
anchor = plan.completed_phases[-1] if plan.completed_phases else None
trigger = anchor or plan.phases[0]
debate = self._insert_debate_phase(
plan, trigger, f"用户发起:{topic}", participants
)
if debate:
await self._broadcast_event(
"plan_update",
{
"plan_id": plan.id,
"plan_phases": [p.to_dict() for p in plan.phases],
"debate_inserted": debate.id,
},
)
continue
# Plain text → accumulate as user context
self._user_context.append(stripped)
return False
# ── U3: Divergence detection + dynamic debate insertion ────────────
async def _maybe_add_plan_review_debate(
@ -1303,8 +1395,14 @@ class TeamOrchestrator:
f"Synthesize them into a single comprehensive final result that "
f"best addresses the original task.\n\n"
+ "\n---\n".join(summaries)
+ "\n\nProvide the synthesized result directly."
)
# U4: Append accumulated user context so user guidance influences synthesis
if self._user_context:
prompt += (
"\n\n用户在执行期间补充的指导意见(请在综合时参考):\n- "
+ "\n- ".join(self._user_context)
)
prompt += "\n\nProvide the synthesized result directly."
try:
response = await gateway.chat(

View File

@ -73,6 +73,9 @@ class ExpertTeam:
self._status = TeamStatus.FORMING
self._team_channel = f"team:{self.team_id}"
self._orchestrator_task: asyncio.Task | None = None
# U4: User intervention queue — bounded to prevent unbounded growth.
# Consumed by TeamOrchestrator at phase boundaries.
self._interventions: asyncio.Queue[str] = asyncio.Queue(maxsize=64)
@property
def status(self) -> TeamStatus:
@ -251,13 +254,50 @@ class ExpertTeam:
)
async def broadcast_user_message(self, content: str) -> None:
"""Broadcast a user intervention message to all active Experts."""
"""Broadcast a user intervention message to all active Experts.
Also enqueues the message to the intervention queue so
TeamOrchestrator can consume it at phase boundaries (U4).
"""
message = {
"type": "user_intervention",
"content": content,
"timestamp": time.time(),
}
await self._handoff_transport.send(self._team_channel, message)
# U4: enqueue for orchestrator consumption (non-blocking; drop on full)
try:
self._interventions.put_nowait(content)
except asyncio.QueueFull:
logger.warning("Intervention queue full, dropping message")
async def add_user_intervention(self, content: str) -> None:
"""Add a user intervention message for the orchestrator to consume.
Broadcasts the message to the team channel and enqueues it.
Used by WS/CLI handlers during team execution (U4).
Args:
content: User's intervention message (e.g. ``/debate <topic>``,
``/stop``, or plain text to append to Lead context)
"""
await self.broadcast_user_message(content)
def consume_user_interventions(self) -> list[str]:
"""Drain and return all pending user interventions.
Called by TeamOrchestrator at phase boundaries (U4).
Returns:
List of intervention messages (oldest first). Empty if none.
"""
interventions: list[str] = []
while not self._interventions.empty():
try:
interventions.append(self._interventions.get_nowait())
except asyncio.QueueEmpty:
break
return interventions
async def get_shared_context(self) -> dict:
"""Get the team's shared context from SharedWorkspace.

View File

@ -107,6 +107,27 @@ class ChatConnectionManager:
chat_manager = ChatConnectionManager()
# U4: Active team sessions — maps session_id to the ExpertTeam currently executing.
# When a message arrives during team execution, it is routed as an intervention
# instead of starting a new chat task. Populated by _execute_team_collab.
_active_teams: dict[str, "object"] = {}
def _register_active_team(session_id: str, team: "object") -> None:
"""Register an active team for a session (intervention routing)."""
_active_teams[session_id] = team
def _unregister_active_team(session_id: str) -> None:
"""Unregister the active team for a session."""
_active_teams.pop(session_id, None)
def _get_active_team(session_id: str) -> "object | None":
"""Get the active team for a session, if any."""
return _active_teams.get(session_id)
# ── Helper ────────────────────────────────────────────────────────────
@ -404,6 +425,8 @@ async def _execute_team_collab(
await team.create_team(lead_config=lead_config, member_configs=member_configs)
orchestrator = TeamOrchestrator(team=team)
# U4: Register active team so WS messages during execution route as interventions
_register_active_team(session_id, team)
result = await orchestrator.execute(routing_result.task_content)
except asyncio.CancelledError:
logger.info(f"Team collaboration cancelled for session {session_id}")
@ -416,6 +439,9 @@ async def _execute_team_collab(
)
return True
finally:
# U4: Always unregister the active team first so subsequent messages
# don't route to a dissolving team.
_unregister_active_team(session_id)
# Always dissolve the team and remove handler to avoid leaks
try:
await team.dissolve()
@ -751,6 +777,29 @@ async def chat_websocket(websocket: WebSocket, session_id: str) -> None:
if msg_type == "message":
content = msg.get("content", "")
model = msg.get("model") # Optional model override from frontend
# U4: If a team is currently executing for this session, route
# the message as an intervention instead of a new chat task.
active_team = _get_active_team(session_id)
if active_team is not None:
try:
await active_team.add_user_intervention(content)
await websocket.send_json(
{
"type": "team_intervention_ack",
"data": {"content": content},
}
)
except Exception as e:
logger.warning(f"Failed to enqueue intervention: {e}")
await websocket.send_json(
{
"type": "error",
"data": {"message": f"干预消息入队失败: {e}"},
}
)
continue
# Create a fresh CancellationToken for each message
message_token = CancellationToken()

View File

@ -717,22 +717,23 @@ class TestDebatePhaseSharedWorkspace:
class TestInterventionChannelCompatibility:
"""干预通道 getattr 回退测试U4 兼容"""
"""干预通道兼容性测试U4 已实现干预队列"""
@pytest.mark.asyncio
async def test_no_intervention_method_returns_empty(self):
"""team 没有 consume_user_interventions 方法时返回空列表"""
async def test_empty_interventions_returns_empty(self):
"""干预队列为空时返回空列表,辩论正常执行"""
gateway = _make_smart_llm_gateway()
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
# ExpertTeam doesn't have consume_user_interventions yet (U4 not implemented)
assert not hasattr(team, "consume_user_interventions")
# U4: ExpertTeam now has consume_user_interventions; empty queue returns []
assert hasattr(team, "consume_user_interventions")
assert team.consume_user_interventions() == []
phase = _make_debate_phase(max_rounds=1, participants=["member1"])
plan = _make_plan_with_debate_phase(phase)
# Should not raise — falls back to empty list
# Should not raise — empty interventions, debate proceeds normally
await orchestrator._execute_debate_phase(phase, plan)
assert phase.status == PhaseStatus.COMPLETED

View File

@ -0,0 +1,484 @@
"""ExpertTeam 用户干预通道 + TeamOrchestrator 干预处理单元测试 (U4)
测试覆盖
- ExpertTeam 干预队列
* add_user_intervention consume_user_interventions 返回消息
* 多条干预消息累积一次性消费
* consume 后队列清空再次 consume 返回空
* broadcast_user_message 同时入队干预队列
- TeamOrchestrator._process_interventions
* /stop 返回 True终止执行+ 广播 plan_update
* /debate <topic> 插入 DEBATE phase + 广播 plan_update
* /debate MAX_DEBATES 限制
* /debate topic 时忽略
* 普通文本 累积到 _user_context
* 空干预队列 返回 False无副作用
- 集成: _user_context 影响 synthesis prompt
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock
import pytest
from agentkit.core.handoff_transport import InProcessHandoffTransport
from agentkit.experts.config import ExpertConfig
from agentkit.experts.orchestrator import TeamOrchestrator
from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, TeamPlan
from agentkit.experts.team import ExpertTeam
# ── 辅助函数 ──────────────────────────────────────────────
def _make_expert_config(name: str = "test_expert", is_lead: bool = False) -> ExpertConfig:
return ExpertConfig(
name=name,
agent_type="expert",
persona=f"{name}的角色描述",
thinking_style="逻辑推理",
speaking_style="简洁直接",
decision_framework="数据驱动决策",
bound_skills=["skill_a"],
is_lead=is_lead,
task_mode="llm_generate",
prompt={"identity": "测试"},
)
def _make_mock_expert(
name: str = "test_expert",
is_lead: bool = False,
is_active: bool = True,
gateway: MagicMock | None = None,
) -> MagicMock:
config = _make_expert_config(name=name, is_lead=is_lead)
expert = MagicMock()
expert.config = config
expert.is_active = is_active
expert.team_id = None
expert.get_capabilities_summary.return_value = {
"name": name,
"persona": config.persona,
"thinking_style": config.thinking_style,
"bound_skills": config.bound_skills,
"is_lead": is_lead,
}
mock_agent = MagicMock()
mock_agent._llm_gateway = gateway
expert.agent = mock_agent
return expert
def _make_team_with_experts(
expert_names: list[str] | None = None,
lead_name: str = "lead",
gateway: MagicMock | None = None,
) -> ExpertTeam:
team = ExpertTeam()
transport = AsyncMock(spec=InProcessHandoffTransport)
team._handoff_transport = transport
if expert_names is None:
expert_names = [lead_name, "member1", "member2"]
for name in expert_names:
is_lead = name == lead_name
expert = _make_mock_expert(name=name, is_lead=is_lead, gateway=gateway)
team._experts[name] = expert
if is_lead:
team._lead_expert_name = name
return team
def _make_execution_phase(
phase_id: str = "phase_1",
name: str = "阶段一",
assigned_expert: str = "member1",
depends_on: list[str] | None = None,
status: PhaseStatus = PhaseStatus.PENDING,
result: dict | None = None,
) -> PlanPhase:
return PlanPhase(
id=phase_id,
name=name,
assigned_expert=assigned_expert,
task_description=f"{name}的任务描述",
depends_on=depends_on or [],
phase_type=PhaseType.EXECUTION,
status=status,
result=result,
)
def _make_plan(
phases: list[PlanPhase],
task: str = "测试任务",
lead_expert: str = "lead",
) -> TeamPlan:
return TeamPlan(
id="test_plan",
task=task,
phases=phases,
lead_expert=lead_expert,
)
# ── ExpertTeam 干预队列测试 ──────────────────────────────
class TestExpertTeamInterventionQueue:
"""ExpertTeam 干预队列基础功能测试"""
@pytest.mark.asyncio
async def test_add_and_consume_intervention(self):
"""add_user_intervention → consume_user_interventions 返回消息"""
team = ExpertTeam()
team._handoff_transport = AsyncMock(spec=InProcessHandoffTransport)
await team.add_user_intervention("/debate 前端框架选型")
interventions = team.consume_user_interventions()
assert interventions == ["/debate 前端框架选型"]
@pytest.mark.asyncio
async def test_multiple_interventions_accumulate(self):
"""多条干预消息累积,一次性消费"""
team = ExpertTeam()
team._handoff_transport = AsyncMock(spec=InProcessHandoffTransport)
await team.add_user_intervention("第一条")
await team.add_user_intervention("第二条")
await team.add_user_intervention("第三条")
interventions = team.consume_user_interventions()
assert len(interventions) == 3
assert interventions[0] == "第一条"
assert interventions[1] == "第二条"
assert interventions[2] == "第三条"
@pytest.mark.asyncio
async def test_consume_clears_queue(self):
"""consume 后队列清空,再次 consume 返回空"""
team = ExpertTeam()
team._handoff_transport = AsyncMock(spec=InProcessHandoffTransport)
await team.add_user_intervention("消息")
first = team.consume_user_interventions()
assert len(first) == 1
second = team.consume_user_interventions()
assert second == []
def test_consume_empty_queue_returns_empty_list(self):
"""空队列 consume 返回空列表"""
team = ExpertTeam()
interventions = team.consume_user_interventions()
assert interventions == []
@pytest.mark.asyncio
async def test_broadcast_user_message_enqueues_intervention(self):
"""broadcast_user_message 同时入队干预队列"""
team = ExpertTeam()
team._handoff_transport = AsyncMock(spec=InProcessHandoffTransport)
await team.broadcast_user_message("测试消息")
interventions = team.consume_user_interventions()
assert interventions == ["测试消息"]
@pytest.mark.asyncio
async def test_add_user_intervention_broadcasts_to_channel(self):
"""add_user_intervention 广播到 team channel"""
team = ExpertTeam()
transport = AsyncMock(spec=InProcessHandoffTransport)
team._handoff_transport = transport
await team.add_user_intervention("/stop")
assert transport.send.called
call_args = transport.send.call_args
channel = call_args[0][0]
message = call_args[0][1]
assert channel == team.team_channel
assert message["type"] == "user_intervention"
assert message["content"] == "/stop"
# ── TeamOrchestrator._process_interventions 测试 ────────
class TestProcessInterventionsStop:
"""_process_interventions /stop 处理测试"""
@pytest.mark.asyncio
async def test_stop_returns_true(self):
"""/stop → 返回 True终止执行"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
plan = _make_plan(phases=[_make_execution_phase()])
await team.add_user_intervention("/stop")
result = await orchestrator._process_interventions(team.lead_expert, plan)
assert result is True
@pytest.mark.asyncio
async def test_stop_broadcasts_plan_update(self):
"""/stop → 广播 plan_update with stopped_by_user"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
plan = _make_plan(phases=[_make_execution_phase()])
await team.add_user_intervention("/stop")
await orchestrator._process_interventions(team.lead_expert, plan)
transport = team._handoff_transport
assert transport.send.called
last_call = transport.send.call_args_list[-1]
event_data = last_call[0][1]
assert event_data["type"] == "plan_update"
assert event_data["stopped_by_user"] is True
@pytest.mark.asyncio
async def test_stop_chinese_alias_works(self):
"""中文停止命令 '停止' 也能终止"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
plan = _make_plan(phases=[_make_execution_phase()])
await team.add_user_intervention("停止")
result = await orchestrator._process_interventions(team.lead_expert, plan)
assert result is True
class TestProcessInterventionsDebate:
"""_process_interventions /debate 处理测试"""
@pytest.mark.asyncio
async def test_debate_inserts_debate_phase(self):
"""/debate <topic> → 插入 DEBATE phase"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# 需要一个已完成的 phase 作为 anchor
completed = _make_execution_phase(
phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"}
)
pending = _make_execution_phase(phase_id="p2", depends_on=["p1"])
plan = _make_plan(phases=[completed, pending])
await team.add_user_intervention("/debate 前端框架选型")
result = await orchestrator._process_interventions(team.lead_expert, plan)
assert result is False # 不终止
assert orchestrator._debate_count == 1
# 应该新增一个 DEBATE phase
debate_phases = [p for p in plan.phases if p.phase_type == PhaseType.DEBATE]
assert len(debate_phases) == 1
assert "前端框架选型" in debate_phases[0].debate_config["topic"]
@pytest.mark.asyncio
async def test_debate_broadcasts_plan_update(self):
"""/debate → 广播 plan_update with debate_inserted"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
completed = _make_execution_phase(
phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"}
)
plan = _make_plan(phases=[completed])
await team.add_user_intervention("/debate 测试话题")
await orchestrator._process_interventions(team.lead_expert, plan)
transport = team._handoff_transport
last_call = transport.send.call_args_list[-1]
event_data = last_call[0][1]
assert event_data["type"] == "plan_update"
assert "debate_inserted" in event_data
@pytest.mark.asyncio
async def test_debate_respects_max_debates(self):
"""/debate 受 MAX_DEBATES 限制"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
orchestrator._debate_count = orchestrator.MAX_DEBATES
completed = _make_execution_phase(
phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"}
)
plan = _make_plan(phases=[completed])
await team.add_user_intervention("/debate 话题")
result = await orchestrator._process_interventions(team.lead_expert, plan)
assert result is False
assert orchestrator._debate_count == orchestrator.MAX_DEBATES
# 不应该新增 DEBATE phase
debate_phases = [p for p in plan.phases if p.phase_type == PhaseType.DEBATE]
assert len(debate_phases) == 0
@pytest.mark.asyncio
async def test_debate_without_topic_ignored(self):
"""/debate 无 topic 时忽略"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
plan = _make_plan(phases=[_make_execution_phase()])
await team.add_user_intervention("/debate")
result = await orchestrator._process_interventions(team.lead_expert, plan)
assert result is False
assert orchestrator._debate_count == 0
@pytest.mark.asyncio
async def test_debate_without_members_ignored(self):
"""/debate 无其他成员时忽略(只有 lead"""
team = _make_team_with_experts(expert_names=["lead"])
orchestrator = TeamOrchestrator(team)
completed = _make_execution_phase(
phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"}
)
plan = _make_plan(phases=[completed])
await team.add_user_intervention("/debate 话题")
result = await orchestrator._process_interventions(team.lead_expert, plan)
assert result is False
assert orchestrator._debate_count == 0
class TestProcessInterventionsPlainText:
"""_process_interventions 普通文本处理测试"""
@pytest.mark.asyncio
async def test_plain_text_accumulates_to_user_context(self):
"""普通文本 → 累积到 _user_context"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
plan = _make_plan(phases=[_make_execution_phase()])
await team.add_user_intervention("请关注性能优化")
result = await orchestrator._process_interventions(team.lead_expert, plan)
assert result is False
assert "请关注性能优化" in orchestrator._user_context
@pytest.mark.asyncio
async def test_multiple_plain_texts_accumulate(self):
"""多条普通文本都累积"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
plan = _make_plan(phases=[_make_execution_phase()])
await team.add_user_intervention("第一条建议")
await team.add_user_intervention("第二条建议")
await orchestrator._process_interventions(team.lead_expert, plan)
assert len(orchestrator._user_context) == 2
assert "第一条建议" in orchestrator._user_context
assert "第二条建议" in orchestrator._user_context
@pytest.mark.asyncio
async def test_user_context_influences_synthesis_prompt(self):
"""_user_context 被追加到 synthesis prompt"""
# 用一个能捕获 prompt 的 gateway
captured_prompt = []
async def chat_side_effect(messages, model=None, **kwargs):
captured_prompt.append(messages[0]["content"])
response = MagicMock()
response.content = "综合结果"
return response
gateway = AsyncMock()
gateway.chat = AsyncMock(side_effect=chat_side_effect)
team = _make_team_with_experts(gateway=gateway)
orchestrator = TeamOrchestrator(team)
orchestrator._user_context.append("请重点关注安全性")
phases = [
_make_execution_phase(
phase_id="p1",
name="阶段A",
status=PhaseStatus.COMPLETED,
result={"content": "结果A"},
),
_make_execution_phase(
phase_id="p2",
name="阶段B",
status=PhaseStatus.COMPLETED,
result={"content": "结果B"},
),
]
await orchestrator._synthesize_results(team.lead_expert, "任务", phases)
assert len(captured_prompt) == 1
assert "请重点关注安全性" in captured_prompt[0]
assert "用户在执行期间补充的指导意见" in captured_prompt[0]
class TestProcessInterventionsEmpty:
"""_process_interventions 空队列测试"""
@pytest.mark.asyncio
async def test_empty_interventions_returns_false(self):
"""空干预队列 → 返回 False无副作用"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
plan = _make_plan(phases=[_make_execution_phase()])
result = await orchestrator._process_interventions(team.lead_expert, plan)
assert result is False
assert orchestrator._debate_count == 0
assert orchestrator._user_context == []
class TestProcessInterventionsMixed:
"""_process_interventions 混合消息测试"""
@pytest.mark.asyncio
async def test_mixed_messages_processed_in_order(self):
"""混合消息按顺序处理:文本 + debate + 文本"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
completed = _make_execution_phase(
phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"}
)
plan = _make_plan(phases=[completed])
await team.add_user_intervention("先补充个上下文")
await team.add_user_intervention("/debate 架构选型")
await team.add_user_intervention("再补充一条")
result = await orchestrator._process_interventions(team.lead_expert, plan)
assert result is False
# debate 插入了
assert orchestrator._debate_count == 1
# 两条普通文本都累积了
assert len(orchestrator._user_context) == 2
assert "先补充个上下文" in orchestrator._user_context
assert "再补充一条" in orchestrator._user_context
@pytest.mark.asyncio
async def test_stop_terminates_even_with_other_messages(self):
"""混合消息中 /stop 终止执行(即使前面有其他消息)"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
completed = _make_execution_phase(
phase_id="p1", status=PhaseStatus.COMPLETED, result={"content": "结果"}
)
plan = _make_plan(phases=[completed])
await team.add_user_intervention("/debate 话题")
await team.add_user_intervention("/stop")
result = await orchestrator._process_interventions(team.lead_expert, plan)
assert result is True
# debate 在 stop 之前处理了
assert orchestrator._debate_count == 1