"""
Ed25519 kiosk security tests for TimeMaster.

Covers:
- Valid signed heartbeat
- Invalid signature → 401
- Replay attack (same nonce) → 401
- Timestamp drift > 30s → 401
- Pending device → 401
- Device lifecycle (create, approve, revoke)
- Missing kiosk headers → 422
- Device list for admins
"""

import base64
import hashlib
import time
import uuid
from typing import Optional

import pytest
import pytest_asyncio
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from httpx import AsyncClient, Response

pytestmark = pytest.mark.asyncio

DEVICES_URL = "/api/v1/kiosk/devices"
HEARTBEAT_URL = "/api/v1/kiosk/heartbeat"

# Raw JSON payload sent with every heartbeat. It is kept as bytes because the
# signature covers the SHA-256 of the exact bytes that go over the wire.
HEARTBEAT_BODY = b'{"queued_offline_entries": 0}'


# ── Helpers ───────────────────────────────────────────────────────────────────


def make_kiosk_headers(
    device_id: str,
    private_key: Ed25519PrivateKey,
    method: str,
    path: str,
    body: bytes = b"",
    *,
    timestamp: Optional[int] = None,
    nonce: Optional[str] = None,
) -> dict:
    """Build the kiosk signature headers for a request.

    The signed message is the space-separated string
    ``"<method> <path> <timestamp> <nonce> <sha256-hex-of-body>"``.

    Args:
        device_id: Value sent as ``X-Kiosk-Key-Id``.
        private_key: Key used to sign the message.
        method: HTTP method that is part of the signed message.
        path: Request path that is part of the signed message.
        body: Raw request body; its SHA-256 hex digest is signed.
        timestamp: Unix timestamp in seconds to sign and send. Defaults to
            the current time. Tests pass a stale or future value to exercise
            drift handling.
        nonce: Nonce to sign and send. Defaults to a fresh UUID4 string.

    Returns:
        A dict with the ``X-Kiosk-Key-Id``, ``X-Kiosk-Timestamp``,
        ``X-Kiosk-Nonce`` and ``X-Kiosk-Signature`` headers.
    """
    timestamp_str = str(int(time.time()) if timestamp is None else timestamp)
    nonce_str = str(uuid.uuid4()) if nonce is None else nonce
    body_hash = hashlib.sha256(body).hexdigest()
    message = f"{method} {path} {timestamp_str} {nonce_str} {body_hash}".encode()
    signature = private_key.sign(message)
    return {
        "X-Kiosk-Key-Id": device_id,
        "X-Kiosk-Timestamp": timestamp_str,
        "X-Kiosk-Nonce": nonce_str,
        "X-Kiosk-Signature": base64.b64encode(signature).decode(),
    }


async def _post_signed_heartbeat(
    client: AsyncClient,
    device_id: str,
    private_key: Ed25519PrivateKey,
    *,
    timestamp: Optional[int] = None,
    nonce: Optional[str] = None,
) -> Response:
    """POST ``HEARTBEAT_BODY`` to the heartbeat endpoint with signed headers.

    Args:
        client: HTTP client used to send the request.
        device_id: Value sent as ``X-Kiosk-Key-Id``.
        private_key: Key used to sign the request.
        timestamp: Optional Unix timestamp override, forwarded to
            ``make_kiosk_headers``.
        nonce: Optional nonce override, forwarded to ``make_kiosk_headers``.

    Returns:
        The response of the heartbeat request.
    """
    signed = make_kiosk_headers(
        device_id,
        private_key,
        "POST",
        HEARTBEAT_URL,
        HEARTBEAT_BODY,
        timestamp=timestamp,
        nonce=nonce,
    )
    return await client.post(
        HEARTBEAT_URL,
        content=HEARTBEAT_BODY,
        headers={**signed, "Content-Type": "application/json"},
    )


# ── Fixtures ──────────────────────────────────────────────────────────────────


@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def kiosk_keypair():
    """Return an Ed25519 key pair as ``(private_key, public_key_pem)``."""
    private_key = Ed25519PrivateKey.generate()
    public_key_pem = (
        private_key.public_key()
        .public_bytes(Encoding.PEM, PublicFormat.SubjectPublicKeyInfo)
        .decode()
    )
    return private_key, public_key_pem


@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def kiosk_admin_headers(registered_user):
    """Return the admin Authorization header built from ``registered_user``."""
    return {"Authorization": f"Bearer {registered_user['tokens']['access_token']}"}


@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def approved_kiosk_device(client: AsyncClient, kiosk_admin_headers, kiosk_keypair):
    """Register a kiosk device and approve it right away."""
    _, public_key_pem = kiosk_keypair

    resp = await client.post(
        DEVICES_URL,
        json={
            "name": "Test-Kiosk",
            "location": "Testlabor",
            "public_key": public_key_pem,
        },
        headers=kiosk_admin_headers,
    )
    assert resp.status_code == 201, resp.text
    device = resp.json()

    approve_resp = await client.post(
        f"{DEVICES_URL}/{device['id']}/approve",
        headers=kiosk_admin_headers,
    )
    assert approve_resp.status_code == 200, approve_resp.text
    return device


# ── Tests ─────────────────────────────────────────────────────────────────────


async def test_heartbeat_valid(client: AsyncClient, approved_kiosk_device, kiosk_keypair):
    """Valid signed heartbeat → 200 with server_time, heartbeat_interval_sec, device_status."""
    private_key, _ = kiosk_keypair

    resp = await _post_signed_heartbeat(client, approved_kiosk_device["id"], private_key)

    assert resp.status_code == 200, resp.text
    data = resp.json()
    assert "server_time" in data
    assert "heartbeat_interval_sec" in data
    assert data["device_status"] == "approved"


async def test_heartbeat_invalid_signature(client: AsyncClient, approved_kiosk_device, kiosk_keypair):
    """Heartbeat signed with the wrong key → 401."""
    wrong_key = Ed25519PrivateKey.generate()

    resp = await _post_signed_heartbeat(client, approved_kiosk_device["id"], wrong_key)

    assert resp.status_code == 401, resp.text


async def test_heartbeat_replay(client: AsyncClient, approved_kiosk_device, kiosk_keypair):
    """Sending the same nonce twice → second request 401 (replay protection)."""
    private_key, _ = kiosk_keypair
    device_id = approved_kiosk_device["id"]

    # Build the headers once so both requests carry the identical nonce.
    headers = make_kiosk_headers(device_id, private_key, "POST", HEARTBEAT_URL, HEARTBEAT_BODY)
    merged = {**headers, "Content-Type": "application/json"}

    # The first request must go through.
    r1 = await client.post(HEARTBEAT_URL, content=HEARTBEAT_BODY, headers=merged)
    assert r1.status_code == 200, r1.text

    # The second request with identical headers (same nonce) must be rejected.
    r2 = await client.post(HEARTBEAT_URL, content=HEARTBEAT_BODY, headers=merged)
    assert r2.status_code == 401, r2.text


async def test_heartbeat_timestamp_drift(client: AsyncClient, approved_kiosk_device, kiosk_keypair):
    """Timestamp more than 30 seconds in the past → 401."""
    private_key, _ = kiosk_keypair

    resp = await _post_signed_heartbeat(
        client,
        approved_kiosk_device["id"],
        private_key,
        timestamp=int(time.time()) - 60,
    )

    assert resp.status_code == 401, resp.text


async def test_heartbeat_pending_device(client: AsyncClient, kiosk_admin_headers, kiosk_keypair):
    """Heartbeat from a device that has not been approved yet → 401."""
    private_key, public_key_pem = kiosk_keypair

    # Create a new device but do NOT approve it.
    resp = await client.post(
        DEVICES_URL,
        json={"name": "Pending-Kiosk", "public_key": public_key_pem},
        headers=kiosk_admin_headers,
    )
    assert resp.status_code == 201, resp.text
    device_id = resp.json()["id"]

    r = await _post_signed_heartbeat(client, device_id, private_key)
    assert r.status_code == 401, r.text


async def test_heartbeat_revoked_device(
    client: AsyncClient, kiosk_admin_headers, kiosk_keypair
):
    """Heartbeat from a revoked device → 401."""
    private_key, public_key_pem = kiosk_keypair

    # Create the device, approve it, then revoke it.
    resp = await client.post(
        DEVICES_URL,
        json={"name": "Revoked-Kiosk", "public_key": public_key_pem},
        headers=kiosk_admin_headers,
    )
    assert resp.status_code == 201, resp.text
    device_id = resp.json()["id"]

    # Verify the approval succeeded; otherwise the test would silently
    # exercise the pending → revoked path instead of approved → revoked.
    approve_resp = await client.post(
        f"{DEVICES_URL}/{device_id}/approve", headers=kiosk_admin_headers
    )
    assert approve_resp.status_code == 200, approve_resp.text

    revoke_resp = await client.post(
        f"{DEVICES_URL}/{device_id}/revoke", headers=kiosk_admin_headers
    )
    assert revoke_resp.status_code == 200, revoke_resp.text
    assert revoke_resp.json()["status"] == "revoked"

    r = await _post_signed_heartbeat(client, device_id, private_key)
    assert r.status_code == 401, r.text


async def test_kiosk_device_lifecycle(client: AsyncClient, kiosk_admin_headers, kiosk_keypair):
    """Create device (pending) → approve (approved) → revoke (revoked)."""
    _, public_key_pem = kiosk_keypair

    # Create
    r = await client.post(
        DEVICES_URL,
        json={"name": "Lifecycle-Test", "public_key": public_key_pem},
        headers=kiosk_admin_headers,
    )
    assert r.status_code == 201, r.text
    dev = r.json()
    dev_id = dev["id"]
    assert dev["status"] == "pending"

    # Approve
    r = await client.post(f"{DEVICES_URL}/{dev_id}/approve", headers=kiosk_admin_headers)
    assert r.status_code == 200, r.text
    assert r.json()["status"] == "approved"

    # Revoke
    r = await client.post(f"{DEVICES_URL}/{dev_id}/revoke", headers=kiosk_admin_headers)
    assert r.status_code == 200, r.text
    assert r.json()["status"] == "revoked"


async def test_heartbeat_missing_headers(client: AsyncClient):
    """Heartbeat without kiosk headers → 422 (FastAPI validation error)."""
    resp = await client.post(
        HEARTBEAT_URL,
        json={"queued_offline_entries": 0},
    )
    assert resp.status_code == 422, resp.text


async def test_kiosk_devices_list(
    client: AsyncClient, kiosk_admin_headers, approved_kiosk_device
):
    """Admin can fetch the device list and approved_kiosk_device is included."""
    resp = await client.get(DEVICES_URL, headers=kiosk_admin_headers)
    assert resp.status_code == 200, resp.text

    devices = resp.json()
    assert isinstance(devices, list)
    assert len(devices) > 0

    ids = [d["id"] for d in devices]
    assert approved_kiosk_device["id"] in ids

    # heartbeat_status must be present in every entry, not just the first one.
    missing = [d["id"] for d in devices if "heartbeat_status" not in d]
    assert not missing, f"heartbeat_status missing for devices: {missing}"


async def test_kiosk_devices_list_no_auth(client: AsyncClient):
    """Device list without an auth header → 401 or 403."""
    resp = await client.get(DEVICES_URL)
    assert resp.status_code in (401, 403), resp.text


async def test_kiosk_device_create_invalid_key(client: AsyncClient, kiosk_admin_headers):
    """
    Invalid public key on device creation.

    The backend validates the public key only on the first signed request
    (lazy validation in verify_kiosk_request), not at creation time. The
    device is therefore created with status 'pending' (201). A heartbeat
    from this device would then yield 401.
    """
    resp = await client.post(
        DEVICES_URL,
        json={"name": "Bad-Key-Kiosk", "public_key": "kein-valid-key"},
        headers=kiosk_admin_headers,
    )
    # The backend accepts the key at creation time (lazy validation) → 201.
    assert resp.status_code == 201, resp.text
    assert resp.json()["status"] == "pending"


async def test_heartbeat_unknown_device(client: AsyncClient, kiosk_keypair):
    """Heartbeat with an unknown device ID → 401."""
    private_key, _ = kiosk_keypair
    fake_device_id = str(uuid.uuid4())

    resp = await _post_signed_heartbeat(client, fake_device_id, private_key)

    assert resp.status_code == 401, resp.text


async def test_heartbeat_invalid_key_id_format(client: AsyncClient, kiosk_keypair):
    """X-Kiosk-Key-Id is not a valid UUID → 401."""
    private_key, _ = kiosk_keypair

    resp = await _post_signed_heartbeat(client, "nicht-eine-uuid", private_key)

    assert resp.status_code == 401, resp.text


async def test_heartbeat_future_timestamp(client: AsyncClient, approved_kiosk_device, kiosk_keypair):
    """Timestamp far in the future (>30s) → 401."""
    private_key, _ = kiosk_keypair

    resp = await _post_signed_heartbeat(
        client,
        approved_kiosk_device["id"],
        private_key,
        timestamp=int(time.time()) + 120,
    )

    assert resp.status_code == 401, resp.text


async def test_heartbeat_updates_heartbeat_status(
    client: AsyncClient, approved_kiosk_device, kiosk_keypair, kiosk_admin_headers
):
    """After a successful heartbeat the device has heartbeat_status 'online'."""
    private_key, _ = kiosk_keypair
    device_id = approved_kiosk_device["id"]

    hb_resp = await _post_signed_heartbeat(client, device_id, private_key)
    assert hb_resp.status_code == 200, hb_resp.text

    # Fetch the device and check its heartbeat_status.
    dev_resp = await client.get(
        f"{DEVICES_URL}/{device_id}", headers=kiosk_admin_headers
    )
    assert dev_resp.status_code == 200, dev_resp.text
    assert dev_resp.json()["heartbeat_status"] == "online"