Files
patrick dd3e069466 fix: router db.refresh() nach commit bricht RLS-Kontext
SET LOCAL Werte (bypass_rls, company_id) sind transaktions-gebunden.
Nach db.commit() ist der Kontext weg – ein nachfolgendes db.refresh()
läuft in einer neuen Transaktion ohne RLS-Kontext und liefert 0 Rows.

Da expire_on_commit=False gesetzt ist, sind alle Instanz-Attribute
nach dem Commit bereits im Speicher vorhanden. Die expliziten
db.refresh()-Aufrufe nach db.commit() in allen Routers sind daher
redundant und wurden entfernt.

test_rls.py: 6 neue Tests beweisen DB-seitige Mandanten-Isolation.
conftest.py: _apply_rls() wendet RLS-Policies auf Test-DB an.
migrations/0024: korrigiert auf op.execute(text()) API.
migrations/env.py: SET LOCAL außerhalb Transaktion entfernt.

Ergebnis: 8 failed (pre-existing), 126 passed – identisch zur Baseline vor RLS.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 22:34:48 +02:00

138 lines
4.7 KiB
Python

"""LDAP configuration and sync endpoints.
All endpoints require COMPANY_ADMIN or SUPER_ADMIN role.
"""
import uuid
from fastapi import APIRouter, Depends
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.dependencies import require_role
from app.models.ldap_config import LdapConfig
from app.models.user import User, UserRole
from app.schemas.ldap import (
    LdapConfigCreate,
    LdapConfigOut,
    LdapConfigUpdate,
    LdapSyncRequest,
    LdapSyncResult,
    LdapTestResult,
    LdapUserPreview,
)
from app.services.ldap_service import decrypt_password, encrypt_password, ldap_service

# Roles accepted by every endpoint in this module; unpacked into
# require_role(...) in each handler signature. Defined after the imports so
# the import section stays contiguous (PEP 8: module-level imports first).
_admin_roles = (UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)

# All routes below are mounted under the "/ldap" prefix and tagged "LDAP".
router = APIRouter(prefix="/ldap", tags=["LDAP"])
@router.get("/config", response_model=LdapConfigOut | None)
async def get_ldap_config(
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Return the LDAP configuration stored for the caller's company.

    The lookup is delegated to ``ldap_service.get_config`` keyed by
    ``current_user.company_id``; its result is passed through unchanged
    (the response model also permits ``None``).
    """
    company_cfg = await ldap_service.get_config(current_user.company_id, db)
    return company_cfg
@router.post("/config", response_model=LdapConfigOut)
async def create_ldap_config(
    data: LdapConfigCreate,
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Create the LDAP configuration for the caller's company.

    If ``ldap_service.get_config`` already yields a configuration for the
    company, that one is updated with the full payload instead of inserting
    a second row. Otherwise a new ``LdapConfig`` is built from the payload,
    with the bind password stored via ``encrypt_password``, then added to
    the session and committed.
    """
    company_id = current_user.company_id

    current = await ldap_service.get_config(company_id, db)
    if current:
        # Update instead of duplicate
        return await _apply_update(current, data.model_dump(), db)

    # Payload fields copied 1:1 onto the model; bind_password is excluded
    # because it is only persisted in encrypted form.
    passthrough_fields = (
        "enabled",
        "host",
        "port",
        "use_ssl",
        "use_tls",
        "bind_dn",
        "base_dn",
        "user_search_filter",
        "attr_email",
        "attr_firstname",
        "attr_lastname",
        "attr_username",
        "attr_department",
    )
    copied_values = {name: getattr(data, name) for name in passthrough_fields}

    new_cfg = LdapConfig(
        company_id=company_id,
        bind_password_encrypted=encrypt_password(data.bind_password),
        **copied_values,
    )
    db.add(new_cfg)
    await db.commit()
    # The instance is returned as-is; no refresh is issued after the commit.
    return new_cfg
@router.patch("/config", response_model=LdapConfigOut)
async def update_ldap_config(
    data: LdapConfigUpdate,
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Partially update the LDAP configuration of the caller's company.

    The existing configuration is fetched with
    ``ldap_service.get_config_or_404``. Payload fields whose value is
    ``None`` are dropped (``exclude_none=True``), so only supplied values
    are handed to ``_apply_update``.
    """
    cfg = await ldap_service.get_config_or_404(current_user.company_id, db)
    changes = data.model_dump(exclude_none=True)
    updated_cfg = await _apply_update(cfg, changes, db)
    return updated_cfg
@router.post("/test", response_model=LdapTestResult)
async def test_ldap_connection(
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Run a connection test against the company's stored LDAP configuration.

    The configuration is fetched with ``ldap_service.get_config_or_404`` and
    passed to ``ldap_service.test_connection``; the ``success`` and
    ``message`` attributes of its result are returned as ``LdapTestResult``.
    """
    # Function-scope import keeps this handler self-contained.
    import asyncio

    cfg = await ldap_service.get_config_or_404(current_user.company_id, db)
    # test_connection is a plain synchronous call (it is not awaited and its
    # result is read directly). Run it in a worker thread so the event loop
    # keeps serving other requests while it executes.
    # NOTE(review): presumably this performs blocking LDAP network I/O — confirm.
    outcome = await asyncio.to_thread(ldap_service.test_connection, cfg)
    return LdapTestResult(success=outcome.success, message=outcome.message)
@router.get("/preview", response_model=list[LdapUserPreview])
async def preview_ldap_users(
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Returns first 50 users found in LDAP (for preview before sync).

    Each entry returned by ``ldap_service.search_users`` is mapped to an
    ``LdapUserPreview`` using the attribute names stored on the
    configuration (``attr_email``, ``attr_firstname``, ``attr_lastname``,
    ``attr_department``). Missing or falsy values become empty strings, the
    email is lower-cased, and ``department`` is ``None`` when no department
    attribute is configured.
    """
    # Function-scope import keeps this handler self-contained.
    import asyncio

    cfg = await ldap_service.get_config_or_404(current_user.company_id, db)
    # search_users is a plain synchronous call; run it in a worker thread so
    # the event loop is not stalled while it executes.
    # NOTE(review): presumably this performs blocking LDAP network I/O — confirm.
    raw_users = await asyncio.to_thread(ldap_service.search_users, cfg)

    def _text(entry, attr_name):
        # Normalise a missing/None/empty attribute value to "".
        return str(entry.get(attr_name, "") or "")

    return [
        LdapUserPreview(
            dn=entry.get("dn", ""),
            email=_text(entry, cfg.attr_email).lower(),
            first_name=_text(entry, cfg.attr_firstname),
            last_name=_text(entry, cfg.attr_lastname),
            department=(
                _text(entry, cfg.attr_department) if cfg.attr_department else None
            ),
        )
        for entry in raw_users[:50]
    ]
@router.post("/sync", response_model=LdapSyncResult)
async def sync_ldap_users(
    data: LdapSyncRequest,
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Synchronise users from LDAP using the company's stored configuration.

    The configuration is fetched with ``ldap_service.get_config_or_404`` and
    handed to ``ldap_service.sync_users`` together with the requested
    ``default_role``. The ``created``, ``updated``, ``deactivated`` and
    ``errors`` attributes of its result are returned as ``LdapSyncResult``.
    """
    cfg = await ldap_service.get_config_or_404(current_user.company_id, db)
    outcome = await ldap_service.sync_users(cfg, db, default_role=data.default_role)

    # Copy the result attributes across by name.
    reported_fields = ("created", "updated", "deactivated", "errors")
    summary = {name: getattr(outcome, name) for name in reported_fields}
    return LdapSyncResult(**summary)
# ── Helpers ───────────────────────────────────────────────────────────────────
async def _apply_update(cfg: LdapConfig, updates: dict, db: AsyncSession) -> LdapConfig:
    """Apply a dict of field updates to ``cfg``, commit, and return it.

    A truthy ``bind_password`` entry is never stored directly: it is passed
    through ``encrypt_password`` and written to ``bind_password_encrypted``.
    Any other key is assigned only if ``cfg`` already has an attribute of
    that name; unknown keys are ignored.
    """
    for name, new_value in updates.items():
        if name == "bind_password" and new_value:
            cfg.bind_password_encrypted = encrypt_password(new_value)
            continue
        if hasattr(cfg, name):
            setattr(cfg, name, new_value)

    await db.commit()
    # Returned as-is; no refresh is issued after the commit.
    return cfg