"""Security tests for /api/v1/documents routes (R26-R28, path traversal, SSTI). These tests verify: - R27: Authentication (API key required when configured) - Path traversal protection in template field - Deep SSTI protection in template rendering """ from __future__ import annotations import asyncio from pathlib import Path from types import SimpleNamespace import pytest from docx import Document as DocxDocument from fastapi import FastAPI from fastapi.testclient import TestClient from agentkit.documents.db import init_documents_db from agentkit.documents.renderers.excel_renderer import ExcelRenderer from agentkit.documents.renderers.pdf_renderer import PDFRenderer from agentkit.documents.renderers.template_renderer import TemplateRenderer from agentkit.documents.renderers.word_renderer import WordRenderer from agentkit.documents.service import DocumentService from agentkit.server.routes import documents as documents_routes TEST_API_KEY = "test-secret-key-12345" @pytest.fixture def secured_app(tmp_path: Path) -> FastAPI: """App with API key configured — all endpoints require auth.""" db_path = tmp_path / "test.db" upload_dir = tmp_path / "uploads" asyncio.run(init_documents_db(db_path)) service = DocumentService(upload_dir=upload_dir, db_path=db_path) service.register_renderer("word", WordRenderer()) service.register_renderer("excel", ExcelRenderer()) service.register_renderer("pdf", PDFRenderer()) app = FastAPI() app.state.document_service = service # Configure API key — now all endpoints require auth app.state.server_config = SimpleNamespace(api_key=TEST_API_KEY) app.include_router(documents_routes.router, prefix="/api/v1") return app @pytest.fixture def secured_client(secured_app: FastAPI) -> TestClient: return TestClient(secured_app) @pytest.fixture def open_app(tmp_path: Path) -> FastAPI: """App with no API key configured — allows all (backwards compat).""" db_path = tmp_path / "test.db" upload_dir = tmp_path / "uploads" asyncio.run(init_documents_db(db_path)) service = DocumentService(upload_dir=upload_dir, db_path=db_path) service.register_renderer("word", WordRenderer()) service.register_renderer("excel", ExcelRenderer()) service.register_renderer("pdf", PDFRenderer()) app = FastAPI() app.state.document_service = service app.state.server_config = None # No key → allow all app.include_router(documents_routes.router, prefix="/api/v1") return app # --------------------------------------------------------------------------- # R27: Authentication tests # --------------------------------------------------------------------------- class TestAuthentication: """Verify API key authentication on all document endpoints.""" _CREATE_BODY = { "format": "word", "content": "# Test", "conversation_id": "conv-1", } def test_create_without_api_key_returns_401(self, secured_client: TestClient) -> None: """POST /create without API key → 401.""" resp = secured_client.post("/api/v1/documents/create", json=self._CREATE_BODY) assert resp.status_code == 401 assert "API key" in resp.json()["detail"] def test_create_with_wrong_api_key_returns_401(self, secured_client: TestClient) -> None: """POST /create with wrong API key → 401.""" resp = secured_client.post( "/api/v1/documents/create", json=self._CREATE_BODY, headers={"X-API-Key": "wrong-key"}, ) assert resp.status_code == 401 def test_create_with_valid_api_key_header_returns_200( self, secured_client: TestClient ) -> None: """POST /create with valid X-API-Key header → 200.""" resp = secured_client.post( "/api/v1/documents/create", json=self._CREATE_BODY, headers={"X-API-Key": TEST_API_KEY}, ) assert resp.status_code == 200 def test_create_with_valid_api_key_query_param_returns_200( self, secured_client: TestClient ) -> None: """POST /create with valid api_key query param → 200.""" resp = secured_client.post( f"/api/v1/documents/create?api_key={TEST_API_KEY}", json=self._CREATE_BODY, ) assert resp.status_code == 200 def test_download_without_api_key_returns_401(self, secured_client: TestClient) -> None: """GET /download/{id} without API key → 401.""" resp = secured_client.get("/api/v1/documents/download/some-id") assert resp.status_code == 401 def test_list_without_api_key_returns_401(self, secured_client: TestClient) -> None: """GET /conversation/{id} without API key → 401.""" resp = secured_client.get("/api/v1/documents/conversation/conv-1") assert resp.status_code == 401 def test_upload_template_without_api_key_returns_401( self, secured_client: TestClient ) -> None: """POST /upload-template without API key → 401.""" resp = secured_client.post( "/api/v1/documents/upload-template", files={"file": ("test.docx", b"fake", "application/octet-stream")}, ) assert resp.status_code == 401 def test_no_key_configured_allows_all(self, open_app: FastAPI) -> None: """When no API key is configured, all requests are allowed (backwards compat).""" client = TestClient(open_app) resp = client.post("/api/v1/documents/create", json=self._CREATE_BODY) assert resp.status_code == 200 def test_api_key_constant_time_comparison(self, secured_client: TestClient) -> None: """API key comparison uses hmac.compare_digest (timing-safe).""" # ponytail: can't directly test timing, but verify both empty and wrong keys fail resp = secured_client.post( "/api/v1/documents/create", json=self._CREATE_BODY, headers={"X-API-Key": ""}, ) assert resp.status_code == 401 # --------------------------------------------------------------------------- # Path traversal in template field # --------------------------------------------------------------------------- class TestTemplatePathTraversal: """Verify template field doesn't allow path traversal attacks. BUG CONFIRMED: documents.py line 129 does: template_path = str(service.upload_dir / body.template) If body.template is "../../etc/passwd", this resolves outside upload_dir. The Path.exists() check passes if the file exists, allowing arbitrary file read. """ def test_create_with_template_path_traversal( self, secured_client: TestClient, tmp_path: Path ) -> None: """template='../../etc/passwd' should NOT read files outside upload_dir.""" # Create a file outside upload_dir to simulate the target secret_file = tmp_path / "secret.txt" secret_file.write_text("SECRET_CONTENT") # Compute relative path from upload_dir to secret_file rel = Path("..") / "secret.txt" resp = secured_client.post( "/api/v1/documents/create", json={ "format": "word", "content": "# Test", "conversation_id": "conv-1", "template": str(rel), "template_data": {"name": "test"}, }, headers={"X-API-Key": TEST_API_KEY}, ) # Should be 404 (template not found in upload_dir) or 400 # NOT 200 with the secret file content assert resp.status_code in (404, 400), ( f"Path traversal succeeded! Status {resp.status_code}. " f"Response: {resp.text}" ) def test_create_with_template_absolute_path( self, secured_client: TestClient ) -> None: """template='/etc/passwd' (absolute path) → rejected with 400. FIXED: Path.resolve() + relative_to() check now prevents the resolved path from escaping upload_dir. Previously, pathlib's `/` operator let an absolute right operand override the left, allowing traversal. """ resp = secured_client.post( "/api/v1/documents/create", json={ "format": "word", "content": "# Test", "conversation_id": "conv-1", "template": "/etc/passwd", "template_data": {}, }, headers={"X-API-Key": TEST_API_KEY}, ) # After fix: 400 (path traversal detected), not 500 or 200 assert resp.status_code == 400, ( f"Path traversal should be rejected with 400, got {resp.status_code}. " f"Response: {resp.text}" ) assert "traversal" in resp.json()["detail"].lower() def test_create_with_template_null_byte( self, secured_client: TestClient ) -> None: """template with null byte should be rejected (not truncate to bypass).""" resp = secured_client.post( "/api/v1/documents/create", json={ "format": "word", "content": "# Test", "conversation_id": "conv-1", "template": "file.docx\x00../../etc/passwd", "template_data": {}, }, headers={"X-API-Key": TEST_API_KEY}, ) # After fix: 400 (invalid characters detected), not 200 assert resp.status_code == 400, ( f"Null byte should be rejected with 400, got {resp.status_code}" ) # --------------------------------------------------------------------------- # Deep SSTI tests (R26) # --------------------------------------------------------------------------- class TestDeepSSTI: """Verify SandboxedEnvironment blocks advanced SSTI payloads.""" @pytest.fixture def renderer(self) -> TemplateRenderer: return TemplateRenderer() @pytest.fixture def template_file(self, tmp_path: Path) -> Path: """Create a .docx template with a placeholder.""" doc = DocxDocument() doc.add_paragraph("{{payload}}") path = tmp_path / "ssti_template.docx" doc.save(str(path)) return path def _render_and_get_text(self, renderer: TemplateRenderer, template_path: Path, data: dict, output_path: Path) -> str: """Render template and extract text from output.""" renderer.render_template(template_path, data, output_path) doc = DocxDocument(str(output_path)) return "\n".join(p.text for p in doc.paragraphs) def test_ssti_class_subclasses( self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path ) -> None: """{{ ''.__class__.__mro__[1].__subclasses__() }} should be blocked.""" # Recreate template with SSTI payload doc = DocxDocument() doc.add_paragraph("{{ ''.__class__.__mro__[1].__subclasses__() }}") doc.save(str(template_file)) output = tmp_path / "output.docx" text = self._render_and_get_text(renderer, template_file, {}, output) # Should NOT contain subclass list (would expose available classes) assert "subclasses" not in text.lower() or "type" not in text.lower() # Should NOT contain class names like 'wrap_close', 'Popen', etc. assert "Popen" not in text assert "wrap_close" not in text def test_ssti_config_access( self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path ) -> None: """{{ config }} should not leak server configuration.""" doc = DocxDocument() doc.add_paragraph("{{ config }}") doc.save(str(template_file)) output = tmp_path / "output.docx" text = self._render_and_get_text(renderer, template_file, {}, output) # config is undefined in sandbox → renders empty or Undefined assert "api_key" not in text.lower() assert "secret" not in text.lower() def test_ssti_globals_access( self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path ) -> None: """{{ namespace.__init__.__globals__ }} should be blocked.""" doc = DocxDocument() doc.add_paragraph("{{ namespace.__init__.__globals__ }}") doc.save(str(template_file)) output = tmp_path / "output.docx" text = self._render_and_get_text(renderer, template_file, {}, output) # Should not expose globals assert "__builtins__" not in text assert "import" not in text.lower() def test_ssti_import_statement( self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path ) -> None: """{% import os %} should be blocked by sandbox.""" doc = DocxDocument() doc.add_paragraph("{% import os %}{{ os.popen('id').read() }}") doc.save(str(template_file)) output = tmp_path / "output.docx" # Should raise an exception (import not allowed in sandbox) with pytest.raises(Exception): self._render_and_get_text(renderer, template_file, {}, output)