feat(U3): G8 delta_flush_interval 调速
- ReActEngine 新增 flush_interval_ms 构造参数(默认 0 = 逐 chunk yield 向后兼容) - execute_stream chunk 循环用 time.monotonic 节流,累积 _flush_buffer 批量 yield - flush_interval_ms=0 条件短路为 True 逐 chunk yield 保当前行为 - 流结束 mid-interval 最终 flush 剩余 buffer 不丢字符 - ServerConfig.streaming 配置项(flush_interval_ms) - test_delta_flush.py 覆盖 R11/R12/R14
This commit is contained in:
parent
c4aaef05aa
commit
0f3f0a7550
|
|
@ -164,6 +164,7 @@ class ReActEngine:
|
|||
enable_tool_search: bool = True,
|
||||
middleware_chain: "MiddlewareChain | None" = None,
|
||||
prompt_cache_enable: bool = True,
|
||||
flush_interval_ms: int = 0,
|
||||
):
|
||||
if max_steps < 1:
|
||||
raise ValueError(f"max_steps must be >= 1, got {max_steps}")
|
||||
|
|
@ -180,6 +181,9 @@ class ReActEngine:
|
|||
# U2/G2: prompt cache 双块结构开关(True 时 Anthropic 用 cache_control blocks,
|
||||
# 其他 provider 走字符串拼接依赖自动前缀缓存)
|
||||
self._prompt_cache_enable = prompt_cache_enable
|
||||
# U3/G8: token chunk 节流间隔(ms)。0 = 逐 chunk yield(向后兼容)。
|
||||
# 用 time.monotonic() 不受系统时钟跳变影响。
|
||||
self._flush_interval_ms = flush_interval_ms
|
||||
# Tiered tool description injection config
|
||||
self._core_tool_names: tuple[str, ...] | None = (
|
||||
tuple(core_tool_names) if core_tool_names is not None else None
|
||||
|
|
@ -1123,6 +1127,9 @@ class ReActEngine:
|
|||
stream_usage = None
|
||||
stream_tool_calls: list[Any] = []
|
||||
stream_model = model
|
||||
# U3/G8: delta_flush 节流 buffer,按 flush_interval_ms 批量 yield
|
||||
_flush_buffer: list[str] = []
|
||||
_last_flush_ts = time.monotonic()
|
||||
|
||||
async for chunk in _ensure_async_iterable(
|
||||
self._llm_gateway.chat_stream(
|
||||
|
|
@ -1136,11 +1143,20 @@ class ReActEngine:
|
|||
):
|
||||
if chunk.content:
|
||||
stream_content_chunks.append(chunk.content)
|
||||
_flush_buffer.append(chunk.content)
|
||||
now = time.monotonic()
|
||||
# flush_interval_ms=0 → 逐 chunk yield(向后兼容,条件短路为 True)
|
||||
if (
|
||||
self._flush_interval_ms == 0
|
||||
or now - _last_flush_ts >= self._flush_interval_ms / 1000
|
||||
):
|
||||
yield ReActEvent(
|
||||
event_type="token",
|
||||
step=step,
|
||||
data={"content": chunk.content},
|
||||
data={"content": "".join(_flush_buffer)},
|
||||
)
|
||||
_flush_buffer = []
|
||||
_last_flush_ts = now
|
||||
if chunk.usage:
|
||||
stream_usage = chunk.usage
|
||||
if chunk.tool_calls:
|
||||
|
|
@ -1148,6 +1164,15 @@ class ReActEngine:
|
|||
if chunk.model:
|
||||
stream_model = chunk.model
|
||||
|
||||
# U3/G8: 流结束 mid-interval → 最终 flush 剩余 buffer(不丢字符)
|
||||
if _flush_buffer:
|
||||
yield ReActEvent(
|
||||
event_type="token",
|
||||
step=step,
|
||||
data={"content": "".join(_flush_buffer)},
|
||||
)
|
||||
_flush_buffer = []
|
||||
|
||||
# Build response-like object from stream
|
||||
stream_content = "".join(stream_content_chunks)
|
||||
response = self._build_response_from_stream(
|
||||
|
|
|
|||
|
|
@ -117,6 +117,7 @@ class ServerConfig:
|
|||
expert_paths: list[str] | None = None,
|
||||
board: dict[str, Any] | None = None,
|
||||
prompt_cache: dict[str, Any] | None = None,
|
||||
streaming: dict[str, Any] | None = None,
|
||||
on_change: Callable[["ServerConfig"], None] | None = None,
|
||||
):
|
||||
self.host = host
|
||||
|
|
@ -146,6 +147,8 @@ class ServerConfig:
|
|||
self.expert_paths = expert_paths or []
|
||||
self.board = board or {}
|
||||
self.prompt_cache = prompt_cache or {}
|
||||
# U3/G8: streaming.flush_interval_ms 控制 token chunk 节流(默认 0 = 逐 chunk yield)
|
||||
self.streaming = streaming or {}
|
||||
self.on_change = on_change
|
||||
|
||||
# Config watching state
|
||||
|
|
|
|||
|
|
@ -0,0 +1,148 @@
|
|||
"""U3 / G8 delta_flush_interval 调速测试。
|
||||
|
||||
覆盖 R11-R12, R14:
|
||||
- R11 chunk 按 flush_interval_ms 间隔批量 yield
|
||||
- R12 配置化(flush_interval_ms=0 退化为逐 chunk yield)
|
||||
- R14 自检:合并 content 等于原始 chunks 拼接(不丢字符)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
|
||||
from agentkit.core.react import ReActEngine
|
||||
from agentkit.llm.protocol import StreamChunk
|
||||
|
||||
|
||||
class _StubGateway:
|
||||
"""模拟 LLMGateway,yield 一串 StreamChunk 后结束。"""
|
||||
|
||||
def __init__(self, chunks: list[str]):
|
||||
self._chunks = chunks
|
||||
|
||||
def get_provider_name_for_model(self, model: str) -> str | None:
|
||||
return None
|
||||
|
||||
async def chat_stream(self, **kwargs):
|
||||
for c in self._chunks:
|
||||
yield StreamChunk(content=c, model="test")
|
||||
|
||||
|
||||
def _collect_token_events(events) -> list[str]:
|
||||
return [e.data["content"] for e in events if e.event_type == "token"]
|
||||
|
||||
|
||||
# ---- R12 Config: flush_interval_ms=0 → 逐 chunk yield(向后兼容) ----
|
||||
|
||||
|
||||
async def test_flush_interval_zero_yields_per_chunk():
|
||||
chunks = ["H", "e", "l", "l", "o"]
|
||||
gw = _StubGateway(chunks)
|
||||
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=0)
|
||||
events = []
|
||||
async for ev in engine.execute_stream(
|
||||
messages=[{"role": "user", "content": "hi"}],
|
||||
tools=[],
|
||||
model="test",
|
||||
):
|
||||
events.append(ev)
|
||||
tokens = _collect_token_events(events)
|
||||
# 5 chunks → 5 token events(每个内容 = 单 chunk)
|
||||
assert tokens == ["H", "e", "l", "l", "o"]
|
||||
|
||||
|
||||
# ---- R11 Happy path: flush_interval_ms > 0 → 批量合并 ----
|
||||
|
||||
|
||||
async def test_flush_interval_batches_chunks_by_interval():
|
||||
chunks = ["a", "b", "c", "d", "e", "f"]
|
||||
gw = _StubGateway(chunks)
|
||||
# 间隔设很大(10s),所有 chunks 在第一个 interval 内累积,流结束后最终 flush
|
||||
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
|
||||
events = []
|
||||
async for ev in engine.execute_stream(
|
||||
messages=[{"role": "user", "content": "hi"}],
|
||||
tools=[],
|
||||
model="test",
|
||||
):
|
||||
events.append(ev)
|
||||
tokens = _collect_token_events(events)
|
||||
# 所有 chunk 累积到流结束,最终 flush 一次 → 1 个 token event,content = 全拼接
|
||||
assert len(tokens) == 1
|
||||
assert tokens[0] == "abcdef"
|
||||
|
||||
|
||||
# ---- R14 Self-check: 合并 content 等于原始 chunks 拼接(不丢字符) ----
|
||||
|
||||
|
||||
async def test_no_character_loss_after_merge():
|
||||
chunks = ["Hello", " ", "World", "!", "你好", "世界"]
|
||||
gw = _StubGateway(chunks)
|
||||
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
|
||||
events = []
|
||||
async for ev in engine.execute_stream(
|
||||
messages=[{"role": "user", "content": "hi"}],
|
||||
tools=[],
|
||||
model="test",
|
||||
):
|
||||
events.append(ev)
|
||||
tokens = _collect_token_events(events)
|
||||
# 合并所有 token events 的 content 等于原始 chunks 拼接
|
||||
merged = "".join(tokens)
|
||||
assert merged == "".join(chunks) == "Hello World!你好世界"
|
||||
|
||||
|
||||
# ---- Edge: 流结束 mid-interval → 最终 flush 剩余 buffer ----
|
||||
|
||||
|
||||
async def test_final_flush_on_stream_end():
|
||||
chunks = ["x", "y", "z"]
|
||||
gw = _StubGateway(chunks)
|
||||
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
|
||||
events = []
|
||||
async for ev in engine.execute_stream(
|
||||
messages=[{"role": "user", "content": "hi"}],
|
||||
tools=[],
|
||||
model="test",
|
||||
):
|
||||
events.append(ev)
|
||||
tokens = _collect_token_events(events)
|
||||
# mid-interval 累积 → 流结束最终 flush 一次
|
||||
assert tokens == ["xyz"]
|
||||
|
||||
|
||||
# ---- Edge: 单个 chunk 后流结束 → 立即 flush ----
|
||||
|
||||
|
||||
async def test_single_chunk_immediate_flush():
|
||||
chunks = ["only"]
|
||||
gw = _StubGateway(chunks)
|
||||
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
|
||||
events = []
|
||||
async for ev in engine.execute_stream(
|
||||
messages=[{"role": "user", "content": "hi"}],
|
||||
tools=[],
|
||||
model="test",
|
||||
):
|
||||
events.append(ev)
|
||||
tokens = _collect_token_events(events)
|
||||
assert tokens == ["only"]
|
||||
|
||||
|
||||
# ---- Edge: chunks 含空 content(usage-only chunk)不进 buffer ----
|
||||
|
||||
|
||||
async def test_empty_content_chunk_not_buffered():
|
||||
chunks = ["a", "", "b"] # 中间 chunk 空
|
||||
gw = _StubGateway(chunks)
|
||||
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
|
||||
events = []
|
||||
async for ev in engine.execute_stream(
|
||||
messages=[{"role": "user", "content": "hi"}],
|
||||
tools=[],
|
||||
model="test",
|
||||
):
|
||||
events.append(ev)
|
||||
tokens = _collect_token_events(events)
|
||||
# 空 chunk 跳过 buffer,最终 flush "ab"
|
||||
assert tokens == ["ab"]
|
||||
Loading…
Reference in New Issue