From 5b2377469aeea563e6c2553fba133d0cae45b750 Mon Sep 17 00:00:00 2001 From: chiguyong Date: Mon, 29 Jun 2026 22:43:22 +0800 Subject: [PATCH] =?UTF-8?q?feat(U2):=20G7=20Emergency=20=E8=A7=84=E5=88=99?= =?UTF-8?q?=E6=A8=A1=E6=9D=BF=20+=20TaskResult.error=5Fstruct?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 新增 EmergencyError 数据类(stable error_code + 中文 message + suggestions + retryable + original_error)和 EmergencyRules 规则分类器(纯规则,无 LLM): - TaskTimeoutError → timeout (retryable) - LoopDetectedError → loop_detected (retryable) - LLMProviderError → llm_failure (retryable) - Exception → internal_error (not retryable) - TaskCancelledError 不分类(调用方须先检查,否则 ValueError) TaskResult 新增并行 error_struct 字段(默认 None 保持既有契约)。 to_dict 当 error_struct=None 时不输出该键(byte-for-byte 兼容)。 - fallback.py 既有 3 常量不变(EMPTY_LLM_RESPONSE/MAX_STEPS_REACHED/SHELL_NO_OUTPUT) - 支持 config 覆盖(suggestions/retryable/message 按 error_code 分组) - 27 测试覆盖分类/序列化/配置覆盖/契约保持 --- src/agentkit/core/fallback.py | 146 +++++++++++++ src/agentkit/core/protocol.py | 7 + tests/unit/test_emergency_rules.py | 318 +++++++++++++++++++++++++++++ 3 files changed, 471 insertions(+) create mode 100644 tests/unit/test_emergency_rules.py diff --git a/src/agentkit/core/fallback.py b/src/agentkit/core/fallback.py index c3f7455..1c28549 100644 --- a/src/agentkit/core/fallback.py +++ b/src/agentkit/core/fallback.py @@ -2,8 +2,22 @@ All layers (ReActEngine, Portal, Chat) should use these constants to ensure consistent user-facing messages. + +G7/U2: Also hosts ``EmergencyError`` and ``EmergencyRules`` for the +three-tier fallback chain's Emergency layer (rule-based classifier, +no LLM). See ``docs/plans/2026-06-29-003-feat-agent-wave2-medium-coupling-plan.md``. 
"""
 
+from dataclasses import dataclass
+from typing import Any
+
+from agentkit.core.exceptions import (
+    LLMProviderError,
+    LoopDetectedError,
+    TaskCancelledError,
+    TaskTimeoutError,
+)
+
 # When LLM returns empty content after all fallback models exhausted
 EMPTY_LLM_RESPONSE = (
     "模型未返回有效内容,已尝试备用模型仍未成功。"
@@ -16,3 +30,135 @@ MAX_STEPS_REACHED = "已达到最大推理步数,但仍未得到完整结论
 
 # When a shell command succeeds but produces no output
 SHELL_NO_OUTPUT = "[命令执行成功,无输出内容]"
+
+
+# ── G7/U2: Emergency layer ──────────────────────────────────────
+
+
+@dataclass
+class EmergencyError:
+    """Structured error produced by the Emergency layer (rule-classified).
+
+    Carries a stable ``error_code`` for programmatic dispatch (frontend
+    retry UI, telemetry), a human-readable ``message`` mirroring
+    ``EMPTY_LLM_RESPONSE`` style, actionable ``suggestions``, and the
+    original exception string for traceability.
+
+    The ``retryable`` flag distinguishes recoverable user errors
+    (timeout, loop, LLM hiccup) from internal bugs (retryable=False).
+    """
+
+    error_code: str  # "timeout" | "loop_detected" | "llm_failure" | "internal_error"
+    message: str  # human-readable Chinese message
+    suggestions: list[str]  # actionable user-facing suggestions
+    retryable: bool  # whether a user retry might succeed
+    original_error: str  # str(exc) for traceability
+
+    def to_dict(self) -> dict[str, Any]:
+        return {
+            "error_code": self.error_code,
+            "message": self.message,
+            "suggestions": list(self.suggestions),
+            "retryable": self.retryable,
+            "original_error": self.original_error,
+        }
+
+    def to_error_message(self) -> str:
+        """Format as a single human-readable string with suggestions.
+
+        Mirrors ``EMPTY_LLM_RESPONSE`` style: ``建议:1) ... 2) ...``
+        """
+        if not self.suggestions:
+            return self.message
+        # Join with ";" as a separator instead of appending it and stripping
+        # afterwards, so a suggestion that itself ends in ";" stays intact.
+        suggestion_str = ";".join(f"{i}) {s}" for i, s in enumerate(self.suggestions, 1))
+        return f"{self.message}建议:{suggestion_str}。"
+
+
+# Default rule set: (exception_type, error_code, message, suggestions, retryable)
+# ponytail: ceiling — rule-based, no LLM. Adding a new rule = append a tuple.
+# Upgrade path: LLM-driven suggestions would require a separate classifier
+# class (out of scope per brainstorm KTD4).
+_DEFAULT_RULES: list[tuple[type[Exception], str, str, list[str], bool]] = [
+    (
+        TaskTimeoutError,
+        "timeout",
+        "任务执行超时。",
+        ["稍后重试", "简化任务范围"],
+        True,
+    ),
+    (
+        LoopDetectedError,
+        "loop_detected",
+        "检测到推理循环。",
+        ["拆分任务", "检查工具参数"],
+        True,
+    ),
+    (
+        LLMProviderError,
+        "llm_failure",
+        "LLM 服务调用失败。",
+        ["稍后重试", "切换模型"],
+        True,
+    ),
+]
+
+_DEFAULT_ERROR_CODE = "internal_error"
+_DEFAULT_MESSAGE = "Agent 执行内部错误。"
+_DEFAULT_SUGGESTIONS: list[str] = ["联系管理员"]
+_DEFAULT_RETRYABLE = False
+
+
+class EmergencyRules:
+    """Rule-based classifier for the Emergency layer.
+
+    Maps exception types to ``EmergencyError`` instances. No LLM, no I/O —
+    pure function of ``(exception, config)``.
+
+    Caller responsibility: ``TaskCancelledError`` MUST propagate as-is
+    (per KTD3); the caller checks ``isinstance(exc, TaskCancelledError)``
+    before invoking :meth:`classify`. Calling :meth:`classify` with a
+    ``TaskCancelledError`` raises ``ValueError`` to surface the bug.
+
+    Config override shape (optional ``config`` arg):
+
+    ```python
+    {
+        "timeout": {"suggestions": ["自定义建议"], "retryable": False},
+        "llm_failure": {"message": "自定义消息"},
+    }
+    ```
+    """
+
+    @staticmethod
+    def classify(exc: Exception, config: dict | None = None) -> EmergencyError:
+        if isinstance(exc, TaskCancelledError):
+            # Contract: caller must check before invoking classify.
+            raise ValueError(
+                "TaskCancelledError must propagate as-is; caller must check "
+                "before invoking EmergencyRules.classify"
+            )
+
+        # Non-dict config or null entry => no override; suggestions are copied, never aliased.
+        config = config if isinstance(config, dict) else {}
+        for exc_type, code, message, suggestions, retryable in _DEFAULT_RULES:
+            if isinstance(exc, exc_type):
+                override = config.get(code) or {}
+                return EmergencyError(
+                    error_code=code,
+                    message=override.get("message", message),
+                    suggestions=list(override.get("suggestions", suggestions)),
+                    retryable=override.get("retryable", retryable),
+                    original_error=str(exc),
+                )
+
+        # Generic fallback
+        override = config.get(_DEFAULT_ERROR_CODE) or {}
+        return EmergencyError(
+            error_code=_DEFAULT_ERROR_CODE,
+            message=override.get("message", _DEFAULT_MESSAGE),
+            suggestions=list(override.get("suggestions", _DEFAULT_SUGGESTIONS)),
+            retryable=override.get("retryable", _DEFAULT_RETRYABLE),
+            original_error=str(exc),
+        )
diff --git a/src/agentkit/core/protocol.py b/src/agentkit/core/protocol.py
index 99ac0aa..60ed289 100644
--- a/src/agentkit/core/protocol.py
+++ b/src/agentkit/core/protocol.py
@@ -128,6 +128,10 @@ class TaskResult:
     completed_at: datetime
     metrics: dict | None = None
     trace: Any | None = None
+    # G7/U2: Emergency layer structured error. None preserves existing contract
+    # (error_message alone carries the human-readable string). When set,
+    # carries serialized EmergencyError.to_dict() for programmatic dispatch.
+    error_struct: dict | None = None
 
     def to_dict(self) -> dict:
         d = {
@@ -142,6 +146,8 @@ class TaskResult:
         }
         if self.trace is not None:
             d["trace"] = self.trace.to_dict() if hasattr(self.trace, "to_dict") else self.trace
+        if self.error_struct is not None:
+            d["error_struct"] = self.error_struct
         return d
 
     @classmethod
@@ -162,6 +168,7 @@ class TaskResult:
             completed_at=completed_at or datetime.now(timezone.utc),
             metrics=data.get("metrics"),
             trace=data.get("trace"),
+            error_struct=data.get("error_struct"),
         )
 
 
diff --git a/tests/unit/test_emergency_rules.py b/tests/unit/test_emergency_rules.py
new file mode 100644
index 0000000..63a0d76
--- /dev/null
+++ b/tests/unit/test_emergency_rules.py
@@ -0,0 +1,318 @@
+"""G7/U2 — Emergency layer rule template + TaskResult extension.
+
+Verifies:
+- EmergencyRules.classify maps each exception type to correct error_code
+- TaskCancelledError raises ValueError (caller must propagate as-is)
+- EmergencyError.to_dict produces all 5 fields
+- EmergencyError.to_error_message formats suggestions as "建议:1) ... 2) ..."
+- Config overrides apply (suggestions, retryable, message)
+- TaskResult.error_struct field: default None preserves byte-for-byte
+  to_dict() output (backward compat)
+- TaskResult round-trip serialization includes error_struct when set
+"""
+
+from datetime import datetime, timezone
+
+import pytest
+
+from agentkit.core.exceptions import (
+    LLMProviderError,
+    LoopDetectedError,
+    TaskCancelledError,
+    TaskTimeoutError,
+)
+from agentkit.core.fallback import (
+    EMPTY_LLM_RESPONSE,
+    MAX_STEPS_REACHED,
+    SHELL_NO_OUTPUT,
+    EmergencyError,
+    EmergencyRules,
+)
+from agentkit.core.protocol import TaskResult
+
+
+# ── Constants unchanged (contract preservation) ──────
+
+
+class TestExistingConstantsUnchanged:
+    """Pre-existing constants still carry their expected text (2 substring checks, 1 exact)."""
+
+    def test_empty_llm_response_unchanged(self):
+        assert "模型未返回有效内容" in EMPTY_LLM_RESPONSE
+        assert "建议" in EMPTY_LLM_RESPONSE
+
+    def test_max_steps_reached_unchanged(self):
+        assert "已达到最大推理步数" in MAX_STEPS_REACHED
+
+    def test_shell_no_output_unchanged(self):
+        assert SHELL_NO_OUTPUT == "[命令执行成功,无输出内容]"
+
+
+# ── EmergencyRules.classify ──────────────────────────
+
+
+class TestEmergencyRulesClassify:
+    """classify() maps exception types to EmergencyError."""
+
+    def test_timeout(self):
+        exc = TaskTimeoutError(task_id="t1", timeout_seconds=30)
+        err = EmergencyRules.classify(exc)
+        assert err.error_code == "timeout"
+        assert err.retryable is True
+        assert "稍后重试" in err.suggestions
+        assert "简化任务范围" in err.suggestions
+        assert err.original_error == str(exc)
+
+    def test_loop_detected(self):
+        exc = LoopDetectedError(tool_name="shell", repetitions=3)
+        err = EmergencyRules.classify(exc)
+        assert err.error_code == "loop_detected"
+        assert err.retryable is True
+        assert "拆分任务" in err.suggestions
+        assert "检查工具参数" in err.suggestions
+
+    def test_llm_provider_error(self):
+        exc = LLMProviderError("openai", "rate limited")
+        err = EmergencyRules.classify(exc)
+        assert err.error_code == "llm_failure"
+        assert err.retryable is True
+        assert "稍后重试" in err.suggestions
+        assert "切换模型" in err.suggestions
+
+    def test_llm_error_subclass_also_classified(self):
+        """An LLMError subclass that is not an LLMProviderError gets no llm_failure rule."""
+        from agentkit.core.exceptions import LLMError
+
+        class CustomLLMError(LLMError):
+            pass
+
+        err = EmergencyRules.classify(CustomLLMError("custom"))
+        # CustomLLMError is NOT a LLMProviderError, falls through to generic
+        assert err.error_code == "internal_error"
+
+    def test_generic_exception_internal_error(self):
+        err = EmergencyRules.classify(Exception("unknown boom"))
+        assert err.error_code == "internal_error"
+        assert err.retryable is False
+        assert "联系管理员" in err.suggestions
+        assert err.original_error == "unknown boom"
+
+    def test_task_cancelled_raises(self):
+        """TaskCancelledError must propagate; classify() raises ValueError."""
+        exc = TaskCancelledError(task_id="t1")
+        with pytest.raises(ValueError, match="TaskCancelledError"):
+            EmergencyRules.classify(exc)
+
+    def test_subclass_of_timeout_classified(self):
+        """Subclasses of TaskTimeoutError are classified as timeout."""
+
+        class CustomTimeout(TaskTimeoutError):
+            def __init__(self):
+                super().__init__(task_id="custom", timeout_seconds=10)
+
+        err = EmergencyRules.classify(CustomTimeout())
+        assert err.error_code == "timeout"
+
+
+# ── EmergencyError serialization ─────────────────────
+
+
+class TestEmergencyErrorSerialization:
+    """to_dict / to_error_message on EmergencyError."""
+
+    def test_to_dict_produces_all_five_fields(self):
+        err = EmergencyError(
+            error_code="timeout",
+            message="任务执行超时。",
+            suggestions=["稍后重试", "简化任务范围"],
+            retryable=True,
+            original_error="Task t1 timed out after 30s",
+        )
+        d = err.to_dict()
+        assert set(d.keys()) == {
+            "error_code",
+            "message",
+            "suggestions",
+            "retryable",
+            "original_error",
+        }
+        assert d["error_code"] == "timeout"
+        assert d["message"] == "任务执行超时。"
+        assert d["suggestions"] == ["稍后重试", "简化任务范围"]
+        assert d["retryable"] is True
+        assert d["original_error"] == "Task t1 timed out after 30s"
+
+    def test_to_dict_suggestions_list_is_copy(self):
+        """to_dict returns a fresh list, not the internal reference."""
+        suggestions = ["a", "b"]
+        err = EmergencyError(
+            error_code="x",
+            message="m",
+            suggestions=suggestions,
+            retryable=False,
+            original_error="e",
+        )
+        d = err.to_dict()
+        assert d["suggestions"] is not suggestions
+        d["suggestions"].append("c")
+        assert err.suggestions == ["a", "b"]
+
+    def test_to_error_message_with_suggestions(self):
+        err = EmergencyError(
+            error_code="timeout",
+            message="任务执行超时。",
+            suggestions=["稍后重试", "简化任务范围"],
+            retryable=True,
+            original_error="err",
+        )
+        msg = err.to_error_message()
+        assert msg.startswith("任务执行超时。建议:")
+        assert "1) 稍后重试" in msg
+        assert "2) 简化任务范围" in msg
+        # Format mirrors EMPTY_LLM_RESPONSE style
+        assert msg.endswith("。")
+
+    def test_to_error_message_no_suggestions(self):
+        err = EmergencyError(
+            error_code="x",
+            message="just a message",
+            suggestions=[],
+            retryable=False,
+            original_error="e",
+        )
+        assert err.to_error_message() == "just a message"
+
+    def test_to_error_message_single_suggestion(self):
+        err = EmergencyError(
+            error_code="x",
+            message="msg",
+            suggestions=["only one"],
+            retryable=False,
+            original_error="e",
+        )
+        msg = err.to_error_message()
+        assert msg == "msg建议:1) only one。"
+
+
+# ── Config override ──────────────────────────────────
+
+
+class TestConfigOverride:
+    """classify() applies per-rule config overrides."""
+
+    def test_override_suggestions(self):
+        exc = TaskTimeoutError(task_id="t", timeout_seconds=1)
+        cfg = {"timeout": {"suggestions": ["自定义建议 A", "自定义建议 B"]}}
+        err = EmergencyRules.classify(exc, config=cfg)
+        assert err.suggestions == ["自定义建议 A", "自定义建议 B"]
+        assert err.error_code == "timeout"
+
+    def test_override_retryable(self):
+        exc = LLMProviderError("openai", "boom")
+        cfg = {"llm_failure": {"retryable": False}}
+        err = EmergencyRules.classify(exc, config=cfg)
+        assert err.retryable is False
+
+    def test_override_message(self):
+        exc = LoopDetectedError(tool_name="x", repetitions=2)
+        cfg = {"loop_detected": {"message": "循环啦!"}}
+        err = EmergencyRules.classify(exc, config=cfg)
+        assert err.message == "循环啦!"
+
+    def test_override_internal_error_rule(self):
+        cfg = {"internal_error": {"suggestions": ["联系客服"]}}
+        err = EmergencyRules.classify(Exception("boom"), config=cfg)
+        assert err.error_code == "internal_error"
+        assert err.suggestions == ["联系客服"]
+
+    def test_config_none_uses_defaults(self):
+        err = EmergencyRules.classify(TaskTimeoutError(task_id="t", timeout_seconds=1))
+        assert err.error_code == "timeout"
+        assert err.retryable is True
+
+    def test_config_empty_dict_uses_defaults(self):
+        err = EmergencyRules.classify(
+            TaskTimeoutError(task_id="t", timeout_seconds=1), config={}
+        )
+        assert err.error_code == "timeout"
+        assert err.retryable is True
+
+
+# ── TaskResult.error_struct extension ────────────────
+
+
+def _make_task_result(
+    error_struct: dict | None = None, error_message: str | None = None
+) -> TaskResult:
+    now = datetime.now(timezone.utc)
+    return TaskResult(
+        task_id="t1",
+        agent_name="a1",
+        status="completed",
+        output_data={"k": "v"},
+        error_message=error_message,
+        started_at=now,
+        completed_at=now,
+        metrics={"m": 1},
+        error_struct=error_struct,
+    )
+
+
+class TestTaskResultErrorStruct:
+    """TaskResult.error_struct field — backward-compatible extension."""
+
+    def test_default_error_struct_is_none(self):
+        tr = _make_task_result()
+        assert tr.error_struct is None
+
+    def test_to_dict_without_error_struct_preserves_existing_shape(self):
+        """error_struct=None → to_dict() output has NO error_struct key (byte-for-byte)."""
+        tr = _make_task_result()
+        d = tr.to_dict()
+        assert "error_struct" not in d
+        # Existing keys unchanged
+        assert set(d.keys()) == {
+            "task_id",
+            "agent_name",
+            "status",
+            "output_data",
+            "error_message",
+            "started_at",
+            "completed_at",
+            "metrics",
+        }
+
+    def test_to_dict_with_error_struct_includes_key(self):
+        struct = {
+            "error_code": "timeout",
+            "message": "超时",
+            "suggestions": ["重试"],
+            "retryable": True,
+            "original_error": "boom",
+        }
+        tr = _make_task_result(error_struct=struct, error_message="超时建议:1) 重试。")
+        d = tr.to_dict()
+        assert d["error_struct"] == struct
+        assert d["error_message"] == "超时建议:1) 重试。"
+
+    def test_from_dict_round_trip_with_error_struct(self):
+        struct = {"error_code": "loop_detected", "message": "m", "suggestions": [], "retryable": True, "original_error": "e"}
+        tr = _make_task_result(error_struct=struct)
+        d = tr.to_dict()
+        restored = TaskResult.from_dict(d)
+        assert restored.error_struct == struct
+
+    def test_from_dict_without_error_struct_defaults_none(self):
+        tr = _make_task_result()
+        d = tr.to_dict()
+        # to_dict() omits error_struct when it is None, so d mimics legacy data without the key
+        restored = TaskResult.from_dict(d)
+        assert restored.error_struct is None
+
+    def test_error_message_and_error_struct_coexist(self):
+        """Both fields can be set simultaneously (parallel contract per KTD2)."""
+        struct = {"error_code": "timeout", "message": "超时", "suggestions": ["重试"], "retryable": True, "original_error": "err"}
+        tr = _make_task_result(error_struct=struct, error_message="超时建议:1) 重试。")
+        d = tr.to_dict()
+        assert d["error_message"] == "超时建议:1) 重试。"
+        assert d["error_struct"] == struct