Files
timemaster/backend/app/services/kiosk_auth_service.py
T
2026-05-24 12:49:09 +02:00

196 lines
6.2 KiB
Python

"""
Kiosk-User-Auth Service.
Unterstützte Methoden:
PIN → User über Personalnummer suchen, bcrypt-PIN prüfen
NFC → User über kiosk_nfc_uid suchen
QR → QR-Token ist ein kurzlebiger Redis-Key (5 min, einmalig)
List → Kein Passwort, User wählt sich aus Liste (für vertrauenswürdige Umgebungen)
"""
from __future__ import annotations
import json
import logging
import secrets
import uuid
import bcrypt
from fastapi import HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.user import User
from app.services.kiosk_session_service import SESSION_TTL_SECONDS, kiosk_session_service
log = logging.getLogger(__name__)
QR_TOKEN_PREFIX = "kiosk_qr:"
QR_TOKEN_TTL = 5 * 60 # 5 Minuten
class KioskAuthService:
async def login_pin(
self,
personnel_number: str,
pin: str,
company_id: uuid.UUID,
device_id: uuid.UUID,
db: AsyncSession,
) -> tuple[User, str]:
"""Authentifizierung per Personalnummer + PIN. Returns (user, session_token)."""
user = await db.scalar(
select(User).where(
User.company_id == company_id,
User.personnel_number == personnel_number,
User.is_active == True,
)
)
if user is None:
raise HTTPException(status_code=401, detail="Personalnummer nicht gefunden.")
if not user.kiosk_pin_hash:
raise HTTPException(
status_code=401,
detail="Kein PIN gesetzt. Bitte in den Profileinstellungen einen PIN vergeben.",
)
if not bcrypt.checkpw(pin.encode(), user.kiosk_pin_hash.encode()):
raise HTTPException(status_code=401, detail="Falscher PIN.")
session_token = await kiosk_session_service.create_session(
user_id=user.id,
company_id=company_id,
device_id=device_id,
auth_method="pin",
)
return user, session_token
async def login_nfc(
self,
nfc_uid: str,
company_id: uuid.UUID,
device_id: uuid.UUID,
db: AsyncSession,
) -> tuple[User, str]:
"""Authentifizierung per NFC-UID."""
user = await db.scalar(
select(User).where(
User.company_id == company_id,
User.kiosk_nfc_uid == nfc_uid,
User.is_active == True,
)
)
if user is None:
raise HTTPException(status_code=401, detail="NFC-Karte nicht registriert.")
session_token = await kiosk_session_service.create_session(
user_id=user.id,
company_id=company_id,
device_id=device_id,
auth_method="nfc",
)
return user, session_token
async def generate_qr_token(self, user_id: uuid.UUID, company_id: uuid.UUID) -> str:
"""
Erzeugt einen einmaligen QR-Token (für die Web-App: User scannt QR am Kiosk).
Token ist 5 min gültig und wird in Redis gespeichert.
"""
from app.core.redis import get_redis_client
redis = get_redis_client()
if redis is None:
raise HTTPException(status_code=503, detail="QR-Login nicht verfügbar (Redis).")
token = secrets.token_urlsafe(32)
key = QR_TOKEN_PREFIX + token
redis.setex(key, QR_TOKEN_TTL, json.dumps({
"user_id": str(user_id),
"company_id": str(company_id),
}))
return token
async def login_qr(
self,
qr_token: str,
company_id: uuid.UUID,
device_id: uuid.UUID,
db: AsyncSession,
) -> tuple[User, str]:
"""Validiert QR-Token (einmalig) und erstellt Session."""
from app.core.redis import get_redis_client
redis = get_redis_client()
if redis is None:
raise HTTPException(status_code=503, detail="QR-Login nicht verfügbar (Redis).")
key = QR_TOKEN_PREFIX + qr_token
data = redis.get(key)
if data is None:
raise HTTPException(status_code=401, detail="QR-Code abgelaufen oder ungültig.")
# Einmalig: Token sofort löschen
redis.delete(key)
payload = json.loads(data)
if str(payload.get("company_id")) != str(company_id):
raise HTTPException(status_code=401, detail="QR-Code für falsches Unternehmen.")
user_id = uuid.UUID(payload["user_id"])
user = await db.get(User, user_id)
if user is None or not user.is_active:
raise HTTPException(status_code=401, detail="User nicht gefunden oder inaktiv.")
session_token = await kiosk_session_service.create_session(
user_id=user.id,
company_id=company_id,
device_id=device_id,
auth_method="qr",
)
return user, session_token
async def login_list(
self,
user_id: uuid.UUID,
company_id: uuid.UUID,
device_id: uuid.UUID,
db: AsyncSession,
) -> tuple[User, str]:
"""Login per Auswahl aus der Mitarbeiterliste (nur für vertrauenswürdige Umgebungen)."""
user = await db.scalar(
select(User).where(
User.id == user_id,
User.company_id == company_id,
User.is_active == True,
)
)
if user is None:
raise HTTPException(status_code=404, detail="Mitarbeiter nicht gefunden.")
session_token = await kiosk_session_service.create_session(
user_id=user.id,
company_id=company_id,
device_id=device_id,
auth_method="list",
)
return user, session_token
async def get_user_list(
self, company_id: uuid.UUID, db: AsyncSession
) -> list[User]:
"""Alle aktiven Mitarbeiter der Firma für Kiosk-Auswahlliste."""
result = await db.scalars(
select(User).where(
User.company_id == company_id,
User.is_active == True,
).order_by(User.last_name, User.first_name)
)
return list(result.all())
def _display_name(user: User) -> str:
"""Kurzname für Kiosk-Anzeige: 'Max M.'"""
return f"{user.first_name} {user.last_name[:1]}."
kiosk_auth_service = KioskAuthService()