Files
timemaster/backend/tests/test_rls.py
T
patrick dd3e069466 fix: router db.refresh() nach commit bricht RLS-Kontext
SET LOCAL Werte (bypass_rls, company_id) sind transaktions-gebunden.
Nach db.commit() ist der Kontext weg – ein nachfolgendes db.refresh()
läuft in einer neuen Transaktion ohne RLS-Kontext und liefert 0 Rows.

Da expire_on_commit=False gesetzt ist, sind alle Instanz-Attribute
nach dem Commit bereits im Speicher vorhanden. Die expliziten
db.refresh()-Aufrufe nach db.commit() in allen Routers sind daher
redundant und wurden entfernt.

test_rls.py: 6 neue Tests beweisen DB-seitige Mandanten-Isolation.
conftest.py: _apply_rls() wendet RLS-Policies auf Test-DB an.
migrations/0024: korrigiert auf op.execute(text()) API.
migrations/env.py: SET LOCAL außerhalb Transaktion entfernt.

Ergebnis: 8 failed (pre-existing), 126 passed – identisch zur Baseline vor RLS.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 22:34:48 +02:00

191 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
RLS-Tests: Verifiziert dass PostgreSQL Row Level Security
tatsächlich cross-tenant Datenzugriff blockiert unabhängig
von App-seitigen WHERE-Klauseln.
"""
import pytest
import uuid
from httpx import AsyncClient
from sqlalchemy import text
pytestmark = pytest.mark.asyncio
REG_URL = "/api/v1/auth/register"
LOGIN_URL = "/api/v1/auth/login"
async def register_company(client: AsyncClient, suffix: str) -> dict:
"""Registriert eine neue Firma und gibt tokens + company_id zurück."""
resp = await client.post(REG_URL, json={
"company_name": f"RLS-Firma-{suffix}",
"first_name": "Admin",
"last_name": suffix,
"email": f"rls-admin-{suffix}@test.de",
"password": "Secret123",
})
assert resp.status_code == 201, resp.text
tokens = resp.json()
me = await client.get(
"/api/v1/auth/me",
headers={"Authorization": f"Bearer {tokens['access_token']}"},
)
assert me.status_code == 200
return {"tokens": tokens, "user": me.json()}
# ── Hilfsfunktion: Raw-SQL mit explizitem RLS-Kontext ────────────────────────
async def query_users_as_tenant(db_session, company_id: str) -> list[dict]:
"""
Führt SELECT * FROM users direkt aus,
mit gesetztem app.company_id simuliert einen Request als dieser Mandant.
bypass_rls ist OFF, d.h. RLS greift.
"""
await db_session.execute(text("SET LOCAL app.bypass_rls = 'off'"))
await db_session.execute(
text(f"SET LOCAL app.company_id = '{company_id}'")
)
result = await db_session.execute(text("SELECT id, email, company_id FROM users"))
rows = [dict(r._mapping) for r in result.fetchall()]
# Reset: bypass wieder an (damit nachfolgende Tests nicht leiden)
await db_session.execute(text("SET LOCAL app.bypass_rls = 'on'"))
return rows
# ── Tests ────────────────────────────────────────────────────────────────────
async def test_rls_tenant_a_cannot_see_tenant_b_users(
client: AsyncClient, db_session
):
"""Mandant A darf Mandant B's Users nicht sehen (DB-Ebene)."""
a = await register_company(client, "RLS-A")
b = await register_company(client, "RLS-B")
cid_a = str(a["user"]["company_id"])
cid_b = str(b["user"]["company_id"])
# Als Mandant A abfragen
rows_as_a = await query_users_as_tenant(db_session, cid_a)
emails_as_a = {r["email"] for r in rows_as_a}
# Mandant A sieht sich selbst
assert f"rls-admin-RLS-A@test.de" in emails_as_a, \
"Mandant A sollte eigene Daten sehen"
# Mandant A sieht Mandant B NICHT
assert f"rls-admin-RLS-B@test.de" not in emails_as_a, \
"RLS BLOCKIERT NICHT: Mandant A sieht Daten von Mandant B!"
# Doppelcheck: alle zurückgegebenen Rows gehören zu company A
for row in rows_as_a:
assert str(row["company_id"]) == cid_a, \
f"Fremder Mandant in Ergebnis: {row}"
async def test_rls_tenant_b_cannot_see_tenant_a_users(
client: AsyncClient, db_session
):
"""Symmetrietest: Mandant B sieht Mandant A nicht."""
# Firmen aus vorherigem Test existieren schon direkt aus DB holen
result = await db_session.execute(
text("SELECT id FROM companies WHERE name LIKE 'RLS-Firma-%' ORDER BY name")
)
companies = [str(r[0]) for r in result.fetchall()]
assert len(companies) >= 2, "Firmen aus vorherigem Test fehlen"
cid_a, cid_b = companies[0], companies[1]
rows_as_b = await query_users_as_tenant(db_session, cid_b)
company_ids_seen = {str(r["company_id"]) for r in rows_as_b}
assert cid_a not in company_ids_seen, \
"RLS BLOCKIERT NICHT: Mandant B sieht Daten von Mandant A!"
assert cid_b in company_ids_seen, \
"Mandant B sieht keine eigenen Daten RLS zu restriktiv"
async def test_rls_no_context_returns_nothing(db_session):
"""Ohne gesetztes app.company_id gibt SELECT nichts zurück (kein bypass)."""
await db_session.execute(text("SET LOCAL app.bypass_rls = 'off'"))
# app.company_id explizit leeren
await db_session.execute(text("SET LOCAL app.company_id = ''"))
result = await db_session.execute(text("SELECT id FROM users"))
rows = result.fetchall()
# Reset
await db_session.execute(text("SET LOCAL app.bypass_rls = 'on'"))
assert len(rows) == 0, \
f"RLS BLOCKIERT NICHT: {len(rows)} Rows ohne company_id-Kontext sichtbar!"
async def test_rls_bypass_on_sees_all(db_session):
"""Mit bypass_rls='on' sieht ein SUPER_ADMIN alle Mandanten."""
await db_session.execute(text("SET LOCAL app.bypass_rls = 'on'"))
result = await db_session.execute(
text("SELECT DISTINCT company_id FROM users")
)
company_ids = {str(r[0]) for r in result.fetchall()}
assert len(company_ids) >= 2, \
"SUPER_ADMIN sollte mindestens 2 Mandanten sehen (bypass_rls='on')"
async def test_rls_companies_table_isolated(db_session):
"""Auch die companies-Tabelle ist mandanten-isoliert."""
result = await db_session.execute(
text("SELECT id FROM companies WHERE name LIKE 'RLS-Firma-%' ORDER BY name")
)
companies = [str(r[0]) for r in result.fetchall()]
assert len(companies) >= 2
cid_a, cid_b = companies[0], companies[1]
# Als Mandant A: sehe nur meine Firma
await db_session.execute(text("SET LOCAL app.bypass_rls = 'off'"))
await db_session.execute(text(f"SET LOCAL app.company_id = '{cid_a}'"))
result = await db_session.execute(text("SELECT id FROM companies"))
visible = [str(r[0]) for r in result.fetchall()]
await db_session.execute(text("SET LOCAL app.bypass_rls = 'on'"))
assert cid_a in visible, "Eigene Firma nicht sichtbar"
assert cid_b not in visible, "Fremde Firma trotz RLS sichtbar!"
assert len(visible) == 1, f"Mehr als 1 Firma sichtbar: {visible}"
async def test_rls_insert_blocked_for_wrong_tenant(db_session):
"""INSERT in fremden Mandanten wird durch WITH CHECK blockiert."""
result = await db_session.execute(
text("SELECT id FROM companies WHERE name LIKE 'RLS-Firma-%' ORDER BY name LIMIT 2")
)
companies = [str(r[0]) for r in result.fetchall()]
cid_a, cid_b = companies[0], companies[1]
await db_session.execute(text("SET LOCAL app.bypass_rls = 'off'"))
await db_session.execute(text(f"SET LOCAL app.company_id = '{cid_a}'"))
try:
# Versuche, einen User unter Mandant B zu erstellen, während Context = A
fake_id = str(uuid.uuid4())
await db_session.execute(text(f"""
INSERT INTO users (id, company_id, email, first_name, last_name,
role, password_hash, is_active, created_at)
VALUES ('{fake_id}', '{cid_b}', 'rls-inject@evil.de',
'Evil', 'Inject', 'EMPLOYEE', 'fakehash', false, NOW())
"""))
await db_session.flush()
# Wenn wir hier ankommen, hat RLS nicht blockiert
await db_session.rollback()
pytest.fail("RLS WITH CHECK hat INSERT in fremden Mandanten NICHT blockiert!")
except Exception as e:
await db_session.rollback()
# Erwartet: new row violates row-level security policy
assert "row-level security" in str(e).lower() or "policy" in str(e).lower(), \
f"Unerwarteter Fehler (kein RLS-Fehler): {e}"
finally:
await db_session.execute(text("SET LOCAL app.bypass_rls = 'on'"))