""" RLS-Tests: Verifiziert dass PostgreSQL Row Level Security tatsächlich cross-tenant Datenzugriff blockiert – unabhängig von App-seitigen WHERE-Klauseln. """ import pytest import uuid from httpx import AsyncClient from sqlalchemy import text pytestmark = pytest.mark.asyncio REG_URL = "/api/v1/auth/register" LOGIN_URL = "/api/v1/auth/login" async def register_company(client: AsyncClient, suffix: str) -> dict: """Registriert eine neue Firma und gibt tokens + company_id zurück.""" resp = await client.post(REG_URL, json={ "company_name": f"RLS-Firma-{suffix}", "first_name": "Admin", "last_name": suffix, "email": f"rls-admin-{suffix}@test.de", "password": "Secret123", }) assert resp.status_code == 201, resp.text tokens = resp.json() me = await client.get( "/api/v1/auth/me", headers={"Authorization": f"Bearer {tokens['access_token']}"}, ) assert me.status_code == 200 return {"tokens": tokens, "user": me.json()} # ── Hilfsfunktion: Raw-SQL mit explizitem RLS-Kontext ──────────────────────── async def query_users_as_tenant(db_session, company_id: str) -> list[dict]: """ Führt SELECT * FROM users direkt aus, mit gesetztem app.company_id – simuliert einen Request als dieser Mandant. bypass_rls ist OFF, d.h. RLS greift. """ await db_session.execute(text("SET LOCAL app.bypass_rls = 'off'")) await db_session.execute( text(f"SET LOCAL app.company_id = '{company_id}'") ) result = await db_session.execute(text("SELECT id, email, company_id FROM users")) rows = [dict(r._mapping) for r in result.fetchall()] # Reset: bypass wieder an (damit nachfolgende Tests nicht leiden) await db_session.execute(text("SET LOCAL app.bypass_rls = 'on'")) return rows # ── Tests ──────────────────────────────────────────────────────────────────── async def test_rls_tenant_a_cannot_see_tenant_b_users( client: AsyncClient, db_session ): """Mandant A darf Mandant B's Users nicht sehen (DB-Ebene).""" a = await register_company(client, "RLS-A") b = await register_company(client, "RLS-B") cid_a = str(a["user"]["company_id"]) cid_b = str(b["user"]["company_id"]) # Als Mandant A abfragen rows_as_a = await query_users_as_tenant(db_session, cid_a) emails_as_a = {r["email"] for r in rows_as_a} # Mandant A sieht sich selbst assert f"rls-admin-RLS-A@test.de" in emails_as_a, \ "Mandant A sollte eigene Daten sehen" # Mandant A sieht Mandant B NICHT assert f"rls-admin-RLS-B@test.de" not in emails_as_a, \ "RLS BLOCKIERT NICHT: Mandant A sieht Daten von Mandant B!" # Doppelcheck: alle zurückgegebenen Rows gehören zu company A for row in rows_as_a: assert str(row["company_id"]) == cid_a, \ f"Fremder Mandant in Ergebnis: {row}" async def test_rls_tenant_b_cannot_see_tenant_a_users( client: AsyncClient, db_session ): """Symmetrietest: Mandant B sieht Mandant A nicht.""" # Firmen aus vorherigem Test existieren schon – direkt aus DB holen result = await db_session.execute( text("SELECT id FROM companies WHERE name LIKE 'RLS-Firma-%' ORDER BY name") ) companies = [str(r[0]) for r in result.fetchall()] assert len(companies) >= 2, "Firmen aus vorherigem Test fehlen" cid_a, cid_b = companies[0], companies[1] rows_as_b = await query_users_as_tenant(db_session, cid_b) company_ids_seen = {str(r["company_id"]) for r in rows_as_b} assert cid_a not in company_ids_seen, \ "RLS BLOCKIERT NICHT: Mandant B sieht Daten von Mandant A!" assert cid_b in company_ids_seen, \ "Mandant B sieht keine eigenen Daten – RLS zu restriktiv" async def test_rls_no_context_returns_nothing(db_session): """Ohne gesetztes app.company_id gibt SELECT nichts zurück (kein bypass).""" await db_session.execute(text("SET LOCAL app.bypass_rls = 'off'")) # app.company_id explizit leeren await db_session.execute(text("SET LOCAL app.company_id = ''")) result = await db_session.execute(text("SELECT id FROM users")) rows = result.fetchall() # Reset await db_session.execute(text("SET LOCAL app.bypass_rls = 'on'")) assert len(rows) == 0, \ f"RLS BLOCKIERT NICHT: {len(rows)} Rows ohne company_id-Kontext sichtbar!" async def test_rls_bypass_on_sees_all(db_session): """Mit bypass_rls='on' sieht ein SUPER_ADMIN alle Mandanten.""" await db_session.execute(text("SET LOCAL app.bypass_rls = 'on'")) result = await db_session.execute( text("SELECT DISTINCT company_id FROM users") ) company_ids = {str(r[0]) for r in result.fetchall()} assert len(company_ids) >= 2, \ "SUPER_ADMIN sollte mindestens 2 Mandanten sehen (bypass_rls='on')" async def test_rls_companies_table_isolated(db_session): """Auch die companies-Tabelle ist mandanten-isoliert.""" result = await db_session.execute( text("SELECT id FROM companies WHERE name LIKE 'RLS-Firma-%' ORDER BY name") ) companies = [str(r[0]) for r in result.fetchall()] assert len(companies) >= 2 cid_a, cid_b = companies[0], companies[1] # Als Mandant A: sehe nur meine Firma await db_session.execute(text("SET LOCAL app.bypass_rls = 'off'")) await db_session.execute(text(f"SET LOCAL app.company_id = '{cid_a}'")) result = await db_session.execute(text("SELECT id FROM companies")) visible = [str(r[0]) for r in result.fetchall()] await db_session.execute(text("SET LOCAL app.bypass_rls = 'on'")) assert cid_a in visible, "Eigene Firma nicht sichtbar" assert cid_b not in visible, "Fremde Firma trotz RLS sichtbar!" assert len(visible) == 1, f"Mehr als 1 Firma sichtbar: {visible}" async def test_rls_insert_blocked_for_wrong_tenant(db_session): """INSERT in fremden Mandanten wird durch WITH CHECK blockiert.""" result = await db_session.execute( text("SELECT id FROM companies WHERE name LIKE 'RLS-Firma-%' ORDER BY name LIMIT 2") ) companies = [str(r[0]) for r in result.fetchall()] cid_a, cid_b = companies[0], companies[1] await db_session.execute(text("SET LOCAL app.bypass_rls = 'off'")) await db_session.execute(text(f"SET LOCAL app.company_id = '{cid_a}'")) try: # Versuche, einen User unter Mandant B zu erstellen, während Context = A fake_id = str(uuid.uuid4()) await db_session.execute(text(f""" INSERT INTO users (id, company_id, email, first_name, last_name, role, password_hash, is_active, created_at) VALUES ('{fake_id}', '{cid_b}', 'rls-inject@evil.de', 'Evil', 'Inject', 'EMPLOYEE', 'fakehash', false, NOW()) """)) await db_session.flush() # Wenn wir hier ankommen, hat RLS nicht blockiert await db_session.rollback() pytest.fail("RLS WITH CHECK hat INSERT in fremden Mandanten NICHT blockiert!") except Exception as e: await db_session.rollback() # Erwartet: new row violates row-level security policy assert "row-level security" in str(e).lower() or "policy" in str(e).lower(), \ f"Unerwarteter Fehler (kein RLS-Fehler): {e}" finally: await db_session.execute(text("SET LOCAL app.bypass_rls = 'on'"))