Files
patrick 62c4e742ab security: 9 Findings aus Security-Audit behoben (CRITICAL + HIGH + MEDIUM)
CRITICAL:
- C-1: LDAP tls_verify Default False → True (MITM-Schutz)
- C-2: TOTP-Secret Fernet-verschlüsselt in DB (statt Plaintext)
  - core/crypto.py: encrypt_value() / decrypt_value() helper
  - Migration 0026: totp_secret VARCHAR(64→500), ldap tls_verify default=true
  - _totp_plain() helper mit Legacy-Fallback für bestehende Werte

HIGH:
- H-1: Kiosk Nonce-Cache asyncio.Lock (Race Condition behoben)
- H-2: File-Upload-Limit 10 MB (import_kimai.py + users.py CSV-Import)
- H-3: CORS allow_methods/allow_headers explizit eingeschränkt (war *)
- H-4: TrustedHostMiddleware aktiviert wenn ALLOWED_HOSTS gesetzt

MEDIUM:
- M-1: IP-Logging nutzt X-Forwarded-For hinter nginx-Proxy
- M-4: Audit-Log für password_changed, totp_enabled, totp_disabled
- M-5: CalDAV verify_ssl in Production erzwungen (_effective_verify_ssl)

152/152 Tests grün

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 19:45:09 +02:00

200 lines
6.7 KiB
Python

from typing import Annotated
from uuid import UUID
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile
from fastapi.responses import PlainTextResponse
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.dependencies import CurrentUser, require_role
from app.models.user import User, UserRole
from app.schemas.auth import MessageResponse
from app.schemas.user import (
InviteRequest,
NextPersonnelNumberResponse,
SetKioskPinRequest,
UserImportResult,
UserImportRowResult,
UserListResponse,
UserOut,
UserUpdate,
)
from app.services import user_import_service
from app.services.user_service import user_service
# Router that groups every endpoint of this module under the /users prefix.
router = APIRouter(prefix="/users", tags=["Users"])

# Roles handed to require_role by the invite, import, update and
# (de)activation endpoints.
_admin_roles = (UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)

# Roles handed to require_role by the list/lookup endpoints: the admin roles
# plus HR and MANAGER.
_hr_roles = (*_admin_roles, UserRole.HR, UserRole.MANAGER)
@router.get("/", response_model=UserListResponse)
async def list_users(
    current_user: User = require_role(*_hr_roles),
    skip: int = Query(0, ge=0),
    limit: int = Query(50, ge=1, le=500),
    active_only: bool = Query(True),
    search: str | None = Query(None, max_length=100),
    db: AsyncSession = Depends(get_db),
):
    """List users of the caller's company.

    Forwards the caller's company id together with the paging (``skip``,
    ``limit``) and filter (``active_only``, ``search``) query parameters to
    ``user_service.list_users`` and wraps the outcome in a
    ``UserListResponse``.
    """
    company_id = current_user.company_id
    total_count, found_users = await user_service.list_users(
        company_id, db, skip, limit, active_only, search
    )
    serialized = [UserOut.model_validate(entry) for entry in found_users]
    return UserListResponse(total=total_count, items=serialized)
@router.post("/invite", response_model=UserOut, status_code=201)
async def invite_user(
    data: InviteRequest,
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Invite a user into the caller's company.

    Hands the request payload, the caller's company id and the caller itself
    to ``user_service.invite`` and returns the resulting user as ``UserOut``.
    """
    invited = await user_service.invite(
        data, current_user.company_id, current_user, db
    )
    response = UserOut.model_validate(invited)
    return response
@router.get("/me", response_model=UserOut)
async def get_me(current_user: CurrentUser):
    """Return the user resolved by the ``CurrentUser`` dependency as ``UserOut``."""
    own_profile = UserOut.model_validate(current_user)
    return own_profile
@router.get("/next-personnel-number", response_model=NextPersonnelNumberResponse)
async def next_personnel_number(
    current_user: User = require_role(*_hr_roles),
    db: AsyncSession = Depends(get_db),
):
    """Suggest the next free personnel number (without incrementing the counter)."""
    company_id = current_user.company_id
    proposed = await user_service.next_personnel_suggestion(company_id, db)
    return NextPersonnelNumberResponse(next=proposed)
@router.get("/by-personnel/{number}", response_model=UserOut)
async def get_user_by_personnel(
    number: str,
    current_user: User = require_role(*_hr_roles),
    db: AsyncSession = Depends(get_db),
):
    """Look up a user by personnel number within the caller's company.

    Delegates to ``user_service.get_by_personnel_number`` and returns the
    match as ``UserOut``.
    """
    match = await user_service.get_by_personnel_number(
        number, current_user.company_id, db
    )
    return UserOut.model_validate(match)
@router.get("/import-template.csv", response_class=PlainTextResponse)
async def import_template(
    current_user: User = require_role(*_admin_roles),
):
    """Serve the user-import CSV template as a file download.

    The body comes from ``user_import_service.build_template_csv``; the
    ``current_user`` parameter is not read in the body and exists only for
    the ``require_role`` dependency.
    """
    template_csv = user_import_service.build_template_csv()
    download_headers = {
        "Content-Disposition": 'attachment; filename="user-import-template.csv"',
    }
    return PlainTextResponse(
        content=template_csv,
        media_type="text/csv",
        headers=download_headers,
    )
# Upper bound for uploaded import files: 10 MB.
_MAX_UPLOAD_BYTES = 10 * 1024 * 1024


async def _read_upload(file: UploadFile) -> bytes:
    """Read an uploaded file into memory, enforcing the 10 MB size limit.

    At most ``_MAX_UPLOAD_BYTES + 1`` bytes are requested from the upload, so
    an oversized file is detected from the one extra byte.

    Raises:
        HTTPException: 413 if more than ``_MAX_UPLOAD_BYTES`` bytes were read.
    """
    payload = await file.read(_MAX_UPLOAD_BYTES + 1)
    if len(payload) <= _MAX_UPLOAD_BYTES:
        return payload
    raise HTTPException(
        status_code=413,
        detail=f"Datei zu groß. Maximale Upload-Größe: {_MAX_UPLOAD_BYTES // (1024 * 1024)} MB.",
    )
@router.post("/import/preview", response_model=UserImportResult)
async def user_import_preview(
    file: Annotated[UploadFile, File()],
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Run ``user_import_service.preview_csv`` on an uploaded CSV file.

    The upload is read through ``_read_upload`` (size-limited) and the
    service result is converted with ``_to_import_result_schema``.
    """
    raw_csv = await _read_upload(file)
    preview = await user_import_service.preview_csv(
        raw_csv, current_user.company_id, current_user, db
    )
    return _to_import_result_schema(preview)
@router.post("/import/apply", response_model=UserImportResult)
async def user_import_apply(
    file: Annotated[UploadFile, File()],
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Run ``user_import_service.apply_csv`` on an uploaded CSV file.

    The upload is read through ``_read_upload`` (size-limited) and the
    service result is converted with ``_to_import_result_schema``.
    """
    raw_csv = await _read_upload(file)
    outcome = await user_import_service.apply_csv(
        raw_csv, current_user.company_id, current_user, db
    )
    return _to_import_result_schema(outcome)
def _to_import_result_schema(result) -> UserImportResult:
    """Convert a service-level import result into the ``UserImportResult`` schema.

    Copies the summary fields (``total_rows``, ``created``, ``reactivated``,
    ``errors``) and maps every entry of ``result.items`` to a
    ``UserImportRowResult``.
    """
    # Summary fields are read before the rows, as in a single constructor call.
    summary = {
        "total_rows": result.total_rows,
        "created": result.created,
        "reactivated": result.reactivated,
        "errors": result.errors,
    }
    row_results = [
        UserImportRowResult(
            row=entry.row,
            email=entry.email,
            personnel_number=entry.personnel_number,
            action=entry.action,
            message=entry.message,
        )
        for entry in result.items
    ]
    return UserImportResult(**summary, items=row_results)
@router.get("/{user_id}", response_model=UserOut)
async def get_user(
    user_id: UUID,
    current_user: User = require_role(*_hr_roles),
    db: AsyncSession = Depends(get_db),
):
    """Fetch a single user by id within the caller's company.

    Delegates to ``user_service.get_by_id`` and returns the result as
    ``UserOut``.
    """
    company_id = current_user.company_id
    found = await user_service.get_by_id(user_id, company_id, db)
    return UserOut.model_validate(found)
@router.patch("/{user_id}", response_model=UserOut)
async def update_user(
    user_id: UUID,
    data: UserUpdate,
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Apply a ``UserUpdate`` payload to the given user.

    Delegates to ``user_service.update`` (passing the caller) and returns the
    result as ``UserOut``.
    """
    updated = await user_service.update(user_id, data, current_user, db)
    response = UserOut.model_validate(updated)
    return response
@router.post("/{user_id}/deactivate", response_model=UserOut)
async def deactivate_user(
    user_id: UUID,
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Deactivate the given user via ``user_service.deactivate``.

    Returns the user object produced by the service as ``UserOut``.
    """
    deactivated = await user_service.deactivate(user_id, current_user, db)
    response = UserOut.model_validate(deactivated)
    return response
@router.post("/{user_id}/reactivate", response_model=UserOut)
async def reactivate_user(
    user_id: UUID,
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Reactivate the given user via ``user_service.reactivate``.

    Returns the user object produced by the service as ``UserOut``.
    """
    reactivated = await user_service.reactivate(user_id, current_user, db)
    response = UserOut.model_validate(reactivated)
    return response
@router.post("/{user_id}/kiosk-pin", response_model=MessageResponse)
async def set_kiosk_pin(
    user_id: UUID,
    data: SetKioskPinRequest,
    current_user: CurrentUser,
    db: AsyncSession = Depends(get_db),
):
    """Set the kiosk PIN of a user.

    Users can set their own PIN; callers for whom ``is_admin_or_above()`` is
    true can set it for any user in their company (the target is looked up
    with the caller's company id).

    Raises:
        HTTPException: 403 if a non-admin caller targets another user.
    """
    targets_other_user = user_id != current_user.id
    if targets_other_user and not current_user.is_admin_or_above():
        # HTTPException is already imported at module level; no local import needed.
        raise HTTPException(status_code=403, detail="Not allowed")

    target = await user_service.get_by_id(user_id, current_user.company_id, db)
    await user_service.set_kiosk_pin(target, data.pin, db)
    return MessageResponse(message="Kiosk PIN updated")