"""DocumentLoader 单元测试 - 多格式文档解析器""" import io import pytest from agentkit.memory.document_loader import ( MAX_ROWS_PER_SHEET, Document, DocumentLoader, _detect_format, ) class TestDetectFormat: """格式检测测试""" def test_pdf_format(self): assert _detect_format("report.pdf") == "pdf" def test_docx_format(self): assert _detect_format("document.docx") == "docx" assert _detect_format("document.doc") == "docx" def test_markdown_format(self): assert _detect_format("readme.md") == "markdown" assert _detect_format("notes.markdown") == "markdown" def test_html_format(self): assert _detect_format("page.html") == "html" assert _detect_format("page.htm") == "html" def test_text_format(self): assert _detect_format("data.txt") == "text" assert _detect_format("data.csv") == "text" assert _detect_format("data.json") == "text" def test_unknown_format_falls_back_to_text(self): assert _detect_format("data.xyz") == "text" class TestDocument: """Document 数据类测试""" def test_default_metadata(self): doc = Document(doc_id="1", title="Test", content="Hello") assert doc.metadata["source"] == "" assert doc.metadata["format"] == "unknown" assert doc.metadata["page_count"] == 0 assert "created_at" in doc.metadata def test_custom_metadata(self): doc = Document( doc_id="1", title="Test", content="Hello", metadata={"source": "test.pdf", "format": "pdf", "page_count": 5}, ) assert doc.metadata["source"] == "test.pdf" assert doc.metadata["format"] == "pdf" assert doc.metadata["page_count"] == 5 def test_to_dict(self): doc = Document(doc_id="1", title="Test", content="Hello", metadata={"format": "text"}) d = doc.to_dict() assert d["doc_id"] == "1" assert d["title"] == "Test" assert d["content"] == "Hello" assert d["metadata"]["format"] == "text" class TestDocumentLoaderText: """纯文本解析测试""" def test_load_text_bytes(self): loader = DocumentLoader() content = "Hello, world!\nThis is a test document.".encode("utf-8") doc = loader.load_bytes(content, "test.txt") assert doc.title == "test" assert doc.content == "Hello, world!\nThis is a test document." assert doc.metadata["format"] == "text" assert doc.metadata["source"] == "test.txt" assert doc.metadata["parser"] == "text" assert doc.doc_id # 非空 UUID def test_load_text_file(self, tmp_path): loader = DocumentLoader() text_file = tmp_path / "sample.txt" text_file.write_text("Sample text content", encoding="utf-8") doc = loader.load(text_file) assert doc.content == "Sample text content" assert doc.metadata["format"] == "text" def test_load_nonexistent_file(self): loader = DocumentLoader() with pytest.raises(FileNotFoundError): loader.load("/nonexistent/path/file.txt") class TestDocumentLoaderMarkdown: """Markdown 解析测试""" def test_load_markdown_bytes(self): loader = DocumentLoader() md_content = """# Project Title ## Introduction This is the introduction section. ## Details Some details here. """ doc = loader.load_bytes(md_content.encode("utf-8"), "readme.md") assert doc.metadata["format"] == "markdown" assert doc.metadata["title"] == "Project Title" assert "Introduction" in doc.content assert "Details" in doc.content def test_markdown_without_title(self): loader = DocumentLoader() md_content = "Just some text without a heading." doc = loader.load_bytes(md_content.encode("utf-8"), "notes.md") assert doc.metadata["format"] == "markdown" assert doc.content == "Just some text without a heading." class TestDocumentLoaderHTML: """HTML 解析测试""" def test_load_html_with_beautifulsoup(self): """测试 BeautifulSoup 解析(如果可用)""" loader = DocumentLoader() html_content = """
This is a paragraph.
""" doc = loader.load_bytes(html_content.encode("utf-8"), "page.html") assert doc.metadata["format"] == "html" # BeautifulSoup 应该移除 script/style 标签 # 如果 BeautifulSoup 不可用,则回退到文本 if doc.metadata.get("parser") == "beautifulsoup": assert "Test Page" in doc.metadata.get("title", "") or "Hello" in doc.content assert "var x" not in doc.content assert ".cls" not in doc.content assert "Hello" in doc.content else: # 纯文本回退,内容可能包含 HTML 标签 assert len(doc.content) > 0 def test_load_html_fallback_to_text(self): """即使没有 BeautifulSoup,HTML 也能作为文本加载""" loader = DocumentLoader() html_content = "Simple content" doc = loader.load_bytes(html_content.encode("utf-8"), "page.html") assert doc.metadata["format"] == "html" assert len(doc.content) > 0 class TestDocumentLoaderPDF: """PDF 解析测试""" def test_load_pdf_without_parser(self): """没有 PDF 解析器时回退到文本""" loader = DocumentLoader() # 传入一个非 PDF 二进制内容,模拟解析失败后的回退 content = b"%PDF-1.4 fake pdf content" doc = loader.load_bytes(content, "report.pdf") assert doc.metadata["format"] == "pdf" # 即使解析失败,也应该返回文档对象(内容可能为空或乱码) assert isinstance(doc, Document) class TestDocumentLoaderDocx: """Word 解析测试""" def test_load_docx_without_parser(self): """没有 python-docx 时回退到文本""" loader = DocumentLoader() # 传入一个非 docx 二进制内容 content = b"PK\x03\x04 fake docx content" doc = loader.load_bytes(content, "document.docx") assert doc.metadata["format"] == "docx" assert isinstance(doc, Document) class TestDocumentLoaderEdgeCases: """边界情况测试""" def test_empty_content(self): loader = DocumentLoader() doc = loader.load_bytes(b"", "empty.txt") assert doc.content == "" assert doc.metadata["format"] == "text" def test_unicode_content(self): loader = DocumentLoader() content = "中文内容测试\n日本語テスト\n한국어 테스트".encode("utf-8") doc = loader.load_bytes(content, "unicode.txt") assert "中文内容测试" in doc.content assert "日本語テスト" in doc.content def test_large_content(self): loader = DocumentLoader() content = "A" * 1_000_000 # 1MB text doc = loader.load_bytes(content.encode("utf-8"), "large.txt") assert len(doc.content) == 1_000_000 def test_filename_with_spaces(self): loader = DocumentLoader() content = "Test content".encode("utf-8") doc = loader.load_bytes(content, "my document.txt") assert doc.title == "my document" def test_filename_with_path(self): loader = DocumentLoader() content = "Test content".encode("utf-8") doc = loader.load_bytes(content, "reports/2024/summary.md") assert doc.metadata["format"] == "markdown" class TestDocumentLoaderXlsx: """Excel 解析边界情况测试 (#16) 覆盖 _parse_xlsx 的关键路径:空工作簿、损坏字节、列数不齐、 行截断、单元格截断、文件大小限制。 """ @staticmethod def _make_xlsx_bytes(sheet_name: str = "Sheet1", rows: list[list] | None = None) -> bytes: """构造内存中的 xlsx 字节内容。""" from openpyxl import Workbook wb = Workbook() ws = wb.active ws.title = sheet_name for row in rows or []: ws.append(row) buf = io.BytesIO() wb.save(buf) return buf.getvalue() def test_empty_workbook_falls_back_to_text(self): """空工作簿(无任何行)应返回空内容,不报错。""" loader = DocumentLoader() content = self._make_xlsx_bytes(rows=[]) doc = loader.load_bytes(content, "empty.xlsx") assert doc.metadata["format"] == "xlsx" # 空工作簿:sections 为空,text 为空字符串 if doc.metadata.get("parser") == "openpyxl": assert doc.content == "" assert doc.metadata["row_count"] == 0 assert doc.metadata["sheet_count"] == 1 def test_malformed_bytes_falls_back_to_text(self): """损坏的字节内容应回退到文本解析,不抛异常。""" loader = DocumentLoader() # 不是合法的 zip/xlsx 字节 content = b"not a real xlsx file content" doc = loader.load_bytes(content, "broken.xlsx") assert doc.metadata["format"] == "xlsx" # 应回退到 text parser assert doc.metadata["parser"] == "text" assert isinstance(doc, Document) def test_column_mismatch_produces_valid_markdown_table(self): """行内单元格数不一致时,应填充到 max_cols 保证 Markdown 表格有效。""" loader = DocumentLoader() # 第一行 3 列,第二行 2 列,第三行 4 列 rows = [ ["A1", "B1", "C1"], ["A2", "B2"], ["A3", "B3", "C3", "D3"], ] content = self._make_xlsx_bytes(rows=rows) doc = loader.load_bytes(content, "ragged.xlsx") if doc.metadata.get("parser") != "openpyxl": pytest.skip("openpyxl not available") lines = doc.content.split("\n") # 第一行是 "## Sheet1",然后是表头、分隔符、数据行 # 找到表格行(以 | 开头) table_lines = [ln for ln in lines if ln.startswith("|")] assert len(table_lines) == 4 # 1 header + 1 separator + 2 data rows # 所有表格行应有相同的列数(4 列 = max_cols) for line in table_lines: # | a | b | c | d | -> 5 个 | 分隔符表示 4 列 assert line.count("|") == 5 # 分隔符行应为 | --- | --- | --- | --- | sep_line = table_lines[1] assert sep_line.count("---") == 4 def test_row_truncation_at_max_rows(self): """行数超过 MAX_ROWS_PER_SHEET 时应截断并标记 truncated。""" loader = DocumentLoader() # 构造超过上限的行数(使用小批量验证逻辑) # ponytail: 直接构造超大工作簿太慢,用 monkeypatch 临时调小上限 original_max = MAX_ROWS_PER_SHEET import agentkit.memory.document_loader as dl_module # 临时调小上限到 5 行 dl_module.MAX_ROWS_PER_SHEET = 5 try: rows = [[f"r{i}", f"v{i}"] for i in range(20)] content = self._make_xlsx_bytes(rows=rows) doc = loader.load_bytes(content, "big.xlsx") if doc.metadata.get("parser") != "openpyxl": pytest.skip("openpyxl not available") assert doc.metadata["truncated"] is True assert doc.metadata["row_count"] == 5 assert "truncated at 5 rows" in doc.content finally: dl_module.MAX_ROWS_PER_SHEET = original_max def test_cell_truncation_at_max_chars(self): """单元格内容超过 MAX_CELL_CHARS 时应截断。""" loader = DocumentLoader() import agentkit.memory.document_loader as dl_module original_max = dl_module.MAX_CELL_CHARS dl_module.MAX_CELL_CHARS = 10 try: long_text = "X" * 100 content = self._make_xlsx_bytes(rows=[["header"], [long_text]]) doc = loader.load_bytes(content, "longcell.xlsx") if doc.metadata.get("parser") != "openpyxl": pytest.skip("openpyxl not available") # 单元格内容应被截断到 10 字符 assert "XXXXXXXXXX" in doc.content # 不应包含完整的 100 字符 assert "X" * 100 not in doc.content finally: dl_module.MAX_CELL_CHARS = original_max def test_multiple_sheets_separated_by_h2(self): """多个 sheet 应以 H2 标题分隔。""" loader = DocumentLoader() from openpyxl import Workbook wb = Workbook() ws1 = wb.active ws1.title = "First" ws1.append(["a", "b"]) ws2 = wb.create_sheet("Second") ws2.append(["c", "d"]) buf = io.BytesIO() wb.save(buf) content = buf.getvalue() doc = loader.load_bytes(content, "multi.xlsx") if doc.metadata.get("parser") != "openpyxl": pytest.skip("openpyxl not available") assert doc.metadata["sheet_count"] == 2 assert "## First" in doc.content assert "## Second" in doc.content def test_file_size_limit_raises_value_error(self): """内容超过 MAX_CONTENT_SIZE 应抛出 ValueError。""" loader = DocumentLoader() # 构造超过上限的字节(不实际分配 MAX_CONTENT_SIZE+1 字节,用 monkeypatch) import agentkit.memory.document_loader as dl_module original_max = dl_module.MAX_CONTENT_SIZE dl_module.MAX_CONTENT_SIZE = 10 try: content = b"X" * 100 # 100 > 10 with pytest.raises(ValueError, match="exceeds limit"): loader.load_bytes(content, "big.xlsx") finally: dl_module.MAX_CONTENT_SIZE = original_max def test_none_cell_values_become_empty_strings(self): """None 单元格应转为空字符串,不是 'None' 文本。""" loader = DocumentLoader() # openpyxl 中空单元格以 None 表示 rows = [ ["header1", "header2", "header3"], ["a", None, "c"], ] content = self._make_xlsx_bytes(rows=rows) doc = loader.load_bytes(content, "none_cells.xlsx") if doc.metadata.get("parser") != "openpyxl": pytest.skip("openpyxl not available") # 确保没有 "None" 字符串出现在表格中 table_lines = [ln for ln in doc.content.split("\n") if ln.startswith("|")] for line in table_lines: assert "None" not in line