Files
timemaster/backend/tests/test_kiosk_security.py
patrick 35fcea90f4 feat(kiosk): Stufe 3 – ServiceWorker, WebCrypto Setup-Flow, Kiosk-UI, 15 Security-Tests
3A – Frontend Kiosk-Modus:
- public/kiosk-sw.js (NEU, 187 Zeilen): ServiceWorker signiert alle /api/v1/kiosk/
  Requests automatisch mit Ed25519. Keypair-Generierung intern (non-extractable),
  Speicherung in IndexedDB. BroadcastChannel-Leader-Election für Heartbeat.
- KioskSetupPage.tsx (NEU, 307 Zeilen): Enrollment-Flow unter /kiosk/setup.
  Keypair-Generierung via WebCrypto im ServiceWorker, Public Key als PEM anzeigen.
  Browser-Kompatibilitäts-Check (Ed25519 ab Chrome 113+).
- KioskStampPage.tsx (NEU, 348 Zeilen): Kiosk-UI unter /kiosk.
  Live-Uhr mit Server-Zeit-Offset, Heartbeat-Loop 30s, Online/Offline-Indikator.
- App.tsx: /kiosk und /kiosk/setup Routen außerhalb ProtectedRoute

3B – Tests:
- tests/test_kiosk_security.py (NEU, 387 Zeilen): 15/15 Tests grün
  Abgedeckt: gültige Signatur, falscher Key, Replay-Schutz, Timestamp-Drift,
  Future-Timestamp, pending/revoked Device, unbekanntes Gerät, fehlende Header,
  Lifecycle-Tests, heartbeat_status nach Heartbeat

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 12:23:03 +02:00

388 lines
14 KiB
Python

"""
Ed25519-Kiosk-Sicherheitstests für TimeMaster.
Testet:
- Gültiger signierter Heartbeat
- Ungültige Signatur → 401
- Replay-Angriff (gleiche Nonce) → 401
- Timestamp-Drift > 30s → 401
- Pending-Gerät → 401
- Gerät-Lifecycle (anlegen, genehmigen, sperren)
- Fehlende Kiosk-Header → 422
- Geräteliste für Admin
"""
import base64
import hashlib
import time
import uuid
import pytest
import pytest_asyncio
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat
from httpx import AsyncClient
# Apply the pytest-asyncio marker to every test in this module.
pytestmark = pytest.mark.asyncio
# Endpoint paths used by the tests below (requested through the `client` fixture).
DEVICES_URL = "/api/v1/kiosk/devices"
HEARTBEAT_URL = "/api/v1/kiosk/heartbeat"
# ── Helper function ───────────────────────────────────────────────────────────
def make_kiosk_headers(
    device_id: str,
    private_key: Ed25519PrivateKey,
    method: str,
    path: str,
    body: bytes = b"",
    *,
    timestamp_offset: int = 0,
) -> dict:
    """Build the signed kiosk headers for one request.

    The signed message is ``"{method} {path} {timestamp} {nonce} {body_hash}"``
    where ``timestamp`` is the current Unix time in whole seconds shifted by
    ``timestamp_offset``, ``nonce`` is a fresh random UUID4 string and
    ``body_hash`` is the hex SHA-256 digest of ``body``.

    Args:
        device_id: Value sent as ``X-Kiosk-Key-Id``.
        private_key: Key whose ``sign`` method signs the message.
        method: HTTP method as it appears in the signed message.
        path: Request path as it appears in the signed message.
        body: Raw request body whose SHA-256 digest is signed.
        timestamp_offset: Seconds added to the current time before signing.
            Negative values yield a stale timestamp, positive values a future
            one. The default of 0 keeps the previous behaviour.

    Returns:
        A dict with ``X-Kiosk-Key-Id``, ``X-Kiosk-Timestamp``,
        ``X-Kiosk-Nonce`` and ``X-Kiosk-Signature`` (base64 of the signature).
    """
    timestamp = str(int(time.time()) + timestamp_offset)
    nonce = str(uuid.uuid4())
    body_hash = hashlib.sha256(body).hexdigest()
    message = f"{method} {path} {timestamp} {nonce} {body_hash}".encode()
    signature = private_key.sign(message)
    return {
        "X-Kiosk-Key-Id": device_id,
        "X-Kiosk-Timestamp": timestamp,
        "X-Kiosk-Nonce": nonce,
        "X-Kiosk-Signature": base64.b64encode(signature).decode(),
    }
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def kiosk_keypair():
    """Provide one Ed25519 key pair as ``(private_key, public_key_pem)``.

    The public key is PEM-encoded in SubjectPublicKeyInfo format and returned
    as text.
    """
    signing_key = Ed25519PrivateKey.generate()
    pem_bytes = signing_key.public_key().public_bytes(
        Encoding.PEM, PublicFormat.SubjectPublicKeyInfo
    )
    return signing_key, pem_bytes.decode()
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def kiosk_admin_headers(registered_user):
    """Return a bearer ``Authorization`` header built from ``registered_user``.

    NOTE(review): the tests treat this user as an admin; that role comes from
    the ``registered_user`` fixture defined elsewhere - confirm there.
    """
    access_token = registered_user["tokens"]["access_token"]
    return {"Authorization": f"Bearer {access_token}"}
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def approved_kiosk_device(client: AsyncClient, kiosk_admin_headers, kiosk_keypair):
    """Register a kiosk device with the test public key and approve it at once.

    Returns the JSON body of the *creation* response, i.e. the device as it
    looked before the approval call; callers in this module only use its
    ``id``.
    """
    public_key_pem = kiosk_keypair[1]
    payload = {
        "name": "Test-Kiosk",
        "location": "Testlabor",
        "public_key": public_key_pem,
    }
    create_resp = await client.post(DEVICES_URL, json=payload, headers=kiosk_admin_headers)
    assert create_resp.status_code == 201, create_resp.text
    device = create_resp.json()

    approve_url = f"{DEVICES_URL}/{device['id']}/approve"
    approve_resp = await client.post(approve_url, headers=kiosk_admin_headers)
    assert approve_resp.status_code == 200, approve_resp.text
    return device
# ── Tests ─────────────────────────────────────────────────────────────────────
async def test_heartbeat_valid(client: AsyncClient, approved_kiosk_device, kiosk_keypair):
    """A correctly signed heartbeat yields 200 with the expected response fields."""
    signing_key = kiosk_keypair[0]
    payload = b'{"queued_offline_entries": 0}'
    request_headers = make_kiosk_headers(
        approved_kiosk_device["id"], signing_key, "POST", HEARTBEAT_URL, payload
    )
    request_headers["Content-Type"] = "application/json"

    resp = await client.post(HEARTBEAT_URL, content=payload, headers=request_headers)

    assert resp.status_code == 200, resp.text
    data = resp.json()
    for field in ("server_time", "heartbeat_interval_sec"):
        assert field in data
    assert data["device_status"] == "approved"
async def test_heartbeat_invalid_signature(client: AsyncClient, approved_kiosk_device, kiosk_keypair):
    """A heartbeat signed with a key other than the registered one yields 401."""
    payload = b'{"queued_offline_entries": 0}'
    # Freshly generated key that does not belong to the registered device.
    foreign_key = Ed25519PrivateKey.generate()
    request_headers = make_kiosk_headers(
        approved_kiosk_device["id"], foreign_key, "POST", HEARTBEAT_URL, payload
    )
    request_headers["Content-Type"] = "application/json"

    resp = await client.post(HEARTBEAT_URL, content=payload, headers=request_headers)

    assert resp.status_code == 401, resp.text
async def test_heartbeat_replay(client: AsyncClient, approved_kiosk_device, kiosk_keypair):
    """Sending the same nonce twice: first request 200, second 401 (replay protection)."""
    signing_key = kiosk_keypair[0]
    payload = b'{"queued_offline_entries": 0}'
    request_headers = make_kiosk_headers(
        approved_kiosk_device["id"], signing_key, "POST", HEARTBEAT_URL, payload
    )
    request_headers["Content-Type"] = "application/json"

    # The identical header set (same nonce and signature) is sent twice:
    # the first attempt must be accepted, the replay must be rejected.
    first = await client.post(HEARTBEAT_URL, content=payload, headers=request_headers)
    assert first.status_code == 200, first.text

    second = await client.post(HEARTBEAT_URL, content=payload, headers=request_headers)
    assert second.status_code == 401, second.text
async def test_heartbeat_timestamp_drift(client: AsyncClient, approved_kiosk_device, kiosk_keypair):
    """A validly signed heartbeat whose timestamp is 60 s old yields 401.

    The test intent is that anything older than 30 seconds is rejected.
    """
    signing_key = kiosk_keypair[0]
    payload = b'{"queued_offline_entries": 0}'

    # Sign by hand so the stale timestamp is part of the signed message.
    stale_ts = str(int(time.time()) - 60)
    nonce = str(uuid.uuid4())
    digest = hashlib.sha256(payload).hexdigest()
    signed = signing_key.sign(f"POST {HEARTBEAT_URL} {stale_ts} {nonce} {digest}".encode())

    request_headers = {
        "X-Kiosk-Key-Id": approved_kiosk_device["id"],
        "X-Kiosk-Timestamp": stale_ts,
        "X-Kiosk-Nonce": nonce,
        "X-Kiosk-Signature": base64.b64encode(signed).decode(),
        "Content-Type": "application/json",
    }
    resp = await client.post(HEARTBEAT_URL, content=payload, headers=request_headers)
    assert resp.status_code == 401, resp.text
async def test_heartbeat_pending_device(client: AsyncClient, kiosk_admin_headers, kiosk_keypair):
    """A heartbeat from a device that was never approved yields 401."""
    signing_key, pem = kiosk_keypair

    # Create the device but deliberately skip the approve step.
    create_resp = await client.post(
        DEVICES_URL,
        json={"name": "Pending-Kiosk", "public_key": pem},
        headers=kiosk_admin_headers,
    )
    assert create_resp.status_code == 201, create_resp.text
    pending_id = create_resp.json()["id"]

    payload = b'{"queued_offline_entries": 0}'
    request_headers = make_kiosk_headers(pending_id, signing_key, "POST", HEARTBEAT_URL, payload)
    request_headers["Content-Type"] = "application/json"

    hb_resp = await client.post(HEARTBEAT_URL, content=payload, headers=request_headers)
    assert hb_resp.status_code == 401, hb_resp.text
async def test_heartbeat_revoked_device(
    client: AsyncClient, kiosk_admin_headers, kiosk_keypair
):
    """A heartbeat from a revoked device yields 401.

    The device is created, approved and then revoked. Every step is asserted
    (including the approval, which was previously unchecked) so the test
    really exercises the approved -> revoked transition.
    """
    private_key, public_key_pem = kiosk_keypair

    # Create the device.
    resp = await client.post(
        DEVICES_URL,
        json={"name": "Revoked-Kiosk", "public_key": public_key_pem},
        headers=kiosk_admin_headers,
    )
    assert resp.status_code == 201, resp.text
    device_id = resp.json()["id"]

    # Approve it; fail loudly here instead of continuing with a pending device.
    approve_resp = await client.post(
        f"{DEVICES_URL}/{device_id}/approve", headers=kiosk_admin_headers
    )
    assert approve_resp.status_code == 200, approve_resp.text

    # Revoke it.
    revoke_resp = await client.post(
        f"{DEVICES_URL}/{device_id}/revoke", headers=kiosk_admin_headers
    )
    assert revoke_resp.status_code == 200, revoke_resp.text
    assert revoke_resp.json()["status"] == "revoked"

    # A correctly signed heartbeat must now be rejected.
    body = b'{"queued_offline_entries": 0}'
    headers = make_kiosk_headers(device_id, private_key, "POST", HEARTBEAT_URL, body)
    r = await client.post(
        HEARTBEAT_URL,
        content=body,
        headers={**headers, "Content-Type": "application/json"},
    )
    assert r.status_code == 401, r.text
async def test_kiosk_device_lifecycle(client: AsyncClient, kiosk_admin_headers, kiosk_keypair):
    """Walk a device through pending -> approved -> revoked and check each status."""
    pem = kiosk_keypair[1]

    # Creation: a new device starts out as "pending".
    created = await client.post(
        DEVICES_URL,
        json={"name": "Lifecycle-Test", "public_key": pem},
        headers=kiosk_admin_headers,
    )
    assert created.status_code == 201, created.text
    device = created.json()
    dev_id = device["id"]
    assert device["status"] == "pending"

    # Approve, then revoke; each action must return 200 and the new status.
    for action, expected_status in (("approve", "approved"), ("revoke", "revoked")):
        step = await client.post(f"{DEVICES_URL}/{dev_id}/{action}", headers=kiosk_admin_headers)
        assert step.status_code == 200, step.text
        assert step.json()["status"] == expected_status
async def test_heartbeat_missing_headers(client: AsyncClient):
    """A heartbeat without any kiosk headers yields 422 (request validation error)."""
    payload = {"queued_offline_entries": 0}
    unsigned = await client.post(HEARTBEAT_URL, json=payload)
    assert unsigned.status_code == 422, unsigned.text
async def test_kiosk_devices_list(
    client: AsyncClient, kiosk_admin_headers, approved_kiosk_device
):
    """The device list is available with the admin headers and is well-formed.

    Checks that the response is a non-empty list, that it contains
    ``approved_kiosk_device`` and that *every* entry carries a
    ``heartbeat_status`` field (previously only the first entry was checked).
    """
    resp = await client.get(DEVICES_URL, headers=kiosk_admin_headers)
    assert resp.status_code == 200, resp.text
    devices = resp.json()
    assert isinstance(devices, list)
    assert len(devices) > 0

    ids = [d["id"] for d in devices]
    assert approved_kiosk_device["id"] in ids

    # heartbeat_status must be present in each entry, not just the first one.
    missing = [d.get("id") for d in devices if "heartbeat_status" not in d]
    assert not missing, f"devices without heartbeat_status: {missing}"
async def test_kiosk_devices_list_no_auth(client: AsyncClient):
    """Requesting the device list without an auth header yields 401 or 403."""
    anonymous = await client.get(DEVICES_URL)
    rejected_codes = (401, 403)
    assert anonymous.status_code in rejected_codes, anonymous.text
async def test_kiosk_device_create_invalid_key(client: AsyncClient, kiosk_admin_headers):
    """Creating a device with a malformed public key is accepted with 201.

    Per the original author's note, the backend validates the public key
    lazily (on the first signed request, in ``verify_kiosk_request``) rather
    than at creation time, so the device is stored as ``pending``; a later
    heartbeat for it would be expected to fail with 401. That follow-up is
    not exercised here.
    """
    bad_device = {"name": "Bad-Key-Kiosk", "public_key": "kein-valid-key"}
    created = await client.post(DEVICES_URL, json=bad_device, headers=kiosk_admin_headers)

    # Creation succeeds despite the unusable key.
    assert created.status_code == 201, created.text
    assert created.json()["status"] == "pending"
async def test_heartbeat_unknown_device(client: AsyncClient, kiosk_keypair):
    """A heartbeat whose key id is a random, unregistered UUID yields 401."""
    signing_key = kiosk_keypair[0]
    payload = b'{"queued_offline_entries": 0}'
    # Well-formed UUID that was never registered as a device.
    unregistered_id = str(uuid.uuid4())
    request_headers = make_kiosk_headers(
        unregistered_id, signing_key, "POST", HEARTBEAT_URL, payload
    )
    request_headers["Content-Type"] = "application/json"

    resp = await client.post(HEARTBEAT_URL, content=payload, headers=request_headers)
    assert resp.status_code == 401, resp.text
async def test_heartbeat_invalid_key_id_format(client: AsyncClient, kiosk_keypair):
    """A heartbeat whose ``X-Kiosk-Key-Id`` is not a valid UUID yields 401.

    Apart from the malformed key id the request is signed normally, so the
    shared ``make_kiosk_headers`` helper is used instead of duplicating the
    signing steps inline.
    """
    private_key, _ = kiosk_keypair
    body = b'{"queued_offline_entries": 0}'
    headers = make_kiosk_headers("nicht-eine-uuid", private_key, "POST", HEARTBEAT_URL, body)
    resp = await client.post(
        HEARTBEAT_URL,
        content=body,
        headers={**headers, "Content-Type": "application/json"},
    )
    assert resp.status_code == 401, resp.text
async def test_heartbeat_future_timestamp(client: AsyncClient, approved_kiosk_device, kiosk_keypair):
    """A validly signed heartbeat stamped 120 s in the future yields 401.

    The test intent is that anything more than 30 seconds ahead is rejected.
    """
    signing_key = kiosk_keypair[0]
    payload = b'{"queued_offline_entries": 0}'

    # Sign by hand so the future timestamp is part of the signed message.
    ahead_ts = str(int(time.time()) + 120)
    nonce = str(uuid.uuid4())
    digest = hashlib.sha256(payload).hexdigest()
    signed = signing_key.sign(f"POST {HEARTBEAT_URL} {ahead_ts} {nonce} {digest}".encode())

    request_headers = {
        "X-Kiosk-Key-Id": approved_kiosk_device["id"],
        "X-Kiosk-Timestamp": ahead_ts,
        "X-Kiosk-Nonce": nonce,
        "X-Kiosk-Signature": base64.b64encode(signed).decode(),
        "Content-Type": "application/json",
    }
    resp = await client.post(HEARTBEAT_URL, content=payload, headers=request_headers)
    assert resp.status_code == 401, resp.text
async def test_heartbeat_updates_heartbeat_status(
    client: AsyncClient, approved_kiosk_device, kiosk_keypair, kiosk_admin_headers
):
    """After a successful heartbeat the device reports heartbeat_status 'online'."""
    signing_key = kiosk_keypair[0]
    target_id = approved_kiosk_device["id"]
    payload = b'{"queued_offline_entries": 0}'

    # Step 1: send a valid heartbeat.
    request_headers = make_kiosk_headers(target_id, signing_key, "POST", HEARTBEAT_URL, payload)
    request_headers["Content-Type"] = "application/json"
    heartbeat = await client.post(HEARTBEAT_URL, content=payload, headers=request_headers)
    assert heartbeat.status_code == 200, heartbeat.text

    # Step 2: read the device back via the admin endpoint and check its status.
    detail = await client.get(f"{DEVICES_URL}/{target_id}", headers=kiosk_admin_headers)
    assert detail.status_code == 200, detail.text
    assert detail.json()["heartbeat_status"] == "online"