fischer-agentkit/src/agentkit/memory/document_loader.py

422 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""DocumentLoader - 多格式文档解析器
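Supports PDF (PyMuPDF/pdfplumber), Word (python-docx), Excel (openpyxl),
Markdown (kept as raw text; mistune is not required), HTML (BeautifulSoup) and
plain text. All format dependencies are optional (try/except ImportError).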
"""
from __future__ import annotations
import io
import logging
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import TypeAlias
from agentkit.memory.base import MetadataDict
logger = logging.getLogger(__name__)
# ponytail: resource caps prevent OOM from malicious/oversized uploads.
# Ceiling: a 100MB document is ~25M tokens — beyond any useful LLM context.
# Upgrade path: stream to disk for very large files if needed.
# Upper bound (in bytes) on raw content accepted by DocumentLoader.load_bytes.
MAX_CONTENT_SIZE = 100 * 1024 * 1024 # 100MB
# Row budget checked by DocumentLoader._parse_xlsx while reading worksheet rows.
MAX_ROWS_PER_SHEET = 10_000
# Each stringified spreadsheet cell is cut to at most this many characters.
MAX_CELL_CHARS = 10_000
# Document metadata keys: source/format/parser/page_count/table_count/
# sheet_count/row_count/heading_count/created_at/title/truncated — all values
# are primitive scalars.
DocumentMetadata: TypeAlias = MetadataDict
# What every DocumentLoader._parse_* method returns: (extracted text, metadata).
ParseResult: TypeAlias = tuple[str, DocumentMetadata]
@dataclass
class Document:
    """Uniform representation of a parsed document.

    ``metadata`` is back-filled on construction so that the keys ``source``,
    ``format``, ``page_count`` and ``created_at`` are always present. Values
    already supplied by the caller are never overwritten, and the dict passed
    in is updated in place (no copy is made).
    """

    doc_id: str
    title: str
    content: str
    metadata: DocumentMetadata = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Fill in the baseline metadata keys that the caller left out."""
        static_defaults = (
            ("source", ""),
            ("format", "unknown"),
            ("page_count", 0),
        )
        for key, fallback in static_defaults:
            self.metadata.setdefault(key, fallback)
        # The timestamp is only computed when it is actually needed.
        if "created_at" not in self.metadata:
            self.metadata["created_at"] = datetime.now(timezone.utc).isoformat()

    def to_dict(self) -> dict[str, object]:
        """Return the document as a plain dict (``metadata`` is shared, not copied)."""
        return dict(
            doc_id=self.doc_id,
            title=self.title,
            content=self.content,
            metadata=self.metadata,
        )
def _detect_format(filename: str) -> str:
"""根据文件扩展名检测文档格式"""
ext = Path(filename).suffix.lower()
format_map = {
".pdf": "pdf",
".docx": "docx",
".doc": "docx",
".xlsx": "xlsx",
".xls": "xlsx",
".md": "markdown",
".markdown": "markdown",
".html": "html",
".htm": "html",
".txt": "text",
".csv": "text",
".json": "text",
".xml": "text",
}
return format_map.get(ext, "text")
class DocumentLoader:
    """Multi-format document parser.

    Supported formats and their fallback chains:
        - PDF: PyMuPDF (fitz) -> pdfplumber -> plain-text fallback
        - Word: python-docx -> plain-text fallback
        - Excel: openpyxl -> plain-text fallback
        - Markdown: decoded and kept verbatim; headings are scanned for metadata
        - HTML: BeautifulSoup -> plain-text fallback
        - Plain text: decoded directly

    Every third-party parser is imported lazily inside its method, so a
    missing package only disables that parser instead of breaking the module.
    """

    def load(self, file_path: str | Path) -> Document:
        """Load and parse a document from a file path.

        Args:
            file_path: Path of the file to parse.

        Returns:
            The parsed Document.

        Raises:
            FileNotFoundError: The path does not exist.
            ValueError: The file is larger than MAX_CONTENT_SIZE.
        """
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"File not found: {path}")
        # Check the size on disk BEFORE reading: otherwise an oversized file is
        # pulled fully into memory and the cap in load_bytes fires too late to
        # protect against OOM.
        size = path.stat().st_size
        if size > MAX_CONTENT_SIZE:
            raise ValueError(
                f"Content size {size} bytes exceeds limit {MAX_CONTENT_SIZE} bytes"
            )
        content = path.read_bytes()
        return self.load_bytes(content, path.name)

    def load_bytes(self, content: bytes, filename: str) -> Document:
        """Parse a document from in-memory bytes.

        Args:
            content: Raw file bytes.
            filename: File name, used for format detection and metadata.

        Returns:
            The parsed Document. Its title is the parser-provided title when
            there is one, otherwise the file name without its extension.

        Raises:
            ValueError: ``content`` is larger than MAX_CONTENT_SIZE.
        """
        if len(content) > MAX_CONTENT_SIZE:
            raise ValueError(
                f"Content size {len(content)} bytes exceeds limit {MAX_CONTENT_SIZE} bytes"
            )
        doc_format = _detect_format(filename)
        doc_id = str(uuid.uuid4())
        parsers = {
            "pdf": self._parse_pdf,
            "docx": self._parse_docx,
            "xlsx": self._parse_xlsx,
            "markdown": self._parse_markdown,
            "html": self._parse_html,
            "text": self._parse_text,
        }
        parser = parsers.get(doc_format)
        if parser is None:
            # Defensive: only reachable if _detect_format grows a format that
            # has no parser registered here.
            logger.warning(
                "Unsupported format '%s' for %s, falling back to text",
                doc_format,
                filename,
            )
            parser = self._parse_text
        text, extra_meta = parser(content, filename)
        metadata: DocumentMetadata = {
            "source": filename,
            "format": doc_format,
            "created_at": datetime.now(timezone.utc).isoformat(),
        }
        # Parser-specific keys are merged last and therefore win on conflict.
        metadata.update(extra_meta)
        title = Path(filename).stem
        if "title" in extra_meta:
            title = extra_meta["title"]
        return Document(
            doc_id=doc_id,
            title=title,
            content=text,
            metadata=metadata,
        )

    @staticmethod
    def _decode(content: bytes) -> str:
        """Decode bytes as UTF-8, dropping a leading BOM and replacing bad bytes.

        ``errors="replace"`` yields the same result as a strict decode for
        valid input, so no separate strict attempt is needed.
        """
        return content.decode("utf-8-sig", errors="replace")

    @staticmethod
    def _atx_heading_text(stripped: str) -> str | None:
        """Return the heading text if ``stripped`` is an ATX heading, else None.

        A heading is 1-6 ``#`` characters followed by whitespace or the end of
        the line, so ``#hashtag`` and ``#!/bin/sh`` are not headings. An empty
        heading (``#`` alone) returns ``""``.
        """
        level = len(stripped) - len(stripped.lstrip("#"))
        if not 1 <= level <= 6:
            return None
        rest = stripped[level:]
        if rest and rest[0] not in " \t":
            return None
        return rest.strip()

    def _parse_pdf(self, content: bytes, filename: str) -> ParseResult:
        """Parse a PDF file.

        Tries PyMuPDF (fitz) first, then pdfplumber, and finally falls back to
        decoding the raw bytes as text. Parser handles are closed even when
        text extraction raises.
        """
        # Attempt 1: PyMuPDF
        try:
            import fitz  # PyMuPDF

            doc = fitz.open(stream=content, filetype="pdf")
            try:
                pages = [page.get_text() for page in doc]
                meta: DocumentMetadata = {
                    "page_count": len(doc),
                    "parser": "pymupdf",
                }
                # Use the title stored in the PDF metadata, when present.
                pdf_meta = doc.metadata
                if pdf_meta and pdf_meta.get("title"):
                    meta["title"] = pdf_meta["title"]
            finally:
                doc.close()
            return "\n\n".join(pages), meta
        except ImportError:
            pass
        except Exception as e:
            # Best effort: any parser failure moves on to the next strategy.
            logger.warning("PyMuPDF parsing failed for %s: %s", filename, e)
        # Attempt 2: pdfplumber
        try:
            import pdfplumber

            pdf = pdfplumber.open(io.BytesIO(content))
            try:
                pages = []
                for page in pdf.pages:
                    page_text = page.extract_text()
                    if page_text:
                        pages.append(page_text)
                meta = {
                    "page_count": len(pdf.pages),
                    "parser": "pdfplumber",
                }
            finally:
                pdf.close()
            return "\n\n".join(pages), meta
        except ImportError:
            pass
        except Exception as e:
            logger.warning("pdfplumber parsing failed for %s: %s", filename, e)
        # Last resort: treat the bytes as text.
        logger.warning(
            "No PDF parser available for %s, falling back to text extraction", filename
        )
        return self._parse_text(content, filename)

    def _parse_docx(self, content: bytes, filename: str) -> ParseResult:
        """Parse a Word file with python-docx, falling back to plain text.

        Non-empty paragraphs come first, followed by every table row rendered
        as ``cell | cell | ...``; entries are separated by blank lines.
        """
        try:
            from docx import Document as DocxDocument

            doc = DocxDocument(io.BytesIO(content))
            # Paragraph text, skipping whitespace-only paragraphs.
            paragraphs = [
                para.text.strip() for para in doc.paragraphs if para.text.strip()
            ]
            # Table text, one line per row; rows with only empty cells are skipped.
            table_count = 0
            for table in doc.tables:
                table_count += 1
                for row in table.rows:
                    row_text = " | ".join(cell.text.strip() for cell in row.cells)
                    if row_text.strip(" |"):
                        paragraphs.append(row_text)
            text = "\n\n".join(paragraphs)
            meta: DocumentMetadata = {
                "parser": "python-docx",
                "table_count": table_count,
            }
            # Use the title from the document's core properties, when present.
            if doc.core_properties and doc.core_properties.title:
                meta["title"] = doc.core_properties.title
            return text, meta
        except ImportError:
            logger.warning(
                "python-docx not available for %s, falling back to text", filename
            )
            return self._parse_text(content, filename)
        except Exception as e:
            logger.warning("python-docx parsing failed for %s: %s", filename, e)
            return self._parse_text(content, filename)

    def _parse_xlsx(self, content: bytes, filename: str) -> ParseResult:
        """Parse an Excel file with openpyxl, falling back to plain text.

        Each non-empty sheet becomes a Markdown table preceded by an H2 heading
        with the sheet name; sheets are separated by a blank line.

        Note (from the original author): with ``data_only=True`` a formula that
        was never calculated by Excel yields ``None`` (silent data loss), and
        for merged cells only the top-left cell carries a value.
        """
        try:
            from openpyxl import load_workbook

            wb = load_workbook(io.BytesIO(content), data_only=True, read_only=True)
            try:
                sections: list[str] = []
                sheet_count = 0
                total_rows = 0
                truncated = False
                for ws in wb.worksheets:
                    sheet_count += 1
                    rows: list[tuple] = []
                    for row in ws.iter_rows(values_only=True):
                        # NOTE(review): total_rows is never reset per sheet, so
                        # despite its name MAX_ROWS_PER_SHEET acts as a budget
                        # for the whole workbook. Behavior kept as-is; confirm
                        # which of the two is intended.
                        if total_rows + len(rows) >= MAX_ROWS_PER_SHEET:
                            truncated = True
                            break
                        rows.append(row)
                    if not rows:
                        continue
                    sections.append(f"## {ws.title}")
                    # Widest row decides the column count of the Markdown table.
                    max_cols = max(len(r) for r in rows)
                    for i, row in enumerate(rows):
                        total_rows += 1
                        cells = [
                            "" if v is None else str(v)[:MAX_CELL_CHARS] for v in row
                        ]
                        # Pad short rows so every line has the same column count.
                        cells += [""] * (max_cols - len(cells))
                        sections.append("| " + " | ".join(cells) + " |")
                        # ponytail: separator after header row for Markdown table validity
                        if i == 0:
                            sep_cells = ["---"] * max_cols
                            sections.append("| " + " | ".join(sep_cells) + " |")
                    if truncated:
                        sections.append(
                            f"<!-- truncated at {MAX_ROWS_PER_SHEET} rows -->"
                        )
                    sections.append("")  # blank line between sheets
            finally:
                wb.close()
            text = "\n".join(sections).strip()
            meta: DocumentMetadata = {
                "parser": "openpyxl",
                "sheet_count": sheet_count,
                "row_count": total_rows,
            }
            if truncated:
                meta["truncated"] = True
            return text, meta
        except ImportError:
            logger.warning(
                "openpyxl not available for %s, falling back to text", filename
            )
            return self._parse_text(content, filename)
        except Exception as e:
            logger.warning("openpyxl parsing failed for %s: %s", filename, e)
            return self._parse_text(content, filename)

    def _parse_markdown(self, content: bytes, filename: str) -> ParseResult:
        """Parse a Markdown file.

        The Markdown source is returned unchanged (apart from BOM removal)
        because later chunking relies on the heading structure. Metadata holds
        ``heading_count`` and, when the first heading has text, ``title``.

        Only ATX headings outside fenced code blocks are counted, so shell
        comments inside code fences and ``#hashtag`` lines are ignored.
        """
        text = self._decode(content)
        title: str | None = None
        heading_count = 0
        fence_char = ""  # "`" or "~" while inside a fenced code block
        fence_len = 0
        for line in text.split("\n"):
            stripped = line.strip()
            if fence_char:
                # A fence closes on a line made only of the fence character,
                # at least as long as the opening run.
                run = len(stripped) - len(stripped.lstrip(fence_char))
                if run >= fence_len and run == len(stripped):
                    fence_char = ""
                continue
            if stripped.startswith(("```", "~~~")):
                marker = stripped[0]
                run = len(stripped) - len(stripped.lstrip(marker))
                # A backtick fence's info string may not contain backticks;
                # otherwise the line is inline code, not a fence opener.
                if marker == "~" or "`" not in stripped[run:]:
                    fence_char, fence_len = marker, run
                    continue
            heading = self._atx_heading_text(stripped)
            if heading is None:
                continue
            heading_count += 1
            if title is None:
                # The first heading decides the title, even if it is empty.
                title = heading
        meta: DocumentMetadata = {
            "parser": "markdown",
        }
        if title:
            meta["title"] = title
        meta["heading_count"] = heading_count
        return text, meta

    def _parse_html(self, content: bytes, filename: str) -> ParseResult:
        """Parse an HTML file with BeautifulSoup, falling back to plain text.

        ``<script>`` and ``<style>`` elements are removed before the visible
        text is extracted; the ``<title>`` element, if any, becomes the title.
        """
        try:
            from bs4 import BeautifulSoup

            soup = BeautifulSoup(self._decode(content), "html.parser")
            # Drop script and style elements so their source is not extracted.
            for tag in soup(["script", "style"]):
                tag.decompose()
            text = soup.get_text(separator="\n", strip=True)
            title = ""
            if soup.title and soup.title.string:
                title = soup.title.string.strip()
            meta: DocumentMetadata = {
                "parser": "beautifulsoup",
            }
            if title:
                meta["title"] = title
            return text, meta
        except ImportError:
            logger.warning(
                "BeautifulSoup not available for %s, falling back to text", filename
            )
            return self._parse_text(content, filename)
        except Exception as e:
            logger.warning("BeautifulSoup parsing failed for %s: %s", filename, e)
            return self._parse_text(content, filename)

    def _parse_text(self, content: bytes, filename: str) -> ParseResult:
        """Parse a plain-text file by decoding it as UTF-8 (invalid bytes replaced)."""
        return self._decode(content), {"parser": "text"}