fischer-agentkit/src/agentkit/tools/output_parser.py

295 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""OutputParser - 结构化解析命令输出
将命令行输出解析为结构化格式,包含错误类型识别、退出码含义和可操作建议。
"""
from __future__ import annotations
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
class ErrorType(Enum):
    """Categories of error recognised in command output.

    Every member's value is the lowercase form of its name, which is the
    string emitted when a result is serialised via ``error_type.value``.
    """

    # Default category: used when no error has been detected.
    NONE = "none"

    PERMISSION_DENIED = "permission_denied"
    NOT_FOUND = "not_found"
    TIMEOUT = "timeout"
    SYNTAX_ERROR = "syntax_error"
    CONNECTION_REFUSED = "connection_refused"
    OUT_OF_MEMORY = "out_of_memory"
    DISK_FULL = "disk_full"
    ALREADY_EXISTS = "already_exists"
    INVALID_ARGUMENT = "invalid_argument"
    PROCESS_NOT_FOUND = "process_not_found"
    NETWORK_ERROR = "network_error"

    # Catch-all for errors that fit none of the categories above.
    UNKNOWN = "unknown"
@dataclass
class ParsedOutput:
    """Structured result of parsing a command's output.

    Attributes:
        exit_code: Exit code of the command.
        is_error: Whether the output represents an error.
        error_type: Error category (only meaningful when ``is_error`` is True).
        message: Short summary of the output.
        raw_output: The original, unmodified output text.
        suggestions: Actionable suggestions for the user.
    """

    exit_code: int
    is_error: bool
    error_type: ErrorType = ErrorType.NONE
    message: str = ""
    raw_output: str = ""
    suggestions: list[str] = field(default_factory=list)

    def to_dict(self) -> dict[str, Any]:
        """Return the result as a plain dictionary.

        ``error_type`` is emitted as its string value. ``raw_output`` is not
        part of the returned dictionary.
        """
        return dict(
            exit_code=self.exit_code,
            is_error=self.is_error,
            error_type=self.error_type.value,
            message=self.message,
            suggestions=self.suggestions,
        )
# Error-matching rules: (pattern, error_type, message_template, suggestions).
#
# Order matters: OutputParser._classify_error walks this list top to bottom
# and returns the FIRST rule whose pattern matches, so a more specific rule
# must be listed before any broader rule that would also match its text.
_ERROR_PATTERNS: list[tuple[re.Pattern, ErrorType, str, list[str]]] = [
    (
        re.compile(r"permission denied|access denied|权限不足|拒绝访问", re.IGNORECASE),
        ErrorType.PERMISSION_DENIED,
        "权限不足",
        [
            "尝试使用 sudo 执行该命令",
            "检查文件/目录权限: ls -la <path>",
            "确认当前用户是否有所需权限",
        ],
    ),
    # Must precede NOT_FOUND: "process not found" and "进程不存在" also match
    # the generic "not found" / "不存在" alternatives below and would
    # otherwise never be classified as PROCESS_NOT_FOUND.
    (
        re.compile(
            r"no such process|process not found|进程不存在|进程未找到",
            re.IGNORECASE,
        ),
        ErrorType.PROCESS_NOT_FOUND,
        "进程不存在",
        [
            "确认进程 ID 是否正确",
            "使用 ps aux 查看运行中的进程",
            "进程可能已经结束",
        ],
    ),
    (
        re.compile(
            r"not found|no such file|no such directory|找不到|不存在|无法找到",
            re.IGNORECASE,
        ),
        ErrorType.NOT_FOUND,
        "文件或目录不存在",
        [
            "检查路径拼写是否正确",
            "使用 ls 确认文件/目录是否存在",
            "检查是否在正确的工作目录下",
        ],
    ),
    (
        re.compile(r"timed?\s*out|timeout|超时|时间超限", re.IGNORECASE),
        ErrorType.TIMEOUT,
        "命令执行超时",
        [
            "增加超时时间",
            "检查网络连接是否正常",
            "检查目标服务是否可达",
        ],
    ),
    (
        re.compile(
            r"syntax error|syntaxerror|parse error|语法错误|解析错误",
            re.IGNORECASE,
        ),
        ErrorType.SYNTAX_ERROR,
        "语法错误",
        [
            "检查命令语法是否正确",
            "使用 --help 查看命令用法",
            "检查引号和特殊字符是否正确转义",
        ],
    ),
    (
        re.compile(
            r"connection refused|连接被拒绝|无法连接|ECONNREFUSED",
            re.IGNORECASE,
        ),
        ErrorType.CONNECTION_REFUSED,
        "连接被拒绝",
        [
            "检查目标服务是否已启动",
            "确认端口号是否正确",
            "检查防火墙设置是否阻止了连接",
        ],
    ),
    (
        # "oom" must not be embedded in an ASCII word ("room", "zoom",
        # "bloom"). It may stand alone, sit next to non-ASCII text, or be
        # followed by "kill" ("OOMKilled", "oom_kill", "oom-killer").
        # ASCII-letter lookarounds are used instead of \b because \b treats
        # CJK characters as word characters and would miss "发生oom错误".
        re.compile(
            r"out of memory|(?<![a-z])oom(?:(?![a-z])|kill)|cannot allocate|内存不足|内存溢出",
            re.IGNORECASE,
        ),
        ErrorType.OUT_OF_MEMORY,
        "内存不足",
        [
            "释放不必要的内存占用",
            "增加系统可用内存",
            "检查是否有内存泄漏",
        ],
    ),
    (
        re.compile(
            r"no space left|disk full|磁盘已满|空间不足|ENOSPC",
            re.IGNORECASE,
        ),
        ErrorType.DISK_FULL,
        "磁盘空间不足",
        [
            "清理不必要的文件: du -sh * | sort -rh | head",
            "检查磁盘使用情况: df -h",
            "删除临时文件或日志",
        ],
    ),
    (
        re.compile(
            r"already exists|file exists|已存在|重复|EEXIST",
            re.IGNORECASE,
        ),
        ErrorType.ALREADY_EXISTS,
        "资源已存在",
        [
            "使用 -f 参数强制覆盖(如适用)",
            "先删除已有资源再重新创建",
            "使用不同名称创建",
        ],
    ),
    (
        re.compile(
            r"invalid argument|illegal option|bad option|无效参数|非法选项|invalid option",
            re.IGNORECASE,
        ),
        ErrorType.INVALID_ARGUMENT,
        "无效参数",
        [
            "检查命令参数是否正确",
            "使用 --help 查看支持的参数",
            "确认参数值类型和范围",
        ],
    ),
    (
        re.compile(
            r"network is unreachable|no route to host|name resolution|网络不可达|无法解析|ENETUNREACH",
            re.IGNORECASE,
        ),
        ErrorType.NETWORK_ERROR,
        "网络错误",
        [
            "检查网络连接是否正常",
            "确认 DNS 解析是否正常: nslookup <domain>",
            "检查代理设置",
        ],
    ),
]
class OutputParser:
    """Structured parser for command output.

    Turns a command's combined output text and its exit code into a
    ``ParsedOutput`` that carries the recognised error type, a short message
    summary and actionable suggestions.
    """

    def parse(self, output: str, exit_code: int) -> ParsedOutput:
        """Parse command output into a ``ParsedOutput``.

        Args:
            output: Combined standard-output and standard-error text.
            exit_code: Exit code of the command; any non-zero value is
                treated as an error.

        Returns:
            The structured result. When ``exit_code`` is 0, ``error_type``
            is ``ErrorType.NONE`` and ``suggestions`` is empty.
        """
        is_error = exit_code != 0
        message = self._extract_message(output)
        error_type = ErrorType.NONE
        suggestions: list[str] = []
        if is_error:
            error_type, suggestions = self._classify_error(output, exit_code)
        return ParsedOutput(
            exit_code=exit_code,
            is_error=is_error,
            error_type=error_type,
            message=message,
            raw_output=output,
            suggestions=suggestions,
        )

    def _extract_message(self, output: str) -> str:
        """Return a one-line summary of ``output``.

        The summary is the last non-blank line with surrounding whitespace
        removed. Lines longer than 200 characters are cut to 200 characters
        followed by ``"..."``. Empty or whitespace-only output yields ``""``.
        """
        if not output:
            return ""
        lines = [line.strip() for line in output.strip().splitlines() if line.strip()]
        if not lines:
            return ""
        # Use the last line as the summary; truncate it when too long.
        message = lines[-1]
        if len(message) > 200:
            message = message[:200] + "..."
        return message

    def _classify_error(
        self, output: str, exit_code: int
    ) -> tuple[ErrorType, list[str]]:
        """Classify a failed command by its output text, then its exit code.

        Args:
            output: Command output to search for known error patterns.
            exit_code: Exit code, used only when no pattern matches.

        Returns:
            An ``(error_type, suggestions)`` tuple. ``suggestions`` is always
            a fresh list that the caller may modify freely.
        """
        # Output text takes priority; the first matching rule wins.
        for pattern, error_type, _msg, suggestions in _ERROR_PATTERNS:
            if pattern.search(output):
                # Return a copy: the list in _ERROR_PATTERNS is module-level
                # shared state, and handing it out directly would let a caller
                # that mutates ParsedOutput.suggestions corrupt the rule table
                # for every later parse.
                return error_type, list(suggestions)
        # Fall back to the exit code when the text gives no hint.
        if exit_code == 126:
            return ErrorType.PERMISSION_DENIED, [
                "检查文件是否有执行权限: chmod +x <file>",
                "确认文件格式是否正确(如行尾符)",
            ]
        if exit_code == 127:
            return ErrorType.NOT_FOUND, [
                "检查命令是否已安装",
                "确认命令名称拼写是否正确",
                "检查 PATH 环境变量是否包含命令所在目录",
            ]
        if exit_code == 130:
            # Reported under TIMEOUT; the suggestion text describes a Ctrl+C
            # interruption.
            return ErrorType.TIMEOUT, [
                "命令被 Ctrl+C 中断",
                "可能需要增加超时时间",
            ]
        return ErrorType.UNKNOWN, [
            "检查命令输出中的错误信息",
            "使用 --verbose 或 --debug 获取更多详情",
        ]