1fedd683e0
Stand: agent-06 (Audit-Log), agent-05 (Krankmeldung), agent-07 Phase 1 (Personalnummer), Busylight-Pull-Integration, TOTP/2FA, Abwesenheiten, Zeiterfassung, Kiosk-Grundgerüst. Migrations 0001–0023 deployed auf 192.168.1.137 + .164. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
222 lines
7.3 KiB
Python
222 lines
7.3 KiB
Python
"""Tests für GET /audit-logs/ – RBAC, Filter, company-Isolation."""
|
||
import pytest
|
||
import pytest_asyncio
|
||
from httpx import AsyncClient
|
||
|
||
|
||
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def audit_company(client: AsyncClient):
    """Register a dedicated company for the audit-log tests.

    Posts a registration for "AuditTest GmbH" and then fetches the profile
    of the freshly registered account with the returned access token.

    Returns:
        A dict with two keys: ``"tokens"`` holds the JSON body of the
        register response, ``"user"`` holds the JSON body of
        ``GET /api/v1/auth/me``.
    """
    resp = await client.post(
        "/api/v1/auth/register",
        json={
            "company_name": "AuditTest GmbH",
            "first_name": "Audit",
            "last_name": "Admin",
            "email": "admin@auditgmbh.de",
            "password": "Secret123",
        },
    )
    assert resp.status_code == 201
    tokens = resp.json()

    me = await client.get(
        "/api/v1/auth/me",
        headers={"Authorization": f"Bearer {tokens['access_token']}"},
    )
    # The fixture is session-scoped: fail right here rather than caching an
    # error payload as "user" for every dependent test.
    assert me.status_code == 200
    return {"tokens": tokens, "user": me.json()}
|
||
|
||
|
||
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def audit_headers(audit_company):
    """Return the Authorization header built from the audit company's access token."""
    access_token = audit_company["tokens"]["access_token"]
    return {"Authorization": f"Bearer {access_token}"}
|
||
|
||
|
||
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def audit_employee(client: AsyncClient, audit_headers):
    """Invite an employee with an initial password and return the response body.

    Per the original note on this fixture, creating the employee is what
    produces audit-log entries for the tests that depend on it.
    """
    invitation = {
        "email": "emp@auditgmbh.de",
        "first_name": "Emp",
        "last_name": "Loyee",
        "role": "EMPLOYEE",
        "initial_password": "Passwort1",
    }
    response = await client.post(
        "/api/v1/users/invite",
        json=invitation,
        headers=audit_headers,
    )
    assert response.status_code == 201
    return response.json()
|
||
|
||
|
||
# ── Basis-Tests ───────────────────────────────────────────────────────────────
|
||
|
||
@pytest.mark.asyncio
async def test_audit_logs_accessible_for_admin(
    client: AsyncClient, audit_headers, audit_employee
):
    """The admin token gets a 200 whose body carries ``total`` and ``items``."""
    response = await client.get("/api/v1/audit-logs", headers=audit_headers)
    assert response.status_code == 200

    payload = response.json()
    for key in ("total", "items"):
        assert key in payload
    assert payload["total"] >= 0
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_audit_logs_contains_user_events(
    client: AsyncClient, audit_headers, audit_employee
):
    """The log listing contains at least one action that looks user-related."""
    response = await client.get("/api/v1/audit-logs?limit=200", headers=audit_headers)
    assert response.status_code == 200

    seen_actions = {entry["action"] for entry in response.json()["items"]}
    keywords = ("user", "invite", "register", "created")
    # At least one user-related action must be present.
    assert any(word in action for action in seen_actions for word in keywords)
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_audit_logs_filter_by_action(client: AsyncClient, audit_headers, audit_employee):
    """With ``action=user`` every returned item's action contains "user" (case-insensitive)."""
    response = await client.get("/api/v1/audit-logs?action=user", headers=audit_headers)
    assert response.status_code == 200

    entries = response.json()["items"]
    assert all("user" in entry["action"].lower() for entry in entries)
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_audit_logs_filter_by_entity_type(
    client: AsyncClient, audit_headers, audit_employee
):
    """With ``entity_type=user`` every returned item has exactly that entity type."""
    response = await client.get("/api/v1/audit-logs?entity_type=user", headers=audit_headers)
    assert response.status_code == 200

    entries = response.json()["items"]
    assert all(entry["entity_type"] == "user" for entry in entries)
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_audit_logs_pagination(client: AsyncClient, audit_headers, audit_employee):
    """Two consecutive one-item pages must not return the same log entry."""
    first_page = await client.get("/api/v1/audit-logs?limit=1&offset=0", headers=audit_headers)
    second_page = await client.get("/api/v1/audit-logs?limit=1&offset=1", headers=audit_headers)
    assert first_page.status_code == 200
    assert second_page.status_code == 200

    first_items = first_page.json()["items"]
    second_items = second_page.json()["items"]
    # The ids are only compared when both pages actually returned an entry.
    if not (first_items and second_items):
        return
    assert first_items[0]["id"] != second_items[0]["id"]
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_audit_logs_forbidden_for_employee(
    client: AsyncClient, audit_company, audit_employee
):
    """A token obtained by the invited employee is rejected with 403 on the audit log."""
    # Log in as the employee created by the audit_employee fixture.
    credentials = {
        "email": "emp@auditgmbh.de",
        "password": "Passwort1",
    }
    login = await client.post("/api/v1/auth/login", json=credentials)
    assert login.status_code == 200
    employee_token = login.json()["access_token"]

    employee_headers = {"Authorization": f"Bearer {employee_token}"}
    listing = await client.get("/api/v1/audit-logs", headers=employee_headers)
    assert listing.status_code == 403
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_audit_company_isolation(client: AsyncClient, audit_headers):
    """Audit logs of another company must not show up in this company's listing.

    Registers a second company, fetches the audit log once per company and
    checks that the two result sets share no log id.
    """
    # Second company
    resp = await client.post(
        "/api/v1/auth/register",
        json={
            "company_name": "Other Audit AG",
            "first_name": "Other",
            "last_name": "Admin",
            "email": "admin@otheraudi.de",
            "password": "Secret123",
        },
    )
    assert resp.status_code == 201
    other_token = resp.json()["access_token"]

    # Each company fetches its own logs
    r1 = await client.get("/api/v1/audit-logs", headers=audit_headers)
    r2 = await client.get(
        "/api/v1/audit-logs",
        headers={"Authorization": f"Bearer {other_token}"},
    )
    # Check the status first so an error response fails with a clear assertion
    # instead of a KeyError on "items" below.
    assert r1.status_code == 200
    assert r2.status_code == 200

    ids1 = {entry["id"] for entry in r1.json()["items"]}
    ids2 = {entry["id"] for entry in r2.json()["items"]}
    assert ids1.isdisjoint(ids2), "Audit-Logs zweier Firmen dürfen sich nicht überschneiden"
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_audit_actions_endpoint(client: AsyncClient, audit_headers, audit_employee):
    """The actions endpoint answers 200 with a JSON list."""
    response = await client.get("/api/v1/audit-logs/actions", headers=audit_headers)
    assert response.status_code == 200

    actions = response.json()
    assert isinstance(actions, list)
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_audit_entity_types_endpoint(client: AsyncClient, audit_headers, audit_employee):
    """The entity-types endpoint answers 200 with a JSON list."""
    response = await client.get("/api/v1/audit-logs/entity-types", headers=audit_headers)
    assert response.status_code == 200

    entity_types = response.json()
    assert isinstance(entity_types, list)
|
||
|
||
|
||
# ── User-Erstellung mit initial_password ─────────────────────────────────────
|
||
|
||
@pytest.mark.asyncio
async def test_invite_with_initial_password_creates_active_user(
    client: AsyncClient, audit_headers
):
    """Inviting a user with ``initial_password`` returns 201 and ``is_active`` True."""
    invitation = {
        "email": "direct@auditgmbh.de",
        "first_name": "Direct",
        "last_name": "User",
        "role": "EMPLOYEE",
        "initial_password": "Direkt123",
    }
    response = await client.post(
        "/api/v1/users/invite",
        json=invitation,
        headers=audit_headers,
    )
    assert response.status_code == 201

    created = response.json()
    assert created["is_active"] is True
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_direct_user_can_login(client: AsyncClient, audit_headers):
    """A user invited with an initial password can log in with it right away.

    Invites a new employee with ``initial_password`` set, then logs in with
    the same credentials and expects a 200 carrying an ``access_token``.
    """
    invite = await client.post(
        "/api/v1/users/invite",
        json={
            "email": "logintest@auditgmbh.de",
            "first_name": "Login",
            "last_name": "Test",
            "role": "EMPLOYEE",
            "initial_password": "Passwort9",
        },
        headers=audit_headers,
    )
    # Without this check a failed invite would only surface as a misleading
    # login failure further down.
    assert invite.status_code == 201

    login = await client.post(
        "/api/v1/auth/login",
        json={
            "email": "logintest@auditgmbh.de",
            "password": "Passwort9",
        },
    )
    assert login.status_code == 200
    assert "access_token" in login.json()
|
||
|
||
|
||
@pytest.mark.asyncio
async def test_invite_without_password_creates_inactive_user(
    client: AsyncClient, audit_headers
):
    """Inviting a user without ``initial_password`` returns 201 and ``is_active`` False."""
    invitation = {
        "email": "pending@auditgmbh.de",
        "first_name": "Pending",
        "last_name": "User",
        "role": "EMPLOYEE",
    }
    response = await client.post(
        "/api/v1/users/invite",
        json=invitation,
        headers=audit_headers,
    )
    assert response.status_code == 201

    created = response.json()
    assert created["is_active"] is False
|