Files
timemaster/backend/app/services/absence_service.py
T
patrick 345002944e feat: Freizeitausgleich-Lücken geschlossen (Gap 1-3) + konfigurierbare Schwellwerte
Gap-1: Überziehschutz für Überstundenkonto
  - Company.overtime_overdraft_allowed (default: true) – blockiert FZA wenn deaktiviert
  - Company.overtime_warning_threshold_hours (default: 0) – Warnung wenn Konto unter Schwelle fällt
  - warnings[] jetzt in approve_absence Response (AbsenceApproveOut)
  - Migration 0028_overtime_fza_config.py

Gap-2: total_hours wird bei Zeiteintrag-Genehmigung neu berechnet
  - time_service.approve_entry() ruft _recalculate_overtime_balance() auf
  - last_calculated Timestamp wird gesetzt

Gap-3: Stornierung genehmigter FZA-Anträge bucht taken_hours zurück
  - _refund_overtime() Helfer hinzugefügt
  - cancel_absence() erlaubt jetzt HR/Admin auch genehmigte Abwesenheiten zu stornieren
  - DELETE /absences/{id} gibt jetzt AbsenceOut zurück (statt 204)
  - Mitarbeiter können genehmigte FZA-Anträge nicht selbst stornieren (409)

Frontend:
  - CompanySettingsPage: neuer Abschnitt 'Freizeitausgleich' mit Toggle + Schwellwert-Eingabe

Tests: backend/tests/test_fza.py mit 6 Tests (alle 3 Gaps)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 00:08:03 +02:00

834 lines
35 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
from datetime import date, timedelta
from uuid import UUID
from fastapi import HTTPException
from sqlalchemy import and_, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from decimal import Decimal
from app.models.absence import Absence, AbsenceStatus
from app.models.absence_type import AbsenceCategory, AbsenceType
from app.models.audit_log import AuditLog
from app.models.company import Company
from app.models.overtime_balance import OvertimeBalance
from app.models.public_holiday import PublicHoliday
from app.models.user import User, UserRole
from app.models.vacation_balance import VacationBalance
from app.models.work_schedule import WorkSchedule
from app.schemas.absence import AbsenceCreate, AbsenceReject, AbsenceTypeCreate, AbsenceTypeUpdate
# Roles treated as manager-level by update_absence: users in one of these
# roles may edit an approved absence without it falling back to "pending".
_manager_roles = (UserRole.MANAGER, UserRole.HR, UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)
class AbsenceService:
# ── AbsenceTypes ─────────────────────────────────────────────────────────
async def list_types(self, company_id: UUID, db: AsyncSession) -> list[AbsenceType]:
    """Return the active absence types of a company, ordered by name."""
    stmt = select(AbsenceType).where(
        AbsenceType.company_id == company_id,
        AbsenceType.is_active == True,  # noqa: E712 - SQL expression, not a Python comparison
    )
    rows = await db.scalars(stmt.order_by(AbsenceType.name))
    return list(rows.all())
async def create_type(
    self, company_id: UUID, data: AbsenceTypeCreate, db: AsyncSession
) -> AbsenceType:
    """Add a new absence type for the company, flush it, and return it.

    The session is flushed but not committed here.
    """
    payload = data.model_dump()
    new_type = AbsenceType(company_id=company_id, **payload)
    db.add(new_type)
    await db.flush()
    return new_type
async def update_type(
    self, type_id: UUID, company_id: UUID, data: AbsenceTypeUpdate, db: AsyncSession
) -> AbsenceType:
    """Apply the non-None fields of ``data`` to an existing absence type.

    Raises:
        HTTPException: 404 if the type does not exist or belongs to another company.
    """
    absence_type = await self._get_type_or_404(type_id, company_id, db)
    # exclude_none: fields that are None in the payload are left untouched,
    # so a field cannot be reset to None through this method.
    changes = data.model_dump(exclude_none=True)
    for attr_name in changes:
        setattr(absence_type, attr_name, changes[attr_name])
    return absence_type
async def create_defaults_for_company(self, company_id: UUID, db: AsyncSession) -> None:
    """Create the default absence types and a default work schedule for a new company."""
    default_types = (
        dict(
            name="Urlaub", color="#3B82F6", category=AbsenceCategory.VACATION,
            requires_approval=True, deducts_vacation=True, is_paid=True,
        ),
        dict(
            name="Krankheit", color="#EF4444", category=AbsenceCategory.SICK,
            requires_approval=False, deducts_vacation=False, is_paid=True,
            requires_certificate=True, certificate_after_days=3,
        ),
        dict(
            name="Freizeitausgleich", color="#F59E0B", category=AbsenceCategory.OVERTIME_COMP,
            requires_approval=True, deducts_vacation=False,
            affects_overtime_balance=True, is_paid=True,
        ),
        dict(
            name="Weiterbildung", color="#8B5CF6", category=AbsenceCategory.TRAINING,
            requires_approval=True, deducts_vacation=False, is_paid=True,
            max_days_per_year=5,
        ),
        dict(
            name="Dienstreise", color="#06B6D4", category=AbsenceCategory.BUSINESS_TRIP,
            requires_approval=True, deducts_vacation=False, is_paid=True,
        ),
        dict(
            name="Homeoffice", color="#10B981", category=AbsenceCategory.OTHER,
            requires_approval=True, deducts_vacation=False, is_paid=True,
        ),
        dict(
            name="Sonderurlaub", color="#84CC16", category=AbsenceCategory.VACATION,
            requires_approval=True, deducts_vacation=True, is_paid=True,
        ),
    )
    db.add_all([AbsenceType(company_id=company_id, **spec) for spec in default_types])

    # Default work schedule. Only name and validity are set here; the daily
    # hours come from the WorkSchedule column defaults
    # (per the original comment: Mon–Fri, 8h each — confirm against the model).
    full_time = WorkSchedule(
        company_id=company_id,
        name="Vollzeit (40h)",
        valid_from=date.today(),
    )
    db.add(full_time)
    await db.flush()
# ── Absences ──────────────────────────────────────────────────────────────
async def list_absences(
    self,
    company_id: UUID,
    current_user: User,
    db: AsyncSession,
    user_id: UUID | None = None,
    type_id: UUID | None = None,
    status: AbsenceStatus | None = None,
    year: int | None = None,
) -> tuple[int, list[Absence]]:
    """List absences of a company, newest start date first.

    Employees only ever see their own absences (``user_id`` is ignored for
    them); all other roles may optionally narrow the result to one user.

    Args:
        company_id: Company whose users' absences are listed.
        current_user: Caller; the role decides the visibility.
        db: Active database session.
        user_id: Optional user filter (non-employee callers only).
        type_id: Optional absence-type filter.
        status: Optional status filter.
        year: Optional calendar year; every absence that overlaps the year
            is returned, including ones that span a year boundary.

    Returns:
        ``(total, absences)`` where ``total`` is the number of matching rows.
    """
    q = (
        select(Absence)
        .join(User, Absence.user_id == User.id)
        .where(User.company_id == company_id)
    )
    if current_user.role == UserRole.EMPLOYEE:
        q = q.where(Absence.user_id == current_user.id)
    elif user_id:
        q = q.where(Absence.user_id == user_id)
    if type_id:
        q = q.where(Absence.type_id == type_id)
    if status:
        q = q.where(Absence.status == status)
    if year:
        # Overlap test instead of containment: an absence running from
        # late December into January belongs to both years and must not
        # disappear from either listing.
        q = q.where(
            Absence.start_date <= date(year, 12, 31),
            Absence.end_date >= date(year, 1, 1),
        )
    total = await db.scalar(select(func.count()).select_from(q.subquery())) or 0
    result = await db.scalars(q.order_by(Absence.start_date.desc()))
    return total, list(result.all())
async def get_by_id(self, absence_id: UUID, current_user: User, db: AsyncSession) -> Absence:
    """Load a single absence, enforcing ownership and tenant boundaries.

    The owner may always read their own absence. Employees may read nothing
    else. Every other role may read absences of users in their own company
    only, matching the company check used by approve/reject/update.

    Raises:
        HTTPException: 404 if the absence does not exist; 403 if the caller
            is not allowed to see it.
    """
    absence = await db.get(Absence, absence_id)
    if absence is None:
        raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
    if absence.user_id == current_user.id:
        return absence
    if current_user.role == UserRole.EMPLOYEE:
        raise HTTPException(status_code=403, detail="Keine Berechtigung.")
    # Without this check a manager of company A could read absences of
    # company B simply by knowing (or guessing) the ID.
    owner = await db.get(User, absence.user_id)
    if owner is None or owner.company_id != current_user.company_id:
        raise HTTPException(status_code=403, detail="Zugriff verweigert.")
    return absence
async def create_absence(
    self,
    data: AbsenceCreate,
    current_user: User,
    db: AsyncSession,
) -> tuple[Absence, list[str]]:
    """Create an absence request for the current user.

    Types that do not require approval are stored as approved immediately
    (approved by the requester) and, if they deduct vacation, the vacation
    balance is charged right away.

    Returns:
        ``(absence, warnings)``; warnings are non-blocking hints (vacation
        balance possibly insufficient, overlap with an existing absence).

    Raises:
        HTTPException: 404 if the absence type is unknown for the company;
            400 if the range contains no working days (sick leave excepted).
    """
    # Validate the absence type.
    absence_type = await self._get_type_or_404(data.type_id, current_user.company_id, db)

    # Compute working days. Holidays are collected for every calendar year
    # the request touches; otherwise a range such as Dec 30 – Jan 2 would
    # count New Year's Day as a working day.
    holidays: set[date] = set()
    for holiday_year in range(data.start_date.year, data.end_date.year + 1):
        holidays |= await self._get_holiday_dates(current_user.company_id, holiday_year, db)
    working_days = self._calc_working_days(
        data.start_date, data.end_date, holidays,
        data.half_day_start, data.half_day_end
    )
    # Sick notes may cover weekends/holidays only (0 working days allowed).
    if working_days <= 0 and absence_type.category != AbsenceCategory.SICK:
        raise HTTPException(status_code=400, detail="Keine Arbeitstage im ausgewählten Zeitraum.")

    # Check the vacation balance if this type deducts vacation.
    warnings: list[str] = []
    if absence_type.deducts_vacation:
        balance = await self._get_or_create_balance(current_user.id, data.start_date.year, db)
        if balance.remaining_days < working_days:
            warnings.append(
                f"Urlaubskonto reicht möglicherweise nicht aus: "
                f"{balance.remaining_days} Tage verfügbar, {working_days} Tage beantragt."
            )

    # Check for overlap with the user's own non-cancelled, non-rejected absences.
    overlap = await db.scalar(
        select(Absence).where(
            and_(
                Absence.user_id == current_user.id,
                Absence.status != AbsenceStatus.CANCELLED,
                Absence.status != AbsenceStatus.REJECTED,
                Absence.start_date <= data.end_date,
                Absence.end_date >= data.start_date,
            )
        )
    )
    if overlap:
        warnings.append("Überschneidung mit bestehender Abwesenheit im selben Zeitraum.")

    status = AbsenceStatus.PENDING if absence_type.requires_approval else AbsenceStatus.APPROVED
    approved_by = None if absence_type.requires_approval else current_user.id

    # Sick leave: compute the date by which a medical certificate is due.
    # Precedence: AbsenceType.certificate_after_days (override) -> company default.
    # NOTE(review): `or` also falls back to the company default when the
    # override is 0 — confirm whether 0 ("from day one") should be honoured.
    certificate_required_by: date | None = None
    if absence_type.category == AbsenceCategory.SICK and absence_type.requires_certificate:
        company = await db.get(Company, current_user.company_id)
        company_default = company.sick_note_required_after_days if company else 3
        threshold = absence_type.certificate_after_days or company_default
        certificate_required_by = data.start_date + timedelta(days=threshold)

    absence = Absence(
        user_id=current_user.id,
        type_id=data.type_id,
        start_date=data.start_date,
        end_date=data.end_date,
        half_day_start=data.half_day_start,
        half_day_end=data.half_day_end,
        working_days=working_days,
        status=status,
        approved_by=approved_by,
        substitute_id=data.substitute_id,
        note=data.note,
        certificate_required_by=certificate_required_by,
    )
    db.add(absence)
    await db.flush()

    # Auto-approved types: charge the vacation balance immediately.
    # NOTE(review): int() truncates half days (0.5 -> 0) — confirm whether
    # VacationBalance.used_days can hold fractional days before changing.
    if not absence_type.requires_approval and absence_type.deducts_vacation:
        await self._deduct_vacation(current_user.id, data.start_date.year, int(working_days), db)
    return absence, warnings
async def update_absence(
    self, absence_id: UUID, data: "AbsenceUpdate", current_user: User, db: AsyncSession
) -> Absence:
    """Edit a pending or approved absence.

    Employees may edit only their own absences; other roles may edit
    absences of users in their own company. When a non-manager edits an
    approved absence, a correction note is mandatory and the absence goes
    back to "pending" for re-approval.

    Balance handling: bookings made for the approved absence (vacation days,
    overtime hours) are reversed before the change is applied. If the
    absence stays approved (manager edit) they are booked again with the new
    values; if it falls back to pending, approve_absence books them again on
    re-approval. Without this, re-approval would charge the balances twice.

    Raises:
        HTTPException: 404 unknown absence or type; 403 not permitted;
            409 absence is neither pending nor approved; 422 missing
            correction note; 400 end date before start date.
    """
    from app.schemas.absence import AbsenceUpdate  # noqa: F401 - target of the string annotation

    absence = await db.get(Absence, absence_id)
    if absence is None:
        raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")

    # Employees: own absences only; everyone else: same company.
    if current_user.role == UserRole.EMPLOYEE:
        if absence.user_id != current_user.id:
            raise HTTPException(status_code=403, detail="Keine Berechtigung.")
    else:
        owner = await db.get(User, absence.user_id)
        if owner is None or owner.company_id != current_user.company_id:
            raise HTTPException(status_code=403, detail="Zugriff verweigert.")

    is_manager = current_user.role in _manager_roles
    if absence.status not in (AbsenceStatus.PENDING, AbsenceStatus.APPROVED):
        raise HTTPException(status_code=409, detail="Nur ausstehende oder genehmigte Anträge können bearbeitet werden.")

    was_approved = absence.status == AbsenceStatus.APPROVED
    is_change_request = was_approved and not is_manager
    if is_change_request and not (data.correction_note and data.correction_note.strip()):
        # Employee change request on an approved absence needs a reason.
        raise HTTPException(status_code=422, detail="Änderungsgrund ist bei genehmigten Anträgen Pflicht.")

    # Validate the resulting date range before touching any state.
    new_start = data.start_date if data.start_date is not None else absence.start_date
    new_end = data.end_date if data.end_date is not None else absence.end_date
    if new_end < new_start:
        raise HTTPException(status_code=400, detail="Enddatum darf nicht vor dem Startdatum liegen.")

    if data.type_id is not None:
        await self._get_type_or_404(data.type_id, current_user.company_id, db)

    # Reverse what was booked for the approved absence, using the values
    # that were in effect when it was booked (old type, year, working days).
    if was_approved:
        booked_type = await db.get(AbsenceType, absence.type_id)
        if booked_type and booked_type.deducts_vacation:
            # Negative deduction == refund; int() mirrors the booking call.
            await self._deduct_vacation(
                absence.user_id, absence.start_date.year, -int(absence.working_days), db
            )
        if booked_type and booked_type.affects_overtime_balance:
            await self._refund_overtime(absence.user_id, absence.working_days, db)

    if data.type_id is not None:
        absence.type_id = data.type_id
    absence.start_date = new_start
    absence.end_date = new_end
    if data.half_day_start is not None:
        absence.half_day_start = data.half_day_start
    if data.half_day_end is not None:
        absence.half_day_end = data.half_day_end
    if data.substitute_id is not None:
        absence.substitute_id = data.substitute_id
    if data.note is not None:
        absence.note = data.note
    if data.correction_note is not None:
        absence.correction_note = data.correction_note.strip() or None

    # Approved absence changed by a non-manager: back to pending.
    if is_change_request:
        absence.status = AbsenceStatus.PENDING
        absence.approved_by = None

    # Recompute working days with the holidays of every year the range touches.
    holiday_dates: set[date] = set()
    for holiday_year in range(absence.start_date.year, absence.end_date.year + 1):
        holiday_dates |= await self._get_holiday_dates(current_user.company_id, holiday_year, db)
    absence.working_days = Decimal(str(
        self._calc_working_days(absence.start_date, absence.end_date,
                                holiday_dates, absence.half_day_start, absence.half_day_end)
    ))

    # Manager edit of an approved absence: it stays approved, so book the
    # new values. Overtime warnings cannot be returned through this
    # method's return type and are dropped; a forbidden overdraft still raises.
    if was_approved and absence.status == AbsenceStatus.APPROVED:
        current_type = await db.get(AbsenceType, absence.type_id)
        if current_type and current_type.deducts_vacation:
            await self._deduct_vacation(
                absence.user_id, absence.start_date.year, int(absence.working_days), db
            )
        if current_type and current_type.affects_overtime_balance:
            await self._deduct_overtime(absence.user_id, absence.working_days, db)

    # Audit log (company_id included, like every other audit entry here).
    action = "absence_change_request" if is_change_request else "absence_updated"
    db.add(AuditLog(
        company_id=current_user.company_id,
        user_id=current_user.id,
        action=action,
        entity_type="absence",
        entity_id=absence.id,
        old_value={"status": "approved" if was_approved else "pending"},
        new_value={
            "status": absence.status.value,
            "start_date": str(absence.start_date),
            "end_date": str(absence.end_date),
            "working_days": float(absence.working_days),
            "correction_note": absence.correction_note,
        },
    ))
    return absence
async def cancel_absence(
    self, absence_id: UUID, current_user: User, db: AsyncSession
) -> Absence:
    """Cancel a pending or approved absence.

    Owners may cancel their own pending absences. HR/admin roles may cancel
    pending and approved absences of users in their own company. Cancelling
    an approved absence gives back what was booked for it: vacation days
    (if the type deducts vacation) and overtime hours (if the type affects
    the overtime balance).

    Raises:
        HTTPException: 404 unknown absence; 403 not permitted; 409 if the
            status does not allow cancellation for this caller.
    """
    absence = await db.get(Absence, absence_id)
    if absence is None:
        raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")

    is_admin = current_user.role in (UserRole.HR, UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)
    if absence.user_id != current_user.id:
        if not is_admin:
            raise HTTPException(status_code=403, detail="Nur eigene Anträge können storniert werden.")
        # Tenant boundary: admins may only act on users of their own company.
        owner = await db.get(User, absence.user_id)
        if owner is None or owner.company_id != current_user.company_id:
            raise HTTPException(status_code=403, detail="Zugriff verweigert.")

    previous_status = absence.status
    if previous_status == AbsenceStatus.APPROVED:
        if not is_admin:
            raise HTTPException(
                status_code=409,
                detail="Genehmigte Anträge können nur von HR/Admin storniert werden."
            )
        absence_type = await db.get(AbsenceType, absence.type_id)
        # Give back vacation days; int() mirrors the amount that was booked.
        if absence_type and absence_type.deducts_vacation:
            await self._deduct_vacation(
                absence.user_id, absence.start_date.year, -int(absence.working_days), db
            )
        # Give back overtime hours for time-off-in-lieu absences.
        if absence_type and absence_type.affects_overtime_balance:
            await self._refund_overtime(absence.user_id, absence.working_days, db)
    elif previous_status != AbsenceStatus.PENDING:
        raise HTTPException(
            status_code=409,
            detail="Nur ausstehende oder genehmigte Anträge können storniert werden."
        )

    absence.status = AbsenceStatus.CANCELLED

    # Audit log (GDPR); old_value records the real previous status.
    db.add(AuditLog(
        company_id=current_user.company_id,
        user_id=current_user.id,
        action="absence_cancelled",
        entity_type="absence",
        entity_id=absence.id,
        old_value={"status": previous_status.value},
        new_value={
            "status": "cancelled",
            "cancelled_by": str(current_user.id),
            "absence_user_id": str(absence.user_id),
            "start_date": str(absence.start_date),
            "end_date": str(absence.end_date),
            "working_days": float(absence.working_days),
        },
    ))

    # CalDAV removal runs as a background task.
    # NOTE(review): the task receives the caller's session and no reference
    # to the task is kept — confirm the session outlives the task.
    from app.services.caldav_service import caldav_service
    asyncio.create_task(caldav_service.sync_removed(absence, db))
    return absence
async def approve_absence(
    self, absence_id: UUID, current_user: User, db: AsyncSession
) -> tuple[Absence, list[str]]:
    """Approve a pending absence and book it against the balances.

    Returns:
        ``(absence, warnings)``; warnings come from the overtime deduction
        (e.g. balance drops below the configured warning threshold).

    Raises:
        HTTPException: 403 if the caller lacks a manager-level role or the
            requester belongs to another company; 404 unknown absence;
            409 self-approval or absence not pending. Errors raised by the
            overtime deduction propagate unchanged.
    """
    if current_user.role not in _manager_roles:
        raise HTTPException(status_code=403, detail="Keine Berechtigung.")
    absence = await db.get(Absence, absence_id)
    if absence is None:
        raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
    requester = await db.get(User, absence.user_id)
    if requester is None or requester.company_id != current_user.company_id:
        raise HTTPException(status_code=403, detail="Zugriff verweigert.")
    if absence.user_id == current_user.id:
        raise HTTPException(
            status_code=409,
            detail="Eigene Abwesenheitsanträge können nicht selbst genehmigt werden."
        )
    if absence.status != AbsenceStatus.PENDING:
        raise HTTPException(status_code=409, detail="Nur ausstehende Anträge können genehmigt werden.")

    absence.status = AbsenceStatus.APPROVED
    absence.approved_by = current_user.id
    absence_type = await db.get(AbsenceType, absence.type_id)

    # Charge the vacation balance if required.
    # NOTE(review): int() truncates half days — confirm the column type of
    # VacationBalance.used_days before changing.
    if absence_type and absence_type.deducts_vacation:
        await self._deduct_vacation(absence.user_id, absence.start_date.year, int(absence.working_days), db)

    # Charge the overtime balance for time-off-in-lieu absences.
    fza_warnings: list[str] = []
    if absence_type and absence_type.affects_overtime_balance:
        fza_warnings = await self._deduct_overtime(absence.user_id, absence.working_days, db)

    # Audit log (GDPR).
    db.add(AuditLog(
        company_id=current_user.company_id,
        user_id=current_user.id,
        action="absence_approved",
        entity_type="absence",
        entity_id=absence.id,
        old_value={"status": "pending"},
        new_value={
            "status": "approved",
            "approved_by": str(current_user.id),
            "approved_by_name": current_user.full_name,
            "absence_user_id": str(absence.user_id),
            "start_date": str(absence.start_date),
            "end_date": str(absence.end_date),
            "working_days": float(absence.working_days),
        },
    ))

    # CalDAV sync is fire-and-forget so that sync errors do not block the approval.
    from app.services.caldav_service import caldav_service
    asyncio.create_task(caldav_service.sync_approved(absence, db))
    return absence, fza_warnings
async def reject_absence(
    self, absence_id: UUID, data: AbsenceReject, current_user: User, db: AsyncSession
) -> Absence:
    """Reject a pending absence and record the reason.

    Raises:
        HTTPException: 403 if the caller lacks a manager-level role or the
            requester belongs to another company; 404 unknown absence;
            409 if the absence is not pending.
    """
    allowed_roles = (UserRole.MANAGER, UserRole.HR, UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)
    if current_user.role not in allowed_roles:
        raise HTTPException(status_code=403, detail="Keine Berechtigung.")

    absence = await db.get(Absence, absence_id)
    if absence is None:
        raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")

    requester = await db.get(User, absence.user_id)
    same_company = requester is not None and requester.company_id == current_user.company_id
    if not same_company:
        raise HTTPException(status_code=403, detail="Zugriff verweigert.")

    if absence.status != AbsenceStatus.PENDING:
        raise HTTPException(status_code=409, detail="Nur ausstehende Anträge können abgelehnt werden.")

    absence.status = AbsenceStatus.REJECTED
    # The deciding user is stored in approved_by for rejections as well.
    absence.approved_by = current_user.id
    absence.rejection_reason = data.rejection_reason

    # Audit log (GDPR).
    audit_payload = {
        "status": "rejected",
        "rejection_reason": absence.rejection_reason,
        "rejected_by": str(current_user.id),
        "rejected_by_name": current_user.full_name,
        "absence_user_id": str(absence.user_id),
        "start_date": str(absence.start_date),
        "end_date": str(absence.end_date),
        "working_days": float(absence.working_days),
    }
    audit_entry = AuditLog(
        company_id=current_user.company_id,
        user_id=current_user.id,
        action="absence_rejected",
        entity_type="absence",
        entity_id=absence.id,
        old_value={"status": "pending"},
        new_value=audit_payload,
    )
    db.add(audit_entry)

    from app.services.caldav_service import caldav_service
    asyncio.create_task(caldav_service.sync_removed(absence, db))
    return absence
async def get_calendar(
    self,
    company_id: UUID,
    year: int,
    month: int | None,
    db: AsyncSession,
) -> list[dict]:
    """Return pending and approved absences of a company for calendar display.

    Args:
        company_id: Company whose users' absences are returned.
        year: Calendar year.
        month: Optional month (1-12); if omitted the whole year is used.
        db: Active database session.

    Returns:
        One dict per absence overlapping the period, ordered by start date.
    """
    q = (
        select(Absence, User, AbsenceType)
        .join(User, Absence.user_id == User.id)
        .join(AbsenceType, Absence.type_id == AbsenceType.id)
        .where(
            User.company_id == company_id,
            Absence.status.in_([AbsenceStatus.PENDING, AbsenceStatus.APPROVED]),
        )
    )
    if month:
        start = date(year, month, 1)
        # Day 28 + 4 days always lands in the next month; stepping back one
        # day from its first day yields the last day of `month`.
        end = date(year, month, 28) + timedelta(days=4)
        end = end.replace(day=1) - timedelta(days=1)
    else:
        start = date(year, 1, 1)
        end = date(year, 12, 31)
    # Overlap test for both branches: absences spanning a year boundary
    # must show up in the yearly view just as they do in the monthly one.
    q = q.where(Absence.start_date <= end, Absence.end_date >= start)

    result = await db.execute(q.order_by(Absence.start_date))
    return [
        {
            "user_id": user.id,
            "user_name": user.full_name,
            "absence_id": absence.id,
            "type_name": atype.name,
            "type_color": atype.color,
            "start_date": absence.start_date,
            "end_date": absence.end_date,
            "status": absence.status,
            "working_days": absence.working_days,
        }
        for absence, user, atype in result.all()
    ]
# ── Urlaubskonto ──────────────────────────────────────────────────────────
async def get_balance(self, user_id: UUID, year: int, db: AsyncSession) -> VacationBalance:
    """Return the user's vacation balance for ``year``, creating it if missing."""
    balance = await self._get_or_create_balance(user_id, year, db)
    return balance
async def get_pending_days(self, user_id: UUID, year: int, db: AsyncSession) -> float:
    """Sum the working days of pending requests that deduct vacation.

    Only absences whose start date falls in ``year`` are counted.
    """
    stmt = (
        select(func.sum(Absence.working_days))
        .join(AbsenceType, Absence.type_id == AbsenceType.id)
        .where(Absence.user_id == user_id)
        .where(Absence.status == AbsenceStatus.PENDING)
        .where(AbsenceType.deducts_vacation.is_(True))
        .where(func.extract("year", Absence.start_date) == year)
    )
    pending_sum = await db.scalar(stmt)
    # SUM() over zero rows yields NULL/None.
    if not pending_sum:
        return float(0)
    return float(pending_sum)
# ── Feiertage ─────────────────────────────────────────────────────────────
async def list_holidays(
    self, year: int, country: str, state: str | None, db: AsyncSession
) -> list[PublicHoliday]:
    """List public holidays for a year and country, ordered by date.

    When ``state`` is given, holidays of that state plus holidays without a
    state are returned; without ``state`` no state filter is applied.
    """
    conditions = [PublicHoliday.year == year, PublicHoliday.country == country]
    if state:
        conditions.append(or_(PublicHoliday.state == state, PublicHoliday.state.is_(None)))
    stmt = select(PublicHoliday).where(*conditions).order_by(PublicHoliday.date)
    rows = await db.scalars(stmt)
    return list(rows.all())
async def create_holiday(self, data, db: AsyncSession) -> PublicHoliday:
    """Store a new public holiday; ``year`` is derived from ``data.date``.

    ``data`` must expose ``country``, ``state``, ``date`` and ``name``.
    """
    holiday_date = data.date
    new_holiday = PublicHoliday(
        country=data.country,
        state=data.state,
        date=holiday_date,
        name=data.name,
        year=holiday_date.year,
    )
    db.add(new_holiday)
    await db.flush()
    return new_holiday
# ── Helpers ───────────────────────────────────────────────────────────────
async def _get_type_or_404(
    self, type_id: UUID, company_id: UUID, db: AsyncSession
) -> AbsenceType:
    """Load an absence type and make sure it belongs to ``company_id``.

    Raises:
        HTTPException: 404 if the type is missing or owned by another
            company (the same response for both cases).
    """
    absence_type = await db.get(AbsenceType, type_id)
    found = absence_type is not None and absence_type.company_id == company_id
    if not found:
        raise HTTPException(status_code=404, detail="Abwesenheitstyp nicht gefunden.")
    return absence_type
async def _get_or_create_balance(
    self, user_id: UUID, year: int, db: AsyncSession
) -> VacationBalance:
    """Return the vacation balance for ``(user_id, year)``, creating it on demand.

    A new balance inherits the entitlement of the previous year (30 days if
    there is none) and carries over the previous year's remaining days,
    never less than zero.
    """
    def _for_year(target_year: int):
        return select(VacationBalance).where(
            VacationBalance.user_id == user_id, VacationBalance.year == target_year
        )

    existing = await db.scalar(_for_year(year))
    if existing is not None:
        return existing

    # Automatic carry-over from the previous year's balance.
    prev = await db.scalar(_for_year(year - 1))
    if prev:
        carried = max(0, prev.remaining_days)
        entitled = prev.entitled_days
    else:
        carried = 0
        entitled = 30
    created = VacationBalance(
        user_id=user_id,
        year=year,
        entitled_days=entitled,
        carried_over=carried,
    )
    db.add(created)
    await db.flush()
    return created
async def _deduct_vacation(
    self, user_id: UUID, year: int, days: int, db: AsyncSession
) -> None:
    """Add ``days`` to the used days of the user's vacation balance for ``year``."""
    account = await self._get_or_create_balance(user_id, year, db)
    account.used_days = account.used_days + days
async def _calc_daily_hours(self, user_id: UUID, db: AsyncSession) -> Decimal:
    """Determine the user's target hours per working day (fallback: 8h).

    With a work schedule, the result is the schedule's weekly hours divided
    by the number of weekdays that have more than zero hours.
    """
    fallback = Decimal("8.00")
    user = await db.get(User, user_id)
    if not (user and user.work_schedule_id):
        return fallback
    schedule = await db.get(WorkSchedule, user.work_schedule_id)
    if not schedule:
        return fallback
    per_weekday = [schedule.mon_h, schedule.tue_h, schedule.wed_h,
                   schedule.thu_h, schedule.fri_h, schedule.sat_h, schedule.sun_h]
    active_days = len([hours for hours in per_weekday if hours > 0])
    if active_days <= 0:
        return fallback
    return schedule.weekly_hours / Decimal(active_days)
async def _deduct_overtime(
    self, user_id: UUID, working_days: float, db: AsyncSession
) -> list[str]:
    """Deduct ``working_days`` x daily target hours from the overtime balance.

    A missing balance row is created first; if the user's company cannot be
    determined, nothing is booked and an empty list is returned.

    Returns:
        Warnings (balance falls below the company's warning threshold).

    Raises:
        HTTPException: 422 if the balance is insufficient and the company
            does not allow overdrawing.
    """
    user = await db.get(User, user_id)
    per_day = await self._calc_daily_hours(user_id, db)
    needed = Decimal(str(working_days)) * per_day

    account = await db.scalar(select(OvertimeBalance).where(OvertimeBalance.user_id == user_id))
    if account is None:
        owner_company = user.company_id if user else None
        if not owner_company:
            return []
        account = OvertimeBalance(user_id=user_id, company_id=owner_company)
        db.add(account)
        await db.flush()

    # Load the company's overdraft configuration.
    company = await db.get(Company, account.company_id)
    if company:
        may_overdraw = company.overtime_overdraft_allowed
        threshold = Decimal(str(company.overtime_warning_threshold_hours))
    else:
        may_overdraw = True
        threshold = Decimal(str(0))

    on_hand = account.available_hours
    if on_hand < needed and not may_overdraw:
        raise HTTPException(
            status_code=422,
            detail=(
                f"Nicht genug Überstunden für Freizeitausgleich. "
                f"Verfügbar: {float(on_hand):.1f}h, benötigt: {float(needed):.1f}h."
            ),
        )

    notes: list[str] = []
    remaining = on_hand - needed
    # A threshold of 0 (or below) disables the warning entirely.
    if threshold > 0 and remaining < threshold:
        if remaining < 0:
            sign = "-"
        else:
            sign = ""
        notes.append(
            f"Überstundenkonto sinkt unter die Warnschwelle "
            f"({float(threshold):.0f}h). Verbleibend: {sign}{abs(float(remaining)):.1f}h."
        )
    account.taken_hours += needed
    return notes
async def _refund_overtime(
    self, user_id: UUID, working_days: float, db: AsyncSession
) -> None:
    """Give back time-off-in-lieu hours (e.g. on cancellation).

    ``taken_hours`` is reduced by working_days x daily target hours but never
    below zero. Does nothing if the user has no overtime balance row.
    """
    per_day = await self._calc_daily_hours(user_id, db)
    refund = Decimal(str(working_days)) * per_day
    account = await db.scalar(select(OvertimeBalance).where(OvertimeBalance.user_id == user_id))
    if account is None:
        return
    account.taken_hours = max(Decimal("0"), account.taken_hours - refund)
async def _get_holiday_dates(
    self, company_id: UUID, year: int, db: AsyncSession
) -> set[date]:
    """Fetch the public-holiday dates of ``year`` for the company's country.

    Falls back to country "DE" with no state when the company is missing.
    With a state, that state's holidays plus holidays without a state are
    returned; without one, no state filter is applied.
    """
    # Company and or_ are already imported at module level.
    company = await db.get(Company, company_id)
    if company:
        country, state = company.country, company.state
    else:
        country, state = "DE", None

    conditions = [PublicHoliday.year == year, PublicHoliday.country == country]
    if state:
        conditions.append(or_(PublicHoliday.state == state, PublicHoliday.state.is_(None)))
    rows = await db.scalars(select(PublicHoliday.date).where(*conditions))
    return set(rows.all())
@staticmethod
def _calc_working_days(
start: date,
end: date,
holidays: set[date],
half_day_start: bool,
half_day_end: bool,
) -> float:
count = 0.0
current = start
while current <= end:
if current.weekday() < 5 and current not in holidays:
count += 1.0
current += timedelta(days=1)
# Halbtage abziehen
if half_day_start and start.weekday() < 5 and start not in holidays:
count -= 0.5
if half_day_end and end.weekday() < 5 and end not in holidays and end != start:
count -= 0.5
return max(0.0, count)
# ── Krankmeldung ──────────────────────────────────────────────────────────
async def quick_sick(
    self,
    start: date,
    end: date,
    current_user: User,
    db: AsyncSession,
) -> tuple[Absence, list[str]]:
    """Instant sick note using the company's first active SICK type (by name).

    Raises:
        HTTPException: 404 if the company has no active sick type; 400 if
            ``end`` lies before ``start``.
    """
    stmt = (
        select(AbsenceType)
        .where(AbsenceType.company_id == current_user.company_id)
        .where(AbsenceType.category == AbsenceCategory.SICK)
        .where(AbsenceType.is_active == True)  # noqa: E712 - SQL expression
        .order_by(AbsenceType.name)
        .limit(1)
    )
    sick_type = await db.scalar(stmt)
    if sick_type is None:
        raise HTTPException(status_code=404, detail="Kein aktiver Krankheits-Typ konfiguriert.")
    if end < start:
        raise HTTPException(status_code=400, detail="Enddatum darf nicht vor dem Startdatum liegen.")

    request = AbsenceCreate(type_id=sick_type.id, start_date=start, end_date=end)
    created = await self.create_absence(request, current_user, db)
    return created
async def mark_certificate_received(
    self,
    absence_id: UUID,
    received_at: date | None,
    current_user: User,
    db: AsyncSession,
) -> Absence:
    """HR/admin: mark the medical certificate of a sick leave as received.

    ``received_at`` defaults to today when omitted.

    Raises:
        HTTPException: 403 if the caller is not HR/admin or the absence
            belongs to another company; 404 unknown absence; 409 if the
            absence is not a sick leave.
    """
    hr_roles = (UserRole.HR, UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)
    if current_user.role not in hr_roles:
        raise HTTPException(status_code=403, detail="Nur HR/Admin darf den Attest-Eingang markieren.")

    absence = await db.get(Absence, absence_id)
    if absence is None:
        raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")

    owner = await db.get(User, absence.user_id)
    same_company = owner is not None and owner.company_id == current_user.company_id
    if not same_company:
        raise HTTPException(status_code=403, detail="Zugriff verweigert.")

    absence_type = await db.get(AbsenceType, absence.type_id)
    is_sick_leave = absence_type is not None and absence_type.category == AbsenceCategory.SICK
    if not is_sick_leave:
        raise HTTPException(status_code=409, detail="Nur für Krankmeldungen verfügbar.")

    # Remember the previous value for the audit trail before overwriting it.
    previous = None
    if absence.certificate_received_at:
        previous = str(absence.certificate_received_at)
    absence.certificate_received_at = received_at or date.today()

    audit_entry = AuditLog(
        company_id=current_user.company_id,
        user_id=current_user.id,
        action="absence_certificate_received",
        entity_type="absence",
        entity_id=absence.id,
        old_value={"certificate_received_at": previous},
        new_value={
            "certificate_received_at": str(absence.certificate_received_at),
            "absence_user_id": str(absence.user_id),
            "marked_by": str(current_user.id),
            "marked_by_name": current_user.full_name,
        },
    )
    db.add(audit_entry)
    return absence
async def get_sick_stats(
    self,
    company_id: UUID,
    current_user: User,
    ref_date: date,
    db: AsyncSession,
    user_id: UUID | None = None,
) -> list[dict]:
    """Sick-leave statistics for the 365 days up to ``ref_date``.

    Considers approved SICK absences overlapping the window. Each absence
    counts as one episode and contributes its full ``working_days``.
    Bradford factor: S^2 x D with S = number of episodes, D = total sick days.
    Managers with a department only see users of that department.
    """
    window_start = ref_date - timedelta(days=365)
    stmt = (
        select(Absence, User)
        .join(User, Absence.user_id == User.id)
        .join(AbsenceType, Absence.type_id == AbsenceType.id)
        .where(
            User.company_id == company_id,
            AbsenceType.category == AbsenceCategory.SICK,
            Absence.status == AbsenceStatus.APPROVED,
            Absence.start_date <= ref_date,
            Absence.end_date >= window_start,
        )
        .order_by(User.last_name, User.first_name, Absence.start_date)
    )
    if user_id:
        stmt = stmt.where(Absence.user_id == user_id)
    # A MANAGER only sees their own department.
    restrict_to_department = (
        current_user.role == UserRole.MANAGER and current_user.department_id
    )
    if restrict_to_department:
        stmt = stmt.where(User.department_id == current_user.department_id)

    rows = (await db.execute(stmt)).all()

    stats: dict[UUID, dict] = {}
    for absence, user in rows:
        if user.id not in stats:
            stats[user.id] = {
                "user_id": user.id,
                "user_name": user.full_name,
                "personnel_number": user.personnel_number,
                "episodes": 0,
                "total_days": 0.0,
                "certificates_overdue": 0,
            }
        record = stats[user.id]
        record["episodes"] += 1
        record["total_days"] += float(absence.working_days or 0)
        # Overdue: a due date exists, has passed, and nothing was received.
        due = absence.certificate_required_by
        if due and due < ref_date and absence.certificate_received_at is None:
            record["certificates_overdue"] += 1

    summary = list(stats.values())
    for record in summary:
        record["bradford_factor"] = float(record["episodes"]) ** 2 * record["total_days"]
    return summary
# Module-level AbsenceService instance
# (presumably imported by the API layer as a shared singleton — confirm).
absence_service = AbsenceService()