feat: Statischer firmenweiter QR-Code für mobiles Ein-/Ausstempeln

Mitarbeiter scannen einen am Eingang ausgehängten QR-Code mit dem Privat-Handy
(/stamp?t=<token>), melden sich per Personalnummer + PIN an und stempeln ein/aus.

Eigener öffentlicher Endpunkt-Pfad, da der Kiosk-PIN-Login Ed25519-Geräte-
Signaturen verlangt, die ein Privat-Handy nicht hat.

Backend:
- Company.public_stamp_enabled (opt-in, default OFF) + rotierbares
  public_stamp_token_hash (SHA-256) + created_at; Migration 0033
- Router /time/public: company/auth/action (slowapi-Limits, AuditLog)
- kiosk_auth_service.login_pin_public() reused PIN-Lockout, keyed auf
  (public:company_id, personnel_number)
- public_stamp_session_service: 120s Redis-Kurz-Session
- Admin-Token-Endpunkte in companies.py (GET/rotate/DELETE)

Frontend:
- Public-Route /stamp (PublicStampPage)
- Stempel-PIN-Verwaltung in ProfilePage (reused POST /users/{id}/kiosk-pin)
- QR-Generierung/Druck/Toggle in CompanySettingsPage

Sicherheit: schwächer als Kiosk (keine Geräte-Signatur/Nonce/IP-Whitelist),
bewusster BYOD-Komfort-Tradeoff; Schutz über PIN + Lockout + opt-in.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
2026-06-02 15:58:38 +02:00
parent 03d5fd6e2e
commit cead46c1e1
14 changed files with 1130 additions and 2 deletions
+2
View File
@@ -17,6 +17,7 @@ from app.routers import busylight
from app.routers import audit
from app.routers import special_assignments
from app.routers import hours_payouts
from app.routers import public_stamp
@asynccontextmanager
@@ -85,6 +86,7 @@ app.include_router(auth.router, prefix=API_PREFIX)
app.include_router(users.router, prefix=API_PREFIX)
app.include_router(companies.router, prefix=API_PREFIX)
app.include_router(time_entries.router, prefix=API_PREFIX)
app.include_router(public_stamp.router, prefix=API_PREFIX)
app.include_router(absences.router, prefix=API_PREFIX)
app.include_router(reports.router, prefix=API_PREFIX)
app.include_router(ldap.router, prefix=API_PREFIX)
+7
View File
@@ -44,6 +44,13 @@ class Company(Base):
busylight_pull_token_hash: Mapped[str | None] = mapped_column(String(64), unique=True)
busylight_token_created_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# Öffentliches QR-Stempeln: statischer firmenweiter QR-Code → /stamp?t=<token>.
# Mitarbeiter scannt mit Privat-Handy, meldet sich per Personalnummer + PIN an.
# Opt-in (default OFF). Token gehasht in DB (SHA-256), Klartext nur beim Rotieren.
public_stamp_enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
public_stamp_token_hash: Mapped[str | None] = mapped_column(String(64), unique=True)
public_stamp_token_created_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# Kiosk-Konfiguration
kiosk_require_approval: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
kiosk_track_current_user: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
+78 -2
View File
@@ -1,12 +1,17 @@
import hashlib
import secrets
from datetime import datetime, timezone
from uuid import UUID
from fastapi import APIRouter, Depends
from fastapi import APIRouter, Depends, Request
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.core.database import get_db
from app.core.dependencies import CurrentUser, require_role
from app.core.dependencies import CurrentUser, get_client_ip, require_role
from app.models import Company
from app.models.audit_log import AuditLog
from app.models.department import Department
from app.models.user import User, UserRole
from app.schemas.company import (
@@ -15,6 +20,8 @@ from app.schemas.company import (
DepartmentCreate,
DepartmentOut,
DepartmentUpdate,
PublicStampTokenRotated,
PublicStampTokenStatus,
)
router = APIRouter(prefix="/companies", tags=["Companies"])
@@ -22,6 +29,10 @@ router = APIRouter(prefix="/companies", tags=["Companies"])
_admin_roles = (UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)
def _public_stamp_url(token: str) -> str:
return f"{settings.frontend_url.rstrip('/')}/stamp?t={token}"
@router.get("/me", response_model=CompanyOut)
async def get_my_company(current_user: CurrentUser, db: AsyncSession = Depends(get_db)):
company = await db.get(Company, current_user.company_id)
@@ -46,6 +57,71 @@ async def update_my_company(
return CompanyOut.model_validate(company)
# ── Öffentliches QR-Stempel-Token ────────────────────────────────────────────
@router.get("/me/public-stamp-token", response_model=PublicStampTokenStatus)
async def get_public_stamp_token_status(
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
company = await db.get(Company, current_user.company_id)
return PublicStampTokenStatus(
enabled=company.public_stamp_enabled,
configured=company.public_stamp_token_hash is not None,
created_at=company.public_stamp_token_created_at,
)
@router.post("/me/public-stamp-token/rotate", response_model=PublicStampTokenRotated)
async def rotate_public_stamp_token(
request: Request,
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
company = await db.get(Company, current_user.company_id)
token = secrets.token_urlsafe(32)
company.public_stamp_token_hash = hashlib.sha256(token.encode("utf-8")).hexdigest()
company.public_stamp_token_created_at = datetime.now(timezone.utc)
db.add(AuditLog(
company_id=company.id,
user_id=current_user.id,
action="public_stamp_token_rotated",
entity_type="company",
entity_id=company.id,
ip=get_client_ip(request),
))
await db.commit()
return PublicStampTokenRotated(
token=token,
public_url=_public_stamp_url(token),
created_at=company.public_stamp_token_created_at,
)
@router.delete("/me/public-stamp-token", status_code=204)
async def delete_public_stamp_token(
request: Request,
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
company = await db.get(Company, current_user.company_id)
if company.public_stamp_token_hash is None:
return
company.public_stamp_token_hash = None
company.public_stamp_token_created_at = None
db.add(AuditLog(
company_id=company.id,
user_id=current_user.id,
action="public_stamp_token_revoked",
entity_type="company",
entity_id=company.id,
ip=get_client_ip(request),
))
await db.commit()
# ── Departments ──────────────────────────────────────────────────────────────
@router.get("/me/departments", response_model=list[DepartmentOut])
+155
View File
@@ -0,0 +1,155 @@
"""Öffentliches QR-Stempeln (statischer firmenweiter QR-Code).
Kein Bearer-Token, KEINE Ed25519-Geräte-Signatur (anders als der Kiosk):
ein privates Handy hat keinen Geräteschlüssel. Stattdessen Identifikation per
Personalnummer + PIN. Härtung: Opt-in (default OFF), PIN-Lockout, IP-Rate-Limit,
gehashtes rotierbares Token, 120s-Session, AuditLog.
⚠ SICHERHEIT: Dieser Pfad ist STRIKT SCHWÄCHER als der Kiosk-Pfad keine
Geräte-Signatur, kein Replay-Nonce, keine IP-Whitelist. Er tauscht Sicherheit
gegen BYOD-Komfort. Pro Firma nur aktivieren wenn wirklich benötigt.
"""
from __future__ import annotations
import hashlib
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.dependencies import get_client_ip
from app.core.limiter import limiter
from app.models.audit_log import AuditLog
from app.models.company import Company
from app.models.time_entry import EntrySource
from app.models.user import User
from app.schemas.public_stamp import (
PublicStampActionRequest,
PublicStampActionResponse,
PublicStampAuthRequest,
PublicStampAuthResponse,
PublicStampCompanyInfo,
)
from app.schemas.time_entry import StampInRequest, TimeEntryOut
from app.services.kiosk_auth_service import kiosk_auth_service, _display_name
from app.services.public_stamp_session_service import (
PUBLIC_STAMP_SESSION_TTL,
public_stamp_session_service,
)
from app.services.time_service import time_service
router = APIRouter(prefix="/time/public", tags=["Public Stamping"])
def _hash_token(token: str) -> str:
return hashlib.sha256(token.encode("utf-8")).hexdigest()
async def _company_from_token(token: str, db: AsyncSession, *, require_enabled: bool) -> Company:
company = await db.scalar(
select(Company).where(Company.public_stamp_token_hash == _hash_token(token))
)
if company is None:
raise HTTPException(status_code=404, detail="QR-Code ungültig.")
if require_enabled and not company.public_stamp_enabled:
raise HTTPException(status_code=403, detail="QR-Stempeln ist für dieses Unternehmen deaktiviert.")
return company
async def _status(user: User, db: AsyncSession) -> dict:
"""Aktuellen Stempel-Status + heutige Einträge ermitteln."""
open_entry = await time_service._get_open_entry(user.id, db)
today = await time_service.get_today(user, db)
return {
"open": open_entry is not None,
"on_break": open_entry is not None and open_entry.break_start is not None,
"today": [TimeEntryOut.model_validate(e) for e in today],
}
@router.get("/company", response_model=PublicStampCompanyInfo)
@limiter.limit("30/minute")
async def public_company_info(
request: Request,
t: str = Query(..., min_length=8),
db: AsyncSession = Depends(get_db),
):
"""Firmen-Header für die Stempel-Seite. Gibt auch bei deaktiviertem
Feature den Namen zurück, damit die Seite einen Hinweis zeigen kann."""
company = await _company_from_token(t, db, require_enabled=False)
return PublicStampCompanyInfo(company_name=company.name, enabled=company.public_stamp_enabled)
@router.post("/auth", response_model=PublicStampAuthResponse)
@limiter.limit("10/minute")
async def public_auth(
request: Request,
body: PublicStampAuthRequest,
db: AsyncSession = Depends(get_db),
):
"""Personalnummer + PIN → Kurz-Session + aktueller Status."""
company = await _company_from_token(body.token, db, require_enabled=True)
user = await kiosk_auth_service.login_pin_public(
body.personnel_number, body.pin, company.id, db
)
session_token = await public_stamp_session_service.create_session(user.id, company.id)
db.add(AuditLog(
company_id=company.id,
user_id=user.id,
action="public_stamp_login",
entity_type="user",
entity_id=user.id,
ip=get_client_ip(request),
))
await db.commit()
status = await _status(user, db)
return PublicStampAuthResponse(
session_token=session_token,
user_name=_display_name(user),
expires_in_seconds=PUBLIC_STAMP_SESSION_TTL,
**status,
)
@router.post("/action", response_model=PublicStampActionResponse)
@limiter.limit("30/minute")
async def public_action(
request: Request,
body: PublicStampActionRequest,
db: AsyncSession = Depends(get_db),
):
"""Stempel-Aktion über eine gültige Kurz-Session ausführen."""
session = await public_stamp_session_service.require_session(body.session_token)
user = await db.get(User, session["user_id"])
if user is None or not user.is_active or str(user.company_id) != session["company_id"]:
raise HTTPException(status_code=401, detail="Session ungültig.")
warnings: list[str] = []
if body.action == "in":
_, warnings = await time_service.stamp_in(
user, StampInRequest(source=EntrySource.KIOSK, note=body.note), db
)
elif body.action == "out":
_, warnings = await time_service.stamp_out(user, body.note, db)
elif body.action == "break_start":
await time_service.break_start(user, db)
elif body.action == "break_end":
await time_service.break_end(user, db)
db.add(AuditLog(
company_id=user.company_id,
user_id=user.id,
action=f"public_stamp_{body.action}",
entity_type="user",
entity_id=user.id,
ip=get_client_ip(request),
))
await db.commit()
status = await _status(user, db)
return PublicStampActionResponse(warnings=warnings, **status)
+17
View File
@@ -1,4 +1,5 @@
import uuid
from datetime import datetime
from typing import Literal
from pydantic import BaseModel, ConfigDict, Field
@@ -45,6 +46,21 @@ class CompanyOut(BaseModel):
kiosk_require_approval: bool = True
kiosk_track_current_user: bool = True
kiosk_heartbeat_interval_sec: int = 30
public_stamp_enabled: bool = False
class PublicStampTokenStatus(BaseModel):
"""Status des öffentlichen QR-Stempel-Tokens (kein Klartext)."""
enabled: bool
configured: bool
created_at: datetime | None = None
class PublicStampTokenRotated(BaseModel):
"""Antwort beim Rotieren enthält Klartext-Token + fertige URL (nur einmalig)."""
token: str = Field(..., description="Klartext-Token, wird nur einmal angezeigt.")
public_url: str
created_at: datetime
class CompanyUpdate(BaseModel):
@@ -65,6 +81,7 @@ class CompanyUpdate(BaseModel):
kiosk_require_approval: bool | None = None
kiosk_track_current_user: bool | None = None
kiosk_heartbeat_interval_sec: int | None = Field(None, ge=10, le=120)
public_stamp_enabled: bool | None = None
class DepartmentOut(BaseModel):
+47
View File
@@ -0,0 +1,47 @@
"""Schemas für das öffentliche QR-Stempeln (statischer firmenweiter QR-Code).
Flow: Handy scannt QR (/stamp?t=<token>) → Seite zeigt Firmennamen →
Personalnummer + PIN → Kurz-Session (120s) → Ein-/Ausstempeln.
"""
from __future__ import annotations
from typing import Literal
from pydantic import BaseModel, Field
from app.schemas.time_entry import TimeEntryOut
class PublicStampCompanyInfo(BaseModel):
"""Header-Info für die Stempel-Seite (Token-Auflösung)."""
company_name: str
enabled: bool
class PublicStampAuthRequest(BaseModel):
token: str = Field(..., min_length=8)
personnel_number: str = Field(..., min_length=1, max_length=50)
pin: str = Field(..., min_length=4, max_length=6, pattern=r"^\d+$")
class PublicStampStatus(BaseModel):
"""Aktueller Stempel-Status des Mitarbeiters."""
open: bool
on_break: bool
today: list[TimeEntryOut] = []
class PublicStampAuthResponse(PublicStampStatus):
session_token: str
user_name: str
expires_in_seconds: int
class PublicStampActionRequest(BaseModel):
session_token: str
action: Literal["in", "out", "break_start", "break_end"]
note: str | None = None
class PublicStampActionResponse(PublicStampStatus):
warnings: list[str] = []
@@ -126,6 +126,58 @@ class KioskAuthService:
)
return user, session_token
async def login_pin_public(
self,
personnel_number: str,
pin: str,
company_id: uuid.UUID,
db: AsyncSession,
) -> User:
"""PIN-Auth für das öffentliche QR-Stempeln (ohne Kiosk-Gerät).
Wiederverwendet den Brute-Force-Lockout, aber keyed auf
(company_id, personnel_number) statt (device_id, ...), da hier kein
Gerät existiert. Erzeugt KEINE Kiosk-Session der Aufrufer legt eine
separate öffentliche Kurz-Session an. Gibt nur den User zurück.
"""
import redis.asyncio as aioredis
from app.core.config import settings
# Lockout-Key-Namespace klar vom Kiosk trennen
lock_id = f"public:{company_id}"
redis_client = aioredis.from_url(settings.redis_url, decode_responses=True)
try:
await self._check_pin_lockout(lock_id, personnel_number, redis_client)
user = await db.scalar(
select(User).where(
User.company_id == company_id,
User.personnel_number == personnel_number,
User.is_active == True,
)
)
if user is None:
# Fehlversuch auch bei unbekannter Personalnummer (Anti-Enumeration)
await self._record_pin_failure(lock_id, personnel_number, redis_client)
raise HTTPException(status_code=401, detail="Personalnummer oder PIN falsch.")
if not user.kiosk_pin_hash:
raise HTTPException(
status_code=401,
detail="Kein PIN gesetzt. Bitte im Mitarbeiter-Portal einen Stempel-PIN vergeben.",
)
if not bcrypt.checkpw(pin.encode(), user.kiosk_pin_hash.encode()):
await self._record_pin_failure(lock_id, personnel_number, redis_client)
raise HTTPException(status_code=401, detail="Personalnummer oder PIN falsch.")
await self._clear_pin_failures(lock_id, personnel_number, redis_client)
finally:
await redis_client.aclose()
return user
async def login_nfc(
self,
nfc_uid: str,
@@ -0,0 +1,77 @@
"""Kurzlebige Redis-Sessions für das öffentliche QR-Stempeln.
Anders als der Kiosk (15 min, vertrauenswürdiges Wandterminal) läuft das
öffentliche Stempeln auf einem privaten Handy über einen ungesicherten
öffentlichen Endpunkt. Deshalb sehr kurze TTL (120s): nach Anmeldung
genügend Zeit zum Ein-/Ausstempeln, danach automatischer Verfall.
Redis ist Pflicht. Bei Redis-Ausfall → 503.
"""
from __future__ import annotations
import json
import uuid
from datetime import datetime, timezone
from typing import Optional
from fastapi import HTTPException
PUBLIC_STAMP_SESSION_TTL = 120 # 2 Minuten
SESSION_KEY_PREFIX = "public_stamp_session:"
class PublicStampSessionService:
def _get_redis(self):
from app.core.redis import get_redis_client
client = get_redis_client()
if client is None:
raise HTTPException(
status_code=503,
detail="Stempel-Service nicht verfügbar (Redis nicht erreichbar).",
)
return client
async def create_session(self, user_id: uuid.UUID, company_id: uuid.UUID) -> str:
redis = self._get_redis()
session_token = str(uuid.uuid4())
key = SESSION_KEY_PREFIX + session_token
payload = {
"user_id": str(user_id),
"company_id": str(company_id),
"created_at": datetime.now(timezone.utc).isoformat(),
}
try:
redis.setex(key, PUBLIC_STAMP_SESSION_TTL, json.dumps(payload))
except Exception as exc:
raise HTTPException(status_code=503, detail=f"Session konnte nicht erstellt werden: {exc}")
return session_token
async def get_session(self, session_token: str) -> Optional[dict]:
redis = self._get_redis()
try:
data = redis.get(SESSION_KEY_PREFIX + session_token)
except Exception as exc:
raise HTTPException(status_code=503, detail=f"Session-Lookup fehlgeschlagen: {exc}")
if data is None:
return None
return json.loads(data)
async def require_session(self, session_token: str) -> dict:
session = await self.get_session(session_token)
if session is None:
raise HTTPException(
status_code=401,
detail="Stempel-Session abgelaufen. Bitte Personalnummer und PIN erneut eingeben.",
)
return session
async def invalidate_session(self, session_token: str) -> None:
redis = self._get_redis()
try:
redis.delete(SESSION_KEY_PREFIX + session_token)
except Exception:
pass # Best-effort
public_stamp_session_service = PublicStampSessionService()
@@ -0,0 +1,38 @@
"""public stamp token + opt-in flag for static QR stamping
Revision ID: 0033
Revises: 0032
Create Date: 2026-06-02
"""
from alembic import op
import sqlalchemy as sa
revision = '0033'
down_revision = '0032'
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
'companies',
sa.Column('public_stamp_enabled', sa.Boolean(), nullable=False, server_default=sa.false()),
)
op.add_column(
'companies',
sa.Column('public_stamp_token_hash', sa.String(length=64), nullable=True),
)
op.add_column(
'companies',
sa.Column('public_stamp_token_created_at', sa.DateTime(timezone=True), nullable=True),
)
op.create_unique_constraint(
'uq_companies_public_stamp_token_hash', 'companies', ['public_stamp_token_hash']
)
def downgrade():
op.drop_constraint('uq_companies_public_stamp_token_hash', 'companies', type_='unique')
op.drop_column('companies', 'public_stamp_token_created_at')
op.drop_column('companies', 'public_stamp_token_hash')
op.drop_column('companies', 'public_stamp_enabled')
+168
View File
@@ -0,0 +1,168 @@
"""Tests für das öffentliche QR-Stempeln (Personalnummer + PIN, statischer QR).
Benötigt Redis (PIN-Lockout + Kurz-Session) läuft auf dem Server.
"""
import pytest
import pytest_asyncio
from httpx import AsyncClient
async def _register(client: AsyncClient, company: str, email: str, pin: str | None,
personnel: str = "0001", enable: bool = True):
"""Registriert Firma+Admin, setzt Personalnr/PIN, aktiviert QR-Stempeln,
rotiert das Token. Gibt dict mit headers/token/personnel/pin zurück."""
reg = await client.post("/api/v1/auth/register", json={
"company_name": company,
"first_name": "QR",
"last_name": "Admin",
"email": email,
"password": "Secret123",
})
assert reg.status_code == 201, reg.text
headers = {"Authorization": f"Bearer {reg.json()['access_token']}"}
me = await client.get("/api/v1/auth/me", headers=headers)
user_id = me.json()["id"]
pr = await client.patch(f"/api/v1/users/{user_id}",
json={"personnel_number": personnel}, headers=headers)
assert pr.status_code == 200, pr.text
if pin is not None:
pr = await client.post(f"/api/v1/users/{user_id}/kiosk-pin",
json={"pin": pin}, headers=headers)
assert pr.status_code == 200, pr.text
upd = await client.patch("/api/v1/companies/me",
json={"public_stamp_enabled": enable}, headers=headers)
assert upd.status_code == 200, upd.text
rot = await client.post("/api/v1/companies/me/public-stamp-token/rotate", headers=headers)
assert rot.status_code == 200, rot.text
return {"headers": headers, "user_id": user_id,
"token": rot.json()["token"], "personnel": personnel, "pin": pin}
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def ps(client: AsyncClient):
return await _register(client, "QR Stamp GmbH", "qr@stamp.de", "1234")
# ── Token-Auflösung ──────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_company_info_resolves(client: AsyncClient, ps):
resp = await client.get(f"/api/v1/time/public/company?t={ps['token']}")
assert resp.status_code == 200
body = resp.json()
assert body["company_name"] == "QR Stamp GmbH"
assert body["enabled"] is True
@pytest.mark.asyncio
async def test_company_info_unknown_token_404(client: AsyncClient):
resp = await client.get("/api/v1/time/public/company?t=totally-invalid-token-xyz")
assert resp.status_code == 404
@pytest.mark.asyncio
async def test_rotate_returns_url_once(client: AsyncClient, ps):
resp = await client.post("/api/v1/companies/me/public-stamp-token/rotate", headers=ps["headers"])
assert resp.status_code == 200
body = resp.json()
assert "/stamp?t=" in body["public_url"]
assert len(body["token"]) >= 32
# neues Token → ps['token'] (altes) ungültig
old = await client.get(f"/api/v1/time/public/company?t={ps['token']}")
assert old.status_code == 404
# ps-Fixture aktualisieren, damit Folge-Tests das gültige Token nutzen
ps["token"] = body["token"]
# ── Auth ─────────────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_auth_success(client: AsyncClient, ps):
resp = await client.post("/api/v1/time/public/auth", json={
"token": ps["token"], "personnel_number": ps["personnel"], "pin": ps["pin"],
})
assert resp.status_code == 200, resp.text
body = resp.json()
assert body["session_token"]
assert body["expires_in_seconds"] > 0
assert body["open"] is False
@pytest.mark.asyncio
async def test_auth_wrong_pin_401(client: AsyncClient, ps):
resp = await client.post("/api/v1/time/public/auth", json={
"token": ps["token"], "personnel_number": ps["personnel"], "pin": "9999",
})
assert resp.status_code == 401
@pytest.mark.asyncio
async def test_auth_disabled_company_403(client: AsyncClient):
setup = await _register(client, "QR Disabled GmbH", "qr@disabled.de", "1234",
personnel="0002", enable=False)
resp = await client.post("/api/v1/time/public/auth", json={
"token": setup["token"], "personnel_number": "0002", "pin": "1234",
})
assert resp.status_code == 403
@pytest.mark.asyncio
async def test_auth_lockout_after_failures(client: AsyncClient):
"""5 Fehlversuche (auch bei unbekannter Personalnr.) → 429 Lockout."""
setup = await _register(client, "QR Lockout GmbH", "qr@lockout.de", "1234",
personnel="0003")
codes = []
for _ in range(6):
r = await client.post("/api/v1/time/public/auth", json={
"token": setup["token"], "personnel_number": "999999", "pin": "0000",
})
codes.append(r.status_code)
assert 429 in codes # Sperre greift
# ── Stempel-Aktionen ─────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_stamp_in_and_out(client: AsyncClient, ps):
auth = await client.post("/api/v1/time/public/auth", json={
"token": ps["token"], "personnel_number": ps["personnel"], "pin": ps["pin"],
})
assert auth.status_code == 200, auth.text
session = auth.json()["session_token"]
already_open = auth.json()["open"]
if not already_open:
sin = await client.post("/api/v1/time/public/action",
json={"session_token": session, "action": "in"})
assert sin.status_code == 200, sin.text
assert sin.json()["open"] is True
sout = await client.post("/api/v1/time/public/action",
json={"session_token": session, "action": "out"})
assert sout.status_code == 200, sout.text
assert sout.json()["open"] is False
@pytest.mark.asyncio
async def test_action_invalid_session_401(client: AsyncClient):
resp = await client.post("/api/v1/time/public/action", json={
"session_token": "00000000-0000-0000-0000-000000000000", "action": "in",
})
assert resp.status_code == 401
# ── Token-Delete ─────────────────────────────────────────────────────────────
@pytest.mark.asyncio
async def test_delete_token_invalidates(client: AsyncClient):
setup = await _register(client, "QR Delete GmbH", "qr@delete.de", "1234",
personnel="0004")
dl = await client.delete("/api/v1/companies/me/public-stamp-token", headers=setup["headers"])
assert dl.status_code == 204
resp = await client.get(f"/api/v1/time/public/company?t={setup['token']}")
assert resp.status_code == 404