Files
timemaster/backend/app/services/caldav_service.py
T
patrick 4dc69137dd security: H-1 settings-Whitelist + H-5 UUID-Guard + H-6 DNS-Pinning + H-7 Heartbeat-Timing
H-1: company.settings als typisiertes Sub-Schema
- schemas/company.py: CompanySettingsUpdate mit extra=forbid
- Nur bekannte Keys (carryover_expires_month/day) erlaubt
- Unbekannte Keys → HTTP 422

H-5: SQL-Injection defensiv absichern
- dependencies.py: UUID-Round-Trip str(_uuid.UUID(...)) + Sicherheitskommentar

H-6: CalDAV DNS-Rebinding-Schutz
- caldav_service.py: PinnedIPTransport — IP einmal auflösen, beim Request fixieren
- _validate_caldav_url gibt aufgelöste IP zurück
- Alle HTTP-Methoden nutzen PinnedIPTransport

H-7: Heartbeat-Timestamp nach Route-Logik
- kiosk_security.py: last_heartbeat_at-Update aus Dependency entfernt
- kiosk_service.py: Update erst in process_heartbeat() nach erfolgreicher Auth

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 11:35:18 +02:00

492 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
CalDAV-Sync für Abwesenheiten.
Logik:
approve → VEVENT in persönlichem Kalender (CaldavUserConfig) +
VEVENT in Firmenkalender (CaldavCompanyConfig)
reject / cancel → DELETE aus beiden Kalendern
Verwendet httpx für die HTTP-Kommunikation und icalendar für iCal-Erzeugung.
Passwörter werden Fernet-verschlüsselt gespeichert (gleiche Methode wie SMTP/LDAP).
"""
from __future__ import annotations
import asyncio
import base64
import hashlib
import logging
import uuid
from datetime import date, timedelta, timezone, datetime
from typing import Union
import httpx
from icalendar import Calendar, Event
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.config import settings
from app.models.absence import Absence
from app.models.caldav_config import CaldavCompanyConfig, CaldavUserConfig
log = logging.getLogger(__name__)
# ── Crypto (shared with SMTP/LDAP) ────────────────────────────────────────────
def _fernet():
from cryptography.fernet import Fernet
key = hashlib.sha256(settings.secret_key.encode()).digest()
return Fernet(base64.urlsafe_b64encode(key))
def encrypt_password(plain: str) -> str:
return _fernet().encrypt(plain.encode()).decode()
def decrypt_password(encrypted: str) -> str:
return _fernet().decrypt(encrypted.encode()).decode()
# ── iCal builder ──────────────────────────────────────────────────────────────
def _build_ical(uid: str, summary: str, start: date, end: date, description: str = "") -> bytes:
"""Erzeugt einen VCALENDAR-Blob für ein ganztägiges Ereignis."""
cal = Calendar()
cal.add("prodid", "-//TimeMaster//DE")
cal.add("version", "2.0")
ev = Event()
ev.add("uid", f"{uid}@timemaster")
ev.add("dtstart", start)
ev.add("dtend", end + timedelta(days=1)) # DTEND ist exklusiv
ev.add("summary", summary)
if description:
ev.add("description", description)
ev.add("status", "CONFIRMED")
ev.add("transp", "TRANSPARENT") # zeigt keine Verfügbarkeit als blockiert
ev.add("dtstamp", datetime.now(timezone.utc))
cal.add_component(ev)
return cal.to_ical()
# ── Kalender-Titel formatieren ────────────────────────────────────────────────
def _format_summary(user: "User", absence_type: str, name_template: str) -> str:
"""
Ersetzt Platzhalter im name_template:
$vorname → vollständiger Vorname
$nachname → vollständiger Nachname
$vorname_short → erster Buchstabe Vorname
$nachname_middle → erste 3 Buchstaben Nachname
$kuerzel → manuell gesetztes Kürzel (Fallback: Initialen)
$personalnummer → Personalnummer (leer wenn nicht gesetzt)
$typ → Abwesenheitsart
"""
kuerzel = user.kuerzel if user.kuerzel else (user.first_name[:1] + user.last_name[:1]).upper()
result = name_template
result = result.replace("$vorname_short", user.first_name[:1])
result = result.replace("$nachname_middle", user.last_name[:3])
result = result.replace("$vorname", user.first_name)
result = result.replace("$nachname", user.last_name)
result = result.replace("$kuerzel", kuerzel)
result = result.replace("$personalnummer", user.personnel_number or "")
result = result.replace("$typ", absence_type)
return result
# ── SSRF-Schutz ───────────────────────────────────────────────────────────────
import ipaddress
import socket
from urllib.parse import urlparse
# Private/reservierte IP-Bereiche die nie als CalDAV-Server erlaubt sind
_BLOCKED_NETWORKS = [
ipaddress.ip_network("10.0.0.0/8"), # RFC 1918
ipaddress.ip_network("172.16.0.0/12"), # RFC 1918
ipaddress.ip_network("192.168.0.0/16"), # RFC 1918
ipaddress.ip_network("127.0.0.0/8"), # Loopback
ipaddress.ip_network("169.254.0.0/16"), # Link-local (AWS Metadata etc.)
ipaddress.ip_network("::1/128"), # IPv6 Loopback
ipaddress.ip_network("fc00::/7"), # IPv6 Unique Local
ipaddress.ip_network("fe80::/10"), # IPv6 Link-local
ipaddress.ip_network("0.0.0.0/8"), # This network
ipaddress.ip_network("100.64.0.0/10"), # Shared Address Space (RFC 6598)
ipaddress.ip_network("192.0.2.0/24"), # TEST-NET-1
ipaddress.ip_network("198.51.100.0/24"), # TEST-NET-2
ipaddress.ip_network("203.0.113.0/24"), # TEST-NET-3
ipaddress.ip_network("240.0.0.0/4"), # Reserved
]
async def _validate_caldav_url(url: str) -> str:
"""
Prüft eine CalDAV-URL auf SSRF-Risiken und gibt die aufgelöste IP zurück.
Die zurückgegebene IP wird beim eigentlichen HTTP-Request via PinnedIPTransport
fest eingesetzt, um DNS-Rebinding zwischen Validierung und Request zu verhindern
(TTL=0-Angriff: zweite DNS-Auflösung durch httpx könnte andere IP liefern).
Prüfungen:
1. Schema muss http:// oder https:// sein (kein file://, ftp://, etc.)
2. Hostname muss vorhanden sein
3. Keine expliziten privaten IP-Adressen im URL
4. DNS-Auflösung + Prüfung ob die aufgelöste IP intern ist (DNS-Rebinding-Schutz)
Gibt die erste aufgelöste (und erlaubte) IP als String zurück.
"""
try:
parsed = urlparse(url)
except Exception as exc:
raise ValueError(f"Ungültige URL: {exc}") from exc
if parsed.scheme not in ("http", "https"):
raise ValueError(
f"URL-Schema '{parsed.scheme}' nicht erlaubt. Nur http:// und https:// sind gültig."
)
hostname = parsed.hostname
if not hostname:
raise ValueError("URL muss einen Hostnamen enthalten.")
# Explizit eingetragene IP-Adressen prüfen
try:
ip = ipaddress.ip_address(hostname)
_check_ip_blocked(ip, hostname)
# hostname ist bereits eine IP-Literal → direkt zurückgeben
return str(ip)
except ValueError:
pass # Kein gültiges IP-Literal → ist ein Hostname, DNS-Auflösung folgt
# DNS auflösen und alle aufgelösten Adressen prüfen (DNS-Rebinding-Schutz).
# socket.getaddrinfo ist blockierend → in Executor auslagern damit der Event-Loop
# nicht blockiert wird.
loop = asyncio.get_event_loop()
port = parsed.port or (443 if parsed.scheme == "https" else 80)
try:
infos = await loop.run_in_executor(
None, socket.getaddrinfo, hostname, port
)
except socket.gaierror as exc:
raise ValueError(f"Hostname '{hostname}' konnte nicht aufgelöst werden: {exc}") from exc
first_allowed_ip: str | None = None
for _family, _type, _proto, _canonname, sockaddr in infos:
ip_str = sockaddr[0]
try:
ip = ipaddress.ip_address(ip_str)
_check_ip_blocked(ip, hostname)
if first_allowed_ip is None:
first_allowed_ip = ip_str
except ValueError as exc:
raise ValueError(str(exc)) from exc
if first_allowed_ip is None:
raise ValueError(f"Hostname '{hostname}' konnte nicht auf eine erlaubte IP aufgelöst werden.")
return first_allowed_ip
def _get_allowed_networks() -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]:
"""Gibt die konfigurierten Whitelist-Netzwerke zurück (CALDAV_ALLOWED_CIDRS)."""
from app.core.config import settings
networks = []
for cidr in settings.caldav_allowed_cidrs:
cidr = cidr.strip()
if not cidr:
continue
try:
networks.append(ipaddress.ip_network(cidr, strict=False))
except ValueError:
log.warning("Ungültiger CIDR in CALDAV_ALLOWED_CIDRS ignoriert: %r", cidr)
return networks
def _check_ip_blocked(ip: ipaddress.IPv4Address | ipaddress.IPv6Address, hostname: str) -> None:
"""Wirft ValueError wenn die IP in einem blockierten Netz liegt.
Whitelist (CALDAV_ALLOWED_CIDRS) schlägt die Blockliste
damit können interne Nextcloud-Server im LAN trotzdem genutzt werden.
"""
# Whitelist-Check zuerst: explizit erlaubte CIDRs überschreiben die Blockliste
for allowed in _get_allowed_networks():
if ip in allowed:
return # explizit erlaubt → kein Fehler
for network in _BLOCKED_NETWORKS:
if ip in network:
raise ValueError(
f"Host '{hostname}' ({ip}) liegt in einem privaten/reservierten "
f"Adressbereich ({network}) und darf nicht als CalDAV-Server genutzt werden. "
f"Tipp: Interne Server können per CALDAV_ALLOWED_CIDRS freigegeben werden."
)
# ── DNS-Rebinding-sicherer Transport ──────────────────────────────────────────
class PinnedIPTransport(httpx.AsyncHTTPTransport):
"""
Sicherheits-Transport: Nutzt eine vorab aufgelöste IP statt erneuter DNS-Auflösung.
Verhindert DNS-Rebinding zwischen URL-Validierung und eigentlichem HTTP-Request
(Angriff mit TTL=0: zweite DNS-Auflösung durch httpx könnte andere IP liefern).
Der originale Host-Header bleibt erhalten, damit TLS-SNI und
vHost-Routing des Servers korrekt funktionieren.
"""
def __init__(self, pinned_ip: str, **kwargs):
super().__init__(**kwargs)
self._pinned_ip = pinned_ip
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
# URL mit aufgelöster IP statt Hostname umschreiben
new_url = request.url.copy_with(host=self._pinned_ip)
# Neues Request-Objekt mit ersetzter URL; alle Header (inkl. Host) bleiben
# erhalten httpx setzt Host automatisch auf den original-Hostnamen wenn
# er nicht explizit überschrieben wird, was SNI und vHost korrekt hält.
request = request.__class__(
method=request.method,
url=new_url,
headers=request.headers,
stream=request.stream,
extensions=request.extensions,
)
return await super().handle_async_request(request)
# ── HTTP helpers ───────────────────────────────────────────────────────────────
def _event_url(calendar_url: str, uid: str) -> str:
return calendar_url.rstrip("/") + f"/{uid}.ics"
def _effective_verify_ssl(verify_ssl: bool) -> bool:
"""
Gibt den tatsächlich zu verwendenden verify_ssl-Wert zurück.
In Production ist SSL-Verifikation immer aktiviert verify_ssl=False wird ignoriert.
"""
if settings.is_production and not verify_ssl:
log.warning(
"CalDAV: verify_ssl=False in Production ignoriert SSL-Verifikation wird erzwungen. "
"Für selbstsignierte Zertifikate das CA-Bundle unter REQUESTS_CA_BUNDLE konfigurieren."
)
return True
return verify_ssl
async def _http_put(
calendar_url: str, username: str, password: str, uid: str,
ical: bytes, verify_ssl: bool,
) -> str:
"""PUT event. Returns ETag (empty string if server doesn't send one)."""
pinned_ip = await _validate_caldav_url(calendar_url)
verify_ssl = _effective_verify_ssl(verify_ssl)
url = _event_url(calendar_url, uid)
transport = PinnedIPTransport(pinned_ip=pinned_ip, verify=verify_ssl)
async with httpx.AsyncClient(transport=transport, verify=verify_ssl, timeout=15) as client:
resp = await client.put(
url, content=ical,
headers={"Content-Type": "text/calendar; charset=utf-8"},
auth=(username, password),
)
resp.raise_for_status()
return resp.headers.get("ETag", "")
async def _http_delete(
calendar_url: str, username: str, password: str, uid: str, verify_ssl: bool,
) -> None:
pinned_ip = await _validate_caldav_url(calendar_url)
verify_ssl = _effective_verify_ssl(verify_ssl)
url = _event_url(calendar_url, uid)
transport = PinnedIPTransport(pinned_ip=pinned_ip, verify=verify_ssl)
async with httpx.AsyncClient(transport=transport, verify=verify_ssl, timeout=15) as client:
resp = await client.delete(url, auth=(username, password))
if resp.status_code not in (200, 204, 404):
resp.raise_for_status()
async def _http_propfind(
calendar_url: str, username: str, password: str, verify_ssl: bool,
) -> int:
"""Einfacher Verbindungstest via PROPFIND Depth:0. Gibt HTTP-Status zurück."""
pinned_ip = await _validate_caldav_url(calendar_url)
verify_ssl = _effective_verify_ssl(verify_ssl)
body = b'<?xml version="1.0"?><d:propfind xmlns:d="DAV:"><d:prop><d:resourcetype/></d:prop></d:propfind>'
transport = PinnedIPTransport(pinned_ip=pinned_ip, verify=verify_ssl)
async with httpx.AsyncClient(transport=transport, verify=verify_ssl, timeout=10) as client:
resp = await client.request(
"PROPFIND", calendar_url.rstrip("/") + "/",
content=body,
headers={"Content-Type": "application/xml; charset=utf-8", "Depth": "0"},
auth=(username, password),
)
return resp.status_code
# ── Service ───────────────────────────────────────────────────────────────────
class CalDavService:
# ── Config laden ──────────────────────────────────────────────────────────
async def get_company_config(
self, company_id: uuid.UUID, db: AsyncSession
) -> CaldavCompanyConfig | None:
return await db.scalar(
select(CaldavCompanyConfig).where(CaldavCompanyConfig.company_id == company_id)
)
async def get_user_config(
self, user_id: uuid.UUID, db: AsyncSession
) -> CaldavUserConfig | None:
return await db.scalar(
select(CaldavUserConfig).where(CaldavUserConfig.user_id == user_id)
)
# ── Sync-Operationen ──────────────────────────────────────────────────────
async def sync_approved(self, absence: Absence, db: AsyncSession) -> None:
"""Wird nach Genehmigung gerufen: Event in beide Kalender einpflegen."""
# User und AbsenceType laden (für VEVENT-Titel)
from app.models.user import User
from app.models.absence_type import AbsenceType
user = await db.get(User, absence.user_id)
atype = await db.get(AbsenceType, absence.type_id)
if not user or not atype:
return
if absence.caldav_uid is None:
absence.caldav_uid = str(uuid.uuid4())
description = absence.note or ""
# Persönlicher Kalender: nur Abwesenheitsart, kein Name
personal_ical = _build_ical(
absence.caldav_uid, atype.name,
absence.start_date, absence.end_date, description,
)
user_cfg = await self.get_user_config(user.id, db)
if user_cfg and user_cfg.enabled and user_cfg.calendar_url:
try:
pw = decrypt_password(user_cfg.password_encrypted)
etag = await _http_put(
user_cfg.calendar_url, user_cfg.username, pw,
absence.caldav_uid, personal_ical, user_cfg.verify_ssl,
)
absence.caldav_user_etag = etag
except Exception as exc:
absence.caldav_last_error = f"User-Kalender: {exc}"
log.warning("CalDAV user sync failed for absence %s: %s", absence.id, exc)
# Firmenkalender: Titelformat per Konfiguration
company_cfg = await self.get_company_config(user.company_id, db)
if company_cfg and company_cfg.enabled and company_cfg.calendar_url:
company_summary = _format_summary(user, atype.name, company_cfg.name_template)
company_ical = _build_ical(
absence.caldav_uid, company_summary,
absence.start_date, absence.end_date, description,
)
try:
pw = decrypt_password(company_cfg.password_encrypted)
etag = await _http_put(
company_cfg.calendar_url, company_cfg.username, pw,
absence.caldav_uid, company_ical, company_cfg.verify_ssl,
)
absence.caldav_company_etag = etag
except Exception as exc:
err = f"Firmen-Kalender: {exc}"
absence.caldav_last_error = (
(absence.caldav_last_error + " | " + err) if absence.caldav_last_error else err
)
log.warning("CalDAV company sync failed for absence %s: %s", absence.id, exc)
absence.caldav_synced_at = datetime.now(timezone.utc)
async def sync_removed(self, absence: Absence, db: AsyncSession) -> None:
"""Wird nach Ablehnung/Stornierung gerufen: Event aus Kalendern löschen."""
if not absence.caldav_uid:
return
from app.models.user import User
user = await db.get(User, absence.user_id)
if not user:
return
# Persönlicher Kalender
user_cfg = await self.get_user_config(user.id, db)
if user_cfg and user_cfg.enabled and user_cfg.calendar_url:
try:
pw = decrypt_password(user_cfg.password_encrypted)
await _http_delete(
user_cfg.calendar_url, user_cfg.username, pw,
absence.caldav_uid, user_cfg.verify_ssl,
)
absence.caldav_user_etag = None
except Exception as exc:
log.warning("CalDAV user delete failed for absence %s: %s", absence.id, exc)
# Firmenkalender
company_cfg = await self.get_company_config(user.company_id, db)
if company_cfg and company_cfg.enabled and company_cfg.calendar_url:
try:
pw = decrypt_password(company_cfg.password_encrypted)
await _http_delete(
company_cfg.calendar_url, company_cfg.username, pw,
absence.caldav_uid, company_cfg.verify_ssl,
)
absence.caldav_company_etag = None
except Exception as exc:
log.warning("CalDAV company delete failed for absence %s: %s", absence.id, exc)
absence.caldav_last_error = None
absence.caldav_synced_at = datetime.now(timezone.utc)
async def resync_all_approved(self, company_id: uuid.UUID, db: AsyncSession) -> dict:
"""Alle genehmigten Abwesenheiten der Firma neu synchronisieren."""
from app.models.absence import AbsenceStatus
from app.models.user import User
result = await db.scalars(
select(Absence)
.join(Absence.user)
.where(
Absence.status == AbsenceStatus.APPROVED,
User.company_id == company_id,
)
)
absences = list(result.all())
ok = 0
failed = 0
for absence in absences:
try:
await self.sync_approved(absence, db)
ok += 1
except Exception as exc:
failed += 1
log.error("Resync failed for absence %s: %s", absence.id, exc)
return {"synced": ok, "failed": failed, "total": len(absences)}
# ── Verbindungstest ───────────────────────────────────────────────────────
async def test_config(
self, cfg: Union[CaldavCompanyConfig, CaldavUserConfig]
) -> dict:
if not cfg.calendar_url:
return {"ok": False, "error": "Keine Kalender-URL konfiguriert."}
try:
pw = decrypt_password(cfg.password_encrypted)
status = await _http_propfind(cfg.calendar_url, cfg.username, pw, cfg.verify_ssl)
if status in (200, 207):
return {"ok": True, "status": status}
return {"ok": False, "error": f"Server antwortete mit HTTP {status}"}
except Exception as exc:
return {"ok": False, "error": str(exc)}
caldav_service = CalDavService()