337 lines
13 KiB
Python
337 lines
13 KiB
Python
"""Security tests for /api/v1/documents routes (R26-R28, path traversal, SSTI).
|
|
|
|
These tests verify:
|
|
- R27: Authentication (API key required when configured)
|
|
- Path traversal protection in template field
|
|
- Deep SSTI protection in template rendering
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
|
|
import pytest
|
|
from docx import Document as DocxDocument
|
|
from fastapi import FastAPI
|
|
from fastapi.testclient import TestClient
|
|
|
|
from agentkit.documents.db import init_documents_db
|
|
from agentkit.documents.renderers.excel_renderer import ExcelRenderer
|
|
from agentkit.documents.renderers.pdf_renderer import PDFRenderer
|
|
from agentkit.documents.renderers.template_renderer import TemplateRenderer
|
|
from agentkit.documents.renderers.word_renderer import WordRenderer
|
|
from agentkit.documents.service import DocumentService
|
|
from agentkit.server.routes import documents as documents_routes
|
|
|
|
TEST_API_KEY = "test-secret-key-12345"
|
|
|
|
|
|
@pytest.fixture
|
|
def secured_app(tmp_path: Path) -> FastAPI:
|
|
"""App with API key configured — all endpoints require auth."""
|
|
db_path = tmp_path / "test.db"
|
|
upload_dir = tmp_path / "uploads"
|
|
asyncio.run(init_documents_db(db_path))
|
|
|
|
service = DocumentService(upload_dir=upload_dir, db_path=db_path)
|
|
service.register_renderer("word", WordRenderer())
|
|
service.register_renderer("excel", ExcelRenderer())
|
|
service.register_renderer("pdf", PDFRenderer())
|
|
|
|
app = FastAPI()
|
|
app.state.document_service = service
|
|
# Configure API key — now all endpoints require auth
|
|
app.state.server_config = SimpleNamespace(api_key=TEST_API_KEY)
|
|
app.include_router(documents_routes.router, prefix="/api/v1")
|
|
return app
|
|
|
|
|
|
@pytest.fixture
|
|
def secured_client(secured_app: FastAPI) -> TestClient:
|
|
return TestClient(secured_app)
|
|
|
|
|
|
@pytest.fixture
|
|
def open_app(tmp_path: Path) -> FastAPI:
|
|
"""App with no API key configured — allows all (backwards compat)."""
|
|
db_path = tmp_path / "test.db"
|
|
upload_dir = tmp_path / "uploads"
|
|
asyncio.run(init_documents_db(db_path))
|
|
|
|
service = DocumentService(upload_dir=upload_dir, db_path=db_path)
|
|
service.register_renderer("word", WordRenderer())
|
|
service.register_renderer("excel", ExcelRenderer())
|
|
service.register_renderer("pdf", PDFRenderer())
|
|
|
|
app = FastAPI()
|
|
app.state.document_service = service
|
|
app.state.server_config = None # No key → allow all
|
|
app.include_router(documents_routes.router, prefix="/api/v1")
|
|
return app
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# R27: Authentication tests
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestAuthentication:
|
|
"""Verify API key authentication on all document endpoints."""
|
|
|
|
_CREATE_BODY = {
|
|
"format": "word",
|
|
"content": "# Test",
|
|
"conversation_id": "conv-1",
|
|
}
|
|
|
|
def test_create_without_api_key_returns_401(self, secured_client: TestClient) -> None:
|
|
"""POST /create without API key → 401."""
|
|
resp = secured_client.post("/api/v1/documents/create", json=self._CREATE_BODY)
|
|
assert resp.status_code == 401
|
|
assert "API key" in resp.json()["detail"]
|
|
|
|
def test_create_with_wrong_api_key_returns_401(self, secured_client: TestClient) -> None:
|
|
"""POST /create with wrong API key → 401."""
|
|
resp = secured_client.post(
|
|
"/api/v1/documents/create",
|
|
json=self._CREATE_BODY,
|
|
headers={"X-API-Key": "wrong-key"},
|
|
)
|
|
assert resp.status_code == 401
|
|
|
|
def test_create_with_valid_api_key_header_returns_200(
|
|
self, secured_client: TestClient
|
|
) -> None:
|
|
"""POST /create with valid X-API-Key header → 200."""
|
|
resp = secured_client.post(
|
|
"/api/v1/documents/create",
|
|
json=self._CREATE_BODY,
|
|
headers={"X-API-Key": TEST_API_KEY},
|
|
)
|
|
assert resp.status_code == 200
|
|
|
|
def test_create_with_valid_api_key_query_param_returns_200(
|
|
self, secured_client: TestClient
|
|
) -> None:
|
|
"""POST /create with valid api_key query param → 200."""
|
|
resp = secured_client.post(
|
|
f"/api/v1/documents/create?api_key={TEST_API_KEY}",
|
|
json=self._CREATE_BODY,
|
|
)
|
|
assert resp.status_code == 200
|
|
|
|
def test_download_without_api_key_returns_401(self, secured_client: TestClient) -> None:
|
|
"""GET /download/{id} without API key → 401."""
|
|
resp = secured_client.get("/api/v1/documents/download/some-id")
|
|
assert resp.status_code == 401
|
|
|
|
def test_list_without_api_key_returns_401(self, secured_client: TestClient) -> None:
|
|
"""GET /conversation/{id} without API key → 401."""
|
|
resp = secured_client.get("/api/v1/documents/conversation/conv-1")
|
|
assert resp.status_code == 401
|
|
|
|
def test_upload_template_without_api_key_returns_401(
|
|
self, secured_client: TestClient
|
|
) -> None:
|
|
"""POST /upload-template without API key → 401."""
|
|
resp = secured_client.post(
|
|
"/api/v1/documents/upload-template",
|
|
files={"file": ("test.docx", b"fake", "application/octet-stream")},
|
|
)
|
|
assert resp.status_code == 401
|
|
|
|
def test_no_key_configured_allows_all(self, open_app: FastAPI) -> None:
|
|
"""When no API key is configured, all requests are allowed (backwards compat)."""
|
|
client = TestClient(open_app)
|
|
resp = client.post("/api/v1/documents/create", json=self._CREATE_BODY)
|
|
assert resp.status_code == 200
|
|
|
|
def test_api_key_constant_time_comparison(self, secured_client: TestClient) -> None:
|
|
"""API key comparison uses hmac.compare_digest (timing-safe)."""
|
|
# ponytail: can't directly test timing, but verify both empty and wrong keys fail
|
|
resp = secured_client.post(
|
|
"/api/v1/documents/create",
|
|
json=self._CREATE_BODY,
|
|
headers={"X-API-Key": ""},
|
|
)
|
|
assert resp.status_code == 401
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Path traversal in template field
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestTemplatePathTraversal:
|
|
"""Verify template field doesn't allow path traversal attacks.
|
|
|
|
BUG CONFIRMED: documents.py line 129 does:
|
|
template_path = str(service.upload_dir / body.template)
|
|
If body.template is "../../etc/passwd", this resolves outside upload_dir.
|
|
The Path.exists() check passes if the file exists, allowing arbitrary file read.
|
|
"""
|
|
|
|
def test_create_with_template_path_traversal(
|
|
self, secured_client: TestClient, tmp_path: Path
|
|
) -> None:
|
|
"""template='../../etc/passwd' should NOT read files outside upload_dir."""
|
|
# Create a file outside upload_dir to simulate the target
|
|
secret_file = tmp_path / "secret.txt"
|
|
secret_file.write_text("SECRET_CONTENT")
|
|
|
|
# Compute relative path from upload_dir to secret_file
|
|
rel = Path("..") / "secret.txt"
|
|
|
|
resp = secured_client.post(
|
|
"/api/v1/documents/create",
|
|
json={
|
|
"format": "word",
|
|
"content": "# Test",
|
|
"conversation_id": "conv-1",
|
|
"template": str(rel),
|
|
"template_data": {"name": "test"},
|
|
},
|
|
headers={"X-API-Key": TEST_API_KEY},
|
|
)
|
|
# Should be 404 (template not found in upload_dir) or 400
|
|
# NOT 200 with the secret file content
|
|
assert resp.status_code in (404, 400), (
|
|
f"Path traversal succeeded! Status {resp.status_code}. "
|
|
f"Response: {resp.text}"
|
|
)
|
|
|
|
def test_create_with_template_absolute_path(
|
|
self, secured_client: TestClient
|
|
) -> None:
|
|
"""template='/etc/passwd' (absolute path) → rejected with 400.
|
|
|
|
FIXED: Path.resolve() + relative_to() check now prevents the resolved
|
|
path from escaping upload_dir. Previously, pathlib's `/` operator let
|
|
an absolute right operand override the left, allowing traversal.
|
|
"""
|
|
resp = secured_client.post(
|
|
"/api/v1/documents/create",
|
|
json={
|
|
"format": "word",
|
|
"content": "# Test",
|
|
"conversation_id": "conv-1",
|
|
"template": "/etc/passwd",
|
|
"template_data": {},
|
|
},
|
|
headers={"X-API-Key": TEST_API_KEY},
|
|
)
|
|
# After fix: 400 (path traversal detected), not 500 or 200
|
|
assert resp.status_code == 400, (
|
|
f"Path traversal should be rejected with 400, got {resp.status_code}. "
|
|
f"Response: {resp.text}"
|
|
)
|
|
assert "traversal" in resp.json()["detail"].lower()
|
|
|
|
def test_create_with_template_null_byte(
|
|
self, secured_client: TestClient
|
|
) -> None:
|
|
"""template with null byte should be rejected (not truncate to bypass)."""
|
|
resp = secured_client.post(
|
|
"/api/v1/documents/create",
|
|
json={
|
|
"format": "word",
|
|
"content": "# Test",
|
|
"conversation_id": "conv-1",
|
|
"template": "file.docx\x00../../etc/passwd",
|
|
"template_data": {},
|
|
},
|
|
headers={"X-API-Key": TEST_API_KEY},
|
|
)
|
|
# After fix: 400 (invalid characters detected), not 200
|
|
assert resp.status_code == 400, (
|
|
f"Null byte should be rejected with 400, got {resp.status_code}"
|
|
)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Deep SSTI tests (R26)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestDeepSSTI:
|
|
"""Verify SandboxedEnvironment blocks advanced SSTI payloads."""
|
|
|
|
@pytest.fixture
|
|
def renderer(self) -> TemplateRenderer:
|
|
return TemplateRenderer()
|
|
|
|
@pytest.fixture
|
|
def template_file(self, tmp_path: Path) -> Path:
|
|
"""Create a .docx template with a placeholder."""
|
|
doc = DocxDocument()
|
|
doc.add_paragraph("{{payload}}")
|
|
path = tmp_path / "ssti_template.docx"
|
|
doc.save(str(path))
|
|
return path
|
|
|
|
def _render_and_get_text(self, renderer: TemplateRenderer, template_path: Path, data: dict, output_path: Path) -> str:
|
|
"""Render template and extract text from output."""
|
|
renderer.render_template(template_path, data, output_path)
|
|
doc = DocxDocument(str(output_path))
|
|
return "\n".join(p.text for p in doc.paragraphs)
|
|
|
|
def test_ssti_class_subclasses(
|
|
self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path
|
|
) -> None:
|
|
"""{{ ''.__class__.__mro__[1].__subclasses__() }} should be blocked."""
|
|
# Recreate template with SSTI payload
|
|
doc = DocxDocument()
|
|
doc.add_paragraph("{{ ''.__class__.__mro__[1].__subclasses__() }}")
|
|
doc.save(str(template_file))
|
|
|
|
output = tmp_path / "output.docx"
|
|
text = self._render_and_get_text(renderer, template_file, {}, output)
|
|
# Should NOT contain subclass list (would expose available classes)
|
|
assert "subclasses" not in text.lower() or "type" not in text.lower()
|
|
# Should NOT contain class names like 'wrap_close', 'Popen', etc.
|
|
assert "Popen" not in text
|
|
assert "wrap_close" not in text
|
|
|
|
def test_ssti_config_access(
|
|
self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path
|
|
) -> None:
|
|
"""{{ config }} should not leak server configuration."""
|
|
doc = DocxDocument()
|
|
doc.add_paragraph("{{ config }}")
|
|
doc.save(str(template_file))
|
|
|
|
output = tmp_path / "output.docx"
|
|
text = self._render_and_get_text(renderer, template_file, {}, output)
|
|
# config is undefined in sandbox → renders empty or Undefined
|
|
assert "api_key" not in text.lower()
|
|
assert "secret" not in text.lower()
|
|
|
|
def test_ssti_globals_access(
|
|
self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path
|
|
) -> None:
|
|
"""{{ namespace.__init__.__globals__ }} should be blocked."""
|
|
doc = DocxDocument()
|
|
doc.add_paragraph("{{ namespace.__init__.__globals__ }}")
|
|
doc.save(str(template_file))
|
|
|
|
output = tmp_path / "output.docx"
|
|
text = self._render_and_get_text(renderer, template_file, {}, output)
|
|
# Should not expose globals
|
|
assert "__builtins__" not in text
|
|
assert "import" not in text.lower()
|
|
|
|
def test_ssti_import_statement(
|
|
self, renderer: TemplateRenderer, template_file: Path, tmp_path: Path
|
|
) -> None:
|
|
"""{% import os %} should be blocked by sandbox."""
|
|
doc = DocxDocument()
|
|
doc.add_paragraph("{% import os %}{{ os.popen('id').read() }}")
|
|
doc.save(str(template_file))
|
|
|
|
output = tmp_path / "output.docx"
|
|
# Should raise an exception (import not allowed in sandbox)
|
|
with pytest.raises(Exception):
|
|
self._render_and_get_text(renderer, template_file, {}, output)
|