Files
patrick cead46c1e1 feat: Statischer firmenweiter QR-Code für mobiles Ein-/Ausstempeln
Mitarbeiter scannen einen am Eingang ausgehängten QR-Code mit dem Privat-Handy
(/stamp?t=<token>), melden sich per Personalnummer + PIN an und stempeln ein/aus.

Eigener öffentlicher Endpunkt-Pfad, da der Kiosk-PIN-Login Ed25519-Geräte-
Signaturen verlangt, die ein Privat-Handy nicht hat.

Backend:
- Company.public_stamp_enabled (opt-in, default OFF) + rotierbares
  public_stamp_token_hash (SHA-256) + created_at; Migration 0033
- Router /time/public: company/auth/action (slowapi-Limits, AuditLog)
- kiosk_auth_service.login_pin_public() reused PIN-Lockout, keyed auf
  (public:company_id, personnel_number)
- public_stamp_session_service: 120s Redis-Kurz-Session
- Admin-Token-Endpunkte in companies.py (GET/rotate/DELETE)

Frontend:
- Public-Route /stamp (PublicStampPage)
- Stempel-PIN-Verwaltung in ProfilePage (reused POST /users/{id}/kiosk-pin)
- QR-Generierung/Druck/Toggle in CompanySettingsPage

Sicherheit: schwächer als Kiosk (keine Geräte-Signatur/Nonce/IP-Whitelist),
bewusster BYOD-Komfort-Tradeoff; Schutz über PIN + Lockout + opt-in.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-02 15:58:38 +02:00

174 lines
6.1 KiB
Python

import hashlib
import secrets
from datetime import datetime, timezone
from uuid import UUID
from fastapi import APIRouter, Depends, Request
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.database import get_db
from app.core.dependencies import CurrentUser, get_client_ip, require_role
from app.models import Company
from app.models.audit_log import AuditLog
from app.models.department import Department
from app.models.user import User, UserRole
from app.schemas.company import (
CompanyOut,
CompanyUpdate,
DepartmentCreate,
DepartmentOut,
DepartmentUpdate,
PublicStampTokenRotated,
PublicStampTokenStatus,
)
router = APIRouter(prefix="/companies", tags=["Companies"])
_admin_roles = (UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)
def _public_stamp_url(token: str) -> str:
return f"{settings.frontend_url.rstrip('/')}/stamp?t={token}"
@router.get("/me", response_model=CompanyOut)
async def get_my_company(current_user: CurrentUser, db: AsyncSession = Depends(get_db)):
company = await db.get(Company, current_user.company_id)
return CompanyOut.model_validate(company)
@router.patch("/me", response_model=CompanyOut)
async def update_my_company(
data: CompanyUpdate,
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
company = await db.get(Company, current_user.company_id)
update_data = data.model_dump(exclude_none=True)
# settings ist ein CompanySettingsUpdate-Objekt → als dict ins JSONB mergen
if "settings" in update_data and isinstance(update_data["settings"], dict):
existing = dict(company.settings or {})
existing.update(update_data.pop("settings"))
update_data["settings"] = existing
for field, value in update_data.items():
setattr(company, field, value)
return CompanyOut.model_validate(company)
# ── Öffentliches QR-Stempel-Token ────────────────────────────────────────────
@router.get("/me/public-stamp-token", response_model=PublicStampTokenStatus)
async def get_public_stamp_token_status(
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
company = await db.get(Company, current_user.company_id)
return PublicStampTokenStatus(
enabled=company.public_stamp_enabled,
configured=company.public_stamp_token_hash is not None,
created_at=company.public_stamp_token_created_at,
)
@router.post("/me/public-stamp-token/rotate", response_model=PublicStampTokenRotated)
async def rotate_public_stamp_token(
request: Request,
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
company = await db.get(Company, current_user.company_id)
token = secrets.token_urlsafe(32)
company.public_stamp_token_hash = hashlib.sha256(token.encode("utf-8")).hexdigest()
company.public_stamp_token_created_at = datetime.now(timezone.utc)
db.add(AuditLog(
company_id=company.id,
user_id=current_user.id,
action="public_stamp_token_rotated",
entity_type="company",
entity_id=company.id,
ip=get_client_ip(request),
))
await db.commit()
return PublicStampTokenRotated(
token=token,
public_url=_public_stamp_url(token),
created_at=company.public_stamp_token_created_at,
)
@router.delete("/me/public-stamp-token", status_code=204)
async def delete_public_stamp_token(
request: Request,
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
company = await db.get(Company, current_user.company_id)
if company.public_stamp_token_hash is None:
return
company.public_stamp_token_hash = None
company.public_stamp_token_created_at = None
db.add(AuditLog(
company_id=company.id,
user_id=current_user.id,
action="public_stamp_token_revoked",
entity_type="company",
entity_id=company.id,
ip=get_client_ip(request),
))
await db.commit()
# ── Departments ──────────────────────────────────────────────────────────────
@router.get("/me/departments", response_model=list[DepartmentOut])
async def list_departments(current_user: CurrentUser, db: AsyncSession = Depends(get_db)):
depts = await db.scalars(
select(Department).where(Department.company_id == current_user.company_id)
)
return [DepartmentOut.model_validate(d) for d in depts.all()]
@router.post("/me/departments", response_model=DepartmentOut, status_code=201)
async def create_department(
data: DepartmentCreate,
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
dept = Department(company_id=current_user.company_id, **data.model_dump())
db.add(dept)
await db.flush()
return DepartmentOut.model_validate(dept)
@router.patch("/me/departments/{dept_id}", response_model=DepartmentOut)
async def update_department(
dept_id: UUID,
data: DepartmentUpdate,
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
dept = await db.get(Department, dept_id)
if not dept or dept.company_id != current_user.company_id:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="Department not found")
for field, value in data.model_dump(exclude_none=True).items():
setattr(dept, field, value)
return DepartmentOut.model_validate(dept)
@router.delete("/me/departments/{dept_id}", status_code=204)
async def delete_department(
dept_id: UUID,
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
dept = await db.get(Department, dept_id)
if not dept or dept.company_id != current_user.company_id:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail="Department not found")
await db.delete(dept)