fbc04bc2c0
- Fix conftest.py: commit after each request in override_get_db so
preview_csv's rollback no longer wipes the shared registered_user
(root cause of 401 cascade across test_user_import + test_personnel_number)
- Fix limiter.enabled=False in client fixture (blocks rate-limit 429)
- Fix user_import_service: allow reactivation when personnel number
belongs to the same user being reactivated
- Fix test_personnel_number: use PATCH /companies/me (not /companies/{id})
and add try/finally cleanup for personnel_number_required flag
- Frontend UsersPage: add CSV import modal with template download,
preview/validation table, and guarded apply button
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
309 lines
11 KiB
Python
309 lines
11 KiB
Python
"""User CSV Bulk Import – validates, creates new users or reactivates deactivated ones."""
|
||
from __future__ import annotations
|
||
|
||
import csv
|
||
import io
|
||
import re
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timedelta, timezone
|
||
from uuid import UUID
|
||
|
||
from sqlalchemy import select
|
||
from sqlalchemy.ext.asyncio import AsyncSession
|
||
|
||
from app.core.security import generate_invite_token, hash_password
|
||
from app.models.company import Company, PersonnelNumberMode
|
||
from app.models.user import User, UserRole
|
||
from app.services.email_service import email_service
|
||
from app.services.user_service import user_service
|
||
|
||
|
||
REQUIRED_HEADERS = ["email", "first_name", "last_name"]
|
||
OPTIONAL_HEADERS = ["role", "personnel_number", "kuerzel"]
|
||
TEMPLATE_HEADERS = REQUIRED_HEADERS + OPTIONAL_HEADERS
|
||
|
||
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
|
||
PERSONNEL_RE = re.compile(r"^[0-9]+$")
|
||
VALID_ROLES = {r.value for r in UserRole if r != UserRole.SUPER_ADMIN}
|
||
|
||
|
||
# ── Datenstrukturen ──────────────────────────────────────────────────────────
|
||
|
||
@dataclass
|
||
class ImportRowResult:
|
||
row: int
|
||
email: str
|
||
personnel_number: str | None
|
||
action: str # created | reactivated | error
|
||
message: str | None = None
|
||
|
||
|
||
@dataclass
|
||
class ImportResult:
|
||
total_rows: int
|
||
created: int
|
||
reactivated: int
|
||
errors: int
|
||
items: list[ImportRowResult]
|
||
|
||
|
||
# ── CSV-Parsing ──────────────────────────────────────────────────────────────
|
||
|
||
def build_template_csv() -> str:
|
||
"""CSV template returned via /users/import-template.csv."""
|
||
buf = io.StringIO()
|
||
writer = csv.writer(buf)
|
||
writer.writerow(TEMPLATE_HEADERS)
|
||
writer.writerow([
|
||
"max@firma.de", "Max", "Mustermann",
|
||
"EMPLOYEE", "0042", "MM",
|
||
])
|
||
return buf.getvalue()
|
||
|
||
|
||
def _normalize(value: str | None) -> str:
|
||
return (value or "").strip()
|
||
|
||
|
||
def _parse_csv(content: bytes) -> tuple[list[dict[str, str]], list[str]]:
|
||
"""Parse CSV bytes (BOM-safe). Returns (rows, header_errors)."""
|
||
text = content.decode("utf-8-sig")
|
||
reader = csv.DictReader(io.StringIO(text))
|
||
if reader.fieldnames is None:
|
||
return [], ["CSV ist leer oder kein gültiger Header gefunden."]
|
||
headers = [h.strip() for h in reader.fieldnames]
|
||
missing = [h for h in REQUIRED_HEADERS if h not in headers]
|
||
if missing:
|
||
return [], [f"Pflicht-Spalten fehlen: {', '.join(missing)}"]
|
||
rows = list(reader)
|
||
return rows, []
|
||
|
||
|
||
# ── Import-Kern (Preview & Apply gemeinsam) ──────────────────────────────────
|
||
|
||
async def _process_import(
|
||
*,
|
||
content: bytes,
|
||
company_id: UUID,
|
||
invited_by: User,
|
||
db: AsyncSession,
|
||
apply: bool,
|
||
) -> ImportResult:
|
||
"""Process CSV bulk import. apply=False = validation only (no DB writes, rolled back)."""
|
||
rows, header_errors = _parse_csv(content)
|
||
items: list[ImportRowResult] = []
|
||
|
||
if header_errors:
|
||
for msg in header_errors:
|
||
items.append(ImportRowResult(
|
||
row=0, email="", personnel_number=None, action="error", message=msg,
|
||
))
|
||
return ImportResult(total_rows=0, created=0, reactivated=0, errors=len(items), items=items)
|
||
|
||
company = await db.get(Company, company_id)
|
||
if company is None:
|
||
items.append(ImportRowResult(
|
||
row=0, email="", personnel_number=None, action="error", message="Firma nicht gefunden.",
|
||
))
|
||
return ImportResult(total_rows=0, created=0, reactivated=0, errors=1, items=items)
|
||
|
||
seen_emails_in_csv: set[str] = set()
|
||
used_personnel_in_csv: set[str] = set()
|
||
created = 0
|
||
reactivated = 0
|
||
errors = 0
|
||
|
||
for idx, raw in enumerate(rows, start=2): # CSV row numbers start at 2 (after header)
|
||
email = _normalize(raw.get("email")).lower()
|
||
first_name = _normalize(raw.get("first_name"))
|
||
last_name = _normalize(raw.get("last_name"))
|
||
role_str = _normalize(raw.get("role")) or UserRole.EMPLOYEE.value
|
||
personnel_number = _normalize(raw.get("personnel_number")) or None
|
||
kuerzel = _normalize(raw.get("kuerzel")) or None
|
||
|
||
# Validation
|
||
if not email or not EMAIL_RE.match(email):
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="error", message="Ungültige E-Mail-Adresse.",
|
||
))
|
||
errors += 1
|
||
continue
|
||
if not first_name or not last_name:
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="error", message="Vor- und Nachname sind Pflicht.",
|
||
))
|
||
errors += 1
|
||
continue
|
||
if role_str not in VALID_ROLES:
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="error", message=f"Ungültige Rolle: {role_str}",
|
||
))
|
||
errors += 1
|
||
continue
|
||
if personnel_number is not None and not PERSONNEL_RE.match(personnel_number):
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="error", message="Personalnummer darf nur Ziffern enthalten.",
|
||
))
|
||
errors += 1
|
||
continue
|
||
|
||
# Doppelte Mail im Import → Fehler
|
||
if email in seen_emails_in_csv:
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="error", message="E-Mail kommt im Import mehrfach vor.",
|
||
))
|
||
errors += 1
|
||
continue
|
||
seen_emails_in_csv.add(email)
|
||
|
||
# Doppelte Personalnr. im Import → Fehler
|
||
if personnel_number and personnel_number in used_personnel_in_csv:
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="error", message="Personalnummer kommt im Import mehrfach vor.",
|
||
))
|
||
errors += 1
|
||
continue
|
||
|
||
# Personalnr.-Konflikt mit DB? Eigene Nummer (Reaktivierung) zulassen.
|
||
if personnel_number:
|
||
taken = await db.scalar(
|
||
select(User).where(
|
||
User.company_id == company_id,
|
||
User.personnel_number == personnel_number,
|
||
)
|
||
)
|
||
if taken is not None and taken.email.lower() != email:
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="error", message="Personalnummer ist bereits vergeben.",
|
||
))
|
||
errors += 1
|
||
continue
|
||
|
||
# Auto-Vergabe wenn leer (auch im Manuell-Modus laut Anforderung)
|
||
if not personnel_number:
|
||
personnel_number = await user_service._next_personnel_number(company_id, db)
|
||
|
||
# E-Mail-Konflikt prüfen (auch deaktivierte User in derselben Firma)
|
||
existing_user = await db.scalar(
|
||
select(User).where(User.email == email)
|
||
)
|
||
|
||
if existing_user is not None and existing_user.is_active:
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="error", message="E-Mail bereits aktiv vergeben.",
|
||
))
|
||
errors += 1
|
||
continue
|
||
|
||
if existing_user is not None and not existing_user.is_active:
|
||
# Reaktivieren
|
||
if existing_user.company_id != company_id:
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="error", message="E-Mail existiert in anderer Firma.",
|
||
))
|
||
errors += 1
|
||
continue
|
||
existing_user.first_name = first_name
|
||
existing_user.last_name = last_name
|
||
existing_user.role = UserRole(role_str)
|
||
if kuerzel:
|
||
existing_user.kuerzel = kuerzel
|
||
# Personalnr.: behalten, falls schon vorhanden (Reservierung), sonst setzen
|
||
if not existing_user.personnel_number:
|
||
existing_user.personnel_number = personnel_number
|
||
else:
|
||
personnel_number = existing_user.personnel_number
|
||
existing_user.is_active = True
|
||
used_personnel_in_csv.add(personnel_number)
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="reactivated",
|
||
))
|
||
reactivated += 1
|
||
continue
|
||
|
||
# Neuanlage: Invite-Token generieren, User inaktiv (warten auf Annahme)
|
||
raw_token, token_hash = generate_invite_token()
|
||
new_user = User(
|
||
company_id=company_id,
|
||
email=email,
|
||
first_name=first_name,
|
||
last_name=last_name,
|
||
role=UserRole(role_str),
|
||
kuerzel=kuerzel,
|
||
personnel_number=personnel_number,
|
||
password_hash=hash_password(raw_token),
|
||
invite_token_hash=token_hash,
|
||
invite_expires=datetime.now(timezone.utc) + timedelta(days=7),
|
||
is_active=False,
|
||
)
|
||
db.add(new_user)
|
||
await db.flush()
|
||
used_personnel_in_csv.add(personnel_number)
|
||
|
||
if apply:
|
||
try:
|
||
await email_service.send_invite(new_user, invited_by, raw_token, db)
|
||
except Exception as e: # noqa: BLE001
|
||
# Mail-Fehler darf Import nicht abbrechen, wird aber gemeldet
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="created",
|
||
message=f"Anlage OK, aber Einladungs-Mail fehlgeschlagen: {e}",
|
||
))
|
||
created += 1
|
||
continue
|
||
|
||
items.append(ImportRowResult(
|
||
row=idx, email=email, personnel_number=personnel_number,
|
||
action="created",
|
||
))
|
||
created += 1
|
||
|
||
return ImportResult(
|
||
total_rows=len(rows),
|
||
created=created,
|
||
reactivated=reactivated,
|
||
errors=errors,
|
||
items=items,
|
||
)
|
||
|
||
|
||
async def preview_csv(
|
||
content: bytes, company_id: UUID, invited_by: User, db: AsyncSession,
|
||
) -> ImportResult:
|
||
"""Validiert CSV ohne DB-Schreibvorgänge (Rollback am Ende)."""
|
||
result = await _process_import(
|
||
content=content,
|
||
company_id=company_id,
|
||
invited_by=invited_by,
|
||
db=db,
|
||
apply=False,
|
||
)
|
||
await db.rollback()
|
||
return result
|
||
|
||
|
||
async def apply_csv(
|
||
content: bytes, company_id: UUID, invited_by: User, db: AsyncSession,
|
||
) -> ImportResult:
|
||
"""Führt Import durch und committet."""
|
||
result = await _process_import(
|
||
content=content,
|
||
company_id=company_id,
|
||
invited_by=invited_by,
|
||
db=db,
|
||
apply=True,
|
||
)
|
||
await db.commit()
|
||
return result
|