545 lines
20 KiB
Python
545 lines
20 KiB
Python
"""Bug-finding tests for document processing — edge cases, error paths, concurrency.

These tests probe for bugs in:
- Concurrent database writes
- File system inconsistencies (metadata exists, file missing)
- Invalid/corrupted templates
- Boundary conditions (empty content, large content, special chars)
- Renderer edge cases (empty cells, special characters)
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import io
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
from docx import Document as DocxDocument
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
from agentkit.documents.db import delete_document, init_documents_db
|
|
from agentkit.documents.models import DocumentMeta
|
|
from agentkit.documents.renderers.excel_renderer import ExcelRenderer
|
|
from agentkit.documents.renderers.pdf_renderer import PDFRenderer
|
|
from agentkit.documents.renderers.word_renderer import WordRenderer
|
|
from agentkit.documents.service import DocumentService
|
|
from agentkit.server.routes import documents as documents_routes
|
|
from agentkit.tools.document_tool import DocumentTool
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
@pytest.fixture
def service(tmp_path: Path) -> DocumentService:
    """Build a DocumentService backed by a fresh database under ``tmp_path``.

    The documents database is initialised first, then renderers are
    registered under the "word", "excel" and "pdf" keys, in that order.
    """
    database = tmp_path / "test.db"
    uploads = tmp_path / "uploads"
    asyncio.run(init_documents_db(database))

    document_service = DocumentService(upload_dir=uploads, db_path=database)
    # Instantiate each renderer right before registering it, one at a time.
    renderer_classes = (
        ("word", WordRenderer),
        ("excel", ExcelRenderer),
        ("pdf", PDFRenderer),
    )
    for format_key, renderer_cls in renderer_classes:
        document_service.register_renderer(format_key, renderer_cls())
    return document_service
|
|
|
|
|
|
@pytest.fixture
def app(service: DocumentService) -> FastAPI:
    """Assemble a minimal FastAPI app that exposes the documents router.

    The ``service`` fixture is stored on ``state.document_service``,
    ``state.server_config`` is set to ``None``, and the documents router is
    mounted under the ``/api/v1`` prefix.
    """
    application = FastAPI()
    state = application.state
    state.document_service = service
    state.server_config = None
    application.include_router(documents_routes.router, prefix="/api/v1")
    return application
|
|
|
|
|
|
@pytest.fixture
def client(app: FastAPI) -> TestClient:
    """Return a TestClient bound to the ``app`` fixture."""
    test_client = TestClient(app)
    return test_client
|
|
|
|
|
|
@pytest.fixture
def tool(service: DocumentService) -> DocumentTool:
    """Return a DocumentTool wired to the ``service`` fixture."""
    document_tool = DocumentTool(service=service)
    return document_tool
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Concurrent database writes
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestConcurrentWrites:
    """Check that simultaneous document creation leaves the database consistent."""

    async def test_concurrent_inserts(self, service: DocumentService) -> None:
        """Ten create_document calls gathered together all succeed."""

        async def make_doc(i: int) -> DocumentMeta:
            return await service.create_document(
                format="word",
                content=f"# Doc {i}",
                conversation_id="conv-concurrent",
                filename=f"doc-{i}.docx",
            )

        created = await asyncio.gather(*(make_doc(i) for i in range(10)))

        # Every creation must have produced its own distinct ID.
        unique_ids = {meta.id for meta in created}
        assert len(unique_ids) == 10

        # Each document must also be listed under the shared conversation.
        stored = await service.get_conversation_documents("conv-concurrent")
        assert len(stored) == 10

    async def test_concurrent_different_conversations(self, service: DocumentService) -> None:
        """Documents created concurrently stay with their own conversation."""

        async def make_doc(conv_id: str) -> DocumentMeta:
            return await service.create_document(
                format="word",
                content=f"# {conv_id}",
                conversation_id=conv_id,
            )

        pending = [make_doc(f"conv-{i}") for i in range(5)]
        await asyncio.gather(*pending)

        # One document was created per conversation, so each lookup must
        # return exactly one entry.
        for i in range(5):
            docs = await service.get_conversation_documents(f"conv-{i}")
            assert len(docs) == 1, f"conv-{i} should have exactly 1 doc"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# File system inconsistencies
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestFileSystemInconsistency:
    """Verify behavior when metadata and filesystem are out of sync."""

    def test_download_metadata_exists_file_missing(
        self, client: TestClient, service: DocumentService
    ) -> None:
        """Metadata exists but file was deleted from disk → 404."""
        # Create a document
        resp = client.post(
            "/api/v1/documents/create",
            json={
                "format": "word",
                "content": "# Test",
                "conversation_id": "conv-missing",
            },
        )
        # Check the setup request explicitly: without this, a failed create
        # surfaces as a KeyError on the JSON lookup below instead of showing
        # the server's actual response.
        assert resp.status_code == 200, resp.text
        doc_id = resp.json()["document"]["id"]

        # Delete the file from disk while leaving the metadata in place
        file_path = service.get_download_path(doc_id)
        assert file_path is not None
        file_path.unlink()

        # Download should return 404 (file not found on disk)
        dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
        assert dl_resp.status_code == 404
        assert "not found on disk" in dl_resp.json()["detail"].lower()

    def test_get_download_path_nonexistent(self, service: DocumentService) -> None:
        """get_download_path returns None for non-existent doc_id."""
        path = service.get_download_path("nonexistent-id-12345")
        assert path is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Invalid templates
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestInvalidTemplates:
    """Verify error handling for invalid template files."""

    def test_upload_invalid_docx_content(
        self, client: TestClient, tmp_path: Path
    ) -> None:
        """Upload a file with .docx extension but invalid content → should handle gracefully."""
        # Create a fake .docx (just text, not a real docx). The encoding is
        # given explicitly so the bytes on disk do not depend on the
        # platform default.
        fake_path = tmp_path / "fake.docx"
        fake_path.write_text("This is not a real docx file", encoding="utf-8")

        with open(fake_path, "rb") as f:
            resp = client.post(
                "/api/v1/documents/upload-template",
                files={"file": ("fake.docx", f, "application/octet-stream")},
            )
        # Upload itself succeeds (we only check extension)
        assert resp.status_code == 200

        # But using it as a template should fail gracefully
        stored_name = resp.json()["stored_name"]
        create_resp = client.post(
            "/api/v1/documents/create",
            json={
                "format": "word",
                "content": "",
                "conversation_id": "conv-invalid",
                "template": stored_name,
                "template_data": {"name": "test"},
            },
        )
        # The invalid template must be rejected with an error status.
        # NOTE: currently returns 500 due to WordRenderer missing render_template.
        # This is a known bug — see test_documents_security.py
        # Require an error status (>= 400) rather than merely "not 200", so
        # another success code such as 201 cannot slip through.
        assert create_resp.status_code >= 400, (
            "Invalid template should not produce a successful document"
        )

    def test_create_with_nonexistent_template(self, client: TestClient) -> None:
        """template='nonexistent-template.docx' → 404."""
        resp = client.post(
            "/api/v1/documents/create",
            json={
                "format": "word",
                "content": "# Test",
                "conversation_id": "conv-1",
                "template": "nonexistent-template.docx",
                "template_data": {},
            },
        )
        assert resp.status_code == 404
        assert "not found" in resp.json()["detail"].lower()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Boundary conditions
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestBoundaryConditions:
    """Edge cases for content, filenames, and formats."""

    def test_create_empty_content_word(self, client: TestClient) -> None:
        """Empty content for Word → still generates a valid (empty) document."""
        resp = client.post(
            "/api/v1/documents/create",
            json={
                "format": "word",
                "content": "",
                "conversation_id": "conv-empty",
            },
        )
        assert resp.status_code == 200
        doc_id = resp.json()["document"]["id"]
        dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
        assert dl_resp.status_code == 200
        # Should be a valid docx: the real check is that parsing the
        # downloaded bytes does not raise.
        doc = DocxDocument(io.BytesIO(dl_resp.content))
        assert doc is not None

    def test_create_large_content(self, client: TestClient) -> None:
        """Large content (1MB+ of Markdown) → generates without timeout."""
        # 1MB+ of content
        large_content = "# Big Doc\n\n" + "Paragraph. " * 100000
        assert len(large_content) > 1_000_000

        resp = client.post(
            "/api/v1/documents/create",
            json={
                "format": "word",
                "content": large_content,
                "conversation_id": "conv-large",
            },
        )
        assert resp.status_code == 200
        # NOTE: .docx is ZIP-compressed, so 1MB text → ~40KB file.
        # Just verify the document was created and is non-trivial.
        assert resp.json()["document"]["size"] > 10_000

    def test_filename_unicode(self, client: TestClient) -> None:
        """Unicode filename → sanitized but preserved."""
        resp = client.post(
            "/api/v1/documents/create",
            json={
                "format": "word",
                "content": "# Test",
                "conversation_id": "conv-unicode",
                "filename": "季度报告.docx",
            },
        )
        assert resp.status_code == 200
        filename = resp.json()["document"]["filename"]
        # Unicode chars should be preserved (isalnum() returns True for CJK).
        # These are two separate assertions on purpose: joined with `or`,
        # the extension check alone would satisfy the test and the CJK
        # characters could be dropped without anyone noticing.
        assert "季度报告" in filename
        assert filename.endswith(".docx")

    def test_filename_path_traversal_in_create(self, client: TestClient) -> None:
        """filename='../../etc/passwd.docx' → sanitized, no path separators."""
        resp = client.post(
            "/api/v1/documents/create",
            json={
                "format": "word",
                "content": "# Test",
                "conversation_id": "conv-traversal",
                "filename": "../../etc/passwd.docx",
            },
        )
        assert resp.status_code == 200
        filename = resp.json()["document"]["filename"]
        # Path separators must be removed (prevents traversal)
        assert "/" not in filename
        assert "\\" not in filename
        # NOTE: dots are kept by _sanitize_filename (legitimate in filenames),
        # but path separators are replaced with _ — no traversal possible

    def test_filename_only_dots(self, client: TestClient) -> None:
        """filename='...' → sanitized to non-empty."""
        resp = client.post(
            "/api/v1/documents/create",
            json={
                "format": "word",
                "content": "# Test",
                "conversation_id": "conv-dots",
                "filename": "...",
            },
        )
        assert resp.status_code == 200
        filename = resp.json()["document"]["filename"]
        # Should not be empty after sanitization
        assert len(filename) > 0
        assert filename.endswith(".docx")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Renderer edge cases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestRendererEdgeCases:
    """Edge cases in Markdown → format rendering."""

    def test_excel_empty_cells_in_markdown_table(self, service: DocumentService) -> None:
        """Markdown table with empty cells → renders correctly."""
        from openpyxl import load_workbook

        async def run() -> DocumentMeta:
            return await service.create_document(
                format="excel",
                content="| A | B | C |\n|---|---|---|\n| x | | z |",
                conversation_id="conv-empty-cells",
            )

        meta = asyncio.run(run())
        path = service.get_download_path(meta.id)
        # get_download_path can return None; fail here with a clear assertion
        # rather than inside load_workbook.
        assert path is not None

        wb = load_workbook(path)
        try:
            ws = wb["Table1"]
            # Row 1: header (A, B, C), Row 2: data (x, empty, z)
            assert ws["A1"].value == "A"
            assert ws["B1"].value == "B"
            assert ws["C1"].value == "C"
            assert ws["A2"].value == "x"
            assert ws["B2"].value is None or ws["B2"].value == ""
            assert ws["C2"].value == "z"
        finally:
            # Close the workbook even when an assertion above fails.
            wb.close()

    def test_excel_pipe_in_content(self, service: DocumentService) -> None:
        """Cell content containing pipe character → handled gracefully."""
        from openpyxl import load_workbook

        async def run() -> DocumentMeta:
            return await service.create_document(
                format="excel",
                content='{"Data": [["a|b", "c"]]}',
                conversation_id="conv-pipe",
            )

        meta = asyncio.run(run())
        path = service.get_download_path(meta.id)
        assert path is not None

        wb = load_workbook(path)
        try:
            ws = wb.active
            # The pipe should be in the cell content
            assert ws["A1"].value == "a|b"
        finally:
            # Close the workbook even when the assertion above fails.
            wb.close()

    def test_pdf_mixed_cjk_ascii(self, service: DocumentService) -> None:
        """Mixed CJK and ASCII text in PDF → generates without error."""

        async def run() -> DocumentMeta:
            return await service.create_document(
                format="pdf",
                content="# 混合 Mixed Content 内容\n\nEnglish and 中文 mixed.\n\n表格 Table:",
                conversation_id="conv-cjk",
            )

        meta = asyncio.run(run())
        path = service.get_download_path(meta.id)
        # Guard against a None path so the failure is an assertion, not an
        # AttributeError on `.exists()`.
        assert path is not None
        assert path.exists()
        # Verify it's a valid PDF by checking the file signature
        content = path.read_bytes()
        assert content[:4] == b"%PDF"
        assert len(content) > 1000  # Non-trivial size

    def test_word_nested_formatting(self, service: DocumentService) -> None:
        """Nested formatting (bold inside italic) → doesn't crash."""

        async def run() -> DocumentMeta:
            return await service.create_document(
                format="word",
                content="# Test\n\n**bold *italic* bold**\n\n*italic **bold** italic*",
                conversation_id="conv-nested",
            )

        meta = asyncio.run(run())
        path = service.get_download_path(meta.id)
        assert path is not None
        assert path.exists()
        # Should be a valid docx whose text still contains both words
        doc = DocxDocument(str(path))
        text = "\n".join(p.text for p in doc.paragraphs)
        assert "bold" in text
        assert "italic" in text
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DocumentLoader read edge cases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestReadEdgeCases:
    """Edge cases for document reading (U9)."""

    def test_read_pdf_file(self, service: DocumentService, tool: DocumentTool) -> None:
        """Read a PDF file created by the tool → returns text content."""

        async def setup():
            return await tool.execute(
                action="create",
                format="pdf",
                content="# PDF Read Test\n\nThis is PDF content to read.",
                conversation_id="conv-read-pdf",
            )

        result = asyncio.run(setup())
        doc_id = result["document"]["id"]
        path = service.get_download_path(doc_id)
        # Without this guard a missing path would be passed to the tool as
        # the literal filename "None", hiding the real failure.
        assert path is not None

        # Read it back
        async def read():
            return await tool.execute(
                action="read",
                filename=str(path),
                conversation_id="conv-read-pdf",
            )

        read_result = asyncio.run(read())
        assert read_result["success"] is True
        assert "PDF Read Test" in read_result["content"]
        assert read_result["metadata"]["format"] == "pdf"

    def test_read_html_file(self, tool: DocumentTool, tmp_path: Path) -> None:
        """Read an HTML file → returns text (tags stripped if bs4 available)."""
        html_file = tmp_path / "test.html"
        html_file.write_text(
            "<html><head><title>Test Page</title></head>"
            "<body><h1>Heading</h1><p>Paragraph text</p></body></html>",
            encoding="utf-8",
        )

        async def read():
            return await tool.execute(
                action="read",
                filename=str(html_file),
                conversation_id="conv-1",
            )

        result = asyncio.run(read())
        assert result["success"] is True
        # Content should contain the text — either stripped (bs4) or raw (fallback)
        assert "Heading" in result["content"]
        assert "Paragraph text" in result["content"]
        # If bs4 is available, tags should be stripped; otherwise raw HTML is returned
        try:
            import bs4  # noqa: F401

            bs4_available = True
        except ImportError:
            bs4_available = False

        if bs4_available:
            assert "<h1>" not in result["content"]
            assert "<p>" not in result["content"]

    def test_read_empty_file(self, tool: DocumentTool, tmp_path: Path) -> None:
        """Read an empty file → returns empty content."""
        empty_file = tmp_path / "empty.txt"
        empty_file.write_text("", encoding="utf-8")

        async def read():
            return await tool.execute(
                action="read",
                filename=str(empty_file),
                conversation_id="conv-1",
            )

        result = asyncio.run(read())
        assert result["success"] is True
        assert result["content"] == ""

    def test_read_binary_file_as_text(self, tool: DocumentTool, tmp_path: Path) -> None:
        """Read a binary file with .txt extension → doesn't crash, reports success."""
        binary_file = tmp_path / "binary.txt"
        binary_file.write_bytes(b"\x00\x01\x02\xff\xfe")

        async def read():
            return await tool.execute(
                action="read",
                filename=str(binary_file),
                conversation_id="conv-1",
            )

        result = asyncio.run(read())
        # Should not crash — text parser uses errors="replace"
        assert result["success"] is True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Database edge cases
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDatabaseEdgeCases:
    """Exercise the document metadata store at its edges."""

    async def test_insert_and_retrieve_roundtrip(self, service: DocumentService) -> None:
        """A created document reads back with the same field values."""
        created = await service.create_document(
            format="word",
            content="# Roundtrip Test",
            conversation_id="conv-roundtrip",
            filename="roundtrip.docx",
        )

        fetched = await service.get_document(created.id)
        assert fetched is not None
        # Compare the persisted fields one by one, in a fixed order.
        compared_fields = (
            "id",
            "filename",
            "format",
            "size",
            "conversation_id",
            "stored_name",
        )
        for field_name in compared_fields:
            assert getattr(fetched, field_name) == getattr(created, field_name)

    async def test_get_nonexistent_document(self, service: DocumentService) -> None:
        """Looking up an unknown ID yields None."""
        assert await service.get_document("nonexistent-id") is None

    async def test_delete_document_removes_metadata(self, service: DocumentService) -> None:
        """Deleting removes the metadata; a repeat delete reports False."""
        created = await service.create_document(
            format="word",
            content="# Delete Me",
            conversation_id="conv-delete",
        )
        doc_id = created.id
        db_path = service.db_path

        first_attempt = await delete_document(doc_id, db_path)
        assert first_attempt is True

        # The metadata row must no longer be retrievable.
        lookup = await service.get_document(doc_id)
        assert lookup is None

        # Nothing is left to delete the second time around.
        second_attempt = await delete_document(doc_id, db_path)
        assert second_attempt is False
|