security: H-1 settings-Whitelist + H-5 UUID-Guard + H-6 DNS-Pinning + H-7 Heartbeat-Timing

H-1: company.settings als typisiertes Sub-Schema
- schemas/company.py: CompanySettingsUpdate mit extra=forbid
- Nur bekannte Keys (carryover_expires_month/day) erlaubt
- Unbekannte Keys → HTTP 422

H-5: SQL-Injection defensiv absichern
- dependencies.py: UUID-Round-Trip str(_uuid.UUID(...)) + Sicherheitskommentar

H-6: CalDAV DNS-Rebinding-Schutz
- caldav_service.py: PinnedIPTransport — IP einmal auflösen, beim Request fixieren
- _validate_caldav_url gibt aufgelöste IP zurück
- Alle HTTP-Methoden nutzen PinnedIPTransport

H-7: Heartbeat-Timestamp nach Route-Logik
- kiosk_security.py: last_heartbeat_at-Update aus Dependency entfernt
- kiosk_service.py: Update erst in process_heartbeat() nach erfolgreicher Auth

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-26 11:35:18 +02:00
parent 654258f13e
commit 4dc69137dd
7 changed files with 152 additions and 24 deletions
+67 -11
View File
@@ -121,16 +121,21 @@ _BLOCKED_NETWORKS = [
]
def _validate_caldav_url(url: str) -> None:
async def _validate_caldav_url(url: str) -> str:
"""
Prüft eine CalDAV-URL auf SSRF-Risiken.
Wirft ValueError mit sprechender Meldung bei Problemen.
Prüft eine CalDAV-URL auf SSRF-Risiken und gibt die aufgelöste IP zurück.
Die zurückgegebene IP wird beim eigentlichen HTTP-Request via PinnedIPTransport
fest eingesetzt, um DNS-Rebinding zwischen Validierung und Request zu verhindern
(TTL=0-Angriff: zweite DNS-Auflösung durch httpx könnte andere IP liefern).
Prüfungen:
1. Schema muss http:// oder https:// sein (kein file://, ftp://, etc.)
2. Hostname muss vorhanden sein
3. Keine expliziten privaten IP-Adressen im URL
4. DNS-Auflösung + Prüfung ob die aufgelöste IP intern ist (DNS-Rebinding-Schutz)
Gibt die erste aufgelöste (und erlaubte) IP als String zurück.
"""
try:
parsed = urlparse(url)
@@ -150,23 +155,39 @@ def _validate_caldav_url(url: str) -> None:
try:
ip = ipaddress.ip_address(hostname)
_check_ip_blocked(ip, hostname)
# hostname ist bereits eine IP-Literal → direkt zurückgeben
return str(ip)
except ValueError:
pass # Kein gültiges IP-Literal → ist ein Hostname, DNS-Auflösung folgt
# DNS auflösen und alle aufgelösten Adressen prüfen (DNS-Rebinding-Schutz)
# DNS auflösen und alle aufgelösten Adressen prüfen (DNS-Rebinding-Schutz).
# socket.getaddrinfo ist blockierend → in Executor auslagern damit der Event-Loop
# nicht blockiert wird.
loop = asyncio.get_event_loop()
port = parsed.port or (443 if parsed.scheme == "https" else 80)
try:
infos = socket.getaddrinfo(hostname, parsed.port or (443 if parsed.scheme == "https" else 80))
infos = await loop.run_in_executor(
None, socket.getaddrinfo, hostname, port
)
except socket.gaierror as exc:
raise ValueError(f"Hostname '{hostname}' konnte nicht aufgelöst werden: {exc}") from exc
first_allowed_ip: str | None = None
for _family, _type, _proto, _canonname, sockaddr in infos:
ip_str = sockaddr[0]
try:
ip = ipaddress.ip_address(ip_str)
_check_ip_blocked(ip, hostname)
if first_allowed_ip is None:
first_allowed_ip = ip_str
except ValueError as exc:
raise ValueError(str(exc)) from exc
if first_allowed_ip is None:
raise ValueError(f"Hostname '{hostname}' konnte nicht auf eine erlaubte IP aufgelöst werden.")
return first_allowed_ip
def _get_allowed_networks() -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]:
"""Gibt die konfigurierten Whitelist-Netzwerke zurück (CALDAV_ALLOWED_CIDRS)."""
@@ -203,6 +224,38 @@ def _check_ip_blocked(ip: ipaddress.IPv4Address | ipaddress.IPv6Address, hostnam
)
# ── DNS-Rebinding-sicherer Transport ──────────────────────────────────────────
class PinnedIPTransport(httpx.AsyncHTTPTransport):
"""
Sicherheits-Transport: Nutzt eine vorab aufgelöste IP statt erneuter DNS-Auflösung.
Verhindert DNS-Rebinding zwischen URL-Validierung und eigentlichem HTTP-Request
(Angriff mit TTL=0: zweite DNS-Auflösung durch httpx könnte andere IP liefern).
Der originale Host-Header bleibt erhalten, damit TLS-SNI und
vHost-Routing des Servers korrekt funktionieren.
"""
def __init__(self, pinned_ip: str, **kwargs):
super().__init__(**kwargs)
self._pinned_ip = pinned_ip
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
# URL mit aufgelöster IP statt Hostname umschreiben
new_url = request.url.copy_with(host=self._pinned_ip)
# Neues Request-Objekt mit ersetzter URL; alle Header (inkl. Host) bleiben
# erhalten httpx setzt Host automatisch auf den original-Hostnamen wenn
# er nicht explizit überschrieben wird, was SNI und vHost korrekt hält.
request = request.__class__(
method=request.method,
url=new_url,
headers=request.headers,
stream=request.stream,
extensions=request.extensions,
)
return await super().handle_async_request(request)
# ── HTTP helpers ───────────────────────────────────────────────────────────────
def _event_url(calendar_url: str, uid: str) -> str:
@@ -228,10 +281,11 @@ async def _http_put(
ical: bytes, verify_ssl: bool,
) -> str:
"""PUT event. Returns ETag (empty string if server doesn't send one)."""
_validate_caldav_url(calendar_url)
pinned_ip = await _validate_caldav_url(calendar_url)
verify_ssl = _effective_verify_ssl(verify_ssl)
url = _event_url(calendar_url, uid)
async with httpx.AsyncClient(verify=verify_ssl, timeout=15) as client:
transport = PinnedIPTransport(pinned_ip=pinned_ip, verify=verify_ssl)
async with httpx.AsyncClient(transport=transport, verify=verify_ssl, timeout=15) as client:
resp = await client.put(
url, content=ical,
headers={"Content-Type": "text/calendar; charset=utf-8"},
@@ -244,10 +298,11 @@ async def _http_put(
async def _http_delete(
calendar_url: str, username: str, password: str, uid: str, verify_ssl: bool,
) -> None:
_validate_caldav_url(calendar_url)
pinned_ip = await _validate_caldav_url(calendar_url)
verify_ssl = _effective_verify_ssl(verify_ssl)
url = _event_url(calendar_url, uid)
async with httpx.AsyncClient(verify=verify_ssl, timeout=15) as client:
transport = PinnedIPTransport(pinned_ip=pinned_ip, verify=verify_ssl)
async with httpx.AsyncClient(transport=transport, verify=verify_ssl, timeout=15) as client:
resp = await client.delete(url, auth=(username, password))
if resp.status_code not in (200, 204, 404):
resp.raise_for_status()
@@ -257,10 +312,11 @@ async def _http_propfind(
calendar_url: str, username: str, password: str, verify_ssl: bool,
) -> int:
"""Einfacher Verbindungstest via PROPFIND Depth:0. Gibt HTTP-Status zurück."""
_validate_caldav_url(calendar_url)
pinned_ip = await _validate_caldav_url(calendar_url)
verify_ssl = _effective_verify_ssl(verify_ssl)
body = b'<?xml version="1.0"?><d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/></d:prop></d:propfind>'
async with httpx.AsyncClient(verify=verify_ssl, timeout=10) as client:
transport = PinnedIPTransport(pinned_ip=pinned_ip, verify=verify_ssl)
async with httpx.AsyncClient(transport=transport, verify=verify_ssl, timeout=10) as client:
resp = await client.request(
"PROPFIND", calendar_url.rstrip("/") + "/",
content=body,