Initial commit – TimeMaster Zeiterfassung & HR-Tool

Stand: agent-06 (Audit-Log), agent-05 (Krankmeldung), agent-07 Phase 1 (Personalnummer),
Busylight-Pull-Integration, TOTP/2FA, Abwesenheiten, Zeiterfassung, Kiosk-Grundgerüst.
Migrations 0001–0023 deployed auf 192.168.1.137 + .164.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
sysops
2026-05-23 20:03:27 +02:00
commit 1fedd683e0
178 changed files with 29896 additions and 0 deletions
+772
View File
@@ -0,0 +1,772 @@
import asyncio
from datetime import date, timedelta
from uuid import UUID
from fastapi import HTTPException
from sqlalchemy import and_, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from decimal import Decimal
from app.models.absence import Absence, AbsenceStatus
from app.models.absence_type import AbsenceCategory, AbsenceType
from app.models.audit_log import AuditLog
from app.models.company import Company
from app.models.overtime_balance import OvertimeBalance
from app.models.public_holiday import PublicHoliday
from app.models.user import User, UserRole
from app.models.vacation_balance import VacationBalance
from app.models.work_schedule import WorkSchedule
from app.schemas.absence import AbsenceCreate, AbsenceReject, AbsenceTypeCreate, AbsenceTypeUpdate
_manager_roles = (UserRole.MANAGER, UserRole.HR, UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)
class AbsenceService:
# ── AbsenceTypes ─────────────────────────────────────────────────────────
async def list_types(self, company_id: UUID, db: AsyncSession) -> list[AbsenceType]:
result = await db.scalars(
select(AbsenceType)
.where(AbsenceType.company_id == company_id, AbsenceType.is_active == True)
.order_by(AbsenceType.name)
)
return list(result.all())
async def create_type(
self, company_id: UUID, data: AbsenceTypeCreate, db: AsyncSession
) -> AbsenceType:
at = AbsenceType(company_id=company_id, **data.model_dump())
db.add(at)
await db.flush()
return at
async def update_type(
self, type_id: UUID, company_id: UUID, data: AbsenceTypeUpdate, db: AsyncSession
) -> AbsenceType:
at = await self._get_type_or_404(type_id, company_id, db)
for field, value in data.model_dump(exclude_none=True).items():
setattr(at, field, value)
return at
async def create_defaults_for_company(self, company_id: UUID, db: AsyncSession) -> None:
"""Standard-Abwesenheitstypen + Standard-Arbeitsplan für ein neues Unternehmen anlegen."""
defaults = [
{
"name": "Urlaub", "color": "#3B82F6", "category": AbsenceCategory.VACATION,
"requires_approval": True, "deducts_vacation": True, "is_paid": True,
},
{
"name": "Krankheit", "color": "#EF4444", "category": AbsenceCategory.SICK,
"requires_approval": False, "deducts_vacation": False, "is_paid": True,
"requires_certificate": True, "certificate_after_days": 3,
},
{
"name": "Freizeitausgleich", "color": "#F59E0B", "category": AbsenceCategory.OVERTIME_COMP,
"requires_approval": True, "deducts_vacation": False,
"affects_overtime_balance": True, "is_paid": True,
},
{
"name": "Weiterbildung", "color": "#8B5CF6", "category": AbsenceCategory.TRAINING,
"requires_approval": True, "deducts_vacation": False, "is_paid": True,
"max_days_per_year": 5,
},
{
"name": "Dienstreise", "color": "#06B6D4", "category": AbsenceCategory.BUSINESS_TRIP,
"requires_approval": True, "deducts_vacation": False, "is_paid": True,
},
{
"name": "Homeoffice", "color": "#10B981", "category": AbsenceCategory.OTHER,
"requires_approval": True, "deducts_vacation": False, "is_paid": True,
},
{
"name": "Sonderurlaub", "color": "#84CC16", "category": AbsenceCategory.VACATION,
"requires_approval": True, "deducts_vacation": True, "is_paid": True,
},
]
for d in defaults:
db.add(AbsenceType(company_id=company_id, **d))
# Standard-Arbeitsplan: MoFr 8h
schedule = WorkSchedule(
company_id=company_id,
name="Vollzeit (40h)",
valid_from=date.today(),
)
db.add(schedule)
await db.flush()
# ── Absences ──────────────────────────────────────────────────────────────
async def list_absences(
self,
company_id: UUID,
current_user: User,
db: AsyncSession,
user_id: UUID | None = None,
type_id: UUID | None = None,
status: AbsenceStatus | None = None,
year: int | None = None,
) -> tuple[int, list[Absence]]:
q = (
select(Absence)
.join(User, Absence.user_id == User.id)
.where(User.company_id == company_id)
)
if current_user.role == UserRole.EMPLOYEE:
q = q.where(Absence.user_id == current_user.id)
elif user_id:
q = q.where(Absence.user_id == user_id)
if type_id:
q = q.where(Absence.type_id == type_id)
if status:
q = q.where(Absence.status == status)
if year:
q = q.where(Absence.start_date >= date(year, 1, 1), Absence.end_date <= date(year, 12, 31))
total = await db.scalar(select(func.count()).select_from(q.subquery())) or 0
result = await db.scalars(q.order_by(Absence.start_date.desc()))
return total, list(result.all())
async def get_by_id(self, absence_id: UUID, current_user: User, db: AsyncSession) -> Absence:
absence = await db.get(Absence, absence_id)
if absence is None:
raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
if absence.user_id != current_user.id and current_user.role == UserRole.EMPLOYEE:
raise HTTPException(status_code=403, detail="Keine Berechtigung.")
return absence
async def create_absence(
self,
data: AbsenceCreate,
current_user: User,
db: AsyncSession,
) -> tuple[Absence, list[str]]:
# AbsenceType validieren
absence_type = await self._get_type_or_404(data.type_id, current_user.company_id, db)
# Arbeitstage berechnen
holidays = await self._get_holiday_dates(
current_user.company_id, data.start_date.year, db
)
working_days = self._calc_working_days(
data.start_date, data.end_date, holidays,
data.half_day_start, data.half_day_end
)
if working_days <= 0:
raise HTTPException(status_code=400, detail="Keine Arbeitstage im ausgewählten Zeitraum.")
# Urlaubskonto prüfen wenn Urlaub abgezogen werden soll
warnings: list[str] = []
if absence_type.deducts_vacation:
balance = await self._get_or_create_balance(current_user.id, data.start_date.year, db)
if balance.remaining_days < working_days:
warnings.append(
f"Urlaubskonto reicht möglicherweise nicht aus: "
f"{balance.remaining_days} Tage verfügbar, {working_days} Tage beantragt."
)
# Überschneidung mit eigenen Abwesenheiten prüfen
overlap = await db.scalar(
select(Absence).where(
and_(
Absence.user_id == current_user.id,
Absence.status != AbsenceStatus.CANCELLED,
Absence.status != AbsenceStatus.REJECTED,
Absence.start_date <= data.end_date,
Absence.end_date >= data.start_date,
)
)
)
if overlap:
warnings.append("Überschneidung mit bestehender Abwesenheit im selben Zeitraum.")
status = AbsenceStatus.PENDING if absence_type.requires_approval else AbsenceStatus.APPROVED
approved_by = None if absence_type.requires_approval else current_user.id
# Krankmeldung: AU-Pflicht-Datum automatisch berechnen.
# Reihenfolge: AbsenceType.certificate_after_days (override) → Company default.
certificate_required_by: date | None = None
if absence_type.category == AbsenceCategory.SICK and absence_type.requires_certificate:
company = await db.get(Company, current_user.company_id)
company_default = company.sick_note_required_after_days if company else 3
threshold = absence_type.certificate_after_days or company_default
certificate_required_by = data.start_date + timedelta(days=threshold)
absence = Absence(
user_id=current_user.id,
type_id=data.type_id,
start_date=data.start_date,
end_date=data.end_date,
half_day_start=data.half_day_start,
half_day_end=data.half_day_end,
working_days=working_days,
status=status,
approved_by=approved_by,
substitute_id=data.substitute_id,
note=data.note,
certificate_required_by=certificate_required_by,
)
db.add(absence)
await db.flush()
# Bei automatischer Genehmigung Konto abziehen
if not absence_type.requires_approval and absence_type.deducts_vacation:
await self._deduct_vacation(current_user.id, data.start_date.year, int(working_days), db)
return absence, warnings
async def update_absence(
self, absence_id: UUID, data: "AbsenceUpdate", current_user: User, db: AsyncSession
) -> Absence:
from app.schemas.absence import AbsenceUpdate
absence = await db.get(Absence, absence_id)
if absence is None:
raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
# Mitarbeiter: nur eigene; Manager: gleiche Company
if current_user.role == UserRole.EMPLOYEE:
if absence.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Keine Berechtigung.")
else:
owner = await db.get(User, absence.user_id)
if owner is None or owner.company_id != current_user.company_id:
raise HTTPException(status_code=403, detail="Zugriff verweigert.")
is_manager = current_user.role in _manager_roles
if absence.status not in (AbsenceStatus.PENDING, AbsenceStatus.APPROVED):
raise HTTPException(status_code=409, detail="Nur ausstehende oder genehmigte Anträge können bearbeitet werden.")
if absence.status == AbsenceStatus.APPROVED and not is_manager:
# Mitarbeiter stellt Änderungswunsch → Begründung Pflicht, Status zurück auf pending
if not data.correction_note or not data.correction_note.strip():
raise HTTPException(status_code=422, detail="Änderungsgrund ist bei genehmigten Anträgen Pflicht.")
if data.type_id is not None:
await self._get_type_or_404(data.type_id, current_user.company_id, db)
absence.type_id = data.type_id
if data.start_date is not None:
absence.start_date = data.start_date
if data.end_date is not None:
absence.end_date = data.end_date
if data.half_day_start is not None:
absence.half_day_start = data.half_day_start
if data.half_day_end is not None:
absence.half_day_end = data.half_day_end
if data.substitute_id is not None:
absence.substitute_id = data.substitute_id
if data.note is not None:
absence.note = data.note
if data.correction_note is not None:
absence.correction_note = data.correction_note.strip() or None
# Genehmigter Antrag: Mitarbeiter-Änderung → zurück auf pending (erneute Genehmigung)
was_approved = absence.status == AbsenceStatus.APPROVED
if was_approved and not is_manager:
absence.status = AbsenceStatus.PENDING
absence.approved_by = None
# Arbeitstage neu berechnen
holiday_dates = await self._get_holiday_dates(current_user.company_id, absence.start_date.year, db)
absence.working_days = Decimal(str(
self._calc_working_days(absence.start_date, absence.end_date,
holiday_dates, absence.half_day_start, absence.half_day_end)
))
# Audit-Log
action = "absence_change_request" if (was_approved and not is_manager) else "absence_updated"
db.add(AuditLog(
user_id=current_user.id,
action=action,
entity_type="absence",
entity_id=absence.id,
old_value={"status": "approved" if was_approved else "pending"},
new_value={
"status": absence.status.value,
"start_date": str(absence.start_date),
"end_date": str(absence.end_date),
"working_days": float(absence.working_days),
"correction_note": absence.correction_note,
},
))
return absence
async def cancel_absence(
self, absence_id: UUID, current_user: User, db: AsyncSession
) -> Absence:
absence = await db.get(Absence, absence_id)
if absence is None:
raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
if absence.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Nur eigene Anträge können storniert werden.")
if absence.status != AbsenceStatus.PENDING:
raise HTTPException(status_code=409, detail="Nur ausstehende Anträge können gelöscht werden.")
absence.status = AbsenceStatus.CANCELLED
# Audit-Log (DSGVO)
db.add(AuditLog(
company_id=current_user.company_id,
user_id=current_user.id,
action="absence_cancelled",
entity_type="absence",
entity_id=absence.id,
old_value={"status": "pending"},
new_value={
"status": "cancelled",
"cancelled_by": str(current_user.id),
"absence_user_id": str(absence.user_id),
"start_date": str(absence.start_date),
"end_date": str(absence.end_date),
"working_days": float(absence.working_days),
},
))
from app.services.caldav_service import caldav_service
asyncio.create_task(caldav_service.sync_removed(absence, db))
return absence
async def approve_absence(
self, absence_id: UUID, current_user: User, db: AsyncSession
) -> Absence:
if current_user.role not in (UserRole.MANAGER, UserRole.HR, UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN):
raise HTTPException(status_code=403, detail="Keine Berechtigung.")
absence = await db.get(Absence, absence_id)
if absence is None:
raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
requester = await db.get(User, absence.user_id)
if requester is None or requester.company_id != current_user.company_id:
raise HTTPException(status_code=403, detail="Zugriff verweigert.")
if absence.user_id == current_user.id:
raise HTTPException(
status_code=409,
detail="Eigene Abwesenheitsanträge können nicht selbst genehmigt werden."
)
if absence.status != AbsenceStatus.PENDING:
raise HTTPException(status_code=409, detail="Nur ausstehende Anträge können genehmigt werden.")
absence.status = AbsenceStatus.APPROVED
absence.approved_by = current_user.id
absence_type = await db.get(AbsenceType, absence.type_id)
# Urlaubskonto abziehen wenn nötig
if absence_type and absence_type.deducts_vacation:
await self._deduct_vacation(absence.user_id, absence.start_date.year, int(absence.working_days), db)
# Überstundenkonto abziehen wenn Freizeitausgleich
if absence_type and absence_type.affects_overtime_balance:
await self._deduct_overtime(absence.user_id, absence.working_days, db)
# Audit-Log (DSGVO)
db.add(AuditLog(
company_id=current_user.company_id,
user_id=current_user.id,
action="absence_approved",
entity_type="absence",
entity_id=absence.id,
old_value={"status": "pending"},
new_value={
"status": "approved",
"approved_by": str(current_user.id),
"approved_by_name": current_user.full_name,
"absence_user_id": str(absence.user_id),
"start_date": str(absence.start_date),
"end_date": str(absence.end_date),
"working_days": float(absence.working_days),
},
))
# CalDAV-Sync (fire & forget Fehler blockieren nicht die Genehmigung)
from app.services.caldav_service import caldav_service
asyncio.create_task(caldav_service.sync_approved(absence, db))
return absence
async def reject_absence(
self, absence_id: UUID, data: AbsenceReject, current_user: User, db: AsyncSession
) -> Absence:
if current_user.role not in (UserRole.MANAGER, UserRole.HR, UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN):
raise HTTPException(status_code=403, detail="Keine Berechtigung.")
absence = await db.get(Absence, absence_id)
if absence is None:
raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
requester = await db.get(User, absence.user_id)
if requester is None or requester.company_id != current_user.company_id:
raise HTTPException(status_code=403, detail="Zugriff verweigert.")
if absence.status != AbsenceStatus.PENDING:
raise HTTPException(status_code=409, detail="Nur ausstehende Anträge können abgelehnt werden.")
absence.status = AbsenceStatus.REJECTED
absence.approved_by = current_user.id
absence.rejection_reason = data.rejection_reason
# Audit-Log (DSGVO)
db.add(AuditLog(
company_id=current_user.company_id,
user_id=current_user.id,
action="absence_rejected",
entity_type="absence",
entity_id=absence.id,
old_value={"status": "pending"},
new_value={
"status": "rejected",
"rejection_reason": absence.rejection_reason,
"rejected_by": str(current_user.id),
"rejected_by_name": current_user.full_name,
"absence_user_id": str(absence.user_id),
"start_date": str(absence.start_date),
"end_date": str(absence.end_date),
"working_days": float(absence.working_days),
},
))
from app.services.caldav_service import caldav_service
asyncio.create_task(caldav_service.sync_removed(absence, db))
return absence
async def get_calendar(
self,
company_id: UUID,
year: int,
month: int | None,
db: AsyncSession,
) -> list[dict]:
q = (
select(Absence, User, AbsenceType)
.join(User, Absence.user_id == User.id)
.join(AbsenceType, Absence.type_id == AbsenceType.id)
.where(
User.company_id == company_id,
Absence.status.in_([AbsenceStatus.PENDING, AbsenceStatus.APPROVED]),
)
)
if month:
start = date(year, month, 1)
end = date(year, month, 28) + timedelta(days=4)
end = end.replace(day=1) - timedelta(days=1)
q = q.where(Absence.start_date <= end, Absence.end_date >= start)
else:
q = q.where(
Absence.start_date >= date(year, 1, 1),
Absence.end_date <= date(year, 12, 31),
)
result = await db.execute(q.order_by(Absence.start_date))
rows = result.all()
calendar = []
for absence, user, atype in rows:
calendar.append({
"user_id": user.id,
"user_name": user.full_name,
"absence_id": absence.id,
"type_name": atype.name,
"type_color": atype.color,
"start_date": absence.start_date,
"end_date": absence.end_date,
"status": absence.status,
"working_days": absence.working_days,
})
return calendar
# ── Urlaubskonto ──────────────────────────────────────────────────────────
async def get_balance(self, user_id: UUID, year: int, db: AsyncSession) -> VacationBalance:
return await self._get_or_create_balance(user_id, year, db)
async def get_pending_days(self, user_id: UUID, year: int, db: AsyncSession) -> float:
"""Summe der Arbeitstage aus ausstehenden Anträgen die Urlaub abziehen."""
q = (
select(func.sum(Absence.working_days))
.join(AbsenceType, Absence.type_id == AbsenceType.id)
.where(
Absence.user_id == user_id,
Absence.status == AbsenceStatus.PENDING,
AbsenceType.deducts_vacation.is_(True),
func.extract("year", Absence.start_date) == year,
)
)
result = await db.scalar(q)
return float(result or 0)
# ── Feiertage ─────────────────────────────────────────────────────────────
async def list_holidays(
self, year: int, country: str, state: str | None, db: AsyncSession
) -> list[PublicHoliday]:
q = select(PublicHoliday).where(
PublicHoliday.year == year, PublicHoliday.country == country
)
if state:
q = q.where(or_(PublicHoliday.state == state, PublicHoliday.state.is_(None)))
result = await db.scalars(q.order_by(PublicHoliday.date))
return list(result.all())
async def create_holiday(self, data, db: AsyncSession) -> PublicHoliday:
holiday = PublicHoliday(
country=data.country,
state=data.state,
date=data.date,
name=data.name,
year=data.date.year,
)
db.add(holiday)
await db.flush()
return holiday
# ── Helpers ───────────────────────────────────────────────────────────────
async def _get_type_or_404(
self, type_id: UUID, company_id: UUID, db: AsyncSession
) -> AbsenceType:
at = await db.get(AbsenceType, type_id)
if at is None or at.company_id != company_id:
raise HTTPException(status_code=404, detail="Abwesenheitstyp nicht gefunden.")
return at
async def _get_or_create_balance(
self, user_id: UUID, year: int, db: AsyncSession
) -> VacationBalance:
balance = await db.scalar(
select(VacationBalance).where(
VacationBalance.user_id == user_id, VacationBalance.year == year
)
)
if balance is None:
# Automatischer Übertrag: Resturlaub aus dem Vorjahr übernehmen
prev = await db.scalar(
select(VacationBalance).where(
VacationBalance.user_id == user_id, VacationBalance.year == year - 1
)
)
carried = max(0, prev.remaining_days) if prev else 0
entitled = prev.entitled_days if prev else 30
balance = VacationBalance(
user_id=user_id,
year=year,
entitled_days=entitled,
carried_over=carried,
)
db.add(balance)
await db.flush()
return balance
async def _deduct_vacation(
self, user_id: UUID, year: int, days: int, db: AsyncSession
) -> None:
balance = await self._get_or_create_balance(user_id, year, db)
balance.used_days += days
async def _deduct_overtime(
self, user_id: UUID, working_days: float, db: AsyncSession
) -> None:
"""Zieht working_days × tägliche Stunden vom Überstundenkonto ab."""
# Stunden/Tag aus Arbeitsplan ermitteln (Fallback: 8h)
user = await db.get(User, user_id)
daily_hours = Decimal("8.00")
if user and user.work_schedule_id:
schedule = await db.get(WorkSchedule, user.work_schedule_id)
if schedule:
working_days_in_week = sum(
1 for h in [schedule.mon_h, schedule.tue_h, schedule.wed_h,
schedule.thu_h, schedule.fri_h, schedule.sat_h, schedule.sun_h]
if h > 0
)
if working_days_in_week > 0:
daily_hours = schedule.weekly_hours / Decimal(working_days_in_week)
hours_to_deduct = Decimal(str(working_days)) * daily_hours
ob = await db.scalar(select(OvertimeBalance).where(OvertimeBalance.user_id == user_id))
if ob is None:
# Erstelle Eintrag mit 0 Überstunden — taken_hours kann negativ werden
company_id = user.company_id if user else None
if not company_id:
return
ob = OvertimeBalance(user_id=user_id, company_id=company_id)
db.add(ob)
await db.flush()
ob.taken_hours += hours_to_deduct
async def _get_holiday_dates(
self, company_id: UUID, year: int, db: AsyncSession
) -> set[date]:
"""Feiertage für die Company-Country holen."""
from app.models.company import Company
from sqlalchemy import or_
company = await db.get(Company, company_id)
country = company.country if company else "DE"
state = company.state if company else None
q = select(PublicHoliday.date).where(
PublicHoliday.year == year,
PublicHoliday.country == country,
)
if state:
q = q.where(or_(PublicHoliday.state == state, PublicHoliday.state.is_(None)))
result = await db.scalars(q)
return set(result.all())
@staticmethod
def _calc_working_days(
start: date,
end: date,
holidays: set[date],
half_day_start: bool,
half_day_end: bool,
) -> float:
count = 0.0
current = start
while current <= end:
if current.weekday() < 5 and current not in holidays:
count += 1.0
current += timedelta(days=1)
# Halbtage abziehen
if half_day_start and start.weekday() < 5 and start not in holidays:
count -= 0.5
if half_day_end and end.weekday() < 5 and end not in holidays and end != start:
count -= 0.5
return max(0.0, count)
# ── Krankmeldung ──────────────────────────────────────────────────────────
async def quick_sick(
self,
start: date,
end: date,
current_user: User,
db: AsyncSession,
) -> tuple[Absence, list[str]]:
"""Sofort-Krankmeldung: nutzt den ersten aktiven SICK-Typ der Firma."""
sick_type = await db.scalar(
select(AbsenceType)
.where(
AbsenceType.company_id == current_user.company_id,
AbsenceType.category == AbsenceCategory.SICK,
AbsenceType.is_active == True,
)
.order_by(AbsenceType.name)
.limit(1)
)
if sick_type is None:
raise HTTPException(status_code=404, detail="Kein aktiver Krankheits-Typ konfiguriert.")
if end < start:
raise HTTPException(status_code=400, detail="Enddatum darf nicht vor dem Startdatum liegen.")
create_data = AbsenceCreate(
type_id=sick_type.id,
start_date=start,
end_date=end,
)
return await self.create_absence(create_data, current_user, db)
async def mark_certificate_received(
self,
absence_id: UUID,
received_at: date | None,
current_user: User,
db: AsyncSession,
) -> Absence:
"""HR/Admin: AU-Bescheinigung als eingegangen markieren."""
if current_user.role not in (UserRole.HR, UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN):
raise HTTPException(status_code=403, detail="Nur HR/Admin darf den Attest-Eingang markieren.")
absence = await db.get(Absence, absence_id)
if absence is None:
raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
owner = await db.get(User, absence.user_id)
if owner is None or owner.company_id != current_user.company_id:
raise HTTPException(status_code=403, detail="Zugriff verweigert.")
absence_type = await db.get(AbsenceType, absence.type_id)
if absence_type is None or absence_type.category != AbsenceCategory.SICK:
raise HTTPException(status_code=409, detail="Nur für Krankmeldungen verfügbar.")
old_value = str(absence.certificate_received_at) if absence.certificate_received_at else None
absence.certificate_received_at = received_at or date.today()
db.add(AuditLog(
company_id=current_user.company_id,
user_id=current_user.id,
action="absence_certificate_received",
entity_type="absence",
entity_id=absence.id,
old_value={"certificate_received_at": old_value},
new_value={
"certificate_received_at": str(absence.certificate_received_at),
"absence_user_id": str(absence.user_id),
"marked_by": str(current_user.id),
"marked_by_name": current_user.full_name,
},
))
return absence
async def get_sick_stats(
self,
company_id: UUID,
current_user: User,
ref_date: date,
db: AsyncSession,
user_id: UUID | None = None,
) -> list[dict]:
"""Krankheitsstatistik für rolling 12 Monate ab ref_date.
Bradford-Faktor: S² × D mit S = Anzahl Episoden, D = Summe Kranktage.
"""
window_start = ref_date - timedelta(days=365)
q = (
select(Absence, User)
.join(User, Absence.user_id == User.id)
.join(AbsenceType, Absence.type_id == AbsenceType.id)
.where(
User.company_id == company_id,
AbsenceType.category == AbsenceCategory.SICK,
Absence.status == AbsenceStatus.APPROVED,
Absence.start_date <= ref_date,
Absence.end_date >= window_start,
)
.order_by(User.last_name, User.first_name, Absence.start_date)
)
if user_id:
q = q.where(Absence.user_id == user_id)
# MANAGER sieht nur sein Department
if current_user.role == UserRole.MANAGER and current_user.department_id:
q = q.where(User.department_id == current_user.department_id)
result = await db.execute(q)
rows = result.all()
by_user: dict[UUID, dict] = {}
for absence, user in rows:
entry = by_user.setdefault(user.id, {
"user_id": user.id,
"user_name": user.full_name,
"personnel_number": user.personnel_number,
"episodes": 0,
"total_days": 0.0,
"certificates_overdue": 0,
})
entry["episodes"] += 1
entry["total_days"] += float(absence.working_days or 0)
if (
absence.certificate_required_by
and absence.certificate_required_by < ref_date
and absence.certificate_received_at is None
):
entry["certificates_overdue"] += 1
for entry in by_user.values():
entry["bradford_factor"] = float(entry["episodes"]) ** 2 * entry["total_days"]
return list(by_user.values())
absence_service = AbsenceService()