"""Tests for document DB persistence and DocumentService metadata operations.

Covers U1: DocumentService core architecture + database model.
Renderer-specific tests live in test_word_renderer.py etc.
"""

from __future__ import annotations

import asyncio
from pathlib import Path

import pytest

from agentkit.documents.db import (
    delete_document,
    get_conversation_documents,
    get_document_by_id,
    init_documents_db,
    insert_document,
)
from agentkit.documents.models import DocumentMeta
from agentkit.documents.service import DocumentService, _sanitize_filename

# NOTE(review): the async tests below carry no explicit asyncio marker —
# presumably the project runs pytest-asyncio in auto mode; confirm against the
# pytest configuration.

# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def tmp_db(tmp_path: Path) -> Path:
    """Provide a fresh documents DB for each test."""
    db_path = tmp_path / "test_documents.db"
    # This is a sync fixture, so the async initializer is driven to completion
    # with asyncio.run before the path is handed to the test.
    asyncio.run(init_documents_db(db_path))
    return db_path


def _make_meta(
    doc_id: str = "test-id-1",
    filename: str = "report.docx",
    conversation_id: str = "conv-1",
    format: str = "word",
    created_at: str = "2026-06-23T00:00:00+00:00",
) -> DocumentMeta:
    """Build a DocumentMeta with test defaults.

    Args:
        doc_id: Document id; also used to derive ``stored_name``.
        filename: Filename recorded on the document.
        conversation_id: Conversation the document belongs to.
        format: Document format label. The name shadows the builtin but
            mirrors the ``DocumentMeta`` keyword it is passed to.
        created_at: Creation timestamp string.

    Returns:
        A ``DocumentMeta`` with ``size`` fixed at 1024 and ``stored_name``
        always ``"<doc_id>.docx"``, regardless of ``format``.
    """
    return DocumentMeta(
        id=doc_id,
        filename=filename,
        stored_name=f"{doc_id}.docx",
        format=format,
        size=1024,
        conversation_id=conversation_id,
        created_at=created_at,
    )


# ---------------------------------------------------------------------------
# init_documents_db
# ---------------------------------------------------------------------------


async def test_init_db_idempotent(tmp_path: Path) -> None:
    """init_documents_db called twice should not raise."""
    db_path = tmp_path / "test.db"
    await init_documents_db(db_path)
    await init_documents_db(db_path)  # second call is a no-op
    assert db_path.exists()


async def test_init_db_creates_parent_dir(tmp_path: Path) -> None:
    """init_documents_db creates parent directories if missing."""
    db_path = tmp_path / "nested" / "deep" / "test.db"
    await init_documents_db(db_path)
    assert db_path.exists()


# ---------------------------------------------------------------------------
# insert + query
# ---------------------------------------------------------------------------


async def test_insert_and_get_by_id(tmp_db: Path) -> None:
    """Inserted document is retrievable by id."""
    meta = _make_meta()
    await insert_document(meta, tmp_db)

    result = await get_document_by_id("test-id-1", tmp_db)
    assert result is not None
    assert result.id == "test-id-1"
    assert result.filename == "report.docx"
    assert result.format == "word"
    assert result.size == 1024
    assert result.conversation_id == "conv-1"


async def test_get_by_id_not_found(tmp_db: Path) -> None:
    """Non-existent id returns None."""
    result = await get_document_by_id("does-not-exist", tmp_db)
    assert result is None


async def test_get_conversation_documents(tmp_db: Path) -> None:
    """Multiple documents for a conversation are returned newest-first."""
    meta1 = _make_meta(doc_id="doc-1", created_at="2026-06-23T10:00:00+00:00")
    meta2 = _make_meta(doc_id="doc-2", created_at="2026-06-23T11:00:00+00:00")
    meta3 = _make_meta(
        doc_id="doc-3", conversation_id="conv-2", created_at="2026-06-23T12:00:00+00:00"
    )
    await insert_document(meta1, tmp_db)
    await insert_document(meta2, tmp_db)
    await insert_document(meta3, tmp_db)

    conv1_docs = await get_conversation_documents("conv-1", tmp_db)
    assert len(conv1_docs) == 2
    # Newest first
    assert conv1_docs[0].id == "doc-2"
    assert conv1_docs[1].id == "doc-1"

    conv2_docs = await get_conversation_documents("conv-2", tmp_db)
    assert len(conv2_docs) == 1
    assert conv2_docs[0].id == "doc-3"


async def test_get_conversation_documents_empty(tmp_db: Path) -> None:
    """Non-existent conversation_id returns empty list."""
    result = await get_conversation_documents("no-such-conv", tmp_db)
    assert result == []


# ---------------------------------------------------------------------------
# delete
# ---------------------------------------------------------------------------


async def test_delete_document(tmp_db: Path) -> None:
    """Delete removes the row and returns True; second delete returns False."""
    meta = _make_meta()
    await insert_document(meta, tmp_db)

    deleted = await delete_document("test-id-1", tmp_db)
    assert deleted is True

    # Second delete is a no-op
    deleted_again = await delete_document("test-id-1", tmp_db)
    assert deleted_again is False

    # Row is gone
    result = await get_document_by_id("test-id-1", tmp_db)
    assert result is None


# ---------------------------------------------------------------------------
# _sanitize_filename (path traversal protection)
# ---------------------------------------------------------------------------


def test_sanitize_filename_removes_path_separators() -> None:
    """Path traversal characters are stripped — no '/' or '\\' survives."""
    # The sanitizer replaces path separators with '_' then keeps alnum + . _ -
    # Key security property: no '/' or '\\' remains, so path traversal is blocked.
    result1 = _sanitize_filename("../../etc/passwd")
    assert "/" not in result1
    assert "\\" not in result1
    assert "passwd" in result1

    result2 = _sanitize_filename("..\\..\\windows\\system32")
    assert "/" not in result2
    assert "\\" not in result2
    assert "system32" in result2

    # Normal filenames are preserved
    assert _sanitize_filename("safe-name_v1.0.txt") == "safe-name_v1.0.txt"


def test_sanitize_filename_empty() -> None:
    """Empty input returns empty string; separator-only input is neutralized."""
    assert _sanitize_filename("") == ""
    # Separator-only input becomes underscores — no path traversal possible.
    result = _sanitize_filename("///")
    assert "/" not in result
    assert "\\" not in result


# ---------------------------------------------------------------------------
# DocumentService (metadata + download path, no rendering in U1)
# ---------------------------------------------------------------------------


async def test_service_get_download_path(tmp_path: Path) -> None:
    """get_download_path finds the file on disk by trying known extensions."""
    db_path = tmp_path / "test.db"
    upload_dir = tmp_path / "uploads"
    await init_documents_db(db_path)

    service = DocumentService(upload_dir=upload_dir, db_path=db_path)

    # Create a fake file on disk
    doc_id = "abc123"
    fake_file = upload_dir / f"{doc_id}.docx"
    upload_dir.mkdir(parents=True, exist_ok=True)
    fake_file.write_bytes(b"fake docx content")

    path = service.get_download_path(doc_id)
    assert path is not None
    assert path.name == f"{doc_id}.docx"


async def test_service_get_download_path_not_found(tmp_path: Path) -> None:
    """get_download_path returns None when no file exists."""
    db_path = tmp_path / "test.db"
    upload_dir = tmp_path / "uploads"
    await init_documents_db(db_path)

    service = DocumentService(upload_dir=upload_dir, db_path=db_path)
    path = service.get_download_path("nonexistent-id")
    assert path is None


async def test_service_create_without_renderer_raises(tmp_path: Path) -> None:
    """create_document raises ValueError when no renderer is registered."""
    db_path = tmp_path / "test.db"
    upload_dir = tmp_path / "uploads"
    await init_documents_db(db_path)

    service = DocumentService(upload_dir=upload_dir, db_path=db_path)
    with pytest.raises(ValueError, match="No renderer registered"):
        await service.create_document(
            format="word", content="# Test", conversation_id="conv-1"
        )


async def test_service_create_unsupported_format_raises(tmp_path: Path) -> None:
    """create_document raises ValueError for unsupported format."""
    db_path = tmp_path / "test.db"
    await init_documents_db(db_path)

    service = DocumentService(upload_dir=tmp_path / "uploads", db_path=db_path)
    with pytest.raises(ValueError, match="Unsupported format"):
        await service.create_document(
            format="pptx", content="# Test", conversation_id="conv-1"
        )


async def test_service_get_conversation_documents(tmp_path: Path) -> None:
    """DocumentService.get_conversation_documents delegates to db module."""
    db_path = tmp_path / "test.db"
    await init_documents_db(db_path)

    meta = _make_meta()
    await insert_document(meta, db_path)

    service = DocumentService(upload_dir=tmp_path / "uploads", db_path=db_path)
    docs = await service.get_conversation_documents("conv-1")
    assert len(docs) == 1
    assert docs[0].id == "test-id-1"