"""Tests für GET /audit-logs/ – RBAC, Filter, company-Isolation.""" import pytest import pytest_asyncio from httpx import AsyncClient @pytest_asyncio.fixture(scope="session", loop_scope="session") async def audit_company(client: AsyncClient): resp = await client.post("/api/v1/auth/register", json={ "company_name": "AuditTest GmbH", "first_name": "Audit", "last_name": "Admin", "email": "admin@auditgmbh.de", "password": "Secret123", }) assert resp.status_code == 201 tokens = resp.json() me = await client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {tokens['access_token']}"}, ) return {"tokens": tokens, "user": me.json()} @pytest_asyncio.fixture(scope="session", loop_scope="session") async def audit_headers(audit_company): return {"Authorization": f"Bearer {audit_company['tokens']['access_token']}"} @pytest_asyncio.fixture(scope="session", loop_scope="session") async def audit_employee(client: AsyncClient, audit_headers): """Legt einen Mitarbeiter an → erzeugt AuditLog-Einträge.""" resp = await client.post( "/api/v1/users/invite", json={ "email": "emp@auditgmbh.de", "first_name": "Emp", "last_name": "Loyee", "role": "EMPLOYEE", "initial_password": "Passwort1", }, headers=audit_headers, ) assert resp.status_code == 201 return resp.json() # ── Basis-Tests ─────────────────────────────────────────────────────────────── @pytest.mark.asyncio async def test_audit_logs_accessible_for_admin( client: AsyncClient, audit_headers, audit_employee ): resp = await client.get("/api/v1/audit-logs", headers=audit_headers) assert resp.status_code == 200 body = resp.json() assert "total" in body assert "items" in body assert body["total"] >= 0 @pytest.mark.asyncio async def test_audit_logs_contains_user_events( client: AsyncClient, audit_headers, audit_employee ): resp = await client.get("/api/v1/audit-logs?limit=200", headers=audit_headers) assert resp.status_code == 200 items = resp.json()["items"] actions = {i["action"] for i in items} # Mindestens eine user-bezogene Aktion vorhanden assert any("user" in a or "invite" in a or "register" in a or "created" in a for a in actions) @pytest.mark.asyncio async def test_audit_logs_filter_by_action(client: AsyncClient, audit_headers, audit_employee): resp = await client.get("/api/v1/audit-logs?action=user", headers=audit_headers) assert resp.status_code == 200 items = resp.json()["items"] for item in items: assert "user" in item["action"].lower() @pytest.mark.asyncio async def test_audit_logs_filter_by_entity_type( client: AsyncClient, audit_headers, audit_employee ): resp = await client.get("/api/v1/audit-logs?entity_type=user", headers=audit_headers) assert resp.status_code == 200 for item in resp.json()["items"]: assert item["entity_type"] == "user" @pytest.mark.asyncio async def test_audit_logs_pagination(client: AsyncClient, audit_headers, audit_employee): r1 = await client.get("/api/v1/audit-logs?limit=1&offset=0", headers=audit_headers) r2 = await client.get("/api/v1/audit-logs?limit=1&offset=1", headers=audit_headers) assert r1.status_code == 200 assert r2.status_code == 200 items1 = r1.json()["items"] items2 = r2.json()["items"] if items1 and items2: assert items1[0]["id"] != items2[0]["id"] @pytest.mark.asyncio async def test_audit_logs_forbidden_for_employee( client: AsyncClient, audit_company, audit_employee ): # Employee einloggen resp = await client.post("/api/v1/auth/login", json={ "email": "emp@auditgmbh.de", "password": "Passwort1", }) assert resp.status_code == 200 emp_token = resp.json()["access_token"] resp = await client.get( "/api/v1/audit-logs", headers={"Authorization": f"Bearer {emp_token}"}, ) assert resp.status_code == 403 @pytest.mark.asyncio async def test_audit_company_isolation(client: AsyncClient, audit_headers): """Logs einer anderen Firma dürfen nicht auftauchen.""" # Zweite Firma resp = await client.post("/api/v1/auth/register", json={ "company_name": "Other Audit AG", "first_name": "Other", "last_name": "Admin", "email": "admin@otheraudi.de", "password": "Secret123", }) assert resp.status_code == 201 other_token = resp.json()["access_token"] # Beide holen ihre eigenen Logs r1 = await client.get("/api/v1/audit-logs", headers=audit_headers) r2 = await client.get( "/api/v1/audit-logs", headers={"Authorization": f"Bearer {other_token}"}, ) ids1 = {i["id"] for i in r1.json()["items"]} ids2 = {i["id"] for i in r2.json()["items"]} assert ids1.isdisjoint(ids2), "Audit-Logs zweier Firmen dürfen sich nicht überschneiden" @pytest.mark.asyncio async def test_audit_actions_endpoint(client: AsyncClient, audit_headers, audit_employee): resp = await client.get("/api/v1/audit-logs/actions", headers=audit_headers) assert resp.status_code == 200 assert isinstance(resp.json(), list) @pytest.mark.asyncio async def test_audit_entity_types_endpoint(client: AsyncClient, audit_headers, audit_employee): resp = await client.get("/api/v1/audit-logs/entity-types", headers=audit_headers) assert resp.status_code == 200 assert isinstance(resp.json(), list) # ── User-Erstellung mit initial_password ───────────────────────────────────── @pytest.mark.asyncio async def test_invite_with_initial_password_creates_active_user( client: AsyncClient, audit_headers ): resp = await client.post( "/api/v1/users/invite", json={ "email": "direct@auditgmbh.de", "first_name": "Direct", "last_name": "User", "role": "EMPLOYEE", "initial_password": "Direkt123", }, headers=audit_headers, ) assert resp.status_code == 201 user = resp.json() assert user["is_active"] is True @pytest.mark.asyncio async def test_direct_user_can_login(client: AsyncClient, audit_headers): await client.post( "/api/v1/users/invite", json={ "email": "logintest@auditgmbh.de", "first_name": "Login", "last_name": "Test", "role": "EMPLOYEE", "initial_password": "Passwort9", }, headers=audit_headers, ) login = await client.post("/api/v1/auth/login", json={ "email": "logintest@auditgmbh.de", "password": "Passwort9", }) assert login.status_code == 200 assert "access_token" in login.json() @pytest.mark.asyncio async def test_invite_without_password_creates_inactive_user( client: AsyncClient, audit_headers ): resp = await client.post( "/api/v1/users/invite", json={ "email": "pending@auditgmbh.de", "first_name": "Pending", "last_name": "User", "role": "EMPLOYEE", }, headers=audit_headers, ) assert resp.status_code == 201 assert resp.json()["is_active"] is False