feat(agent): Wave 1 quick wins (G1/G2/G3/G8) + review fixes #4

Merged
fischer merged 6 commits from feat/agent-wave1-quick-wins into main 2026-06-29 22:08:56 +08:00
23 changed files with 2993 additions and 111 deletions

View File

@ -33,6 +33,12 @@ A phase dynamically inserted into a team plan when divergence is detected betwee
### Resume
The act of rebuilding a crashed pipeline's runtime state from persisted checkpoints. Restores completed and failed phase statuses, rebuilds runtime counters, and re-persists any dynamically inserted phases so the restored plan matches what was executing at crash time.
### Verify Re-injection
The feedback loop triggered when a verification check fails after a final answer is produced. Errors are injected as a new user message into the conversation and the ReAct loop continues for one retry; a second verification failure interrupts execution and returns the error to the user with the verify log. Bounded to one retry to balance auto-correction against token cost, rather than looping until `max_steps`.
### Three-tier Degradation Chain
The agent-level fallback sequence when the primary agent fails: main agent → Recovery tier (reuses `ReflexionEngine` for Evaluate→Reflect→Retry) → Emergency tier (rule-based fallback returning a structured error with suggestions). Each tier is independently configurable; the Recovery tier avoids new infrastructure by reusing the existing reflection engine, and the Emergency tier replaces the previous static-text fallback with actionable error structure.
## Channels & Caching
### Per-User Cache Namespace

View File

@ -0,0 +1,186 @@
---
date: 2026-06-29
topic: advanced-agent-gap-optimization
type: feature
origin: "2026-06-24-004-feat-long-horizon-reliability-optimization-plan.md(增补);Qoder/Codex/Hermes/Trae Work 架构对比(2026-06-29)"
---
## Summary
为 2026-06-24-004 长程可靠性 plan 补齐 9 个真新缺口——对比 Qoder/Codex/Hermes/Trae Work 后未被覆盖的执行/反馈/效率短板。分 3 波按 ROI/风险交付:Wave 1 自包含快速补强(W1-1~W1-4),Wave 2 中等耦合(W2-1~W2-3),Wave 3 战略级重构(W3-1~W3-2)。每波独立 plan、独立验证、独立发布。
---
## Problem Frame
2026-06-24-004 plan 已落地循环检测(U1)、Headroom 压缩(U3)、SharedWorkspace Redis 化(U4)、中间件管道(U6)、阶段级 checkpoint+resume(U7)等长程可靠性护栏。但对照 Qoder(Spec→Coding→Verify 闭环)、Codex(apply_patch+OS 沙箱)、Hermes(三级降级链+4层记忆)、Trae Work(SOLO 四阶段状态机+Trajectory 持久化)的工程实践,agentkit 仍存在 9 个**已在生产/测试中观察到痛点**的短板,横跨三个维度:
- **反馈稳定性**:verify 失败不回灌 ReAct 直接退出、工具调用无 schema 校验、主 agent 失败后仅静态 fallback、子任务失败不回滚已写文件
- **响应效率**:prompt cache 命中率低(记忆注入破坏前缀)、摘要/压缩用主模型、文件读取无函数级分片、token chunk 无节流
- **执行能力**:ReAct 循环无阶段约束(think 阶段即可 write_file)
验证器已核对 10 项仓库声明,9 项 confirmed,2 处修正(C5:agentkit 无独立 `read_file` 工具,文件读取走 `tools/shell.py``cat`;C6:`PlanPhase` 定义在 `experts/plan.py:148``team.py`)。
---
## Key Decisions
**KTD1:G1 verify 回灌一次后中断**
verify 失败时自动把 errors 注入 conversation 继续 ReAct,但仅重试一次;二次失败则中断返回错误给用户(附 verify log)。平衡自动纠正与 token 成本,而非全自动循环到 max_steps 烧 token。
**KTD2:3 波按 ROI/风险分次**
Wave 1(G1/G2/G3/G8)自包含、低风险、可独立验证;Wave 2(G4/G7/G9)中等耦合,需触碰 LLM 层与编排层;Wave 3(G5/G6)战略级,引入新依赖或触 ReAct 核心。优先交付自包含补强,而非按维度或 LLM 层依赖分波。
**KTD3:G2 三层 prompt 结构跨 provider,记忆注入移到 volatile**
system prompt 重构为 stable(技能配置/系统指令)/ context(会话上下文)/ volatile(记忆检索+时间戳)三层。`memory_retriever` 当前在 `react.py:1042-1059` 把"## 参考信息"拼到 system prompt 末尾,每次随 query 变化破坏 cache 前缀——移到 volatile 层,stable 层保持不变以命中 prompt cache。cache 策略跨 provider 统一:Anthropic 用原生 `cache_control` 断点(`system_and_3`),OpenAI 等依赖自动前缀缓存(stable 层前置即命中),无需为每个 provider 单独适配。
**KTD4:G7 复用 ReflexionEngine 作为 Recovery 层**
主 agent 失败后触发 Recovery 层,复用现有 `ReflexionEngine`(`core/reflexion.py:58`,Evaluate→Reflect→Retry),而非新建 Recovery Agent。Recovery 仍失败则进入 Emergency 层(规则化 fallback,返回结构化错误+建议)。避免新基础设施,最大化复用现有反思机制。
**KTD5:G9 阶段级 rollback,git checkout 机制**
`PlanPhase` 增加 `validation_command`/`rollback_command` 可选字段,阶段失败时执行 rollback(默认 `git checkout`),与 U7 checkpoint 协同——checkpoint save 必须在 rollback validation 通过后。不做步骤级 rollback(粒度过细,实现复杂度高)。
**KTD6:G5 函数级分片引入 tree-sitter(Wave 3 决策可延后)**
Wave 3 的 G5 需要按 symbol/函数粒度分片读取文件,引入 tree-sitter 作为新依赖。具体集成方式(tree-sitter vs ANTLR4 vs 复用 `quality` 模块)延后到 Wave 3 自己的 plan 决策。
**KTD7:G6 扩展 PLAN_EXEC 而非新建模式**
G6 SOLO 四阶段状态机约束通过扩展现有 `ExecutionMode.PLAN_EXEC` 实现,而非新建独立模式。阶段约束(Planning 阶段只允许 think/search,Building 才允许 write_file)作为 PLAN_EXEC 的配置项。
---
## Requirements
### Wave 1 — 自包含快速补强(P0,低风险)
**G1 Verify 失败回灌 ReAct**
- R1. verify 失败时,系统自动把 errors 作为新 user 消息注入 conversation,继续 ReAct 循环,而非直接抛 `TaskResult.error_message` 退出。
- R2. 回灌后若二次 verify 仍失败,系统中断执行并返回错误给用户,附 verify log(测试输出/schema 错误明细)。
- R3. 回灌最大重试次数可配置(默认 1 次),受 `max_steps` 上限约束。
**G2 prompt cache 断点策略**
- R4. system prompt 重构为三层:stable(技能配置/系统指令)、context(会话上下文)、volatile(记忆检索+时间戳)。
- R5. 记忆检索注入从 system prompt 末尾移到 volatile 层,stable 层保持不变以命中 prompt cache。
- R6. 跨 provider 统一 cache 策略:Anthropic 显式插入 `cache_control` 断点(`system_and_3`,最多 4 个);OpenAI 等无原生断点的 provider 依赖自动前缀缓存,通过 stable 层前置保证命中。
- R7. 多轮对话输入 token 成本降低(目标降低 ~50% 输入 token,跨 provider 均受益)。
**G3 工具调用 schema 校验**
- R8. `_execute_tool` 调用工具前,基于 `tool.input_schema` 校验 LLM 传入参数(类型/必填)。
- R9. 校验失败时返回类型化错误码(`tool_call_invalid`/`schema_mismatch`),不执行工具。
- R10. 错误以 tool 角色消息回灌 conversation,给 LLM 自我修正机会。
**G8 delta_flush_interval 调速**
- R11. `execute_stream` 的 token chunk yield 加可配置节流(默认 `flush_interval_ms`,如 50ms)。
- R12. 节流配置化(`agentkit.yaml` 或 runtime 配置),允许客户端调高降低渲染开销。
### Wave 2 — 中等耦合(P1)
**G4 辅助 LLM 分流**
- R13. `ContextCompressor` 摘要任务路由到 auxiliary model(便宜模型,如 Gemini Flash/Doubao lite),而非主 model。
- R14. `auxiliary_model` 配置化,与主 `model` 分离。
- R15. 摘要质量不降级:auxiliary model 失败时回退主 model。
**G7 三级降级链**
- R16. 主 agent 失败后触发 Recovery 层(复用 `ReflexionEngine` 做 Evaluate→Reflect→Retry)。
- R17. Recovery 仍失败触发 Emergency 层(规则化 fallback,返回结构化错误+建议)。
- R18. 降级链可配置(每层最大重试次数、是否启用 Recovery/Emergency)。
**G9 原子化子任务 + rollback 绑定**
- R19. `PlanPhase`(`experts/plan.py:148`)增加 `validation_command``rollback_command` 可选字段。
- R20. 阶段失败时自动执行 `rollback_command`(默认 `git checkout`),与 U7 checkpoint 协同。
- R21. checkpoint save 必须在 rollback validation 通过后(避免持久化失败状态)。
### Wave 3 — 战略级重构(P2,高风险)
**G5 函数级代码分片**
- R22. 文件读取支持按 symbol/函数粒度分片(需引入 tree-sitter 或类似,具体方式延后到 Wave 3 plan)。
- R23. 分片能力作为工具参数(如 `symbol="function_name"`),向后兼容整文件读取。
**G6 SOLO 四阶段状态机约束**
- R24. ReAct 循环加阶段约束:Planning 阶段只允许 think/search,Building 阶段才允许 `write_file`
- R25. 阶段状态可配置(扩展 `ExecutionMode.PLAN_EXEC`,非新建独立模式)。
### Cross-cutting
- R26. 所有优化项配置化(`agentkit.yaml` 新增对应配置节,遵循 `ServerConfig.from_dict` 模式)。
- R27. 每个优化项附最小自检测试(ponytail 规则,参考 `test_pipeline_state.py``TestPipelineStateRedis` 模式)。
---
## Acceptance Examples
- **AE1**(Covers R1, R2, R3):用户发起 ReAct 任务,verify 失败(测试不通过)→ 系统自动把 errors 注入 conversation 继续 ReAct → LLM 修正后二次 verify 通过 → 任务完成。若二次 verify 仍失败 → 中断返回错误,附 verify log。
- **AE2**(Covers R4, R5, R6, R7):用户发起 50 轮长对话 → stable 层(技能配置)保持不变,volatile 层(记忆检索)随 query 变化 → Anthropic prompt cache 命中 stable 前缀 → 输入 token 成本降低 ~50%。
- **AE3**(Covers R8, R9, R10):LLM 调用工具时传错参数类型(如 `count: "abc"` 应为 int)→ schema 校验失败 → 返回 `tool_call_invalid` 错误码 → 错误回灌 conversation → LLM 修正参数类型后重试成功。
- **AE4**(Covers R16, R17, R18):主 agent 连续 3 次失败 → 触发 Recovery 层(ReflexionEngine 反思重试)→ Recovery 仍失败 → 触发 Emergency 层(规则化 fallback,返回结构化错误+建议)→ 用户收到清晰错误而非静态文案。
- **AE5**(Covers R19, R20, R21):`@team` 任务阶段 3 失败 → 自动执行 `rollback_command`(git checkout 阶段 3 写入的文件)→ rollback validation 通过后才 save checkpoint → 阶段 4 从干净状态继续。
- **AE6**(Covers R11, R12):弱网客户端连接 `execute_stream` → token chunk 按 50ms 节流批量 yield → 前端渲染开销降低,无卡顿。
---
## Scope Boundaries
### Deferred for later
- Wave 3(G5/G6)的具体实现设计延后到 Wave 3 自己的 plan,本文档只锁定战略方向(KTD6/KTD7)。
- G5 的具体集成方式(tree-sitter vs ANTLR4 vs 复用 `quality` 模块)延后到 Wave 3 plan 决策。
- G7 Emergency 层的具体规则模板(返回什么错误结构、建议什么动作)延后到 Wave 2 plan。
- 全局 LLM 并发限制(`LLMGateway` 内 semaphore)——本期只做工具层/编排层,LLM 层并发延后。
### Outside this product's identity
- 重写现有编排逻辑(拓扑排序/Board 辩论/4 层记忆保持不变)——继承自 2026-06-24-004 plan。
- 节点级 checkpoint(ReAct 循环单步)——继承自 2026-06-24-004 KTD3,阶段级已满足核心需求。
- DeerFlow 式磁盘文件系统——继承自 2026-06-24-004 KTD4,复用 Redis。
- 全盘迁移 LangGraph——继承自 2026-06-24-004,自研架构保持灵活。
- Docker 沙箱默认引入——仅文档化命令级安全边界,作为可选插件未来考虑。
---
## Dependencies / Assumptions
- **依赖**:Wave 2 的 G9 rollback 依赖现有 U7 checkpoint 已落地(`orchestrator/checkpoint.py` + `experts/orchestrator.py:265-268,335`)。
- **依赖**:Wave 1 的 G2 prompt cache 断点依赖 Anthropic provider 已支持 `cache_control`(LiteLLM 适配层 U15)。
- **假设**:G4 auxiliary model 可用——`agentkit.yaml` 的 llm 段可配置多个 model,auxiliary 复用现有 provider 注册机制。若实际不可用,回退主 model(R15)。
- **假设**:G6 阶段约束不影响现有 DIRECT_CHAT/REACT 模式——仅作用于 PLAN_EXEC 模式,向后兼容。
---
## Sources & Research
- **Qoder 架构研究**(2026-06-29):Spec→Coding→Verify 闭环、SSE 事件成对约束、`delta_flush_interval_ms`、prompt cache、自动模型路由
- **Codex CLI 架构研究**(2026-06-29):apply_patch 协议、Approval Policy 三层决策、OS 级沙箱、prompt caching 前缀匹配、`/responses/compact` 压缩
- **Hermes Agent 架构研究**(2026-06-29):Pydantic+JSON Schema 双校验、`validate_function_call_schema`、三级降级链(主→Recovery→Emergency)、`cache_control: system_and_3` 断点(4 个,~75% token 节省)、auxiliary_client 分流
- **Trae Work 架构研究**(2026-06-29):SOLO 四阶段状态机(Planning→Building→Verification→Delivery)、TrajectoryRecorder JSON 持久化、断点续跑、原子化子任务+rollback、函数级分片(ANTLR4+LLVM)
- **agentkit 现有架构**(2026-06-29 验证器核对):`verification_loop.py:111-145`、`react.py:1042-1059,1118-1134,1897-1916`、`compressor.py:45,123-154`、`reflexion.py:58-702`、`fallback.py:1-19`、`gateway.py:405-407`、`experts/plan.py:148`、`orchestrator/checkpoint.py`、`experts/orchestrator.py:265-268,335`
- **2026-06-24-004 plan**(增补来源):U1-U7 长程可靠性护栏,KTD1-KTD5 决策,U7 阶段级 checkpoint+resume 已落地
---
## Outstanding Questions
### Resolve Before Planning
(已全部解决——OQ1 确认同 plan 内顺序执行 G2→G8;OQ2 确认跨 provider 统一 cache 策略。)
### Deferred to Planning
- G5 的具体集成方式(tree-sitter vs ANTLR4 vs 复用 `quality` 模块)。
- G7 Emergency 层的具体规则模板与错误结构。
- G6 阶段约束的精确边界(Planning 阶段允许哪些工具、Building 阶段允许哪些)。
- G9 rollback_command 的默认值(git checkout 整文件 vs patch 级)。

View File

@ -0,0 +1,275 @@
---
title: "feat: Agent Wave 1 快速补强 (verify 回灌/prompt cache/schema 校验/delta_flush)"
type: feat
date: 2026-06-29
origin: docs/brainstorms/2026-06-29-advanced-agent-gap-optimization-requirements.md
---
## Summary
落地 brainstorm Wave 1 的 4 项自包含快速补强:G1 verify 失败回灌 ReAct、G2 prompt cache 三层结构跨 provider、G3 工具调用 schema 校验、G8 delta_flush_interval 调速。四项均作用于 ReAct 引擎层,同 plan 内 G2→G8 顺序执行(共享 `execute_stream` 改动),每项附最小自检测试。
---
## Problem Frame
agentkit 对比 Qoder/Codex/Hermes/Trae 后发现 9 个真新缺口(已在生产/测试观察到痛点),brainstorm 分 3 波交付。本 plan 实现 Wave 1——自包含、低风险的 4 项快速补强,覆盖反馈稳定性(verify 不回灌、工具无 schema 校验)与响应效率(prompt cache 命中率低、token chunk 无节流)。
验证器已核对仓库现状(见 origin Sources & Research):`verification_loop.py:111-145` verify 失败仅调 `fix_callback` 不回灌 ReAct;`react.py:1042-1059` 记忆注入拼到 system prompt 末尾破坏 cache 前缀;`react.py:1118-1134` 每个 token chunk 都 yield 无节流;`tools/base.py:50-77` `safe_execute` 无 schema 校验直接 `await execute()`
---
## Requirements
### G1 Verify 失败回灌 ReAct (origin R1-R3)
- R1. verify 失败时,系统自动把 errors 作为新 user 消息注入 conversation,继续 ReAct 循环,而非直接退出。
- R2. 回灌后若二次 verify 仍失败,系统中断执行并返回错误给用户,附 verify log。
- R3. 回灌最大重试次数可配置(默认 1 次),受 `max_steps` 上限约束。
### G2 Prompt Cache 三层结构 (origin R4-R7)
- R4. system prompt 重构为双块结构:stable(技能配置/系统指令)+volatile(记忆检索+时间戳)。原 context(会话上下文)层由 conversation messages 承载,不进 system prompt。
- R5. 记忆检索注入从 system prompt 末尾移到 volatile 层,stable 层保持不变。
- R6. 跨 provider 统一 cache 策略:Anthropic 显式插入 `cache_control` 断点;OpenAI 等依赖自动前缀缓存。
- R7. 多轮对话输入 token 成本降低(目标 ~50%)。
### G3 工具调用 Schema 校验 (origin R8-R10)
- R8. `safe_execute` 调用 `execute()` 前,基于 `tool.input_schema` 校验参数(类型/必填)。
- R9. 校验失败时返回类型化错误码(`tool_call_invalid`/`schema_mismatch`),不执行工具。
- R10. 错误以 tool 角色消息回灌 conversation,给 LLM 自我修正机会。
### G8 delta_flush_interval 调速 (origin R11-R12)
- R11. `execute_stream` 的 token chunk yield 加可配置节流(默认 `flush_interval_ms`)。
- R12. 节流配置化,允许客户端调高降低渲染开销。
### Cross-cutting (origin R26-R27)
- R13. 所有优化项配置化(`agentkit.yaml` 新增对应配置节)。
- R14. 每个优化项附最小自检测试(ponytail 规则)。
---
## Key Technical Decisions
**KTD1:G3 schema 校验放在 `Tool.safe_execute`(base.py),而非 `_execute_tool`(react.py)**
校验在工具基类层,所有调用方(ReActEngine、ExpertTeam、StandaloneAgent)统一受益。以 `tool.input_schema`(JSON Schema dict)为契约源,`input_schema=None` 时跳过校验保持向后兼容。用 `jsonschema` 库(已是 Python 生态标准,无新依赖)。
**KTD2:G2 双块结构用 content blocks + cache_control 标记,Anthropic provider 需改 _convert_messages**
system message 的 `content` 从字符串改为 content blocks 列表(`[{"type":"text","text":stable,"cache_control":{"type":"ephemeral"}},{"type":"text","text":volatile}]`)。**注意**:`src/agentkit/llm/providers/anthropic.py` 是 httpx 直连实现(非 LiteLLM),其 `_convert_messages`(`:102-197`)假设 system content 为字符串(`:116`),需修改以支持 list-type system content 并透传 `cache_control` blocks。OpenAI 等 provider 的 chat completions API 不支持 list-type system content,`_build_system_message` 需按 provider 能力检测:Anthropic 返回 blocks,其余返回字符串拼接(stable+volatile),依赖 stable 前缀命中自动前缀缓存。不改 gateway 方法签名。
**KTD3:G1 verify 回灌包进 ReAct 主循环,而非外层 wrapper**
verify 当前在循环外 final answer 后运行(`react.py:887` execute / `:1603` execute_stream)。回灌改为:检测到 final answer(无 tool_calls)→ 运行 verify → 失败则把 errors 作为 user 消息 append 到 conversation → `continue` 主循环(LLM 自纠正)→ 二次 final answer 再 verify → 仍失败则 break 带 verify log。保留现有 `VerificationLoop` 类与 `verify_and_retry` 方法不动(向后兼容),回灌逻辑在 ReActEngine 内。
**KTD4:G8 delta_flush 用 time.monotonic 节流,非计数器**
`execute_stream` chunk 循环内累积 chunks,按 `flush_interval_ms` 间隔批量 yield。`flush_interval_ms=0` 时退化为逐 chunk yield(向后兼容)。流结束 mid-interval 时最终 flush。用 `time.monotonic()`(不受系统时钟跳变影响)。
**KTD5:G2→G8 同 plan 内顺序执行,共享 execute_stream 改动**
G2 改 system prompt 构造(循环前),G8 改 chunk yield 逻辑(循环内),两者不冲突但都触 `execute_stream`。G2 先落地确保 stable 前缀结构稳定,G8 再加节流避免在未稳定结构上叠加。
**KTD6:ServerConfig 到 ReActEngine 的接线用独立构造参数(带默认值)**
`ReActEngine.__init__`(`react.py:154-198`)不接受 `ServerConfig` 对象,采用独立构造参数:`prompt_cache_enable: bool = True`、`flush_interval_ms: int = 0`、`max_reinjections: int = 1`(默认值保当前行为,向后兼容)。`ServerConfig` 在 agent 工厂/handler 层(`chat/handler.py` 等 ReActEngine 构造点)读取并传入。实现时需列出所有构造 ReActEngine 的调用点并更新。
---
## Implementation Units
### U1. G3 工具调用 Schema 校验
**Goal:** 在 `Tool.safe_execute` 调用 `execute()` 前校验参数,失败返回类型化错误。
**Requirements:** R8, R9, R10, R14
**Dependencies:** 无(独立,奠基性——定义错误回灌模式)
**Files:**
- 修改: `src/agentkit/tools/base.py`(`safe_execute` 加校验 + `ToolValidationError`)
- 修改: `src/agentkit/core/react.py`(`_execute_tool` `:1897-1916` 捕获 `ToolValidationError` 并 append tool 角色消息)
- 测试: `tests/unit/test_tool_schema_validation.py`
**Approach:**
- 新增 `ToolValidationError(Exception)`,带 `error_code`(`tool_call_invalid`/`schema_mismatch`)与 `details`
- `safe_execute``before_execute` 后、`execute` 前:若 `self.input_schema` 非 None,用 `jsonschema.validate(kwargs, self.input_schema)`;校验失败抛 `ToolValidationError`
- `input_schema=None` → 跳过(向后兼容,旧工具无 schema)。
- `_execute_tool`(react.py:1897-1916)在现有 `except Exception` **之前**加 `except ToolValidationError as e:` 优先捕获,返回 `{"error": str(e), "error_code": e.error_code, "details": e.details}`(保留类型化错误码,不被通用 except 平坦化为字符串)。现有调用方 `_build_tool_result_message` 把返回 dict 转为 tool 角色消息 append 到 conversation,给 LLM 自纠正机会。
**Patterns to follow:** `VerificationResult` 的类型化错误模式(`verification_loop.py:18-24`);`jsonschema` 标准用法。
**Test scenarios:**
- Covers R8. Happy path: tool 有 `input_schema={"type":"object","properties":{"count":{"type":"integer"}},"required":["count"]}`,传 `count=5` → 校验通过,execute 正常执行。
- Covers R9. Edge: `input_schema=None` → 跳过校验,execute 正常(向后兼容)。
- Covers R9. Error: 传 `count="abc"`(类型错)→ 抛 `ToolValidationError(error_code="tool_call_invalid")`,execute 不调用。
- Covers R9. Error: 缺 `count`(必填)→ 抛 `ToolValidationError(error_code="schema_mismatch")`
- Covers R10. Integration: `_execute_tool` 捕获 `ToolValidationError` → conversation append tool 角色消息 → LLM 下一轮看到错误并修正参数 → 重试成功。
**Verification:** `python3 -m pytest tests/unit/test_tool_schema_validation.py -x -q` 通过;现有工具测试不回归。
---
### U2. G2 Prompt Cache 双块结构
**Goal:** system prompt 重构为 stable/volatile 双块结构,记忆注入移到 volatile,加 cache_control 断点。
**Requirements:** R4, R5, R6, R7, R13, R14
**Dependencies:** 无(独立,与 U1 不触同代码区)
**Files:**
- 修改: `src/agentkit/core/react.py`(execute_stream `:1042-1059` 记忆注入 + system message 构造;execute 同路径若有,可选 — 见 Scope Boundaries)
- 修改: `src/agentkit/llm/providers/anthropic.py`(`_convert_messages` `:102-197` 支持 list-type system content,透传 cache_control blocks)
- 配置: `src/agentkit/config.py``ServerConfig`(`prompt_cache.enable: bool`)
- 测试: `tests/unit/test_prompt_cache_layers.py`
**Approach:**
- 新增 `_build_system_message(base_prompt, memory_context, enable_cache, provider)` 工具方法:Anthropic provider 返回 content blocks 列表,stable 块在前(带 `cache_control: {"type":"ephemeral"}`),volatile 块(记忆+时间戳)在后;非 Anthropic provider 返回字符串拼接(stable+volatile),依赖 stable 前缀命中自动前缀缓存。
- execute_stream `:1042-1059`:记忆注入从 `system_prompt += "## 参考信息"` 改为收集 `memory_context`,传给 `_build_system_message`
- conversation 的 system 消息 content:Anthropic 用 blocks 列表,其余用字符串;gateway `chat_stream``**kwargs` 透传。
- `anthropic.py``_convert_messages`(`:102-197`)需修改:`:116` 从 `system_prompt = content` 改为支持 list-type content 直接透传(payload `["system"]` 接受字符串或 content blocks)。
- 配置 `prompt_cache.enable: bool`(默认 True)。断点数硬编码为 1(stable 层),不暴露配置(YAGNI — 2 块结构下 >1 无语义)。
- `enable_cache=False` 或 provider 非 Anthropic → 退化为字符串拼接(向后兼容)。
**Patterns to follow:** LiteLLM/Anthropic `cache_control` content block 规范;现有 `memory_retriever.get_context_string` 调用不变。
**Test scenarios:**
- Covers R4, R5. Happy path: 多轮对话,stable 层(技能配置)跨轮不变,volatile 层(记忆)随 query 变 → system message content blocks 结构正确,stable 在前 volatile 在后。
- Covers R6. Integration: Anthropic provider 收到带 `cache_control` 的 content blocks → `_convert_messages` 透传 → cache 命中 stable 前缀。
- Covers R6. Edge: OpenAI provider(provider != anthropic)→ `_build_system_message` 返回字符串拼接,不报错;stable 前缀命中自动前缀缓存。
- Covers R5. Edge: `memory_retriever` 返回空 → 无 volatile 块,system message 仅 stable。
- Covers R13. Config: `prompt_cache.enable=False` → 退化为字符串拼接,行为同改动前。
**Verification:** `python3 -m pytest tests/unit/test_prompt_cache_layers.py -x -q` 通过;多轮对话 system message 结构符合预期。
---
### U3. G8 delta_flush_interval 调速
**Goal:** execute_stream token chunk yield 加时间节流。
**Requirements:** R11, R12, R13, R14
**Dependencies:** U2(共享 execute_stream,G2 先改 system prompt 结构,G8 后改 yield 逻辑)
**Files:**
- 修改: `src/agentkit/core/react.py`(`execute_stream` chunk 循环 `:1118-1134`;execute 同路径若有)
- 配置: `ServerConfig`(`streaming.flush_interval_ms`)
- 测试: `tests/unit/test_delta_flush.py`
**Approach:**
- chunk 循环内:累积 `stream_content_chunks` 同时累积 `_flush_buffer`;用 `time.monotonic()``_last_flush_ts`
- 当 `now - _last_flush_ts >= flush_interval_ms/1000` 时:yield 合并后的 buffer,清空,更新 ts。
- `flush_interval_ms=0` → 逐 chunk yield(向后兼容,当前行为)。
- 流结束(for 循环退出)→ 最终 flush 剩余 buffer。
**Patterns to follow:** `time.monotonic()` 用法(已在 `:1080` `_stream_start` 使用);现有 `ReActEvent(event_type="token")` 结构不变。
**Test scenarios:**
- Covers R11. Happy path: `flush_interval_ms=50`,模拟连续 chunks → 按 50ms 间隔批量 yield,合并 content。
- Covers R12. Config: `flush_interval_ms=0` → 逐 chunk yield(向后兼容)。
- Edge: 流结束 mid-interval → 最终 flush 剩余 buffer,不丢 content。
- Edge: 单个 chunk 后流结束 → 立即 flush。
- Covers R14. Self-check: 断言 yield 的合并 content 等于原始 chunks 拼接(不丢字符)。
**Verification:** `python3 -m pytest tests/unit/test_delta_flush.py -x -q` 通过;token 流无字符丢失。
---
### U4. G1 Verify 失败回灌 ReAct
**Goal:** verify 失败时把 errors 注入 conversation 继续 ReAct,二次失败中断带 log。
**Requirements:** R1, R2, R3, R13, R14
**Dependencies:** U1(复用错误回灌模式;U1 的 `ToolValidationError` 回灌与 G1 verify 回灌同模式)
**Files:**
- 修改: `src/agentkit/core/react.py`(`execute` `:886-907` verify 块;`execute_stream` `:1601-1629` verify 块)
- 配置: `ServerConfig`(`verification.max_reinjections: int = 1`)
- 测试: `tests/unit/test_verify_reinjection.py`
**Approach:**
- 把 verify 从"循环后一次性运行"改为"final answer 检测点 + 回灌重试"。
- 主循环内检测到 final answer(无 tool_calls)时:`if self._verification_enabled` → 运行 verify。
- verify 通过 → `break`,正常结束。
- verify 失败且 `reinjections < max_reinjections`:append `{"role":"user","content":f"验证失败,错误如下:\n{vresult.errors}"}` 到 conversation,`continue` 主循环(LLM 见 errors 自纠正)。
- verify 失败且 `reinjections >= max`:记录 verify log 到 trajectory,`break` 返回失败结果。
- 保留现有 `VerificationLoop` 类与 `verify_and_retry` 不动(向后兼容,外部调用方仍可用)。
**Patterns to follow:** 现有 verify 块的 trajectory/event 记录模式;U1 的 `ToolValidationError` 回灌模式。
**Execution note:** 先加 characterization 测试覆盖现有 verify 行为(失败仅记录),再改实现确保不回归。
**Test scenarios:**
- Covers R1. Happy path: verify 首次失败 → errors 注入 conversation → LLM 自纠正 → 二次 verify 通过 → 任务完成。
- Covers R2. Error: verify 二次失败 → 中断,返回错误附 verify log(测试输出 + errors 列表)。
- Covers R3. Config: `max_reinjections=0` → 等价于不回灌(当前行为),verify 失败直接退出。
- Covers R3. Edge: 回灌期间达到 `max_steps` → 中断(不无限循环)。
- Covers R1. Integration: 回灌的 user 消息出现在 conversation,LLM 下一轮 input 含 errors 文本。
- Covers R14. Self-check: `max_reinjections` 默认值为 1。
**Verification:** `python3 -m pytest tests/unit/test_verify_reinjection.py -x -q` 通过;`pytest tests/unit/ -x -q` 全量不回归。
---
## Scope Boundaries
### Deferred for later
- Wave 2(G4 辅助 LLM 分流 / G7 三级降级链 / G9 原子化 rollback)——见 origin Wave 2 section,单独 plan。
- Wave 3(G5 函数级代码分片 / G6 SOLO 状态机)——见 origin Wave 3 section,单独 plan。
- G7 Emergency 层规则模板、G5 tree-sitter 集成方式——origin Deferred to Planning。
### Deferred to Follow-Up Work
- `execute()`(非流式)的 G2/G8 改动:本 plan 优先 `execute_stream`(WebSocket 主路径),`execute()` 同步改动可顺带做但非必须,若拆 PR 则归 follow-up。
- prompt cache 命中率指标化(R7 的 ~50% 目标):本 plan 落结构,埋点量化归 follow-up。
### Outside this product's identity
- 重写编排逻辑/拓扑排序/Board 辩论——继承自 2026-06-24-004。
- 节点级 checkpoint——继承自 2026-06-24-004 KTD3。
- 全盘迁移 LangGraph——继承自 2026-06-24-004。
---
## Risks & Dependencies
- **风险:G2 content blocks 改动破坏现有 message 序列化。** gateway/messages 假设 `content: str`,改为 blocks 列表需核对所有序列化路径(WebSocket、日志、trace)。缓解:仅 system 消息用 blocks,user/assistant 保持 str;`enable_cache=False` 退化路径保底。
- **风险:G1 回灌增加 token 消耗。** 二次 verify 循环会多跑一轮 LLM。缓解:`max_reinjections` 默认 1,受 `max_steps` 上限约束;KTD3 设计为循环内 continue 而非递归,无栈溢出风险。
- **依赖:U3 依赖 U2 完成。** 两者共享 `execute_stream`,G2 先改 system prompt 结构稳定后再加 G8 节流(KTD5)。
- **依赖:G2 需修改 `anthropic.py` 的 `_convert_messages` 支持 list-type system content。** `anthropic.py` 是 httpx 直连实现(非 LiteLLM),需手动改 `:116``system_prompt = content` 改为支持 content blocks 透传。非 Anthropic provider 走字符串拼接退化路径,不报错。
- **依赖:G3 的 `jsonschema` 库。** `jsonschema>=4.0` 已在 `pyproject.toml` 核心依赖中(line 22),无需新增依赖。
---
## Acceptance Examples
- **AE1**(Covers R1, R2, R3):ReAct 任务 final answer 后 verify 失败 → errors 注入 conversation 继续 ReAct → LLM 修正后二次 verify 通过 → 完成。若二次失败 → 中断返回错误附 verify log。
- **AE2**(Covers R4, R5, R6, R7):50 轮长对话 → stable 层不变,volatile 随 query 变 → Anthropic cache 命中 stable 前缀,OpenAI 命中自动前缀缓存 → 输入 token 降低。
- **AE3**(Covers R8, R9, R10):LLM 传错参数类型 → schema 校验失败 → `tool_call_invalid` → 错误回灌 conversation → LLM 修正后重试成功。
- **AE4**(Covers R11, R12):弱网客户端 → token chunk 按 50ms 批量 yield → 前端渲染无卡顿,无字符丢失。
---
## Sources & Research
- **Origin:** `docs/brainstorms/2026-06-29-advanced-agent-gap-optimization-requirements.md` — Wave 1 需求文档(KTD1-KTD7、R1-R27)
- **上游 plan:** `docs/plans/2026-06-24-004-feat-long-horizon-reliability-optimization-plan.md` — U1-U7 长程可靠性护栏(本 plan 增补)
- **Learnings:** `docs/solutions/logic-errors/long-horizon-reliability-code-review-fixes.md` — 14 个 finding 教训(新字段默认值保契约、跨模块契约显式化、清理方法接入生命周期)
- **代码 grounding(验证器核对):**
- `src/agentkit/core/verification_loop.py:111-145``verify_and_retry``fix_callback`,不回灌 ReAct
- `src/agentkit/core/react.py:1042-1059` — 记忆注入拼到 system prompt 末尾
- `src/agentkit/core/react.py:1118-1134` — chunk 循环逐 token yield 无节流
- `src/agentkit/core/react.py:886-907, 1601-1629` — verify 在循环后运行,失败仅记录 trajectory
- `src/agentkit/core/react.py:1897-1916``_execute_tool` 无 schema 校验
- `src/agentkit/tools/base.py:21,28,50-77``Tool.input_schema`(JSON Schema dict,可选),`safe_execute` 无校验
- `src/agentkit/llm/gateway.py:268-281``chat_stream(**kwargs)` 可透传 cache_control
- `src/agentkit/llm/providers/anthropic.py:316` — Anthropic provider chat_stream
- **外部研究(brainstorm 阶段):** Qoder(Spec→Verify 闭环、delta_flush_interval)、Hermes(cache_control system_and_3、validate_function_call_schema)、Codex(prompt caching 前缀匹配)

509
scripts/dev-start.sh Executable file
View File

@ -0,0 +1,509 @@
#!/usr/bin/env bash
# =============================================================================
# Fischer AgentKit — 本地开发环境一键启动
# =============================================================================
#
# 启动:后端服务(+ 可选 Tauri 桌面客户端)
# 停止Ctrl+C 或运行 scripts/dev-stop.sh
#
# 用法:
# bash scripts/dev-start.sh # Web 模式agentkit gui
# bash scripts/dev-start.sh --tauri # Web + Tauri 桌面客户端
# bash scripts/dev-start.sh --serve # 仅后端 API
# bash scripts/dev-start.sh --init # 首次运行(初始化 DB
# bash scripts/dev-start.sh --help # 帮助
#
# 依赖:
# Python >= 3.11, Node.js >= 18, Redis, PostgreSQL (均自动检查)
# --tauri 需要Rust 工具链rustup / brew install rust
#
# =============================================================================
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
cd "$PROJECT_ROOT"
# ── 颜色 ────────────────────────────────────────────────────────────────────
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
CYAN='\033[0;36m'
NC='\033[0m'
# ── 帮助 ────────────────────────────────────────────────────────────────────
show_help() {
cat <<-'EOF'
Fischer AgentKit — 本地开发环境启动
用法: bash scripts/dev-start.sh [选项]
选项:
--tauri 启动 Tauri 桌面客户端Vite 热重载 + 原生窗口)
--serve 仅启动后端 API无前端
--init 首次运行,初始化数据库和测试用户
--help 显示此帮助
模式说明:
默认 Web 模式agentkit gui (前后端 + 内置静态服务)
--tauri Tauri 模式:后端 API + Vite (:5173) + Tauri 桌面窗口
端口映射:
8000 — 后端 API
8002 — Web GUI / 前端静态服务
5173 — Vite 开发服务器(--tauri 模式)
EOF
}
# ── 参数解析 ────────────────────────────────────────────────────────────────
MODE="gui"
INIT_DB=0
while [[ $# -gt 0 ]]; do
case $1 in
--tauri) MODE="tauri"; shift ;;
--serve) MODE="serve"; shift ;;
--init) INIT_DB=1; shift ;;
--help|-h) show_help; exit 0 ;;
*) shift ;;
esac
done
# ── 日志函数 ────────────────────────────────────────────────────────────────
ok() { echo -e " ${GREEN}${NC} $*"; }
info() { echo -e " ${CYAN}${NC} $*"; }
warn() { echo -e " ${YELLOW}!${NC} $*"; }
fail() { echo -e " ${RED}${NC} $*" >&2; }
section() {
echo ""
echo -e "${CYAN}────────────────────────────────────────${NC}"
echo -e "${CYAN} $*${NC}"
echo -e "${CYAN}────────────────────────────────────────${NC}"
}
# ── 进度状态机 ──────────────────────────────────────────────────────────────
# 0=未开始, 1=进行中, 2=成功, 3=失败
S_DEPS=0 S_ENV=0 S_REDIS=0 S_PG=0 S_BACKEND=0 S_FRONTEND=0 S_TAURI=0
set_status() {
local step=$1 val=$2
case $step in
deps) S_DEPS=$val ;;
env) S_ENV=$val ;;
redis) S_REDIS=$val ;;
pg) S_PG=$val ;;
backend) S_BACKEND=$val ;;
frontend) S_FRONTEND=$val ;;
tauri) S_TAURI=$val ;;
esac
}
print_status() {
echo ""
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
echo -e "${CYAN} 启动状态总览${NC}"
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
echo -e "$([[ $S_DEPS -eq 2 ]] && echo " ${GREEN}${NC}" || [[ $S_DEPS -eq 3 ]] && echo " ${RED}${NC}" || echo " ${YELLOW}${NC}") 依赖检查"
echo -e "$([[ $S_ENV -eq 2 ]] && echo " ${GREEN}${NC}" || [[ $S_ENV -eq 3 ]] && echo " ${RED}${NC}" || echo " ${YELLOW}${NC}") 环境配置"
echo -e "$([[ $S_REDIS -eq 2 ]] && echo " ${GREEN}${NC}" || [[ $S_REDIS -eq 3 ]] && echo " ${RED}${NC}" || echo " ${YELLOW}${NC}") Redis"
echo -e "$([[ $S_PG -eq 2 ]] && echo " ${GREEN}${NC}" || [[ $S_PG -eq 3 ]] && echo " ${RED}${NC}" || echo " ${YELLOW}${NC}") PostgreSQL"
echo -e "$([[ $S_BACKEND -eq 2 ]] && echo " ${GREEN}${NC}" || [[ $S_BACKEND -eq 3 ]] && echo " ${RED}${NC}" || echo " ${YELLOW}${NC}") 后端服务 (:8000)"
if [[ $MODE == "gui" || $MODE == "tauri" ]]; then
echo -e "$([[ $S_FRONTEND -eq 2 ]] && echo " ${GREEN}${NC}" || [[ $S_FRONTEND -eq 3 ]] && echo " ${RED}${NC}" || echo " ${YELLOW}${NC}") 前端服务 (:8002)"
fi
if [[ $MODE == "tauri" ]]; then
echo -e "$([[ $S_TAURI -eq 2 ]] && echo " ${GREEN}${NC}" || [[ $S_TAURI -eq 3 ]] && echo " ${RED}${NC}" || echo " ${YELLOW}${NC}") Tauri 客户端"
fi
}
# ── 前置检查 ────────────────────────────────────────────────────────────────
check_deps() {
section "检查依赖"
set_status deps 1
for cmd in python3 node npm; do
if ! command -v "$cmd" &>/dev/null; then
fail "缺少依赖: $cmd"
set_status deps 3
return 1
fi
done
ok "Python $(python3 --version 2>&1 | awk '{print $2}')"
ok "Node $(node --version 2>&1)"
ok "npm $(npm --version 2>&1)"
# Python 版本 >= 3.11
if ! python3 -c 'import sys; sys.exit(0 if (sys.version_info.major, sys.version_info.minor) >= (3, 11) else 1)'; then
fail "Python 版本需 >= 3.11"
set_status deps 3
return 1
fi
# Tauri 模式需要 Rust
if [[ $MODE == "tauri" ]]; then
if ! command -v rustc &>/dev/null; then
warn "未检测到 RustTauri 需要):请运行 brew install rust 或 curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh"
else
ok "Rust $(rustc --version 2>&1 | awk '{print $2}')"
fi
fi
set_status deps 2
}
check_env() {
section "检查环境配置"
set_status env 1
if [[ ! -f .env ]]; then
warn "未找到 .env将使用环境变量默认值"
if [[ -f .env.example ]]; then
cp .env.example .env
ok "已从 .env.example 生成 .env"
fi
else
ok ".env 存在"
fi
set_status env 2
}
check_redis() {
section "检查 Redis"
set_status redis 1
if command -v redis-cli &>/dev/null && redis-cli ping 2>/dev/null | grep -q PONG; then
ok "Redis 运行中"
set_status redis 2
return 0
fi
# Docker 方式
local name="fischer-redis-dev"
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${name}$"; then
ok "Redis 容器运行中"
set_status redis 2
return 0
fi
warn "Redis 未运行,尝试启动 Docker 容器..."
if docker run -d --name "$name" -p 6379:6379 redis:7-alpine &>/dev/null; then
sleep 2
if docker exec "$name" redis-cli ping 2>/dev/null | grep -q PONG; then
ok "Redis Docker 容器启动成功 (:6379)"
set_status redis 2
return 0
fi
fi
fail "Redis 启动失败(请确保 Docker 运行中,或手动启动 Redis"
set_status redis 3
return 1
}
check_postgres() {
section "检查 PostgreSQL"
set_status pg 1
if lsof -i :5432 2>/dev/null | grep -q LISTEN; then
ok "PostgreSQL 已在 :5432 监听"
set_status pg 2
return 0
fi
# Docker 方式
local name="fischer-pg-dev"
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${name}$"; then
ok "PostgreSQL Docker 容器运行中"
set_status pg 2
return 0
fi
warn "PostgreSQL 未在 :5432 运行,尝试启动 Docker 容器..."
if docker run -d --name "$name" \
-p 5432:5432 \
-e POSTGRES_USER=agentkit \
-e POSTGRES_PASSWORD=agentkit \
-e POSTGRES_DB=agentkit \
pgvector/pgvector:pg15 &>/dev/null; then
sleep 3
if docker exec "$name" pg_isready -U agentkit &>/dev/null; then
ok "PostgreSQL Docker 容器启动成功 (:5432)"
set_status pg 2
return 0
fi
fi
warn "PostgreSQL 启动失败bitable 等功能可能受限,继续启动后端..."
set_status pg 2 # 不阻塞,继续
return 0
}
# ── 安装依赖 ────────────────────────────────────────────────────────────────
install_deps() {
section "安装依赖"
info "后端 Python 依赖..."
# 虚拟环境
if [[ ! -d .venv ]]; then
python3 -m venv .venv
ok "虚拟环境 .venv 创建完成"
fi
source .venv/bin/activate
pip install -q -U pip
pip install -e ".[dev]" -q
ok "后端依赖安装完成"
# 前端 npm 依赖
local FE_DIR="$PROJECT_ROOT/src/agentkit/server/frontend"
if [[ ! -d "$FE_DIR/node_modules" ]]; then
info "前端 npm 依赖..."
cd "$FE_DIR"
npm install
cd "$PROJECT_ROOT"
ok "前端依赖安装完成"
else
ok "前端 node_modules 已存在"
fi
}
# ── 启动后端 ────────────────────────────────────────────────────────────────
start_backend() {
section "启动后端服务"
set_status backend 1
info "启动后端 API (:8000)..."
source .venv/bin/activate
agentkit serve --port 8000 &
BACKEND_PID=$!
# 等待健康检查就绪(最多 60 秒)
info "等待后端就绪..."
local attempt=0
while [[ $attempt -lt 60 ]]; do
if curl -sf http://127.0.0.1:8000/api/v1/health &>/dev/null; then
ok "后端 API 就绪 (http://127.0.0.1:8000, PID $BACKEND_PID)"
set_status backend 2
return 0
fi
# 检查进程是否还活着
if ! kill -0 $BACKEND_PID 2>/dev/null; then
fail "后端进程意外退出"
set_status backend 3
return 1
fi
sleep 1
((attempt++))
[[ $((attempt % 10)) -eq 0 ]] && info "等待中... (${attempt}s)"
done
fail "后端启动超时60s 内未响应健康检查)"
kill $BACKEND_PID 2>/dev/null || true
set_status backend 3
return 1
}
# ── 启动 Web GUI ────────────────────────────────────────────────────────────
start_gui() {
section "启动 Web GUI"
set_status frontend 1
info "启动 Web GUI (:8002)..."
source .venv/bin/activate
agentkit gui --port 8002 &
GUI_PID=$!
# 等待就绪
info "等待 Web GUI 就绪..."
local attempt=0
while [[ $attempt -lt 60 ]]; do
if curl -sf http://127.0.0.1:8002/api/v1/health &>/dev/null; then
ok "Web GUI 就绪 (http://127.0.0.1:8002, PID $GUI_PID)"
set_status frontend 2
return 0
fi
if ! kill -0 $GUI_PID 2>/dev/null; then
fail "Web GUI 进程意外退出"
set_status frontend 3
return 1
fi
sleep 1
((attempt++))
[[ $((attempt % 10)) -eq 0 ]] && info "等待中... (${attempt}s)"
done
fail "Web GUI 启动超时"
kill $GUI_PID 2>/dev/null || true
set_status frontend 3
return 1
}
# ── 启动 Tauri ─────────────────────────────────────────────────────────────
start_tauri() {
section "启动 Tauri 桌面客户端"
set_status tauri 1
local FE_DIR="$PROJECT_ROOT/src/agentkit/server/frontend"
if ! command -v tauri &>/dev/null; then
warn "Tauri CLI 未安装,跳过桌面客户端"
info "安装方式npm install -g @tauri-apps/cli"
set_status tauri 2
return 0
fi
info "启动 Tauri 桌面客户端..."
info " (Vite → :5173, 后端 API → :8000)"
cd "$FE_DIR"
npm run tauri dev &
TAURI_PID=$!
info "Tauri 启动中(首次运行需要编译 Rust可能需要几分钟..."
ok "Tauri 进程已启动 (PID $TAURI_PID)"
info "桌面窗口将自动打开,如未打开请手动查看终端输出"
set_status tauri 2
cd "$PROJECT_ROOT"
}
# ── 数据库初始化 ────────────────────────────────────────────────────────────
init_db() {
section "初始化数据库"
info "创建测试用户..."
local SETUP_SCRIPT="$PROJECT_ROOT/src/agentkit/server/frontend/e2e/setup-test-user.py"
if [[ -f "$SETUP_SCRIPT" ]]; then
source .venv/bin/activate
if python3 "$SETUP_SCRIPT" 2>/dev/null; then
ok "测试用户创建完成"
else
warn "测试用户创建失败(可能已存在或数据库未就绪)"
fi
fi
}
# ── 停止服务 ────────────────────────────────────────────────────────────────
stop_services() {
echo ""
info "正在停止所有服务..."
for port in 8000 8001 8002 5173; do
local pid=$(lsof -ti :$port 2>/dev/null || true)
if [[ -n "$pid" ]]; then
kill $pid 2>/dev/null && ok "端口 $port 已停止" || true
fi
done
# Vite 进程
pkill -f "vite" 2>/dev/null && ok "Vite 进程已停止" || true
echo ""
ok "所有服务已停止。感谢使用!"
}
# ── 主流程 ─────────────────────────────────────────────────────────────────
echo ""
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
echo -e "${CYAN} Fischer AgentKit — 本地开发环境启动${NC}"
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
echo ""
# 检查依赖
if ! check_deps; then
echo ""
fail "依赖检查失败,请先安装缺失的依赖"
exit 1
fi
check_env
# 初始化数据库
[[ $INIT_DB -eq 1 ]] && init_db
# 安装依赖
install_deps
# 检查中间件
check_redis || true
check_postgres || true
echo ""
# ── 启动服务 ────────────────────────────────────────────────────────────────
FAILED=0
case $MODE in
serve)
if ! start_backend; then FAILED=1; fi
;;
gui)
if ! start_backend; then FAILED=1; fi
if [[ $FAILED -eq 0 ]] && ! start_gui; then FAILED=1; fi
;;
tauri)
if ! start_backend; then FAILED=1; fi
if [[ $FAILED -eq 0 ]]; then
start_tauri
fi
;;
esac
# ── 状态总览 + 启动完成 ──────────────────────────────────────────────────────
print_status
echo ""
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
if [[ $FAILED -eq 0 ]]; then
echo -e "${GREEN} 所有服务启动成功!${NC}"
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
echo ""
if [[ $MODE == "gui" ]]; then
echo -e " Web GUI: ${GREEN}http://localhost:8002${NC}"
echo " (在浏览器中打开,或直接在 http://localhost:8002 访问)"
elif [[ $MODE == "tauri" ]]; then
echo -e " 后端 API: ${GREEN}http://localhost:8000${NC}"
echo -e " Vite 热重载: ${GREEN}http://localhost:5173${NC}"
echo " Tauri 桌面窗口应已自动打开"
elif [[ $MODE == "serve" ]]; then
echo -e " 后端 API: ${GREEN}http://localhost:8000${NC}"
fi
echo ""
echo -e " ${YELLOW}按 Ctrl+C 停止所有服务${NC}"
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
else
echo -e "${RED} 服务启动失败,请查看上方错误信息${NC}"
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
echo ""
echo -e " 诊断命令:"
echo -e " 查看日志: ${CYAN}curl http://127.0.0.1:8000/api/v1/health${NC}"
echo -e " 停止服务: ${CYAN}bash scripts/dev-stop.sh${NC}"
fi
echo ""
# 注册退出钩子
trap stop_services EXIT INT TERM
# 保持脚本运行
if [[ $MODE == "tauri" ]]; then
# Tauri 模式:等待 Tauri 进程
wait
elif [[ $MODE == "gui" || $MODE == "serve" ]]; then
# 等待后端进程
wait $BACKEND_PID 2>/dev/null || true
fi

30
scripts/dev-stop.sh Executable file
View File

@ -0,0 +1,30 @@
#!/usr/bin/env bash
# =============================================================================
# Fischer AgentKit — 本地开发环境一键停止脚本
# =============================================================================
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
cd "$PROJECT_ROOT"
echo "停止 Fischer AgentKit 开发服务..."
# 停止后端/API/GUI 进程(按端口)
for port in 8000 8001 8002 5173; do
PID=$(lsof -ti :$port 2>/dev/null || true)
if [[ -n "$PID" ]]; then
kill $PID 2>/dev/null && echo " [OK] 端口 $port 已停止 (PID $PID)" || true
fi
done
# 停止 Vite/Node 进程
pkill -f "vite" 2>/dev/null && echo " [OK] Vite 进程已停止" || true
# 停止 Docker 容器(开发专用)
for container in fischer-redis-dev fischer-pg-dev; do
if docker ps --format '{{.Names}}' 2>/dev/null | grep -q "^${container}$"; then
docker stop "$container" 2>/dev/null && echo " [OK] 容器 $container 已停止" || true
fi
done
echo "停止完成。"

218
scripts/prod-start.sh Executable file
View File

@ -0,0 +1,218 @@
#!/usr/bin/env bash
# =============================================================================
# Fischer AgentKit — 生产环境一键启动脚本
# =============================================================================
#
# 启动Docker Composeagentkit + Redis + PostgreSQL
# 要求:.env 文件存在(含 LLM API Key、JWT Secret
#
# 用法:
# bash scripts/prod-start.sh # 启动所有服务
# bash scripts/prod-start.sh --init # 首次启动(初始化 DB
# bash scripts/prod-start.sh --build # 构建镜像后启动
# bash scripts/prod-start.sh --help # 帮助
# =============================================================================
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
cd "$PROJECT_ROOT"
: "${COMPOSE_FILE:=docker-compose.yaml}"
# ── 颜色 ────────────────────────────────────────────────────────────────────
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
CYAN='\033[0;36m'
NC='\033[0m'
# ── 帮助 ────────────────────────────────────────────────────────────────────
show_help() {
cat <<-'EOF'
Fischer AgentKit — 生产环境启动
用法: bash scripts/prod-start.sh [选项]
选项:
--init 首次启动,初始化数据库表
--build 先构建/拉取镜像,再启动
--pull 仅拉取最新镜像(不启动)
--help 显示此帮助
Docker Compose 文件: docker-compose.yaml
服务端口:
8001 — AgentKit API 服务
6379 — Redis
5432 — PostgreSQL
访问: http://localhost:8001
EOF
}
# ── 参数解析 ────────────────────────────────────────────────────────────────
INIT_DB=0
BUILD=0
PULL_ONLY=0
while [[ $# -gt 0 ]]; do
case $1 in
--init) INIT_DB=1; shift ;;
--build) BUILD=1; shift ;;
--pull) PULL_ONLY=1; shift ;;
--help|-h) show_help; exit 0 ;;
*) shift ;;
esac
done
# ── 日志函数 ────────────────────────────────────────────────────────────────
info() { echo -e "${BLUE}[INFO]${NC} $*"; }
ok() { echo -e "${GREEN}[OK]${NC} $*"; }
warn() { echo -e "${YELLOW}[WARN]${NC} $*"; }
fail() { echo -e "${RED}[FAIL]${NC} $*"; }
# ── 前置检查 ────────────────────────────────────────────────────────────────
check_deps() {
info "检查依赖..."
for cmd in docker; do
if ! command -v "$cmd" &>/dev/null; then
fail "缺少依赖: $cmd"
exit 1
fi
done
# docker compose v2+ (no separate docker-compose binary needed)
if ! command -v docker-compose &>/dev/null && docker compose version &>/dev/null; then
ok "docker compose 可用v2+"
elif command -v docker-compose &>/dev/null; then
ok "docker-compose 可用"
else
fail "缺少 docker 或 docker compose v2"
exit 1
fi
ok "依赖检查通过"
}
check_env() {
info "检查环境配置..."
if [[ ! -f .env ]]; then
fail "未找到 .env 文件"
echo " 请先创建 .env 文件(参考 .env.example"
exit 1
fi
# 检查关键变量
if ! grep -q "DASHSCOPE_API_KEY\|DEEPSEEK_API_KEY" .env 2>/dev/null; then
warn ".env 中可能缺少 LLM API Key"
fi
ok ".env 存在"
}
# ── 数据库初始化 ────────────────────────────────────────────────────────────
init_db() {
info "初始化数据库..."
docker compose -f "$COMPOSE_FILE" exec -T postgres \
psql -U agentkit -d agentkit -c "SELECT 1" &>/dev/null || {
fail "PostgreSQL 容器未就绪,等待后重试..."
sleep 5
}
# 等待 PostgreSQL 完全就绪
local attempt=0
while [[ $attempt -lt 30 ]]; do
if docker compose -f "$COMPOSE_FILE" exec -T postgres \
pg_isready -U agentkit &>/dev/null; then
break
fi
sleep 1
((attempt++))
done
ok "数据库就绪"
}
# ── 主流程 ─────────────────────────────────────────────────────────────────
echo ""
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
echo -e "${CYAN} Fischer AgentKit — 生产环境启动${NC}"
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
echo ""
check_deps
check_env
if [[ $PULL_ONLY -eq 1 ]]; then
info "拉取最新镜像..."
docker compose -f "$COMPOSE_FILE" pull
ok "镜像拉取完成"
exit 0
fi
echo ""
echo -e "${GREEN}─────────────────────────────────────────────────${NC}"
echo -e "${GREEN} 启动 Docker 服务...${NC}"
echo -e "${GREEN}─────────────────────────────────────────────────${NC}"
echo ""
if [[ $BUILD -eq 1 ]]; then
info "构建/拉取镜像..."
docker compose -f "$COMPOSE_FILE" build --pull
ok "镜像构建完成"
fi
# 创建网络(如不存在)
docker network ls 2>/dev/null | grep -q "fischer-agentkit_default" || \
docker network create fischer-agentkit_default &>/dev/null || true
info "启动服务docker compose up -d..."
docker compose -f "$COMPOSE_FILE" up -d --remove-orphans
# 等待健康检查
info "等待服务就绪(健康检查最多 120s..."
local attempt=0
while [[ $attempt -lt 60 ]]; do
if curl -sf http://127.0.0.1:8001/api/v1/health &>/dev/null; then
break
fi
sleep 2
((attempt++))
echo -n "."
done
echo ""
# 显示状态
echo ""
echo -e "${GREEN}─────────────────────────────────────────────────${NC}"
echo -e "${GREEN} Docker 服务状态${NC}"
echo -e "${GREEN}─────────────────────────────────────────────────${NC}"
docker compose -f "$COMPOSE_FILE" ps
# 状态检查
if curl -sf http://127.0.0.1:8001/api/v1/health &>/dev/null; then
echo ""
ok "AgentKit API 就绪 -> http://127.0.0.1:8001"
echo ""
echo -e "${GREEN}═══════════════════════════════════════════════════${NC}"
echo -e "${GREEN} 生产服务全部启动成功!${NC}"
echo -e "${GREEN}─────────────────────────────────────────────────${NC}"
echo -e " AgentKit API: ${GREEN}http://localhost:8001${NC}"
echo -e " Redis: ${GREEN}localhost:6379${NC}"
echo -e " PostgreSQL: ${GREEN}localhost:5432${NC}"
echo -e "${GREEN}─────────────────────────────────────────────────${NC}"
echo -e " 查看日志: ${CYAN}docker compose -f $COMPOSE_FILE logs -f${NC}"
echo -e " 停止服务: ${CYAN}bash scripts/prod-stop.sh${NC}"
echo -e "${GREEN}═══════════════════════════════════════════════════${NC}"
else
echo ""
fail "AgentKit API 启动超时,查看日志诊断:"
echo -e " ${CYAN}docker compose -f $COMPOSE_FILE logs --tail=50${NC}"
exit 1
fi

19
scripts/prod-stop.sh Executable file
View File

@ -0,0 +1,19 @@
#!/usr/bin/env bash
# =============================================================================
# Fischer AgentKit — 生产环境一键停止脚本
# =============================================================================
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
cd "$PROJECT_ROOT"
COMPOSE_FILE="${COMPOSE_FILE:-docker-compose.yaml}"
echo "停止 Fischer AgentKit 生产服务..."
docker compose -f "$COMPOSE_FILE" down 2>/dev/null && echo " [OK] 容器已停止并移除" || echo " [OK] 无运行中的容器"
echo ""
echo "注意数据卷Redis/PostgreSQL 数据)已保留。"
echo "如需清除所有数据运行docker volume rm fischer-agentkit_*"
echo "停止完成。"

192
scripts/test-start.sh Executable file
View File

@ -0,0 +1,192 @@
#!/usr/bin/env bash
# =============================================================================
# Fischer AgentKit — 一键测试环境脚本
# =============================================================================
#
# 启动完整测试环境:后端 API + E2E 测试(使用独立端口,不影响开发环境)
#
# 用法:
# bash scripts/test-start.sh # 启动测试环境并运行全部测试
# bash scripts/test-start.sh --unit # 仅单元测试
# bash scripts/test-start.sh --e2e # 仅 E2E 测试
# bash scripts/test-start.sh --bitable # 仅 bitable 模块测试
# bash scripts/test-start.sh --skip-server # 跳过服务启动(使用已有服务)
# bash scripts/test-start.sh --help # 帮助
# =============================================================================
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)"
cd "$PROJECT_ROOT"
: "${E2E_PORT:=18765}"
: "${E2E_API_KEY:=ak_live_e2e_test_key_000000000000000000000000000000000000000000000000}"
# ── 颜色 ────────────────────────────────────────────────────────────────────
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
BLUE='\033[0;34m'
CYAN='\033[0;36m'
NC='\033[0m'
# ── 帮助 ────────────────────────────────────────────────────────────────────
show_help() {
cat <<-'EOF'
Fischer AgentKit — 测试环境
用法: bash scripts/test-start.sh [选项]
选项:
--unit 仅运行单元测试pytest -m "not integration"
--e2e 仅运行 E2E 测试
--bitable 仅运行 bitable 模块测试
--skip-server 跳过服务启动(使用已有服务)
--help 显示帮助
环境变量:
E2E_PORT 测试服务器端口(默认: 18765
E2E_API_KEY 测试 API Key默认: 内置测试 Key
EOF
}
# ── 参数解析 ────────────────────────────────────────────────────────────────
RUN_MODE="all"
SKIP_SERVER=0
while [[ $# -gt 0 ]]; do
case $1 in
--unit) RUN_MODE="unit"; shift ;;
--e2e) RUN_MODE="e2e"; shift ;;
--bitable) RUN_MODE="bitable"; shift ;;
--skip-server) SKIP_SERVER=1; shift ;;
--help|-h) show_help; exit 0 ;;
*) shift ;;
esac
done
# ── 日志函数 ────────────────────────────────────────────────────────────────
info() { echo -e "${BLUE}[INFO]${NC} $*"; }
ok() { echo -e "${GREEN}[OK]${NC} $*"; }
warn() { echo -e "${YELLOW}[WARN]${NC} $*"; }
fail() { echo -e "${RED}[FAIL]${NC} $*"; }
# ── 服务管理 ────────────────────────────────────────────────────────────────
start_test_server() {
if [[ $SKIP_SERVER -eq 1 ]]; then
info "SKIP_SERVER=1跳过服务启动"
return 0
fi
info "启动测试服务器(端口 $E2E_PORT..."
if lsof -i :$E2E_PORT 2>/dev/null | grep -q LISTEN; then
ok "测试服务器已在端口 $E2E_PORT 运行"
return 0
fi
export AGENTKIT_E2E_MODE=1
export AGENTKIT_WS_TIMEOUT=0
export AGENTKIT_API_KEY="$E2E_API_KEY"
source .venv/bin/activate 2>/dev/null || true
python3 -m agentkit.cli.main serve --host 127.0.0.1 --port "$E2E_PORT" &
SERVER_PID=$!
# 等待就绪
local attempt=0
while [[ $attempt -lt 60 ]]; do
if curl -sf "http://127.0.0.1:$E2E_PORT/api/v1/health" &>/dev/null; then
ok "测试服务器就绪 (PID $SERVER_PID,端口 $E2E_PORT)"
return 0
fi
sleep 0.5
((attempt++))
done
fail "测试服务器启动超时"
kill $SERVER_PID 2>/dev/null || true
exit 1
}
stop_test_server() {
if [[ $SKIP_SERVER -eq 1 ]]; then return 0; fi
PID=$(lsof -ti :$E2E_PORT 2>/dev/null || true)
if [[ -n "$PID" ]]; then
kill $PID 2>/dev/null && ok "测试服务器已停止" || true
fi
}
# ── 测试执行 ────────────────────────────────────────────────────────────────
run_unit_tests() {
info "运行单元测试..."
source .venv/bin/activate 2>/dev/null || true
pytest -m "not integration" -q --tb=short
ok "单元测试完成"
}
run_bitable_tests() {
info "运行 bitable 模块测试..."
source .venv/bin/activate 2>/dev/null || true
pytest tests/unit/bitable/ -v --tb=short
ok "bitable 测试完成"
}
run_e2e_tests() {
info "运行 E2E 测试..."
FE_DIR="$PROJECT_ROOT/src/agentkit/server/frontend"
cd "$FE_DIR"
if [[ ! -d node_modules ]]; then
info "安装前端依赖..."
npm install
fi
export AGENTKIT_SERVER_URL="http://127.0.0.1:$E2E_PORT"
export AGENTKIT_API_KEY="$E2E_API_KEY"
npm run test:e2e
ok "E2E 测试完成"
cd "$PROJECT_ROOT"
}
# ── 主流程 ─────────────────────────────────────────────────────────────────
echo ""
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
echo -e "${CYAN} Fischer AgentKit — 测试环境${NC}"
echo -e "${CYAN}═══════════════════════════════════════════════════${NC}"
echo ""
trap stop_test_server EXIT INT TERM
case $RUN_MODE in
unit)
run_unit_tests
;;
e2e)
start_test_server
run_e2e_tests
;;
bitable)
run_bitable_tests
;;
all)
info "运行全部测试..."
run_unit_tests
echo ""
run_bitable_tests
echo ""
start_test_server
run_e2e_tests
;;
esac
echo ""
ok "测试完成!"

View File

@ -18,7 +18,7 @@ from agentkit.core.exceptions import LoopDetectedError, TaskCancelledError, Task
from agentkit.core.protocol import CancellationToken
from agentkit.llm.gateway import LLMGateway
from agentkit.llm.protocol import LLMResponse
from agentkit.tools.base import Tool
from agentkit.tools.base import Tool, ToolValidationError
from agentkit.telemetry.tracing import start_span, _OTEL_AVAILABLE
from agentkit.telemetry.metrics import (
agent_request_counter,
@ -119,7 +119,9 @@ class ReActResult:
trajectory: list[ReActStep]
total_steps: int
total_tokens: int
status: str = "success" # "success" | "timeout" | "cancelled" | "partial"
status: str = (
"success" # "success"|"timeout"|"cancelled"|"partial"|"empty_fallback"|"verify_failed"
)
fallback_strategy: str | None = None # e.g. "simplified_rewoo", "react", "direct"
@ -163,6 +165,9 @@ class ReActEngine:
core_tool_names: list[str] | None = None,
enable_tool_search: bool = True,
middleware_chain: "MiddlewareChain | None" = None,
prompt_cache_enable: bool = True,
flush_interval_ms: int = 0,
max_reinjections: int = 1,
):
if max_steps < 1:
raise ValueError(f"max_steps must be >= 1, got {max_steps}")
@ -176,6 +181,16 @@ class ReActEngine:
self._parallel_tools = parallel_tools
self._verification_enabled = verification_enabled
self._verification_commands = verification_commands
# U2/G2: prompt cache 双块结构开关(True 时 Anthropic 用 cache_control blocks,
# 其他 provider 走字符串拼接依赖自动前缀缓存)
self._prompt_cache_enable = prompt_cache_enable
# U3/G8: token chunk 节流间隔(ms)。0 = 逐 chunk yield(向后兼容)。
# 用 time.monotonic() 不受系统时钟跳变影响。
self._flush_interval_ms = flush_interval_ms
# U4/G1: verify 失败回灌最大重试次数。0 = 不回灌(当前行为,仅记录 trajectory);
# 1 = 首次失败回灌一次 errors 给 LLM 自纠正,二次失败中断。
# 受 max_steps 上限约束(不无限循环)。verification_enabled=False 时无效。
self._max_reinjections = max_reinjections
# Tiered tool description injection config
self._core_tool_names: tuple[str, ...] | None = (
tuple(core_tool_names) if core_tool_names is not None else None
@ -429,22 +444,22 @@ class ReActEngine:
skill_name=task_type or None,
)
# Memory retrieval: 执行前检索相关上下文注入 system_prompt
# Memory retrieval: 执行前检索相关上下文,作为 volatile 层注入 system message
# U2/G2: 不再拼到 stable(system_prompt)末尾,改由 _build_system_message 组装双块结构
memory_context = ""
if memory_retriever:
try:
query = str(messages[-1].get("content", "")) if messages else ""
top_k = (retrieval_config or {}).get("top_k", 5)
token_budget = (retrieval_config or {}).get("token_budget", 2000)
memory_context = await memory_retriever.get_context_string(
query=query,
top_k=top_k,
token_budget=token_budget,
memory_context = (
await memory_retriever.get_context_string(
query=query,
top_k=top_k,
token_budget=token_budget,
)
or ""
)
if memory_context:
if system_prompt:
system_prompt += f"\n\n## 参考信息\n{memory_context}"
else:
system_prompt = f"## 参考信息\n{memory_context}"
except Exception as e:
logger.warning(
f"Memory retrieval failed, continuing without context: {e}", exc_info=True
@ -452,8 +467,13 @@ class ReActEngine:
# 构建初始消息
conversation: list[dict[str, Any]] = []
if system_prompt:
conversation.append({"role": "system", "content": system_prompt})
system_content = self._build_system_message(
stable=system_prompt or "",
volatile=memory_context,
model=model,
)
if system_content is not None:
conversation.append({"role": "system", "content": system_content})
conversation.extend(messages)
# Context compression: 压缩超长对话历史
@ -468,6 +488,8 @@ class ReActEngine:
trace_outcome = "success"
step = 0
output = ""
# U4/G1: verify 失败回灌计数器。受 max_steps 上限约束(不无限循环)。
reinjections = 0
while step < self._max_steps:
step += 1
@ -837,6 +859,29 @@ class ReActEngine:
except Exception as e:
logger.warning(f"Incremental compression failed: {e}")
else:
# ponytail: 检查是否为畸形工具调用(含 <tool_use> 但解析失败)
# 如果是,注入纠正消息让模型重试,而不是把原始 XML 作为最终答案泄漏
if "<tool_use>" in (response.content or ""):
logger.warning(
f"Step {step}: content contains <tool_use> but "
f"parsing failed — injecting correction"
)
conversation.append({"role": "assistant", "content": response.content})
conversation.append(
{
"role": "user",
"content": (
"你上一次的工具调用格式有误,无法解析。"
"请使用正确的格式重新调用工具:\n"
"<tool_use>\n"
'{"name": "工具名", "arguments": {"参数名": "参数值"}}\n'
"</tool_use>\n"
"确保 JSON 完整且不要混入其他标签。"
),
}
)
continue
# Final answer: LLM 没有调用工具,返回最终答案
react_step = ReActStep(
step=step,
@ -856,36 +901,65 @@ class ReActEngine:
duration_ms=llm_duration_ms,
tokens_used=step_tokens,
)
break
# Verification: 如果启用验证,在 final answer 后运行测试
if self._verification_enabled and output:
try:
from agentkit.core.verification_loop import VerificationLoop
# U4/G1: verify at final-answer point with reinjection.
# 原为循环后一次性运行;现改为循环内检测 final answer 后立即 verify,
# 失败则把 errors 作为 user 消息回灌 conversation,continue 主循环让 LLM 自纠正。
# max_reinjections=0 等价于原行为(仅记录 trajectory,不回灌)。
if self._verification_enabled and output:
try:
from agentkit.core.verification_loop import VerificationLoop
vloop = VerificationLoop(commands=self._verification_commands)
vresult = await vloop.verify()
if not vresult.passed:
# 将验证失败信息作为 ReActStep 添加到轨迹
verification_step = ReActStep(
step=step + 1,
action="tool_call",
tool_name="verification",
arguments={"commands": self._verification_commands},
result={
"passed": vresult.passed,
"errors": vresult.errors,
"test_output": vresult.test_output,
},
content=(f"Verification failed:\n{vresult.test_output[:2000]}"),
)
trajectory.append(verification_step)
logger.info(
"Verification failed after final answer, "
"appended feedback to trajectory"
)
except Exception as e:
logger.warning(f"Verification loop failed: {e}")
vloop = VerificationLoop(commands=self._verification_commands)
vresult = await vloop.verify()
if not vresult.passed:
if (
reinjections < self._max_reinjections
and step < self._max_steps
):
# 回灌 errors 作为 user 消息,让 LLM 自纠正
errors_text = "\n".join(vresult.errors)
conversation.append(
{
"role": "user",
"content": (f"验证失败,错误如下:\n{errors_text}"),
}
)
reinjections += 1
logger.info(
"Verification failed (reinjection %d/%d), "
"errors injected into conversation",
reinjections,
self._max_reinjections,
)
continue
# 达到 max_reinjections 或 max_steps → 记录 verify log 并中断
verification_step = ReActStep(
step=step,
action="tool_call",
tool_name="verification",
arguments={"commands": self._verification_commands},
result={
"passed": vresult.passed,
"errors": vresult.errors,
"test_output": vresult.test_output,
},
content=(
f"Verification failed:\n{vresult.test_output[:2000]}"
),
)
trajectory.append(verification_step)
trace_outcome = "verify_failed"
logger.info(
"Verification failed after %d reinjections, "
"interrupting with verify log",
reinjections,
)
break
except Exception as e:
logger.warning(f"Verification loop failed: {e}")
break # verify 通过或未启用 → 正常退出
# 达到 max_steps 时,返回当前最佳输出
if step >= self._max_steps and not output:
@ -929,6 +1003,7 @@ class ReActEngine:
trajectory=trajectory,
total_steps=len(trajectory),
total_tokens=total_tokens,
status=trace_outcome,
)
finally:
# Telemetry: end span and record duration — always runs
@ -1014,28 +1089,34 @@ class ReActEngine:
skill_name=task_type or None,
)
# Memory retrieval: 执行前检索相关上下文注入 system_prompt
# Memory retrieval: 执行前检索相关上下文,作为 volatile 层注入 system message
# U2/G2: 不再拼到 stable(system_prompt)末尾破坏 cache 前缀,改由 _build_system_message
# 组装双块结构(stable + volatile),Anthropic provider 在 stable 上加 cache_control。
memory_context = ""
if memory_retriever:
try:
query = str(messages[-1].get("content", "")) if messages else ""
top_k = (retrieval_config or {}).get("top_k", 5)
token_budget = (retrieval_config or {}).get("token_budget", 2000)
memory_context = await memory_retriever.get_context_string(
query=query,
top_k=top_k,
token_budget=token_budget,
memory_context = (
await memory_retriever.get_context_string(
query=query,
top_k=top_k,
token_budget=token_budget,
)
or ""
)
if memory_context:
if system_prompt:
system_prompt += f"\n\n## 参考信息\n{memory_context}"
else:
system_prompt = f"## 参考信息\n{memory_context}"
except Exception as e:
logger.warning(f"Memory retrieval failed, continuing without context: {e}")
conversation: list[dict[str, Any]] = []
if system_prompt:
conversation.append({"role": "system", "content": system_prompt})
system_content = self._build_system_message(
stable=system_prompt or "",
volatile=memory_context,
model=model,
)
if system_content is not None:
conversation.append({"role": "system", "content": system_content})
conversation.extend(messages)
# Context compression: 压缩超长对话历史
@ -1052,6 +1133,8 @@ class ReActEngine:
step = 0
output = ""
trace_outcome = "success"
# U4/G1: verify 失败回灌计数器(execute_stream 版)。受 max_steps 上限约束。
reinjections = 0
_stream_start = time.monotonic()
effective_timeout = (
timeout_seconds if timeout_seconds is not None else self._default_timeout
@ -1089,6 +1172,9 @@ class ReActEngine:
stream_usage = None
stream_tool_calls: list[Any] = []
stream_model = model
# U3/G8: delta_flush 节流 buffer,按 flush_interval_ms 批量 yield
_flush_buffer: list[str] = []
_last_flush_ts = time.monotonic()
async for chunk in _ensure_async_iterable(
self._llm_gateway.chat_stream(
@ -1102,11 +1188,20 @@ class ReActEngine:
):
if chunk.content:
stream_content_chunks.append(chunk.content)
yield ReActEvent(
event_type="token",
step=step,
data={"content": chunk.content},
)
_flush_buffer.append(chunk.content)
now = time.monotonic()
# flush_interval_ms=0 → 逐 chunk yield(向后兼容,条件短路为 True)
if (
self._flush_interval_ms == 0
or now - _last_flush_ts >= self._flush_interval_ms / 1000
):
yield ReActEvent(
event_type="token",
step=step,
data={"content": "".join(_flush_buffer)},
)
_flush_buffer = []
_last_flush_ts = now
if chunk.usage:
stream_usage = chunk.usage
if chunk.tool_calls:
@ -1114,6 +1209,15 @@ class ReActEngine:
if chunk.model:
stream_model = chunk.model
# U3/G8: 流结束 mid-interval → 最终 flush 剩余 buffer(不丢字符)
if _flush_buffer:
yield ReActEvent(
event_type="token",
step=step,
data={"content": "".join(_flush_buffer)},
)
_flush_buffer = []
# Build response-like object from stream
stream_content = "".join(stream_content_chunks)
response = self._build_response_from_stream(
@ -1513,6 +1617,34 @@ class ReActEngine:
except Exception as e:
logger.warning(f"Incremental compression failed: {e}")
else:
# ponytail: 检查是否为畸形工具调用(含 <tool_use> 但解析失败)
# 如果是,注入纠正消息让模型重试,而不是把原始 XML 作为最终答案泄漏
if "<tool_use>" in (response.content or ""):
logger.warning(
f"Step {step}: content contains <tool_use> but "
f"parsing failed — injecting correction (stream)"
)
conversation.append({"role": "assistant", "content": response.content})
conversation.append(
{
"role": "user",
"content": (
"你上一次的工具调用格式有误,无法解析。"
"请使用正确的格式重新调用工具:\n"
"<tool_use>\n"
'{"name": "工具名", "arguments": {"参数名": "参数值"}}\n'
"</tool_use>\n"
"确保 JSON 完整且不要混入其他标签。"
),
}
)
yield ReActEvent(
event_type="step",
step=step,
data={"message": "工具调用格式异常,已注入纠正消息"},
)
continue
# Final answer
react_step = ReActStep(
step=step,
@ -1533,6 +1665,80 @@ class ReActEngine:
tokens_used=step_tokens,
)
# U4/G1: verify at final-answer point with reinjection (stream 版)。
# 与 execute() 同模式:失败回灌 errors 作为 user 消息,continue 主循环。
# max_reinjections=0 等价于原行为(仅记录 trajectory,不回灌)。
# 注意:final_answer 事件在 verify 通过后才 yield,避免客户端过早收到完成信号。
if self._verification_enabled and output:
try:
from agentkit.core.verification_loop import VerificationLoop
vloop = VerificationLoop(commands=self._verification_commands)
vresult = await vloop.verify()
if not vresult.passed:
if (
reinjections < self._max_reinjections
and step < self._max_steps
):
# 回灌 errors,不发 final_answer 事件,继续循环
errors_text = "\n".join(vresult.errors)
conversation.append(
{
"role": "user",
"content": (f"验证失败,错误如下:\n{errors_text}"),
}
)
reinjections += 1
yield ReActEvent(
event_type="step",
step=step,
data={
"message": (
f"验证失败,已注入错误信息让 LLM 自纠正 "
f"(reinjection {reinjections}/{self._max_reinjections})"
),
"verify_errors": vresult.errors,
},
)
continue
# 达到 max_reinjections 或 max_steps → 记录 verify log 并中断
verification_step = ReActStep(
step=step,
action="tool_call",
tool_name="verification",
arguments={"commands": self._verification_commands},
result={
"passed": vresult.passed,
"errors": vresult.errors,
"test_output": vresult.test_output,
},
content=(
f"Verification failed:\n{vresult.test_output[:2000]}"
),
)
trajectory.append(verification_step)
trace_outcome = "verify_failed"
yield ReActEvent(
event_type="tool_result",
step=step,
data={
"tool_name": "verification",
"result": {
"passed": vresult.passed,
"errors": vresult.errors,
"test_output": vresult.test_output,
},
},
)
logger.info(
"Verification failed after %d reinjections, "
"interrupting with verify log",
reinjections,
)
break
except Exception as e:
logger.warning(f"Verification loop failed: {e}")
yield ReActEvent(
event_type="final_answer",
step=step,
@ -1542,47 +1748,7 @@ class ReActEngine:
"total_tokens": total_tokens,
},
)
break
# Verification: 如果启用验证,在 final answer 后运行测试
if self._verification_enabled and output:
try:
from agentkit.core.verification_loop import VerificationLoop
vloop = VerificationLoop(commands=self._verification_commands)
vresult = await vloop.verify()
if not vresult.passed:
verification_step = ReActStep(
step=step + 1,
action="tool_call",
tool_name="verification",
arguments={"commands": self._verification_commands},
result={
"passed": vresult.passed,
"errors": vresult.errors,
"test_output": vresult.test_output,
},
content=(f"Verification failed:\n{vresult.test_output[:2000]}"),
)
trajectory.append(verification_step)
yield ReActEvent(
event_type="tool_result",
step=step + 1,
data={
"tool_name": "verification",
"result": {
"passed": vresult.passed,
"errors": vresult.errors,
"test_output": vresult.test_output,
},
},
)
logger.info(
"Verification failed after final answer, "
"appended feedback to trajectory"
)
except Exception as e:
logger.warning(f"Verification loop failed: {e}")
break # verify 通过或未启用 → 正常退出
if step >= self._max_steps and not output:
trace_outcome = "partial"
@ -1666,6 +1832,66 @@ class ReActEngine:
schemas.append(schema)
return schemas
def _build_system_message(
self,
stable: str,
volatile: str,
*,
model: str,
) -> str | list[dict[str, Any]] | None:
"""构建双块结构 system message(stable + volatile)。
- prompt_cache_enable=False 或无 stable+volatile 返回 str( None)
- Anthropic provider 返回 content blocks 列表,stable 块带 cache_control
- 其他 provider 返回字符串拼接(stable + volatile),依赖 stable 前缀命中自动前缀缓存
ponytail: 断点数硬编码为 1(stable ),不暴露配置(YAGNI 双块结构 >1 无语义)
"""
if not stable and not volatile:
return None
if not self._prompt_cache_enable:
# 退化为字符串拼接(向后兼容,行为同改动前)
if stable and volatile:
return f"{stable}\n\n## 参考信息\n{volatile}"
if volatile:
return f"## 参考信息\n{volatile}"
return stable
provider_name = self._get_provider_name(model)
if provider_name == "anthropic":
blocks: list[dict[str, Any]] = []
if stable:
blocks.append(
{
"type": "text",
"text": stable,
"cache_control": {"type": "ephemeral"},
}
)
if volatile:
blocks.append(
{
"type": "text",
"text": f"## 参考信息\n{volatile}",
}
)
return blocks if blocks else None
# 非 Anthropic:字符串拼接,stable 前缀命中 OpenAI/DashScope 自动前缀缓存
if stable and volatile:
return f"{stable}\n\n## 参考信息\n{volatile}"
if volatile:
return f"## 参考信息\n{volatile}"
return stable
def _get_provider_name(self, model: str) -> str | None:
"""通过 gateway 查询 model 对应的 provider 名。失败回退 None(字符串拼接)。"""
try:
return self._llm_gateway.get_provider_name_for_model(model)
except Exception:
# ponytail: 测试中 gateway 可能是 MagicMock,无该方法;回退保守路径
return None
def _build_tool_use_prompt(self, tools: list[Tool]) -> str:
"""Build prompt-based tool calling instructions with tiered injection.
@ -1855,6 +2081,15 @@ class ReActEngine:
try:
result = await tool.safe_execute(**clean_args)
return result
except ToolValidationError as e:
# 保留类型化错误码,不被通用 except 平坦化为字符串
error_msg = f"Tool '{tool_name}' schema validation failed: {e}"
logger.warning(error_msg)
return {
"error": str(e),
"error_code": e.error_code,
"details": e.details,
}
except Exception as e:
error_msg = f"Tool '{tool_name}' execution failed: {e}"
logger.warning(error_msg)
@ -2052,4 +2287,101 @@ class ReActEngine:
else:
logger.warning(f"Failed to parse tool_use block: {json_str[:200]}")
if calls:
return calls
# 格式 4: 畸形 <tool_use> — 缺少闭合标签或 JSON 被截断/混入杂标签
# 兜底解析:从 <tool_use> 后提取 JSON 片段,用大括号匹配法恢复完整 JSON
open_pattern = re.compile(r"<tool_use>\s*", re.IGNORECASE)
for match in open_pattern.finditer(content):
remainder = content[match.end() :]
parsed = self._extract_tool_call_from_malformed(remainder)
if parsed:
calls.append(parsed)
return calls
@staticmethod
def _extract_tool_call_from_malformed(text: str) -> dict[str, Any] | None:
"""从畸形文本中尝试提取工具调用。
处理场景
1. JSON 被截断缺少闭合大括号
2. JSON 中混入 <parameter> XML 标签
3. 完全无法解析时返回 None
"""
# 尝试用大括号匹配提取第一个 JSON 对象
brace_start = text.find("{")
if brace_start == -1:
return None
depth = 0
json_end = -1
in_string = False
escape = False
for i in range(brace_start, len(text)):
ch = text[i]
if escape:
escape = False
continue
if ch == "\\":
escape = True
continue
if ch == '"':
in_string = not in_string
continue
if in_string:
continue
if ch == "{":
depth += 1
elif ch == "}":
depth -= 1
if depth == 0:
json_end = i + 1
break
if json_end == -1:
# JSON 被截断 — 尝试补全大括号后解析
json_str = text[brace_start:].strip()
# 截断掉非 JSON 尾部(如 </parameter>, <function> 等)
cut = json_str.find("}")
if cut != -1:
json_str = json_str[: cut + 1]
else:
# 补全缺失的大括号
open_braces = json_str.count("{") - json_str.count("}")
json_str = json_str + "}" * max(open_braces, 0)
else:
json_str = text[brace_start:json_end]
try:
parsed = json.loads(json_str)
name = parsed.get("name", "")
arguments = parsed.get("arguments", {})
if name:
return {"name": name, "arguments": arguments}
except (json.JSONDecodeError, TypeError):
pass
# 最终兜底:用正则提取 name 和已知的参数字段
name_match = re.search(r'"name"\s*:\s*"([^"]+)"', text)
if not name_match:
return None
name = name_match.group(1)
arguments: dict[str, Any] = {}
# 提取 "key": "value" 模式
for kv_match in re.finditer(r'"(\w+)"\s*:\s*"([^"]*)"', text):
key = kv_match.group(1)
if key in ("name",):
continue
arguments[key] = kv_match.group(2)
# 提取 <parameter=key>value</parameter> 模式
for pm in re.finditer(r"<parameter=(\w+)>\s*(.*?)\s*</parameter>", text, re.DOTALL):
arguments[pm.group(1)] = pm.group(2).strip()
if name:
return {"name": name, "arguments": arguments}
return None

View File

@ -412,6 +412,24 @@ class LLMGateway:
return self._config.model_aliases[model]
return model
def get_provider_name_for_model(self, model: str) -> str | None:
"""返回 model 对应的 provider 名(用于 provider-specific 优化如 cache_control)。
ponytail: 仅做 alias 解析 + provider 前缀提取,不查内部状态
升级路径:ServerConfig 显式声明 provider per model
返回 None 表示无法确定( provider + "/" 前缀),调用方应回退到字符串拼接
"""
resolved = self._resolve_model_alias(model)
if "/" in resolved:
provider_name = resolved.split("/", 1)[0]
if provider_name in self._providers:
return provider_name
return None
# 无 "/" 前缀:仅当只有一个 provider 时能确定
if len(self._providers) == 1:
return next(iter(self._providers))
return None
def _resolve_model(self, model: str) -> tuple[LLMProvider, str]:
"""解析模型为 (provider, actual_model_name)"""
# model 格式: "provider/model_name" 或 "model_name"

View File

@ -99,13 +99,18 @@ class AnthropicProvider(LLMProvider):
"content-type": "application/json",
}
def _convert_messages(self, messages: list[dict[str, str]]) -> tuple[str | None, list[dict[str, Any]]]:
def _convert_messages(self, messages: list[dict[str, str]]) -> tuple[str | list[dict[str, Any]] | None, list[dict[str, Any]]]:
"""将 OpenAI 风格消息转换为 Anthropic 格式
Returns:
(system_prompt, anthropic_messages)
system_prompt 可为:
- str: 传统字符串(向后兼容)
- list[dict]: Anthropic content blocks(支持 cache_control,U2/G2)
- None: system 消息
"""
system_prompt: str | None = None
system_prompt: str | list[dict[str, Any]] | None = None
anthropic_messages: list[dict[str, Any]] = []
for msg in messages:
@ -113,6 +118,8 @@ class AnthropicProvider(LLMProvider):
content = msg.get("content", "")
if role == "system":
# U2/G2: content 可为 str(传统)或 list[dict](content blocks)
# content blocks 直接透传(包含 cache_control 标记)
system_prompt = content
continue

View File

@ -43,6 +43,9 @@ class AuthMiddleware(BaseHTTPMiddleware):
"""
WHITELIST_PATHS = (
"/",
"/login",
"/assets", # Static assets (exact match covers root; prefix below covers sub-paths)
"/api/v1/health",
"/api/v1/auth/login",
"/api/v1/auth/refresh",
@ -74,16 +77,15 @@ class AuthMiddleware(BaseHTTPMiddleware):
"""Return True if ``path`` matches a whitelisted route.
Uses exact match for auth routes (so ``/auth/logout`` does NOT
whitelist ``/auth/logout-others``) and prefix match for docs.
whitelist ``/auth/logout-others``) and prefix match for docs and assets.
"""
for prefix in self.WHITELIST_PATHS:
if path == prefix:
return True
# Prefix match only for documentation paths (trailing slash
# or sub-path is fine). Auth paths require exact match to
# avoid accidentally whitelisting sibling routes like
# /auth/logout-others under /auth/logout.
if prefix in ("/docs", "/openapi.json", "/redoc") and path.startswith(prefix):
# Prefix match for documentation paths and static assets.
# Auth paths require exact match to avoid accidentally whitelisting
# sibling routes like /auth/logout-others under /auth/logout.
if path.startswith(prefix) and prefix in ("/docs", "/openapi.json", "/redoc", "/assets"):
return True
return False

View File

@ -116,6 +116,9 @@ class ServerConfig:
evolution: dict[str, Any] | None = None,
expert_paths: list[str] | None = None,
board: dict[str, Any] | None = None,
prompt_cache: dict[str, Any] | None = None,
streaming: dict[str, Any] | None = None,
verification: dict[str, Any] | None = None,
on_change: Callable[["ServerConfig"], None] | None = None,
):
self.host = host
@ -144,6 +147,12 @@ class ServerConfig:
self.evolution = evolution or {}
self.expert_paths = expert_paths or []
self.board = board or {}
self.prompt_cache = prompt_cache or {}
# U3/G8: streaming.flush_interval_ms 控制 token chunk 节流(默认 0 = 逐 chunk yield)
self.streaming = streaming or {}
# U4/G1: verification.max_reinjections 控制 verify 失败回灌次数(默认 1)
# verification_enabled=False 时此配置无效
self.verification = verification or {}
self.on_change = on_change
# Config watching state
@ -227,6 +236,11 @@ class ServerConfig:
# Board meeting config (max_rounds, default_template, etc.)
board_data = data.get("board", {})
# U2/U3/U4: prompt_cache / streaming / verification 配置(从 YAML 读取)
prompt_cache_data = data.get("prompt_cache", {})
streaming_data = data.get("streaming", {})
verification_data = data.get("verification", {})
return cls(
host=server.get("host", "0.0.0.0"),
port=server.get("port", 8001),
@ -254,6 +268,9 @@ class ServerConfig:
evolution=evolution_data,
expert_paths=expert_paths,
board=board_data,
prompt_cache=prompt_cache_data,
streaming=streaming_data,
verification=verification_data,
)
@staticmethod

View File

@ -43,6 +43,7 @@ declare module 'vue' {
AListItem: typeof import('ant-design-vue/es')['ListItem']
AListItemMeta: typeof import('ant-design-vue/es')['ListItemMeta']
AMenu: typeof import('ant-design-vue/es')['Menu']
AMenuDivider: typeof import('ant-design-vue/es')['MenuDivider']
AMenuItem: typeof import('ant-design-vue/es')['MenuItem']
AModal: typeof import('ant-design-vue/es')['Modal']
APageHeader: typeof import('ant-design-vue/es')['PageHeader']

View File

@ -40,7 +40,7 @@
<template
v-for="f in fields"
:key="`hdr_${f.id}`"
#[`header_${f.id}`]"
#[`header_${f.id}`]
>
<ColumnHeaderMenu
:field="f"

View File

@ -5,8 +5,8 @@
<link rel="icon" type="image/svg+xml" href="/vite.svg" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Fischer AgentKit</title>
<script type="module" crossorigin src="/assets/index-DQi4f51B.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-BdbfOrU3.css">
<script type="module" crossorigin src="/assets/index-a-0N3I41.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-Mld5F0pG.css">
</head>
<body>
<div id="app"></div>

View File

@ -4,10 +4,32 @@ import time
from abc import ABC, abstractmethod
from typing import Any
import jsonschema
from agentkit.telemetry.tracing import start_span
from agentkit.telemetry.metrics import tool_duration_histogram
class ToolValidationError(Exception):
"""工具参数 schema 校验失败。
error_code:
- "tool_call_invalid" 类型不匹配(jsonschema.ValidationError.path 末段为字段名)
- "schema_mismatch" 必填缺失 / 结构性错误(默认兜底)
"""
def __init__(
self,
message: str,
*,
error_code: str = "schema_mismatch",
details: dict[str, Any] | None = None,
) -> None:
super().__init__(message)
self.error_code = error_code
self.details = details or {}
class Tool(ABC):
"""工具抽象基类
@ -57,6 +79,7 @@ class Tool(ABC):
_start = time.monotonic()
try:
await self.before_execute(**kwargs)
self._validate_input(kwargs)
result = await self.execute(**kwargs)
await self.after_execute(result, **kwargs)
_duration_ms = int((time.monotonic() - _start) * 1000)
@ -76,6 +99,37 @@ class Tool(ABC):
finally:
_span_cm.__exit__(None, None, None)
def _validate_input(self, kwargs: dict[str, Any]) -> None:
"""校验 kwargs 是否符合 self.input_schema。
- input_schema=None 跳过(向后兼容,旧工具无 schema)
- 类型不匹配 error_code="tool_call_invalid"
- 必填缺失 / 结构性错误 error_code="schema_mismatch"
"""
if self.input_schema is None:
return
# 过滤内部 kwargs(下划线开头,如 _skip_dangerous_check),不参与 schema 校验
# W3 fix: 防止 additionalProperties:false 的 schema 拒绝内部控制参数
user_kwargs = {k: v for k, v in kwargs.items() if not k.startswith("_")}
try:
jsonschema.validate(instance=user_kwargs, schema=self.input_schema)
except jsonschema.ValidationError as e:
field_path = ".".join(str(p) for p in e.absolute_path) or "<root>"
# required 缺失走 schema_mismatch;类型不符走 tool_call_invalid
if e.validator == "required":
code = "schema_mismatch"
else:
code = "tool_call_invalid"
raise ToolValidationError(
f"Tool '{self.name}' argument validation failed: {e.message}",
error_code=code,
details={
"field": field_path,
"validator": e.validator,
"schema_path": list(e.absolute_schema_path),
},
) from e
def to_dict(self) -> dict:
return {
"name": self.name,

View File

@ -50,6 +50,13 @@ class FunctionTool(Tool):
for param_name, param in sig.parameters.items():
if param_name in ("self", "cls"):
continue
# ponytail: VAR_KEYWORD(**kwargs)/VAR_POSITIONAL(*args) 是 catch-all,
# 不是具体参数,不进 schema。否则 schema 校验会要求 kwargs 字段必填。
if param.kind in (
inspect.Parameter.VAR_KEYWORD,
inspect.Parameter.VAR_POSITIONAL,
):
continue
param_type = "string"
if param.annotation != inspect.Parameter.empty:

View File

@ -0,0 +1,148 @@
"""U3 / G8 delta_flush_interval 调速测试。
覆盖 R11-R12, R14:
- R11 chunk flush_interval_ms 间隔批量 yield
- R12 配置化(flush_interval_ms=0 退化为逐 chunk yield)
- R14 自检:合并 content 等于原始 chunks 拼接(不丢字符)
"""
from __future__ import annotations
from agentkit.core.react import ReActEngine
from agentkit.llm.protocol import StreamChunk
class _StubGateway:
"""模拟 LLMGateway,yield 一串 StreamChunk 后结束。"""
def __init__(self, chunks: list[str]):
self._chunks = chunks
def get_provider_name_for_model(self, model: str) -> str | None:
return None
async def chat_stream(self, **kwargs):
for c in self._chunks:
yield StreamChunk(content=c, model="test")
def _collect_token_events(events) -> list[str]:
return [e.data["content"] for e in events if e.event_type == "token"]
# ---- R12 Config: flush_interval_ms=0 → 逐 chunk yield(向后兼容) ----
async def test_flush_interval_zero_yields_per_chunk():
chunks = ["H", "e", "l", "l", "o"]
gw = _StubGateway(chunks)
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=0)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
# 5 chunks → 5 token events(每个内容 = 单 chunk)
assert tokens == ["H", "e", "l", "l", "o"]
# ---- R11 Happy path: flush_interval_ms > 0 → 批量合并 ----
async def test_flush_interval_batches_chunks_by_interval():
chunks = ["a", "b", "c", "d", "e", "f"]
gw = _StubGateway(chunks)
# 间隔设很大(10s),所有 chunks 在第一个 interval 内累积,流结束后最终 flush
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
# 所有 chunk 累积到流结束,最终 flush 一次 → 1 个 token event,content = 全拼接
assert len(tokens) == 1
assert tokens[0] == "abcdef"
# ---- R14 Self-check: 合并 content 等于原始 chunks 拼接(不丢字符) ----
async def test_no_character_loss_after_merge():
chunks = ["Hello", " ", "World", "!", "你好", "世界"]
gw = _StubGateway(chunks)
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
# 合并所有 token events 的 content 等于原始 chunks 拼接
merged = "".join(tokens)
assert merged == "".join(chunks) == "Hello World!你好世界"
# ---- Edge: 流结束 mid-interval → 最终 flush 剩余 buffer ----
async def test_final_flush_on_stream_end():
chunks = ["x", "y", "z"]
gw = _StubGateway(chunks)
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
# mid-interval 累积 → 流结束最终 flush 一次
assert tokens == ["xyz"]
# ---- Edge: 单个 chunk 后流结束 → 立即 flush ----
async def test_single_chunk_immediate_flush():
chunks = ["only"]
gw = _StubGateway(chunks)
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
assert tokens == ["only"]
# ---- Edge: chunks 含空 content(usage-only chunk)不进 buffer ----
async def test_empty_content_chunk_not_buffered():
chunks = ["a", "", "b"] # 中间 chunk 空
gw = _StubGateway(chunks)
engine = ReActEngine(llm_gateway=gw, flush_interval_ms=10000)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="test",
):
events.append(ev)
tokens = _collect_token_events(events)
# 空 chunk 跳过 buffer,最终 flush "ab"
assert tokens == ["ab"]

View File

@ -0,0 +1,221 @@
"""U2 / G2 Prompt Cache 双块结构测试。
覆盖 R4-R7, R13:
- R4 stable/volatile 双块结构
- R5 记忆注入从 system_prompt 末尾移到 volatile
- R6 provider 统一 cache 策略(Anthropic blocks / OpenAI 字符串)
- R7 多轮 stable 不变(由构造保证)
- R13 配置化(prompt_cache_enable=False 退化)
"""
from __future__ import annotations
from typing import Any
from agentkit.core.react import ReActEngine
class _StubGateway:
"""模拟 LLMGateway,记录最后一次 chat_stream 调用的 messages。"""
def __init__(self, provider_name: str | None = "anthropic"):
self._provider_name = provider_name
self.captured_messages: list[dict[str, Any]] | None = None
def get_provider_name_for_model(self, model: str) -> str | None:
return self._provider_name
async def chat_stream(self, **kwargs):
self.captured_messages = list(kwargs.get("messages", []))
return
yield # makes this an async generator
def _make_engine(provider_name: str | None = "anthropic", *, cache_enable: bool = True) -> tuple[ReActEngine, _StubGateway]:
gw = _StubGateway(provider_name=provider_name)
engine = ReActEngine.__new__(ReActEngine)
engine._llm_gateway = gw
engine._prompt_cache_enable = cache_enable
return engine, gw
# ---- R4/R6 Anthropic: stable + volatile → content blocks ----
def test_anthropic_provider_returns_content_blocks_with_cache_control():
engine, _ = _make_engine("anthropic")
result = engine._build_system_message(
stable="You are a helpful assistant.",
volatile="Memory: foo",
model="claude-sonnet-4",
)
assert isinstance(result, list)
assert result[0] == {
"type": "text",
"text": "You are a helpful assistant.",
"cache_control": {"type": "ephemeral"},
}
assert result[1] == {"type": "text", "text": "## 参考信息\nMemory: foo"}
# ---- R5 Anthropic: empty volatile → only stable block ----
def test_anthropic_empty_volatile_returns_only_stable_block():
engine, _ = _make_engine("anthropic")
result = engine._build_system_message(
stable="base prompt",
volatile="",
model="claude",
)
assert isinstance(result, list)
assert len(result) == 1
assert result[0]["text"] == "base prompt"
assert "cache_control" in result[0]
# ---- R6 Non-Anthropic: returns string concat ----
def test_non_anthropic_returns_string_concat():
engine, _ = _make_engine("openai")
result = engine._build_system_message(
stable="base",
volatile="ctx",
model="gpt-4",
)
assert isinstance(result, str)
assert result == "base\n\n## 参考信息\nctx"
def test_unknown_provider_returns_string_concat():
"""provider_name 无法确定时(gateway 返回 None),回退字符串拼接不报错。"""
engine, _ = _make_engine(None)
result = engine._build_system_message(
stable="base",
volatile="ctx",
model="default",
)
assert isinstance(result, str)
assert "## 参考信息" in result
# ---- R13 Config: prompt_cache_enable=False → 退化字符串 ----
def test_prompt_cache_disabled_falls_back_to_string():
engine, _ = _make_engine("anthropic", cache_enable=False)
result = engine._build_system_message(
stable="base",
volatile="ctx",
model="claude",
)
# 即便是 Anthropic,enable=False 时也返回字符串(行为同改动前)
assert isinstance(result, str)
assert "cache_control" not in result
# ---- R4 Edge: no stable + no volatile → None ----
def test_empty_stable_and_volatile_returns_none():
engine, _ = _make_engine("anthropic")
result = engine._build_system_message(stable="", volatile="", model="claude")
assert result is None
# ---- R5 Edge: empty stable + volatile → only volatile block (Anthropic) ----
def test_anthropic_empty_stable_returns_only_volatile_block():
engine, _ = _make_engine("anthropic")
result = engine._build_system_message(
stable="",
volatile="only memory",
model="claude",
)
assert isinstance(result, list)
assert len(result) == 1
assert result[0]["text"] == "## 参考信息\nonly memory"
assert "cache_control" not in result[0] # volatile 块无 cache_control
# ---- Integration: execute_stream uses _build_system_message end-to-end ----
async def test_execute_stream_with_anthropic_uses_content_blocks():
"""execute_stream 把双块 system content 透传给 gateway。"""
from agentkit.tools.base import Tool
class _NoopTool(Tool):
def __init__(self):
super().__init__(name="noop", description="noop")
async def execute(self, **kwargs):
return {}
class _MockGateway:
def __init__(self):
self.captured_messages = None
def get_provider_name_for_model(self, model: str) -> str | None:
return "anthropic"
async def chat_stream(self, **kwargs):
self.captured_messages = list(kwargs.get("messages", []))
# yield one chunk then end
from agentkit.llm.protocol import StreamChunk
yield StreamChunk(content="done", model="claude")
class _MemRetriever:
async def get_context_string(self, **kw):
return "retrieved context"
gw = _MockGateway()
engine = ReActEngine(llm_gateway=gw, prompt_cache_enable=True)
events = []
async for ev in engine.execute_stream(
messages=[{"role": "user", "content": "hi"}],
tools=[],
model="claude",
system_prompt="base",
memory_retriever=_MemRetriever(),
):
events.append(ev)
assert gw.captured_messages is not None
sys_msg = gw.captured_messages[0]
assert sys_msg["role"] == "system"
assert isinstance(sys_msg["content"], list)
assert sys_msg["content"][0]["text"] == "base"
assert "cache_control" in sys_msg["content"][0]
assert sys_msg["content"][1]["text"] == "## 参考信息\nretrieved context"
# ---- Anthropic provider _convert_messages passes through list-type system ----
def test_anthropic_convert_messages_passes_through_list_system_content():
"""AnthropicProvider._convert_messages 应直接透传 list-type system content。"""
from agentkit.llm.providers.anthropic import AnthropicProvider
provider = AnthropicProvider.__new__(AnthropicProvider)
blocks = [
{"type": "text", "text": "stable", "cache_control": {"type": "ephemeral"}},
{"type": "text", "text": "volatile"},
]
messages = [{"role": "system", "content": blocks}]
system_prompt, anthropic_messages = provider._convert_messages(messages)
assert system_prompt is blocks # same object, transparent passthrough
assert anthropic_messages == []
def test_anthropic_convert_messages_string_system_still_works():
"""传统 string system content 仍能透传(向后兼容)。"""
from agentkit.llm.providers.anthropic import AnthropicProvider
provider = AnthropicProvider.__new__(AnthropicProvider)
messages = [{"role": "system", "content": "old style"}]
system_prompt, _ = provider._convert_messages(messages)
assert system_prompt == "old style"

View File

@ -883,3 +883,109 @@ class ContextCompressorStub:
def should_compress(self, messages: list[dict]) -> bool:
return self._compress
class TestParseMalformedToolUse:
"""畸形 <tool_use> 文本的容错解析"""
def test_missing_closing_tag_still_parses(self):
"""模型输出 <tool_use> 但没有 </tool_use> 闭合标签"""
from agentkit.core.react import ReActEngine
engine = ReActEngine(llm_gateway=MagicMock(spec=LLMGateway))
content = (
'<tool_use>\n'
'{"name": "shell", "arguments": {"command": "ls -la"}}\n'
)
calls = engine._parse_text_tool_calls(content)
assert len(calls) == 1
assert calls[0]["name"] == "shell"
assert calls[0]["arguments"]["command"] == "ls -la"
def test_malformed_json_with_stray_tags(self):
"""JSON 中混入 <parameter> 等标签时仍能提取工具名和参数"""
from agentkit.core.react import ReActEngine
engine = ReActEngine(llm_gateway=MagicMock(spec=LLMGateway))
content = (
'<tool_use>\n'
'{"name": "shell", "arguments": {"command": "sudo chown -R $USER /tmp"}}\n'
'</parameter>\n'
'<parameter=timeout>30</parameter>\n'
'<function>\n'
)
calls = engine._parse_text_tool_calls(content)
assert len(calls) == 1
assert calls[0]["name"] == "shell"
assert "chown" in calls[0]["arguments"]["command"]
def test_truncated_json_still_extracts_name(self):
"""JSON 被截断时仍能提取工具名"""
from agentkit.core.react import ReActEngine
engine = ReActEngine(llm_gateway=MagicMock(spec=LLMGateway))
content = '<tool_use>\n{"name": "web_search", "arguments": {"query": "test"'
calls = engine._parse_text_tool_calls(content)
assert len(calls) == 1
assert calls[0]["name"] == "web_search"
def test_completely_unparseable_tool_use_returns_empty(self):
"""完全无法解析的 <tool_use> 返回空列表"""
from agentkit.core.react import ReActEngine
engine = ReActEngine(llm_gateway=MagicMock(spec=LLMGateway))
content = '<tool_use>\ngarbage not json at all\n'
calls = engine._parse_text_tool_calls(content)
assert calls == []
class TestMalformedToolUseNotLeakedAsFinalAnswer:
"""畸形 <tool_use> 不应作为 final_answer 泄漏给用户"""
async def test_malformed_tool_use_triggers_correction_not_leak(self):
"""模型输出畸形 <tool_use> 时,不应把原始 XML 作为最终答案返回"""
from agentkit.core.react import ReActEngine
tool = FakeTool(name="shell", result={"output": "done", "exit_code": 0})
malformed_content = (
'<tool_use>\n'
'{"name": "shell", "arguments": {"command": "ls"}}\n'
'</parameter>\n<function>\n'
)
gateway = make_mock_gateway([
make_response(content=malformed_content),
make_response(content="Done successfully"),
])
engine = ReActEngine(llm_gateway=gateway)
result = await engine.execute(
messages=[{"role": "user", "content": "list files"}],
tools=[tool],
)
# 不应把原始 XML 作为最终答案
assert "<tool_use>" not in result.output
assert "</parameter>" not in result.output
assert "<function>" not in result.output
async def test_completely_unparseable_tool_use_injects_correction(self):
"""<tool_use> 完全无法解析时,注入纠正消息让模型重试"""
from agentkit.core.react import ReActEngine
tool = FakeTool(name="search", result={"results": ["data"]})
gateway = make_mock_gateway([
# 第一次:完全无法解析的 <tool_use>
make_response(content="<tool_use>\nnot json at all just words\n"),
# 第二次:模型纠正后正常回答
make_response(content="Search completed"),
])
engine = ReActEngine(llm_gateway=gateway)
result = await engine.execute(
messages=[{"role": "user", "content": "search something"}],
tools=[tool],
)
# 不应把原始 XML 作为最终答案
assert "<tool_use>" not in result.output
assert result.output == "Search completed"

View File

@ -0,0 +1,140 @@
"""U1 / G3 工具调用 schema 校验测试。
覆盖 R8-R10:
- R8 schema 校验在 execute 前运行
- R9 类型化错误码 (tool_call_invalid / schema_mismatch)
- R10 _execute_tool 捕获后回灌 conversation(结构化 dict 保留 error_code)
"""
from __future__ import annotations
import pytest
from agentkit.tools.base import Tool, ToolValidationError
class _StubTool(Tool):
"""测试用 stub,记录调用与参数。"""
def __init__(self, *, schema=None, payload=None):
super().__init__(name="stub", description="stub", input_schema=schema)
self.calls: list[dict] = []
self._payload = payload or {"ok": True}
async def execute(self, **kwargs) -> dict:
self.calls.append(kwargs)
return self._payload
_SCHEMA = {
"type": "object",
"properties": {"count": {"type": "integer"}},
"required": ["count"],
"additionalProperties": False,
}
# ---- R8 happy path ----
async def test_schema_valid_passes_through_to_execute():
tool = _StubTool(schema=_SCHEMA)
result = await tool.safe_execute(count=5)
assert result == {"ok": True}
assert tool.calls == [{"count": 5}]
# ---- R9 backward compat: input_schema=None skips ----
async def test_none_schema_skips_validation():
tool = _StubTool(schema=None)
await tool.safe_execute(anything="ok", even_invalid_types=123)
assert tool.calls == [{"anything": "ok", "even_invalid_types": 123}]
# ---- R9 type mismatch → tool_call_invalid ----
async def test_type_mismatch_raises_tool_call_invalid():
tool = _StubTool(schema=_SCHEMA)
with pytest.raises(ToolValidationError) as ei:
await tool.safe_execute(count="abc")
assert ei.value.error_code == "tool_call_invalid"
assert ei.value.details["field"] == "count"
assert tool.calls == [] # execute not called
# ---- R9 missing required → schema_mismatch ----
async def test_missing_required_raises_schema_mismatch():
tool = _StubTool(schema=_SCHEMA)
with pytest.raises(ToolValidationError) as ei:
await tool.safe_execute()
assert ei.value.error_code == "schema_mismatch"
assert ei.value.details["validator"] == "required"
assert tool.calls == []
# ---- R10 integration: _execute_tool catches and returns structured dict ----
async def test_execute_tool_catches_validation_error_returns_structured_dict():
"""ReActEngine._execute_tool 应优先捕获 ToolValidationError 并保留 error_code。"""
from agentkit.core.react import ReActEngine
tool = _StubTool(schema=_SCHEMA)
engine = ReActEngine.__new__(ReActEngine) # 绕过 __init__
result = await engine._execute_tool("stub", {"count": "not-int"}, [tool])
assert "error" in result
assert result["error_code"] == "tool_call_invalid"
assert "details" in result
assert tool.calls == [] # execute 未执行
async def test_execute_tool_catches_missing_required_returns_schema_mismatch():
from agentkit.core.react import ReActEngine
tool = _StubTool(schema=_SCHEMA)
engine = ReActEngine.__new__(ReActEngine)
result = await engine._execute_tool("stub", {}, [tool])
assert result["error_code"] == "schema_mismatch"
assert result["details"]["validator"] == "required"
# ---- R10 self-check: structured error string survives into tool message ----
async def test_structured_error_dict_str_includes_error_code_for_llm_self_correction():
"""_build_tool_result_message 把 dict 转 str(content),LLM 看到文本含 error_code。"""
from agentkit.core.react import ReActEngine
tool = _StubTool(schema=_SCHEMA)
engine = ReActEngine.__new__(ReActEngine)
result = await engine._execute_tool("stub", {"count": "bad"}, [tool])
msg = await engine._build_tool_result_message(
tool_call_id="t1",
result=result,
)
assert msg["role"] == "tool"
assert msg["tool_call_id"] == "t1"
assert "tool_call_invalid" in msg["content"]
# ---- W3 fix: 内部 kwargs(_前缀)不参与 schema 校验 ----
async def test_internal_kwargs_underscore_prefixed_skipped_by_validation():
"""_skip_dangerous_check 等内部参数不参与 schema 校验。
防止 additionalProperties:false schema 拒绝内部控制参数(W3)
"""
tool = _StubTool(schema=_SCHEMA) # additionalProperties: False
# _skip_dangerous_check 是内部参数,不应被 schema 拒绝
result = await tool.safe_execute(count=5, _skip_dangerous_check=True)
assert result == {"ok": True}
# execute 仍然收到完整 kwargs(内部参数透传给工具)
assert tool.calls == [{"count": 5, "_skip_dangerous_check": True}]

View File

@ -0,0 +1,394 @@
"""U4/G1: Verify 失败回灌 ReAct 测试
characterization-first: 先覆盖现有 verify 行为(max_reinjections=0 等价于不回灌),
再测新回灌行为(reinjection on first fail, break on second fail)
R1: verify 首次失败 errors 注入 conversation LLM 自纠正 二次 verify 通过
R2: verify 二次失败 中断返回错误附 verify log
R3: max_reinjections 可配置(默认 1),=0 等价于不回灌;回灌受 max_steps 约束
R13: ServerConfig.verification 配置项
R14: max_reinjections 默认值为 1
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
from agentkit.core.react import ReActEngine
from agentkit.core.verification_loop import VerificationResult
from agentkit.llm.gateway import LLMGateway
from agentkit.llm.protocol import LLMResponse, TokenUsage
# ── Helpers ──────────────────────────────────────────────
def make_mock_gateway(responses: list[LLMResponse]) -> LLMGateway:
"""创建按顺序返回给定响应的 mock LLMGateway。"""
gateway = MagicMock(spec=LLMGateway)
gateway.chat = AsyncMock(side_effect=responses)
gateway.get_provider_name_for_model = MagicMock(return_value=None)
return gateway
def make_response(content: str = "") -> LLMResponse:
return LLMResponse(
content=content,
model="test-model",
usage=TokenUsage(prompt_tokens=10, completion_tokens=20),
tool_calls=[],
)
def make_verify_result(passed: bool, errors: list[str] | None = None) -> VerificationResult:
return VerificationResult(
passed=passed,
attempts=1,
test_output="$ pytest\nFAILED test_x.py" if not passed else "$ pytest\nOK",
errors=errors or ([] if passed else ["test_x.py::test_failed"]),
)
def make_mock_vloop(verify_results: list[VerificationResult]) -> MagicMock:
"""创建一个 mock VerificationLoop,verify() 按顺序返回给定结果。"""
vloop = MagicMock()
vloop.verify = AsyncMock(side_effect=verify_results)
return vloop
# ── Characterization: max_reinjections=0 等价于当前行为 ──────────
class TestVerifyCharacterization:
"""现有 verify 行为(max_reinjections=0):失败仅记录 trajectory,不回灌。"""
async def test_verify_disabled_no_verify_step(self):
"""verification_enabled=False → 不运行 verify,trajectory 无 verification step。"""
gateway = make_mock_gateway([make_response("final answer")])
engine = ReActEngine(llm_gateway=gateway, max_steps=3)
result = await engine.execute(
messages=[{"role": "user", "content": "do something"}],
)
assert result.output == "final answer"
assert all(s.tool_name != "verification" for s in result.trajectory)
async def test_verify_pass_no_extra_step(self):
"""verify 通过 → 不追加 verification step。"""
gateway = make_mock_gateway([make_response("answer")])
engine = ReActEngine(
llm_gateway=gateway,
max_steps=3,
verification_enabled=True,
verification_commands=["echo ok"],
max_reinjections=0,
)
with patch(
"agentkit.core.verification_loop.VerificationLoop",
return_value=make_mock_vloop([make_verify_result(passed=True)]),
):
result = await engine.execute(
messages=[{"role": "user", "content": "do something"}],
)
assert result.output == "answer"
assert all(s.tool_name != "verification" for s in result.trajectory)
async def test_verify_fail_max_zero_no_reinjection(self):
"""max_reinjections=0 + verify 失败 → 仅记录 trajectory,不回灌 LLM。
这是当前行为的 characterization:gateway.chat 只被调用一次
"""
gateway = make_mock_gateway([make_response("bad answer")])
engine = ReActEngine(
llm_gateway=gateway,
max_steps=3,
verification_enabled=True,
verification_commands=["false"],
max_reinjections=0,
)
with patch(
"agentkit.core.verification_loop.VerificationLoop",
return_value=make_mock_vloop([make_verify_result(passed=False)]),
):
result = await engine.execute(
messages=[{"role": "user", "content": "do something"}],
)
# LLM 只被调用一次(无回灌)
assert gateway.chat.await_count == 1
# 输出仍保留
assert result.output == "bad answer"
# trajectory 包含 verification step
verify_steps = [s for s in result.trajectory if s.tool_name == "verification"]
assert len(verify_steps) == 1
assert verify_steps[0].result["passed"] is False
# ── R1: 回灌后 LLM 自纠正 → 二次 verify 通过 ──────────
class TestVerifyReinjection:
"""verify 失败回灌 conversation,LLM 自纠正后二次 verify 通过。"""
async def test_first_fail_reinject_second_pass(self):
"""R1: verify 首次失败 → errors 注入 conversation → LLM 修正 → 二次 verify 通过。"""
gateway = make_mock_gateway(
[
make_response("bad code"), # 第一次:错误答案
make_response("fixed code"), # 第二次:修正后答案
]
)
engine = ReActEngine(
llm_gateway=gateway,
max_steps=5,
verification_enabled=True,
verification_commands=["pytest"],
max_reinjections=1, # 默认值,允许 1 次回灌
)
with patch(
"agentkit.core.verification_loop.VerificationLoop",
return_value=make_mock_vloop(
[
make_verify_result(passed=False), # 第一次 verify 失败
make_verify_result(passed=True), # 第二次 verify 通过
]
),
):
result = await engine.execute(
messages=[{"role": "user", "content": "write code"}],
)
# LLM 被调用两次
assert gateway.chat.await_count == 2
# 最终输出是修正后的答案
assert result.output == "fixed code"
# 二次 verify 通过,不追加 verification step
verify_steps = [s for s in result.trajectory if s.tool_name == "verification"]
assert len(verify_steps) == 0
async def test_reinjected_user_message_appears_in_conversation(self):
"""R1 集成:回灌的 user 消息出现在 conversation,含 errors 文本。"""
gateway = make_mock_gateway(
[
make_response("bad"),
make_response("good"),
]
)
engine = ReActEngine(
llm_gateway=gateway,
max_steps=5,
verification_enabled=True,
verification_commands=["pytest"],
max_reinjections=1,
)
with patch(
"agentkit.core.verification_loop.VerificationLoop",
return_value=make_mock_vloop(
[
make_verify_result(passed=False, errors=["AssertionError: x != y"]),
make_verify_result(passed=True),
]
),
):
await engine.execute(
messages=[{"role": "user", "content": "write code"}],
)
# 第二次 LLM 调用时,conversation 应包含回灌的 user 消息
second_call = gateway.chat.await_args_list[1]
msgs_sent = second_call.kwargs.get("messages") or second_call[1].get("messages")
reinjected = [
m for m in msgs_sent if m.get("role") == "user" and "验证失败" in m.get("content", "")
]
assert len(reinjected) >= 1
assert "AssertionError: x != y" in reinjected[-1]["content"]
# ── R2: 二次 verify 失败 → 中断返回错误 ──────────
class TestVerifyDoubleFailure:
"""verify 二次失败 → 中断,返回错误附 verify log。"""
async def test_second_fail_breaks_with_verify_log(self):
"""R2: 二次 verify 失败 → 中断,trajectory 含 verify log + errors。"""
gateway = make_mock_gateway(
[
make_response("bad v1"),
make_response("bad v2"),
]
)
engine = ReActEngine(
llm_gateway=gateway,
max_steps=5,
verification_enabled=True,
verification_commands=["pytest"],
max_reinjections=1,
)
with patch(
"agentkit.core.verification_loop.VerificationLoop",
return_value=make_mock_vloop(
[
make_verify_result(passed=False, errors=["err1"]),
make_verify_result(passed=False, errors=["err2"]),
]
),
):
result = await engine.execute(
messages=[{"role": "user", "content": "write code"}],
)
# LLM 被调用两次(initial + 1 reinjection)
assert gateway.chat.await_count == 2
# 状态标记 verify 失败
assert result.status == "verify_failed"
# 输出保留(LLM 最后的答案)
assert result.output == "bad v2"
# trajectory 含 verification step with errors
verify_steps = [s for s in result.trajectory if s.tool_name == "verification"]
assert len(verify_steps) == 1
assert verify_steps[0].result["passed"] is False
assert "err2" in verify_steps[0].result["errors"]
# ── R3: 配置 + 边界 ──────────
class TestVerifyReinjectionConfig:
"""max_reinjections 配置测试。"""
def test_default_max_reinjections_is_one(self):
"""R14 self-check: max_reinjections 默认值为 1。"""
gateway = make_mock_gateway([])
engine = ReActEngine(llm_gateway=gateway)
assert engine._max_reinjections == 1
async def test_max_reinjections_zero_skips_reinjection(self):
"""R3: max_reinjections=0 → 等价于不回灌(当前行为)。"""
gateway = make_mock_gateway([make_response("only answer")])
engine = ReActEngine(
llm_gateway=gateway,
max_steps=5,
verification_enabled=True,
verification_commands=["false"],
max_reinjections=0,
)
with patch(
"agentkit.core.verification_loop.VerificationLoop",
return_value=make_mock_vloop([make_verify_result(passed=False)]),
):
result = await engine.execute(
messages=[{"role": "user", "content": "do something"}],
)
assert gateway.chat.await_count == 1 # 无回灌
assert result.output == "only answer"
async def test_reinjection_hits_max_steps_interrupts(self):
"""R3 edge: 回灌期间达到 max_steps → 中断(不无限循环)。"""
# max_steps=2, max_reinjections=5(max_steps 先到)
# LLM 调用 1:final answer → verify 失败 → reinject
# LLM 调用 2:final answer → verify 失败 → step=2=max_steps → 中断
gateway = make_mock_gateway(
[
make_response("ans1"),
make_response("ans2"),
]
)
engine = ReActEngine(
llm_gateway=gateway,
max_steps=2,
verification_enabled=True,
verification_commands=["false"],
max_reinjections=5, # 远大于 max_steps,验证 max_steps 优先
)
with patch(
"agentkit.core.verification_loop.VerificationLoop",
return_value=make_mock_vloop(
[
make_verify_result(passed=False),
make_verify_result(passed=False),
]
),
):
result = await engine.execute(
messages=[{"role": "user", "content": "do something"}],
)
# LLM 被调用 2 次(受 max_steps=2 限制)
assert gateway.chat.await_count == 2
# 第二次 verify 失败,因 max_reinjections=5 未到但 max_steps 到了
# → 应中断(verify_failed 或 partial)
assert result.status in ("verify_failed", "partial")
# ── execute_stream 回灌 ──────────
class TestVerifyReinjectionStream:
"""execute_stream 模式下的回灌行为。"""
async def test_stream_reinjection_first_fail_second_pass(self):
"""R1 stream: verify 首次失败 → 回灌 → 二次通过。"""
from agentkit.llm.protocol import StreamChunk
def make_stream_chunks(content: str):
"""返回一个返回 chunk 列表的 async generator factory。"""
async def _stream(**kwargs):
# Simulate streaming: yield content in 2 chunks
mid = len(content) // 2
yield StreamChunk(content=content[:mid], model="test-model")
yield StreamChunk(content=content[mid:], model="test-model")
return _stream
gateway = MagicMock(spec=LLMGateway)
gateway.chat_stream = MagicMock(
side_effect=[
make_stream_chunks("bad code")(),
make_stream_chunks("fixed code")(),
]
)
gateway.get_provider_name_for_model = MagicMock(return_value=None)
engine = ReActEngine(
llm_gateway=gateway,
max_steps=5,
verification_enabled=True,
verification_commands=["pytest"],
max_reinjections=1,
)
with patch(
"agentkit.core.verification_loop.VerificationLoop",
return_value=make_mock_vloop(
[
make_verify_result(passed=False),
make_verify_result(passed=True),
]
),
):
events = []
async for event in engine.execute_stream(
messages=[{"role": "user", "content": "write code"}],
):
events.append(event)
# chat_stream 被调用两次
assert gateway.chat_stream.call_count == 2
# 有 final_answer 事件
final_events = [e for e in events if e.event_type == "final_answer"]
assert len(final_events) >= 1
# 最终输出是修正后的答案
assert "fixed code" in final_events[-1].data.get("output", "")