cead46c1e1
Mitarbeiter scannen einen am Eingang ausgehängten QR-Code mit dem Privat-Handy
(/stamp?t=<token>), melden sich per Personalnummer + PIN an und stempeln ein/aus.
Eigener öffentlicher Endpunkt-Pfad, da der Kiosk-PIN-Login Ed25519-Geräte-
Signaturen verlangt, die ein Privat-Handy nicht hat.
Backend:
- Company.public_stamp_enabled (opt-in, default OFF) + rotierbares
public_stamp_token_hash (SHA-256) + created_at; Migration 0033
- Router /time/public: company/auth/action (slowapi-Limits, AuditLog)
- kiosk_auth_service.login_pin_public() reused PIN-Lockout, keyed auf
(public:company_id, personnel_number)
- public_stamp_session_service: 120s Redis-Kurz-Session
- Admin-Token-Endpunkte in companies.py (GET/rotate/DELETE)
Frontend:
- Public-Route /stamp (PublicStampPage)
- Stempel-PIN-Verwaltung in ProfilePage (reused POST /users/{id}/kiosk-pin)
- QR-Generierung/Druck/Toggle in CompanySettingsPage
Sicherheit: schwächer als Kiosk (keine Geräte-Signatur/Nonce/IP-Whitelist),
bewusster BYOD-Komfort-Tradeoff; Schutz über PIN + Lockout + opt-in.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
308 lines
11 KiB
Python
308 lines
11 KiB
Python
"""
|
||
Kiosk-User-Auth Service.
|
||
|
||
Unterstützte Methoden:
|
||
PIN → User über Personalnummer suchen, bcrypt-PIN prüfen
|
||
NFC → User über kiosk_nfc_uid suchen
|
||
QR → QR-Token ist ein kurzlebiger Redis-Key (5 min, einmalig)
|
||
List → Kein Passwort, User wählt sich aus Liste (für vertrauenswürdige Umgebungen)
|
||
"""
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import logging
|
||
import secrets
|
||
import uuid
|
||
|
||
import bcrypt
|
||
from fastapi import HTTPException
|
||
from sqlalchemy import select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.models.user import User
|
||
from app.services.kiosk_session_service import SESSION_TTL_SECONDS, kiosk_session_service
|
||
|
||
log = logging.getLogger(__name__)
|
||
|
||
QR_TOKEN_PREFIX = "kiosk_qr:"
|
||
QR_TOKEN_TTL = 5 * 60 # 5 Minuten
|
||
|
||
# PIN-Brute-Force-Schutz
|
||
PIN_MAX_ATTEMPTS = 5
|
||
PIN_LOCKOUT_SECONDS = 900 # 15 Minuten
|
||
|
||
|
||
class KioskAuthService:
|
||
|
||
async def _check_pin_lockout(
|
||
self, device_id: uuid.UUID, personnel_number: str, redis
|
||
) -> None:
|
||
"""Prüft ob PIN-Login für diese Kombination gesperrt ist. Wirft 429 wenn ja."""
|
||
lockout_key = f"pin_lockout:{device_id}:{personnel_number}"
|
||
if await redis.exists(lockout_key):
|
||
ttl = await redis.ttl(lockout_key)
|
||
wait_min = ttl // 60 + 1
|
||
raise HTTPException(
|
||
status_code=429,
|
||
detail=f"Zu viele Fehlversuche. Bitte {wait_min} Minute(n) warten.",
|
||
)
|
||
|
||
async def _record_pin_failure(
|
||
self, device_id: uuid.UUID, personnel_number: str, redis
|
||
) -> None:
|
||
"""Zählt einen Fehlversuch und sperrt bei Überschreitung von PIN_MAX_ATTEMPTS."""
|
||
fail_key = f"pin_fails:{device_id}:{personnel_number}"
|
||
lockout_key = f"pin_lockout:{device_id}:{personnel_number}"
|
||
|
||
fails = await redis.incr(fail_key)
|
||
await redis.expire(fail_key, PIN_LOCKOUT_SECONDS)
|
||
|
||
if fails >= PIN_MAX_ATTEMPTS:
|
||
await redis.set(lockout_key, "1", ex=PIN_LOCKOUT_SECONDS)
|
||
await redis.delete(fail_key)
|
||
log.warning(
|
||
"PIN-Lockout ausgelöst: device=%s personnel_number=%s",
|
||
device_id,
|
||
personnel_number,
|
||
)
|
||
|
||
async def _clear_pin_failures(
|
||
self, device_id: uuid.UUID, personnel_number: str, redis
|
||
) -> None:
|
||
"""Löscht Fehlversuche nach erfolgreichem Login."""
|
||
await redis.delete(f"pin_fails:{device_id}:{personnel_number}")
|
||
await redis.delete(f"pin_lockout:{device_id}:{personnel_number}")
|
||
|
||
async def login_pin(
|
||
self,
|
||
personnel_number: str,
|
||
pin: str,
|
||
company_id: uuid.UUID,
|
||
device_id: uuid.UUID,
|
||
db: AsyncSession,
|
||
) -> tuple[User, str]:
|
||
"""Authentifizierung per Personalnummer + PIN. Returns (user, session_token)."""
|
||
import redis.asyncio as aioredis
|
||
from app.core.config import settings
|
||
|
||
# Redis für Brute-Force-Schutz (async)
|
||
redis_client = aioredis.from_url(settings.redis_url, decode_responses=True)
|
||
try:
|
||
# 1. Lockout-Check vor DB-Abfrage (verhindert auch User-Enumeration via Timing)
|
||
await self._check_pin_lockout(device_id, personnel_number, redis_client)
|
||
|
||
user = await db.scalar(
|
||
select(User).where(
|
||
User.company_id == company_id,
|
||
User.personnel_number == personnel_number,
|
||
User.is_active == True,
|
||
)
|
||
)
|
||
if user is None:
|
||
# Fehlversuch zählen auch bei unbekannter Personalnummer
|
||
await self._record_pin_failure(device_id, personnel_number, redis_client)
|
||
raise HTTPException(status_code=401, detail="Personalnummer nicht gefunden.")
|
||
|
||
if not user.kiosk_pin_hash:
|
||
raise HTTPException(
|
||
status_code=401,
|
||
detail="Kein PIN gesetzt. Bitte in den Profileinstellungen einen PIN vergeben.",
|
||
)
|
||
|
||
if not bcrypt.checkpw(pin.encode(), user.kiosk_pin_hash.encode()):
|
||
await self._record_pin_failure(device_id, personnel_number, redis_client)
|
||
raise HTTPException(status_code=401, detail="Falscher PIN.")
|
||
|
||
# Erfolgreicher Login: Fehlversuche zurücksetzen
|
||
await self._clear_pin_failures(device_id, personnel_number, redis_client)
|
||
finally:
|
||
await redis_client.aclose()
|
||
|
||
session_token = await kiosk_session_service.create_session(
|
||
user_id=user.id,
|
||
company_id=company_id,
|
||
device_id=device_id,
|
||
auth_method="pin",
|
||
)
|
||
return user, session_token
|
||
|
||
async def login_pin_public(
|
||
self,
|
||
personnel_number: str,
|
||
pin: str,
|
||
company_id: uuid.UUID,
|
||
db: AsyncSession,
|
||
) -> User:
|
||
"""PIN-Auth für das öffentliche QR-Stempeln (ohne Kiosk-Gerät).
|
||
|
||
Wiederverwendet den Brute-Force-Lockout, aber keyed auf
|
||
(company_id, personnel_number) statt (device_id, ...), da hier kein
|
||
Gerät existiert. Erzeugt KEINE Kiosk-Session – der Aufrufer legt eine
|
||
separate öffentliche Kurz-Session an. Gibt nur den User zurück.
|
||
"""
|
||
import redis.asyncio as aioredis
|
||
from app.core.config import settings
|
||
|
||
# Lockout-Key-Namespace klar vom Kiosk trennen
|
||
lock_id = f"public:{company_id}"
|
||
|
||
redis_client = aioredis.from_url(settings.redis_url, decode_responses=True)
|
||
try:
|
||
await self._check_pin_lockout(lock_id, personnel_number, redis_client)
|
||
|
||
user = await db.scalar(
|
||
select(User).where(
|
||
User.company_id == company_id,
|
||
User.personnel_number == personnel_number,
|
||
User.is_active == True,
|
||
)
|
||
)
|
||
if user is None:
|
||
# Fehlversuch auch bei unbekannter Personalnummer (Anti-Enumeration)
|
||
await self._record_pin_failure(lock_id, personnel_number, redis_client)
|
||
raise HTTPException(status_code=401, detail="Personalnummer oder PIN falsch.")
|
||
|
||
if not user.kiosk_pin_hash:
|
||
raise HTTPException(
|
||
status_code=401,
|
||
detail="Kein PIN gesetzt. Bitte im Mitarbeiter-Portal einen Stempel-PIN vergeben.",
|
||
)
|
||
|
||
if not bcrypt.checkpw(pin.encode(), user.kiosk_pin_hash.encode()):
|
||
await self._record_pin_failure(lock_id, personnel_number, redis_client)
|
||
raise HTTPException(status_code=401, detail="Personalnummer oder PIN falsch.")
|
||
|
||
await self._clear_pin_failures(lock_id, personnel_number, redis_client)
|
||
finally:
|
||
await redis_client.aclose()
|
||
|
||
return user
|
||
|
||
async def login_nfc(
|
||
self,
|
||
nfc_uid: str,
|
||
company_id: uuid.UUID,
|
||
device_id: uuid.UUID,
|
||
db: AsyncSession,
|
||
) -> tuple[User, str]:
|
||
"""Authentifizierung per NFC-UID."""
|
||
user = await db.scalar(
|
||
select(User).where(
|
||
User.company_id == company_id,
|
||
User.kiosk_nfc_uid == nfc_uid,
|
||
User.is_active == True,
|
||
)
|
||
)
|
||
if user is None:
|
||
raise HTTPException(status_code=401, detail="NFC-Karte nicht registriert.")
|
||
|
||
session_token = await kiosk_session_service.create_session(
|
||
user_id=user.id,
|
||
company_id=company_id,
|
||
device_id=device_id,
|
||
auth_method="nfc",
|
||
)
|
||
return user, session_token
|
||
|
||
async def generate_qr_token(self, user_id: uuid.UUID, company_id: uuid.UUID) -> str:
|
||
"""
|
||
Erzeugt einen einmaligen QR-Token (für die Web-App: User scannt QR am Kiosk).
|
||
Token ist 5 min gültig und wird in Redis gespeichert.
|
||
"""
|
||
from app.core.redis import get_redis_client
|
||
redis = get_redis_client()
|
||
if redis is None:
|
||
raise HTTPException(status_code=503, detail="QR-Login nicht verfügbar (Redis).")
|
||
|
||
token = secrets.token_urlsafe(32)
|
||
key = QR_TOKEN_PREFIX + token
|
||
redis.setex(key, QR_TOKEN_TTL, json.dumps({
|
||
"user_id": str(user_id),
|
||
"company_id": str(company_id),
|
||
}))
|
||
return token
|
||
|
||
async def login_qr(
|
||
self,
|
||
qr_token: str,
|
||
company_id: uuid.UUID,
|
||
device_id: uuid.UUID,
|
||
db: AsyncSession,
|
||
) -> tuple[User, str]:
|
||
"""Validiert QR-Token (einmalig) und erstellt Session."""
|
||
from app.core.redis import get_redis_client
|
||
redis = get_redis_client()
|
||
if redis is None:
|
||
raise HTTPException(status_code=503, detail="QR-Login nicht verfügbar (Redis).")
|
||
|
||
key = QR_TOKEN_PREFIX + qr_token
|
||
data = redis.get(key)
|
||
if data is None:
|
||
raise HTTPException(status_code=401, detail="QR-Code abgelaufen oder ungültig.")
|
||
|
||
# Einmalig: Token sofort löschen
|
||
redis.delete(key)
|
||
|
||
payload = json.loads(data)
|
||
if str(payload.get("company_id")) != str(company_id):
|
||
raise HTTPException(status_code=401, detail="QR-Code für falsches Unternehmen.")
|
||
|
||
user_id = uuid.UUID(payload["user_id"])
|
||
user = await db.get(User, user_id)
|
||
if user is None or not user.is_active:
|
||
raise HTTPException(status_code=401, detail="User nicht gefunden oder inaktiv.")
|
||
|
||
session_token = await kiosk_session_service.create_session(
|
||
user_id=user.id,
|
||
company_id=company_id,
|
||
device_id=device_id,
|
||
auth_method="qr",
|
||
)
|
||
return user, session_token
|
||
|
||
async def login_list(
|
||
self,
|
||
user_id: uuid.UUID,
|
||
company_id: uuid.UUID,
|
||
device_id: uuid.UUID,
|
||
db: AsyncSession,
|
||
) -> tuple[User, str]:
|
||
"""Login per Auswahl aus der Mitarbeiterliste (nur für vertrauenswürdige Umgebungen)."""
|
||
user = await db.scalar(
|
||
select(User).where(
|
||
User.id == user_id,
|
||
User.company_id == company_id,
|
||
User.is_active == True,
|
||
)
|
||
)
|
||
if user is None:
|
||
raise HTTPException(status_code=404, detail="Mitarbeiter nicht gefunden.")
|
||
|
||
session_token = await kiosk_session_service.create_session(
|
||
user_id=user.id,
|
||
company_id=company_id,
|
||
device_id=device_id,
|
||
auth_method="list",
|
||
)
|
||
return user, session_token
|
||
|
||
async def get_user_list(
|
||
self, company_id: uuid.UUID, db: AsyncSession
|
||
) -> list[User]:
|
||
"""Alle aktiven Mitarbeiter der Firma für Kiosk-Auswahlliste."""
|
||
result = await db.scalars(
|
||
select(User).where(
|
||
User.company_id == company_id,
|
||
User.is_active == True,
|
||
).order_by(User.last_name, User.first_name)
|
||
)
|
||
return list(result.all())
|
||
|
||
|
||
def _display_name(user: User) -> str:
|
||
"""Kurzname für Kiosk-Anzeige: 'Max M.'"""
|
||
return f"{user.first_name} {user.last_name[:1]}."
|
||
|
||
|
||
kiosk_auth_service = KioskAuthService()
|