feat(U3): G8 delta_flush_interval 调速

- ReActEngine 新增 flush_interval_ms 构造参数(默认 0 = 逐 chunk yield 向后兼容)
- execute_stream chunk 循环用 time.monotonic 节流,累积 _flush_buffer 批量 yield
- flush_interval_ms=0 条件短路为 True 逐 chunk yield 保当前行为
- 流结束 mid-interval 最终 flush 剩余 buffer 不丢字符
- ServerConfig.streaming 配置项(flush_interval_ms)
- test_delta_flush.py 覆盖 R11/R12/R14
This commit is contained in:
chiguyong 2026-06-29 20:49:52 +08:00
parent c4aaef05aa
commit 0f3f0a7550
3 changed files with 181 additions and 5 deletions

View File

@ -164,6 +164,7 @@ class ReActEngine:
enable_tool_search: bool = True, enable_tool_search: bool = True,
middleware_chain: "MiddlewareChain | None" = None, middleware_chain: "MiddlewareChain | None" = None,
prompt_cache_enable: bool = True, prompt_cache_enable: bool = True,
flush_interval_ms: int = 0,
): ):
if max_steps < 1: if max_steps < 1:
raise ValueError(f"max_steps must be >= 1, got {max_steps}") raise ValueError(f"max_steps must be >= 1, got {max_steps}")
@ -180,6 +181,9 @@ class ReActEngine:
# U2/G2: prompt cache 双块结构开关(True 时 Anthropic 用 cache_control blocks, # U2/G2: prompt cache 双块结构开关(True 时 Anthropic 用 cache_control blocks,
# 其他 provider 走字符串拼接依赖自动前缀缓存) # 其他 provider 走字符串拼接依赖自动前缀缓存)
self._prompt_cache_enable = prompt_cache_enable self._prompt_cache_enable = prompt_cache_enable
# U3/G8: token chunk 节流间隔(ms)。0 = 逐 chunk yield(向后兼容)。
# 用 time.monotonic() 不受系统时钟跳变影响。
self._flush_interval_ms = flush_interval_ms
# Tiered tool description injection config # Tiered tool description injection config
self._core_tool_names: tuple[str, ...] | None = ( self._core_tool_names: tuple[str, ...] | None = (
tuple(core_tool_names) if core_tool_names is not None else None tuple(core_tool_names) if core_tool_names is not None else None
@ -1123,6 +1127,9 @@ class ReActEngine:
stream_usage = None stream_usage = None
stream_tool_calls: list[Any] = [] stream_tool_calls: list[Any] = []
stream_model = model stream_model = model
# U3/G8: delta_flush 节流 buffer,按 flush_interval_ms 批量 yield
_flush_buffer: list[str] = []
_last_flush_ts = time.monotonic()
async for chunk in _ensure_async_iterable( async for chunk in _ensure_async_iterable(
self._llm_gateway.chat_stream( self._llm_gateway.chat_stream(
@ -1136,11 +1143,20 @@ class ReActEngine:
): ):
if chunk.content: if chunk.content:
stream_content_chunks.append(chunk.content) stream_content_chunks.append(chunk.content)
yield ReActEvent( _flush_buffer.append(chunk.content)
event_type="token", now = time.monotonic()
step=step, # flush_interval_ms=0 → 逐 chunk yield(向后兼容,条件短路为 True)
data={"content": chunk.content}, if (
) self._flush_interval_ms == 0
or now - _last_flush_ts >= self._flush_interval_ms / 1000
):
yield ReActEvent(
event_type="token",
step=step,
data={"content": "".join(_flush_buffer)},
)
_flush_buffer = []
_last_flush_ts = now
if chunk.usage: if chunk.usage:
stream_usage = chunk.usage stream_usage = chunk.usage
if chunk.tool_calls: if chunk.tool_calls:
@ -1148,6 +1164,15 @@ class ReActEngine:
if chunk.model: if chunk.model:
stream_model = chunk.model stream_model = chunk.model
# U3/G8: 流结束 mid-interval → 最终 flush 剩余 buffer(不丢字符)
if _flush_buffer:
yield ReActEvent(
event_type="token",
step=step,
data={"content": "".join(_flush_buffer)},
)
_flush_buffer = []
# Build response-like object from stream # Build response-like object from stream
stream_content = "".join(stream_content_chunks) stream_content = "".join(stream_content_chunks)
response = self._build_response_from_stream( response = self._build_response_from_stream(

View File

@ -117,6 +117,7 @@ class ServerConfig:
expert_paths: list[str] | None = None, expert_paths: list[str] | None = None,
board: dict[str, Any] | None = None, board: dict[str, Any] | None = None,
prompt_cache: dict[str, Any] | None = None, prompt_cache: dict[str, Any] | None = None,
streaming: dict[str, Any] | None = None,
on_change: Callable[["ServerConfig"], None] | None = None, on_change: Callable[["ServerConfig"], None] | None = None,
): ):
self.host = host self.host = host
@ -146,6 +147,8 @@ class ServerConfig:
self.expert_paths = expert_paths or [] self.expert_paths = expert_paths or []
self.board = board or {} self.board = board or {}
self.prompt_cache = prompt_cache or {} self.prompt_cache = prompt_cache or {}
# U3/G8: streaming.flush_interval_ms 控制 token chunk 节流(默认 0 = 逐 chunk yield)
self.streaming = streaming or {}
self.on_change = on_change self.on_change = on_change
# Config watching state # Config watching state

View File

@ -0,0 +1,148 @@
"""U3 / G8 delta_flush_interval 调速测试。
覆盖 R11-R12, R14:
- R11 chunk flush_interval_ms 间隔批量 yield
- R12 配置化(flush_interval_ms=0 退化为逐 chunk yield)
- R14 自检:合并 content 等于原始 chunks 拼接(不丢字符)
"""
from __future__ import annotations
from agentkit.core.react import ReActEngine
from agentkit.llm.protocol import StreamChunk
class _StubGateway:
"""模拟 LLMGateway,yield 一串 StreamChunk 后结束。"""
def __init__(self, chunks: list[str]):
self._chunks = chunks
def get_provider_name_for_model(self, model: str) -> str | None:
return None
async def chat_stream(self, **kwargs):
for c in self._chunks:
yield StreamChunk(content=c, model="test")
def _collect_token_events(events) -> list[str]:
return [e.data["content"] for e in events if e.event_type == "token"]
# ---- R12 Config: flush_interval_ms=0 → 逐 chunk yield(向后兼容) ----
async def test_flush_interval_zero_yields_per_chunk():
chunks = ["H", "e", "l", "l", "o"]
gw = _StubGateway(chunks)
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=0)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
# 5 chunks → 5 token events(每个内容 = 单 chunk)
assert tokens == ["H", "e", "l", "l", "o"]
# ---- R11 Happy path: flush_interval_ms > 0 → 批量合并 ----
async def test_flush_interval_batches_chunks_by_interval():
chunks = ["a", "b", "c", "d", "e", "f"]
gw = _StubGateway(chunks)
# 间隔设很大(10s),所有 chunks 在第一个 interval 内累积,流结束后最终 flush
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
# 所有 chunk 累积到流结束,最终 flush 一次 → 1 个 token event,content = 全拼接
assert len(tokens) == 1
assert tokens[0] == "abcdef"
# ---- R14 Self-check: 合并 content 等于原始 chunks 拼接(不丢字符) ----
async def test_no_character_loss_after_merge():
chunks = ["Hello", " ", "World", "!", "你好", "世界"]
gw = _StubGateway(chunks)
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
# 合并所有 token events 的 content 等于原始 chunks 拼接
merged = "".join(tokens)
assert merged == "".join(chunks) == "Hello World!你好世界"
# ---- Edge: 流结束 mid-interval → 最终 flush 剩余 buffer ----
async def test_final_flush_on_stream_end():
chunks = ["x", "y", "z"]
gw = _StubGateway(chunks)
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
# mid-interval 累积 → 流结束最终 flush 一次
assert tokens == ["xyz"]
# ---- Edge: 单个 chunk 后流结束 → 立即 flush ----
async def test_single_chunk_immediate_flush():
chunks = ["only"]
gw = _StubGateway(chunks)
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
assert tokens == ["only"]
# ---- Edge: chunks 含空 content(usage-only chunk)不进 buffer ----
async def test_empty_content_chunk_not_buffered():
chunks = ["a", "", "b"] # 中间 chunk 空
gw = _StubGateway(chunks)
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
# 空 chunk 跳过 buffer,最终 flush "ab"
assert tokens == ["ab"]