fix(experts): resolve residual review findings from PR #13
Test / backend-test (pull_request) Has been cancelled Details
Test / frontend-unit (pull_request) Has been cancelled Details
Test / api-e2e (pull_request) Has been cancelled Details
Test / frontend-e2e (pull_request) Has been cancelled Details

Addresses 4 actionable findings (1 P1 + 3 P2) from ce-code-review of
feat/ui-ue-enhancement (PR #13), now merged to main (8066e0b).

P1 — expert_step payload alignment (_phase_executor.py)
  The thinking/tool_call/tool_result event payloads were missing the
  fields the frontend WsServerMessage contract requires
  (expert_name/expert_color/content/step). Frontend code consuming these
  events silently degraded. Now all expert_step broadcasts carry the
  full contract; tool_call/tool_result keep step_data for the raw payload.

P2 #1 — execute_stream CancellationToken registration (config_driven.py)
  execute_stream() bypassed BaseAgent.execute() and never registered a
  CancellationToken, so cancel_task() could not cooperatively cancel a
  streaming task. Now registers the token and cleans it up in finally.

P2 #2 — team_synthesis orphan milestone cleanup (orchestrator.py)
  If synthesis streaming was interrupted (cancel/exception), no terminal
  team_synthesis event was emitted, leaving the frontend streaming
  milestone spinning forever. Now an inner try/except emits a terminal
  team_synthesis with status=cancelled|error before re-raising, so the
  frontend can finalize the milestone. The success path also carries
  the synthesis_id.

P2 #3 — synthesis_id dedup (orchestrator.py + types.ts + chatStream.ts)
  Without an identifier, the frontend could not precisely match a
  team_synthesis terminal event to its streaming milestone (especially
  across retries/concurrent teams). The backend now injects a stable
  synthesis_id (`{plan.id}:synthesis`) into both team_synthesis_chunk
  and team_synthesis events; the frontend uses it for exact milestone
  matching and treats error/cancelled status as terminal.

Test updates
  - Updated test_thinking_events_forwarded_as_expert_step to assert the
    new payload contract (expert_id/name/color/content/step).
  - Added test_tool_call_events_forwarded_as_expert_step covering
    tool_call/tool_result payload shape (content=tool_name摘要 +
    step_data=原始 payload).

Verification
  - ruff check: clean
  - pytest tests/unit/experts/test_phase_executor_streaming.py: 14/14
  - npm run typecheck: clean
  - vitest: 126/127 (1 unrelated baseline failure in tauri-auth.test.ts)

Residuals doc: docs/residual-review-findings/feat-ui-ue-enhancement.md
This commit is contained in:
chiguyong 2026-07-01 13:26:19 +08:00
parent 8066e0bf8b
commit 47a437c5e3
6 changed files with 147 additions and 19 deletions

View File

@ -17,7 +17,7 @@ import yaml
from agentkit.core.base import BaseAgent
from agentkit.core.exceptions import ConfigValidationError
from agentkit.core.protocol import AgentCapability, TaskMessage
from agentkit.core.protocol import AgentCapability, CancellationToken, TaskMessage
from agentkit.core.react import ReActEvent
from agentkit.evolution.lifecycle import EvolutionMixin
from agentkit.evolution.reflector import Reflector
@ -688,10 +688,19 @@ class ConfigDrivenAgent(BaseAgent, EvolutionMixin):
镜像 execute() handle_task() 分派但不包装 TaskResult
直接 yield 事件由调用方负责转发和累积
P2 fix: 注册 CancellationToken _active_tokens使 cancel_task()
协作式取消流式任务原实现绕过 BaseAgent.execute()未注册 token
"""
token = CancellationToken()
self._active_tokens[task.task_id] = token
try:
await self._register_mcp_tools()
async for event in self.handle_task_stream(task):
yield event
finally:
# async generator 的 finally 在 generator 关闭时执行GC/aclose/正常结束)
self._active_tokens.pop(task.task_id, None)
async def handle_task_stream(self, task: TaskMessage) -> AsyncGenerator[ReActEvent, None]:
"""根据 execution_mode / task_mode 流式分派,镜像 handle_task()。"""

View File

@ -239,9 +239,14 @@ class PhaseExecutorMixin:
"expert_id": expert.config.name, "content": chunk,
})
elif etype == "thinking":
# P1 fix: payload 对齐前端 WsServerMessage 契约
# (expert_id/expert_name/expert_color/content/step)
await self._broadcast_event("expert_step", {
"expert_id": expert.config.name,
"thinking": event.data.get("content", ""),
"expert_name": expert.config.name,
"expert_color": expert.config.color,
"content": event.data.get("content", ""),
"step": "thinking",
})
elif etype == "final_answer":
# P0 fix: ReActEngine 先发 token增量再发 final_answer全文
@ -251,8 +256,18 @@ class PhaseExecutorMixin:
if output and not accumulated:
accumulated.append(str(output))
elif etype in ("tool_call", "tool_result"):
# P1 fix: payload 对齐前端契约 — content 携带可读摘要step_data 保留原始数据
await self._broadcast_event("expert_step", {
"expert_id": expert.config.name, "step_data": event.data,
"expert_id": expert.config.name,
"expert_name": expert.config.name,
"expert_color": expert.config.color,
"content": (
event.data.get("tool_name")
or event.data.get("name")
or etype
),
"step": etype,
"step_data": event.data,
})
elif etype == "error":
raise RuntimeError(

View File

@ -287,22 +287,51 @@ class TeamOrchestrator(
plan.status = PlanStatus.COMPLETED
# U3: 流式综合 — 每个 chunk 广播 team_synthesis_chunk
# P2 fix: 携带 synthesis_id 让前端去重 streaming milestone避免附身到上一次孤儿
synthesis_id = f"{plan.id}:synthesis"
async def _broadcast_synthesis_chunk(data: dict[str, object]) -> None:
# data 可能是 {"chunk": "..."} 或 {"value": "..."}synthesizer 决定)
# 统一注入 synthesis_id不破坏原 data 结构
if isinstance(data, dict):
data = {**data, "synthesis_id": synthesis_id}
await self._broadcast_event("team_synthesis_chunk", data)
# P2 fix: 流式综合期间若异常/取消,必须广播 team_synthesis 终结事件,
# 否则前端 streaming milestone 永久转圈(孤儿占位无终结信号)。
# 内层 try/except 只清理孤儿 milestone 后 re-raise让外层 except 处理 fallback。
try:
final_result = await self._synthesize_results(
lead, task, completed, broadcast_callback=_broadcast_synthesis_chunk
)
except asyncio.CancelledError:
await self._broadcast_event(
"team_synthesis",
{"content": "", "phases_completed": len(completed),
"phases_total": len(plan.phases), "status": "cancelled",
"synthesis_id": synthesis_id},
)
raise
except Exception as synth_err:
logger.error(f"Synthesis streaming failed: {synth_err}")
await self._broadcast_event(
"team_synthesis",
{"content": "", "phases_completed": len(completed),
"phases_total": len(plan.phases), "status": "error",
"error": str(synth_err), "synthesis_id": synthesis_id},
)
raise # 让外层 except 决定是否 fallback
self._team.set_status(TeamStatus.COMPLETED)
# 7. Emit team_synthesis event
# 7. Emit team_synthesis event (success path only —
# 失败路径已在上面广播 error 终结事件)
await self._broadcast_event(
"team_synthesis",
{
"content": final_result.get("content", ""),
"phases_completed": len(completed),
"phases_total": len(plan.phases),
"synthesis_id": synthesis_id,
},
)

View File

@ -85,6 +85,8 @@ export interface IChatMessage {
review_result?: IReviewResult
/** U5: PM collaboration — risk flag for RiskFlagCard */
risk_flag?: IRiskFlag
/** U4: synthesis identifier for streaming milestone dedup (team_synthesis_chunk/team_synthesis) */
synthesis_id?: string
}
/** Conversation with messages */
@ -151,8 +153,8 @@ export type WsServerMessage =
// PLAN_EXEC (U4) — phase lifecycle events emitted by ReActEngine.
| { type: 'phase_changed'; data: { phase: string; previous: string } }
| { type: 'phase_violation'; data: { current_phase: string; tool: string; message: string; violation_kind: string; command_preview?: string } }
| { type: 'team_synthesis_chunk'; data: { chunk: string } }
| { type: 'team_synthesis'; data: { content: string; phases_completed?: number; phases_total?: number } }
| { type: 'team_synthesis_chunk'; data: { chunk: string; synthesis_id?: string } }
| { type: 'team_synthesis'; data: { content: string; phases_completed?: number; phases_total?: number; synthesis_id?: string; status?: 'completed' | 'error' | 'cancelled'; error?: string } }
| { type: 'team_dissolved'; data: { team_id: string } }
// Board Meeting 模式事件
| { type: 'board_started'; data: IBoardStartedData }

View File

@ -868,19 +868,26 @@ export function dispatchWsEvent(
case "team_synthesis_chunk": {
// U4: 团队综合流式 chunk — 首次创建 streaming 占位,后续累加。
// P2 fix: 用 synthesis_id 去重,避免 chunk 附身到上一次孤儿 streaming milestone。
const conversationId = state.resolveIncomingConvId();
if (!conversationId) break;
const conv = state.conversations.value.find(
(c) => c.id === conversationId,
);
if (!conv) break;
const sid = event.data.synthesis_id;
const existing = findLastMessage(
conv.messages,
(m) => m.message_type === "milestone" && m.status === "streaming",
(m) =>
m.message_type === "milestone" &&
m.status === "streaming" &&
(sid ? m.synthesis_id === sid || m.synthesis_id === undefined : true),
);
if (existing) {
state.updateMessage(conversationId, existing.id, {
content: (existing.content || "") + (event.data.chunk || ""),
// 首次附身到无 synthesis_id 的 milestone 时,标记上 synthesis_id
...(sid && !existing.synthesis_id ? { synthesis_id: sid } : {}),
});
} else {
const synthMsg: IChatMessage = {
@ -890,6 +897,7 @@ export function dispatchWsEvent(
timestamp: new Date().toISOString(),
status: "streaming",
message_type: "milestone",
...(sid ? { synthesis_id: sid } : {}),
};
state.appendMessage(conversationId, synthMsg);
}
@ -899,20 +907,31 @@ export function dispatchWsEvent(
case "team_synthesis": {
// U4: 最终综合结果 — 覆盖累积 content标记 completed。
// 若存在 streaming 占位则 update否则 append向后兼容
// P2 fix: 用 synthesis_id 精确匹配对应 streaming milestone跨重试场景
// P2 fix: 处理 status=error/cancelled 终结事件(后端 synthesis 失败时清理孤儿)。
const conversationId = state.resolveIncomingConvId();
if (!conversationId) break;
const conv = state.conversations.value.find(
(c) => c.id === conversationId,
);
if (!conv) break;
const sid = event.data.synthesis_id;
const finalStatus: 'completed' | 'error' =
event.data.status === 'error' || event.data.status === 'cancelled'
? 'error'
: 'completed';
const existing = findLastMessage(
conv.messages,
(m) => m.message_type === "milestone" && m.status === "streaming",
(m) =>
m.message_type === "milestone" &&
m.status === "streaming" &&
(sid ? m.synthesis_id === sid || m.synthesis_id === undefined : true),
);
if (existing) {
state.updateMessage(conversationId, existing.id, {
content: event.data.content || "",
status: "completed",
status: finalStatus,
...(event.data.error ? { error_detail: event.data.error } : {}),
});
} else {
const synthesisMsg: IChatMessage = {
@ -920,8 +939,10 @@ export function dispatchWsEvent(
role: "assistant",
content: event.data.content || "",
timestamp: new Date().toISOString(),
status: "completed",
status: finalStatus,
message_type: "milestone",
...(sid ? { synthesis_id: sid } : {}),
...(event.data.error ? { error_detail: event.data.error } : {}),
};
state.appendMessage(conversationId, synthesisMsg);
}

View File

@ -193,7 +193,11 @@ class TestStreamEventForwarding:
@pytest.mark.asyncio
async def test_thinking_events_forwarded_as_expert_step(self):
"""execute_stream 产出 thinking 事件时_broadcast_event 转发 expert_step。"""
"""execute_stream 产出 thinking 事件时_broadcast_event 转发 expert_step。
P1 fix: payload 对齐前端 WsServerMessage 契约
携带 expert_id/expert_name/expert_color/content/step 字段
"""
events = [
ReActEvent(event_type="thinking", step=0, data={"content": "思考中..."}),
ReActEvent(event_type="token", step=0, data={"content": "结果"}),
@ -210,10 +214,58 @@ class TestStreamEventForwarding:
step_calls = [
c
for c in orch._broadcast_event.call_args_list
if c.args[0] == "expert_step" and "thinking" in c.args[1]
if c.args[0] == "expert_step" and c.args[1].get("step") == "thinking"
]
assert len(step_calls) == 1
assert step_calls[0].args[1]["thinking"] == "思考中..."
payload = step_calls[0].args[1]
# P1 fix: payload 必须包含前端契约字段
assert payload["expert_id"] == expert.config.name
assert payload["expert_name"] == expert.config.name
assert payload["expert_color"] == expert.config.color
assert payload["content"] == "思考中..."
assert payload["step"] == "thinking"
@pytest.mark.asyncio
async def test_tool_call_events_forwarded_as_expert_step(self):
"""execute_stream 产出 tool_call/tool_result 事件时_broadcast_event 转发 expert_step。
P1 fix: payload 对齐前端契约 content 携带 tool_name 摘要step_data 保留原始数据
"""
events = [
ReActEvent(event_type="tool_call", step=0,
data={"tool_name": "search", "args": {"q": "test"}}),
ReActEvent(event_type="tool_result", step=0,
data={"tool_name": "search", "result": "hit"}),
]
agent = _make_stream_agent(events)
expert = _make_mock_expert()
lead = _make_mock_expert(name="lead")
phase = _make_phase()
plan = _make_simple_plan()
orch = _make_orchestrator_for_streaming()
await orch._run_agent_steps(expert, agent, lead, phase, plan)
# 仅断言 tool_call/tool_result 事件phase intro step 单独由其他测试覆盖)
step_calls = [
c
for c in orch._broadcast_event.call_args_list
if c.args[0] == "expert_step"
and c.args[1].get("step") in ("tool_call", "tool_result")
]
assert len(step_calls) == 2
# tool_call event
p1 = step_calls[0].args[1]
assert p1["step"] == "tool_call"
assert p1["content"] == "search"
assert p1["step_data"]["args"] == {"q": "test"}
assert p1["expert_name"] == expert.config.name
assert p1["expert_color"] == expert.config.color
# tool_result event
p2 = step_calls[1].args[1]
assert p2["step"] == "tool_result"
assert p2["content"] == "search"
assert p2["step_data"]["result"] == "hit"
# ── expert_result 终结事件测试 ────────────────────────────