fix(security): SSRF-Schutz für CalDAV-URLs

Neue _validate_caldav_url() Funktion in caldav_service.py:
- Prüft Schema (nur http/https erlaubt)
- Blockiert private IP-Ranges (RFC 1918, Loopback, Link-local)
- DNS-Auflösung + Prüfung der aufgelösten IP (DNS-Rebinding-Schutz)
- Blockiert: 10/8, 172.16/12, 192.168/16, 127/8, 169.254/16, fc00::/7 etc.

Validierung in allen drei HTTP-Helpers (_http_put, _http_delete, _http_propfind)
sowie beim Speichern in caldav.py Router (company + user config) – doppelter Schutz.

Getestet: 8 böse URLs geblockt, 2 legitime URLs erlaubt (10/10 OK)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-24 12:39:59 +02:00
parent 35fcea90f4
commit 1db7164837
2 changed files with 114 additions and 0 deletions
+28
View File
@@ -50,6 +50,20 @@ async def save_company_config(
cfg = CaldavCompanyConfig(company_id=current_user.company_id, id=uuid.uuid4())
db.add(cfg)
# SSRF-Schutz: URL validieren bevor sie gespeichert wird
if data.calendar_url:
try:
from app.services.caldav_service import _validate_caldav_url
_validate_caldav_url(data.calendar_url)
except ValueError as exc:
raise HTTPException(status_code=400, detail=f"Ungültige Kalender-URL: {exc}")
if data.principal_url:
try:
from app.services.caldav_service import _validate_caldav_url
_validate_caldav_url(data.principal_url)
except ValueError as exc:
raise HTTPException(status_code=400, detail=f"Ungültige Principal-URL: {exc}")
cfg.enabled = data.enabled
cfg.principal_url = data.principal_url
cfg.calendar_url = data.calendar_url
@@ -112,6 +126,20 @@ async def save_user_config(
cfg = CaldavUserConfig(user_id=current_user.id, id=uuid.uuid4())
db.add(cfg)
# SSRF-Schutz: URL validieren bevor sie gespeichert wird
if data.calendar_url:
try:
from app.services.caldav_service import _validate_caldav_url
_validate_caldav_url(data.calendar_url)
except ValueError as exc:
raise HTTPException(status_code=400, detail=f"Ungültige Kalender-URL: {exc}")
if data.principal_url:
try:
from app.services.caldav_service import _validate_caldav_url
_validate_caldav_url(data.principal_url)
except ValueError as exc:
raise HTTPException(status_code=400, detail=f"Ungültige Principal-URL: {exc}")
cfg.enabled = data.enabled
cfg.principal_url = data.principal_url
cfg.calendar_url = data.calendar_url
+86
View File
@@ -95,6 +95,89 @@ def _format_summary(user: "User", absence_type: str, name_template: str) -> str:
return result
# ── SSRF-Schutz ───────────────────────────────────────────────────────────────
import ipaddress
import socket
from urllib.parse import urlparse
# Private/reservierte IP-Bereiche die nie als CalDAV-Server erlaubt sind
_BLOCKED_NETWORKS = [
ipaddress.ip_network("10.0.0.0/8"), # RFC 1918
ipaddress.ip_network("172.16.0.0/12"), # RFC 1918
ipaddress.ip_network("192.168.0.0/16"), # RFC 1918
ipaddress.ip_network("127.0.0.0/8"), # Loopback
ipaddress.ip_network("169.254.0.0/16"), # Link-local (AWS Metadata etc.)
ipaddress.ip_network("::1/128"), # IPv6 Loopback
ipaddress.ip_network("fc00::/7"), # IPv6 Unique Local
ipaddress.ip_network("fe80::/10"), # IPv6 Link-local
ipaddress.ip_network("0.0.0.0/8"), # This network
ipaddress.ip_network("100.64.0.0/10"), # Shared Address Space (RFC 6598)
ipaddress.ip_network("192.0.2.0/24"), # TEST-NET-1
ipaddress.ip_network("198.51.100.0/24"), # TEST-NET-2
ipaddress.ip_network("203.0.113.0/24"), # TEST-NET-3
ipaddress.ip_network("240.0.0.0/4"), # Reserved
]
def _validate_caldav_url(url: str) -> None:
"""
Prüft eine CalDAV-URL auf SSRF-Risiken.
Wirft ValueError mit sprechender Meldung bei Problemen.
Prüfungen:
1. Schema muss http:// oder https:// sein (kein file://, ftp://, etc.)
2. Hostname muss vorhanden sein
3. Keine expliziten privaten IP-Adressen im URL
4. DNS-Auflösung + Prüfung ob die aufgelöste IP intern ist (DNS-Rebinding-Schutz)
"""
try:
parsed = urlparse(url)
except Exception as exc:
raise ValueError(f"Ungültige URL: {exc}") from exc
if parsed.scheme not in ("http", "https"):
raise ValueError(
f"URL-Schema '{parsed.scheme}' nicht erlaubt. Nur http:// und https:// sind gültig."
)
hostname = parsed.hostname
if not hostname:
raise ValueError("URL muss einen Hostnamen enthalten.")
# Explizit eingetragene IP-Adressen prüfen
try:
ip = ipaddress.ip_address(hostname)
_check_ip_blocked(ip, hostname)
except ValueError:
pass # Kein gültiges IP-Literal → ist ein Hostname, DNS-Auflösung folgt
# DNS auflösen und alle aufgelösten Adressen prüfen (DNS-Rebinding-Schutz)
try:
infos = socket.getaddrinfo(hostname, parsed.port or (443 if parsed.scheme == "https" else 80))
except socket.gaierror as exc:
raise ValueError(f"Hostname '{hostname}' konnte nicht aufgelöst werden: {exc}") from exc
for _family, _type, _proto, _canonname, sockaddr in infos:
ip_str = sockaddr[0]
try:
ip = ipaddress.ip_address(ip_str)
_check_ip_blocked(ip, hostname)
except ValueError as exc:
raise ValueError(str(exc)) from exc
def _check_ip_blocked(ip: ipaddress.IPv4Address | ipaddress.IPv6Address, hostname: str) -> None:
"""Wirft ValueError wenn die IP in einem blockierten Netz liegt."""
for network in _BLOCKED_NETWORKS:
if ip in network:
raise ValueError(
f"Host '{hostname}' ({ip}) liegt in einem privaten/reservierten "
f"Adressbereich ({network}) und darf nicht als CalDAV-Server genutzt werden."
)
# ── HTTP helpers ───────────────────────────────────────────────────────────────
def _event_url(calendar_url: str, uid: str) -> str:
@@ -106,6 +189,7 @@ async def _http_put(
ical: bytes, verify_ssl: bool,
) -> str:
"""PUT event. Returns ETag (empty string if server doesn't send one)."""
_validate_caldav_url(calendar_url)
url = _event_url(calendar_url, uid)
async with httpx.AsyncClient(verify=verify_ssl, timeout=15) as client:
resp = await client.put(
@@ -120,6 +204,7 @@ async def _http_put(
async def _http_delete(
calendar_url: str, username: str, password: str, uid: str, verify_ssl: bool,
) -> None:
_validate_caldav_url(calendar_url)
url = _event_url(calendar_url, uid)
async with httpx.AsyncClient(verify=verify_ssl, timeout=15) as client:
resp = await client.delete(url, auth=(username, password))
@@ -131,6 +216,7 @@ async def _http_propfind(
calendar_url: str, username: str, password: str, verify_ssl: bool,
) -> int:
"""Einfacher Verbindungstest via PROPFIND Depth:0. Gibt HTTP-Status zurück."""
_validate_caldav_url(calendar_url)
body = b'<?xml version="1.0"?><d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/></d:prop></d:propfind>'
async with httpx.AsyncClient(verify=verify_ssl, timeout=10) as client:
resp = await client.request(