Files
timemaster/backend/tests/conftest.py
T
patrick dd3e069466 fix: router db.refresh() nach commit bricht RLS-Kontext
SET LOCAL Werte (bypass_rls, company_id) sind transaktions-gebunden.
Nach db.commit() ist der Kontext weg – ein nachfolgendes db.refresh()
läuft in einer neuen Transaktion ohne RLS-Kontext und liefert 0 Rows.

Da expire_on_commit=False gesetzt ist, sind alle Instanz-Attribute
nach dem Commit bereits im Speicher vorhanden. Die expliziten
db.refresh()-Aufrufe nach db.commit() in allen Routers sind daher
redundant und wurden entfernt.

test_rls.py: 6 neue Tests beweisen DB-seitige Mandanten-Isolation.
conftest.py: _apply_rls() wendet RLS-Policies auf Test-DB an.
migrations/0024: korrigiert auf op.execute(text()) API.
migrations/env.py: SET LOCAL außerhalb Transaktion entfernt.

Ergebnis: 8 failed (pre-existing), 126 passed – identisch zur Baseline vor RLS.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 22:34:48 +02:00

135 lines
5.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import pytest_asyncio
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from sqlalchemy import text
from app.main import app
from app.core.database import Base, get_db
from app.core.limiter import limiter
# Real PostgreSQL test database (no SQLite: the models use JSONB/UUID).
TEST_DATABASE_URL = "postgresql+asyncpg://timemaster:timemaster_secret_change_me@localhost:5432/timemaster_test"

# Engine shared by every fixture in this module; SQL echo stays off.
test_engine = create_async_engine(TEST_DATABASE_URL, echo=False)

# Session factory bound to the test engine. expire_on_commit=False leaves
# already-loaded attributes usable after a commit instead of expiring them.
TestSessionLocal = async_sessionmaker(
    test_engine,
    class_=AsyncSession,
    expire_on_commit=False,
)
# SQL fragments the row-level-security policies are assembled from.
# _BYPASS is true when the session setting app.bypass_rls equals 'on'.
_BYPASS = "COALESCE(current_setting('app.bypass_rls', true), 'off') = 'on'"
# _CID matches rows whose company_id equals the session setting app.company_id.
_CID = "company_id = NULLIF(current_setting('app.company_id', true), '')::uuid"
# _IID matches rows whose own id equals the session setting app.company_id.
_IID = "id = NULLIF(current_setting('app.company_id', true), '')::uuid"


def _bypass_or(condition):
    """Wrap *condition* as the parenthesised expression ``(<bypass> OR <condition>)``."""
    return f"({_BYPASS} OR {condition})"


def _rls_using_cid():
    """Return the policy expression for tables filtered by their company_id column."""
    return _bypass_or(_CID)


def _rls_using_iid():
    """Return the policy expression for tables filtered by their own id column."""
    return _bypass_or(_IID)


def _rls_using_join():
    """Return the policy expression for tables filtered via user_id -> users.company_id."""
    return _bypass_or(f"user_id IN (SELECT id FROM users WHERE {_CID})")
# Tables that receive the company_id-based policy expression.
_COMPANY_COL_TABLES = [
    "absence_types",
    "audit_logs",
    "caldav_company_configs",
    "departments",
    "kiosk_devices",
    "ldap_configs",
    "overtime_balances",
    "smtp_configs",
    "users",
    "work_schedules",
]

# Tables that receive the user_id -> users join policy expression.
_USER_JOIN_TABLES = [
    "absences",
    "caldav_user_configs",
    "password_resets",
    "sessions",
    "time_entries",
    "vacation_balances",
]
async def _apply_rls(conn) -> None:
    """Apply the same RLS policies as migration 0024 to the test database.

    For ``companies``, then every table in ``_COMPANY_COL_TABLES``, then every
    table in ``_USER_JOIN_TABLES``, row level security is enabled and forced,
    any existing ``rls_<table>_*`` policies are dropped, and fresh
    SELECT/INSERT/UPDATE/DELETE policies are created.

    Args:
        conn: Connection whose ``execute`` coroutine runs each statement.
    """
    # (table, policy expression) pairs in the order they are applied.
    targets = [("companies", _rls_using_iid())]
    targets += [(name, _rls_using_cid()) for name in _COMPANY_COL_TABLES]
    targets += [(name, _rls_using_join()) for name in _USER_JOIN_TABLES]

    for table, using in targets:
        statements = (
            f"ALTER TABLE {table} ENABLE ROW LEVEL SECURITY",
            f"ALTER TABLE {table} FORCE ROW LEVEL SECURITY",
            # Drop first so the CREATE statements cannot collide with leftovers.
            f"DROP POLICY IF EXISTS rls_{table}_select ON {table}",
            f"DROP POLICY IF EXISTS rls_{table}_insert ON {table}",
            f"DROP POLICY IF EXISTS rls_{table}_update ON {table}",
            f"DROP POLICY IF EXISTS rls_{table}_delete ON {table}",
            f"CREATE POLICY rls_{table}_select ON {table} FOR SELECT USING {using}",
            f"CREATE POLICY rls_{table}_insert ON {table} FOR INSERT WITH CHECK {using}",
            f"CREATE POLICY rls_{table}_update ON {table} FOR UPDATE USING {using} WITH CHECK {using}",
            f"CREATE POLICY rls_{table}_delete ON {table} FOR DELETE USING {using}",
        )
        for statement in statements:
            await conn.execute(text(statement))
async def _reset_public_schema(conn) -> None:
    """Drop and recreate the ``public`` schema, then restore its grants."""
    for statement in (
        "DROP SCHEMA public CASCADE",
        "CREATE SCHEMA public",
        "GRANT ALL ON SCHEMA public TO timemaster",
        "GRANT ALL ON SCHEMA public TO public",
    ):
        await conn.execute(text(statement))


@pytest_asyncio.fixture(scope="session", loop_scope="session", autouse=True)
async def setup_db():
    """Build the test schema once per session and tear it down afterwards.

    Setup recreates the ``public`` schema, creates all tables from
    ``Base.metadata`` and applies the RLS policies. Teardown resets the schema
    again and disposes the engine so its pooled connections are closed while
    the session event loop is still running.
    """
    async with test_engine.begin() as conn:
        # Recreating the whole schema sidesteps the circular dependency
        # between departments and users.
        await _reset_public_schema(conn)
        await conn.run_sync(Base.metadata.create_all)
        await _apply_rls(conn)
    yield
    try:
        async with test_engine.begin() as conn:
            await _reset_public_schema(conn)
    finally:
        # Close pooled connections even if the schema reset fails.
        await test_engine.dispose()
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def db_session():
    """Yield one AsyncSession for the whole test session; roll it back on teardown."""
    async with TestSessionLocal() as shared_session:
        yield shared_session
        # Discard whatever is still pending once the test session is over.
        await shared_session.rollback()
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def client(db_session: AsyncSession):
    """Yield an httpx AsyncClient that talks to the app in-process.

    While the fixture is active, ``get_db`` is overridden so every request
    uses the shared ``db_session``, and the rate limiter is disabled. On
    teardown the limiter's previous state is restored and only this fixture's
    own dependency override is removed.

    Args:
        db_session: The shared session handed to every request.
    """

    async def override_get_db():
        try:
            # Tests use a shared session without a real transaction context per
            # request. Set bypass_rls = 'on' so that all test queries succeed
            # regardless of whether app.company_id is set.
            # NOTE: SET LOCAL only lasts until the current transaction ends, so
            # the bypass lapses once a request handler commits.
            await db_session.execute(text("SET LOCAL app.bypass_rls = 'on'"))
            yield db_session
            await db_session.commit()
        except Exception:
            await db_session.rollback()
            raise

    limiter_was_enabled = limiter.enabled
    app.dependency_overrides[get_db] = override_get_db
    limiter.enabled = False  # disable the rate limiter during tests
    try:
        async with AsyncClient(
            transport=ASGITransport(app=app),
            base_url="http://test",
        ) as ac:
            yield ac
    finally:
        # Restore what was there before instead of forcing fixed values, and
        # leave overrides installed by other fixtures untouched.
        limiter.enabled = limiter_was_enabled
        app.dependency_overrides.pop(get_db, None)
@pytest_asyncio.fixture(scope="session", loop_scope="session")
async def registered_user(client: AsyncClient):
    """Register a company + admin user, return tokens + user data.

    Returns:
        A dict with ``"tokens"`` (JSON body of the register response) and
        ``"user"`` (JSON body of ``/api/v1/auth/me`` for the new account).

    Raises:
        AssertionError: If registration does not answer 201 or the profile
            lookup does not answer 200; the response body is in the message.
    """
    resp = await client.post("/api/v1/auth/register", json={
        "company_name": "Test GmbH",
        "first_name": "Max",
        "last_name": "Mustermann",
        "email": "max@testgmbh.de",
        "password": "Secret123",
    })
    assert resp.status_code == 201, resp.text
    tokens = resp.json()
    me = await client.get(
        "/api/v1/auth/me",
        headers={"Authorization": f"Bearer {tokens['access_token']}"},
    )
    # Fail here rather than handing an error payload to dependent tests as "user".
    assert me.status_code == 200, me.text
    return {"tokens": tokens, "user": me.json()}