feat: agent-02-kiosk Phase 2A - Auth endpoints (PIN/NFC/QR/List)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,195 @@
|
||||
"""
|
||||
Kiosk-User-Auth Service.
|
||||
|
||||
Unterstützte Methoden:
|
||||
PIN → User über Personalnummer suchen, bcrypt-PIN prüfen
|
||||
NFC → User über kiosk_nfc_uid suchen
|
||||
QR → QR-Token ist ein kurzlebiger Redis-Key (5 min, einmalig)
|
||||
List → Kein Passwort, User wählt sich aus Liste (für vertrauenswürdige Umgebungen)
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import secrets
|
||||
import uuid
|
||||
|
||||
import bcrypt
|
||||
from fastapi import HTTPException
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
|
||||
from app.models.user import User
|
||||
from app.services.kiosk_session_service import SESSION_TTL_SECONDS, kiosk_session_service
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
QR_TOKEN_PREFIX = "kiosk_qr:"
|
||||
QR_TOKEN_TTL = 5 * 60 # 5 Minuten
|
||||
|
||||
|
||||
class KioskAuthService:
|
||||
|
||||
async def login_pin(
|
||||
self,
|
||||
personnel_number: str,
|
||||
pin: str,
|
||||
company_id: uuid.UUID,
|
||||
device_id: uuid.UUID,
|
||||
db: AsyncSession,
|
||||
) -> tuple[User, str]:
|
||||
"""Authentifizierung per Personalnummer + PIN. Returns (user, session_token)."""
|
||||
user = await db.scalar(
|
||||
select(User).where(
|
||||
User.company_id == company_id,
|
||||
User.personnel_number == personnel_number,
|
||||
User.is_active == True,
|
||||
)
|
||||
)
|
||||
if user is None:
|
||||
raise HTTPException(status_code=401, detail="Personalnummer nicht gefunden.")
|
||||
|
||||
if not user.kiosk_pin_hash:
|
||||
raise HTTPException(
|
||||
status_code=401,
|
||||
detail="Kein PIN gesetzt. Bitte in den Profileinstellungen einen PIN vergeben.",
|
||||
)
|
||||
|
||||
if not bcrypt.checkpw(pin.encode(), user.kiosk_pin_hash.encode()):
|
||||
raise HTTPException(status_code=401, detail="Falscher PIN.")
|
||||
|
||||
session_token = await kiosk_session_service.create_session(
|
||||
user_id=user.id,
|
||||
company_id=company_id,
|
||||
device_id=device_id,
|
||||
auth_method="pin",
|
||||
)
|
||||
return user, session_token
|
||||
|
||||
async def login_nfc(
|
||||
self,
|
||||
nfc_uid: str,
|
||||
company_id: uuid.UUID,
|
||||
device_id: uuid.UUID,
|
||||
db: AsyncSession,
|
||||
) -> tuple[User, str]:
|
||||
"""Authentifizierung per NFC-UID."""
|
||||
user = await db.scalar(
|
||||
select(User).where(
|
||||
User.company_id == company_id,
|
||||
User.kiosk_nfc_uid == nfc_uid,
|
||||
User.is_active == True,
|
||||
)
|
||||
)
|
||||
if user is None:
|
||||
raise HTTPException(status_code=401, detail="NFC-Karte nicht registriert.")
|
||||
|
||||
session_token = await kiosk_session_service.create_session(
|
||||
user_id=user.id,
|
||||
company_id=company_id,
|
||||
device_id=device_id,
|
||||
auth_method="nfc",
|
||||
)
|
||||
return user, session_token
|
||||
|
||||
async def generate_qr_token(self, user_id: uuid.UUID, company_id: uuid.UUID) -> str:
|
||||
"""
|
||||
Erzeugt einen einmaligen QR-Token (für die Web-App: User scannt QR am Kiosk).
|
||||
Token ist 5 min gültig und wird in Redis gespeichert.
|
||||
"""
|
||||
from app.core.redis import get_redis_client
|
||||
redis = get_redis_client()
|
||||
if redis is None:
|
||||
raise HTTPException(status_code=503, detail="QR-Login nicht verfügbar (Redis).")
|
||||
|
||||
token = secrets.token_urlsafe(32)
|
||||
key = QR_TOKEN_PREFIX + token
|
||||
redis.setex(key, QR_TOKEN_TTL, json.dumps({
|
||||
"user_id": str(user_id),
|
||||
"company_id": str(company_id),
|
||||
}))
|
||||
return token
|
||||
|
||||
async def login_qr(
|
||||
self,
|
||||
qr_token: str,
|
||||
company_id: uuid.UUID,
|
||||
device_id: uuid.UUID,
|
||||
db: AsyncSession,
|
||||
) -> tuple[User, str]:
|
||||
"""Validiert QR-Token (einmalig) und erstellt Session."""
|
||||
from app.core.redis import get_redis_client
|
||||
redis = get_redis_client()
|
||||
if redis is None:
|
||||
raise HTTPException(status_code=503, detail="QR-Login nicht verfügbar (Redis).")
|
||||
|
||||
key = QR_TOKEN_PREFIX + qr_token
|
||||
data = redis.get(key)
|
||||
if data is None:
|
||||
raise HTTPException(status_code=401, detail="QR-Code abgelaufen oder ungültig.")
|
||||
|
||||
# Einmalig: Token sofort löschen
|
||||
redis.delete(key)
|
||||
|
||||
payload = json.loads(data)
|
||||
if str(payload.get("company_id")) != str(company_id):
|
||||
raise HTTPException(status_code=401, detail="QR-Code für falsches Unternehmen.")
|
||||
|
||||
user_id = uuid.UUID(payload["user_id"])
|
||||
user = await db.get(User, user_id)
|
||||
if user is None or not user.is_active:
|
||||
raise HTTPException(status_code=401, detail="User nicht gefunden oder inaktiv.")
|
||||
|
||||
session_token = await kiosk_session_service.create_session(
|
||||
user_id=user.id,
|
||||
company_id=company_id,
|
||||
device_id=device_id,
|
||||
auth_method="qr",
|
||||
)
|
||||
return user, session_token
|
||||
|
||||
async def login_list(
|
||||
self,
|
||||
user_id: uuid.UUID,
|
||||
company_id: uuid.UUID,
|
||||
device_id: uuid.UUID,
|
||||
db: AsyncSession,
|
||||
) -> tuple[User, str]:
|
||||
"""Login per Auswahl aus der Mitarbeiterliste (nur für vertrauenswürdige Umgebungen)."""
|
||||
user = await db.scalar(
|
||||
select(User).where(
|
||||
User.id == user_id,
|
||||
User.company_id == company_id,
|
||||
User.is_active == True,
|
||||
)
|
||||
)
|
||||
if user is None:
|
||||
raise HTTPException(status_code=404, detail="Mitarbeiter nicht gefunden.")
|
||||
|
||||
session_token = await kiosk_session_service.create_session(
|
||||
user_id=user.id,
|
||||
company_id=company_id,
|
||||
device_id=device_id,
|
||||
auth_method="list",
|
||||
)
|
||||
return user, session_token
|
||||
|
||||
async def get_user_list(
|
||||
self, company_id: uuid.UUID, db: AsyncSession
|
||||
) -> list[User]:
|
||||
"""Alle aktiven Mitarbeiter der Firma für Kiosk-Auswahlliste."""
|
||||
result = await db.scalars(
|
||||
select(User).where(
|
||||
User.company_id == company_id,
|
||||
User.is_active == True,
|
||||
).order_by(User.last_name, User.first_name)
|
||||
)
|
||||
return list(result.all())
|
||||
|
||||
|
||||
def _display_name(user: User) -> str:
|
||||
"""Kurzname für Kiosk-Anzeige: 'Max M.'"""
|
||||
return f"{user.first_name} {user.last_name[:1]}."
|
||||
|
||||
|
||||
kiosk_auth_service = KioskAuthService()
|
||||
Reference in New Issue
Block a user