Files
patrick 1db7164837 fix(security): SSRF-Schutz für CalDAV-URLs
Neue _validate_caldav_url() Funktion in caldav_service.py:
- Prüft Schema (nur http/https erlaubt)
- Blockiert private IP-Ranges (RFC 1918, Loopback, Link-local)
- DNS-Auflösung + Prüfung der aufgelösten IP (DNS-Rebinding-Schutz)
- Blockiert: 10/8, 172.16/12, 192.168/16, 127/8, 169.254/16, fc00::/7 etc.

Validierung in allen drei HTTP-Helpers (_http_put, _http_delete, _http_propfind)
sowie beim Speichern in caldav.py Router (company + user config) – doppelter Schutz.

Getestet: 8 böse URLs geblockt, 2 legitime URLs erlaubt (10/10 OK)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-24 12:39:59 +02:00

170 lines
6.5 KiB
Python

"""
CalDAV-Konfiguration und manueller Sync-Trigger.
Firmenkalender: nur COMPANY_ADMIN / SUPER_ADMIN
Persönlicher Kalender: jeder eingeloggte Nutzer für sich selbst
"""
import asyncio
import uuid

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.database import get_db
from app.core.dependencies import CurrentUser, require_role
from app.models.caldav_config import CaldavCompanyConfig, CaldavUserConfig
from app.models.user import User, UserRole
from app.schemas.caldav import (
    CaldavCompanyConfigOut,
    CaldavCompanyConfigSave,
    CaldavUserConfigOut,
    CaldavUserConfigSave,
    ResyncResult,
)
from app.services.caldav_service import caldav_service, encrypt_password
# Every route declared in this module is served under the /caldav prefix
# and grouped under the "CalDAV" tag.
router = APIRouter(
    prefix="/caldav",
    tags=["CalDAV"],
)

# Roles passed to require_role() by the company-calendar endpoints below.
_admin_roles = (
    UserRole.COMPANY_ADMIN,
    UserRole.SUPER_ADMIN,
)
# ── Firmenkalender ─────────────────────────────────────────────────────────────
@router.get("/company/config", response_model=CaldavCompanyConfigOut | None)
async def get_company_config(
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Return the CalDAV configuration stored for the caller's company.

    The caller is resolved through ``require_role(*_admin_roles)``.

    Returns:
        The stored configuration converted to ``CaldavCompanyConfigOut``,
        or ``None`` when the service yields nothing for the company.
    """
    stored = await caldav_service.get_company_config(current_user.company_id, db)
    if not stored:
        return None
    return CaldavCompanyConfigOut.model_validate(stored)
@router.post("/company/config", response_model=CaldavCompanyConfigOut)
async def save_company_config(
    data: CaldavCompanyConfigSave,
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Create or update the CalDAV configuration of the caller's company.

    Both submitted URLs are validated before anything is loaded from or added
    to the database session, so rejected input never leaves a pending row
    behind. An existing configuration is updated in place; otherwise a new
    ``CaldavCompanyConfig`` with a fresh UUID is added to the session.

    Args:
        data: Submitted settings (URLs, credentials, flags).
        current_user: Caller resolved through ``require_role(*_admin_roles)``.
        db: Database session used for the lookup and the commit.

    Returns:
        The saved configuration converted to ``CaldavCompanyConfigOut``.

    Raises:
        HTTPException: 400 when a URL is rejected by the validator, or when
            no password is supplied and none has been stored yet.
    """
    # Imported once, outside the try blocks, so that only the validator call
    # itself is guarded by the ValueError handler.
    from app.services.caldav_service import _validate_caldav_url

    # SSRF protection: validate the URLs before they are stored.
    # Order matches the original checks: calendar URL first, then principal URL.
    for label, url in (
        ("Kalender-URL", data.calendar_url),
        ("Principal-URL", data.principal_url),
    ):
        if not url:
            continue
        try:
            # The validator is synchronous and resolves the host name; running
            # it in a worker thread keeps the event loop free meanwhile.
            await asyncio.to_thread(_validate_caldav_url, url)
        except ValueError as exc:
            raise HTTPException(
                status_code=400,
                detail=f"Ungültige {label}: {exc}",
            ) from exc

    cfg = await caldav_service.get_company_config(current_user.company_id, db)
    if cfg is None:
        cfg = CaldavCompanyConfig(company_id=current_user.company_id, id=uuid.uuid4())
        db.add(cfg)

    cfg.enabled = data.enabled
    cfg.principal_url = data.principal_url
    cfg.calendar_url = data.calendar_url
    cfg.username = data.username
    cfg.calendar_display_name = data.calendar_display_name
    cfg.verify_ssl = data.verify_ssl

    # An empty password keeps the previously stored one; it is only mandatory
    # while no encrypted password exists yet.
    if data.password:
        cfg.password_encrypted = encrypt_password(data.password)
    elif not cfg.password_encrypted:
        raise HTTPException(status_code=400, detail="Passwort wird beim ersten Speichern benötigt.")

    await db.commit()
    # NOTE(review): reading cfg after commit presumably relies on the session
    # not expiring attributes on commit — confirm against the session factory.
    return CaldavCompanyConfigOut.model_validate(cfg)
@router.post("/company/test")
async def test_company_config(
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Run a connection test against the company's stored CalDAV configuration.

    Returns:
        A dict with a success ``message`` and the ``http_status`` taken from
        the ``"status"`` entry of the service result (``None`` if absent).

    Raises:
        HTTPException: 404 when no company configuration is stored, 502 with
            the service's ``"error"`` value when the test result is not ok.
    """
    stored = await caldav_service.get_company_config(current_user.company_id, db)
    if not stored:
        raise HTTPException(status_code=404, detail="Keine Firmen-CalDAV-Konfiguration vorhanden.")

    outcome = await caldav_service.test_config(stored)
    if outcome["ok"]:
        return {
            "message": "Verbindung erfolgreich.",
            "http_status": outcome.get("status"),
        }
    raise HTTPException(status_code=502, detail=outcome["error"])
@router.post("/company/resync", response_model=ResyncResult)
async def resync_all(
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Re-synchronise all approved absences into the company calendar.

    Delegates to ``caldav_service.resync_all_approved`` for the caller's
    company, commits the session, and wraps the returned mapping in a
    ``ResyncResult``.
    """
    company_id = current_user.company_id
    summary = await caldav_service.resync_all_approved(company_id, db)
    await db.commit()
    return ResyncResult(**summary)
# ── Persönlicher Kalender ──────────────────────────────────────────────────────
@router.get("/user/config", response_model=CaldavUserConfigOut | None)
async def get_user_config(
    current_user: CurrentUser,
    db: AsyncSession = Depends(get_db),
):
    """Return the caller's personal CalDAV configuration.

    Returns:
        The stored configuration converted to ``CaldavUserConfigOut``, or
        ``None`` when the service yields nothing for the caller.
    """
    stored = await caldav_service.get_user_config(current_user.id, db)
    if not stored:
        return None
    return CaldavUserConfigOut.model_validate(stored)
@router.post("/user/config", response_model=CaldavUserConfigOut)
async def save_user_config(
    data: CaldavUserConfigSave,
    current_user: CurrentUser,
    db: AsyncSession = Depends(get_db),
):
    """Create or update the caller's personal CalDAV configuration.

    Both submitted URLs are validated before anything is loaded from or added
    to the database session, so rejected input never leaves a pending row
    behind. An existing configuration is updated in place; otherwise a new
    ``CaldavUserConfig`` with a fresh UUID is added to the session.

    Args:
        data: Submitted settings (URLs, credentials, flags).
        current_user: The authenticated caller; the configuration is keyed
            by ``current_user.id``.
        db: Database session used for the lookup and the commit.

    Returns:
        The saved configuration converted to ``CaldavUserConfigOut``.

    Raises:
        HTTPException: 400 when a URL is rejected by the validator, or when
            no password is supplied and none has been stored yet.
    """
    # Imported once, outside the try blocks, so that only the validator call
    # itself is guarded by the ValueError handler.
    from app.services.caldav_service import _validate_caldav_url

    # SSRF protection: validate the URLs before they are stored.
    # Order matches the original checks: calendar URL first, then principal URL.
    for label, url in (
        ("Kalender-URL", data.calendar_url),
        ("Principal-URL", data.principal_url),
    ):
        if not url:
            continue
        try:
            # The validator is synchronous and resolves the host name; running
            # it in a worker thread keeps the event loop free meanwhile.
            await asyncio.to_thread(_validate_caldav_url, url)
        except ValueError as exc:
            raise HTTPException(
                status_code=400,
                detail=f"Ungültige {label}: {exc}",
            ) from exc

    cfg = await caldav_service.get_user_config(current_user.id, db)
    if cfg is None:
        cfg = CaldavUserConfig(user_id=current_user.id, id=uuid.uuid4())
        db.add(cfg)

    cfg.enabled = data.enabled
    cfg.principal_url = data.principal_url
    cfg.calendar_url = data.calendar_url
    cfg.username = data.username
    cfg.calendar_display_name = data.calendar_display_name
    cfg.verify_ssl = data.verify_ssl

    # An empty password keeps the previously stored one; it is only mandatory
    # while no encrypted password exists yet.
    if data.password:
        cfg.password_encrypted = encrypt_password(data.password)
    elif not cfg.password_encrypted:
        raise HTTPException(status_code=400, detail="Passwort wird beim ersten Speichern benötigt.")

    await db.commit()
    # NOTE(review): reading cfg after commit presumably relies on the session
    # not expiring attributes on commit — confirm against the session factory.
    return CaldavUserConfigOut.model_validate(cfg)
@router.post("/user/test")
async def test_user_config(
    current_user: CurrentUser,
    db: AsyncSession = Depends(get_db),
):
    """Run a connection test against the caller's personal CalDAV configuration.

    Returns:
        A dict with a success ``message`` and the ``http_status`` taken from
        the ``"status"`` entry of the service result (``None`` if absent).

    Raises:
        HTTPException: 404 when no personal configuration is stored, 502 with
            the service's ``"error"`` value when the test result is not ok.
    """
    stored = await caldav_service.get_user_config(current_user.id, db)
    if not stored:
        raise HTTPException(status_code=404, detail="Keine persönliche CalDAV-Konfiguration vorhanden.")

    outcome = await caldav_service.test_config(stored)
    if outcome["ok"]:
        return {
            "message": "Verbindung erfolgreich.",
            "http_status": outcome.get("status"),
        }
    raise HTTPException(status_code=502, detail=outcome["error"])