Files
patrick 4dc69137dd security: H-1 settings-Whitelist + H-5 UUID-Guard + H-6 DNS-Pinning + H-7 Heartbeat-Timing
H-1: company.settings als typisiertes Sub-Schema
- schemas/company.py: CompanySettingsUpdate mit extra=forbid
- Nur bekannte Keys (carryover_expires_month/day) erlaubt
- Unbekannte Keys → HTTP 422

H-5: SQL-Injection defensiv absichern
- dependencies.py: UUID-Round-Trip str(_uuid.UUID(...)) + Sicherheitskommentar

H-6: CalDAV DNS-Rebinding-Schutz
- caldav_service.py: PinnedIPTransport — IP einmal auflösen, beim Request fixieren
- _validate_caldav_url gibt aufgelöste IP zurück
- Alle HTTP-Methoden nutzen PinnedIPTransport

H-7: Heartbeat-Timestamp nach Route-Logik
- kiosk_security.py: last_heartbeat_at-Update aus Dependency entfernt
- kiosk_service.py: Update erst in process_heartbeat() nach erfolgreicher Auth

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 11:35:18 +02:00

102 lines
4.0 KiB
Python

import uuid as _uuid
from typing import Annotated
from uuid import UUID
from fastapi import Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.security import decode_access_token
from app.models.user import User, UserRole
# Bearer-token security scheme; get_current_user depends on it to obtain the
# Authorization credentials of the incoming request.
bearer_scheme = HTTPBearer()
def get_client_ip(request: Request) -> str:
    """Return the originating client IP, even behind the nginx proxy.

    nginx sets ``X-Real-IP`` to the original client address. Without that
    header ``request.client.host`` would always be 127.0.0.1 behind nginx,
    which would make AuditLog entries worthless.

    Lookup order: ``X-Real-IP``, then the first entry of
    ``X-Forwarded-For``, then ``request.client.host``. A header that is
    present but blank (or whose first entry is blank) is skipped rather than
    returned as an empty string.

    NOTE(review): both headers are taken at face value. This presumably
    relies on the proxy overwriting any client-supplied values — confirm
    against the nginx configuration.

    Args:
        request: Incoming request; only ``headers`` and ``client`` are read.

    Returns:
        The client IP as a string, or ``"unknown"`` when no header yields a
        value and ``request.client`` is not available.
    """
    real_ip = (request.headers.get("X-Real-IP") or "").strip()
    if real_ip:
        return real_ip

    # X-Forwarded-For is a comma-separated chain; the left-most entry is the
    # address this function has always reported.
    forwarded_for = request.headers.get("X-Forwarded-For") or ""
    first_hop = forwarded_for.split(",")[0].strip()
    if first_hop:
        return first_hop

    return request.client.host if request.client else "unknown"
async def get_current_user(
    credentials: Annotated[HTTPAuthorizationCredentials, Depends(bearer_scheme)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    """Resolve the bearer token to an active ``User`` and set the RLS context.

    Args:
        credentials: Bearer credentials extracted by ``bearer_scheme``.
        db: Session for the current request; the ``SET LOCAL`` statements
            issued here apply to its current transaction.

    Returns:
        The active user identified by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, the ``sub``
            claim is missing or not a valid UUID string, or the user does
            not exist or is inactive.
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )

    try:
        payload = decode_access_token(credentials.credentials)
    except JWTError:
        raise credentials_exception from None

    subject = payload.get("sub")
    if not isinstance(subject, str):
        # Missing or non-string subject claim.
        raise credentials_exception
    try:
        user_uuid = UUID(subject)
    except ValueError:
        # A correctly signed token with a malformed subject must yield 401,
        # not an unhandled ValueError (which would surface as HTTP 500).
        raise credentials_exception from None

    # User lookup happens while bypass_rls = 'on' (set in get_db), so the
    # SELECT on users is unrestricted — necessary because we don't yet know
    # the company_id at this point.
    user = await db.get(User, user_uuid)
    if user is None or not user.is_active:
        raise credentials_exception

    # ── RLS context ────────────────────────────────────────────────────────
    # SUPER_ADMIN can see all companies → keep bypass_rls = 'on'.
    # Every other role gets the RLS fence applied: set company_id and disable
    # bypass so subsequent queries in the same transaction are automatically
    # filtered to the user's company.
    # NOTE(review): a non-SUPER_ADMIN user whose company_id is None also skips
    # this branch and therefore keeps the bypass — confirm this is intended.
    if user.role != UserRole.SUPER_ADMIN and user.company_id is not None:
        # Security invariant: safe_company_id must be a valid UUID.
        # The _uuid.UUID() round-trip prevents SQL injection even after future
        # refactorings (e.g. if user.company_id ever arrived as a string from
        # another code path). SET LOCAL does not accept bind parameters in
        # PostgreSQL, so the value is interpolated into the statement; the
        # round-trip guarantees the interpolated text is a canonical UUID.
        # NOTE(review): set_config('app.company_id', <bind param>, true) may
        # be a parameterised alternative — confirm before switching.
        safe_company_id = str(_uuid.UUID(str(user.company_id)))
        await db.execute(text(f"SET LOCAL app.company_id = '{safe_company_id}'"))
        await db.execute(text("SET LOCAL app.bypass_rls = 'off'"))

    return user
# Annotated alias: a parameter typed as CurrentUser is resolved through the
# get_current_user dependency (see its use in require_role's checker).
CurrentUser = Annotated[User, Depends(get_current_user)]
def require_role(*roles: UserRole):
    """Build a dependency that admits only users holding one of ``roles``.

    Usage: ``require_role(UserRole.MANAGER, UserRole.COMPANY_ADMIN)``.

    Args:
        *roles: The roles that are allowed through.

    Returns:
        A ``Depends(...)`` wrapper around a checker that returns the current
        user when their role is in ``roles`` and raises HTTP 403 otherwise.
    """

    async def _enforce_role(current_user: CurrentUser) -> User:
        # Happy path first: the user's role is one of the accepted ones.
        if current_user.role in roles:
            return current_user
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Insufficient permissions",
        )

    return Depends(_enforce_role)
def require_same_company(target_company_id: UUID, current_user: User) -> None:
    """Reject access to data that belongs to a different company.

    Args:
        target_company_id: Company that owns the requested resource.
        current_user: The user making the request.

    Raises:
        HTTPException: 403 when the user is not a SUPER_ADMIN and their
            ``company_id`` differs from ``target_company_id``.
    """
    # SUPER_ADMIN is exempt from the company check.
    if current_user.role == UserRole.SUPER_ADMIN:
        return

    # Same company: nothing to object to.
    if current_user.company_id == target_company_id:
        return

    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Access to this resource is not allowed",
    )