"""文档解析器 - 支持多种格式""" import io from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Optional @dataclass class ParsedDocument: """解析后的文档""" title: str content: str metadata: dict class BaseParser(ABC): """解析器基类""" @abstractmethod async def parse(self, content: bytes) -> ParsedDocument: """解析文档内容""" pass class PDFParser(BaseParser): """PDF解析器""" async def parse(self, content: bytes) -> ParsedDocument: """使用PyMuPDF解析PDF""" import fitz doc = fitz.open(stream=content) text_parts = [] metadata = {} # 提取元数据 if doc.metadata: metadata = { "author": doc.metadata.get("author", ""), "title": doc.metadata.get("title", ""), "subject": doc.metadata.get("subject", ""), } # 提取每页文本 for page_num, page in enumerate(doc): text = page.get_text() if text.strip(): text_parts.append(f"[第{page_num + 1}页]\n{text}") # 提取目录(如果存在) toc = doc.get_toc() if toc: metadata["has_toc"] = True metadata["toc_items"] = len(toc) doc.close() return ParsedDocument( title=metadata.get("title", "未命名文档") or "未命名文档", content="\n\n".join(text_parts), metadata=metadata, ) class DocxParser(BaseParser): """Word文档解析器""" async def parse(self, content: bytes) -> ParsedDocument: """使用python-docx解析Word""" from docx import Document doc = Document(io.BytesIO(content)) paragraphs = [] metadata = {} # 提取核心属性 core_props = doc.core_properties metadata = { "author": getattr(core_props, "author", "") or "", "title": getattr(core_props, "title", "") or "", "subject": getattr(core_props, "subject", "") or "", "created": str(getattr(core_props, "created", "")) or "", "modified": str(getattr(core_props, "modified", "")) or "", } # 提取段落 for para in doc.paragraphs: text = para.text.strip() if text: paragraphs.append(text) # 提取表格 for i, table in enumerate(doc.tables): table_text = [] for row in table.rows: cells = [cell.text.strip() for cell in row.cells] if any(cells): table_text.append(" | ".join(cells)) if table_text: paragraphs.append(f"[表格{i+1}]\n" + "\n".join(table_text)) return ParsedDocument( title=metadata.get("title", "未命名文档") or "未命名文档", content="\n\n".join(paragraphs), metadata=metadata, ) class MarkdownParser(BaseParser): """Markdown解析器""" async def parse(self, content: bytes) -> ParsedDocument: """解析Markdown""" text = content.decode("utf-8") # 提取标题(第一个#开头的内容) lines = text.split("\n") title = "未命名文档" for line in lines: line = line.strip() if line.startswith("# "): title = line[2:].strip() break return ParsedDocument( title=title, content=text, metadata={"format": "markdown"}, ) class TextParser(BaseParser): """纯文本解析器""" async def parse(self, content: bytes) -> ParsedDocument: """解析纯文本""" text = content.decode("utf-8") # 使用第一行作为标题 lines = text.split("\n") title = lines[0][:50] if lines else "未命名文档" return ParsedDocument( title=title, content=text, metadata={"format": "text"}, ) class ParserFactory: """解析器工厂""" PARSERS = { ".pdf": PDFParser, ".docx": DocxParser, ".md": MarkdownParser, ".txt": TextParser, ".html": MarkdownParser, # HTML当Markdown处理 } @classmethod def create(cls, file_extension: str) -> BaseParser: """创建解析器""" parser_cls = cls.PARSERS.get(file_extension.lower()) if not parser_cls: raise ValueError(f"Unsupported format: {file_extension}") return parser_cls() @classmethod def supported_formats(cls) -> list[str]: """支持的格式""" return list(cls.PARSERS.keys())