{
+ if (!conversationId) return
+ loadingConversations.value.add(conversationId)
+ try {
+ const resp = await documentApi.listByConversation(conversationId)
+ documentsByConversation.value.set(conversationId, resp.documents || [])
+ } catch (e) {
+ console.error('Failed to fetch documents:', e)
+ } finally {
+ loadingConversations.value.delete(conversationId)
+ }
+ }
+
/**
 * Register a document under its conversation (called when the Agent creates one).
 *
 * The new entry is placed at the front of the conversation's list, so the
 * list stays ordered newest-first.
 */
function addDocument(conversationId: string, doc: IDocumentMeta): void {
  const byConversation = documentsByConversation.value
  const previous = byConversation.get(conversationId) || []
  byConversation.set(conversationId, [doc].concat(previous))
}
+
/** Remove the stored document list for the given conversation. */
function clearConversation(conversationId: string): void {
  const byConversation = documentsByConversation.value
  byConversation.delete(conversationId)
}
+
+ return {
+ documentsByConversation,
+ loadingConversations,
+ getDocuments,
+ fetchDocuments,
+ addDocument,
+ clearConversation,
+ }
+})
diff --git a/src/agentkit/server/frontend/src/views/ChatView.vue b/src/agentkit/server/frontend/src/views/ChatView.vue
index eb26943..70922a5 100644
--- a/src/agentkit/server/frontend/src/views/ChatView.vue
+++ b/src/agentkit/server/frontend/src/views/ChatView.vue
@@ -86,6 +86,10 @@
+
@@ -107,6 +111,7 @@ import ChatMessage from '@/components/chat/ChatMessage.vue'
import ChatInput from '@/components/chat/ChatInput.vue'
import ExpertTeamView from '@/components/chat/ExpertTeamView.vue'
import BoardStatusView from '@/components/chat/BoardStatusView.vue'
+import DocumentPanel from '@/components/chat/DocumentPanel.vue'
const ATypographyText = ATypography.Text
diff --git a/src/agentkit/server/routes/documents.py b/src/agentkit/server/routes/documents.py
new file mode 100644
index 0000000..0282f58
--- /dev/null
+++ b/src/agentkit/server/routes/documents.py
@@ -0,0 +1,248 @@
+"""REST API routes for document operations (U7).
+
+Thin wrapper over DocumentService. All business logic lives in the
+service layer — routes handle HTTP concerns (auth, file upload/download,
+request validation).
+
+Endpoints:
+- POST /api/v1/documents/create — create a document from Markdown/JSON
+- POST /api/v1/documents/upload-template — upload a .docx template
+- GET /api/v1/documents/conversation/{conversation_id} — list docs
+- GET /api/v1/documents/download/{doc_id} — download a document
+"""
+
+from __future__ import annotations
+
+import hmac
+import logging
+import uuid
+from typing import Any
+
+from fastapi import (
+ APIRouter,
+ Depends,
+ File,
+ HTTPException,
+ Request,
+ Security,
+ UploadFile,
+)
+from fastapi.responses import FileResponse
+from fastapi.security import APIKeyHeader, APIKeyQuery
+from pydantic import BaseModel
+
+from agentkit.documents.service import DocumentService
+
+logger = logging.getLogger(__name__)
+
+router = APIRouter(prefix="/documents", tags=["documents"])
+
+MAX_TEMPLATE_SIZE = 50 * 1024 * 1024 # 50 MB
+
+# ---------------------------------------------------------------------------
+# Authentication (mirrors kb_management.py pattern)
+# ---------------------------------------------------------------------------
+
+_api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
+_api_key_query = APIKeyQuery(name="api_key", auto_error=False)
+
+
async def _verify_api_key(
    request: Request,
    api_key_header: str | None = Security(_api_key_header),
    api_key_query: str | None = Security(_api_key_query),
) -> None:
    """Check the caller's API key against the key configured on the app.

    The configured key is read from ``app.state.server_config.api_key`` when a
    server config is present, otherwise from ``app.state.api_key``. When no key
    is configured at all, every request is allowed.

    Args:
        request: Incoming request; only ``request.app.state`` is inspected.
        api_key_header: Value of the ``X-API-Key`` header, if any.
        api_key_query: Value of the ``api_key`` query parameter, if any.

    Raises:
        HTTPException: 401 when a key is configured and the supplied key
            (header takes precedence over query) does not match it.
    """
    state = request.app.state

    expected: str | None = None
    server_config = getattr(state, "server_config", None)
    if server_config:
        expected = server_config.api_key
    if expected is None:
        expected = getattr(state, "api_key", None)

    # No key configured → allow all (backwards compat, same as kb_management)
    if expected is None:
        return

    supplied = api_key_header or api_key_query or ""
    # Constant-time comparison so the check does not leak timing information.
    if hmac.compare_digest(supplied.encode(), expected.encode()):
        return

    raise HTTPException(
        status_code=401,
        detail="Invalid or missing API key. Provide via X-API-Key header or api_key query.",
    )
+
+
def _get_document_service(request: Request) -> DocumentService:
    """Return the DocumentService stored on ``app.state``.

    Raises:
        HTTPException: 503 when ``app.state.document_service`` is missing or None.
    """
    document_service = getattr(request.app.state, "document_service", None)
    if document_service is not None:
        return document_service

    raise HTTPException(
        status_code=503,
        detail="Document service not available. Server may not have initialized it.",
    )
+
+
+# ---------------------------------------------------------------------------
+# Request / response models
+# ---------------------------------------------------------------------------
+
+
class CreateDocumentRequest(BaseModel):
    """JSON body accepted by the ``/create`` endpoint."""

    # "word" | "excel" | "pdf"
    format: str
    # Document content, forwarded unchanged to the document service.
    content: str
    # Conversation the new document is associated with.
    conversation_id: str
    # Optional filename, forwarded to the document service.
    filename: str | None = None
    # template file path (stored_name in uploads)
    template: str | None = None
    # Optional data forwarded together with the template.
    template_data: dict[str, Any] | None = None
+
+
class DocumentResponse(BaseModel):
    """Metadata describing a single document, including its download URL."""

    # Identity and display name.
    id: str
    filename: str

    # File characteristics.
    format: str
    size: int

    # Ownership and creation time (kept as a string).
    conversation_id: str
    created_at: str

    # URL from which the document can be downloaded.
    download_url: str
+
+
+# ---------------------------------------------------------------------------
+# Endpoints
+# ---------------------------------------------------------------------------
+
+
@router.post("/create", dependencies=[Depends(_verify_api_key)])
async def create_document(
    body: CreateDocumentRequest,
    request: Request,
) -> dict[str, Any]:
    """Create a document from Markdown content or a template.

    Args:
        body: Creation parameters; ``body.template``, when given, is resolved
            relative to the service's upload directory.
        request: Incoming request, used to look up the document service.

    Returns:
        ``{"success": True, "document": {...}}`` where the document metadata
        includes a download URL.

    Raises:
        HTTPException: 400 for an invalid template name or a ``ValueError``
            from the service; 404 when the template is not an existing file
            or the service raises ``FileNotFoundError``; 500 for any other
            failure during creation.
    """
    service = _get_document_service(request)

    # If template is provided, resolve its path from stored_name
    template_path: str | None = None
    if body.template:
        # Security: prevent path traversal — resolved path must stay within upload_dir.
        # Also rejects null bytes and other invalid path characters (OS-level defense).
        upload_dir_resolved = service.upload_dir.resolve()
        try:
            candidate = (upload_dir_resolved / body.template).resolve()
            candidate.relative_to(upload_dir_resolved)
        except (ValueError, OSError) as exc:
            raise HTTPException(
                status_code=400,
                detail="Invalid template name: path traversal or invalid characters detected",
            ) from exc
        # is_file() rather than exists(): a directory inside upload_dir (for
        # example template=".") passes the containment check but is not a
        # usable template, so it is reported as not found.
        if not candidate.is_file():
            raise HTTPException(status_code=404, detail=f"Template not found: {body.template}")
        template_path = str(candidate)

    try:
        meta = await service.create_document(
            format=body.format,
            content=body.content,
            conversation_id=body.conversation_id,
            filename=body.filename,
            template_path=template_path,
            template_data=body.template_data,
        )
        meta.download_url = f"/api/v1/documents/download/{meta.id}"
        return {"success": True, "document": meta.to_dict()}
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except FileNotFoundError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        # logger.exception records the traceback; the client only sees a
        # generic message so internal details are not leaked.
        logger.exception("Document creation failed: %s", e)
        raise HTTPException(status_code=500, detail="Document creation failed") from e
+
+
@router.post("/upload-template", dependencies=[Depends(_verify_api_key)])
async def upload_template(
    request: Request,
    file: UploadFile = File(...),
) -> dict[str, Any]:
    """Upload a .docx template file for later use in document creation.

    Only the filename extension is checked; the file content is not validated.

    Args:
        request: Incoming request, used to look up the document service.
        file: The uploaded template.

    Returns:
        A dict containing the ``stored_name`` to use in the /create endpoint's
        template field, the original filename and the stored size in bytes.

    Raises:
        HTTPException: 413 when the upload exceeds ``MAX_TEMPLATE_SIZE``;
            400 when the filename does not end in ``.docx``; 500 when the
            file cannot be saved.
    """
    if file.size is not None and file.size > MAX_TEMPLATE_SIZE:
        raise HTTPException(status_code=413, detail="Template exceeds 50 MB limit")

    if not (file.filename or "").lower().endswith(".docx"):
        raise HTTPException(status_code=400, detail="Only .docx templates are supported")

    service = _get_document_service(request)
    upload_dir = service._ensure_upload_dir()
    # Server-generated name: the client-supplied filename never reaches the filesystem.
    stored_name = f"template-{uuid.uuid4().hex}.docx"
    file_path = upload_dir / stored_name

    try:
        # Read at most one byte past the limit so an oversized upload whose
        # size was not reported up front is never buffered in full.
        contents = await file.read(MAX_TEMPLATE_SIZE + 1)
        if len(contents) > MAX_TEMPLATE_SIZE:
            raise HTTPException(status_code=413, detail="Template exceeds 50 MB limit")
        file_path.write_bytes(contents)
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Failed to save template: %s", exc)
        # Best-effort removal of a partially written file.
        try:
            file_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise HTTPException(status_code=500, detail="Failed to save template") from exc
    finally:
        await file.close()

    return {
        "success": True,
        "stored_name": stored_name,
        "filename": file.filename,
        # Exactly the bytes just written; avoids a stat() outside the error handling.
        "size": len(contents),
        "message": f"Template uploaded. Use '{stored_name}' as the template field in /create.",
    }
+
+
@router.get(
    "/conversation/{conversation_id}",
    dependencies=[Depends(_verify_api_key)],
)
async def list_conversation_documents(
    conversation_id: str,
    request: Request,
) -> dict[str, Any]:
    """Return every document recorded for ``conversation_id``.

    Each document gets its download URL filled in before being serialized.
    The response also carries the conversation id and the document count.
    """
    documents = await _get_document_service(request).get_conversation_documents(
        conversation_id
    )

    for document in documents:
        document.download_url = f"/api/v1/documents/download/{document.id}"
    serialized = [document.to_dict() for document in documents]

    return {
        "success": True,
        "conversation_id": conversation_id,
        "documents": serialized,
        "count": len(documents),
    }
+
+
@router.get("/download/{doc_id}", dependencies=[Depends(_verify_api_key)])
async def download_document(
    doc_id: str,
    request: Request,
) -> FileResponse:
    """Send the stored file for ``doc_id`` as an attachment.

    Raises:
        HTTPException: 404 when no metadata exists for ``doc_id``, or when the
            metadata exists but the file cannot be found on disk.
    """
    service = _get_document_service(request)

    # The metadata record supplies the filename used in the response.
    document_meta = await service.get_document(doc_id)
    if document_meta is None:
        raise HTTPException(status_code=404, detail="Document not found")

    # Metadata alone is not enough: the file itself must still be present.
    stored_file = service.get_download_path(doc_id)
    if not (stored_file is not None and stored_file.exists()):
        raise HTTPException(status_code=404, detail="Document file not found on disk")

    return FileResponse(
        path=str(stored_file),
        filename=document_meta.filename,
        media_type="application/octet-stream",
    )
diff --git a/src/agentkit/tools/document_tool.py b/src/agentkit/tools/document_tool.py
new file mode 100644
index 0000000..06b1807
--- /dev/null
+++ b/src/agentkit/tools/document_tool.py
@@ -0,0 +1,158 @@
+"""DocumentTool — Agent tool for creating and reading formatted documents.
+
+Wraps DocumentService (create) and DocumentLoader (read) so the LLM can
+handle documents via function calling. U6 implements "create"; U9 adds "read".
+"""
+
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any
+
+from agentkit.documents.service import DocumentService
+from agentkit.memory.document_loader import DocumentLoader
+from agentkit.tools.base import Tool
+
+
class DocumentTool(Tool):
    """Agent tool for document creation (Word/Excel/PDF) and reading.

    The tool delegates all business logic to DocumentService (create) or
    DocumentLoader (read) — it only handles input validation and result
    formatting. Every failure is reported as ``{"success": False, "error": ...}``
    rather than raised.
    """

    def __init__(self, service: DocumentService, loader: DocumentLoader | None = None):
        """Register the tool's name, description and input schema.

        Args:
            service: Service used by the 'create' action.
            loader: Loader used by the 'read' action; a default
                ``DocumentLoader()`` is created when omitted.
        """
        super().__init__(
            name="document",
            description=(
                "Create formatted documents (Word/Excel/PDF) from Markdown content, "
                "fill a Word template with data, or read/extract text from an existing "
                "document file (PDF/Word/Excel/Markdown/HTML/text). "
                "Use action='create' to generate, action='read' to extract content."
            ),
            input_schema={
                "type": "object",
                "properties": {
                    "action": {
                        "type": "string",
                        "enum": ["create", "read"],
                        "description": (
                            "Operation: 'create' (default) generates a new document; "
                            "'read' extracts text from an existing file path."
                        ),
                    },
                    "format": {
                        "type": "string",
                        "enum": ["word", "excel", "pdf"],
                        "description": "Output format for create: word (.docx), excel (.xlsx), or pdf (.pdf)",
                    },
                    "content": {
                        "type": "string",
                        "description": (
                            "For create: Markdown-formatted document content. For word/excel/pdf, "
                            "use Markdown headings (#), lists (- or 1.), and tables (| col |). "
                            "For excel, can also be JSON: {\"SheetName\": [[row], ...]}"
                        ),
                    },
                    "filename": {
                        "type": "string",
                        "description": (
                            "For create: display filename (optional, auto-generated if omitted). "
                            "For read: absolute or relative path to the file to read."
                        ),
                    },
                    "conversation_id": {
                        "type": "string",
                        "description": "Conversation ID to associate the document with (create only)",
                    },
                    "template": {
                        "type": "string",
                        "description": "Path to a .docx template file (optional, word create only). Fills Jinja2 placeholders.",
                    },
                    "template_data": {
                        "type": "object",
                        "description": "Data dict for Jinja2 template filling (optional, used with template)",
                    },
                },
                # NOTE(review): conversation_id is required by the schema even for
                # action='read', although its description says "create only" —
                # confirm whether this is intended.
                "required": ["conversation_id"],
            },
        )
        self._service = service
        self._loader = loader or DocumentLoader()

    async def execute(self, **kwargs) -> dict[str, Any]:
        """Dispatch to the handler for ``action`` ('create' when absent or empty).

        Returns:
            The handler's result dict, or an error dict for an unknown action.
        """
        # `or` rather than a .get() default: an explicit None/"" falls back to
        # 'create', matching the default documented in the schema.
        action = kwargs.get("action") or "create"

        if action == "read":
            return await self._execute_read(**kwargs)
        if action == "create":
            return await self._execute_create(**kwargs)
        return {"success": False, "error": f"Unknown action: {action!r} (use 'create' or 'read')"}

    async def _execute_create(self, **kwargs) -> dict[str, Any]:
        """Validate the create arguments and delegate to the document service.

        ``format`` and ``conversation_id`` are mandatory; at least one of
        ``content`` and ``template`` must be supplied.

        Returns:
            On success, a dict with the document metadata and a summary
            message; otherwise a dict with ``success`` False and an ``error``.
        """
        format_key = kwargs.get("format", "")
        content = kwargs.get("content", "")
        conversation_id = kwargs.get("conversation_id", "")
        filename = kwargs.get("filename")
        template = kwargs.get("template")
        template_data = kwargs.get("template_data")

        if not format_key:
            return {"success": False, "error": "format is required for create (word/excel/pdf)"}
        if not conversation_id:
            return {"success": False, "error": "conversation_id is required"}
        if not content and not template:
            return {
                "success": False,
                "error": "content is required (or template for template filling)",
            }

        try:
            meta = await self._service.create_document(
                format=format_key,
                content=content,
                conversation_id=conversation_id,
                filename=filename,
                template_path=template,
                template_data=template_data,
            )
            return {
                "success": True,
                "document": meta.to_dict(),
                "message": f"Created {meta.format} document: {meta.filename} ({meta.size} bytes)",
            }
        except ValueError as e:
            return {"success": False, "error": str(e)}
        except FileNotFoundError as e:
            return {"success": False, "error": f"Template not found: {e}"}
        except Exception as e:
            return {"success": False, "error": f"Document creation failed: {e}"}

    async def _execute_read(self, **kwargs) -> dict[str, Any]:
        """Load a file through the document loader and return its text.

        The path is taken from ``filename``, falling back to ``content``.
        Relative paths are resolved against the current working directory.

        Returns:
            On success, a dict with ``content``, ``title``, ``metadata`` and a
            summary message; otherwise a dict with ``success`` False and an
            ``error``.
        """
        file_path = kwargs.get("filename") or kwargs.get("content")
        if not file_path:
            return {"success": False, "error": "filename (file path) is required for read"}

        try:
            # Path construction happens inside the try so that a non-string
            # argument is reported as a tool error instead of raising.
            path = Path(file_path)
            if not path.is_absolute():
                # ponytail: resolve relative paths against cwd; DocumentService upload_dir
                # is the typical anchor but we don't want to couple read to create's storage.
                path = path.resolve()

            doc = self._loader.load(path)
            return {
                "success": True,
                "content": doc.content,
                "title": doc.title,
                "metadata": doc.metadata,
                "message": (
                    f"Read {doc.metadata.get('format', 'unknown')} document "
                    f"({len(doc.content)} chars)"
                ),
            }
        except FileNotFoundError as e:
            return {"success": False, "error": str(e)}
        except Exception as e:
            return {"success": False, "error": f"Document read failed: {e}"}
diff --git a/tests/documents/test_db.py b/tests/documents/test_db.py
new file mode 100644
index 0000000..5b915c6
--- /dev/null
+++ b/tests/documents/test_db.py
@@ -0,0 +1,254 @@
+"""Tests for document DB persistence and DocumentService metadata operations.
+
+Covers U1: DocumentService core architecture + database model.
+Renderer-specific tests live in test_word_renderer.py etc.
+"""
+
+from __future__ import annotations
+
+import asyncio
+from pathlib import Path
+
+import pytest
+
+from agentkit.documents.db import (
+ delete_document,
+ get_conversation_documents,
+ get_document_by_id,
+ init_documents_db,
+ insert_document,
+)
+from agentkit.documents.models import DocumentMeta
+from agentkit.documents.service import DocumentService, _sanitize_filename
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
@pytest.fixture
def tmp_db(tmp_path: Path) -> Path:
    """Create an initialized documents DB under ``tmp_path`` and return its path."""
    database_file = tmp_path / "test_documents.db"
    # The fixture is synchronous, so the async initializer is driven explicitly.
    asyncio.run(init_documents_db(database_file))
    return database_file
+
+
def _make_meta(
    doc_id: str = "test-id-1",
    filename: str = "report.docx",
    conversation_id: str = "conv-1",
    format: str = "word",
    created_at: str = "2026-06-23T00:00:00+00:00",
) -> DocumentMeta:
    """Build a DocumentMeta with a fixed size of 1024 and a ``.docx`` stored name."""
    fields = {
        "id": doc_id,
        "filename": filename,
        "stored_name": f"{doc_id}.docx",
        "format": format,
        "size": 1024,
        "conversation_id": conversation_id,
        "created_at": created_at,
    }
    return DocumentMeta(**fields)
+
+
+# ---------------------------------------------------------------------------
+# init_documents_db
+# ---------------------------------------------------------------------------
+
+
async def test_init_db_idempotent(tmp_path: Path) -> None:
    """Initializing the same database path twice must not raise."""
    database_file = tmp_path / "test.db"
    for _ in range(2):
        await init_documents_db(database_file)
    assert database_file.exists()
+
+
async def test_init_db_creates_parent_dir(tmp_path: Path) -> None:
    """Missing parent directories are created during initialization."""
    nested_file = tmp_path.joinpath("nested", "deep", "test.db")
    await init_documents_db(nested_file)
    assert nested_file.exists()
+
+
+# ---------------------------------------------------------------------------
+# insert + query
+# ---------------------------------------------------------------------------
+
+
async def test_insert_and_get_by_id(tmp_db: Path) -> None:
    """A row written with insert_document can be read back by its id."""
    await insert_document(_make_meta(), tmp_db)

    fetched = await get_document_by_id("test-id-1", tmp_db)

    assert fetched is not None
    expected_fields = {
        "id": "test-id-1",
        "filename": "report.docx",
        "format": "word",
        "size": 1024,
        "conversation_id": "conv-1",
    }
    for attribute, expected in expected_fields.items():
        assert getattr(fetched, attribute) == expected
+
+
async def test_get_by_id_not_found(tmp_db: Path) -> None:
    """Looking up an id that was never inserted yields None."""
    assert await get_document_by_id("does-not-exist", tmp_db) is None
+
+
async def test_get_conversation_documents(tmp_db: Path) -> None:
    """Documents are filtered by conversation and returned newest-first."""
    fixtures = [
        _make_meta(doc_id="doc-1", created_at="2026-06-23T10:00:00+00:00"),
        _make_meta(doc_id="doc-2", created_at="2026-06-23T11:00:00+00:00"),
        _make_meta(
            doc_id="doc-3", conversation_id="conv-2", created_at="2026-06-23T12:00:00+00:00"
        ),
    ]
    for fixture_meta in fixtures:
        await insert_document(fixture_meta, tmp_db)

    first_conversation = await get_conversation_documents("conv-1", tmp_db)
    # Two documents, the later created_at first.
    assert [doc.id for doc in first_conversation] == ["doc-2", "doc-1"]

    second_conversation = await get_conversation_documents("conv-2", tmp_db)
    assert [doc.id for doc in second_conversation] == ["doc-3"]
+
+
async def test_get_conversation_documents_empty(tmp_db: Path) -> None:
    """An unknown conversation id produces an empty list."""
    documents = await get_conversation_documents("no-such-conv", tmp_db)
    assert documents == []
+
+
+# ---------------------------------------------------------------------------
+# delete
+# ---------------------------------------------------------------------------
+
+
async def test_delete_document(tmp_db: Path) -> None:
    """The first delete returns True and removes the row; a repeat returns False."""
    target_id = "test-id-1"
    await insert_document(_make_meta(), tmp_db)

    first_attempt = await delete_document(target_id, tmp_db)
    assert first_attempt is True

    # Nothing is left to delete the second time around.
    second_attempt = await delete_document(target_id, tmp_db)
    assert second_attempt is False

    # The row can no longer be fetched.
    assert await get_document_by_id(target_id, tmp_db) is None
+
+
+# ---------------------------------------------------------------------------
+# _sanitize_filename (path traversal protection)
+# ---------------------------------------------------------------------------
+
+
def test_sanitize_filename_removes_path_separators() -> None:
    """No '/' or '\\' survives sanitization, while the final name component does."""
    # Security property under test: with both separator characters gone, the
    # result cannot express a path traversal.
    posix_style = _sanitize_filename("../../etc/passwd")
    assert "/" not in posix_style
    assert "\\" not in posix_style
    assert "passwd" in posix_style

    windows_style = _sanitize_filename("..\\..\\windows\\system32")
    assert "/" not in windows_style
    assert "\\" not in windows_style
    assert "system32" in windows_style

    # An already-safe name passes through unchanged.
    safe_name = "safe-name_v1.0.txt"
    assert _sanitize_filename(safe_name) == "safe-name_v1.0.txt"
+
+
def test_sanitize_filename_empty() -> None:
    """Empty input stays empty; separator-only input loses every separator."""
    assert _sanitize_filename("") == ""

    # Whatever "///" turns into, it must not contain a path separator.
    only_separators = _sanitize_filename("///")
    assert "/" not in only_separators
    assert "\\" not in only_separators
+
+
+# ---------------------------------------------------------------------------
+# DocumentService (metadata + download path, no rendering in U1)
+# ---------------------------------------------------------------------------
+
+
async def test_service_get_download_path(tmp_path: Path) -> None:
    """get_download_path locates a ``.docx`` file stored under the document id."""
    database_file = tmp_path / "test.db"
    uploads = tmp_path / "uploads"
    await init_documents_db(database_file)

    service = DocumentService(upload_dir=uploads, db_path=database_file)

    # Place a stand-in file on disk under the expected name.
    doc_id = "abc123"
    uploads.mkdir(parents=True, exist_ok=True)
    (uploads / f"{doc_id}.docx").write_bytes(b"fake docx content")

    located = service.get_download_path(doc_id)
    assert located is not None
    assert located.name == f"{doc_id}.docx"
+
+
async def test_service_get_download_path_not_found(tmp_path: Path) -> None:
    """With no matching file on disk, get_download_path returns None."""
    database_file = tmp_path / "test.db"
    uploads = tmp_path / "uploads"
    await init_documents_db(database_file)

    service = DocumentService(upload_dir=uploads, db_path=database_file)
    assert service.get_download_path("nonexistent-id") is None
+
+
async def test_service_create_without_renderer_raises(tmp_path: Path) -> None:
    """A service with no registered renderer rejects create_document with ValueError."""
    database_file = tmp_path / "test.db"
    uploads = tmp_path / "uploads"
    await init_documents_db(database_file)
    service = DocumentService(upload_dir=uploads, db_path=database_file)

    create_args = {"format": "word", "content": "# Test", "conversation_id": "conv-1"}
    with pytest.raises(ValueError, match="No renderer registered"):
        await service.create_document(**create_args)
+
+
async def test_service_create_unsupported_format_raises(tmp_path: Path) -> None:
    """An unknown format such as "pptx" is rejected with ValueError."""
    database_file = tmp_path / "test.db"
    await init_documents_db(database_file)
    service = DocumentService(upload_dir=tmp_path / "uploads", db_path=database_file)

    create_args = {"format": "pptx", "content": "# Test", "conversation_id": "conv-1"}
    with pytest.raises(ValueError, match="Unsupported format"):
        await service.create_document(**create_args)
+
+
async def test_service_get_conversation_documents(tmp_path: Path) -> None:
    """A row inserted through the db module is visible through the service."""
    database_file = tmp_path / "test.db"
    await init_documents_db(database_file)
    await insert_document(_make_meta(), database_file)

    service = DocumentService(upload_dir=tmp_path / "uploads", db_path=database_file)
    found = await service.get_conversation_documents("conv-1")

    assert len(found) == 1
    assert found[0].id == "test-id-1"
diff --git a/tests/documents/test_document_bugs.py b/tests/documents/test_document_bugs.py
new file mode 100644
index 0000000..ad2b5b4
--- /dev/null
+++ b/tests/documents/test_document_bugs.py
@@ -0,0 +1,544 @@
+"""Bug-finding tests for document processing — edge cases, error paths, concurrency.
+
+These tests probe for bugs in:
+- Concurrent database writes
+- File system inconsistencies (metadata exists, file missing)
+- Invalid/corrupted templates
+- Boundary conditions (empty content, large content, special chars)
+- Renderer edge cases (empty cells, special characters)
+"""
+
+from __future__ import annotations
+
+import asyncio
+import io
+from pathlib import Path
+
+import pytest
+from docx import Document as DocxDocument
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
+
+from agentkit.documents.db import delete_document, init_documents_db
+from agentkit.documents.models import DocumentMeta
+from agentkit.documents.renderers.excel_renderer import ExcelRenderer
+from agentkit.documents.renderers.pdf_renderer import PDFRenderer
+from agentkit.documents.renderers.word_renderer import WordRenderer
+from agentkit.documents.service import DocumentService
+from agentkit.server.routes import documents as documents_routes
+from agentkit.tools.document_tool import DocumentTool
+
+
+# ---------------------------------------------------------------------------
+# Fixtures
+# ---------------------------------------------------------------------------
+
+
@pytest.fixture
def service(tmp_path: Path) -> DocumentService:
    """DocumentService on a fresh DB with word, excel and pdf renderers registered."""
    database_file = tmp_path / "test.db"
    asyncio.run(init_documents_db(database_file))

    document_service = DocumentService(
        upload_dir=tmp_path / "uploads", db_path=database_file
    )
    renderers = (
        ("word", WordRenderer()),
        ("excel", ExcelRenderer()),
        ("pdf", PDFRenderer()),
    )
    for format_key, renderer in renderers:
        document_service.register_renderer(format_key, renderer)
    return document_service
+
+
@pytest.fixture
def app(service: DocumentService) -> FastAPI:
    """FastAPI app exposing the document routes under ``/api/v1``.

    ``server_config`` is set to None, so no API key is configured through it.
    """
    application = FastAPI()
    application.state.document_service = service
    application.state.server_config = None
    application.include_router(documents_routes.router, prefix="/api/v1")
    return application
+
+
@pytest.fixture
def client(app: FastAPI) -> TestClient:
    """Test client bound to the ``app`` fixture."""
    test_client = TestClient(app)
    return test_client
+
+
@pytest.fixture
def tool(service: DocumentService) -> DocumentTool:
    """DocumentTool backed by the ``service`` fixture and a default loader."""
    document_tool = DocumentTool(service=service)
    return document_tool
+
+
+# ---------------------------------------------------------------------------
+# Concurrent database writes
+# ---------------------------------------------------------------------------
+
+
class TestConcurrentWrites:
    """Concurrent document creation must neither lose nor mix up records."""

    async def test_concurrent_inserts(self, service: DocumentService) -> None:
        """Ten create_document calls gathered concurrently all succeed."""

        async def make_doc(index: int) -> DocumentMeta:
            return await service.create_document(
                format="word",
                content=f"# Doc {index}",
                conversation_id="conv-concurrent",
                filename=f"doc-{index}.docx",
            )

        pending = [make_doc(index) for index in range(10)]
        created = await asyncio.gather(*pending)

        # Every call produced its own id.
        unique_ids = {meta.id for meta in created}
        assert len(unique_ids) == 10

        # Every document is listed for the conversation.
        stored = await service.get_conversation_documents("conv-concurrent")
        assert len(stored) == 10

    async def test_concurrent_different_conversations(self, service: DocumentService) -> None:
        """Documents created concurrently stay attached to their own conversation."""
        conversation_ids = [f"conv-{i}" for i in range(5)]

        async def make_doc(conv_id: str) -> DocumentMeta:
            return await service.create_document(
                format="word",
                content=f"# {conv_id}",
                conversation_id=conv_id,
            )

        await asyncio.gather(*[make_doc(conv_id) for conv_id in conversation_ids])

        for i, conv_id in enumerate(conversation_ids):
            listed = await service.get_conversation_documents(conv_id)
            assert len(listed) == 1, f"conv-{i} should have exactly 1 doc"
+
+
+# ---------------------------------------------------------------------------
+# File system inconsistencies
+# ---------------------------------------------------------------------------
+
+
class TestFileSystemInconsistency:
    """Verify behavior when metadata and filesystem are out of sync."""

    def test_download_metadata_exists_file_missing(
        self, client: TestClient, service: DocumentService
    ) -> None:
        """Metadata exists but file was deleted from disk → 404."""
        # Create a document
        resp = client.post(
            "/api/v1/documents/create",
            json={
                "format": "word",
                "content": "# Test",
                "conversation_id": "conv-missing",
            },
        )
        # Fail here with a clear message if setup broke, rather than with a
        # KeyError on the response body below.
        assert resp.status_code == 200, resp.text
        doc_id = resp.json()["document"]["id"]

        # Delete the file from disk
        file_path = service.get_download_path(doc_id)
        assert file_path is not None
        file_path.unlink()

        # Download should return 404 (file not found on disk)
        dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
        assert dl_resp.status_code == 404
        assert "not found on disk" in dl_resp.json()["detail"].lower()

    def test_get_download_path_nonexistent(self, service: DocumentService) -> None:
        """get_download_path returns None for non-existent doc_id."""
        path = service.get_download_path("nonexistent-id-12345")
        assert path is None
+
+
+# ---------------------------------------------------------------------------
+# Invalid templates
+# ---------------------------------------------------------------------------
+
+
class TestInvalidTemplates:
    """Error handling when a template is unusable or missing."""

    def test_upload_invalid_docx_content(
        self, client: TestClient, tmp_path: Path
    ) -> None:
        """A ``.docx``-named file with non-docx content uploads fine but cannot be used."""
        # Plain text saved under a .docx name — not a real Word document.
        bogus_template = tmp_path / "fake.docx"
        bogus_template.write_text("This is not a real docx file")

        with bogus_template.open("rb") as handle:
            upload_resp = client.post(
                "/api/v1/documents/upload-template",
                files={"file": ("fake.docx", handle, "application/octet-stream")},
            )
        # The upload endpoint only looks at the extension, so this succeeds.
        assert upload_resp.status_code == 200

        # Using the stored template for creation must not succeed.
        payload = {
            "format": "word",
            "content": "",
            "conversation_id": "conv-invalid",
            "template": upload_resp.json()["stored_name"],
            "template_data": {"name": "test"},
        }
        create_resp = client.post("/api/v1/documents/create", json=payload)
        # Should NOT be 200 — invalid template should be rejected
        # ponytail: currently returns 500 due to WordRenderer missing render_template
        # This is a known bug — see test_documents_security.py
        assert create_resp.status_code != 200, (
            "Invalid template should not produce a successful document"
        )

    def test_create_with_nonexistent_template(self, client: TestClient) -> None:
        """Referencing a template that was never uploaded yields 404."""
        payload = {
            "format": "word",
            "content": "# Test",
            "conversation_id": "conv-1",
            "template": "nonexistent-template.docx",
            "template_data": {},
        }
        response = client.post("/api/v1/documents/create", json=payload)

        assert response.status_code == 404
        assert "not found" in response.json()["detail"].lower()
+
+
+# ---------------------------------------------------------------------------
+# Boundary conditions
+# ---------------------------------------------------------------------------
+
+
+class TestBoundaryConditions:
+ """Edge cases for content, filenames, and formats."""
+
def test_create_empty_content_word(self, client: TestClient) -> None:
    """Empty Word content still produces a downloadable file that python-docx can open."""
    payload = {
        "format": "word",
        "content": "",
        "conversation_id": "conv-empty",
    }
    create_resp = client.post("/api/v1/documents/create", json=payload)
    assert create_resp.status_code == 200

    doc_id = create_resp.json()["document"]["id"]
    download_resp = client.get(f"/api/v1/documents/download/{doc_id}")
    assert download_resp.status_code == 200

    # Opening the bytes with python-docx is the validity check.
    opened = DocxDocument(io.BytesIO(download_resp.content))
    assert opened is not None
+
def test_create_large_content(self, client: TestClient) -> None:
    """More than 1 MB of Markdown is accepted and yields a non-trivial file."""
    heading = "# Big Doc\n\n"
    filler = "Paragraph. " * 100000
    large_content = heading + filler
    assert len(large_content) > 1_000_000

    payload = {
        "format": "word",
        "content": large_content,
        "conversation_id": "conv-large",
    }
    response = client.post("/api/v1/documents/create", json=payload)

    assert response.status_code == 200
    # ponytail: .docx is ZIP-compressed, so 1MB text → ~40KB file.
    # Just verify the document was created and is non-trivial.
    reported_size = response.json()["document"]["size"]
    assert reported_size > 10_000
+
+ def test_filename_unicode(self, client: TestClient) -> None:
+ """Unicode filename → sanitized but preserved."""
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Test",
+ "conversation_id": "conv-unicode",
+ "filename": "季度报告.docx",
+ },
+ )
+ assert resp.status_code == 200
+ filename = resp.json()["document"]["filename"]
+ # Unicode chars should be preserved (isalnum() returns True for CJK)
+ assert "季度报告" in filename or filename.endswith(".docx")
+
+ def test_filename_path_traversal_in_create(self, client: TestClient) -> None:
+ """filename='../../etc/passwd' → sanitized, no path separators."""
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Test",
+ "conversation_id": "conv-traversal",
+ "filename": "../../etc/passwd.docx",
+ },
+ )
+ assert resp.status_code == 200
+ filename = resp.json()["document"]["filename"]
+ # Path separators must be removed (prevents traversal)
+ assert "/" not in filename
+ assert "\\" not in filename
+ # ponytail: dots are kept by _sanitize_filename (legitimate in filenames),
+ # but path separators are replaced with _ — no traversal possible
+
+ def test_filename_only_dots(self, client: TestClient) -> None:
+ """filename='...' → sanitized to non-empty."""
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Test",
+ "conversation_id": "conv-dots",
+ "filename": "...",
+ },
+ )
+ assert resp.status_code == 200
+ filename = resp.json()["document"]["filename"]
+ # Should not be empty after sanitization
+ assert len(filename) > 0
+ assert filename.endswith(".docx")
+
+
+# ---------------------------------------------------------------------------
+# Renderer edge cases
+# ---------------------------------------------------------------------------
+
+
+class TestRendererEdgeCases:
+    """Renderer edge cases exercised through DocumentService.create_document."""
+
+    def test_excel_empty_cells_in_markdown_table(self, service: DocumentService) -> None:
+        """A Markdown table with an empty middle cell keeps the column positions."""
+        from openpyxl import load_workbook
+
+        async def create():
+            return await service.create_document(
+                format="excel",
+                content="| A | B | C |\n|---|---|---|\n| x | | z |",
+                conversation_id="conv-empty-cells",
+            )
+
+        meta = asyncio.run(create())
+        xlsx_path = service.get_download_path(meta.id)
+
+        wb = load_workbook(xlsx_path)
+        sheet = wb["Table1"]
+        # Row 1 is the header, row 2 the single data row.
+        assert sheet["A1"].value == "A"
+        assert sheet["B1"].value == "B"
+        assert sheet["C1"].value == "C"
+        assert sheet["A2"].value == "x"
+        # The empty cell may be stored either as None or as an empty string.
+        assert sheet["B2"].value is None or sheet["B2"].value == ""
+        assert sheet["C2"].value == "z"
+        wb.close()
+
+    def test_excel_pipe_in_content(self, service: DocumentService) -> None:
+        """A pipe character inside a JSON-supplied cell value is stored verbatim."""
+        from openpyxl import load_workbook
+
+        async def create():
+            return await service.create_document(
+                format="excel",
+                content='{"Data": [["a|b", "c"]]}',
+                conversation_id="conv-pipe",
+            )
+
+        meta = asyncio.run(create())
+        xlsx_path = service.get_download_path(meta.id)
+
+        wb = load_workbook(xlsx_path)
+        sheet = wb.active
+        assert sheet["A1"].value == "a|b"
+        wb.close()
+
+    def test_pdf_mixed_cjk_ascii(self, service: DocumentService) -> None:
+        """Mixed CJK and ASCII Markdown produces a non-trivial file with the PDF magic bytes."""
+        async def create():
+            return await service.create_document(
+                format="pdf",
+                content="# 混合 Mixed Content 内容\n\nEnglish and 中文 mixed.\n\n表格 Table:",
+                conversation_id="conv-cjk",
+            )
+
+        meta = asyncio.run(create())
+        pdf_path = service.get_download_path(meta.id)
+        assert pdf_path.exists()
+        raw = pdf_path.read_bytes()
+        assert raw[:4] == b"%PDF"
+        assert len(raw) > 1000  # non-trivial size
+
+    def test_word_nested_formatting(self, service: DocumentService) -> None:
+        """Bold nested in italic (and vice versa) renders to a readable .docx."""
+        async def create():
+            return await service.create_document(
+                format="word",
+                content="# Test\n\n**bold *italic* bold**\n\n*italic **bold** italic*",
+                conversation_id="conv-nested",
+            )
+
+        meta = asyncio.run(create())
+        docx_path = service.get_download_path(meta.id)
+        assert docx_path.exists()
+        document = DocxDocument(str(docx_path))
+        full_text = "\n".join(paragraph.text for paragraph in document.paragraphs)
+        assert "bold" in full_text
+        assert "italic" in full_text
+
+
+# ---------------------------------------------------------------------------
+# DocumentLoader read edge cases
+# ---------------------------------------------------------------------------
+
+
+class TestReadEdgeCases:
+    """Edge cases for reading documents back through DocumentTool (U9)."""
+
+    def test_read_pdf_file(self, service: DocumentService, tool: DocumentTool) -> None:
+        """A PDF created through the tool can be read back as text."""
+        async def setup():
+            return await tool.execute(
+                action="create",
+                format="pdf",
+                content="# PDF Read Test\n\nThis is PDF content to read.",
+                conversation_id="conv-read-pdf",
+            )
+
+        result = asyncio.run(setup())
+        doc_id = result["document"]["id"]
+        path = service.get_download_path(doc_id)
+
+        # Read the generated file back by its on-disk path.
+        async def read():
+            return await tool.execute(
+                action="read",
+                filename=str(path),
+                conversation_id="conv-read-pdf",
+            )
+
+        read_result = asyncio.run(read())
+        assert read_result["success"] is True
+        assert "PDF Read Test" in read_result["content"]
+        assert read_result["metadata"]["format"] == "pdf"
+
+    def test_read_html_file(self, tool: DocumentTool, tmp_path: Path) -> None:
+        """Reading an HTML file returns its text; tags are stripped when bs4 is installed."""
+        html_file = tmp_path / "test.html"
+        # The fixture needs real markup, otherwise the tag assertions below
+        # cannot distinguish stripped output from raw output.
+        html_file.write_text(
+            "<html><head><title>Test Page</title></head>"
+            "<body><h1>Heading</h1><p>Paragraph text</p></body></html>",
+            encoding="utf-8",
+        )
+
+        async def read():
+            return await tool.execute(
+                action="read",
+                filename=str(html_file),
+                conversation_id="conv-1",
+            )
+
+        result = asyncio.run(read())
+        assert result["success"] is True
+        # The visible text must be present in both modes (stripped or raw fallback).
+        assert "Heading" in result["content"]
+        assert "Paragraph text" in result["content"]
+
+        # Tag stripping is only expected when bs4 can be imported.
+        try:
+            import bs4  # noqa: F401
+
+            bs4_available = True
+        except ImportError:
+            bs4_available = False
+
+        if bs4_available:
+            assert "<h1>" not in result["content"]
+            assert "<p>" not in result["content"]
+
+    def test_read_empty_file(self, tool: DocumentTool, tmp_path: Path) -> None:
+        """Reading a zero-byte text file succeeds with empty content."""
+        empty_file = tmp_path / "empty.txt"
+        empty_file.write_text("", encoding="utf-8")
+
+        async def read():
+            return await tool.execute(
+                action="read",
+                filename=str(empty_file),
+                conversation_id="conv-1",
+            )
+
+        result = asyncio.run(read())
+        assert result["success"] is True
+        assert result["content"] == ""
+
+    def test_read_binary_file_as_text(self, tool: DocumentTool, tmp_path: Path) -> None:
+        """Binary bytes behind a .txt extension are read without failure."""
+        binary_file = tmp_path / "binary.txt"
+        binary_file.write_bytes(b"\x00\x01\x02\xff\xfe")
+
+        async def read():
+            return await tool.execute(
+                action="read",
+                filename=str(binary_file),
+                conversation_id="conv-1",
+            )
+
+        result = asyncio.run(read())
+        # Should not crash — text parser uses errors="replace"
+        assert result["success"] is True
+
+
+# ---------------------------------------------------------------------------
+# Database edge cases
+# ---------------------------------------------------------------------------
+
+
+class TestDatabaseEdgeCases:
+    """Edge cases for the document metadata store."""
+
+    async def test_insert_and_retrieve_roundtrip(self, service: DocumentService) -> None:
+        """Metadata fetched by id matches what create_document returned."""
+        created = await service.create_document(
+            format="word",
+            content="# Roundtrip Test",
+            conversation_id="conv-roundtrip",
+            filename="roundtrip.docx",
+        )
+
+        fetched = await service.get_document(created.id)
+        assert fetched is not None
+        # Compare field by field, in the same order as the individual checks.
+        for field in (
+            "id",
+            "filename",
+            "format",
+            "size",
+            "conversation_id",
+            "stored_name",
+        ):
+            assert getattr(fetched, field) == getattr(created, field)
+
+    async def test_get_nonexistent_document(self, service: DocumentService) -> None:
+        """Looking up an unknown id returns None."""
+        missing = await service.get_document("nonexistent-id")
+        assert missing is None
+
+    async def test_delete_document_removes_metadata(self, service: DocumentService) -> None:
+        """Deleting removes the metadata row; a second delete reports False."""
+        created = await service.create_document(
+            format="word",
+            content="# Delete Me",
+            conversation_id="conv-delete",
+        )
+
+        first_delete = await delete_document(created.id, service.db_path)
+        assert first_delete is True
+
+        # The metadata is no longer retrievable.
+        after_delete = await service.get_document(created.id)
+        assert after_delete is None
+
+        # Deleting an already-deleted id returns False.
+        second_delete = await delete_document(created.id, service.db_path)
+        assert second_delete is False
diff --git a/tests/documents/test_excel_renderer.py b/tests/documents/test_excel_renderer.py
new file mode 100644
index 0000000..0988ec6
--- /dev/null
+++ b/tests/documents/test_excel_renderer.py
@@ -0,0 +1,124 @@
+"""Tests for ExcelRenderer — Markdown/JSON → .xlsx mapping (U3)."""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+
+from openpyxl import load_workbook
+
+from agentkit.documents.renderers.excel_renderer import ExcelRenderer
+
+
+def _render(content: str, tmp_path: Path) -> Path:
+    """Render ``content`` with ExcelRenderer into ``tmp_path / "test.xlsx"`` and return that path."""
+    target = tmp_path / "test.xlsx"
+    renderer = ExcelRenderer()
+    renderer.render(content, target)
+    return target
+
+
+def _read_workbook(path: Path) -> dict[str, list[list[str]]]:
+    """Load a .xlsx file and return ``{sheet_name: [[cell, ...], ...]}``.
+
+    Every cell is converted with ``str``; cells whose value is ``None`` become "".
+    """
+    workbook = load_workbook(str(path))
+    return {
+        sheet.title: [
+            ["" if value is None else str(value) for value in row]
+            for row in sheet.iter_rows(values_only=True)
+        ]
+        for sheet in workbook.worksheets
+    }
+
+
+def test_markdown_single_table(tmp_path: Path) -> None:
+    """One GFM table is written to a sheet named Table1, header row first."""
+    source = "| Name | Age |\n| --- | --- |\n| Alice | 30 |\n| Bob | 25 |\n"
+    sheets = _read_workbook(_render(source, tmp_path))
+    assert "Table1" in sheets
+    table = sheets["Table1"]
+    assert table[0] == ["Name", "Age"]
+    assert table[1] == ["Alice", "30"]
+    assert table[2] == ["Bob", "25"]
+
+
+def test_markdown_multiple_tables(tmp_path: Path) -> None:
+    """Two GFM tables separated by text land on separate sheets, Table1 and Table2."""
+    first_table = "| A | B |\n| --- | --- |\n| 1 | 2 |\n\n"
+    between = "Some text between.\n\n"
+    second_table = "| C | D |\n| --- | --- |\n| 3 | 4 |\n"
+    xlsx_path = _render(first_table + between + second_table, tmp_path)
+    sheets = _read_workbook(xlsx_path)
+    assert "Table1" in sheets
+    assert "Table2" in sheets
+    assert sheets["Table1"][0] == ["A", "B"]
+    assert sheets["Table2"][0] == ["C", "D"]
+
+
+def test_markdown_no_table_creates_summary(tmp_path: Path) -> None:
+    """Table-free Markdown still ends up in the workbook, one cell per text line.
+
+    The check is deliberately sheet-name agnostic: it searches every sheet.
+    """
+    source = "Just some text.\nAnother line.\n"
+    sheets = _read_workbook(_render(source, tmp_path))
+    all_cells = [cell for rows in sheets.values() for row in rows for cell in row]
+    assert "Just some text." in all_cells
+    assert "Another line." in all_cells
+
+
+def test_json_input_multi_sheet(tmp_path: Path) -> None:
+    """A JSON object mapping sheet names to rows creates one sheet per key."""
+    sales_rows = [["Product", "Revenue"], ["Widget", "1000"], ["Gadget", "2000"]]
+    cost_rows = [["Item", "Amount"], ["Rent", "500"]]
+    payload = json.dumps({"Sales": sales_rows, "Costs": cost_rows})
+    sheets = _read_workbook(_render(payload, tmp_path))
+    assert "Sales" in sheets
+    assert "Costs" in sheets
+    assert sheets["Sales"][0] == ["Product", "Revenue"]
+    assert sheets["Sales"][1] == ["Widget", "1000"]
+    assert sheets["Costs"][1] == ["Rent", "500"]
+
+
+def test_json_input_single_sheet(tmp_path: Path) -> None:
+    """A JSON object with a single key produces a sheet with that name."""
+    payload = json.dumps({"Data": [["X", "Y"], ["1", "2"]]})
+    sheets = _read_workbook(_render(payload, tmp_path))
+    assert "Data" in sheets
+    header_row = sheets["Data"][0]
+    assert header_row == ["X", "Y"]
+
+
+def test_empty_markdown(tmp_path: Path) -> None:
+    """Rendering an empty string still writes a workbook that has at least one sheet."""
+    xlsx_path = _render("", tmp_path)
+    assert xlsx_path.exists()
+    sheet_names = load_workbook(str(xlsx_path)).sheetnames
+    assert len(sheet_names) >= 1
+
+
+def test_mixed_table_and_text(tmp_path: Path) -> None:
+    """Text around a table is kept outside Table1; the table itself goes to Table1.
+
+    The surrounding text is looked up in every sheet other than Table1, so
+    the test does not depend on the exact name of the text sheet, but it
+    also cannot pass when the text is missing from the workbook.
+    """
+    md = "Intro line.\n\n| Col1 | Col2 |\n| --- | --- |\n| a | b |\n\nOutro line.\n"
+    path = _render(md, tmp_path)
+    sheets = _read_workbook(path)
+    assert "Table1" in sheets
+    # Unconditional check: guarding this with `if "Summary" in sheets` would
+    # let the test pass vacuously when the text sheet is absent.
+    non_table_cells = [
+        cell
+        for name, rows in sheets.items()
+        if name != "Table1"
+        for row in rows
+        for cell in row
+    ]
+    assert "Intro line." in non_table_cells
+    assert "Outro line." in non_table_cells
+
+
+def test_long_sheet_name_truncated(tmp_path: Path) -> None:
+    """A 50-character sheet name from JSON input is cut down to Excel's 31-character limit."""
+    oversized_name = "A" * 50
+    payload = json.dumps({oversized_name: [["x"]]})
+    workbook = load_workbook(str(_render(payload, tmp_path)))
+    # Every sheet name in the resulting workbook must respect the limit.
+    for sheet_name in workbook.sheetnames:
+        assert len(sheet_name) <= 31
diff --git a/tests/documents/test_pdf_renderer.py b/tests/documents/test_pdf_renderer.py
new file mode 100644
index 0000000..0576454
--- /dev/null
+++ b/tests/documents/test_pdf_renderer.py
@@ -0,0 +1,99 @@
+"""Tests for PDFRenderer — Markdown → PDF mapping (U4)."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+from agentkit.documents.renderers.pdf_renderer import PDFRenderer
+
+
+def _render(markdown: str, tmp_path: Path) -> Path:
+    """Render ``markdown`` with PDFRenderer into ``tmp_path / "test.pdf"`` and return that path."""
+    target = tmp_path / "test.pdf"
+    renderer = PDFRenderer()
+    renderer.render(markdown, target)
+    return target
+
+
+def test_basic_pdf_generation(tmp_path: Path) -> None:
+    """A heading plus a paragraph yields a non-empty file that starts with the PDF magic bytes."""
+    pdf_path = _render("# Title\n\nThis is a paragraph.\n", tmp_path)
+    assert pdf_path.exists()
+    assert pdf_path.stat().st_size > 0
+    assert pdf_path.read_bytes().startswith(b"%PDF")
+
+
+def test_empty_markdown(tmp_path: Path) -> None:
+    """Empty input still produces a file that starts with the PDF magic bytes."""
+    pdf_path = _render("", tmp_path)
+    assert pdf_path.exists()
+    assert pdf_path.read_bytes().startswith(b"%PDF")
+
+
+def test_headings(tmp_path: Path) -> None:
+    """Heading levels 1 to 3 render to a file with the PDF magic bytes."""
+    source = "# H1\n## H2\n### H3\n"
+    pdf_path = _render(source, tmp_path)
+    assert pdf_path.read_bytes().startswith(b"%PDF")
+
+
+def test_bullet_list(tmp_path: Path) -> None:
+    """A three-item bullet list renders to a file with the PDF magic bytes."""
+    source = "- Apple\n- Banana\n- Cherry\n"
+    pdf_path = _render(source, tmp_path)
+    assert pdf_path.read_bytes().startswith(b"%PDF")
+
+
+def test_numbered_list(tmp_path: Path) -> None:
+    """A three-item numbered list renders to a file with the PDF magic bytes."""
+    source = "1. First\n2. Second\n3. Third\n"
+    pdf_path = _render(source, tmp_path)
+    assert pdf_path.read_bytes().startswith(b"%PDF")
+
+
+def test_table(tmp_path: Path) -> None:
+    """A GFM table renders to a file with the PDF magic bytes."""
+    source = "| Name | Age |\n| --- | --- |\n| Alice | 30 |\n| Bob | 25 |\n"
+    pdf_path = _render(source, tmp_path)
+    assert pdf_path.read_bytes().startswith(b"%PDF")
+
+
+def test_bold_italic(tmp_path: Path) -> None:
+    """Inline bold and italic markup renders to a file with the PDF magic bytes."""
+    source = "This has **bold** and *italic* text.\n"
+    pdf_path = _render(source, tmp_path)
+    assert pdf_path.read_bytes().startswith(b"%PDF")
+
+
+def test_chinese_text(tmp_path: Path) -> None:
+    """Chinese heading and paragraph text still yield a non-empty PDF (font fallback is OK)."""
+    source = "# 中文标题\n\n这是中文段落内容。\n"
+    pdf_path = _render(source, tmp_path)
+    assert pdf_path.read_bytes().startswith(b"%PDF")
+    assert pdf_path.stat().st_size > 0
+
+
+def test_mixed_content(tmp_path: Path) -> None:
+    """A document mixing heading, paragraphs, a list and a table renders to a PDF."""
+    source = """# Report
+
+Intro paragraph.
+
+- Item one
+- Item two
+
+| Col A | Col B |
+| ----- | ----- |
+| 1 | 2 |
+
+Final paragraph.
+"""
+    pdf_path = _render(source, tmp_path)
+    assert pdf_path.read_bytes().startswith(b"%PDF")
+
+
+def test_xml_special_chars(tmp_path: Path) -> None:
+    """XML special characters (<, >, &) are escaped and don't break rendering."""
+    # The input has to contain all three characters named above; with only
+    # '&' present, the '<' and '>' escaping would never be exercised.
+    md = "Use <tag> & entities like **bold**.\n"
+    path = _render(md, tmp_path)
+    assert path.read_bytes()[:4] == b"%PDF"
diff --git a/tests/documents/test_template_renderer.py b/tests/documents/test_template_renderer.py
new file mode 100644
index 0000000..f60b056
--- /dev/null
+++ b/tests/documents/test_template_renderer.py
@@ -0,0 +1,146 @@
+"""Tests for TemplateRenderer — Word template filling with Jinja2 sandbox (U5)."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+import pytest
+from docx import Document
+
+from agentkit.documents.renderers.template_renderer import TemplateRenderer
+
+
+def _make_template(tmp_path: Path, content: str) -> Path:
+    """Write ``tmp_path / "template.docx"`` holding ``content`` as its only added paragraph.
+
+    Returns the path of the saved template.
+    """
+    destination = tmp_path / "template.docx"
+    template_doc = Document()
+    template_doc.add_paragraph(content)
+    template_doc.save(str(destination))
+    return destination
+
+
+def _read_text(path: Path) -> str:
+    """Return the text of every paragraph in a .docx file, joined with newlines."""
+    paragraphs = Document(str(path)).paragraphs
+    return "\n".join(paragraph.text for paragraph in paragraphs)
+
+
+def test_simple_variable_substitution(tmp_path: Path) -> None:
+    """A single ``{{name}}`` placeholder is replaced by ``data["name"]``."""
+    template_path = _make_template(tmp_path, "Hello, {{name}}!")
+    rendered_path = tmp_path / "output.docx"
+    TemplateRenderer().render_template(template_path, {"name": "张三"}, rendered_path)
+    rendered_text = _read_text(rendered_path)
+    assert rendered_text == "Hello, 张三!"
+
+
+def test_multiple_variables(tmp_path: Path) -> None:
+    """Several placeholders in one paragraph are all substituted."""
+    template_path = _make_template(tmp_path, "{{greeting}}, {{name}}. You are {{role}}.")
+    rendered_path = tmp_path / "output.docx"
+    values = {"greeting": "Hi", "name": "Alice", "role": "admin"}
+    TemplateRenderer().render_template(template_path, values, rendered_path)
+    assert _read_text(rendered_path) == "Hi, Alice. You are admin."
+
+
+def test_for_loop(tmp_path: Path) -> None:
+    """A ``{% for %}`` loop inside one paragraph emits every item."""
+    # The loop tags and body all live in a single paragraph of the template.
+    template_path = _make_template(
+        tmp_path, "{% for item in items %}{{item}} {% endfor %}"
+    )
+    rendered_path = tmp_path / "output.docx"
+    TemplateRenderer().render_template(
+        template_path, {"items": ["A", "B", "C"]}, rendered_path
+    )
+    rendered_text = _read_text(rendered_path)
+    for expected in ("A", "B", "C"):
+        assert expected in rendered_text
+
+
+def test_if_condition(tmp_path: Path) -> None:
+    """An ``{% if %}`` block is rendered when its condition is true."""
+    template_path = _make_template(tmp_path, "{% if show %}Visible{% endif %}")
+    rendered_path = tmp_path / "output.docx"
+    TemplateRenderer().render_template(template_path, {"show": True}, rendered_path)
+    rendered_text = _read_text(rendered_path)
+    assert "Visible" in rendered_text
+
+
+def test_if_condition_false(tmp_path: Path) -> None:
+    """An ``{% if %}`` block is omitted when its condition is false."""
+    template_path = _make_template(tmp_path, "{% if show %}Visible{% endif %}")
+    rendered_path = tmp_path / "output.docx"
+    TemplateRenderer().render_template(template_path, {"show": False}, rendered_path)
+    rendered_text = _read_text(rendered_path)
+    assert "Visible" not in rendered_text
+
+
+def test_template_not_found(tmp_path: Path) -> None:
+    """A template path that does not exist raises FileNotFoundError ("Template not found")."""
+    missing_template = tmp_path / "nonexistent.docx"
+    rendered_path = tmp_path / "output.docx"
+    with pytest.raises(FileNotFoundError, match="Template not found"):
+        TemplateRenderer().render_template(missing_template, {}, rendered_path)
+
+
+def test_no_placeholders(tmp_path: Path) -> None:
+    """Text without any Jinja2 tags comes out of the renderer unchanged."""
+    plain_text = "Just plain text, no variables."
+    template_path = _make_template(tmp_path, plain_text)
+    rendered_path = tmp_path / "output.docx"
+    TemplateRenderer().render_template(template_path, {}, rendered_path)
+    assert _read_text(rendered_path) == "Just plain text, no variables."
+
+
+def test_ssti_blocked(tmp_path: Path) -> None:
+    """Dunder attribute access in a template must not leak Python internals (SSTI).
+
+    ``{{config.__class__}}`` is rendered against an empty dict. The test
+    expects rendering to succeed rather than raise, with the expression
+    consumed and no class information in the output. This is attributed
+    to Jinja2's SandboxedEnvironment treating underscore-prefixed
+    attributes as undefined.
+    """
+    template_path = _make_template(tmp_path, "{{config.__class__}}")
+    rendered_path = tmp_path / "output.docx"
+    TemplateRenderer().render_template(template_path, {"config": {}}, rendered_path)
+    rendered_lower = _read_text(rendered_path).lower()
+    # No trace of the dict class may appear in the output.
+    assert "dict" not in rendered_lower
+    assert "class" not in rendered_lower
+    # The placeholder itself must have been consumed, not echoed back.
+    # "{{" has no case, so checking the lowercased text is equivalent.
+    assert "{{" not in rendered_lower
+
+
+def test_ssti_globals_blocked(tmp_path: Path) -> None:
+    """A deeper SSTI payload reaching for ``__globals__`` leaks nothing either."""
+    payload = "{{config.__class__.__init__.__globals__}}"
+    template_path = _make_template(tmp_path, payload)
+    rendered_path = tmp_path / "output.docx"
+    TemplateRenderer().render_template(template_path, {"config": {}}, rendered_path)
+    rendered_lower = _read_text(rendered_path).lower()
+    # Typical markers of a leaked globals dict must be absent.
+    assert "builtins" not in rendered_lower
+    assert "import" not in rendered_lower
+
+
+def test_missing_variable(tmp_path: Path) -> None:
+    """A placeholder with no matching key in the data does not survive rendering.
+
+    Rendering with an empty data dict is expected to succeed; the test only
+    requires that the literal ``{{name}}`` is gone from the output.
+    """
+    template_path = _make_template(tmp_path, "Hello, {{name}}!")
+    rendered_path = tmp_path / "output.docx"
+    TemplateRenderer().render_template(template_path, {}, rendered_path)
+    rendered_text = _read_text(rendered_path)
+    assert "{{name}}" not in rendered_text
diff --git a/tests/documents/test_word_renderer.py b/tests/documents/test_word_renderer.py
new file mode 100644
index 0000000..195d0e1
--- /dev/null
+++ b/tests/documents/test_word_renderer.py
@@ -0,0 +1,147 @@
+"""Tests for WordRenderer — Markdown → .docx mapping (U2)."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+from docx import Document
+
+from agentkit.documents.renderers.word_renderer import WordRenderer
+
+
+def _render(markdown: str, tmp_path: Path) -> Path:
+    """Render ``markdown`` with WordRenderer into ``tmp_path / "test.docx"`` and return that path."""
+    target = tmp_path / "test.docx"
+    renderer = WordRenderer()
+    renderer.render(markdown, target)
+    return target
+
+
+def _read_paragraphs(path: Path) -> list[str]:
+    """Return the text of each paragraph in a .docx file, in document order."""
+    texts: list[str] = []
+    for paragraph in Document(str(path)).paragraphs:
+        texts.append(paragraph.text)
+    return texts
+
+
+def test_heading_levels(tmp_path: Path) -> None:
+    """``#``, ``##`` and ``###`` map to the Heading 1, Heading 2 and Heading 3 styles."""
+    docx_path = _render("# Title\n## Subtitle\n### Section\n", tmp_path)
+    document = Document(str(docx_path))
+    styled = [(p.style.name, p.text) for p in document.paragraphs if p.text]
+    expected_pairs = (
+        ("Heading 1", "Title"),
+        ("Heading 2", "Subtitle"),
+        ("Heading 3", "Section"),
+    )
+    for pair in expected_pairs:
+        assert pair in styled
+
+
+def test_paragraphs(tmp_path: Path) -> None:
+    """Blank-line separated text blocks each become their own paragraph."""
+    source = "First paragraph.\n\nSecond paragraph.\n"
+    paragraph_texts = _read_paragraphs(_render(source, tmp_path))
+    assert "First paragraph." in paragraph_texts
+    assert "Second paragraph." in paragraph_texts
+
+
+def test_bullet_list(tmp_path: Path) -> None:
+    """Each ``-`` item becomes a paragraph with the List Bullet style, in order."""
+    docx_path = _render("- Apple\n- Banana\n- Cherry\n", tmp_path)
+    document = Document(str(docx_path))
+    bullet_paragraphs = [
+        paragraph
+        for paragraph in document.paragraphs
+        if paragraph.style.name == "List Bullet"
+    ]
+    assert len(bullet_paragraphs) == 3
+    first, second, third = bullet_paragraphs
+    assert first.text == "Apple"
+    assert second.text == "Banana"
+    assert third.text == "Cherry"
+
+
+def test_numbered_list(tmp_path: Path) -> None:
+    """Each ``N.`` item becomes a paragraph with the List Number style, in order."""
+    md = "1. First\n2. Second\n3. Third\n"
+    path = _render(md, tmp_path)
+    doc = Document(str(path))
+    numbers = [p for p in doc.paragraphs if p.style.name == "List Number"]
+    assert len(numbers) == 3
+    assert numbers[0].text == "First"
+    assert numbers[1].text == "Second"
+    # Check the last item too, so a truncated or mangled final entry is
+    # caught (mirrors the bullet-list test, which checks all three items).
+    assert numbers[2].text == "Third"
+
+
+def test_table(tmp_path: Path) -> None:
+    """A GFM table becomes a single docx table with header and data cells in place."""
+    source = "| Name | Age |\n| --- | --- |\n| Alice | 30 |\n| Bob | 25 |\n"
+    document = Document(str(_render(source, tmp_path)))
+    assert len(document.tables) == 1
+    grid = document.tables[0]
+    # Header row plus two data rows, two columns.
+    assert len(grid.rows) == 3
+    assert len(grid.columns) == 2
+    expected_cells = {
+        (0, 0): "Name",
+        (0, 1): "Age",
+        (1, 0): "Alice",
+        (2, 1): "25",
+    }
+    for (row_idx, col_idx), expected in expected_cells.items():
+        assert grid.cell(row_idx, col_idx).text == expected
+
+
+def test_bold_inline(tmp_path: Path) -> None:
+    """``**bold**`` yields exactly one bold run whose text is "bold"."""
+    docx_path = _render("This has **bold** text.\n", tmp_path)
+    first_paragraph = Document(str(docx_path)).paragraphs[0]
+    bold_texts = [run.text for run in first_paragraph.runs if run.bold]
+    assert len(bold_texts) == 1
+    assert bold_texts[0] == "bold"
+
+
+def test_italic_inline(tmp_path: Path) -> None:
+    """``*italic*`` yields exactly one italic run whose text is "italic"."""
+    docx_path = _render("This has *italic* text.\n", tmp_path)
+    first_paragraph = Document(str(docx_path)).paragraphs[0]
+    italic_texts = [run.text for run in first_paragraph.runs if run.italic]
+    assert len(italic_texts) == 1
+    assert italic_texts[0] == "italic"
+
+
+def test_empty_markdown(tmp_path: Path) -> None:
+    """Empty input still writes a .docx, and none of its paragraphs carry text."""
+    docx_path = _render("", tmp_path)
+    assert docx_path.exists()
+    document = Document(str(docx_path))
+    assert not any(paragraph.text for paragraph in document.paragraphs)
+
+
+def test_mixed_content(tmp_path: Path) -> None:
+    """A document mixing heading, paragraphs, bullets and a table keeps each element kind."""
+    source = """# Report
+
+This is the intro.
+
+- Point one
+- Point two
+
+| Col A | Col B |
+| ----- | ----- |
+| 1 | 2 |
+
+Final paragraph.
+"""
+    docx_path = _render(source, tmp_path)
+    assert docx_path.exists()
+    document = Document(str(docx_path))
+    style_names = [paragraph.style.name for paragraph in document.paragraphs]
+    # At least one heading, exactly one table, exactly two bullet items.
+    heading_count = sum(1 for name in style_names if name.startswith("Heading"))
+    assert heading_count >= 1
+    assert len(document.tables) == 1
+    bullet_count = sum(1 for name in style_names if name == "List Bullet")
+    assert bullet_count == 2
+
+
+def test_chinese_text(tmp_path: Path) -> None:
+    """Chinese heading and paragraph text come back unchanged from the .docx."""
+    source = "# 中文标题\n\n这是中文段落。\n"
+    paragraph_texts = _read_paragraphs(_render(source, tmp_path))
+    for expected in ("中文标题", "这是中文段落。"):
+        assert expected in paragraph_texts
diff --git a/tests/integration/test_document_e2e.py b/tests/integration/test_document_e2e.py
new file mode 100644
index 0000000..a9a3c3e
--- /dev/null
+++ b/tests/integration/test_document_e2e.py
@@ -0,0 +1,424 @@
+"""End-to-end integration tests for document processing (F1, F2, F3).
+
+Verifies complete user flows:
+- F1: Create document → List → Download → Verify content
+- F2: Upload template → Create with template → Download → Verify variables replaced
+- F3: Cross-conversation isolation
+"""
+
+from __future__ import annotations
+
+import asyncio
+import io
+from pathlib import Path
+
+import pytest
+from docx import Document as DocxDocument
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
+from openpyxl import load_workbook
+
+from agentkit.documents.db import init_documents_db
+from agentkit.documents.renderers.excel_renderer import ExcelRenderer
+from agentkit.documents.renderers.pdf_renderer import PDFRenderer
+from agentkit.documents.renderers.word_renderer import WordRenderer
+from agentkit.documents.service import DocumentService
+from agentkit.server.routes import documents as documents_routes
+
+
+@pytest.fixture
+def app(tmp_path: Path) -> FastAPI:
+ """Test app with the word, excel and pdf renderers registered.
+
+ After Bug 2 fix, TemplateRenderer is lazy-loaded by DocumentService
+ when template_path is provided — no need to register it separately.
+ """
+ db_path = tmp_path / "test.db"
+ upload_dir = tmp_path / "uploads"
+ asyncio.run(init_documents_db(db_path)) # fixture is sync, so drive the async DB init here
+
+ service = DocumentService(upload_dir=upload_dir, db_path=db_path)
+ service.register_renderer("word", WordRenderer())
+ service.register_renderer("excel", ExcelRenderer())
+ service.register_renderer("pdf", PDFRenderer())
+
+ app = FastAPI()
+ app.state.document_service = service
+ app.state.server_config = None # No auth for E2E tests
+ app.include_router(documents_routes.router, prefix="/api/v1")
+ return app
+
+
+@pytest.fixture
+def client(app: FastAPI) -> TestClient:
+ return TestClient(app) # client wrapping the app built by the app fixture
+
+
+# ---------------------------------------------------------------------------
+# F1: Create → List → Download complete flow
+# ---------------------------------------------------------------------------
+
+
+class TestF1CreateListDownload:
+ """F1: User creates a document, sees it in the list, downloads it."""
+
+ def test_e2e_word_create_list_download(self, client: TestClient) -> None:
+ """Word: create → list contains it → download content matches."""
+ # Step 1: Create
+ create_resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# E2E Report\n\nThis is the report content.",
+ "conversation_id": "conv-e2e-1",
+ },
+ )
+ assert create_resp.status_code == 200
+ doc = create_resp.json()["document"]
+ doc_id = doc["id"]
+ assert doc["format"] == "word"
+ assert doc["filename"].endswith(".docx")
+ assert doc["size"] > 0
+
+ # Step 2: List — document appears in conversation
+ list_resp = client.get("/api/v1/documents/conversation/conv-e2e-1")
+ assert list_resp.status_code == 200
+ docs = list_resp.json()["documents"]
+ assert len(docs) == 1
+ assert docs[0]["id"] == doc_id
+ assert docs[0]["download_url"] == f"/api/v1/documents/download/{doc_id}"
+
+ # Step 3: Download — file content is valid
+ dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
+ assert dl_resp.status_code == 200
+ assert len(dl_resp.content) == doc["size"] # reported size equals downloaded byte count
+
+ # Step 4: Verify downloaded file is a valid .docx with correct content
+ docx = DocxDocument(io.BytesIO(dl_resp.content))
+ text = "\n".join(p.text for p in docx.paragraphs)
+ assert "E2E Report" in text
+ assert "This is the report content" in text
+
+ def test_e2e_excel_create_list_download(self, client: TestClient) -> None:
+ """Excel: create → list → download → verify cell content."""
+ create_resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "excel",
+ "content": '{"Sales": [["Product", "Revenue"], ["Widget", "1000"], ["Gadget", "2000"]]}',
+ "conversation_id": "conv-e2e-2",
+ },
+ )
+ assert create_resp.status_code == 200
+ doc_id = create_resp.json()["document"]["id"]
+
+ # List
+ list_resp = client.get("/api/v1/documents/conversation/conv-e2e-2")
+ assert list_resp.json()["count"] == 1
+
+ # Download and verify
+ dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
+ assert dl_resp.status_code == 200
+
+ wb = load_workbook(io.BytesIO(dl_resp.content))
+ ws = wb["Sales"] # sheet is looked up by the top-level JSON key
+ assert ws["A1"].value == "Product"
+ assert ws["B1"].value == "Revenue"
+ assert ws["A2"].value == "Widget"
+ assert ws["B2"].value == "1000" # sent as the string "1000", expected back as a string
+ wb.close()
+
+ def test_e2e_pdf_create_list_download(self, client: TestClient) -> None:
+ """PDF: create → list → download → verify PDF magic bytes."""
+ create_resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "pdf",
+ "content": "# PDF Report\n\nContent here.",
+ "conversation_id": "conv-e2e-3",
+ },
+ )
+ assert create_resp.status_code == 200
+ doc_id = create_resp.json()["document"]["id"]
+
+ # List
+ list_resp = client.get("/api/v1/documents/conversation/conv-e2e-3")
+ assert list_resp.json()["count"] == 1
+
+ # Download and verify PDF magic
+ dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
+ assert dl_resp.status_code == 200
+ assert dl_resp.content[:4] == b"%PDF"
+
+ def test_e2e_multiple_documents_same_conversation(self, client: TestClient) -> None:
+ """Multiple documents in same conversation — list shows all three (order not checked)."""
+ conv_id = "conv-multi"
+
+ # Create 3 documents, one per format, all with Markdown-style content
+ for i, fmt in enumerate(["word", "excel", "pdf"]):
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": fmt,
+ "content": f"# Doc {i}",
+ "conversation_id": conv_id,
+ },
+ )
+ assert resp.status_code == 200
+
+ # List — all 3 present
+ list_resp = client.get(f"/api/v1/documents/conversation/{conv_id}")
+ assert list_resp.status_code == 200
+ data = list_resp.json()
+ assert data["count"] == 3
+
+ formats = [d["format"] for d in data["documents"]]
+ assert set(formats) == {"word", "excel", "pdf"} # set comparison: ordering is ignored
+
+ # Each has a unique download URL
+ urls = [d["download_url"] for d in data["documents"]]
+ assert len(set(urls)) == 3
+
+ def test_e2e_download_returns_correct_filename(self, client: TestClient) -> None:
+ """Download response includes the requested filename in Content-Disposition."""
+ create_resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Test",
+ "conversation_id": "conv-fn",
+ "filename": "my-report.docx",
+ },
+ )
+ doc_id = create_resp.json()["document"]["id"]
+
+ dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
+ assert dl_resp.status_code == 200
+ # NOTE(review): presumably the route returns a FileResponse, which sets the filename — confirm
+ assert "my-report.docx" in dl_resp.headers.get("content-disposition", "")
+
+
+# ---------------------------------------------------------------------------
+# F2: Template upload → create with template → download
+# ---------------------------------------------------------------------------
+
+
+class TestF2TemplateWorkflow:
+ """F2: Upload template → Create with template → Download → Verify variables.
+
+ After Bug 2 fix, template filling works with the standard WordRenderer
+ registration — DocumentService lazy-loads TemplateRenderer internally.
+ """
+
+ def test_e2e_upload_template_create_download(
+ self, client: TestClient, tmp_path: Path
+ ) -> None:
+ """Complete template workflow: upload → fill → download → verify."""
+ # Step 1: Create a .docx template with Jinja2 placeholders
+ template_doc = DocxDocument()
+ template_doc.add_heading("Invoice {{invoice_number}}", level=1)
+ template_doc.add_paragraph("Customer: {{customer_name}}")
+ template_doc.add_paragraph("Amount: ${{amount}}")
+ template_path = tmp_path / "invoice_template.docx"
+ template_doc.save(str(template_path))
+
+ # Step 2: Upload the template
+ with open(template_path, "rb") as f:
+ upload_resp = client.post(
+ "/api/v1/documents/upload-template",
+ files={"file": ("invoice_template.docx", f, "application/octet-stream")},
+ )
+ assert upload_resp.status_code == 200
+ stored_name = upload_resp.json()["stored_name"] # server-side name, passed back as "template"
+
+ # Step 3: Create document using the template
+ create_resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "", # Ignored when template is provided
+ "conversation_id": "conv-template",
+ "template": stored_name,
+ "template_data": {
+ "invoice_number": "INV-2026-001",
+ "customer_name": "Acme Corp",
+ "amount": "1,234.56",
+ },
+ },
+ )
+ assert create_resp.status_code == 200, create_resp.text
+ doc_id = create_resp.json()["document"]["id"]
+
+ # Step 4: Download and verify variables were replaced
+ dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
+ assert dl_resp.status_code == 200
+
+ docx = DocxDocument(io.BytesIO(dl_resp.content))
+ text = "\n".join(p.text for p in docx.paragraphs)
+ assert "INV-2026-001" in text
+ assert "Acme Corp" in text
+ assert "1,234.56" in text
+ # Placeholders should be gone
+ assert "{{" not in text
+ assert "}}" not in text
+
+ def test_e2e_template_with_loop(
+ self, client: TestClient, tmp_path: Path
+ ) -> None:
+ """Template with a paragraph-level {%p for %} loop — every item appears in the output."""
+ template_doc = DocxDocument()
+ template_doc.add_heading("Shopping List", level=1)
+ # ponytail: docxtpl uses {%p %} for paragraph-level loops, {% %} for inline
+ template_doc.add_paragraph("{%p for item in items %}")
+ template_doc.add_paragraph("- {{item}}")
+ template_doc.add_paragraph("{%p endfor %}")
+ template_path = tmp_path / "loop_template.docx"
+ template_doc.save(str(template_path))
+
+ with open(template_path, "rb") as f:
+ upload_resp = client.post(
+ "/api/v1/documents/upload-template",
+ files={"file": ("loop_template.docx", f, "application/octet-stream")},
+ )
+ stored_name = upload_resp.json()["stored_name"] # upload status is not asserted; a failure surfaces here
+
+ create_resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "",
+ "conversation_id": "conv-loop",
+ "template": stored_name,
+ "template_data": {
+ "items": ["Apple", "Banana", "Cherry"],
+ },
+ },
+ )
+ assert create_resp.status_code == 200, create_resp.text
+ doc_id = create_resp.json()["document"]["id"]
+
+ dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
+ assert dl_resp.status_code == 200
+
+ docx = DocxDocument(io.BytesIO(dl_resp.content))
+ text = "\n".join(p.text for p in docx.paragraphs)
+ assert "Apple" in text
+ assert "Banana" in text
+ assert "Cherry" in text
+
+
+# ---------------------------------------------------------------------------
+# F3: Cross-conversation isolation
+# ---------------------------------------------------------------------------
+
+
+class TestF3ConversationIsolation:
+ """F3: Documents from one conversation don't leak to another."""
+
+ def test_e2e_conversation_isolation(self, client: TestClient) -> None:
+ """Documents in conv-A don't appear in conv-B's list, and vice versa."""
+ # Create in conv-A (word, so the two conversations are told apart by format)
+ client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Conv A Doc",
+ "conversation_id": "conv-A",
+ },
+ )
+ # Create in conv-B (pdf)
+ client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "pdf",
+ "content": "# Conv B Doc",
+ "conversation_id": "conv-B",
+ },
+ )
+
+ # List conv-A — only conv-A's doc
+ resp_a = client.get("/api/v1/documents/conversation/conv-A")
+ docs_a = resp_a.json()["documents"]
+ assert len(docs_a) == 1
+ assert docs_a[0]["format"] == "word"
+
+ # List conv-B — only conv-B's doc
+ resp_b = client.get("/api/v1/documents/conversation/conv-B")
+ docs_b = resp_b.json()["documents"]
+ assert len(docs_b) == 1
+ assert docs_b[0]["format"] == "pdf"
+
+ def test_e2e_download_any_document_by_id(self, client: TestClient) -> None:
+ """Download works by doc_id regardless of conversation (no ACL in v1)."""
+ # Create in conv-X
+ create_resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Downloadable",
+ "conversation_id": "conv-X",
+ },
+ )
+ doc_id = create_resp.json()["document"]["id"]
+
+ # Download without specifying conversation — works (v1 has no ACL)
+ dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
+ assert dl_resp.status_code == 200
+ assert len(dl_resp.content) > 0
+
+
+# ---------------------------------------------------------------------------
+# Data consistency checks
+# ---------------------------------------------------------------------------
+
+
+class TestDataConsistency:
+ """Verify metadata matches what the download endpoint actually serves."""
+
+ def test_metadata_size_matches_file(self, client: TestClient) -> None:
+ """Document metadata size equals the byte length of the downloaded content."""
+ create_resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Size Check\n\nContent.",
+ "conversation_id": "conv-size",
+ },
+ )
+ meta_size = create_resp.json()["document"]["size"]
+ doc_id = create_resp.json()["document"]["id"]
+
+ # Download and compare the body length (the file on disk is not read directly)
+ dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
+ assert len(dl_resp.content) == meta_size
+
+ def test_filename_has_correct_extension(self, client: TestClient) -> None:
+ """Each format produces the correct file extension."""
+ for fmt, ext in [("word", ".docx"), ("excel", ".xlsx"), ("pdf", ".pdf")]:
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": fmt,
+ "content": "# Test",
+ "conversation_id": f"conv-ext-{fmt}",
+ },
+ )
+ filename = resp.json()["document"]["filename"]
+ assert filename.endswith(ext), f"{fmt} should produce {ext}, got {filename}"
+
+ def test_custom_filename_preserved(self, client: TestClient) -> None:
+ """Custom filename is preserved in metadata and in the download's Content-Disposition."""
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "pdf",
+ "content": "# Custom Name",
+ "conversation_id": "conv-custom",
+ "filename": "quarterly-report.pdf",
+ },
+ )
+ assert resp.json()["document"]["filename"] == "quarterly-report.pdf"
+
+ doc_id = resp.json()["document"]["id"]
+ dl_resp = client.get(f"/api/v1/documents/download/{doc_id}")
+ assert "quarterly-report.pdf" in dl_resp.headers.get("content-disposition", "")
diff --git a/tests/routes/test_documents.py b/tests/routes/test_documents.py
new file mode 100644
index 0000000..a7b34b2
--- /dev/null
+++ b/tests/routes/test_documents.py
@@ -0,0 +1,250 @@
+"""Tests for /api/v1/documents routes (U7)."""
+
+from __future__ import annotations
+
+import asyncio
+from pathlib import Path
+
+import pytest
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
+
+from agentkit.documents.db import init_documents_db
+from agentkit.documents.renderers.excel_renderer import ExcelRenderer
+from agentkit.documents.renderers.pdf_renderer import PDFRenderer
+from agentkit.documents.renderers.word_renderer import WordRenderer
+from agentkit.documents.service import DocumentService
+from agentkit.server.routes import documents as documents_routes
+
+
+@pytest.fixture
+def app(tmp_path: Path) -> FastAPI:
+ """Create a test app with a DocumentService (word/excel/pdf renderers) on app.state."""
+ db_path = tmp_path / "test.db"
+ upload_dir = tmp_path / "uploads"
+ asyncio.run(init_documents_db(db_path)) # fixture is sync, so drive the async DB init here
+
+ service = DocumentService(upload_dir=upload_dir, db_path=db_path)
+ service.register_renderer("word", WordRenderer())
+ service.register_renderer("excel", ExcelRenderer())
+ service.register_renderer("pdf", PDFRenderer())
+
+ app = FastAPI()
+ app.state.document_service = service
+ app.state.server_config = None # No API key configured → allow all
+ app.include_router(documents_routes.router, prefix="/api/v1")
+ return app
+
+
+@pytest.fixture
+def client(app: FastAPI) -> TestClient:
+ return TestClient(app) # client wrapping the app built by the app fixture
+
+
+# ---------------------------------------------------------------------------
+# POST /create
+# ---------------------------------------------------------------------------
+
+
+def test_create_word(client: TestClient) -> None:
+ """POST /create with format=word returns 200, a .docx filename and a download URL."""
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Test\n\nParagraph.",
+ "conversation_id": "conv-1",
+ },
+ )
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["success"] is True
+ assert data["document"]["format"] == "word"
+ assert data["document"]["filename"].endswith(".docx")
+ assert data["document"]["download_url"].startswith("/api/v1/documents/download/")
+
+
+def test_create_pdf(client: TestClient) -> None:
+ """POST /create with format=pdf returns 200 and echoes format "pdf" in the metadata."""
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "pdf",
+ "content": "# PDF Test",
+ "conversation_id": "conv-1",
+ },
+ )
+ assert resp.status_code == 200
+ assert resp.json()["document"]["format"] == "pdf"
+
+
+def test_create_excel_json(client: TestClient) -> None:
+ """POST /create with format=excel and a JSON sheet→rows mapping as content returns 200."""
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "excel",
+ "content": '{"Data": [["A", "B"], ["1", "2"]]}',
+ "conversation_id": "conv-1",
+ },
+ )
+ assert resp.status_code == 200
+ assert resp.json()["document"]["format"] == "excel"
+
+
+def test_create_invalid_format(client: TestClient) -> None:
+ """POST /create with format=pptx (not among the registered word/excel/pdf) returns 400."""
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "pptx",
+ "content": "test",
+ "conversation_id": "conv-1",
+ },
+ )
+ assert resp.status_code == 400
+
+
+def test_create_missing_fields(client: TestClient) -> None:
+ """POST /create with only "format" (no content or conversation_id) returns 422."""
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={"format": "word"},
+ )
+ assert resp.status_code == 422 # Pydantic validation error
+
+
+# ---------------------------------------------------------------------------
+# GET /conversation/{id}
+# ---------------------------------------------------------------------------
+
+
+def test_list_conversation_documents(client: TestClient) -> None:
+ """GET /conversation/{id} returns documents for that conversation."""
+ # Create two documents (word and pdf) in the same conversation
+ client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Doc 1",
+ "conversation_id": "conv-list",
+ },
+ )
+ client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "pdf",
+ "content": "# Doc 2",
+ "conversation_id": "conv-list",
+ },
+ )
+
+ resp = client.get("/api/v1/documents/conversation/conv-list")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["success"] is True
+ assert data["count"] == 2
+ assert data["conversation_id"] == "conv-list"
+ formats = [d["format"] for d in data["documents"]]
+ assert "word" in formats
+ assert "pdf" in formats
+
+
+def test_list_empty_conversation(client: TestClient) -> None:
+ """GET /conversation/{id} for an unknown conversation returns 200 with an empty list."""
+ resp = client.get("/api/v1/documents/conversation/no-such-conv")
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["count"] == 0
+ assert data["documents"] == []
+
+
+# ---------------------------------------------------------------------------
+# GET /download/{doc_id}
+# ---------------------------------------------------------------------------
+
+
+def test_download_document(client: TestClient) -> None:
+ """GET /download/{doc_id} returns a non-empty application/octet-stream body."""
+ # Create a document
+ create_resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Downloadable",
+ "conversation_id": "conv-dl",
+ },
+ )
+ doc_id = create_resp.json()["document"]["id"]
+
+ # Download it
+ resp = client.get(f"/api/v1/documents/download/{doc_id}")
+ assert resp.status_code == 200
+ assert resp.headers["content-type"] == "application/octet-stream"
+ assert len(resp.content) > 0
+
+
+def test_download_not_found(client: TestClient) -> None:
+ """GET /download/{doc_id} with an id that was never created returns 404."""
+ resp = client.get("/api/v1/documents/download/nonexistent-id")
+ assert resp.status_code == 404
+
+
+# ---------------------------------------------------------------------------
+# POST /upload-template
+# ---------------------------------------------------------------------------
+
+
+def test_upload_template(client: TestClient, tmp_path: Path) -> None:
+ """POST /upload-template accepts a .docx file and returns a template-*.docx stored_name."""
+ # Build a one-paragraph .docx containing a {{name}} placeholder
+ from docx import Document
+
+ template_path = tmp_path / "test_template.docx"
+ doc = Document()
+ doc.add_paragraph("Hello {{name}}!")
+ doc.save(str(template_path))
+
+ with open(template_path, "rb") as f:
+ resp = client.post(
+ "/api/v1/documents/upload-template",
+ files={"file": ("test_template.docx", f, "application/vnd.openxmlformats-officedocument.wordprocessingml.document")},
+ )
+
+ assert resp.status_code == 200
+ data = resp.json()
+ assert data["success"] is True
+ assert data["stored_name"].startswith("template-")
+ assert data["stored_name"].endswith(".docx")
+
+
+def test_upload_template_wrong_format(client: TestClient) -> None:
+ """POST /upload-template with a .txt upload (text/plain) returns 400."""
+ resp = client.post(
+ "/api/v1/documents/upload-template",
+ files={"file": ("test.txt", b"not a docx", "text/plain")},
+ )
+ assert resp.status_code == 400
+
+
+# ---------------------------------------------------------------------------
+# Service unavailable
+# ---------------------------------------------------------------------------
+
+
+def test_service_unavailable(tmp_path: Path) -> None:
+ """POST /create returns 503 when app.state has no document_service."""
+ app = FastAPI()
+ # No document_service set (tmp_path is requested but not used by this test)
+ app.include_router(documents_routes.router, prefix="/api/v1")
+ client = TestClient(app)
+
+ resp = client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "test",
+ "conversation_id": "conv-1",
+ },
+ )
+ assert resp.status_code == 503
diff --git a/tests/routes/test_documents_security.py b/tests/routes/test_documents_security.py
new file mode 100644
index 0000000..7e1c566
--- /dev/null
+++ b/tests/routes/test_documents_security.py
@@ -0,0 +1,336 @@
+"""Security tests for /api/v1/documents routes (R26-R28, path traversal, SSTI).
+
+These tests verify:
+- R27: Authentication (API key required when configured)
+- Path traversal protection in template field
+- Deep SSTI protection in template rendering
+"""
+
+from __future__ import annotations
+
+import asyncio
+from pathlib import Path
+from types import SimpleNamespace
+
+import pytest
+from docx import Document as DocxDocument
+from fastapi import FastAPI
+from fastapi.testclient import TestClient
+
+from agentkit.documents.db import init_documents_db
+from agentkit.documents.renderers.excel_renderer import ExcelRenderer
+from agentkit.documents.renderers.pdf_renderer import PDFRenderer
+from agentkit.documents.renderers.template_renderer import TemplateRenderer
+from agentkit.documents.renderers.word_renderer import WordRenderer
+from agentkit.documents.service import DocumentService
+from agentkit.server.routes import documents as documents_routes
+
+TEST_API_KEY = "test-secret-key-12345"
+
+
+@pytest.fixture
+def secured_app(tmp_path: Path) -> FastAPI:
+ """App whose server_config carries TEST_API_KEY — the tests expect every endpoint to require it."""
+ db_path = tmp_path / "test.db"
+ upload_dir = tmp_path / "uploads"
+ asyncio.run(init_documents_db(db_path)) # fixture is sync, so drive the async DB init here
+
+ service = DocumentService(upload_dir=upload_dir, db_path=db_path)
+ service.register_renderer("word", WordRenderer())
+ service.register_renderer("excel", ExcelRenderer())
+ service.register_renderer("pdf", PDFRenderer())
+
+ app = FastAPI()
+ app.state.document_service = service
+ # Configure API key via a stand-in config object exposing only .api_key
+ app.state.server_config = SimpleNamespace(api_key=TEST_API_KEY)
+ app.include_router(documents_routes.router, prefix="/api/v1")
+ return app
+
+
+@pytest.fixture
+def secured_client(secured_app: FastAPI) -> TestClient:
+ return TestClient(secured_app) # sends no API key by default; tests add it per request
+
+
+@pytest.fixture
+def open_app(tmp_path: Path) -> FastAPI:
+ """App with server_config=None, i.e. no API key configured (backwards-compat mode)."""
+ db_path = tmp_path / "test.db"
+ upload_dir = tmp_path / "uploads"
+ asyncio.run(init_documents_db(db_path)) # fixture is sync, so drive the async DB init here
+
+ service = DocumentService(upload_dir=upload_dir, db_path=db_path)
+ service.register_renderer("word", WordRenderer())
+ service.register_renderer("excel", ExcelRenderer())
+ service.register_renderer("pdf", PDFRenderer())
+
+ app = FastAPI()
+ app.state.document_service = service
+ app.state.server_config = None # No key → allow all
+ app.include_router(documents_routes.router, prefix="/api/v1")
+ return app
+
+
+# ---------------------------------------------------------------------------
+# R27: Authentication tests
+# ---------------------------------------------------------------------------
+
+
+class TestAuthentication:
+ """Verify API key authentication on all document endpoints."""
+
+ _CREATE_BODY = { # minimal valid /create payload shared by the tests below
+ "format": "word",
+ "content": "# Test",
+ "conversation_id": "conv-1",
+ }
+
+ def test_create_without_api_key_returns_401(self, secured_client: TestClient) -> None:
+ """POST /create without API key → 401 with "API key" in the error detail."""
+ resp = secured_client.post("/api/v1/documents/create", json=self._CREATE_BODY)
+ assert resp.status_code == 401
+ assert "API key" in resp.json()["detail"]
+
+ def test_create_with_wrong_api_key_returns_401(self, secured_client: TestClient) -> None:
+ """POST /create with wrong API key → 401."""
+ resp = secured_client.post(
+ "/api/v1/documents/create",
+ json=self._CREATE_BODY,
+ headers={"X-API-Key": "wrong-key"},
+ )
+ assert resp.status_code == 401
+
+ def test_create_with_valid_api_key_header_returns_200(
+ self, secured_client: TestClient
+ ) -> None:
+ """POST /create with valid X-API-Key header → 200."""
+ resp = secured_client.post(
+ "/api/v1/documents/create",
+ json=self._CREATE_BODY,
+ headers={"X-API-Key": TEST_API_KEY},
+ )
+ assert resp.status_code == 200
+
+ def test_create_with_valid_api_key_query_param_returns_200(
+ self, secured_client: TestClient
+ ) -> None:
+ """POST /create with valid api_key query param → 200."""
+ resp = secured_client.post(
+ f"/api/v1/documents/create?api_key={TEST_API_KEY}",
+ json=self._CREATE_BODY,
+ )
+ assert resp.status_code == 200
+
+ def test_download_without_api_key_returns_401(self, secured_client: TestClient) -> None:
+ """GET /download/{id} without API key → 401 (even though the id does not exist)."""
+ resp = secured_client.get("/api/v1/documents/download/some-id")
+ assert resp.status_code == 401
+
+ def test_list_without_api_key_returns_401(self, secured_client: TestClient) -> None:
+ """GET /conversation/{id} without API key → 401."""
+ resp = secured_client.get("/api/v1/documents/conversation/conv-1")
+ assert resp.status_code == 401
+
+ def test_upload_template_without_api_key_returns_401(
+ self, secured_client: TestClient
+ ) -> None:
+ """POST /upload-template without API key → 401."""
+ resp = secured_client.post(
+ "/api/v1/documents/upload-template",
+ files={"file": ("test.docx", b"fake", "application/octet-stream")},
+ )
+ assert resp.status_code == 401
+
+ def test_no_key_configured_allows_all(self, open_app: FastAPI) -> None:
+ """When no API key is configured, all requests are allowed (backwards compat)."""
+ client = TestClient(open_app)
+ resp = client.post("/api/v1/documents/create", json=self._CREATE_BODY)
+ assert resp.status_code == 200
+
+ def test_api_key_constant_time_comparison(self, secured_client: TestClient) -> None:
+ """An empty X-API-Key header is rejected with 401 (timing-safety itself is not tested)."""
+ # ponytail: timing can't be measured here; this only checks that an empty key fails
+ resp = secured_client.post(
+ "/api/v1/documents/create",
+ json=self._CREATE_BODY,
+ headers={"X-API-Key": ""},
+ )
+ assert resp.status_code == 401
+
+
+# ---------------------------------------------------------------------------
+# Path traversal in template field
+# ---------------------------------------------------------------------------
+
+
+class TestTemplatePathTraversal:
+ """Verify template field doesn't allow path traversal attacks.
+
+ Originally written against a confirmed bug where documents.py did:
+ template_path = str(service.upload_dir / body.template)
+ so a value like "../../etc/passwd" resolved outside upload_dir and was read
+ if it existed. Each test below expects such a value to be rejected (400/404).
+ """
+
+ def test_create_with_template_path_traversal(
+ self, secured_client: TestClient, tmp_path: Path
+ ) -> None:
+ """template='../secret.txt' should NOT read the file sitting just outside upload_dir."""
+ # Create a file outside upload_dir to simulate the target
+ secret_file = tmp_path / "secret.txt"
+ secret_file.write_text("SECRET_CONTENT")
+
+ # Relative path from upload_dir (tmp_path / "uploads" in secured_app) up to secret_file
+ rel = Path("..") / "secret.txt"
+
+ resp = secured_client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Test",
+ "conversation_id": "conv-1",
+ "template": str(rel),
+ "template_data": {"name": "test"},
+ },
+ headers={"X-API-Key": TEST_API_KEY},
+ )
+ # Should be 404 (template not found in upload_dir) or 400
+ # NOT 200 with the secret file content
+ assert resp.status_code in (404, 400), (
+ f"Path traversal succeeded! Status {resp.status_code}. "
+ f"Response: {resp.text}"
+ )
+
+ def test_create_with_template_absolute_path(
+ self, secured_client: TestClient
+ ) -> None:
+ """template='/etc/passwd' (absolute path) → rejected with 400.
+
+ FIXED: Path.resolve() + relative_to() check now prevents the resolved
+ path from escaping upload_dir. Previously, pathlib's `/` operator let
+ an absolute right operand override the left, allowing traversal.
+ """
+ resp = secured_client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Test",
+ "conversation_id": "conv-1",
+ "template": "/etc/passwd",
+ "template_data": {},
+ },
+ headers={"X-API-Key": TEST_API_KEY},
+ )
+ # After fix: 400 (path traversal detected), not 500 or 200
+ assert resp.status_code == 400, (
+ f"Path traversal should be rejected with 400, got {resp.status_code}. "
+ f"Response: {resp.text}"
+ )
+ assert "traversal" in resp.json()["detail"].lower() # error detail must name the cause
+
+ def test_create_with_template_null_byte(
+ self, secured_client: TestClient
+ ) -> None:
+ """template with an embedded NUL byte is rejected with 400 (no truncation bypass)."""
+ resp = secured_client.post(
+ "/api/v1/documents/create",
+ json={
+ "format": "word",
+ "content": "# Test",
+ "conversation_id": "conv-1",
+ "template": "file.docx\x00../../etc/passwd",
+ "template_data": {},
+ },
+ headers={"X-API-Key": TEST_API_KEY},
+ )
+ # After fix: 400 (invalid characters detected), not 200
+ assert resp.status_code == 400, (
+ f"Null byte should be rejected with 400, got {resp.status_code}"
+ )
+
+
+# ---------------------------------------------------------------------------
+# Deep SSTI tests (R26)
+# ---------------------------------------------------------------------------
+
+
+class TestDeepSSTI:
+    """Verify TemplateRenderer does not execute or leak data for advanced SSTI payloads."""
+
+    @pytest.fixture
+    def renderer(self) -> TemplateRenderer:
+        return TemplateRenderer()  # default-constructed renderer under test
+
+    @pytest.fixture
+    def template_file(self, tmp_path: Path) -> Path:
+        """Create a .docx template file; each test overwrites it with its own payload."""
+        doc = DocxDocument()
+        doc.add_paragraph("{{payload}}")
+        path = tmp_path / "ssti_template.docx"
+        doc.save(str(path))
+        return path
+
+    def _render_and_get_text(self, renderer: TemplateRenderer, template_path: Path, data: dict, output_path: Path) -> str:
+        """Render template_path with data into output_path and return its paragraph text joined by newlines."""
+        renderer.render_template(template_path, data, output_path)
+        doc = DocxDocument(str(output_path))
+        return "\n".join(p.text for p in doc.paragraphs)
+
+    def test_ssti_class_subclasses(
+        self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path
+    ) -> None:
+        """{{ ''.__class__.__mro__[1].__subclasses__() }} must not render a class list."""
+        # Recreate template with SSTI payload
+        doc = DocxDocument()
+        doc.add_paragraph("{{ ''.__class__.__mro__[1].__subclasses__() }}")
+        doc.save(str(template_file))
+
+        output = tmp_path / "output.docx"
+        text = self._render_and_get_text(renderer, template_file, {}, output)
+        # A leaked subclass list renders as "[<class 'type'>, ...]", so that repr marker must be absent
+        assert "<class" not in text
+        # Should NOT contain class names like 'wrap_close', 'Popen', etc.
+        assert "Popen" not in text
+        assert "wrap_close" not in text
+
+    def test_ssti_config_access(
+        self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path
+    ) -> None:
+        """{{ config }} should not leak server configuration."""
+        doc = DocxDocument()
+        doc.add_paragraph("{{ config }}")
+        doc.save(str(template_file))
+
+        output = tmp_path / "output.docx"
+        text = self._render_and_get_text(renderer, template_file, {}, output)
+        # No "config" is passed in the data dict, so nothing config-like may appear
+        assert "api_key" not in text.lower()
+        assert "secret" not in text.lower()
+
+    def test_ssti_globals_access(
+        self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path
+    ) -> None:
+        """{{ namespace.__init__.__globals__ }} should be blocked."""
+        doc = DocxDocument()
+        doc.add_paragraph("{{ namespace.__init__.__globals__ }}")
+        doc.save(str(template_file))
+
+        output = tmp_path / "output.docx"
+        text = self._render_and_get_text(renderer, template_file, {}, output)
+        # Should not expose globals
+        assert "__builtins__" not in text
+        assert "import" not in text.lower()
+
+    def test_ssti_import_statement(
+        self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path
+    ) -> None:
+        """{% import os %} followed by os.popen must make rendering raise."""
+        doc = DocxDocument()
+        doc.add_paragraph("{% import os %}{{ os.popen('id').read() }}")
+        doc.save(str(template_file))
+
+        output = tmp_path / "output.docx"
+        # Any exception is accepted here; the exact type raised by the renderer is not pinned
+        with pytest.raises(Exception):
+            self._render_and_get_text(renderer, template_file, {}, output)
diff --git a/tests/tools/test_document_tool.py b/tests/tools/test_document_tool.py
new file mode 100644
index 0000000..64ed147
--- /dev/null
+++ b/tests/tools/test_document_tool.py
@@ -0,0 +1,403 @@
+"""Tests for DocumentTool — Agent tool wrapper (U6 create + U9 read)."""
+
+from __future__ import annotations
+
+from pathlib import Path
+
+import pytest
+
+from agentkit.documents.db import init_documents_db
+from agentkit.documents.renderers.excel_renderer import ExcelRenderer
+from agentkit.documents.renderers.pdf_renderer import PDFRenderer
+from agentkit.documents.renderers.word_renderer import WordRenderer
+from agentkit.documents.service import DocumentService
+from agentkit.memory.document_loader import DocumentLoader
+from agentkit.tools.document_tool import DocumentTool
+
+
+@pytest.fixture
+def service(tmp_path: Path) -> DocumentService:
+ """Provide a DocumentService with all renderers registered."""
+ db_path = tmp_path / "test.db"
+ upload_dir = tmp_path / "uploads"
+ import asyncio
+ asyncio.run(init_documents_db(db_path))
+
+ svc = DocumentService(upload_dir=upload_dir, db_path=db_path)
+ svc.register_renderer("word", WordRenderer())
+ svc.register_renderer("excel", ExcelRenderer())
+ svc.register_renderer("pdf", PDFRenderer())
+ # TemplateRenderer is used via render_template, not render — but we
+ # register it under "word" so DocumentService can dispatch to it.
+ # Actually, DocumentService uses the same renderer for both paths:
+ # _render_content calls render(), _render_template calls render_template().
+ # WordRenderer doesn't have render_template, so we need a separate
+ # renderer for the template path. For U6 tests, we register a
+ # TemplateRenderer as a second renderer that DocumentService can use
+ # when template_path is provided.
+ # ponytail: DocumentService._render_template calls renderer.render_template,
+ # so we need the renderer to have that method. We register TemplateRenderer
+ # as the word renderer when template filling is needed. For simplicity,
+ # we use a composite approach: register WordRenderer for content rendering
+ # and handle template separately. But the current service design uses
+ # one renderer per format. Let's just test create without template here.
+ return svc
+
+
+@pytest.fixture
+def tool(service: DocumentService) -> DocumentTool:
+ return DocumentTool(service=service)
+
+
+# ---------------------------------------------------------------------------
+# create action — word
+# ---------------------------------------------------------------------------
+
+
+async def test_create_word(tool: DocumentTool) -> None:
+ """format=word creates a .docx and returns success + document metadata."""
+ result = await tool.execute(
+ format="word",
+ content="# Test Report\n\nThis is a test paragraph.\n",
+ conversation_id="conv-1",
+ )
+ assert result["success"] is True
+ assert result["document"]["format"] == "word"
+ assert result["document"]["filename"].endswith(".docx")
+ assert result["document"]["size"] > 0
+ assert result["document"]["conversation_id"] == "conv-1"
+ assert result["document"]["id"] # UUID is set
+
+
+async def test_create_excel(tool: DocumentTool) -> None:
+ """format=excel creates a .xlsx from JSON input."""
+ result = await tool.execute(
+ format="excel",
+ content='{"Data": [["A", "B"], ["1", "2"]]}',
+ conversation_id="conv-1",
+ )
+ assert result["success"] is True
+ assert result["document"]["format"] == "excel"
+ assert result["document"]["filename"].endswith(".xlsx")
+
+
+async def test_create_pdf(tool: DocumentTool) -> None:
+ """format=pdf creates a .pdf from Markdown."""
+ result = await tool.execute(
+ format="pdf",
+ content="# PDF Title\n\nParagraph text.\n",
+ conversation_id="conv-1",
+ )
+ assert result["success"] is True
+ assert result["document"]["format"] == "pdf"
+ assert result["document"]["filename"].endswith(".pdf")
+
+
+async def test_create_with_filename(tool: DocumentTool) -> None:
+ """Custom filename is used in the document metadata."""
+ result = await tool.execute(
+ format="word",
+ content="# Test",
+ conversation_id="conv-1",
+ filename="my-report.docx",
+ )
+ assert result["success"] is True
+ assert result["document"]["filename"] == "my-report.docx"
+
+
+# ---------------------------------------------------------------------------
+# error paths
+# ---------------------------------------------------------------------------
+
+
+async def test_missing_format(tool: DocumentTool) -> None:
+ """Missing format returns success=False."""
+ result = await tool.execute(
+ content="# Test",
+ conversation_id="conv-1",
+ )
+ assert result["success"] is False
+ assert "format" in result["error"]
+
+
+async def test_missing_conversation_id(tool: DocumentTool) -> None:
+ """Missing conversation_id returns success=False."""
+ result = await tool.execute(
+ format="word",
+ content="# Test",
+ )
+ assert result["success"] is False
+ assert "conversation_id" in result["error"]
+
+
+async def test_missing_content(tool: DocumentTool) -> None:
+ """Missing content returns success=False."""
+ result = await tool.execute(
+ format="word",
+ content="",
+ conversation_id="conv-1",
+ )
+ assert result["success"] is False
+ assert "content" in result["error"]
+
+
+async def test_invalid_format(tool: DocumentTool) -> None:
+ """Unsupported format returns success=False."""
+ result = await tool.execute(
+ format="pptx",
+ content="# Test",
+ conversation_id="conv-1",
+ )
+ assert result["success"] is False
+
+
+# ---------------------------------------------------------------------------
+# tool registration
+# ---------------------------------------------------------------------------
+
+
+def test_tool_name_and_schema(tool: DocumentTool) -> None:
+ """Tool has correct name and input_schema."""
+ assert tool.name == "document"
+ schema = tool.input_schema
+ assert schema["type"] == "object"
+ assert "action" in schema["properties"]
+ assert "format" in schema["properties"]
+ assert "content" in schema["properties"]
+ assert "conversation_id" in schema["properties"]
+ assert "filename" in schema["properties"]
+ # U9: conversation_id is the only hard-required field; action defaults to "create"
+ assert "conversation_id" in schema["required"]
+ assert schema["properties"]["action"]["enum"] == ["create", "read"]
+
+
+async def test_created_document_persisted(tool: DocumentTool, service: DocumentService) -> None:
+ """Created document is persisted and retrievable via service."""
+ result = await tool.execute(
+ format="word",
+ content="# Persisted",
+ conversation_id="conv-persist",
+ )
+ assert result["success"] is True
+ doc_id = result["document"]["id"]
+
+ # Retrieve via service
+ docs = await service.get_conversation_documents("conv-persist")
+ assert len(docs) == 1
+ assert docs[0].id == doc_id
+
+ # Retrieve single doc
+ doc = await service.get_document(doc_id)
+ assert doc is not None
+ assert doc.filename == result["document"]["filename"]
+
+
+# ---------------------------------------------------------------------------
+# read action (U9)
+# ---------------------------------------------------------------------------
+
+
+async def test_read_text_file(tool: DocumentTool, tmp_path: Path) -> None:
+ """action='read' extracts text from a .txt file."""
+ f = tmp_path / "notes.txt"
+ f.write_text("Hello world\nLine two", encoding="utf-8")
+
+ result = await tool.execute(action="read", filename=str(f), conversation_id="conv-1")
+ assert result["success"] is True
+ assert "Hello world" in result["content"]
+ assert result["metadata"]["format"] == "text"
+
+
+async def test_read_markdown_file(tool: DocumentTool, tmp_path: Path) -> None:
+ """action='read' extracts text from a .md file, preserving content."""
+ f = tmp_path / "doc.md"
+ f.write_text("# Title\n\nParagraph.\n", encoding="utf-8")
+
+ result = await tool.execute(action="read", filename=str(f), conversation_id="conv-1")
+ assert result["success"] is True
+ assert "# Title" in result["content"]
+ assert result["metadata"]["format"] == "markdown"
+ assert result["title"] == "Title"
+
+
+async def test_read_word_file(tool: DocumentTool, tmp_path: Path) -> None:
+ """action='read' extracts text from a .docx file created by the tool itself."""
+ # First create a docx
+ create_result = await tool.execute(
+ action="create",
+ format="word",
+ content="# Read Test\n\nContent for reading.",
+ conversation_id="conv-1",
+ filename="read-test.docx",
+ )
+ assert create_result["success"] is True
+
+ # The file is stored in service's upload_dir — find it via service
+ doc_id = create_result["document"]["id"]
+ # ponytail: use service.get_download_path to locate the file on disk
+ svc = tool._service # type: ignore[attr-defined]
+ path = svc.get_download_path(doc_id)
+ assert path is not None and path.exists()
+
+ result = await tool.execute(action="read", filename=str(path), conversation_id="conv-1")
+ assert result["success"] is True
+ assert "Read Test" in result["content"]
+ assert "Content for reading" in result["content"]
+ assert result["metadata"]["format"] == "docx"
+
+
+async def test_read_excel_file(tool: DocumentTool, tmp_path: Path) -> None:
+ """action='read' extracts text from a .xlsx file created by the tool itself."""
+ create_result = await tool.execute(
+ action="create",
+ format="excel",
+ content='{"Sheet1": [["Name", "Age"], ["Alice", "30"], ["Bob", "25"]]}',
+ conversation_id="conv-1",
+ filename="read-test.xlsx",
+ )
+ assert create_result["success"] is True
+
+ doc_id = create_result["document"]["id"]
+ svc = tool._service # type: ignore[attr-defined]
+ path = svc.get_download_path(doc_id)
+ assert path is not None and path.exists()
+
+ result = await tool.execute(action="read", filename=str(path), conversation_id="conv-1")
+ assert result["success"] is True
+ assert "Alice" in result["content"]
+ assert "Bob" in result["content"]
+ assert result["metadata"]["format"] == "xlsx"
+ assert result["metadata"]["sheet_count"] >= 1
+
+
+async def test_read_missing_file(tool: DocumentTool, tmp_path: Path) -> None:
+ """action='read' with non-existent file returns success=False."""
+ result = await tool.execute(
+ action="read",
+ filename=str(tmp_path / "nonexistent.txt"),
+ conversation_id="conv-1",
+ )
+ assert result["success"] is False
+ assert "not found" in result["error"].lower() or "no such file" in result["error"].lower()
+
+
+async def test_read_missing_filename(tool: DocumentTool) -> None:
+ """action='read' without filename returns success=False."""
+ result = await tool.execute(action="read", conversation_id="conv-1")
+ assert result["success"] is False
+ assert "filename" in result["error"].lower()
+
+
+async def test_read_uses_content_as_path_fallback(tool: DocumentTool, tmp_path: Path) -> None:
+ """action='read' falls back to 'content' as file path when filename is absent."""
+ f = tmp_path / "via-content.txt"
+ f.write_text("content-as-path", encoding="utf-8")
+
+ result = await tool.execute(
+ action="read",
+ content=str(f),
+ conversation_id="conv-1",
+ )
+ assert result["success"] is True
+ assert "content-as-path" in result["content"]
+
+
+async def test_unknown_action(tool: DocumentTool) -> None:
+ """Unknown action returns success=False."""
+ result = await tool.execute(action="delete", conversation_id="conv-1")
+ assert result["success"] is False
+ assert "unknown action" in result["error"].lower()
+
+
+async def test_create_action_explicit(tool: DocumentTool) -> None:
+ """action='create' explicitly works the same as default."""
+ result = await tool.execute(
+ action="create",
+ format="word",
+ content="# Explicit",
+ conversation_id="conv-1",
+ )
+ assert result["success"] is True
+ assert result["document"]["format"] == "word"
+
+
+# ---------------------------------------------------------------------------
+# DocumentLoader Excel support (U9)
+# ---------------------------------------------------------------------------
+
+
+def test_loader_detects_xlsx() -> None:
+ """DocumentLoader detects .xlsx and .xls as xlsx format."""
+ from agentkit.memory.document_loader import _detect_format
+
+ assert _detect_format("data.xlsx") == "xlsx"
+ assert _detect_format("data.XLS") == "xlsx"
+ assert _detect_format("data.xls") == "xlsx"
+
+
+def test_loader_parses_xlsx(tmp_path: Path) -> None:
+ """DocumentLoader._parse_xlsx extracts sheet data as Markdown table."""
+ import openpyxl
+
+ f = tmp_path / "test.xlsx"
+ wb = openpyxl.Workbook()
+ ws = wb.active
+ ws.title = "Data"
+ ws.append(["Name", "Age"])
+ ws.append(["Alice", 30])
+ ws.append(["Bob", 25])
+ wb.save(f)
+ wb.close()
+
+ loader = DocumentLoader()
+ doc = loader.load(f)
+ assert "Alice" in doc.content
+ assert "Bob" in doc.content
+ assert "Name" in doc.content
+ assert doc.metadata["format"] == "xlsx"
+ assert doc.metadata["sheet_count"] == 1
+ assert doc.metadata["row_count"] == 3
+ # Markdown table separator should be present
+ assert "---" in doc.content
+
+
+def test_loader_parses_xlsx_multiple_sheets(tmp_path: Path) -> None:
+ """DocumentLoader handles multiple sheets, each as a separate H2 section."""
+ import openpyxl
+
+ f = tmp_path / "multi.xlsx"
+ wb = openpyxl.Workbook()
+ ws1 = wb.active
+ ws1.title = "Sheet1"
+ ws1.append(["A", "B"])
+ ws1.append(["1", "2"])
+ ws2 = wb.create_sheet("Sheet2")
+ ws2.append(["C", "D"])
+ ws2.append(["3", "4"])
+ wb.save(f)
+ wb.close()
+
+ loader = DocumentLoader()
+ doc = loader.load(f)
+ assert "## Sheet1" in doc.content
+ assert "## Sheet2" in doc.content
+ assert doc.metadata["sheet_count"] == 2
+
+
+def test_loader_parses_xlsx_empty_cells(tmp_path: Path) -> None:
+ """DocumentLoader handles empty cells gracefully (renders as empty string)."""
+ import openpyxl
+
+ f = tmp_path / "empty.xlsx"
+ wb = openpyxl.Workbook()
+ ws = wb.active
+ ws.append(["A", "B", "C"])
+ ws.append(["x", None, "z"])
+ wb.save(f)
+ wb.close()
+
+ loader = DocumentLoader()
+ doc = loader.load(f)
+ # Empty cell should not crash; row should still have 3 columns
+ assert "x" in doc.content
+ assert "z" in doc.content
diff --git a/tests/unit/memory/test_document_loader.py b/tests/unit/memory/test_document_loader.py
index bff89c9..73964a9 100644
--- a/tests/unit/memory/test_document_loader.py
+++ b/tests/unit/memory/test_document_loader.py
@@ -1,8 +1,15 @@
"""DocumentLoader 单元测试 - 多格式文档解析器"""
+import io
+
import pytest
-from agentkit.memory.document_loader import Document, DocumentLoader, _detect_format
+from agentkit.memory.document_loader import (
+ MAX_ROWS_PER_SHEET,
+ Document,
+ DocumentLoader,
+ _detect_format,
+)
class TestDetectFormat:
@@ -225,3 +232,184 @@ class TestDocumentLoaderEdgeCases:
content = "Test content".encode("utf-8")
doc = loader.load_bytes(content, "reports/2024/summary.md")
assert doc.metadata["format"] == "markdown"
+
+
+class TestDocumentLoaderXlsx:
+ """Excel 解析边界情况测试 (#16)
+
+ 覆盖 _parse_xlsx 的关键路径:空工作簿、损坏字节、列数不齐、
+ 行截断、单元格截断、文件大小限制。
+ """
+
+ @staticmethod
+ def _make_xlsx_bytes(sheet_name: str = "Sheet1", rows: list[list] | None = None) -> bytes:
+ """构造内存中的 xlsx 字节内容。"""
+ from openpyxl import Workbook
+
+ wb = Workbook()
+ ws = wb.active
+ ws.title = sheet_name
+ for row in rows or []:
+ ws.append(row)
+ buf = io.BytesIO()
+ wb.save(buf)
+ return buf.getvalue()
+
+ def test_empty_workbook_falls_back_to_text(self):
+ """空工作簿(无任何行)应返回空内容,不报错。"""
+ loader = DocumentLoader()
+ content = self._make_xlsx_bytes(rows=[])
+ doc = loader.load_bytes(content, "empty.xlsx")
+
+ assert doc.metadata["format"] == "xlsx"
+ # 空工作簿:sections 为空,text 为空字符串
+ if doc.metadata.get("parser") == "openpyxl":
+ assert doc.content == ""
+ assert doc.metadata["row_count"] == 0
+ assert doc.metadata["sheet_count"] == 1
+
+ def test_malformed_bytes_falls_back_to_text(self):
+ """损坏的字节内容应回退到文本解析,不抛异常。"""
+ loader = DocumentLoader()
+ # 不是合法的 zip/xlsx 字节
+ content = b"not a real xlsx file content"
+ doc = loader.load_bytes(content, "broken.xlsx")
+
+ assert doc.metadata["format"] == "xlsx"
+ # 应回退到 text parser
+ assert doc.metadata["parser"] == "text"
+ assert isinstance(doc, Document)
+
+ def test_column_mismatch_produces_valid_markdown_table(self):
+ """行内单元格数不一致时,应填充到 max_cols 保证 Markdown 表格有效。"""
+ loader = DocumentLoader()
+ # 第一行 3 列,第二行 2 列,第三行 4 列
+ rows = [
+ ["A1", "B1", "C1"],
+ ["A2", "B2"],
+ ["A3", "B3", "C3", "D3"],
+ ]
+ content = self._make_xlsx_bytes(rows=rows)
+ doc = loader.load_bytes(content, "ragged.xlsx")
+
+ if doc.metadata.get("parser") != "openpyxl":
+ pytest.skip("openpyxl not available")
+
+ lines = doc.content.split("\n")
+ # 第一行是 "## Sheet1",然后是表头、分隔符、数据行
+ # 找到表格行(以 | 开头)
+ table_lines = [ln for ln in lines if ln.startswith("|")]
+ assert len(table_lines) == 4 # 1 header + 1 separator + 2 data rows
+
+ # 所有表格行应有相同的列数(4 列 = max_cols)
+ for line in table_lines:
+ # | a | b | c | d | -> 5 个 | 分隔符表示 4 列
+ assert line.count("|") == 5
+
+ # 分隔符行应为 | --- | --- | --- | --- |
+ sep_line = table_lines[1]
+ assert sep_line.count("---") == 4
+
+ def test_row_truncation_at_max_rows(self):
+ """行数超过 MAX_ROWS_PER_SHEET 时应截断并标记 truncated。"""
+ loader = DocumentLoader()
+ # 构造超过上限的行数(使用小批量验证逻辑)
+ # ponytail: 直接构造超大工作簿太慢,用 monkeypatch 临时调小上限
+ original_max = MAX_ROWS_PER_SHEET
+ import agentkit.memory.document_loader as dl_module
+
+ # 临时调小上限到 5 行
+ dl_module.MAX_ROWS_PER_SHEET = 5
+ try:
+ rows = [[f"r{i}", f"v{i}"] for i in range(20)]
+ content = self._make_xlsx_bytes(rows=rows)
+ doc = loader.load_bytes(content, "big.xlsx")
+
+ if doc.metadata.get("parser") != "openpyxl":
+ pytest.skip("openpyxl not available")
+
+ assert doc.metadata["truncated"] is True
+ assert doc.metadata["row_count"] == 5
+ assert f"truncated at 5 rows" in doc.content
+ finally:
+ dl_module.MAX_ROWS_PER_SHEET = original_max
+
+ def test_cell_truncation_at_max_chars(self):
+ """单元格内容超过 MAX_CELL_CHARS 时应截断。"""
+ loader = DocumentLoader()
+ import agentkit.memory.document_loader as dl_module
+
+ original_max = dl_module.MAX_CELL_CHARS
+ dl_module.MAX_CELL_CHARS = 10
+ try:
+ long_text = "X" * 100
+ content = self._make_xlsx_bytes(rows=[["header"], [long_text]])
+ doc = loader.load_bytes(content, "longcell.xlsx")
+
+ if doc.metadata.get("parser") != "openpyxl":
+ pytest.skip("openpyxl not available")
+
+ # 单元格内容应被截断到 10 字符
+ assert "XXXXXXXXXX" in doc.content
+ # 不应包含完整的 100 字符
+ assert "X" * 100 not in doc.content
+ finally:
+ dl_module.MAX_CELL_CHARS = original_max
+
+ def test_multiple_sheets_separated_by_h2(self):
+ """多个 sheet 应以 H2 标题分隔。"""
+ loader = DocumentLoader()
+ from openpyxl import Workbook
+
+ wb = Workbook()
+ ws1 = wb.active
+ ws1.title = "First"
+ ws1.append(["a", "b"])
+ ws2 = wb.create_sheet("Second")
+ ws2.append(["c", "d"])
+ buf = io.BytesIO()
+ wb.save(buf)
+ content = buf.getvalue()
+
+ doc = loader.load_bytes(content, "multi.xlsx")
+
+ if doc.metadata.get("parser") != "openpyxl":
+ pytest.skip("openpyxl not available")
+
+ assert doc.metadata["sheet_count"] == 2
+ assert "## First" in doc.content
+ assert "## Second" in doc.content
+
+ def test_file_size_limit_raises_value_error(self):
+ """内容超过 MAX_CONTENT_SIZE 应抛出 ValueError。"""
+ loader = DocumentLoader()
+ # 构造超过上限的字节(不实际分配 MAX_CONTENT_SIZE+1 字节,用 monkeypatch)
+ import agentkit.memory.document_loader as dl_module
+
+ original_max = dl_module.MAX_CONTENT_SIZE
+ dl_module.MAX_CONTENT_SIZE = 10
+ try:
+ content = b"X" * 100 # 100 > 10
+ with pytest.raises(ValueError, match="exceeds limit"):
+ loader.load_bytes(content, "big.xlsx")
+ finally:
+ dl_module.MAX_CONTENT_SIZE = original_max
+
+ def test_none_cell_values_become_empty_strings(self):
+ """None 单元格应转为空字符串,不是 'None' 文本。"""
+ loader = DocumentLoader()
+ # openpyxl 中空单元格以 None 表示
+ rows = [
+ ["header1", "header2", "header3"],
+ ["a", None, "c"],
+ ]
+ content = self._make_xlsx_bytes(rows=rows)
+ doc = loader.load_bytes(content, "none_cells.xlsx")
+
+ if doc.metadata.get("parser") != "openpyxl":
+ pytest.skip("openpyxl not available")
+
+ # 确保没有 "None" 字符串出现在表格中
+ table_lines = [ln for ln in doc.content.split("\n") if ln.startswith("|")]
+ for line in table_lines:
+ assert "None" not in line