62c4e742ab
CRITICAL: - C-1: LDAP tls_verify Default False → True (MITM-Schutz) - C-2: TOTP-Secret Fernet-verschlüsselt in DB (statt Plaintext) - core/crypto.py: encrypt_value() / decrypt_value() helper - Migration 0026: totp_secret VARCHAR(64→500), ldap tls_verify default=true - _totp_plain() helper mit Legacy-Fallback für bestehende Werte HIGH: - H-1: Kiosk Nonce-Cache asyncio.Lock (Race Condition behoben) - H-2: File-Upload-Limit 10 MB (import_kimai.py + users.py CSV-Import) - H-3: CORS allow_methods/allow_headers explizit eingeschränkt (war *) - H-4: TrustedHostMiddleware aktiviert wenn ALLOWED_HOSTS gesetzt MEDIUM: - M-1: IP-Logging nutzt X-Forwarded-For hinter nginx-Proxy - M-4: Audit-Log für password_changed, totp_enabled, totp_disabled - M-5: CalDAV verify_ssl in Production erzwungen (_effective_verify_ssl) 152/152 Tests grün Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
200 lines
6.7 KiB
Python
200 lines
6.7 KiB
Python
from typing import Annotated
|
|
from uuid import UUID
|
|
|
|
from fastapi import APIRouter, Depends, File, HTTPException, Query, UploadFile
|
|
from fastapi.responses import PlainTextResponse
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.core.database import get_db
|
|
from app.core.dependencies import CurrentUser, require_role
|
|
from app.models.user import User, UserRole
|
|
from app.schemas.auth import MessageResponse
|
|
from app.schemas.user import (
|
|
InviteRequest,
|
|
NextPersonnelNumberResponse,
|
|
SetKioskPinRequest,
|
|
UserImportResult,
|
|
UserImportRowResult,
|
|
UserListResponse,
|
|
UserOut,
|
|
UserUpdate,
|
|
)
|
|
from app.services import user_import_service
|
|
from app.services.user_service import user_service
|
|
|
|
router = APIRouter(prefix="/users", tags=["Users"])
|
|
|
|
_admin_roles = (UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)
|
|
_hr_roles = (UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN, UserRole.HR, UserRole.MANAGER)
|
|
|
|
|
|
@router.get("/", response_model=UserListResponse)
|
|
async def list_users(
|
|
current_user: User = require_role(*_hr_roles),
|
|
skip: int = Query(0, ge=0),
|
|
limit: int = Query(50, ge=1, le=500),
|
|
active_only: bool = Query(True),
|
|
search: str | None = Query(None, max_length=100),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
total, users = await user_service.list_users(
|
|
current_user.company_id, db, skip, limit, active_only, search,
|
|
)
|
|
return UserListResponse(total=total, items=[UserOut.model_validate(u) for u in users])
|
|
|
|
|
|
@router.post("/invite", response_model=UserOut, status_code=201)
|
|
async def invite_user(
|
|
data: InviteRequest,
|
|
current_user: User = require_role(*_admin_roles),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
user = await user_service.invite(data, current_user.company_id, current_user, db)
|
|
return UserOut.model_validate(user)
|
|
|
|
|
|
@router.get("/me", response_model=UserOut)
|
|
async def get_me(current_user: CurrentUser):
|
|
return UserOut.model_validate(current_user)
|
|
|
|
|
|
@router.get("/next-personnel-number", response_model=NextPersonnelNumberResponse)
|
|
async def next_personnel_number(
|
|
current_user: User = require_role(*_hr_roles),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
"""Schlägt die nächste freie Personalnummer vor (ohne den Counter zu erhöhen)."""
|
|
suggestion = await user_service.next_personnel_suggestion(current_user.company_id, db)
|
|
return NextPersonnelNumberResponse(next=suggestion)
|
|
|
|
|
|
@router.get("/by-personnel/{number}", response_model=UserOut)
|
|
async def get_user_by_personnel(
|
|
number: str,
|
|
current_user: User = require_role(*_hr_roles),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
user = await user_service.get_by_personnel_number(number, current_user.company_id, db)
|
|
return UserOut.model_validate(user)
|
|
|
|
|
|
@router.get("/import-template.csv", response_class=PlainTextResponse)
|
|
async def import_template(
|
|
current_user: User = require_role(*_admin_roles),
|
|
):
|
|
csv_text = user_import_service.build_template_csv()
|
|
return PlainTextResponse(
|
|
content=csv_text,
|
|
media_type="text/csv",
|
|
headers={"Content-Disposition": 'attachment; filename="user-import-template.csv"'},
|
|
)
|
|
|
|
|
|
_MAX_UPLOAD_BYTES = 10 * 1024 * 1024 # 10 MB
|
|
|
|
|
|
async def _read_upload(file: UploadFile) -> bytes:
|
|
"""Liest eine UploadFile mit Größenbegrenzung (max 10 MB)."""
|
|
content = await file.read(_MAX_UPLOAD_BYTES + 1)
|
|
if len(content) > _MAX_UPLOAD_BYTES:
|
|
raise HTTPException(
|
|
status_code=413,
|
|
detail=f"Datei zu groß. Maximale Upload-Größe: {_MAX_UPLOAD_BYTES // (1024 * 1024)} MB.",
|
|
)
|
|
return content
|
|
|
|
|
|
@router.post("/import/preview", response_model=UserImportResult)
|
|
async def user_import_preview(
|
|
file: Annotated[UploadFile, File()],
|
|
current_user: User = require_role(*_admin_roles),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
content = await _read_upload(file)
|
|
result = await user_import_service.preview_csv(content, current_user.company_id, current_user, db)
|
|
return _to_import_result_schema(result)
|
|
|
|
|
|
@router.post("/import/apply", response_model=UserImportResult)
|
|
async def user_import_apply(
|
|
file: Annotated[UploadFile, File()],
|
|
current_user: User = require_role(*_admin_roles),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
content = await _read_upload(file)
|
|
result = await user_import_service.apply_csv(content, current_user.company_id, current_user, db)
|
|
return _to_import_result_schema(result)
|
|
|
|
|
|
def _to_import_result_schema(result) -> UserImportResult:
|
|
return UserImportResult(
|
|
total_rows=result.total_rows,
|
|
created=result.created,
|
|
reactivated=result.reactivated,
|
|
errors=result.errors,
|
|
items=[
|
|
UserImportRowResult(
|
|
row=i.row, email=i.email, personnel_number=i.personnel_number,
|
|
action=i.action, message=i.message,
|
|
)
|
|
for i in result.items
|
|
],
|
|
)
|
|
|
|
|
|
@router.get("/{user_id}", response_model=UserOut)
|
|
async def get_user(
|
|
user_id: UUID,
|
|
current_user: User = require_role(*_hr_roles),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
user = await user_service.get_by_id(user_id, current_user.company_id, db)
|
|
return UserOut.model_validate(user)
|
|
|
|
|
|
@router.patch("/{user_id}", response_model=UserOut)
|
|
async def update_user(
|
|
user_id: UUID,
|
|
data: UserUpdate,
|
|
current_user: User = require_role(*_admin_roles),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
user = await user_service.update(user_id, data, current_user, db)
|
|
return UserOut.model_validate(user)
|
|
|
|
|
|
@router.post("/{user_id}/deactivate", response_model=UserOut)
|
|
async def deactivate_user(
|
|
user_id: UUID,
|
|
current_user: User = require_role(*_admin_roles),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
user = await user_service.deactivate(user_id, current_user, db)
|
|
return UserOut.model_validate(user)
|
|
|
|
|
|
@router.post("/{user_id}/reactivate", response_model=UserOut)
|
|
async def reactivate_user(
|
|
user_id: UUID,
|
|
current_user: User = require_role(*_admin_roles),
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
user = await user_service.reactivate(user_id, current_user, db)
|
|
return UserOut.model_validate(user)
|
|
|
|
|
|
@router.post("/{user_id}/kiosk-pin", response_model=MessageResponse)
|
|
async def set_kiosk_pin(
|
|
user_id: UUID,
|
|
data: SetKioskPinRequest,
|
|
current_user: CurrentUser,
|
|
db: AsyncSession = Depends(get_db),
|
|
):
|
|
# Users can set their own PIN; admins can set for any user in company
|
|
if user_id != current_user.id and not current_user.is_admin_or_above():
|
|
from fastapi import HTTPException
|
|
raise HTTPException(status_code=403, detail="Not allowed")
|
|
user = await user_service.get_by_id(user_id, current_user.company_id, db)
|
|
await user_service.set_kiosk_pin(user, data.pin, db)
|
|
return MessageResponse(message="Kiosk PIN updated")
|