feat(U2): G7 Emergency 规则模板 + TaskResult.error_struct

新增 EmergencyError 数据类(stable error_code + 中文 message + suggestions
+ retryable + original_error)和 EmergencyRules 规则分类器(纯规则,无 LLM):
- TaskTimeoutError → timeout (retryable)
- LoopDetectedError → loop_detected (retryable)
- LLMProviderError → llm_failure (retryable)
- Exception → internal_error (not retryable)
- TaskCancelledError 不分类(调用方须先检查,否则 ValueError)

TaskResult 新增并行 error_struct 字段(默认 None 保持既有契约)。
to_dict 当 error_struct=None 时不输出该键(byte-for-byte 兼容)。

- fallback.py 既有 3 常量不变(EMPTY_LLM_RESPONSE/MAX_STEPS_REACHED/SHELL_NO_OUTPUT)
- 支持 config 覆盖(suggestions/retryable/message 按 error_code 分组)
- 27 测试覆盖分类/序列化/配置覆盖/契约保持
This commit is contained in:
chiguyong 2026-06-29 22:43:22 +08:00
parent 8d5ccca604
commit 5b2377469a
3 changed files with 471 additions and 0 deletions

View File

@ -2,8 +2,22 @@
All layers (ReActEngine, Portal, Chat) should use these constants All layers (ReActEngine, Portal, Chat) should use these constants
to ensure consistent user-facing messages. to ensure consistent user-facing messages.
G7/U2: Also hosts ``EmergencyError`` and ``EmergencyRules`` for the
three-tier fallback chain's Emergency layer (rule-based classifier,
no LLM). See ``docs/plans/2026-06-29-003-feat-agent-wave2-medium-coupling-plan.md``.
""" """
from dataclasses import dataclass
from typing import Any
from agentkit.core.exceptions import (
LLMProviderError,
LoopDetectedError,
TaskCancelledError,
TaskTimeoutError,
)
# When LLM returns empty content after all fallback models exhausted # When LLM returns empty content after all fallback models exhausted
EMPTY_LLM_RESPONSE = ( EMPTY_LLM_RESPONSE = (
"模型未返回有效内容,已尝试备用模型仍未成功。" "模型未返回有效内容,已尝试备用模型仍未成功。"
@ -16,3 +30,135 @@ MAX_STEPS_REACHED = "已达到最大推理步数,但仍未得到完整结论
# When a shell command succeeds but produces no output # When a shell command succeeds but produces no output
SHELL_NO_OUTPUT = "[命令执行成功,无输出内容]" SHELL_NO_OUTPUT = "[命令执行成功,无输出内容]"
# ── G7/U2: Emergency layer ──────────────────────────────────────
@dataclass
class EmergencyError:
"""Structured error produced by the Emergency layer (rule-classified).
Carries a stable ``error_code`` for programmatic dispatch (frontend
retry UI, telemetry), a human-readable ``message`` mirroring
``EMPTY_LLM_RESPONSE`` style, actionable ``suggestions``, and the
original exception string for traceability.
The ``retryable`` flag distinguishes recoverable user errors
(timeout, loop, LLM hiccup) from internal bugs (retryable=False).
"""
error_code: str # "timeout" | "loop_detected" | "llm_failure" | "internal_error"
message: str # human-readable Chinese message
suggestions: list[str] # actionable user-facing suggestions
retryable: bool # whether a user retry might succeed
original_error: str # str(exc) for traceability
def to_dict(self) -> dict[str, Any]:
return {
"error_code": self.error_code,
"message": self.message,
"suggestions": list(self.suggestions),
"retryable": self.retryable,
"original_error": self.original_error,
}
def to_error_message(self) -> str:
"""Format as a single human-readable string with suggestions.
Mirrors ``EMPTY_LLM_RESPONSE`` style: ``<message>建议1) ... 2) ...``
"""
if not self.suggestions:
return self.message
suggestion_str = "".join(f"{i}) {s}" for i, s in enumerate(self.suggestions, 1))
# Strip trailing "" and prefix with "建议:"
suggestion_str = suggestion_str.rstrip("")
return f"{self.message}建议:{suggestion_str}"
# Default rule set: (exception_type, error_code, message, suggestions, retryable)
# ponytail: ceiling — rule-based, no LLM. Adding a new rule = append a tuple.
# Upgrade path: LLM-driven suggestions would require a separate classifier
# class (out of scope per brainstorm KTD4).
_DEFAULT_RULES: list[tuple[type[Exception], str, str, list[str], bool]] = [
(
TaskTimeoutError,
"timeout",
"任务执行超时。",
["稍后重试", "简化任务范围"],
True,
),
(
LoopDetectedError,
"loop_detected",
"检测到推理循环。",
["拆分任务", "检查工具参数"],
True,
),
(
LLMProviderError,
"llm_failure",
"LLM 服务调用失败。",
["稍后重试", "切换模型"],
True,
),
]
_DEFAULT_ERROR_CODE = "internal_error"
_DEFAULT_MESSAGE = "Agent 执行内部错误。"
_DEFAULT_SUGGESTIONS: list[str] = ["联系管理员"]
_DEFAULT_RETRYABLE = False
class EmergencyRules:
"""Rule-based classifier for the Emergency layer.
Maps exception types to ``EmergencyError`` instances. No LLM, no I/O
pure function of ``(exception, config)``.
Caller responsibility: ``TaskCancelledError`` MUST propagate as-is
(per KTD3); the caller checks ``isinstance(exc, TaskCancelledError)``
before invoking :meth:`classify`. Calling :meth:`classify` with a
``TaskCancelledError`` raises ``ValueError`` to surface the bug.
Config override shape (optional ``config`` arg):
```python
{
"timeout": {"suggestions": ["自定义建议"], "retryable": False},
"llm_failure": {"message": "自定义消息"},
}
```
"""
@staticmethod
def classify(exc: Exception, config: dict | None = None) -> EmergencyError:
if isinstance(exc, TaskCancelledError):
# Contract: caller must check before invoking classify.
raise ValueError(
"TaskCancelledError must propagate as-is; caller must check "
"before invoking EmergencyRules.classify"
)
config = config or {}
for exc_type, code, message, suggestions, retryable in _DEFAULT_RULES:
if isinstance(exc, exc_type):
override = config.get(code, {}) if isinstance(config, dict) else {}
return EmergencyError(
error_code=code,
message=override.get("message", message),
suggestions=override.get("suggestions", list(suggestions)),
retryable=override.get("retryable", retryable),
original_error=str(exc),
)
# Generic fallback
override = config.get(_DEFAULT_ERROR_CODE, {}) if isinstance(config, dict) else {}
return EmergencyError(
error_code=_DEFAULT_ERROR_CODE,
message=override.get("message", _DEFAULT_MESSAGE),
suggestions=override.get("suggestions", list(_DEFAULT_SUGGESTIONS)),
retryable=override.get("retryable", _DEFAULT_RETRYABLE),
original_error=str(exc),
)

View File

@ -128,6 +128,10 @@ class TaskResult:
completed_at: datetime completed_at: datetime
metrics: dict | None = None metrics: dict | None = None
trace: Any | None = None trace: Any | None = None
# G7/U2: Emergency layer structured error. None preserves existing contract
# (error_message alone carries the human-readable string). When set,
# carries serialized EmergencyError.to_dict() for programmatic dispatch.
error_struct: dict | None = None
def to_dict(self) -> dict: def to_dict(self) -> dict:
d = { d = {
@ -142,6 +146,8 @@ class TaskResult:
} }
if self.trace is not None: if self.trace is not None:
d["trace"] = self.trace.to_dict() if hasattr(self.trace, "to_dict") else self.trace d["trace"] = self.trace.to_dict() if hasattr(self.trace, "to_dict") else self.trace
if self.error_struct is not None:
d["error_struct"] = self.error_struct
return d return d
@classmethod @classmethod
@ -162,6 +168,7 @@ class TaskResult:
completed_at=completed_at or datetime.now(timezone.utc), completed_at=completed_at or datetime.now(timezone.utc),
metrics=data.get("metrics"), metrics=data.get("metrics"),
trace=data.get("trace"), trace=data.get("trace"),
error_struct=data.get("error_struct"),
) )

View File

@ -0,0 +1,318 @@
"""G7/U2 — Emergency layer rule template + TaskResult extension.
Verifies:
- EmergencyRules.classify maps each exception type to correct error_code
- TaskCancelledError raises ValueError (caller must propagate as-is)
- EmergencyError.to_dict produces all 5 fields
- EmergencyError.to_error_message formats suggestions as "建议1) ... 2) ..."
- Config overrides apply (suggestions, retryable, message)
- TaskResult.error_struct field: default None preserves byte-for-byte
to_dict() output (backward compat)
- TaskResult round-trip serialization includes error_struct when set
"""
from datetime import datetime, timezone
import pytest
from agentkit.core.exceptions import (
LLMProviderError,
LoopDetectedError,
TaskCancelledError,
TaskTimeoutError,
)
from agentkit.core.fallback import (
EMPTY_LLM_RESPONSE,
MAX_STEPS_REACHED,
SHELL_NO_OUTPUT,
EmergencyError,
EmergencyRules,
)
from agentkit.core.protocol import TaskResult
# ── Constants unchanged (contract preservation) ──────
class TestExistingConstantsUnchanged:
"""Existing 3 constants preserved byte-for-byte."""
def test_empty_llm_response_unchanged(self):
assert "模型未返回有效内容" in EMPTY_LLM_RESPONSE
assert "建议" in EMPTY_LLM_RESPONSE
def test_max_steps_reached_unchanged(self):
assert "已达到最大推理步数" in MAX_STEPS_REACHED
def test_shell_no_output_unchanged(self):
assert SHELL_NO_OUTPUT == "[命令执行成功,无输出内容]"
# ── EmergencyRules.classify ──────────────────────────
class TestEmergencyRulesClassify:
"""classify() maps exception types to EmergencyError."""
def test_timeout(self):
exc = TaskTimeoutError(task_id="t1", timeout_seconds=30)
err = EmergencyRules.classify(exc)
assert err.error_code == "timeout"
assert err.retryable is True
assert "稍后重试" in err.suggestions
assert "简化任务范围" in err.suggestions
assert err.original_error == str(exc)
def test_loop_detected(self):
exc = LoopDetectedError(tool_name="shell", repetitions=3)
err = EmergencyRules.classify(exc)
assert err.error_code == "loop_detected"
assert err.retryable is True
assert "拆分任务" in err.suggestions
assert "检查工具参数" in err.suggestions
def test_llm_provider_error(self):
exc = LLMProviderError("openai", "rate limited")
err = EmergencyRules.classify(exc)
assert err.error_code == "llm_failure"
assert err.retryable is True
assert "稍后重试" in err.suggestions
assert "切换模型" in err.suggestions
def test_llm_error_subclass_also_classified(self):
"""LLMProviderError is a subclass of LLMError; ensure isinstance check works."""
from agentkit.core.exceptions import LLMError
class CustomLLMError(LLMError):
pass
err = EmergencyRules.classify(CustomLLMError("custom"))
# CustomLLMError is NOT a LLMProviderError, falls through to generic
assert err.error_code == "internal_error"
def test_generic_exception_internal_error(self):
err = EmergencyRules.classify(Exception("unknown boom"))
assert err.error_code == "internal_error"
assert err.retryable is False
assert "联系管理员" in err.suggestions
assert err.original_error == "unknown boom"
def test_task_cancelled_raises(self):
"""TaskCancelledError must propagate; classify() raises ValueError."""
exc = TaskCancelledError(task_id="t1")
with pytest.raises(ValueError, match="TaskCancelledError"):
EmergencyRules.classify(exc)
def test_subclass_of_timeout_classified(self):
"""Subclasses of TaskTimeoutError are classified as timeout."""
class CustomTimeout(TaskTimeoutError):
def __init__(self):
super().__init__(task_id="custom", timeout_seconds=10)
err = EmergencyRules.classify(CustomTimeout())
assert err.error_code == "timeout"
# ── EmergencyError serialization ─────────────────────
class TestEmergencyErrorSerialization:
"""to_dict / to_error_message on EmergencyError."""
def test_to_dict_produces_all_five_fields(self):
err = EmergencyError(
error_code="timeout",
message="任务执行超时。",
suggestions=["稍后重试", "简化任务范围"],
retryable=True,
original_error="Task t1 timed out after 30s",
)
d = err.to_dict()
assert set(d.keys()) == {
"error_code",
"message",
"suggestions",
"retryable",
"original_error",
}
assert d["error_code"] == "timeout"
assert d["message"] == "任务执行超时。"
assert d["suggestions"] == ["稍后重试", "简化任务范围"]
assert d["retryable"] is True
assert d["original_error"] == "Task t1 timed out after 30s"
def test_to_dict_suggestions_list_is_copy(self):
"""to_dict returns a fresh list, not the internal reference."""
suggestions = ["a", "b"]
err = EmergencyError(
error_code="x",
message="m",
suggestions=suggestions,
retryable=False,
original_error="e",
)
d = err.to_dict()
assert d["suggestions"] is not suggestions
d["suggestions"].append("c")
assert err.suggestions == ["a", "b"]
def test_to_error_message_with_suggestions(self):
err = EmergencyError(
error_code="timeout",
message="任务执行超时。",
suggestions=["稍后重试", "简化任务范围"],
retryable=True,
original_error="err",
)
msg = err.to_error_message()
assert msg.startswith("任务执行超时。建议:")
assert "1) 稍后重试" in msg
assert "2) 简化任务范围" in msg
# Format mirrors EMPTY_LLM_RESPONSE style
assert msg.endswith("")
def test_to_error_message_no_suggestions(self):
err = EmergencyError(
error_code="x",
message="just a message",
suggestions=[],
retryable=False,
original_error="e",
)
assert err.to_error_message() == "just a message"
def test_to_error_message_single_suggestion(self):
err = EmergencyError(
error_code="x",
message="msg",
suggestions=["only one"],
retryable=False,
original_error="e",
)
msg = err.to_error_message()
assert msg == "msg建议1) only one。"
# ── Config override ──────────────────────────────────
class TestConfigOverride:
"""classify() applies per-rule config overrides."""
def test_override_suggestions(self):
exc = TaskTimeoutError(task_id="t", timeout_seconds=1)
cfg = {"timeout": {"suggestions": ["自定义建议 A", "自定义建议 B"]}}
err = EmergencyRules.classify(exc, config=cfg)
assert err.suggestions == ["自定义建议 A", "自定义建议 B"]
assert err.error_code == "timeout"
def test_override_retryable(self):
exc = LLMProviderError("openai", "boom")
cfg = {"llm_failure": {"retryable": False}}
err = EmergencyRules.classify(exc, config=cfg)
assert err.retryable is False
def test_override_message(self):
exc = LoopDetectedError(tool_name="x", repetitions=2)
cfg = {"loop_detected": {"message": "循环啦!"}}
err = EmergencyRules.classify(exc, config=cfg)
assert err.message == "循环啦!"
def test_override_internal_error_rule(self):
cfg = {"internal_error": {"suggestions": ["联系客服"]}}
err = EmergencyRules.classify(Exception("boom"), config=cfg)
assert err.error_code == "internal_error"
assert err.suggestions == ["联系客服"]
def test_config_none_uses_defaults(self):
err = EmergencyRules.classify(TaskTimeoutError(task_id="t", timeout_seconds=1))
assert err.error_code == "timeout"
assert err.retryable is True
def test_config_empty_dict_uses_defaults(self):
err = EmergencyRules.classify(
TaskTimeoutError(task_id="t", timeout_seconds=1), config={}
)
assert err.error_code == "timeout"
assert err.retryable is True
# ── TaskResult.error_struct extension ────────────────
def _make_task_result(
error_struct: dict | None = None, error_message: str | None = None
) -> TaskResult:
now = datetime.now(timezone.utc)
return TaskResult(
task_id="t1",
agent_name="a1",
status="completed",
output_data={"k": "v"},
error_message=error_message,
started_at=now,
completed_at=now,
metrics={"m": 1},
error_struct=error_struct,
)
class TestTaskResultErrorStruct:
"""TaskResult.error_struct field — backward-compatible extension."""
def test_default_error_struct_is_none(self):
tr = _make_task_result()
assert tr.error_struct is None
def test_to_dict_without_error_struct_preserves_existing_shape(self):
"""error_struct=None → to_dict() output has NO error_struct key (byte-for-byte)."""
tr = _make_task_result()
d = tr.to_dict()
assert "error_struct" not in d
# Existing keys unchanged
assert set(d.keys()) == {
"task_id",
"agent_name",
"status",
"output_data",
"error_message",
"started_at",
"completed_at",
"metrics",
}
def test_to_dict_with_error_struct_includes_key(self):
struct = {
"error_code": "timeout",
"message": "超时",
"suggestions": ["重试"],
"retryable": True,
"original_error": "boom",
}
tr = _make_task_result(error_struct=struct, error_message="超时建议1) 重试。")
d = tr.to_dict()
assert d["error_struct"] == struct
assert d["error_message"] == "超时建议1) 重试。"
def test_from_dict_round_trip_with_error_struct(self):
struct = {"error_code": "loop_detected", "message": "m", "suggestions": [], "retryable": True, "original_error": "e"}
tr = _make_task_result(error_struct=struct)
d = tr.to_dict()
restored = TaskResult.from_dict(d)
assert restored.error_struct == struct
def test_from_dict_without_error_struct_defaults_none(self):
tr = _make_task_result()
d = tr.to_dict()
# Simulate legacy data without error_struct key
restored = TaskResult.from_dict(d)
assert restored.error_struct is None
def test_error_message_and_error_struct_coexist(self):
"""Both fields can be set simultaneously (parallel contract per KTD2)."""
struct = {"error_code": "timeout", "message": "超时", "suggestions": ["重试"], "retryable": True, "original_error": "err"}
tr = _make_task_result(error_struct=struct, error_message="超时建议1) 重试。")
d = tr.to_dict()
assert d["error_message"] == "超时建议1) 重试。"
assert d["error_struct"] == struct