"""Tests for U6: terminal three-layer whitelist + audit logging.

Covers:
- terminal_security.py: pattern matching, builtin whitelist, DB-backed user
  whitelist, blocklist, audit log writing, top-level check_command_safety_v2
- terminal_whitelist.py routes: user whitelist CRUD, blocklist CRUD, audit
  log query
"""

from __future__ import annotations

from collections.abc import AsyncIterator
from pathlib import Path

import aiosqlite
import pytest
from fastapi import FastAPI
from httpx import ASGITransport, AsyncClient

from agentkit.server.auth.models import init_auth_db
from agentkit.server.auth.terminal_security import (
    DECISION_BLOCKED,
    DECISION_CONFIRMED,
    DECISION_DENIED,
    DECISION_EXECUTED,
    _matches_any_pattern,
    _matches_builtin_whitelist,
    _matches_pattern,
    check_command_safety_v2,
    compute_whitelist_entry,
    write_audit_log,
)
from agentkit.server.routes import terminal_whitelist

# Fixed timestamp used for rows seeded directly into the DB; the tests in this
# module never assert on its value.
_SEED_CREATED_AT = "2026-01-01T00:00:00Z"


# ── Direct DB seeding helpers ─────────────────────────────────────────


async def _insert_user_whitelist_row(
    db_path: Path, entry_id: str, user_id: str, pattern: str
) -> None:
    """Insert one 'user'-scoped row into ``terminal_whitelist_user``."""
    async with aiosqlite.connect(str(db_path)) as db:
        await db.execute(
            """INSERT INTO terminal_whitelist_user
               (id, user_id, command_pattern, scope, created_at)
               VALUES (?, ?, ?, 'user', ?)""",
            (entry_id, user_id, pattern, _SEED_CREATED_AT),
        )
        await db.commit()


async def _insert_blocklist_row(
    db_path: Path, entry_id: str, pattern: str, reason: str
) -> None:
    """Insert one row into ``terminal_blocklist``."""
    async with aiosqlite.connect(str(db_path)) as db:
        await db.execute(
            """INSERT INTO terminal_blocklist
               (id, command_pattern, reason, created_at)
               VALUES (?, ?, ?, ?)""",
            (entry_id, pattern, reason, _SEED_CREATED_AT),
        )
        await db.commit()


# ── Pattern matching unit tests ───────────────────────────────────────


class TestPatternMatching:
    """Test the pattern matching helpers."""

    def test_matches_pattern_single_word_matches_binary(self):
        assert _matches_pattern("ls -la", "ls") is True
        assert _matches_pattern("git status", "git") is True

    def test_matches_pattern_single_word_does_not_match_different_binary(self):
        assert _matches_pattern("ls -la", "cat") is False

    def test_matches_pattern_multi_word_prefix(self):
        assert _matches_pattern("git push origin main", "git push") is True
        assert _matches_pattern("npm install --save", "npm install") is True

    def test_matches_pattern_multi_word_not_prefix(self):
        assert _matches_pattern("git status", "git push") is False

    def test_matches_pattern_case_insensitive(self):
        assert _matches_pattern("LS -la", "ls") is True
        assert _matches_pattern("Git Push", "git push") is True

    def test_matches_pattern_empty_pattern_returns_false(self):
        # Both the empty string and a whitespace-only pattern must not match.
        assert _matches_pattern("ls", "") is False
        assert _matches_pattern("ls", " ") is False

    def test_matches_pattern_unparseable_command(self):
        # Unbalanced quote → shlex fails → binary is None → no match
        assert _matches_pattern('echo "unbalanced', "echo") is False

    def test_matches_any_pattern_returns_first_match(self):
        matched, pattern = _matches_any_pattern("git push", ["ls", "git", "cat"])
        assert matched is True
        assert pattern == "git"

    def test_matches_any_pattern_no_match(self):
        matched, pattern = _matches_any_pattern("rm -rf /", ["ls", "git"])
        assert matched is False
        assert pattern is None


class TestBuiltinWhitelist:
    """Test the built-in safe whitelist matching."""

    def test_builtin_whitelist_matches_ls(self):
        assert _matches_builtin_whitelist("ls -la") is True

    def test_builtin_whitelist_matches_cat(self):
        assert _matches_builtin_whitelist("cat /etc/hosts") is True

    def test_builtin_whitelist_matches_git_status(self):
        assert _matches_builtin_whitelist("git status") is True

    def test_builtin_whitelist_matches_docker_ps(self):
        assert _matches_builtin_whitelist("docker ps") is True

    def test_builtin_whitelist_does_not_match_rm(self):
        assert _matches_builtin_whitelist("rm -rf /tmp/test") is False

    def test_builtin_whitelist_does_not_match_unknown_command(self):
        assert _matches_builtin_whitelist("somecmd --flag") is False

    def test_builtin_whitelist_empty_command(self):
        assert _matches_builtin_whitelist("") is False

    def test_builtin_whitelist_case_insensitive(self):
        assert _matches_builtin_whitelist("LS -LA") is True


class TestComputeWhitelistEntry:
    """Test the whitelist entry computation for confirmed commands."""

    def test_compute_entry_for_safe_binary(self):
        # Non-dangerous binary (not in _DANGEROUS_BINARY_FLAGS) → just the
        # binary name
        assert compute_whitelist_entry("ls -la /tmp") == "ls"

    def test_compute_entry_for_dangerous_binary(self):
        # Dangerous binary (in _DANGEROUS_BINARY_FLAGS) → None (never
        # whitelisted, because whitelisting the binary name would bypass
        # danger detection for dangerous flag variants like "rm -rf")
        assert compute_whitelist_entry("rm -rf /tmp/test") is None

    def test_compute_entry_for_binary_with_dangerous_variants(self):
        # Binary in _DANGEROUS_BINARY_FLAGS (e.g. docker) → None even for safe
        # invocations, because whitelisting "docker" would auto-execute
        # "docker rm"
        assert compute_whitelist_entry("docker ps") is None

    def test_compute_entry_for_unparseable_command(self):
        assert compute_whitelist_entry('echo "unbalanced') is None

    def test_compute_entry_for_empty_command(self):
        assert compute_whitelist_entry("") is None


# ── DB-backed tests (using temp SQLite) ───────────────────────────────
#
# NOTE(review): the async fixtures in this module are declared with plain
# ``@pytest.fixture``; that presumably relies on the project's pytest-asyncio
# configuration (e.g. auto mode) — confirm against the pytest settings.


@pytest.fixture
async def temp_db_path(tmp_path: Path) -> Path:
    """Create a temporary auth DB with the terminal tables."""
    db_path = tmp_path / "test_auth.db"
    await init_auth_db(db_path)
    return db_path


class TestCheckCommandSafetyV2:
    """Test the top-level three-layer whitelist check."""

    @pytest.mark.asyncio
    async def test_builtin_whitelist_command_passes(self, temp_db_path: Path):
        """Built-in safe commands execute without confirmation."""
        decision = await check_command_safety_v2(
            "ls -la",
            session_id="test-session",
            session_whitelist=set(),
            user_id="user-1",
            db_path=temp_db_path,
        )
        assert decision.safe is True
        assert decision.decision == DECISION_EXECUTED
        assert decision.matched_layer == "builtin"

    @pytest.mark.asyncio
    async def test_dangerous_command_requires_confirmation(
        self, temp_db_path: Path
    ):
        """Non-whitelisted dangerous commands require confirmation."""
        decision = await check_command_safety_v2(
            "rm -rf /tmp/test",
            session_id="test-session",
            session_whitelist=set(),
            user_id="user-1",
            db_path=temp_db_path,
        )
        assert decision.safe is False
        assert decision.decision == DECISION_DENIED
        assert decision.reason is not None

    @pytest.mark.asyncio
    async def test_session_whitelist_allows_command(self, temp_db_path: Path):
        """Session whitelist allows previously-confirmed commands."""
        decision = await check_command_safety_v2(
            "rm -rf /tmp/test",
            session_id="test-session",
            session_whitelist={"rm"},  # rm is in session whitelist
            user_id="user-1",
            db_path=temp_db_path,
        )
        assert decision.safe is True
        assert decision.matched_layer == "session_whitelist"

    @pytest.mark.asyncio
    async def test_user_whitelist_allows_command(self, temp_db_path: Path):
        """User whitelist (DB-backed) allows commands."""
        # Add a user whitelist entry
        await _insert_user_whitelist_row(temp_db_path, "wl-1", "user-1", "rm")

        decision = await check_command_safety_v2(
            "rm -rf /tmp/test",
            session_id="test-session",
            session_whitelist=set(),
            user_id="user-1",
            db_path=temp_db_path,
        )
        assert decision.safe is True
        assert decision.matched_layer == "user_whitelist"

    @pytest.mark.asyncio
    async def test_blocklist_rejects_command(self, temp_db_path: Path):
        """Blocklist rejects commands regardless of other whitelists."""
        # Add a blocklist entry for "rm"
        await _insert_blocklist_row(
            temp_db_path, "bl-1", "rm", "rm is blocked by admin"
        )

        # Even though "rm" is in session whitelist, blocklist takes priority
        decision = await check_command_safety_v2(
            "rm -rf /tmp/test",
            session_id="test-session",
            session_whitelist={"rm"},  # session whitelist has rm
            user_id="user-1",
            db_path=temp_db_path,
        )
        assert decision.safe is False
        assert decision.decision == DECISION_BLOCKED
        assert decision.matched_layer == "blocklist"
        assert "rm is blocked by admin" in (decision.reason or "")

    @pytest.mark.asyncio
    async def test_blocklist_takes_priority_over_builtin(
        self, temp_db_path: Path
    ):
        """Blocklist rejects even built-in safe commands."""
        await _insert_blocklist_row(temp_db_path, "bl-1", "ls", "ls is blocked")

        decision = await check_command_safety_v2(
            "ls -la",
            session_id="test-session",
            session_whitelist=set(),
            user_id="user-1",
            db_path=temp_db_path,
        )
        assert decision.safe is False
        assert decision.decision == DECISION_BLOCKED

    @pytest.mark.asyncio
    async def test_empty_command_denied(self, temp_db_path: Path):
        """Empty commands are denied."""
        decision = await check_command_safety_v2(
            " ",
            session_id="test-session",
            session_whitelist=set(),
            user_id="user-1",
            db_path=temp_db_path,
        )
        assert decision.safe is False
        assert decision.decision == DECISION_DENIED

    @pytest.mark.asyncio
    async def test_dev_mode_no_db(self):
        """When db_path is None, DB-backed layers are skipped."""
        # A command that's not in builtin whitelist and is dangerous
        decision = await check_command_safety_v2(
            "rm -rf /tmp/test",
            session_id="test-session",
            session_whitelist=set(),
            user_id=None,
            db_path=None,  # No DB
        )
        # Should fall through to danger detection
        assert decision.safe is False
        assert decision.decision == DECISION_DENIED

    @pytest.mark.asyncio
    async def test_non_dangerous_non_whitelisted_command_passes(
        self, temp_db_path: Path
    ):
        """A pipeline whose stages are all safe commands is allowed.

        NOTE(review): per the original author's notes, the danger check
        treats any binary outside its safe-prefix list as dangerous, so a
        command that is both non-whitelisted and non-dangerous is hard to
        construct and the "danger_check_passed" layer is rarely reached. A
        pipeline of two safe commands is used instead — confirm against
        terminal_security.py.
        """
        decision = await check_command_safety_v2(
            "ls | grep test",
            session_id="test-session",
            session_whitelist=set(),
            user_id="user-1",
            db_path=temp_db_path,
        )
        # Only the outcome is pinned here; which layer matched is deliberately
        # left unasserted.
        assert decision.safe is True


class TestWriteAuditLog:
    """Test the audit log writer."""

    @pytest.mark.asyncio
    async def test_write_audit_log_success(self, temp_db_path: Path):
        """Audit log entry is written to the DB."""
        await write_audit_log(
            temp_db_path,
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command="ls -la",
            decision=DECISION_EXECUTED,
            reason="matched_layer=builtin",
            cwd="/tmp",
            exit_code=0,
            terminal_mode="local",
        )

        async with aiosqlite.connect(str(temp_db_path)) as db:
            # Row factory enables access by column name below.
            db.row_factory = aiosqlite.Row
            cursor = await db.execute(
                "SELECT * FROM terminal_audit_logs WHERE session_id = ?",
                ("session-1",),
            )
            row = await cursor.fetchone()

        assert row is not None
        assert row["user_id"] == "user-1"
        assert row["username"] == "alice"
        assert row["command"] == "ls -la"
        assert row["decision"] == DECISION_EXECUTED
        assert row["exit_code"] == 0
        assert row["terminal_mode"] == "local"

    @pytest.mark.asyncio
    async def test_write_audit_log_null_user(self, temp_db_path: Path):
        """Audit log accepts None user_id (dev mode)."""
        await write_audit_log(
            temp_db_path,
            user_id=None,
            username=None,
            session_id="session-dev",
            command="rm -rf /tmp",
            decision=DECISION_CONFIRMED,
            terminal_mode="local",
        )

        async with aiosqlite.connect(str(temp_db_path)) as db:
            cursor = await db.execute(
                "SELECT user_id, username FROM terminal_audit_logs WHERE session_id = ?",
                ("session-dev",),
            )
            row = await cursor.fetchone()

        assert row is not None
        assert row[0] is None
        assert row[1] is None

    @pytest.mark.asyncio
    async def test_write_audit_log_best_effort(self, tmp_path: Path):
        """Audit log failures are swallowed (never raise)."""
        # Non-existent DB path — should not raise. The test passes simply by
        # completing without an exception.
        await write_audit_log(
            tmp_path / "nonexistent" / "auth.db",  # Invalid path
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command="ls",
            decision=DECISION_EXECUTED,
        )

    @pytest.mark.asyncio
    async def test_write_audit_log_truncates_long_command(
        self, temp_db_path: Path
    ):
        """Commands longer than 4096 chars are truncated."""
        long_command = "echo " + "a" * 5000
        await write_audit_log(
            temp_db_path,
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command=long_command,
            decision=DECISION_EXECUTED,
        )

        async with aiosqlite.connect(str(temp_db_path)) as db:
            cursor = await db.execute(
                "SELECT command FROM terminal_audit_logs WHERE session_id = ?",
                ("session-1",),
            )
            row = await cursor.fetchone()

        assert row is not None
        assert len(row[0]) <= 4096


# ── Whitelist management API tests ────────────────────────────────────


@pytest.fixture
async def whitelist_app(tmp_path: Path) -> FastAPI:
    """Build a FastAPI app with the whitelist routes + dev-admin middleware.

    The initialized DB path is stored (as a string) on
    ``app.state.auth_db_path`` so tests can write to the same database the
    routes read from.
    """
    db_path = tmp_path / "test_auth.db"
    await init_auth_db(db_path)

    app = FastAPI()
    app.state.auth_db_path = str(db_path)
    app.include_router(terminal_whitelist.router, prefix="/api/v1")

    # Dev-admin middleware: injects an admin user so permission checks pass
    @app.middleware("http")
    async def _set_dev_admin_user(request, call_next):
        request.state.current_user = {
            "user_id": "dev-admin-id",
            "username": "dev-admin",
            "role": "admin",
            "dev_mode": False,  # Use real DB path
        }
        return await call_next(request)

    return app


@pytest.fixture
async def whitelist_client(whitelist_app: FastAPI) -> AsyncIterator[AsyncClient]:
    """HTTP client for the whitelist app."""
    transport = ASGITransport(app=whitelist_app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        yield client


class TestUserWhitelistAPI:
    """Test the user whitelist CRUD endpoints."""

    @pytest.mark.asyncio
    async def test_list_empty_user_whitelist(self, whitelist_client: AsyncClient):
        resp = await whitelist_client.get("/api/v1/terminal/whitelist/user")
        assert resp.status_code == 200
        data = resp.json()
        assert data["entries"] == []

    @pytest.mark.asyncio
    async def test_add_user_whitelist_entry(self, whitelist_client: AsyncClient):
        resp = await whitelist_client.post(
            "/api/v1/terminal/whitelist/user",
            json={"command_pattern": "docker"},
        )
        assert resp.status_code == 201
        data = resp.json()
        assert data["command_pattern"] == "docker"
        assert data["scope"] == "user"
        assert "id" in data

    @pytest.mark.asyncio
    async def test_add_duplicate_user_whitelist_returns_409(
        self, whitelist_client: AsyncClient
    ):
        # Add once; assert success so the 409 below can only mean "duplicate".
        first = await whitelist_client.post(
            "/api/v1/terminal/whitelist/user",
            json={"command_pattern": "git push"},
        )
        assert first.status_code == 201

        # Add again → 409
        resp = await whitelist_client.post(
            "/api/v1/terminal/whitelist/user",
            json={"command_pattern": "git push"},
        )
        assert resp.status_code == 409

    @pytest.mark.asyncio
    async def test_list_user_whitelist_after_add(
        self, whitelist_client: AsyncClient
    ):
        await whitelist_client.post(
            "/api/v1/terminal/whitelist/user",
            json={"command_pattern": "docker"},
        )
        await whitelist_client.post(
            "/api/v1/terminal/whitelist/user",
            json={"command_pattern": "git push"},
        )

        resp = await whitelist_client.get("/api/v1/terminal/whitelist/user")
        assert resp.status_code == 200
        entries = resp.json()["entries"]
        assert len(entries) == 2
        patterns = {e["command_pattern"] for e in entries}
        assert patterns == {"docker", "git push"}

    @pytest.mark.asyncio
    async def test_delete_user_whitelist_entry(self, whitelist_client: AsyncClient):
        add_resp = await whitelist_client.post(
            "/api/v1/terminal/whitelist/user",
            json={"command_pattern": "docker"},
        )
        entry_id = add_resp.json()["id"]

        del_resp = await whitelist_client.delete(
            f"/api/v1/terminal/whitelist/user/{entry_id}"
        )
        assert del_resp.status_code == 204

        # Verify it's gone
        list_resp = await whitelist_client.get("/api/v1/terminal/whitelist/user")
        assert list_resp.json()["entries"] == []

    @pytest.mark.asyncio
    async def test_delete_nonexistent_user_whitelist_returns_404(
        self, whitelist_client: AsyncClient
    ):
        resp = await whitelist_client.delete(
            "/api/v1/terminal/whitelist/user/nonexistent-id"
        )
        assert resp.status_code == 404

    @pytest.mark.asyncio
    async def test_add_user_whitelist_invalid_pattern_rejected(
        self, whitelist_client: AsyncClient
    ):
        # Pattern with shell separator
        resp = await whitelist_client.post(
            "/api/v1/terminal/whitelist/user",
            json={"command_pattern": "ls; rm -rf /"},
        )
        assert resp.status_code == 422

    @pytest.mark.asyncio
    async def test_add_user_whitelist_empty_pattern_rejected(
        self, whitelist_client: AsyncClient
    ):
        resp = await whitelist_client.post(
            "/api/v1/terminal/whitelist/user",
            json={"command_pattern": ""},
        )
        assert resp.status_code == 422


class TestBlocklistAPI:
    """Test the blocklist CRUD endpoints (admin only)."""

    @pytest.mark.asyncio
    async def test_list_empty_blocklist(self, whitelist_client: AsyncClient):
        resp = await whitelist_client.get("/api/v1/terminal/blocklist")
        assert resp.status_code == 200
        assert resp.json()["entries"] == []

    @pytest.mark.asyncio
    async def test_add_blocklist_entry(self, whitelist_client: AsyncClient):
        resp = await whitelist_client.post(
            "/api/v1/terminal/blocklist",
            json={"command_pattern": "mkfs", "reason": "formatting is dangerous"},
        )
        assert resp.status_code == 201
        data = resp.json()
        assert data["command_pattern"] == "mkfs"
        assert data["reason"] == "formatting is dangerous"

    @pytest.mark.asyncio
    async def test_add_duplicate_blocklist_returns_409(
        self, whitelist_client: AsyncClient
    ):
        # Add once; assert success so the 409 below can only mean "duplicate".
        first = await whitelist_client.post(
            "/api/v1/terminal/blocklist",
            json={"command_pattern": "mkfs"},
        )
        assert first.status_code == 201

        resp = await whitelist_client.post(
            "/api/v1/terminal/blocklist",
            json={"command_pattern": "mkfs"},
        )
        assert resp.status_code == 409

    @pytest.mark.asyncio
    async def test_delete_blocklist_entry(self, whitelist_client: AsyncClient):
        add_resp = await whitelist_client.post(
            "/api/v1/terminal/blocklist",
            json={"command_pattern": "mkfs"},
        )
        entry_id = add_resp.json()["id"]

        del_resp = await whitelist_client.delete(
            f"/api/v1/terminal/blocklist/{entry_id}"
        )
        assert del_resp.status_code == 204

        # Verify deleted
        list_resp = await whitelist_client.get("/api/v1/terminal/blocklist")
        assert list_resp.json()["entries"] == []

    @pytest.mark.asyncio
    async def test_delete_nonexistent_blocklist_returns_404(
        self, whitelist_client: AsyncClient
    ):
        resp = await whitelist_client.delete("/api/v1/terminal/blocklist/nonexistent")
        assert resp.status_code == 404


class TestAuditLogAPI:
    """Test the audit log query endpoint."""

    @pytest.mark.asyncio
    async def test_list_empty_audit_logs(self, whitelist_client: AsyncClient):
        resp = await whitelist_client.get("/api/v1/terminal/audit-logs")
        assert resp.status_code == 200
        data = resp.json()
        assert data["entries"] == []
        assert data["total"] == 0

    @pytest.mark.asyncio
    async def test_list_audit_logs_after_writes(
        self, whitelist_client: AsyncClient, whitelist_app: FastAPI
    ):
        """Seed three entries, then check the unfiltered and filtered lists."""
        db_path = whitelist_app.state.auth_db_path
        await write_audit_log(
            db_path,
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command="ls -la",
            decision=DECISION_EXECUTED,
            exit_code=0,
        )
        await write_audit_log(
            db_path,
            user_id="user-1",
            username="alice",
            session_id="session-1",
            command="rm -rf /tmp",
            decision=DECISION_CONFIRMED,
            exit_code=0,
        )
        await write_audit_log(
            db_path,
            user_id="user-2",
            username="bob",
            session_id="session-2",
            command="mkfs /dev/sda",
            decision=DECISION_BLOCKED,
        )

        # List all
        resp = await whitelist_client.get("/api/v1/terminal/audit-logs")
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 3
        assert len(data["entries"]) == 3

        # Filter by user_id
        resp = await whitelist_client.get(
            "/api/v1/terminal/audit-logs?user_id=user-1"
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 2
        for entry in data["entries"]:
            assert entry["user_id"] == "user-1"

        # Filter by decision
        resp = await whitelist_client.get(
            "/api/v1/terminal/audit-logs?decision=confirmed"
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["entries"][0]["decision"] == "confirmed"

        # Filter by session_id
        resp = await whitelist_client.get(
            "/api/v1/terminal/audit-logs?session_id=session-2"
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 1
        assert data["entries"][0]["session_id"] == "session-2"

    @pytest.mark.asyncio
    async def test_audit_logs_pagination(
        self, whitelist_client: AsyncClient, whitelist_app: FastAPI
    ):
        """Five entries paged with limit=2 yield pages of 2, 2 and 1."""
        db_path = whitelist_app.state.auth_db_path
        for i in range(5):
            await write_audit_log(
                db_path,
                user_id="user-1",
                username="alice",
                session_id=f"session-{i}",
                command=f"ls {i}",
                decision=DECISION_EXECUTED,
            )

        # Page 1: limit=2, offset=0
        resp = await whitelist_client.get(
            "/api/v1/terminal/audit-logs?limit=2&offset=0"
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 5
        assert len(data["entries"]) == 2

        # Page 2: limit=2, offset=2
        resp = await whitelist_client.get(
            "/api/v1/terminal/audit-logs?limit=2&offset=2"
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 5
        assert len(data["entries"]) == 2

        # Page 3: limit=2, offset=4
        resp = await whitelist_client.get(
            "/api/v1/terminal/audit-logs?limit=2&offset=4"
        )
        assert resp.status_code == 200
        data = resp.json()
        assert data["total"] == 5
        assert len(data["entries"]) == 1


# ── Integration: blocklist + safety check ─────────────────────────────


class TestBlocklistIntegration:
    """Integration test: blocklist entry blocks command in safety check."""

    @pytest.mark.asyncio
    async def test_blocklist_blocks_command_in_safety_check(
        self, whitelist_client: AsyncClient, whitelist_app: FastAPI
    ):
        """Add a blocklist entry via API, then verify safety check blocks it."""
        # Add "rm" to blocklist via API
        resp = await whitelist_client.post(
            "/api/v1/terminal/blocklist",
            json={"command_pattern": "rm", "reason": "rm disabled by policy"},
        )
        assert resp.status_code == 201

        # Now check command safety — rm should be blocked
        db_path = whitelist_app.state.auth_db_path
        decision = await check_command_safety_v2(
            "rm -rf /tmp/test",
            session_id="test-session",
            session_whitelist={"rm"},  # Even with session whitelist
            user_id="user-1",
            db_path=db_path,
        )
        assert decision.safe is False
        assert decision.decision == DECISION_BLOCKED
        assert decision.matched_layer == "blocklist"