Files
patrick fbc04bc2c0 agent-07 phase 2: fix test isolation + CSV import UI
- Fix conftest.py: commit after each request in override_get_db so
  preview_csv's rollback no longer wipes the shared registered_user
  (root cause of 401 cascade across test_user_import + test_personnel_number)
- Fix limiter.enabled=False in client fixture (blocks rate-limit 429)
- Fix user_import_service: allow reactivation when personnel number
  belongs to the same user being reactivated
- Fix test_personnel_number: use PATCH /companies/me (not /companies/{id})
  and add try/finally cleanup for personnel_number_required flag
- Frontend UsersPage: add CSV import modal with template download,
  preview/validation table, and guarded apply button

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 21:07:32 +02:00

309 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""User CSV Bulk Import validates, creates new users or reactivates deactivated ones."""
from __future__ import annotations
import csv
import io
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from uuid import UUID
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.security import generate_invite_token, hash_password
from app.models.company import Company, PersonnelNumberMode
from app.models.user import User, UserRole
from app.services.email_service import email_service
from app.services.user_service import user_service
# Columns that must appear in the CSV header row.
REQUIRED_HEADERS = ["email", "first_name", "last_name"]
# Columns that are read when present but may be left out.
OPTIONAL_HEADERS = ["role", "personnel_number", "kuerzel"]
# Header row written to the template CSV (required columns first).
TEMPLATE_HEADERS = REQUIRED_HEADERS + OPTIONAL_HEADERS
# Loose shape check: something@something.something, no whitespace or extra "@".
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# Personnel numbers consist of ASCII digits only.
PERSONNEL_RE = re.compile(r"^[0-9]+$")
# Role values accepted in the "role" column: every UserRole except SUPER_ADMIN.
VALID_ROLES = {r.value for r in UserRole if r != UserRole.SUPER_ADMIN}
# ── Datenstrukturen ──────────────────────────────────────────────────────────
@dataclass
class ImportRowResult:
    """Outcome for one CSV row, or for a file-level problem (reported with row 0)."""

    # CSV line number of the row; data rows start at 2, file-level errors use 0.
    row: int
    # Stripped, lower-cased e-mail of the row ("" for file-level errors).
    email: str
    # Personnel number associated with the row, if any.
    personnel_number: str | None
    action: str  # created | reactivated | error
    # Error text, or a warning attached to an otherwise successful row.
    message: str | None = None
@dataclass
class ImportResult:
    """Summary of a whole import run plus the per-row outcomes."""

    # Number of data rows read from the CSV (0 when the file itself was rejected).
    total_rows: int
    # Rows that led to a newly created user.
    created: int
    # Rows that re-enabled an existing inactive user.
    reactivated: int
    # Rows (or file-level problems) that were rejected.
    errors: int
    # One entry per processed row or file-level problem.
    items: list[ImportRowResult]
# ── CSV-Parsing ──────────────────────────────────────────────────────────────
def build_template_csv() -> str:
    """Return the CSV template served via /users/import-template.csv.

    The text consists of the header line followed by a single example row.
    """
    example_row = ["max@firma.de", "Max", "Mustermann", "EMPLOYEE", "0042", "MM"]
    out = io.StringIO()
    csv.writer(out).writerows([TEMPLATE_HEADERS, example_row])
    return out.getvalue()
def _normalize(value: str | None) -> str:
return (value or "").strip()
def _parse_csv(content: bytes) -> tuple[list[dict[str, str]], list[str]]:
"""Parse CSV bytes (BOM-safe). Returns (rows, header_errors)."""
text = content.decode("utf-8-sig")
reader = csv.DictReader(io.StringIO(text))
if reader.fieldnames is None:
return [], ["CSV ist leer oder kein gültiger Header gefunden."]
headers = [h.strip() for h in reader.fieldnames]
missing = [h for h in REQUIRED_HEADERS if h not in headers]
if missing:
return [], [f"Pflicht-Spalten fehlen: {', '.join(missing)}"]
rows = list(reader)
return rows, []
# ── Import-Kern (Preview & Apply gemeinsam) ──────────────────────────────────
def _error_item(
    row: int, email: str, personnel_number: str | None, message: str,
) -> ImportRowResult:
    """Build the result entry for a rejected row (or a file-level problem)."""
    return ImportRowResult(
        row=row, email=email, personnel_number=personnel_number,
        action="error", message=message,
    )


def _validate_fields(
    email: str,
    first_name: str,
    last_name: str,
    role_str: str,
    personnel_number: str | None,
) -> str | None:
    """Return the first field-level validation message for a row, or None if it is valid."""
    if not email or not EMAIL_RE.match(email):
        return "Ungültige E-Mail-Adresse."
    if not first_name or not last_name:
        return "Vor- und Nachname sind Pflicht."
    if role_str not in VALID_ROLES:
        return f"Ungültige Rolle: {role_str}"
    if personnel_number is not None and not PERSONNEL_RE.match(personnel_number):
        return "Personalnummer darf nur Ziffern enthalten."
    return None


async def _process_import(
    *,
    content: bytes,
    company_id: UUID,
    invited_by: User,
    db: AsyncSession,
    apply: bool,
) -> ImportResult:
    """Validate a CSV upload row by row and stage the resulting user changes.

    Shared core of preview and apply. This function neither commits nor rolls
    back; new and reactivated users are staged in ``db`` in both modes, and the
    caller decides whether they are kept.

    Args:
        content: Raw bytes of the uploaded CSV file.
        company_id: Company the users are imported into.
        invited_by: User passed to the invite mail as the inviting person.
        db: Session used for lookups and for staging changes.
        apply: When true, an invite mail is sent for every newly created user.

    Returns:
        Counters plus one ``ImportRowResult`` per processed row. File-level
        problems (bad header, unknown company) are reported as row 0 with
        ``total_rows=0``.
    """
    rows, header_errors = _parse_csv(content)
    if header_errors:
        header_items = [_error_item(0, "", None, msg) for msg in header_errors]
        return ImportResult(
            total_rows=0, created=0, reactivated=0,
            errors=len(header_items), items=header_items,
        )

    company = await db.get(Company, company_id)
    if company is None:
        return ImportResult(
            total_rows=0, created=0, reactivated=0, errors=1,
            items=[_error_item(0, "", None, "Firma nicht gefunden.")],
        )

    items: list[ImportRowResult] = []
    seen_emails_in_csv: set[str] = set()
    used_personnel_in_csv: set[str] = set()
    created = 0
    reactivated = 0
    errors = 0

    for idx, raw in enumerate(rows, start=2):  # CSV row numbers start at 2 (after header)
        email = _normalize(raw.get("email")).lower()
        first_name = _normalize(raw.get("first_name"))
        last_name = _normalize(raw.get("last_name"))
        role_str = _normalize(raw.get("role")) or UserRole.EMPLOYEE.value
        personnel_number = _normalize(raw.get("personnel_number")) or None
        kuerzel = _normalize(raw.get("kuerzel")) or None

        # Field-level validation
        problem = _validate_fields(email, first_name, last_name, role_str, personnel_number)
        if problem is not None:
            items.append(_error_item(idx, email, personnel_number, problem))
            errors += 1
            continue

        # Same e-mail twice within the upload -> error
        if email in seen_emails_in_csv:
            items.append(_error_item(
                idx, email, personnel_number, "E-Mail kommt im Import mehrfach vor.",
            ))
            errors += 1
            continue
        seen_emails_in_csv.add(email)

        # Same personnel number twice within the upload -> error
        if personnel_number and personnel_number in used_personnel_in_csv:
            items.append(_error_item(
                idx, email, personnel_number, "Personalnummer kommt im Import mehrfach vor.",
            ))
            errors += 1
            continue

        # Personnel number already used in this company? A match on the row's
        # own e-mail is allowed (user being reactivated with their old number).
        if personnel_number:
            taken = await db.scalar(
                select(User).where(
                    User.company_id == company_id,
                    User.personnel_number == personnel_number,
                )
            )
            if taken is not None and taken.email.lower() != email:
                items.append(_error_item(
                    idx, email, personnel_number, "Personalnummer ist bereits vergeben.",
                ))
                errors += 1
                continue

        # E-mail conflict check (lookup is not restricted to this company).
        existing_user = await db.scalar(
            select(User).where(User.email == email)
        )

        if existing_user is not None:
            if existing_user.is_active:
                items.append(_error_item(
                    idx, email, personnel_number, "E-Mail bereits aktiv vergeben.",
                ))
                errors += 1
                continue
            if existing_user.company_id != company_id:
                items.append(_error_item(
                    idx, email, personnel_number, "E-Mail existiert in anderer Firma.",
                ))
                errors += 1
                continue

            # NOTE(review): users created by this import are also inactive until
            # they accept their invite, so re-importing the same file appears to
            # activate pending invitees through this branch — confirm intended.

            # Personnel number: a number already stored on the user wins;
            # otherwise use the CSV value, and only generate one when neither exists.
            if existing_user.personnel_number:
                personnel_number = existing_user.personnel_number
            else:
                if not personnel_number:
                    personnel_number = await user_service._next_personnel_number(company_id, db)
                existing_user.personnel_number = personnel_number

            existing_user.first_name = first_name
            existing_user.last_name = last_name
            existing_user.role = UserRole(role_str)
            if kuerzel:
                existing_user.kuerzel = kuerzel
            existing_user.is_active = True

            used_personnel_in_csv.add(personnel_number)
            items.append(ImportRowResult(
                row=idx, email=email, personnel_number=personnel_number,
                action="reactivated",
            ))
            reactivated += 1
            continue

        # New user. Auto-assign a personnel number when the CSV left it empty
        # (per the original requirement note, also in manual mode). Done only
        # here so that rejected rows never request or report a generated number.
        if not personnel_number:
            personnel_number = await user_service._next_personnel_number(company_id, db)

        # Create the user inactive with an invite token (waiting for acceptance).
        # NOTE(review): hash_password also runs in preview mode; if it is a
        # deliberately slow hash, large previews may be slow — confirm.
        raw_token, token_hash = generate_invite_token()
        new_user = User(
            company_id=company_id,
            email=email,
            first_name=first_name,
            last_name=last_name,
            role=UserRole(role_str),
            kuerzel=kuerzel,
            personnel_number=personnel_number,
            password_hash=hash_password(raw_token),
            invite_token_hash=token_hash,
            invite_expires=datetime.now(timezone.utc) + timedelta(days=7),
            is_active=False,
        )
        db.add(new_user)
        await db.flush()
        used_personnel_in_csv.add(personnel_number)

        mail_warning: str | None = None
        if apply:
            try:
                await email_service.send_invite(new_user, invited_by, raw_token, db)
            except Exception as e:  # noqa: BLE001
                # A failed invite mail must not abort the import; it is
                # reported on the row instead.
                mail_warning = f"Anlage OK, aber Einladungs-Mail fehlgeschlagen: {e}"

        items.append(ImportRowResult(
            row=idx, email=email, personnel_number=personnel_number,
            action="created", message=mail_warning,
        ))
        created += 1

    return ImportResult(
        total_rows=len(rows),
        created=created,
        reactivated=reactivated,
        errors=errors,
        items=items,
    )
async def preview_csv(
    content: bytes, company_id: UUID, invited_by: User, db: AsyncSession,
) -> ImportResult:
    """Validate a CSV upload and report what an import would do, persisting nothing.

    The shared import routine runs with ``apply=False``. The session is rolled
    back afterwards in every case, including when processing raises, so no
    staged change from a preview stays pending in the caller's session.
    """
    try:
        return await _process_import(
            content=content,
            company_id=company_id,
            invited_by=invited_by,
            db=db,
            apply=False,
        )
    finally:
        # Runs on success and on error alike.
        await db.rollback()
async def apply_csv(
    content: bytes, company_id: UUID, invited_by: User, db: AsyncSession,
) -> ImportResult:
    """Run the import for real and commit the session afterwards."""
    outcome = await _process_import(
        content=content, company_id=company_id, invited_by=invited_by, db=db, apply=True,
    )
    # Persist everything the import staged in a single commit.
    await db.commit()
    return outcome