patrick 6d4b8a9f17 agent-rls: PostgreSQL Row Level Security for tenant isolation
- Migration 0024: RLS + FORCE RLS on 18 tables
  - Direct company_id policies: users, departments, companies, absence_types,
    audit_logs, kiosk_devices, ldap_configs, smtp_configs, caldav_company_configs,
    work_schedules, overtime_balances
  - JOIN policies (user_id → company_id): absences, sessions, password_resets,
    time_entries, vacation_balances, caldav_user_configs
  - public_holidays excluded (global reference table)
- database.py: get_db sets bypass_rls='on' as the default (auth endpoints unchanged)
- dependencies.py: get_current_user sets app.company_id + bypass_rls='off'
  for all non-SUPER_ADMIN roles
- migrations/env.py: Alembic migrations run with bypass_rls='on'
- tests/conftest.py: override_get_db sets bypass_rls='on' for the test session

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 21:57:58 +02:00
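
The two policy shapes named in the commit message (direct company_id policies and JOIN policies via user_id) could look roughly like the sketch below. Migration 0024 itself is not shown here, so everything in it is an assumption: the policy name tenant_isolation, the helper names, the integer type of company_id, and the exact USING expressions. It only generates SQL strings; in an Alembic migration each string would typically be passed to op.execute().

```python
# Hypothetical sketch: the real migration 0024 may differ in names and expressions.

# True when the session has opted out of RLS (as get_db does by default).
BYPASS = "current_setting('app.bypass_rls', true) = 'on'"

# Tenant id from the session setting; NULLIF guards against an empty string,
# which cannot be cast. The integer type is an assumption.
TENANT = "NULLIF(current_setting('app.company_id', true), '')::integer"


def _enable_rls(table: str) -> list[str]:
    """Statements that switch RLS on and force it even for the table owner."""
    return [
        f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY",
        f"ALTER TABLE {table} FORCE ROW LEVEL SECURITY",
    ]


def direct_policy_sql(table: str) -> list[str]:
    """Policy for tables that carry their own company_id column."""
    return _enable_rls(table) + [
        f"CREATE POLICY tenant_isolation ON {table} "
        f"USING ({BYPASS} OR company_id = {TENANT})"
    ]


def join_policy_sql(table: str) -> list[str]:
    """Policy for tables that reach company_id only through users.user_id."""
    owner_in_tenant = (
        "EXISTS (SELECT 1 FROM users u "
        f"WHERE u.id = {table}.user_id AND u.company_id = {TENANT})"
    )
    return _enable_rls(table) + [
        f"CREATE POLICY tenant_isolation ON {table} "
        f"USING ({BYPASS} OR {owner_in_tenant})"
    ]


if __name__ == "__main__":
    for statement in direct_policy_sql("departments") + join_policy_sql("time_entries"):
        print(statement + ";")
```

With this shape, a session that sets neither value sees no rows, because both branches of the OR evaluate to false or NULL.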


from collections.abc import AsyncGenerator

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy.orm import DeclarativeBase

from app.core.config import settings

engine = create_async_engine(
    settings.database_url,
    echo=settings.app_env == "development",
    pool_pre_ping=True,
    pool_size=10,
    max_overflow=20,
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


class Base(DeclarativeBase):
    pass


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            # Default: RLS bypass active so that unauthenticated routes
            # (register, login, password-reset) and internal operations work.
            # get_current_user() will disable bypass and set app.company_id for
            # authenticated, non-SUPER_ADMIN requests.
            # SET LOCAL only lasts until the current transaction ends.
            await session.execute(text("SET LOCAL app.bypass_rls = 'on'"))
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()
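
The comment in get_db says that get_current_user() disables the bypass and sets app.company_id for authenticated, non-SUPER_ADMIN requests. dependencies.py is not shown here, so the following is only a sketch of how that step might be expressed. The helper name tenant_scope_statements and the role name EMPLOYEE are hypothetical. It uses PostgreSQL's set_config(name, value, is_local) function because SET LOCAL does not accept bind parameters; passing true as the third argument gives the same transaction-local scope as SET LOCAL.

```python
from typing import Any

SUPER_ADMIN = "SUPER_ADMIN"


def tenant_scope_statements(role: str, company_id: int) -> list[tuple[str, dict[str, Any]]]:
    """Return (sql, params) pairs that scope the current transaction to one tenant.

    SUPER_ADMIN gets no statements, so the bypass set by get_db stays active.
    """
    if role == SUPER_ADMIN:
        return []
    return [
        # Set the tenant first so the policies have a value as soon as bypass is off.
        (
            "SELECT set_config('app.company_id', :company_id, true)",
            {"company_id": str(company_id)},  # set_config expects text values
        ),
        ("SELECT set_config('app.bypass_rls', 'off', true)", {}),
    ]


# Assumed usage inside get_current_user (db is the AsyncSession from get_db):
#     for sql, params in tenant_scope_statements(user.role, user.company_id):
#         await db.execute(text(sql), params)

if __name__ == "__main__":
    for sql, params in tenant_scope_statements("EMPLOYEE", 7):
        print(sql, params)
```

Because both settings are transaction-local, they disappear when the transaction ends, so a handler that commits mid-request would need to apply them again before issuing further queries.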