""" CalDAV-Sync für Abwesenheiten. Logik: approve → VEVENT in persönlichem Kalender (CaldavUserConfig) + VEVENT in Firmenkalender (CaldavCompanyConfig) reject / cancel → DELETE aus beiden Kalendern Verwendet httpx für die HTTP-Kommunikation und icalendar für iCal-Erzeugung. Passwörter werden Fernet-verschlüsselt gespeichert (gleiche Methode wie SMTP/LDAP). """ from __future__ import annotations import asyncio import base64 import hashlib import logging import uuid from datetime import date, timedelta, timezone, datetime from typing import Union import httpx from icalendar import Calendar, Event from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.config import settings from app.models.absence import Absence from app.models.caldav_config import CaldavCompanyConfig, CaldavUserConfig log = logging.getLogger(__name__) # ── Crypto (shared with SMTP/LDAP) ──────────────────────────────────────────── def _fernet(): from cryptography.fernet import Fernet key = hashlib.sha256(settings.secret_key.encode()).digest() return Fernet(base64.urlsafe_b64encode(key)) def encrypt_password(plain: str) -> str: return _fernet().encrypt(plain.encode()).decode() def decrypt_password(encrypted: str) -> str: return _fernet().decrypt(encrypted.encode()).decode() # ── iCal builder ────────────────────────────────────────────────────────────── def _build_ical(uid: str, summary: str, start: date, end: date, description: str = "") -> bytes: """Erzeugt einen VCALENDAR-Blob für ein ganztägiges Ereignis.""" cal = Calendar() cal.add("prodid", "-//TimeMaster//DE") cal.add("version", "2.0") ev = Event() ev.add("uid", f"{uid}@timemaster") ev.add("dtstart", start) ev.add("dtend", end + timedelta(days=1)) # DTEND ist exklusiv ev.add("summary", summary) if description: ev.add("description", description) ev.add("status", "CONFIRMED") ev.add("transp", "TRANSPARENT") # zeigt keine Verfügbarkeit als blockiert ev.add("dtstamp", datetime.now(timezone.utc)) cal.add_component(ev) return cal.to_ical() # ── Kalender-Titel formatieren ──────────────────────────────────────────────── def _format_summary(user: "User", absence_type: str, name_template: str) -> str: """ Ersetzt Platzhalter im name_template: $vorname → vollständiger Vorname $nachname → vollständiger Nachname $vorname_short → erster Buchstabe Vorname $nachname_middle → erste 3 Buchstaben Nachname $kuerzel → manuell gesetztes Kürzel (Fallback: Initialen) $personalnummer → Personalnummer (leer wenn nicht gesetzt) $typ → Abwesenheitsart """ kuerzel = user.kuerzel if user.kuerzel else (user.first_name[:1] + user.last_name[:1]).upper() result = name_template result = result.replace("$vorname_short", user.first_name[:1]) result = result.replace("$nachname_middle", user.last_name[:3]) result = result.replace("$vorname", user.first_name) result = result.replace("$nachname", user.last_name) result = result.replace("$kuerzel", kuerzel) result = result.replace("$personalnummer", user.personnel_number or "") result = result.replace("$typ", absence_type) return result # ── HTTP helpers ─────────────────────────────────────────────────────────────── def _event_url(calendar_url: str, uid: str) -> str: return calendar_url.rstrip("/") + f"/{uid}.ics" async def _http_put( calendar_url: str, username: str, password: str, uid: str, ical: bytes, verify_ssl: bool, ) -> str: """PUT event. Returns ETag (empty string if server doesn't send one).""" url = _event_url(calendar_url, uid) async with httpx.AsyncClient(verify=verify_ssl, timeout=15) as client: resp = await client.put( url, content=ical, headers={"Content-Type": "text/calendar; charset=utf-8"}, auth=(username, password), ) resp.raise_for_status() return resp.headers.get("ETag", "") async def _http_delete( calendar_url: str, username: str, password: str, uid: str, verify_ssl: bool, ) -> None: url = _event_url(calendar_url, uid) async with httpx.AsyncClient(verify=verify_ssl, timeout=15) as client: resp = await client.delete(url, auth=(username, password)) if resp.status_code not in (200, 204, 404): resp.raise_for_status() async def _http_propfind( calendar_url: str, username: str, password: str, verify_ssl: bool, ) -> int: """Einfacher Verbindungstest via PROPFIND Depth:0. Gibt HTTP-Status zurück.""" body = b'' async with httpx.AsyncClient(verify=verify_ssl, timeout=10) as client: resp = await client.request( "PROPFIND", calendar_url.rstrip("/") + "/", content=body, headers={"Content-Type": "application/xml; charset=utf-8", "Depth": "0"}, auth=(username, password), ) return resp.status_code # ── Service ─────────────────────────────────────────────────────────────────── class CalDavService: # ── Config laden ────────────────────────────────────────────────────────── async def get_company_config( self, company_id: uuid.UUID, db: AsyncSession ) -> CaldavCompanyConfig | None: return await db.scalar( select(CaldavCompanyConfig).where(CaldavCompanyConfig.company_id == company_id) ) async def get_user_config( self, user_id: uuid.UUID, db: AsyncSession ) -> CaldavUserConfig | None: return await db.scalar( select(CaldavUserConfig).where(CaldavUserConfig.user_id == user_id) ) # ── Sync-Operationen ────────────────────────────────────────────────────── async def sync_approved(self, absence: Absence, db: AsyncSession) -> None: """Wird nach Genehmigung gerufen: Event in beide Kalender einpflegen.""" # User und AbsenceType laden (für VEVENT-Titel) from app.models.user import User from app.models.absence_type import AbsenceType user = await db.get(User, absence.user_id) atype = await db.get(AbsenceType, absence.type_id) if not user or not atype: return if absence.caldav_uid is None: absence.caldav_uid = str(uuid.uuid4()) description = absence.note or "" # Persönlicher Kalender: nur Abwesenheitsart, kein Name personal_ical = _build_ical( absence.caldav_uid, atype.name, absence.start_date, absence.end_date, description, ) user_cfg = await self.get_user_config(user.id, db) if user_cfg and user_cfg.enabled and user_cfg.calendar_url: try: pw = decrypt_password(user_cfg.password_encrypted) etag = await _http_put( user_cfg.calendar_url, user_cfg.username, pw, absence.caldav_uid, personal_ical, user_cfg.verify_ssl, ) absence.caldav_user_etag = etag except Exception as exc: absence.caldav_last_error = f"User-Kalender: {exc}" log.warning("CalDAV user sync failed for absence %s: %s", absence.id, exc) # Firmenkalender: Titelformat per Konfiguration company_cfg = await self.get_company_config(user.company_id, db) if company_cfg and company_cfg.enabled and company_cfg.calendar_url: company_summary = _format_summary(user, atype.name, company_cfg.name_template) company_ical = _build_ical( absence.caldav_uid, company_summary, absence.start_date, absence.end_date, description, ) try: pw = decrypt_password(company_cfg.password_encrypted) etag = await _http_put( company_cfg.calendar_url, company_cfg.username, pw, absence.caldav_uid, company_ical, company_cfg.verify_ssl, ) absence.caldav_company_etag = etag except Exception as exc: err = f"Firmen-Kalender: {exc}" absence.caldav_last_error = ( (absence.caldav_last_error + " | " + err) if absence.caldav_last_error else err ) log.warning("CalDAV company sync failed for absence %s: %s", absence.id, exc) absence.caldav_synced_at = datetime.now(timezone.utc) async def sync_removed(self, absence: Absence, db: AsyncSession) -> None: """Wird nach Ablehnung/Stornierung gerufen: Event aus Kalendern löschen.""" if not absence.caldav_uid: return from app.models.user import User user = await db.get(User, absence.user_id) if not user: return # Persönlicher Kalender user_cfg = await self.get_user_config(user.id, db) if user_cfg and user_cfg.enabled and user_cfg.calendar_url: try: pw = decrypt_password(user_cfg.password_encrypted) await _http_delete( user_cfg.calendar_url, user_cfg.username, pw, absence.caldav_uid, user_cfg.verify_ssl, ) absence.caldav_user_etag = None except Exception as exc: log.warning("CalDAV user delete failed for absence %s: %s", absence.id, exc) # Firmenkalender company_cfg = await self.get_company_config(user.company_id, db) if company_cfg and company_cfg.enabled and company_cfg.calendar_url: try: pw = decrypt_password(company_cfg.password_encrypted) await _http_delete( company_cfg.calendar_url, company_cfg.username, pw, absence.caldav_uid, company_cfg.verify_ssl, ) absence.caldav_company_etag = None except Exception as exc: log.warning("CalDAV company delete failed for absence %s: %s", absence.id, exc) absence.caldav_last_error = None absence.caldav_synced_at = datetime.now(timezone.utc) async def resync_all_approved(self, company_id: uuid.UUID, db: AsyncSession) -> dict: """Alle genehmigten Abwesenheiten der Firma neu synchronisieren.""" from app.models.absence import AbsenceStatus from app.models.user import User result = await db.scalars( select(Absence) .join(Absence.user) .where( Absence.status == AbsenceStatus.APPROVED, User.company_id == company_id, ) ) absences = list(result.all()) ok = 0 failed = 0 for absence in absences: try: await self.sync_approved(absence, db) ok += 1 except Exception as exc: failed += 1 log.error("Resync failed for absence %s: %s", absence.id, exc) return {"synced": ok, "failed": failed, "total": len(absences)} # ── Verbindungstest ─────────────────────────────────────────────────────── async def test_config( self, cfg: Union[CaldavCompanyConfig, CaldavUserConfig] ) -> dict: if not cfg.calendar_url: return {"ok": False, "error": "Keine Kalender-URL konfiguriert."} try: pw = decrypt_password(cfg.password_encrypted) status = await _http_propfind(cfg.calendar_url, cfg.username, pw, cfg.verify_ssl) if status in (200, 207): return {"ok": True, "status": status} return {"ok": False, "error": f"Server antwortete mit HTTP {status}"} except Exception as exc: return {"ok": False, "error": str(exc)} caldav_service = CalDavService()