Files
patrick dd3e069466 fix: router db.refresh() nach commit bricht RLS-Kontext
SET LOCAL Werte (bypass_rls, company_id) sind transaktions-gebunden.
Nach db.commit() ist der Kontext weg – ein nachfolgendes db.refresh()
läuft in einer neuen Transaktion ohne RLS-Kontext und liefert 0 Rows.

Da expire_on_commit=False gesetzt ist, sind alle Instanz-Attribute
nach dem Commit bereits im Speicher vorhanden. Die expliziten
db.refresh()-Aufrufe nach db.commit() in allen Routern sind daher
redundant und wurden entfernt.

test_rls.py: 6 neue Tests beweisen DB-seitige Mandanten-Isolation.
conftest.py: _apply_rls() wendet RLS-Policies auf Test-DB an.
migrations/0024: korrigiert auf op.execute(text()) API.
migrations/env.py: SET LOCAL außerhalb Transaktion entfernt.

Ergebnis: 8 failed (pre-existing), 126 passed – identisch zur Baseline vor RLS.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 22:34:48 +02:00

92 lines
3.1 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Add PostgreSQL Row Level Security (RLS) for tenant isolation
Revision ID: 0024
Revises: 0023
Create Date: 2026-05-23
"""
from alembic import op
from sqlalchemy import text
# Revision identifiers; they mirror the "Revision ID" / "Revises" header in
# the module docstring.
revision = "0024"
down_revision = "0023"
branch_labels = None
depends_on = None
# ---------------------------------------------------------------------------
# RLS expression helpers
# ---------------------------------------------------------------------------
# True when the session setting app.bypass_rls equals 'on'. The second
# argument to current_setting() is PostgreSQL's missing_ok flag, and COALESCE
# maps a NULL result to 'off'.
_BYPASS = "COALESCE(current_setting('app.bypass_rls', true), 'off') = 'on'"
# Tenant match against app.company_id (an empty string is turned into NULL
# before the uuid cast). _CID compares the company_id column, _IID the id
# column; both share the same right-hand side.
_CID, _IID = (
    f"{column} = NULLIF(current_setting('app.company_id', true), '')::uuid"
    for column in ("company_id", "id")
)
def _using_cid() -> str:
    """Return the policy predicate for tables filtered by ``company_id``.

    The result is the parenthesised SQL expression ``(_BYPASS OR _CID)``.
    """
    alternatives = (_BYPASS, _CID)
    return "(" + " OR ".join(alternatives) + ")"
def _using_iid() -> str:
    """Return the policy predicate for tables filtered by their own ``id``.

    The result is the parenthesised SQL expression ``(_BYPASS OR _IID)``.
    """
    alternatives = (_BYPASS, _IID)
    return "(" + " OR ".join(alternatives) + ")"
def _using_join() -> str:
    """Return the policy predicate for tables filtered through ``user_id``.

    A row passes when ``_BYPASS`` holds or its ``user_id`` is among the ids
    of the ``users`` rows that satisfy ``_CID``.
    """
    tenant_user_ids = f"SELECT id FROM users WHERE {_CID}"
    return f"({_BYPASS} OR user_id IN ({tenant_user_ids}))"
# ---------------------------------------------------------------------------
# Tables
# ---------------------------------------------------------------------------
# Tables that carry a company_id column of their own.
COMPANY_COL_TABLES = [
    "absence_types",
    "audit_logs",
    "caldav_company_configs",
    "departments",
    "kiosk_devices",
    "ldap_configs",
    "overtime_balances",
    "smtp_configs",
    "users",
    "work_schedules",
]
# Tables scoped to a tenant through user_id -> users.company_id.
USER_JOIN_TABLES = [
    "absences",
    "caldav_user_configs",
    "password_resets",
    "sessions",
    "time_entries",
    "vacation_balances",
]
# public_holidays is global, so it gets no RLS
# ---------------------------------------------------------------------------
# upgrade / downgrade
# ---------------------------------------------------------------------------
def _exec(sql: str) -> None:
    """Execute a single raw SQL statement via Alembic.

    Args:
        sql: Statement text; it is wrapped in ``sqlalchemy.text`` and passed
            to ``op.execute``.
    """
    statement = text(sql)
    op.execute(statement)
def _enable(table: str, using: str) -> None:
    """Enable and force row level security on *table* and install its policies.

    Existing ``rls_<table>_<command>`` policies are dropped before the four
    policies (SELECT, INSERT, UPDATE, DELETE) are created, so the names never
    clash with policies left over from an earlier run.

    Args:
        table: Name of the table to protect; interpolated into the SQL as-is.
        using: Boolean SQL expression placed directly after ``USING`` and/or
            ``WITH CHECK``; expected to be wrapped in parentheses already.
    """
    _exec(f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY")
    _exec(f"ALTER TABLE {table} FORCE ROW LEVEL SECURITY")

    # Clause per command, in creation order: INSERT only takes WITH CHECK,
    # UPDATE takes both, SELECT and DELETE only take USING.
    clauses = {
        "SELECT": f"USING {using}",
        "INSERT": f"WITH CHECK {using}",
        "UPDATE": f"USING {using} WITH CHECK {using}",
        "DELETE": f"USING {using}",
    }

    # All drops run before the first create.
    for command in clauses:
        _exec(f"DROP POLICY IF EXISTS rls_{table}_{command.lower()} ON {table}")

    for command, clause in clauses.items():
        _exec(
            f"CREATE POLICY rls_{table}_{command.lower()} ON {table} "
            f"FOR {command} {clause}"
        )
def _disable(table: str) -> None:
    """Drop the ``rls_<table>_*`` policies and switch RLS off for *table*.

    Args:
        table: Name of the table to unprotect; interpolated into the SQL
            as-is.
    """
    policy_names = [
        f"rls_{table}_{suffix}"
        for suffix in ("select", "insert", "update", "delete")
    ]
    for policy in policy_names:
        _exec(f"DROP POLICY IF EXISTS {policy} ON {table}")

    # Undo FORCE first, then disable row level security altogether.
    _exec(f"ALTER TABLE {table} NO FORCE ROW LEVEL SECURITY")
    _exec(f"ALTER TABLE {table} DISABLE ROW LEVEL SECURITY")
def upgrade() -> None:
    """Enable RLS on ``companies`` and on every table in the two table lists."""
    # companies is the tenant root, so it is matched on its own id.
    _enable("companies", _using_iid())

    company_predicate = _using_cid()
    for name in COMPANY_COL_TABLES:
        _enable(name, company_predicate)

    join_predicate = _using_join()
    for name in USER_JOIN_TABLES:
        _enable(name, join_predicate)
def downgrade() -> None:
    """Remove RLS from ``companies`` and from every table in the two lists."""
    # Same table order as upgrade(): tenant root, company_id tables, then
    # the user_id-joined tables.
    for name in ["companies", *COMPANY_COL_TABLES, *USER_JOIN_TABLES]:
        _disable(name)