feat(experts): add SharedWorkspace state offloading for long-horizon runs

U4: ExpertTeam accepts redis_client, passes to SharedWorkspace. After phase
completion, full result is written to workspace and in-memory phase.result
is replaced with a 500-char summary + _ref_key. Dependency output reading
resolves offloaded content from workspace on demand, with graceful fallback
to summary on read failure.

Tests: 8 scenarios (offload creation, short content, dependency resolution,
workspace failure fallback, non-offloaded passthrough, redis_client wiring,
memory dict fallback, pipeline integration) — all pass.
This commit is contained in:
chiguyong 2026-06-24 20:32:10 +08:00
parent 122173ec2c
commit ef84e3fd53
4 changed files with 204 additions and 7 deletions

View File

@ -441,6 +441,40 @@ class TeamOrchestrator:
return phases return phases
# U4: State offloading helpers — keep memory lean for long-horizon runs.
_OFFLOAD_SUMMARY_LIMIT = 500
def _offload_result(self, content: str, ref_key: str) -> dict[str, Any]:
"""Create an offloaded result: summary in memory, full content in workspace."""
summary = (
content[: self._OFFLOAD_SUMMARY_LIMIT] + "..."
if len(content) > self._OFFLOAD_SUMMARY_LIMIT
else content
)
return {
"content": summary,
"_ref_key": ref_key,
"_offloaded": True,
}
async def _read_dependency_output(self, dep_phase: PlanPhase) -> str:
"""Read a dependency phase's output, resolving offloaded content from workspace."""
if not dep_phase.result:
return ""
content = dep_phase.result.get("content", str(dep_phase.result))
# U4: If offloaded, read full content from workspace
if dep_phase.result.get("_offloaded"):
ref_key = dep_phase.result.get("_ref_key", "")
if ref_key:
try:
full_data = await self._team.workspace.read(ref_key)
if full_data:
return full_data.get("value", content)
except Exception as e:
logger.warning(f"Failed to read offloaded output '{ref_key}': {e}")
return content
async def _execute_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]: async def _execute_phase(self, phase: PlanPhase, plan: TeamPlan) -> dict[str, Any]:
"""Execute a single phase, dispatching by phase_type. """Execute a single phase, dispatching by phase_type.
@ -504,8 +538,9 @@ class TeamOrchestrator:
for dep_id in phase.depends_on: for dep_id in phase.depends_on:
dep_phase = plan.get_phase(dep_id) dep_phase = plan.get_phase(dep_id)
if dep_phase and dep_phase.status == PhaseStatus.COMPLETED and dep_phase.result: if dep_phase and dep_phase.status == PhaseStatus.COMPLETED and dep_phase.result:
dependency_outputs[dep_phase.name] = dep_phase.result.get( # U4: Resolve offloaded content from workspace if needed
"content", str(dep_phase.result) dependency_outputs[dep_phase.name] = await self._read_dependency_output(
dep_phase
) )
# 按协作契约读取相关专家的输出(可见性 — 打破上下文隔离,但限定在契约范围内) # 按协作契约读取相关专家的输出(可见性 — 打破上下文隔离,但限定在契约范围内)
@ -519,8 +554,10 @@ class TeamOrchestrator:
and prev_phase.status == PhaseStatus.COMPLETED and prev_phase.status == PhaseStatus.COMPLETED
and prev_phase.result and prev_phase.result
): ):
content = prev_phase.result.get("content", str(prev_phase.result)) # U4: Resolve offloaded content from workspace
collaboration_outputs[contract.from_expert] = content collaboration_outputs[contract.from_expert] = (
await self._read_dependency_output(prev_phase)
)
break break
# Emit expert_step event # Emit expert_step event
@ -635,14 +672,17 @@ class TeamOrchestrator:
if passed: if passed:
# 验收通过 — 写入 SharedWorkspace + 通知协作方 + 标记完成 # 验收通过 — 写入 SharedWorkspace + 通知协作方 + 标记完成
phase.status = PhaseStatus.COMPLETED phase.status = PhaseStatus.COMPLETED
phase.result = result
# P2: SharedWorkspace 写入移到验收通过后 — 避免持久化被拒输出 # P2: SharedWorkspace 写入移到验收通过后 — 避免持久化被拒输出
output_key = f"{plan.id}/phase/{phase.id}/output" output_key = f"{plan.id}/phase/{phase.id}/output"
full_content = result.get("content", str(result))
await self._team.workspace.write( await self._team.workspace.write(
output_key, output_key,
result.get("content", str(result)), full_content,
expert.config.name, expert.config.name,
) )
# U4: State offloading — keep only summary in memory,
# full content lives in workspace (Redis or local dict).
phase.result = self._offload_result(full_content, output_key)
await self._broadcast_event( await self._broadcast_event(
"review_result", "review_result",
{ {

View File

@ -17,6 +17,7 @@ import enum
import logging import logging
import time import time
import uuid import uuid
from typing import Any
from .config import ExpertConfig from .config import ExpertConfig
from .expert import Expert from .expert import Expert
@ -62,9 +63,12 @@ class ExpertTeam:
workspace: SharedWorkspace | None = None, workspace: SharedWorkspace | None = None,
pool: AgentPool | None = None, pool: AgentPool | None = None,
template_registry: ExpertTemplateRegistry | None = None, template_registry: ExpertTemplateRegistry | None = None,
redis_client: Any = None,
): ):
self.team_id = team_id or str(uuid.uuid4()) self.team_id = team_id or str(uuid.uuid4())
self._workspace = workspace or SharedWorkspace() # U4: Accept redis_client for SharedWorkspace state offloading.
# If workspace is explicitly provided, redis_client is ignored.
self._workspace = workspace or SharedWorkspace(redis_client=redis_client)
self._pool = pool self._pool = pool
self._template_registry = template_registry or ExpertTemplateRegistry() self._template_registry = template_registry or ExpertTemplateRegistry()
self._handoff_transport = InProcessHandoffTransport() self._handoff_transport = InProcessHandoffTransport()

View File

@ -407,6 +407,7 @@ async def _execute_team_collab(
team = ExpertTeam( team = ExpertTeam(
pool=app_state.agent_pool, pool=app_state.agent_pool,
template_registry=template_registry, template_registry=template_registry,
redis_client=getattr(app_state, "working_redis_client", None),
) )
# Register handoff_transport handler to relay team events to WebSocket # Register handoff_transport handler to relay team events to WebSocket

View File

@ -1183,3 +1183,155 @@ class TestConcurrencyLimit:
# 4 experts, semaphore limit=3 → max 3 concurrent # 4 experts, semaphore limit=3 → max 3 concurrent
assert tracker.max_seen <= 3 assert tracker.max_seen <= 3
assert tracker.max_seen >= 2 # At least some parallelism assert tracker.max_seen >= 2 # At least some parallelism
# ── U4: SharedWorkspace Redis 化 + 状态卸载测试 ─────────────
class TestSharedWorkspaceRedis:
"""U4: SharedWorkspace 状态卸载 — 阶段输出写入 workspace内存只保留摘要"""
def test_offload_result_creates_summary_with_ref(self):
"""_offload_result 返回摘要 + _ref_key + _offloaded=True"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
long_content = "x" * 1000
result = orchestrator._offload_result(long_content, "plan/phase/p1/output")
assert result["_offloaded"] is True
assert result["_ref_key"] == "plan/phase/p1/output"
assert len(result["content"]) < len(long_content)
assert result["content"].endswith("...")
def test_offload_result_short_content_no_truncation(self):
"""短内容不截断,但仍标记为 offloaded"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
short_content = "short result"
result = orchestrator._offload_result(short_content, "ref_key")
assert result["_offloaded"] is True
assert result["content"] == short_content
@pytest.mark.asyncio
async def test_read_dependency_output_resolves_offloaded(self):
"""Happy path: offloaded result → 从 workspace 读取完整内容"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
# Write full content to workspace
full_content = "x" * 1000
ref_key = "plan/phase/p1/output"
await team.workspace.write(ref_key, full_content, "lead")
# Create a phase with offloaded result
dep_phase = PlanPhase(
id="p1",
name="Phase1",
assigned_expert="lead",
task_description="test",
depends_on=[],
)
dep_phase.status = PhaseStatus.COMPLETED
dep_phase.result = orchestrator._offload_result(full_content, ref_key)
# Read should return full content from workspace
content = await orchestrator._read_dependency_output(dep_phase)
assert content == full_content
@pytest.mark.asyncio
async def test_read_dependency_output_falls_back_on_workspace_failure(self):
"""Edge case: workspace 读取失败 → 降级到内存摘要"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
dep_phase = PlanPhase(
id="p1",
name="Phase1",
assigned_expert="lead",
task_description="test",
depends_on=[],
)
dep_phase.status = PhaseStatus.COMPLETED
dep_phase.result = {
"content": "summary content",
"_ref_key": "nonexistent_key",
"_offloaded": True,
}
# Workspace read returns None (key doesn't exist) → fallback to summary
content = await orchestrator._read_dependency_output(dep_phase)
assert content == "summary content"
@pytest.mark.asyncio
async def test_read_dependency_output_non_offloaded_returns_directly(self):
"""Non-offloaded result → 直接返回 content"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
dep_phase = PlanPhase(
id="p1",
name="Phase1",
assigned_expert="lead",
task_description="test",
depends_on=[],
)
dep_phase.status = PhaseStatus.COMPLETED
dep_phase.result = {"content": "direct content"}
content = await orchestrator._read_dependency_output(dep_phase)
assert content == "direct content"
@pytest.mark.asyncio
async def test_team_accepts_redis_client(self):
"""ExpertTeam 创建时接收 redis_client 参数"""
mock_redis = AsyncMock()
team = ExpertTeam(redis_client=mock_redis)
assert team._workspace._redis is mock_redis
@pytest.mark.asyncio
async def test_team_without_redis_uses_memory_dict(self):
"""ExpertTeam 无 redis_client → 使用内存 dict"""
team = ExpertTeam()
assert team._workspace._redis is None
assert team._workspace._local_store == {}
@pytest.mark.asyncio
async def test_pipeline_offloads_large_results(self):
"""Integration: 流水线执行后,大输出被卸载到 workspace"""
team = _make_team_with_experts()
orchestrator = TeamOrchestrator(team)
gateway = _make_mock_llm_gateway(phases=[
{"name": "A", "assigned_expert": "lead", "task_description": "A", "depends_on": []},
{"name": "B", "assigned_expert": "member1", "task_description": "B", "depends_on": ["A"]},
])
team._experts["lead"].agent._llm_gateway = gateway
# Mock _execute_phase to return large content + verify offloading
large_content = "x" * 1000
async def mock_execute_phase(phase, plan):
phase.status = PhaseStatus.COMPLETED
# Simulate the offloading that happens in _execute_execution_phase
output_key = f"{plan.id}/phase/{phase.id}/output"
await team.workspace.write(output_key, large_content, phase.assigned_expert)
phase.result = orchestrator._offload_result(large_content, output_key)
return phase.result
orchestrator._execute_phase = mock_execute_phase
orchestrator._check_divergence_and_insert_debates = AsyncMock(return_value=None)
result = await orchestrator.execute("test")
assert result["status"] == "completed"
# Verify phases have offloaded results
plan = result["plan"]
for ph in plan.phases:
if ph.status == PhaseStatus.COMPLETED:
assert ph.result.get("_offloaded") is True
assert len(ph.result["content"]) < len(large_content)