""" Kiosk-User-Auth Service. Unterstützte Methoden: PIN → User über Personalnummer suchen, bcrypt-PIN prüfen NFC → User über kiosk_nfc_uid suchen QR → QR-Token ist ein kurzlebiger Redis-Key (5 min, einmalig) List → Kein Passwort, User wählt sich aus Liste (für vertrauenswürdige Umgebungen) """ from __future__ import annotations import json import logging import secrets import uuid import bcrypt from fastapi import HTTPException from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.user import User from app.services.kiosk_session_service import SESSION_TTL_SECONDS, kiosk_session_service log = logging.getLogger(__name__) QR_TOKEN_PREFIX = "kiosk_qr:" QR_TOKEN_TTL = 5 * 60 # 5 Minuten # PIN-Brute-Force-Schutz PIN_MAX_ATTEMPTS = 5 PIN_LOCKOUT_SECONDS = 900 # 15 Minuten class KioskAuthService: async def _check_pin_lockout( self, device_id: uuid.UUID, personnel_number: str, redis ) -> None: """Prüft ob PIN-Login für diese Kombination gesperrt ist. Wirft 429 wenn ja.""" lockout_key = f"pin_lockout:{device_id}:{personnel_number}" if await redis.exists(lockout_key): ttl = await redis.ttl(lockout_key) wait_min = ttl // 60 + 1 raise HTTPException( status_code=429, detail=f"Zu viele Fehlversuche. Bitte {wait_min} Minute(n) warten.", ) async def _record_pin_failure( self, device_id: uuid.UUID, personnel_number: str, redis ) -> None: """Zählt einen Fehlversuch und sperrt bei Überschreitung von PIN_MAX_ATTEMPTS.""" fail_key = f"pin_fails:{device_id}:{personnel_number}" lockout_key = f"pin_lockout:{device_id}:{personnel_number}" fails = await redis.incr(fail_key) await redis.expire(fail_key, PIN_LOCKOUT_SECONDS) if fails >= PIN_MAX_ATTEMPTS: await redis.set(lockout_key, "1", ex=PIN_LOCKOUT_SECONDS) await redis.delete(fail_key) log.warning( "PIN-Lockout ausgelöst: device=%s personnel_number=%s", device_id, personnel_number, ) async def _clear_pin_failures( self, device_id: uuid.UUID, personnel_number: str, redis ) -> None: """Löscht Fehlversuche nach erfolgreichem Login.""" await redis.delete(f"pin_fails:{device_id}:{personnel_number}") await redis.delete(f"pin_lockout:{device_id}:{personnel_number}") async def login_pin( self, personnel_number: str, pin: str, company_id: uuid.UUID, device_id: uuid.UUID, db: AsyncSession, ) -> tuple[User, str]: """Authentifizierung per Personalnummer + PIN. Returns (user, session_token).""" import redis.asyncio as aioredis from app.core.config import settings # Redis für Brute-Force-Schutz (async) redis_client = aioredis.from_url(settings.redis_url, decode_responses=True) try: # 1. Lockout-Check vor DB-Abfrage (verhindert auch User-Enumeration via Timing) await self._check_pin_lockout(device_id, personnel_number, redis_client) user = await db.scalar( select(User).where( User.company_id == company_id, User.personnel_number == personnel_number, User.is_active == True, ) ) if user is None: # Fehlversuch zählen auch bei unbekannter Personalnummer await self._record_pin_failure(device_id, personnel_number, redis_client) raise HTTPException(status_code=401, detail="Personalnummer nicht gefunden.") if not user.kiosk_pin_hash: raise HTTPException( status_code=401, detail="Kein PIN gesetzt. Bitte in den Profileinstellungen einen PIN vergeben.", ) if not bcrypt.checkpw(pin.encode(), user.kiosk_pin_hash.encode()): await self._record_pin_failure(device_id, personnel_number, redis_client) raise HTTPException(status_code=401, detail="Falscher PIN.") # Erfolgreicher Login: Fehlversuche zurücksetzen await self._clear_pin_failures(device_id, personnel_number, redis_client) finally: await redis_client.aclose() session_token = await kiosk_session_service.create_session( user_id=user.id, company_id=company_id, device_id=device_id, auth_method="pin", ) return user, session_token async def login_pin_public( self, personnel_number: str, pin: str, company_id: uuid.UUID, db: AsyncSession, ) -> User: """PIN-Auth für das öffentliche QR-Stempeln (ohne Kiosk-Gerät). Wiederverwendet den Brute-Force-Lockout, aber keyed auf (company_id, personnel_number) statt (device_id, ...), da hier kein Gerät existiert. Erzeugt KEINE Kiosk-Session – der Aufrufer legt eine separate öffentliche Kurz-Session an. Gibt nur den User zurück. """ import redis.asyncio as aioredis from app.core.config import settings # Lockout-Key-Namespace klar vom Kiosk trennen lock_id = f"public:{company_id}" redis_client = aioredis.from_url(settings.redis_url, decode_responses=True) try: await self._check_pin_lockout(lock_id, personnel_number, redis_client) user = await db.scalar( select(User).where( User.company_id == company_id, User.personnel_number == personnel_number, User.is_active == True, ) ) if user is None: # Fehlversuch auch bei unbekannter Personalnummer (Anti-Enumeration) await self._record_pin_failure(lock_id, personnel_number, redis_client) raise HTTPException(status_code=401, detail="Personalnummer oder PIN falsch.") if not user.kiosk_pin_hash: raise HTTPException( status_code=401, detail="Kein PIN gesetzt. Bitte im Mitarbeiter-Portal einen Stempel-PIN vergeben.", ) if not bcrypt.checkpw(pin.encode(), user.kiosk_pin_hash.encode()): await self._record_pin_failure(lock_id, personnel_number, redis_client) raise HTTPException(status_code=401, detail="Personalnummer oder PIN falsch.") await self._clear_pin_failures(lock_id, personnel_number, redis_client) finally: await redis_client.aclose() return user async def login_nfc( self, nfc_uid: str, company_id: uuid.UUID, device_id: uuid.UUID, db: AsyncSession, ) -> tuple[User, str]: """Authentifizierung per NFC-UID.""" user = await db.scalar( select(User).where( User.company_id == company_id, User.kiosk_nfc_uid == nfc_uid, User.is_active == True, ) ) if user is None: raise HTTPException(status_code=401, detail="NFC-Karte nicht registriert.") session_token = await kiosk_session_service.create_session( user_id=user.id, company_id=company_id, device_id=device_id, auth_method="nfc", ) return user, session_token async def generate_qr_token(self, user_id: uuid.UUID, company_id: uuid.UUID) -> str: """ Erzeugt einen einmaligen QR-Token (für die Web-App: User scannt QR am Kiosk). Token ist 5 min gültig und wird in Redis gespeichert. """ from app.core.redis import get_redis_client redis = get_redis_client() if redis is None: raise HTTPException(status_code=503, detail="QR-Login nicht verfügbar (Redis).") token = secrets.token_urlsafe(32) key = QR_TOKEN_PREFIX + token redis.setex(key, QR_TOKEN_TTL, json.dumps({ "user_id": str(user_id), "company_id": str(company_id), })) return token async def login_qr( self, qr_token: str, company_id: uuid.UUID, device_id: uuid.UUID, db: AsyncSession, ) -> tuple[User, str]: """Validiert QR-Token (einmalig) und erstellt Session.""" from app.core.redis import get_redis_client redis = get_redis_client() if redis is None: raise HTTPException(status_code=503, detail="QR-Login nicht verfügbar (Redis).") key = QR_TOKEN_PREFIX + qr_token data = redis.get(key) if data is None: raise HTTPException(status_code=401, detail="QR-Code abgelaufen oder ungültig.") # Einmalig: Token sofort löschen redis.delete(key) payload = json.loads(data) if str(payload.get("company_id")) != str(company_id): raise HTTPException(status_code=401, detail="QR-Code für falsches Unternehmen.") user_id = uuid.UUID(payload["user_id"]) user = await db.get(User, user_id) if user is None or not user.is_active: raise HTTPException(status_code=401, detail="User nicht gefunden oder inaktiv.") session_token = await kiosk_session_service.create_session( user_id=user.id, company_id=company_id, device_id=device_id, auth_method="qr", ) return user, session_token async def login_list( self, user_id: uuid.UUID, company_id: uuid.UUID, device_id: uuid.UUID, db: AsyncSession, ) -> tuple[User, str]: """Login per Auswahl aus der Mitarbeiterliste (nur für vertrauenswürdige Umgebungen).""" user = await db.scalar( select(User).where( User.id == user_id, User.company_id == company_id, User.is_active == True, ) ) if user is None: raise HTTPException(status_code=404, detail="Mitarbeiter nicht gefunden.") session_token = await kiosk_session_service.create_session( user_id=user.id, company_id=company_id, device_id=device_id, auth_method="list", ) return user, session_token async def get_user_list( self, company_id: uuid.UUID, db: AsyncSession ) -> list[User]: """Alle aktiven Mitarbeiter der Firma für Kiosk-Auswahlliste.""" result = await db.scalars( select(User).where( User.company_id == company_id, User.is_active == True, ).order_by(User.last_name, User.first_name) ) return list(result.all()) def _display_name(user: User) -> str: """Kurzname für Kiosk-Anzeige: 'Max M.'""" return f"{user.first_name} {user.last_name[:1]}." kiosk_auth_service = KioskAuthService()