"""User CSV Bulk Import – validates, creates new users or reactivates deactivated ones.""" from __future__ import annotations import csv import io import re from dataclasses import dataclass from datetime import datetime, timedelta, timezone from uuid import UUID from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.core.security import generate_invite_token, hash_password from app.models.company import Company, PersonnelNumberMode from app.models.user import User, UserRole from app.services.email_service import email_service from app.services.user_service import user_service REQUIRED_HEADERS = ["email", "first_name", "last_name"] OPTIONAL_HEADERS = ["role", "personnel_number", "kuerzel"] TEMPLATE_HEADERS = REQUIRED_HEADERS + OPTIONAL_HEADERS EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") PERSONNEL_RE = re.compile(r"^[0-9]+$") VALID_ROLES = {r.value for r in UserRole if r != UserRole.SUPER_ADMIN} # ── Datenstrukturen ────────────────────────────────────────────────────────── @dataclass class ImportRowResult: row: int email: str personnel_number: str | None action: str # created | reactivated | error message: str | None = None @dataclass class ImportResult: total_rows: int created: int reactivated: int errors: int items: list[ImportRowResult] # ── CSV-Parsing ────────────────────────────────────────────────────────────── def build_template_csv() -> str: """CSV template returned via /users/import-template.csv.""" buf = io.StringIO() writer = csv.writer(buf) writer.writerow(TEMPLATE_HEADERS) writer.writerow([ "max@firma.de", "Max", "Mustermann", "EMPLOYEE", "0042", "MM", ]) return buf.getvalue() def _normalize(value: str | None) -> str: return (value or "").strip() def _parse_csv(content: bytes) -> tuple[list[dict[str, str]], list[str]]: """Parse CSV bytes (BOM-safe). Returns (rows, header_errors).""" text = content.decode("utf-8-sig") reader = csv.DictReader(io.StringIO(text)) if reader.fieldnames is None: return [], ["CSV ist leer oder kein gültiger Header gefunden."] headers = [h.strip() for h in reader.fieldnames] missing = [h for h in REQUIRED_HEADERS if h not in headers] if missing: return [], [f"Pflicht-Spalten fehlen: {', '.join(missing)}"] rows = list(reader) return rows, [] # ── Import-Kern (Preview & Apply gemeinsam) ────────────────────────────────── async def _process_import( *, content: bytes, company_id: UUID, invited_by: User, db: AsyncSession, apply: bool, ) -> ImportResult: """Process CSV bulk import. apply=False = validation only (no DB writes, rolled back).""" rows, header_errors = _parse_csv(content) items: list[ImportRowResult] = [] if header_errors: for msg in header_errors: items.append(ImportRowResult( row=0, email="", personnel_number=None, action="error", message=msg, )) return ImportResult(total_rows=0, created=0, reactivated=0, errors=len(items), items=items) company = await db.get(Company, company_id) if company is None: items.append(ImportRowResult( row=0, email="", personnel_number=None, action="error", message="Firma nicht gefunden.", )) return ImportResult(total_rows=0, created=0, reactivated=0, errors=1, items=items) seen_emails_in_csv: set[str] = set() used_personnel_in_csv: set[str] = set() created = 0 reactivated = 0 errors = 0 for idx, raw in enumerate(rows, start=2): # CSV row numbers start at 2 (after header) email = _normalize(raw.get("email")).lower() first_name = _normalize(raw.get("first_name")) last_name = _normalize(raw.get("last_name")) role_str = _normalize(raw.get("role")) or UserRole.EMPLOYEE.value personnel_number = _normalize(raw.get("personnel_number")) or None kuerzel = _normalize(raw.get("kuerzel")) or None # Validation if not email or not EMAIL_RE.match(email): items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="error", message="Ungültige E-Mail-Adresse.", )) errors += 1 continue if not first_name or not last_name: items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="error", message="Vor- und Nachname sind Pflicht.", )) errors += 1 continue if role_str not in VALID_ROLES: items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="error", message=f"Ungültige Rolle: {role_str}", )) errors += 1 continue if personnel_number is not None and not PERSONNEL_RE.match(personnel_number): items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="error", message="Personalnummer darf nur Ziffern enthalten.", )) errors += 1 continue # Doppelte Mail im Import → Fehler if email in seen_emails_in_csv: items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="error", message="E-Mail kommt im Import mehrfach vor.", )) errors += 1 continue seen_emails_in_csv.add(email) # Doppelte Personalnr. im Import → Fehler if personnel_number and personnel_number in used_personnel_in_csv: items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="error", message="Personalnummer kommt im Import mehrfach vor.", )) errors += 1 continue # Personalnr.-Konflikt mit DB? Eigene Nummer (Reaktivierung) zulassen. if personnel_number: taken = await db.scalar( select(User).where( User.company_id == company_id, User.personnel_number == personnel_number, ) ) if taken is not None and taken.email.lower() != email: items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="error", message="Personalnummer ist bereits vergeben.", )) errors += 1 continue # Auto-Vergabe wenn leer (auch im Manuell-Modus laut Anforderung) if not personnel_number: personnel_number = await user_service._next_personnel_number(company_id, db) # E-Mail-Konflikt prüfen (auch deaktivierte User in derselben Firma) existing_user = await db.scalar( select(User).where(User.email == email) ) if existing_user is not None and existing_user.is_active: items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="error", message="E-Mail bereits aktiv vergeben.", )) errors += 1 continue if existing_user is not None and not existing_user.is_active: # Reaktivieren if existing_user.company_id != company_id: items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="error", message="E-Mail existiert in anderer Firma.", )) errors += 1 continue existing_user.first_name = first_name existing_user.last_name = last_name existing_user.role = UserRole(role_str) if kuerzel: existing_user.kuerzel = kuerzel # Personalnr.: behalten, falls schon vorhanden (Reservierung), sonst setzen if not existing_user.personnel_number: existing_user.personnel_number = personnel_number else: personnel_number = existing_user.personnel_number existing_user.is_active = True used_personnel_in_csv.add(personnel_number) items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="reactivated", )) reactivated += 1 continue # Neuanlage: Invite-Token generieren, User inaktiv (warten auf Annahme) raw_token, token_hash = generate_invite_token() new_user = User( company_id=company_id, email=email, first_name=first_name, last_name=last_name, role=UserRole(role_str), kuerzel=kuerzel, personnel_number=personnel_number, password_hash=hash_password(raw_token), invite_token_hash=token_hash, invite_expires=datetime.now(timezone.utc) + timedelta(days=7), is_active=False, ) db.add(new_user) await db.flush() used_personnel_in_csv.add(personnel_number) if apply: try: await email_service.send_invite(new_user, invited_by, raw_token, db) except Exception as e: # noqa: BLE001 # Mail-Fehler darf Import nicht abbrechen, wird aber gemeldet items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="created", message=f"Anlage OK, aber Einladungs-Mail fehlgeschlagen: {e}", )) created += 1 continue items.append(ImportRowResult( row=idx, email=email, personnel_number=personnel_number, action="created", )) created += 1 return ImportResult( total_rows=len(rows), created=created, reactivated=reactivated, errors=errors, items=items, ) async def preview_csv( content: bytes, company_id: UUID, invited_by: User, db: AsyncSession, ) -> ImportResult: """Validiert CSV ohne DB-Schreibvorgänge (Rollback am Ende).""" result = await _process_import( content=content, company_id=company_id, invited_by=invited_by, db=db, apply=False, ) await db.rollback() return result async def apply_csv( content: bytes, company_id: UUID, invited_by: User, db: AsyncSession, ) -> ImportResult: """Führt Import durch und committet.""" result = await _process_import( content=content, company_id=company_id, invited_by=invited_by, db=db, apply=True, ) await db.commit() return result