feat(experts): add concurrency limit to TeamOrchestrator parallel phases
U2: Add asyncio.Semaphore to bound concurrent phase execution and debate argument generation. Default limit=3, configurable via max_concurrent_phases. Prevents LLM rate-limit spikes when many phases run in the same layer. Tests: 5 scenarios (happy path, 5-phase edge case, serial mode, failure release, debate integration) — all pass.
This commit is contained in:
parent
018b342d96
commit
717aad1303
|
|
@ -70,9 +70,10 @@ class TeamOrchestrator:
|
|||
MAX_RISK_FLAGS = 10 # 风险标记数量上限,防止 UI 洪泛
|
||||
MAX_DEBATE_ROUNDS = 4 # Hard cap on debate rounds per phase
|
||||
MAX_DEBATES = 3 # Hard cap on auto-inserted debate phases per execution
|
||||
DEFAULT_MAX_CONCURRENT_PHASES = 3 # 同层最大并发阶段数,避免 LLM 限流洪峰
|
||||
STOP_COMMANDS = frozenset({"/stop", "停止", "stop", "结束"})
|
||||
|
||||
def __init__(self, team: ExpertTeam) -> None:
|
||||
def __init__(self, team: ExpertTeam, max_concurrent_phases: int | None = None) -> None:
|
||||
self._team = team
|
||||
# Track temporary agent names created for context isolation (KTD3)
|
||||
# Maps phase_id -> temp_agent_name for cleanup
|
||||
|
|
@ -82,6 +83,9 @@ class TeamOrchestrator:
|
|||
# U4: User context accumulated from plain-text interventions.
|
||||
# Appended to Lead's synthesis prompt so user guidance influences result.
|
||||
self._user_context: list[str] = []
|
||||
# U2: 并发限制 — 同层并行阶段加 Semaphore,避免 LLM 限流洪峰
|
||||
limit = max_concurrent_phases or self.DEFAULT_MAX_CONCURRENT_PHASES
|
||||
self._phase_semaphore = asyncio.Semaphore(limit)
|
||||
|
||||
async def execute(self, task: str) -> dict[str, Any]:
|
||||
"""Execute a task in pipeline mode.
|
||||
|
|
@ -201,9 +205,13 @@ class TeamOrchestrator:
|
|||
logger.info("Execution stopped by user intervention")
|
||||
break
|
||||
|
||||
# Execute all phases in this layer in parallel
|
||||
# Execute all phases in this layer in parallel (with concurrency limit)
|
||||
async def _bounded_phase(ph: PlanPhase) -> dict[str, Any]:
|
||||
async with self._phase_semaphore:
|
||||
return await self._execute_phase(ph, plan)
|
||||
|
||||
results = await asyncio.gather(
|
||||
*[self._execute_phase(ph, plan) for ph in ready],
|
||||
*[_bounded_phase(ph) for ph in ready],
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
|
|
@ -945,12 +953,13 @@ class TeamOrchestrator:
|
|||
# No participants — Lead directly adjudicates
|
||||
break
|
||||
|
||||
# Experts argue in parallel
|
||||
# Experts argue in parallel (with concurrency limit)
|
||||
async def _bounded_debate(e: Any) -> str:
|
||||
async with self._phase_semaphore:
|
||||
return await self._generate_debate_argument(e, topic, history, round_num)
|
||||
|
||||
speech_results = await asyncio.gather(
|
||||
*[
|
||||
self._generate_debate_argument(e, topic, history, round_num)
|
||||
for e in debate_experts
|
||||
],
|
||||
*[_bounded_debate(e) for e in debate_experts],
|
||||
return_exceptions=True,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
|
|
@ -27,7 +28,7 @@ from agentkit.core.protocol import TaskResult, TaskStatus
|
|||
from agentkit.experts.config import ExpertConfig
|
||||
from agentkit.experts.expert import Expert
|
||||
from agentkit.experts.orchestrator import TeamOrchestrator
|
||||
from agentkit.experts.plan import PhaseStatus, PlanPhase, PlanStatus
|
||||
from agentkit.experts.plan import PhaseStatus, PhaseType, PlanPhase, PlanStatus, TeamPlan
|
||||
from agentkit.experts.team import ExpertTeam, TeamStatus
|
||||
|
||||
|
||||
|
|
@ -980,3 +981,205 @@ class TestLLMGateway:
|
|||
|
||||
result = orchestrator._get_llm_gateway()
|
||||
assert result is gateway
|
||||
|
||||
|
||||
# ── 并发限制测试 (U2) ─────────────────────────────────────
|
||||
|
||||
|
||||
class _ConcurrencyTracker:
|
||||
"""Track max concurrent executions for semaphore testing."""
|
||||
|
||||
def __init__(self) -> None:
|
||||
self.current = 0
|
||||
self.max_seen = 0
|
||||
|
||||
def acquire(self) -> None:
|
||||
self.current += 1
|
||||
self.max_seen = max(self.max_seen, self.current)
|
||||
|
||||
def release(self) -> None:
|
||||
self.current -= 1
|
||||
|
||||
|
||||
class TestConcurrencyLimit:
|
||||
"""U2: 子代理并发限制 — Semaphore 包裹同层并行阶段和辩论参数生成"""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_three_phases_parallel_under_default_limit(self):
|
||||
"""Happy path: 3 个阶段同层,默认 limit=3,全部并行执行"""
|
||||
team = _make_team_with_experts(expert_names=["lead", "m1", "m2"])
|
||||
orchestrator = TeamOrchestrator(team)
|
||||
|
||||
gateway = _make_mock_llm_gateway(phases=[
|
||||
{"name": "A", "assigned_expert": "lead", "task_description": "A", "depends_on": []},
|
||||
{"name": "B", "assigned_expert": "m1", "task_description": "B", "depends_on": []},
|
||||
{"name": "C", "assigned_expert": "m2", "task_description": "C", "depends_on": []},
|
||||
])
|
||||
team._experts["lead"].agent._llm_gateway = gateway
|
||||
|
||||
tracker = _ConcurrencyTracker()
|
||||
|
||||
async def tracking_execute_phase(phase, plan):
|
||||
tracker.acquire()
|
||||
await asyncio.sleep(0.02) # Ensure overlap
|
||||
tracker.release()
|
||||
phase.status = PhaseStatus.COMPLETED
|
||||
phase.result = {"content": f"Result {phase.name}"}
|
||||
return phase.result
|
||||
|
||||
orchestrator._execute_phase = tracking_execute_phase
|
||||
# Avoid divergence detection inserting debates
|
||||
orchestrator._check_divergence_and_insert_debates = AsyncMock(return_value=None)
|
||||
|
||||
result = await orchestrator.execute("test")
|
||||
|
||||
assert result["status"] == "completed"
|
||||
assert tracker.max_seen == 3 # All 3 ran in parallel
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_five_phases_max_three_concurrent(self):
|
||||
"""Edge case: 5 个阶段同层,验证最多 3 个同时执行"""
|
||||
names = ["lead", "m1", "m2", "m3", "m4"]
|
||||
team = _make_team_with_experts(expert_names=names)
|
||||
orchestrator = TeamOrchestrator(team)
|
||||
|
||||
gateway = _make_mock_llm_gateway(phases=[
|
||||
{"name": f"P{i}", "assigned_expert": name, "task_description": f"P{i}", "depends_on": []}
|
||||
for i, name in enumerate(names)
|
||||
])
|
||||
team._experts["lead"].agent._llm_gateway = gateway
|
||||
|
||||
tracker = _ConcurrencyTracker()
|
||||
|
||||
async def tracking_execute_phase(phase, plan):
|
||||
tracker.acquire()
|
||||
await asyncio.sleep(0.05) # Ensure overlap
|
||||
tracker.release()
|
||||
phase.status = PhaseStatus.COMPLETED
|
||||
phase.result = {"content": f"Result {phase.name}"}
|
||||
return phase.result
|
||||
|
||||
orchestrator._execute_phase = tracking_execute_phase
|
||||
orchestrator._check_divergence_and_insert_debates = AsyncMock(return_value=None)
|
||||
|
||||
result = await orchestrator.execute("test")
|
||||
|
||||
assert result["status"] == "completed"
|
||||
assert tracker.max_seen <= 3 # Never more than 3 concurrent
|
||||
assert tracker.max_seen >= 2 # At least some parallelism
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_max_concurrent_one_serializes(self):
|
||||
"""Edge case: max_concurrent_phases=1 时退化为串行执行"""
|
||||
team = _make_team_with_experts(expert_names=["lead", "m1", "m2"])
|
||||
orchestrator = TeamOrchestrator(team, max_concurrent_phases=1)
|
||||
|
||||
gateway = _make_mock_llm_gateway(phases=[
|
||||
{"name": "A", "assigned_expert": "lead", "task_description": "A", "depends_on": []},
|
||||
{"name": "B", "assigned_expert": "m1", "task_description": "B", "depends_on": []},
|
||||
{"name": "C", "assigned_expert": "m2", "task_description": "C", "depends_on": []},
|
||||
])
|
||||
team._experts["lead"].agent._llm_gateway = gateway
|
||||
|
||||
tracker = _ConcurrencyTracker()
|
||||
|
||||
async def tracking_execute_phase(phase, plan):
|
||||
tracker.acquire()
|
||||
await asyncio.sleep(0.02)
|
||||
tracker.release()
|
||||
phase.status = PhaseStatus.COMPLETED
|
||||
phase.result = {"content": f"Result {phase.name}"}
|
||||
return phase.result
|
||||
|
||||
orchestrator._execute_phase = tracking_execute_phase
|
||||
orchestrator._check_divergence_and_insert_debates = AsyncMock(return_value=None)
|
||||
|
||||
result = await orchestrator.execute("test")
|
||||
|
||||
assert result["status"] == "completed"
|
||||
assert tracker.max_seen == 1 # Strictly serial
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_phase_failure_releases_semaphore(self):
|
||||
"""Error path: 某阶段失败不影响 semaphore 释放(async with 保证释放)"""
|
||||
team = _make_team_with_experts(expert_names=["lead", "m1", "m2"])
|
||||
orchestrator = TeamOrchestrator(team)
|
||||
|
||||
gateway = _make_mock_llm_gateway(phases=[
|
||||
{"name": "A", "assigned_expert": "lead", "task_description": "A", "depends_on": []},
|
||||
{"name": "B", "assigned_expert": "m1", "task_description": "B", "depends_on": []},
|
||||
{"name": "C", "assigned_expert": "m2", "task_description": "C", "depends_on": []},
|
||||
])
|
||||
team._experts["lead"].agent._llm_gateway = gateway
|
||||
|
||||
tracker = _ConcurrencyTracker()
|
||||
|
||||
async def tracking_execute_phase(phase, plan):
|
||||
tracker.acquire()
|
||||
await asyncio.sleep(0.02)
|
||||
tracker.release()
|
||||
if phase.name == "A":
|
||||
raise RuntimeError("Phase A failed")
|
||||
phase.status = PhaseStatus.COMPLETED
|
||||
phase.result = {"content": f"Result {phase.name}"}
|
||||
return phase.result
|
||||
|
||||
orchestrator._execute_phase = tracking_execute_phase
|
||||
orchestrator._check_divergence_and_insert_debates = AsyncMock(return_value=None)
|
||||
|
||||
result = await orchestrator.execute("test")
|
||||
|
||||
# B and C should still complete (fallback to single agent if needed)
|
||||
assert result["status"] == "completed"
|
||||
# Semaphore was released even after A failed
|
||||
assert tracker.max_seen <= 3
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_debate_semaphore_limits_concurrency(self):
|
||||
"""Integration: 辩论阶段 4 个专家并行,semaphore 限制生效
|
||||
|
||||
ponytail: 直接调用 _execute_debate_phase(不经过 _bounded_phase),
|
||||
因此 phase 级 semaphore 未占用,4 个专家受 limit=3 限制 → 最多 3 并发。
|
||||
在完整流程中 _bounded_phase 会先占 1 个槽,辩论并发上限为 limit-1。
|
||||
"""
|
||||
team = _make_team_with_experts(expert_names=["lead", "m1", "m2", "m3", "m4"])
|
||||
orchestrator = TeamOrchestrator(team) # default limit=3
|
||||
|
||||
phase = PlanPhase(
|
||||
id="debate_1",
|
||||
name="Debate",
|
||||
assigned_expert="lead",
|
||||
task_description="Test debate topic",
|
||||
depends_on=[],
|
||||
phase_type=PhaseType.DEBATE,
|
||||
debate_config={
|
||||
"topic": "Test topic",
|
||||
"participants": ["m1", "m2", "m3", "m4"],
|
||||
"max_rounds": 1,
|
||||
},
|
||||
)
|
||||
plan = TeamPlan(id="plan_1", task="test", phases=[phase])
|
||||
|
||||
tracker = _ConcurrencyTracker()
|
||||
|
||||
async def tracking_debate_argument(expert, topic, history, round_num):
|
||||
tracker.acquire()
|
||||
await asyncio.sleep(0.05) # Ensure overlap
|
||||
tracker.release()
|
||||
return f"Argument from {expert.config.name}"
|
||||
|
||||
orchestrator._generate_debate_opening = AsyncMock(return_value="Opening statement")
|
||||
orchestrator._generate_debate_argument = tracking_debate_argument
|
||||
orchestrator._generate_debate_summary = AsyncMock(return_value="Round summary")
|
||||
orchestrator._generate_debate_verdict = AsyncMock(return_value={
|
||||
"conclusion": "Final conclusion",
|
||||
"decision": "adopt",
|
||||
"rationale": "Because",
|
||||
})
|
||||
|
||||
await orchestrator._execute_debate_phase(phase, plan)
|
||||
|
||||
assert phase.status == PhaseStatus.COMPLETED
|
||||
# 4 experts, semaphore limit=3 → max 3 concurrent
|
||||
assert tracker.max_seen <= 3
|
||||
assert tracker.max_seen >= 2 # At least some parallelism
|
||||
|
|
|
|||
Loading…
Reference in New Issue