"""Kimai CSV import service - parses a Kimai export and creates TimeEntries + Absences."""

from __future__ import annotations

import csv
import io
import uuid
from dataclasses import dataclass, field
from datetime import date, datetime, time, timedelta

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.absence import Absence, AbsenceStatus
from app.models.absence_type import AbsenceType
from app.models.time_entry import EntrySource, EntryStatus, TimeEntry
from app.models.user import User


# ---------------------------------------------------------------------------
# Data structures for the preview
# ---------------------------------------------------------------------------

@dataclass
class KimaiRow:
    """One parsed data row of the Kimai CSV export."""

    date: date
    start: time
    end: time
    duration_sec: int  # value of the "Dauer" column, treated as seconds
    projekt: str
    taetigkeit: str
    beschreibung: str


@dataclass
class ImportPreviewEntry:
    """One line of the import preview (a time entry or an absence block)."""

    kind: str  # "time" | "absence"
    date_from: str  # ISO date string
    date_to: str  # ISO date string
    start: str | None  # "HH:MM", None for absences
    end: str | None  # "HH:MM", None for absences
    break_minutes: int
    worked_hours: float | None
    absence_type: str | None
    note: str | None
    skipped: bool = False
    skip_reason: str | None = None


@dataclass
class ImportResult:
    """Outcome of a preview or an import run."""

    preview: list[ImportPreviewEntry] = field(default_factory=list)
    time_imported: int = 0
    absence_imported: int = 0
    skipped: int = 0
    errors: list[str] = field(default_factory=list)


# ---------------------------------------------------------------------------
# Helper functions
# ---------------------------------------------------------------------------

def _parse_time(s: str) -> time:
    """Parse ``HH:MM`` into a ``time``; anything after the minutes is ignored."""
    parts = s.strip().split(":")
    return time(int(parts[0]), int(parts[1]))


def _parse_date(s: str) -> date:
    """Parse a ``YYYY-MM-DD`` string into a ``date``."""
    return datetime.strptime(s.strip(), "%Y-%m-%d").date()


def _gross_minutes(start: time, end: time) -> int:
    """Return the minutes between *start* and *end*.

    An end time before the start time is treated as crossing midnight, so the
    result is never negative.
    """
    minutes = (end.hour * 60 + end.minute) - (start.hour * 60 + start.minute)
    if minutes < 0:
        # Entry runs past midnight: wrap around one full day.
        minutes += 24 * 60
    return minutes


def _break_minutes(row: KimaiRow) -> int:
    """Return the break as gross span minus net duration, never below zero."""
    gross = _gross_minutes(row.start, row.end)
    net = row.duration_sec // 60
    return max(0, gross - net)


def _worked_hours(row: KimaiRow) -> float:
    """Return the net duration in hours, rounded to two decimals."""
    net_min = row.duration_sec / 60
    return round(net_min / 60, 2)


def _is_vacation_row(row: KimaiRow) -> bool:
    """Return True when the row's project is "Urlaub" (case-insensitive)."""
    return row.projekt.strip().lower() == "urlaub"


def _note(row: KimaiRow) -> str | None:
    """Derive the note for a row.

    The description wins; if it is empty the activity is used, except for
    'Reguläre Arbeitszeit' (the default, which needs no note of its own).
    """
    desc = row.beschreibung.strip()
    if desc:
        return desc
    taet = row.taetigkeit.strip()
    if taet and taet.lower() != "reguläre arbeitszeit":
        return taet
    return None


def _absence_type_name(row: KimaiRow) -> str:
    """Determine the absence type name from the row's description."""
    desc = row.beschreibung.strip().lower()
    if "sonderurlaub" in desc:
        return "Sonderurlaub"
    return "Urlaub"


def _group_vacation_rows(rows: list[KimaiRow]) -> list[tuple[date, date, str, str]]:
    """Group consecutive vacation rows of the same type into absence blocks.

    Returns a list of ``(start_date, end_date, absence_type_name, note)``.
    """
    if not rows:
        return []

    rows_sorted = sorted(rows, key=lambda r: r.date)
    groups: list[tuple[date, date, str, str]] = []

    cur_start = rows_sorted[0].date
    cur_end = rows_sorted[0].date
    cur_type = _absence_type_name(rows_sorted[0])
    cur_note = rows_sorted[0].beschreibung.strip()

    for row in rows_sorted[1:]:
        t = _absence_type_name(row)
        # Consecutive = at most 3 days apart (bridges a weekend).
        # NOTE(review): any gap of up to 3 days is bridged, not only weekends.
        gap = (row.date - cur_end).days
        if t == cur_type and gap <= 3:
            cur_end = row.date
            desc = row.beschreibung.strip()
            # Append descriptions not already contained in the note.
            if desc and desc not in cur_note:
                cur_note = (cur_note + " / " + desc).strip(" /")
        else:
            groups.append((cur_start, cur_end, cur_type, cur_note))
            cur_start = row.date
            cur_end = row.date
            cur_type = t
            cur_note = row.beschreibung.strip()

    groups.append((cur_start, cur_end, cur_type, cur_note))
    return groups


# ---------------------------------------------------------------------------
# CSV parser
# ---------------------------------------------------------------------------

def parse_kimai_csv(content: bytes) -> tuple[list[KimaiRow], list[str]]:
    """Parse Kimai CSV bytes and return ``(rows, errors)``.

    Rows that cannot be parsed are skipped and reported in ``errors`` with
    their line number (the header is line 1). Content that is not valid UTF-8
    yields no rows and a single error instead of raising.
    """
    rows: list[KimaiRow] = []
    errors: list[str] = []

    try:
        text = content.decode("utf-8-sig")  # BOM-safe
    except UnicodeDecodeError as exc:
        errors.append(f"Datei konnte nicht als UTF-8 gelesen werden: {exc}")
        return rows, errors

    reader = csv.DictReader(io.StringIO(text))
    for i, row in enumerate(reader, start=2):
        try:
            rows.append(KimaiRow(
                date=_parse_date(row["Datum"]),
                start=_parse_time(row["Von"]),
                end=_parse_time(row["Bis"]),
                duration_sec=int(row["Dauer"]),
                # DictReader fills missing trailing fields with None, so a
                # plain .get(key, "") default is not enough for short rows.
                projekt=row.get("Projekt") or "",
                taetigkeit=row.get("Tätigkeit") or "",
                beschreibung=row.get("Beschreibung") or "",
            ))
        except (KeyError, ValueError, IndexError, TypeError, AttributeError) as e:
            # Missing column, malformed value, or a None field in a short row.
            errors.append(f"Zeile {i}: {e}")

    return rows, errors


# ---------------------------------------------------------------------------
# Preview (no DB changes)
# ---------------------------------------------------------------------------

async def preview_kimai_import(
    content: bytes,
    target_user_id: uuid.UUID,
    db: AsyncSession,
) -> ImportResult:
    """Build a preview of what importing *content* for the user would do.

    Only reads from the database. Absence types are limited to the target
    user's company, the same scope ``run_kimai_import`` uses, so the preview
    and the real import agree on which types exist.

    Returns an ``ImportResult`` whose ``preview`` lists every time entry and
    absence block, ``skipped`` counts the entries flagged as skipped, and
    ``errors`` holds parse errors (plus a message if the user is unknown).
    """
    result = ImportResult()
    rows, parse_errors = parse_kimai_csv(content)
    result.errors.extend(parse_errors)

    # Load the user to scope the absence types to the user's company.
    user_q = await db.execute(select(User).where(User.id == target_user_id))
    user = user_q.scalar_one_or_none()
    if user is None:
        result.errors.append("Ziel-User nicht gefunden.")
        return result

    # Existing time entries: duplicate check on (date, start_time, end_time).
    existing_q = await db.execute(
        select(TimeEntry.date, TimeEntry.start_time, TimeEntry.end_time)
        .where(TimeEntry.user_id == target_user_id)
    )
    existing_slots: set[tuple] = {(r.date, r.start_time, r.end_time) for r in existing_q}

    # Absence types of the user's company, keyed by name.
    types_q = await db.execute(
        select(AbsenceType).where(AbsenceType.company_id == user.company_id)
    )
    abs_types: dict[str, AbsenceType] = {t.name: t for t in types_q.scalars().all()}

    time_rows = [r for r in rows if not _is_vacation_row(r)]
    vac_rows = [r for r in rows if _is_vacation_row(r)]

    # Time entries
    seen_slots: set[tuple] = set()
    for row in time_rows:
        slot = (row.date, row.start, row.end)
        skip = slot in existing_slots or slot in seen_slots
        if not skip:
            seen_slots.add(slot)
        result.preview.append(ImportPreviewEntry(
            kind="time",
            date_from=row.date.isoformat(),
            date_to=row.date.isoformat(),
            start=row.start.strftime("%H:%M"),
            end=row.end.strftime("%H:%M"),
            break_minutes=_break_minutes(row),
            worked_hours=_worked_hours(row),
            absence_type=None,
            note=_note(row),
            skipped=skip,
            skip_reason="Bereits vorhanden (gleiche Zeit)" if skip else None,
        ))

    # Existing absences: duplicate check on (start_date, end_date, type_id).
    existing_abs_q = await db.execute(
        select(Absence.start_date, Absence.end_date, Absence.type_id)
        .where(Absence.user_id == target_user_id)
    )
    existing_absences: set[tuple] = {
        (r.start_date, r.end_date, r.type_id) for r in existing_abs_q
    }

    # Vacation blocks
    for start, end, type_name, note in _group_vacation_rows(vac_rows):
        t = abs_types.get(type_name)
        already_exists = t is not None and (start, end, t.id) in existing_absences

        if t is None:
            skip_reason = f"Abwesenheitstyp '{type_name}' nicht gefunden"
        elif already_exists:
            skip_reason = "Bereits vorhanden"
        else:
            skip_reason = None

        result.preview.append(ImportPreviewEntry(
            kind="absence",
            date_from=start.isoformat(),
            date_to=end.isoformat(),
            start=None,
            end=None,
            break_minutes=0,
            worked_hours=None,
            absence_type=type_name,
            note=note or None,
            skipped=t is None or already_exists,
            skip_reason=skip_reason,
        ))

    result.skipped = sum(1 for p in result.preview if p.skipped)
    return result
# ---------------------------------------------------------------------------
# Actual import (writes to the database)
# ---------------------------------------------------------------------------

async def run_kimai_import(
    content: bytes,
    target_user_id: uuid.UUID,
    approver_id: uuid.UUID,
    db: AsyncSession,
) -> ImportResult:
    """Import a Kimai CSV export for one user and commit the new records.

    Non-vacation rows become approved ``TimeEntry`` records and grouped
    vacation rows become approved ``Absence`` records, both attributed to
    *approver_id*. Time slots and absences the user already has (or that
    repeat within the file) are counted as skipped.

    Returns an ``ImportResult`` with the import counters and any errors.
    If the target user does not exist, nothing is written and the result
    carries only the parse errors plus a "user not found" message.
    """
    outcome = ImportResult()
    parsed_rows, parse_errors = parse_kimai_csv(content)
    outcome.errors.extend(parse_errors)

    # The user is needed for the company scope of the absence types.
    user_result = await db.execute(select(User).where(User.id == target_user_id))
    user = user_result.scalar_one_or_none()
    if not user:
        outcome.errors.append("Ziel-User nicht gefunden.")
        return outcome

    # Slots already taken: duplicate check on (date, start_time, end_time).
    slot_result = await db.execute(
        select(TimeEntry.date, TimeEntry.start_time, TimeEntry.end_time)
        .where(TimeEntry.user_id == target_user_id)
    )
    known_slots: set[tuple] = set()
    for rec in slot_result:
        known_slots.add((rec.date, rec.start_time, rec.end_time))

    # Absence types of the user's company, keyed by name.
    type_result = await db.execute(
        select(AbsenceType).where(AbsenceType.company_id == user.company_id)
    )
    types_by_name: dict[str, AbsenceType] = {}
    for absence_type in type_result.scalars().all():
        types_by_name[absence_type.name] = absence_type

    # Split the rows into working time and vacation.
    work_rows: list = []
    vacation_rows: list = []
    for parsed in parsed_rows:
        if _is_vacation_row(parsed):
            vacation_rows.append(parsed)
        else:
            work_rows.append(parsed)

    # ---- Time entries ----
    for parsed in work_rows:
        slot = (parsed.date, parsed.start, parsed.end)
        if slot in known_slots:
            outcome.skipped += 1
            continue
        # Remember the slot so a repeat later in the same file is skipped too.
        known_slots.add(slot)
        db.add(TimeEntry(
            id=uuid.uuid4(),
            user_id=target_user_id,
            date=parsed.date,
            start_time=parsed.start,
            end_time=parsed.end,
            break_minutes=_break_minutes(parsed),
            status=EntryStatus.APPROVED,
            source=EntrySource.API,
            approved_by=approver_id,
            note=_note(parsed),
        ))
        outcome.time_imported += 1

    # Existing absences: duplicate check on (start_date, end_date, type_id).
    absence_result = await db.execute(
        select(Absence.start_date, Absence.end_date, Absence.type_id)
        .where(Absence.user_id == target_user_id)
    )
    known_absences: set[tuple] = {
        (rec.start_date, rec.end_date, rec.type_id) for rec in absence_result
    }

    # ---- Vacation blocks ----
    for first_day, last_day, type_name, note in _group_vacation_rows(vacation_rows):
        absence_type = types_by_name.get(type_name)
        if not absence_type:
            outcome.errors.append(f"Abwesenheitstyp '{type_name}' nicht gefunden – übersprungen.")
            outcome.skipped += 1
            continue
        if (first_day, last_day, absence_type.id) in known_absences:
            outcome.skipped += 1
            continue

        # Count working days (Mon-Fri); public holidays are not considered.
        weekdays = 0
        for offset in range((last_day - first_day).days + 1):
            if (first_day + timedelta(days=offset)).weekday() < 5:
                weekdays += 1

        db.add(Absence(
            id=uuid.uuid4(),
            user_id=target_user_id,
            type_id=absence_type.id,
            start_date=first_day,
            end_date=last_day,
            working_days=weekdays,
            status=AbsenceStatus.APPROVED,
            approved_by=approver_id,
            note=note or None,
        ))
        outcome.absence_imported += 1

    await db.commit()
    return outcome