From 0f3f0a7550bb4825e42011c4754abf5e840351fc Mon Sep 17 00:00:00 2001 From: chiguyong Date: Mon, 29 Jun 2026 20:49:52 +0800 Subject: [PATCH] =?UTF-8?q?feat(U3):=20G8=20delta=5Fflush=5Finterval=20?= =?UTF-8?q?=E8=B0=83=E9=80=9F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ReActEngine 新增 flush_interval_ms 构造参数(默认 0 = 逐 chunk yield 向后兼容) - execute_stream chunk 循环用 time.monotonic 节流,累积 _flush_buffer 批量 yield - flush_interval_ms=0 条件短路为 True 逐 chunk yield 保当前行为 - 流结束 mid-interval 最终 flush 剩余 buffer 不丢字符 - ServerConfig.streaming 配置项(flush_interval_ms) - test_delta_flush.py 覆盖 R11/R12/R14 --- src/agentkit/core/react.py | 35 ++++++-- src/agentkit/server/config.py | 3 + tests/unit/test_delta_flush.py | 148 +++++++++++++++++++++++++++++++++ 3 files changed, 181 insertions(+), 5 deletions(-) create mode 100644 tests/unit/test_delta_flush.py diff --git a/src/agentkit/core/react.py b/src/agentkit/core/react.py index f99e204..6c78bb2 100644 --- a/src/agentkit/core/react.py +++ b/src/agentkit/core/react.py @@ -164,6 +164,7 @@ class ReActEngine: enable_tool_search: bool = True, middleware_chain: "MiddlewareChain | None" = None, prompt_cache_enable: bool = True, + flush_interval_ms: int = 0, ): if max_steps < 1: raise ValueError(f"max_steps must be >= 1, got {max_steps}") @@ -180,6 +181,9 @@ class ReActEngine: # U2/G2: prompt cache 双块结构开关(True 时 Anthropic 用 cache_control blocks, # 其他 provider 走字符串拼接依赖自动前缀缓存) self._prompt_cache_enable = prompt_cache_enable + # U3/G8: token chunk 节流间隔(ms)。0 = 逐 chunk yield(向后兼容)。 + # 用 time.monotonic() 不受系统时钟跳变影响。 + self._flush_interval_ms = flush_interval_ms # Tiered tool description injection config self._core_tool_names: tuple[str, ...] | None = ( tuple(core_tool_names) if core_tool_names is not None else None @@ -1123,6 +1127,9 @@ class ReActEngine: stream_usage = None stream_tool_calls: list[Any] = [] stream_model = model + # U3/G8: delta_flush 节流 buffer,按 flush_interval_ms 批量 yield + _flush_buffer: list[str] = [] + _last_flush_ts = time.monotonic() async for chunk in _ensure_async_iterable( self._llm_gateway.chat_stream( @@ -1136,11 +1143,20 @@ class ReActEngine: ): if chunk.content: stream_content_chunks.append(chunk.content) - yield ReActEvent( - event_type="token", - step=step, - data={"content": chunk.content}, - ) + _flush_buffer.append(chunk.content) + now = time.monotonic() + # flush_interval_ms=0 → 逐 chunk yield(向后兼容,条件短路为 True) + if ( + self._flush_interval_ms == 0 + or now - _last_flush_ts >= self._flush_interval_ms / 1000 + ): + yield ReActEvent( + event_type="token", + step=step, + data={"content": "".join(_flush_buffer)}, + ) + _flush_buffer = [] + _last_flush_ts = now if chunk.usage: stream_usage = chunk.usage if chunk.tool_calls: @@ -1148,6 +1164,15 @@ class ReActEngine: if chunk.model: stream_model = chunk.model + # U3/G8: 流结束 mid-interval → 最终 flush 剩余 buffer(不丢字符) + if _flush_buffer: + yield ReActEvent( + event_type="token", + step=step, + data={"content": "".join(_flush_buffer)}, + ) + _flush_buffer = [] + # Build response-like object from stream stream_content = "".join(stream_content_chunks) response = self._build_response_from_stream( diff --git a/src/agentkit/server/config.py b/src/agentkit/server/config.py index f71fba2..2e8a540 100644 --- a/src/agentkit/server/config.py +++ b/src/agentkit/server/config.py @@ -117,6 +117,7 @@ class ServerConfig: expert_paths: list[str] | None = None, board: dict[str, Any] | None = None, prompt_cache: dict[str, Any] | None = None, + streaming: dict[str, Any] | None = None, on_change: Callable[["ServerConfig"], None] | None = None, ): self.host = host @@ -146,6 +147,8 @@ class ServerConfig: self.expert_paths = expert_paths or [] self.board = board or {} self.prompt_cache = prompt_cache or {} + # U3/G8: streaming.flush_interval_ms 控制 token chunk 节流(默认 0 = 逐 chunk yield) + self.streaming = streaming or {} self.on_change = on_change # Config watching state diff --git a/tests/unit/test_delta_flush.py b/tests/unit/test_delta_flush.py new file mode 100644 index 0000000..3552706 --- /dev/null +++ b/tests/unit/test_delta_flush.py @@ -0,0 +1,148 @@ +"""U3 / G8 delta_flush_interval 调速测试。 + +覆盖 R11-R12, R14: +- R11 chunk 按 flush_interval_ms 间隔批量 yield +- R12 配置化(flush_interval_ms=0 退化为逐 chunk yield) +- R14 自检:合并 content 等于原始 chunks 拼接(不丢字符) +""" + +from __future__ import annotations + + + +from agentkit.core.react import ReActEngine +from agentkit.llm.protocol import StreamChunk + + +class _StubGateway: + """模拟 LLMGateway,yield 一串 StreamChunk 后结束。""" + + def __init__(self, chunks: list[str]): + self._chunks = chunks + + def get_provider_name_for_model(self, model: str) -> str | None: + return None + + async def chat_stream(self, **kwargs): + for c in self._chunks: + yield StreamChunk(content=c, model="test") + + +def _collect_token_events(events) -> list[str]: + return [e.data["content"] for e in events if e.event_type == "token"] + + +# ---- R12 Config: flush_interval_ms=0 → 逐 chunk yield(向后兼容) ---- + + +async def test_flush_interval_zero_yields_per_chunk(): + chunks = ["H", "e", "l", "l", "o"] + gw = _StubGateway(chunks) + engine = ReActEngine(llm_gateway=gw, flush_interval_ms=0) + events = [] + async for ev in engine.execute_stream( + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="test", + ): + events.append(ev) + tokens = _collect_token_events(events) + # 5 chunks → 5 token events(每个内容 = 单 chunk) + assert tokens == ["H", "e", "l", "l", "o"] + + +# ---- R11 Happy path: flush_interval_ms > 0 → 批量合并 ---- + + +async def test_flush_interval_batches_chunks_by_interval(): + chunks = ["a", "b", "c", "d", "e", "f"] + gw = _StubGateway(chunks) + # 间隔设很大(10s),所有 chunks 在第一个 interval 内累积,流结束后最终 flush + engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000) + events = [] + async for ev in engine.execute_stream( + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="test", + ): + events.append(ev) + tokens = _collect_token_events(events) + # 所有 chunk 累积到流结束,最终 flush 一次 → 1 个 token event,content = 全拼接 + assert len(tokens) == 1 + assert tokens[0] == "abcdef" + + +# ---- R14 Self-check: 合并 content 等于原始 chunks 拼接(不丢字符) ---- + + +async def test_no_character_loss_after_merge(): + chunks = ["Hello", " ", "World", "!", "你好", "世界"] + gw = _StubGateway(chunks) + engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000) + events = [] + async for ev in engine.execute_stream( + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="test", + ): + events.append(ev) + tokens = _collect_token_events(events) + # 合并所有 token events 的 content 等于原始 chunks 拼接 + merged = "".join(tokens) + assert merged == "".join(chunks) == "Hello World!你好世界" + + +# ---- Edge: 流结束 mid-interval → 最终 flush 剩余 buffer ---- + + +async def test_final_flush_on_stream_end(): + chunks = ["x", "y", "z"] + gw = _StubGateway(chunks) + engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000) + events = [] + async for ev in engine.execute_stream( + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="test", + ): + events.append(ev) + tokens = _collect_token_events(events) + # mid-interval 累积 → 流结束最终 flush 一次 + assert tokens == ["xyz"] + + +# ---- Edge: 单个 chunk 后流结束 → 立即 flush ---- + + +async def test_single_chunk_immediate_flush(): + chunks = ["only"] + gw = _StubGateway(chunks) + engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000) + events = [] + async for ev in engine.execute_stream( + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="test", + ): + events.append(ev) + tokens = _collect_token_events(events) + assert tokens == ["only"] + + +# ---- Edge: chunks 含空 content(usage-only chunk)不进 buffer ---- + + +async def test_empty_content_chunk_not_buffered(): + chunks = ["a", "", "b"] # 中间 chunk 空 + gw = _StubGateway(chunks) + engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000) + events = [] + async for ev in engine.execute_stream( + messages=[{"role": "user", "content": "hi"}], + tools=[], + model="test", + ): + events.append(ev) + tokens = _collect_token_events(events) + # 空 chunk 跳过 buffer,最终 flush "ab" + assert tokens == ["ab"]