from datetime import datetime, timedelta, timezone
from uuid import UUID

from fastapi import HTTPException
from sqlalchemy import func, or_, select, text
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.security import (
    generate_invite_token,
    hash_password,
    hash_token,
    verify_password,
)
from app.models import User, UserRole
from app.models.audit_log import AuditLog
from app.models.company import Company, PersonnelNumberMode
from app.schemas.user import InviteAccept, InviteRequest, UserUpdate
from app.services.email_service import email_service

# Personnel numbers are left-padded with zeros to at least this many digits.
PERSONNEL_NUMBER_MIN_DIGITS = 4


class UserService:
    """User management: invites, listing, updates, personnel numbers, kiosk PINs.

    Methods add objects to / flush the session they are given but never commit
    in this class; committing is left to the caller.
    """

    # ── Personnel number helpers ──────────────────────────────────────────────

    async def _get_company(self, company_id: UUID, db: AsyncSession) -> Company:
        """Load a company by primary key.

        Raises:
            HTTPException: 404 if no company with ``company_id`` exists.
        """
        company = await db.get(Company, company_id)
        if not company:
            raise HTTPException(status_code=404, detail="Company not found")
        return company

    @staticmethod
    def _format_personnel_number(value: int) -> str:
        """Render ``value`` zero-padded to ``PERSONNEL_NUMBER_MIN_DIGITS`` digits."""
        return str(value).zfill(PERSONNEL_NUMBER_MIN_DIGITS)

    async def _next_personnel_number(self, company_id: UUID, db: AsyncSession) -> str:
        """Atomic increment + return next personnel number for the company.

        Uses UPDATE ... RETURNING to avoid race conditions with parallel inserts.
        Skips numbers that are already taken (e.g. manual override) by retrying.

        Raises:
            HTTPException: 404 if the company row does not exist; 500 if no free
                number was found within the retry bound.
        """
        for _ in range(50):  # safety bound, in practice 1-2 iterations max
            result = await db.execute(
                text(
                    "UPDATE companies "
                    "SET personnel_number_next = personnel_number_next + 1 "
                    "WHERE id = :cid "
                    "RETURNING personnel_number_next - 1 AS used"
                ),
                {"cid": company_id},
            )
            row = result.first()
            if row is None:
                raise HTTPException(status_code=404, detail="Company not found")

            candidate = self._format_personnel_number(int(row.used))
            existing = await db.scalar(
                select(User.id).where(
                    User.company_id == company_id,
                    User.personnel_number == candidate,
                )
            )
            if existing is None:
                return candidate
        raise HTTPException(status_code=500, detail="Could not allocate personnel number")

    async def next_personnel_suggestion(self, company_id: UUID, db: AsyncSession) -> str:
        """Preview next personnel number without consuming the counter.

        Starts at the company's ``personnel_number_next`` and walks upwards
        until a number is found that no user of the company holds.

        Raises:
            HTTPException: 404 if the company does not exist.
        """
        company = await self._get_company(company_id, db)
        candidate_int = company.personnel_number_next
        while True:
            candidate = self._format_personnel_number(candidate_int)
            taken = await db.scalar(
                select(User.id).where(
                    User.company_id == company_id,
                    User.personnel_number == candidate,
                )
            )
            if taken is None:
                return candidate
            candidate_int += 1

    async def _check_personnel_unique(
        self,
        company_id: UUID,
        number: str,
        db: AsyncSession,
        exclude_user_id: UUID | None = None,
    ) -> None:
        """Raise 409 if personnel number is already taken (incl. deactivated/reserved).

        Args:
            company_id: Company whose users are searched.
            number: Personnel number to look for.
            db: Session used for the lookup.
            exclude_user_id: If given, a user with this id is ignored, so a user
                may keep their own number.

        Raises:
            HTTPException: 409 if another user of the company has ``number``.
        """
        q = select(User.id).where(
            User.company_id == company_id,
            User.personnel_number == number,
        )
        if exclude_user_id is not None:
            q = q.where(User.id != exclude_user_id)
        existing = await db.scalar(q)
        if existing is not None:
            raise HTTPException(
                status_code=409,
                detail=f"Personalnummer '{number}' ist bereits vergeben (auch reservierte Nummern bleiben belegt).",
            )

    async def get_by_personnel_number(
        self, number: str, company_id: UUID, db: AsyncSession
    ) -> User:
        """Return the user of ``company_id`` holding personnel number ``number``.

        Raises:
            HTTPException: 404 if no such user exists.
        """
        user = await db.scalar(
            select(User).where(
                User.company_id == company_id,
                User.personnel_number == number,
            )
        )
        if user is None:
            raise HTTPException(status_code=404, detail="Personalnummer nicht gefunden")
        return user

    # ── Invite ────────────────────────────────────────────────────────────────

    async def invite(
        self,
        data: InviteRequest,
        company_id: UUID,
        invited_by: User,
        db: AsyncSession,
    ) -> User:
        """Create a user, either directly with a password or via email invite.

        If ``data.initial_password`` is set the user is created active with that
        password and no email is sent. Otherwise an inactive user with an invite
        token valid for 7 days is created and ``email_service.send_invite`` is
        called. Both paths add an ``AuditLog`` entry.

        The personnel number is taken from ``data`` if provided (and checked for
        uniqueness), allocated automatically if the company is in AUTO mode, or
        left as ``None``.

        Raises:
            HTTPException: 400 if the email is already registered or a personnel
                number is required but missing; 404 if the company does not
                exist; 409 if the requested personnel number is taken.
        """
        existing = await db.scalar(select(User).where(User.email == data.email))
        if existing:
            raise HTTPException(status_code=400, detail="Email already registered")

        company = await self._get_company(company_id, db)

        # Normalise "" to None: an empty string must never be persisted as a
        # personnel number, since it would bypass the uniqueness check below.
        personnel_number = data.personnel_number or None
        if personnel_number:
            await self._check_personnel_unique(company_id, personnel_number, db)
        else:
            if company.personnel_number_mode == PersonnelNumberMode.AUTO.value:
                personnel_number = await self._next_personnel_number(company_id, db)
            elif company.personnel_number_required:
                raise HTTPException(
                    status_code=400,
                    detail="Personalnummer ist in dieser Firma Pflicht.",
                )

        if data.initial_password:
            # Direct creation with password – active immediately, no email invite
            user = User(
                company_id=company_id,
                email=data.email,
                first_name=data.first_name,
                last_name=data.last_name,
                role=data.role,
                department_id=data.department_id,
                personnel_number=personnel_number,
                password_hash=hash_password(data.initial_password),
                is_active=True,
            )
            db.add(user)
            await db.flush()  # flush so user.id is available for the audit entry

            db.add(AuditLog(
                company_id=company_id,
                user_id=invited_by.id,
                action="user_created",
                entity_type="user",
                entity_id=user.id,
                new_value={"email": user.email, "role": user.role.value, "direct": True},
            ))
        else:
            raw_token, token_hash = generate_invite_token()
            user = User(
                company_id=company_id,
                email=data.email,
                first_name=data.first_name,
                last_name=data.last_name,
                role=data.role,
                department_id=data.department_id,
                personnel_number=personnel_number,
                password_hash=hash_password(raw_token),  # Temp – overwritten on accept
                invite_token_hash=token_hash,
                invite_expires=datetime.now(timezone.utc) + timedelta(days=7),
                is_active=False,
            )
            db.add(user)
            await db.flush()  # flush so user.id is available for the audit entry

            db.add(AuditLog(
                company_id=company_id,
                user_id=invited_by.id,
                action="user_invited",
                entity_type="user",
                entity_id=user.id,
                new_value={"email": user.email, "role": user.role.value},
            ))

            await email_service.send_invite(user, invited_by, raw_token, db)

        return user

    async def accept_invite(self, data: InviteAccept, db: AsyncSession) -> User:
        """Activate an invited user and set their chosen password.

        Looks the user up by the hash of ``data.token``, clears the invite token
        and expiry, stores the new password hash and marks the user active.

        Raises:
            HTTPException: 400 if the token is unknown or has expired.
        """
        token_hash = hash_token(data.token)
        user = await db.scalar(
            select(User).where(User.invite_token_hash == token_hash)
        )
        if not user:
            raise HTTPException(status_code=400, detail="Invalid invite token")
        # NOTE(review): assumes invite_expires is loaded timezone-aware – confirm
        # against the column definition.
        if user.invite_expires and user.invite_expires < datetime.now(timezone.utc):
            raise HTTPException(status_code=400, detail="Invite token expired")

        user.password_hash = hash_password(data.password)
        user.invite_token_hash = None
        user.invite_expires = None
        user.is_active = True
        return user

    # ── Listing ───────────────────────────────────────────────────────────────

    async def list_users(
        self,
        company_id: UUID,
        db: AsyncSession,
        skip: int = 0,
        limit: int = 50,
        active_only: bool = True,
        search: str | None = None,
    ) -> tuple[int, list[User]]:
        """Return ``(total, page)`` of a company's users.

        Args:
            company_id: Company whose users are listed.
            db: Session used for the queries.
            skip: Number of rows to skip (OFFSET).
            limit: Maximum number of rows to return.
            active_only: If true, only users with ``is_active`` set are returned.
            search: Optional case-insensitive substring matched against email,
                first name, last name and personnel number. Surrounding
                whitespace is ignored; a blank value applies no filter.

        Returns:
            The total number of matching users (ignoring paging) and the
            requested page, ordered by last name, first name and id.
        """
        q = select(User).where(User.company_id == company_id)
        if active_only:
            # ``==`` is required here: it builds a SQL expression, not a Python bool.
            q = q.where(User.is_active == True)  # noqa: E712

        term = search.strip() if search else ""
        if term:
            pattern = f"%{term}%"
            q = q.where(
                or_(
                    User.email.ilike(pattern),
                    User.first_name.ilike(pattern),
                    User.last_name.ilike(pattern),
                    User.personnel_number.ilike(pattern),
                )
            )

        total = await db.scalar(select(func.count()).select_from(q.subquery()))

        # OFFSET/LIMIT without ORDER BY yields unstable pages (rows may repeat
        # or be skipped between requests); User.id is the unique tie-breaker.
        page = (
            q.order_by(User.last_name, User.first_name, User.id)
            .offset(skip)
            .limit(limit)
        )
        users = await db.scalars(page)
        return total, list(users.all())

    async def get_by_id(self, user_id: UUID, company_id: UUID, db: AsyncSession) -> User:
        """Load a user by id, scoped to ``company_id``.

        Raises:
            HTTPException: 404 if the user does not exist or belongs to a
                different company.
        """
        user = await db.get(User, user_id)
        if not user or user.company_id != company_id:
            raise HTTPException(status_code=404, detail="User not found")
        return user

    # ── Update / De/Reactivate ────────────────────────────────────────────────

    async def update(
        self,
        user_id: UUID,
        data: UserUpdate,
        current_user: User,
        db: AsyncSession,
    ) -> User:
        """Apply the explicitly-set fields of ``data`` to a user.

        The target user must belong to ``current_user``'s company. A changed
        personnel number is checked for uniqueness and recorded in an
        ``AuditLog`` entry; clearing an existing personnel number is rejected.

        Raises:
            HTTPException: 404 if the user is not found in the company; 409 if
                the new personnel number is taken; 400 on an attempt to clear an
                existing personnel number.
        """
        user = await self.get_by_id(user_id, current_user.company_id, db)
        changes = data.model_dump(exclude_unset=True)

        old_personnel = user.personnel_number
        if "personnel_number" in changes:
            new_value = changes["personnel_number"]
            if new_value:
                await self._check_personnel_unique(
                    current_user.company_id, new_value, db, exclude_user_id=user.id
                )
            elif user.personnel_number is not None:
                # Allow explicit clearing? The plan calls for reservation –
                # we forbid clearing.
                raise HTTPException(
                    status_code=400,
                    detail="Personalnummer kann nicht gelöscht werden (Reservierung).",
                )
            else:
                # Nothing set before and nothing requested: normalise "" to None
                # so an empty string is never stored (it would bypass the
                # uniqueness check and produce a spurious audit entry).
                changes["personnel_number"] = None

        for field, value in changes.items():
            setattr(user, field, value)

        if "personnel_number" in changes and changes["personnel_number"] != old_personnel:
            db.add(AuditLog(
                company_id=current_user.company_id,
                user_id=current_user.id,
                action="user_personnel_number_changed",
                entity_type="user",
                entity_id=user.id,
                old_value={"personnel_number": old_personnel},
                new_value={"personnel_number": user.personnel_number},
            ))

        return user

    async def deactivate(self, user_id: UUID, current_user: User, db: AsyncSession) -> User:
        """Mark a user of ``current_user``'s company as inactive.

        Raises:
            HTTPException: 400 if ``user_id`` is the caller's own id; 404 if the
                user is not found in the company.
        """
        if user_id == current_user.id:
            raise HTTPException(status_code=400, detail="Cannot deactivate your own account")
        user = await self.get_by_id(user_id, current_user.company_id, db)
        user.is_active = False
        return user

    async def reactivate(self, user_id: UUID, current_user: User, db: AsyncSession) -> User:
        """Mark a user of ``current_user``'s company as active again.

        Raises:
            HTTPException: 404 if the user is not found in the company.
        """
        user = await self.get_by_id(user_id, current_user.company_id, db)
        user.is_active = True
        return user

    # ── Kiosk ─────────────────────────────────────────────────────────────────

    async def set_kiosk_pin(self, user: User, pin: str, db: AsyncSession) -> None:
        """Store the hash of ``pin`` on the user (``db`` is currently unused)."""
        user.kiosk_pin_hash = hash_password(pin)

    async def verify_kiosk_pin(self, user: User, pin: str) -> bool:
        """Return whether ``pin`` matches the user's stored kiosk PIN hash.

        Returns ``False`` if the user has no kiosk PIN set.
        """
        if not user.kiosk_pin_hash:
            return False
        return verify_password(pin, user.kiosk_pin_hash)


user_service = UserService()