fix(security): SSRF-Schutz für CalDAV-URLs
Neue _validate_caldav_url() Funktion in caldav_service.py: - Prüft Schema (nur http/https erlaubt) - Blockiert private IP-Ranges (RFC 1918, Loopback, Link-local) - DNS-Auflösung + Prüfung der aufgelösten IP (DNS-Rebinding-Schutz) - Blockiert: 10/8, 172.16/12, 192.168/16, 127/8, 169.254/16, fc00::/7 etc. Validierung in allen drei HTTP-Helpers (_http_put, _http_delete, _http_propfind) sowie beim Speichern in caldav.py Router (company + user config) – doppelter Schutz. Getestet: 8 böse URLs geblockt, 2 legitime URLs erlaubt (10/10 OK) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -95,6 +95,89 @@ def _format_summary(user: "User", absence_type: str, name_template: str) -> str:
|
||||
return result
|
||||
|
||||
|
||||
# ── SSRF-Schutz ───────────────────────────────────────────────────────────────
|
||||
|
||||
import ipaddress
|
||||
import socket
|
||||
from urllib.parse import urlparse
|
||||
|
||||
|
||||
# Private/reservierte IP-Bereiche die nie als CalDAV-Server erlaubt sind
|
||||
_BLOCKED_NETWORKS = [
|
||||
ipaddress.ip_network("10.0.0.0/8"), # RFC 1918
|
||||
ipaddress.ip_network("172.16.0.0/12"), # RFC 1918
|
||||
ipaddress.ip_network("192.168.0.0/16"), # RFC 1918
|
||||
ipaddress.ip_network("127.0.0.0/8"), # Loopback
|
||||
ipaddress.ip_network("169.254.0.0/16"), # Link-local (AWS Metadata etc.)
|
||||
ipaddress.ip_network("::1/128"), # IPv6 Loopback
|
||||
ipaddress.ip_network("fc00::/7"), # IPv6 Unique Local
|
||||
ipaddress.ip_network("fe80::/10"), # IPv6 Link-local
|
||||
ipaddress.ip_network("0.0.0.0/8"), # This network
|
||||
ipaddress.ip_network("100.64.0.0/10"), # Shared Address Space (RFC 6598)
|
||||
ipaddress.ip_network("192.0.2.0/24"), # TEST-NET-1
|
||||
ipaddress.ip_network("198.51.100.0/24"), # TEST-NET-2
|
||||
ipaddress.ip_network("203.0.113.0/24"), # TEST-NET-3
|
||||
ipaddress.ip_network("240.0.0.0/4"), # Reserved
|
||||
]
|
||||
|
||||
|
||||
def _validate_caldav_url(url: str) -> None:
|
||||
"""
|
||||
Prüft eine CalDAV-URL auf SSRF-Risiken.
|
||||
Wirft ValueError mit sprechender Meldung bei Problemen.
|
||||
|
||||
Prüfungen:
|
||||
1. Schema muss http:// oder https:// sein (kein file://, ftp://, etc.)
|
||||
2. Hostname muss vorhanden sein
|
||||
3. Keine expliziten privaten IP-Adressen im URL
|
||||
4. DNS-Auflösung + Prüfung ob die aufgelöste IP intern ist (DNS-Rebinding-Schutz)
|
||||
"""
|
||||
try:
|
||||
parsed = urlparse(url)
|
||||
except Exception as exc:
|
||||
raise ValueError(f"Ungültige URL: {exc}") from exc
|
||||
|
||||
if parsed.scheme not in ("http", "https"):
|
||||
raise ValueError(
|
||||
f"URL-Schema '{parsed.scheme}' nicht erlaubt. Nur http:// und https:// sind gültig."
|
||||
)
|
||||
|
||||
hostname = parsed.hostname
|
||||
if not hostname:
|
||||
raise ValueError("URL muss einen Hostnamen enthalten.")
|
||||
|
||||
# Explizit eingetragene IP-Adressen prüfen
|
||||
try:
|
||||
ip = ipaddress.ip_address(hostname)
|
||||
_check_ip_blocked(ip, hostname)
|
||||
except ValueError:
|
||||
pass # Kein gültiges IP-Literal → ist ein Hostname, DNS-Auflösung folgt
|
||||
|
||||
# DNS auflösen und alle aufgelösten Adressen prüfen (DNS-Rebinding-Schutz)
|
||||
try:
|
||||
infos = socket.getaddrinfo(hostname, parsed.port or (443 if parsed.scheme == "https" else 80))
|
||||
except socket.gaierror as exc:
|
||||
raise ValueError(f"Hostname '{hostname}' konnte nicht aufgelöst werden: {exc}") from exc
|
||||
|
||||
for _family, _type, _proto, _canonname, sockaddr in infos:
|
||||
ip_str = sockaddr[0]
|
||||
try:
|
||||
ip = ipaddress.ip_address(ip_str)
|
||||
_check_ip_blocked(ip, hostname)
|
||||
except ValueError as exc:
|
||||
raise ValueError(str(exc)) from exc
|
||||
|
||||
|
||||
def _check_ip_blocked(ip: ipaddress.IPv4Address | ipaddress.IPv6Address, hostname: str) -> None:
|
||||
"""Wirft ValueError wenn die IP in einem blockierten Netz liegt."""
|
||||
for network in _BLOCKED_NETWORKS:
|
||||
if ip in network:
|
||||
raise ValueError(
|
||||
f"Host '{hostname}' ({ip}) liegt in einem privaten/reservierten "
|
||||
f"Adressbereich ({network}) und darf nicht als CalDAV-Server genutzt werden."
|
||||
)
|
||||
|
||||
|
||||
# ── HTTP helpers ───────────────────────────────────────────────────────────────
|
||||
|
||||
def _event_url(calendar_url: str, uid: str) -> str:
|
||||
@@ -106,6 +189,7 @@ async def _http_put(
|
||||
ical: bytes, verify_ssl: bool,
|
||||
) -> str:
|
||||
"""PUT event. Returns ETag (empty string if server doesn't send one)."""
|
||||
_validate_caldav_url(calendar_url)
|
||||
url = _event_url(calendar_url, uid)
|
||||
async with httpx.AsyncClient(verify=verify_ssl, timeout=15) as client:
|
||||
resp = await client.put(
|
||||
@@ -120,6 +204,7 @@ async def _http_put(
|
||||
async def _http_delete(
|
||||
calendar_url: str, username: str, password: str, uid: str, verify_ssl: bool,
|
||||
) -> None:
|
||||
_validate_caldav_url(calendar_url)
|
||||
url = _event_url(calendar_url, uid)
|
||||
async with httpx.AsyncClient(verify=verify_ssl, timeout=15) as client:
|
||||
resp = await client.delete(url, auth=(username, password))
|
||||
@@ -131,6 +216,7 @@ async def _http_propfind(
|
||||
calendar_url: str, username: str, password: str, verify_ssl: bool,
|
||||
) -> int:
|
||||
"""Einfacher Verbindungstest via PROPFIND Depth:0. Gibt HTTP-Status zurück."""
|
||||
_validate_caldav_url(calendar_url)
|
||||
body = b'<?xml version="1.0"?><d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/></d:prop></d:propfind>'
|
||||
async with httpx.AsyncClient(verify=verify_ssl, timeout=10) as client:
|
||||
resp = await client.request(
|
||||
|
||||
Reference in New Issue
Block a user