4dc69137dd
H-1: company.settings als typisiertes Sub-Schema - schemas/company.py: CompanySettingsUpdate mit extra=forbid - Nur bekannte Keys (carryover_expires_month/day) erlaubt - Unbekannte Keys → HTTP 422 H-5: SQL-Injection defensiv absichern - dependencies.py: UUID-Round-Trip str(_uuid.UUID(...)) + Sicherheitskommentar H-6: CalDAV DNS-Rebinding-Schutz - caldav_service.py: PinnedIPTransport — IP einmal auflösen, beim Request fixieren - _validate_caldav_url gibt aufgelöste IP zurück - Alle HTTP-Methoden nutzen PinnedIPTransport H-7: Heartbeat-Timestamp nach Route-Logik - kiosk_security.py: last_heartbeat_at-Update aus Dependency entfernt - kiosk_service.py: Update erst in process_heartbeat() nach erfolgreicher Auth Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
102 lines
4.0 KiB
Python
102 lines
4.0 KiB
Python
import uuid as _uuid
|
|
from typing import Annotated
|
|
from uuid import UUID
|
|
|
|
from fastapi import Depends, HTTPException, Request, status
|
|
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
|
|
from jose import JWTError
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.database import get_db
|
|
from app.core.security import decode_access_token
|
|
from app.models.user import User, UserRole
|
|
|
|
bearer_scheme = HTTPBearer()
|
|
|
|
|
|
def get_client_ip(request: Request) -> str:
|
|
"""Liest die echte Client-IP auch hinter nginx-Proxy.
|
|
|
|
nginx setzt X-Real-IP auf die ursprüngliche Client-IP.
|
|
Ohne diesen Header würde request.client.host hinter nginx immer
|
|
127.0.0.1 zurückgeben, womit AuditLog-Einträge wertlos wären.
|
|
"""
|
|
real_ip = request.headers.get("X-Real-IP")
|
|
if real_ip:
|
|
return real_ip.strip()
|
|
forwarded_for = request.headers.get("X-Forwarded-For")
|
|
if forwarded_for:
|
|
return forwarded_for.split(",")[0].strip()
|
|
return request.client.host if request.client else "unknown"
|
|
|
|
|
|
async def get_current_user(
|
|
credentials: Annotated[HTTPAuthorizationCredentials, Depends(bearer_scheme)],
|
|
db: Annotated[AsyncSession, Depends(get_db)],
|
|
) -> User:
|
|
credentials_exception = HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Could not validate credentials",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
try:
|
|
payload = decode_access_token(credentials.credentials)
|
|
user_id: str = payload.get("sub")
|
|
if user_id is None:
|
|
raise credentials_exception
|
|
except JWTError:
|
|
raise credentials_exception
|
|
|
|
# User lookup happens while bypass_rls = 'on' (set in get_db), so the
|
|
# SELECT on users is unrestricted — necessary because we don't yet know
|
|
# the company_id at this point.
|
|
user = await db.get(User, UUID(user_id))
|
|
if user is None or not user.is_active:
|
|
raise credentials_exception
|
|
|
|
# ── RLS context ────────────────────────────────────────────────────────
|
|
# SUPER_ADMIN can see all companies → keep bypass_rls = 'on'.
|
|
# Every other role gets the RLS fence applied: set company_id and disable
|
|
# bypass so subsequent queries in the same transaction are automatically
|
|
# filtered to the user's company.
|
|
if user.role != UserRole.SUPER_ADMIN and user.company_id is not None:
|
|
# Sicherheits-Invariante: safe_company_id muss eine valide UUID sein.
|
|
# Der _uuid.UUID()-Round-Trip verhindert SQL-Injection auch bei zukünftigen
|
|
# Refactorings (z.B. falls user.company_id einmal ein String aus einem
|
|
# anderen Pfad käme). SET LOCAL akzeptiert keine Bind-Parameter in
|
|
# PostgreSQL, daher ist String-Interpolation hier unvermeidlich —
|
|
# der UUID-Round-Trip ist die kryptographische Absicherung dagegen.
|
|
safe_company_id = str(_uuid.UUID(str(user.company_id)))
|
|
await db.execute(text(f"SET LOCAL app.company_id = '{safe_company_id}'"))
|
|
await db.execute(text("SET LOCAL app.bypass_rls = 'off'"))
|
|
|
|
return user
|
|
|
|
|
|
CurrentUser = Annotated[User, Depends(get_current_user)]
|
|
|
|
|
|
def require_role(*roles: UserRole):
|
|
"""Dependency factory: require_role(UserRole.MANAGER, UserRole.COMPANY_ADMIN)"""
|
|
async def checker(current_user: CurrentUser) -> User:
|
|
if current_user.role not in roles:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Insufficient permissions",
|
|
)
|
|
return current_user
|
|
return Depends(checker)
|
|
|
|
|
|
def require_same_company(target_company_id: UUID, current_user: User) -> None:
|
|
"""Raise 403 if user tries to access another company's data."""
|
|
if (
|
|
current_user.role != UserRole.SUPER_ADMIN
|
|
and current_user.company_id != target_company_id
|
|
):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="Access to this resource is not allowed",
|
|
)
|