feat: Live-Stempel-Uhr, Break-UI, Balance-Widget, Approval-Queue + PDF-Export (WeasyPrint)

Frontend (TimeTrackingPage):
- Live-Arbeitsuhr (HH:MM:SS) während eingestempelt
- Break-Start/End-Buttons mit laufender Pausenuhr
- Wochen-Balance-Widget (gearbeitet / erwartet / überstunden)
- Approval-Queue Tab für Manager/HR/Admin (pending entries genehmigen/ablehnen)

Backend (Reports):
- weasyprint>=61.0 in requirements.txt
- 3 neue PDF-Export-Tests (Zeit, Abwesenheit, Überstunden)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-24 11:59:32 +02:00
parent ada1b51f33
commit 62ef6c2a11
6 changed files with 1327 additions and 139 deletions
+615
View File
@@ -0,0 +1,615 @@
import asyncio
from datetime import date, timedelta
from uuid import UUID
from fastapi import HTTPException
from sqlalchemy import and_, func, or_, select
from sqlalchemy.ext.asyncio import AsyncSession
from decimal import Decimal
from app.models.absence import Absence, AbsenceStatus
from app.models.absence_type import AbsenceCategory, AbsenceType
from app.models.audit_log import AuditLog
from app.models.overtime_balance import OvertimeBalance
from app.models.public_holiday import PublicHoliday
from app.models.user import User, UserRole
from app.models.vacation_balance import VacationBalance
from app.models.work_schedule import WorkSchedule
from app.schemas.absence import AbsenceCreate, AbsenceReject, AbsenceTypeCreate, AbsenceTypeUpdate
_manager_roles = (UserRole.MANAGER, UserRole.HR, UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)
class AbsenceService:
# ── AbsenceTypes ─────────────────────────────────────────────────────────
async def list_types(self, company_id: UUID, db: AsyncSession) -> list[AbsenceType]:
result = await db.scalars(
select(AbsenceType)
.where(AbsenceType.company_id == company_id, AbsenceType.is_active == True)
.order_by(AbsenceType.name)
)
return list(result.all())
async def create_type(
self, company_id: UUID, data: AbsenceTypeCreate, db: AsyncSession
) -> AbsenceType:
at = AbsenceType(company_id=company_id, **data.model_dump())
db.add(at)
await db.flush()
return at
async def update_type(
self, type_id: UUID, company_id: UUID, data: AbsenceTypeUpdate, db: AsyncSession
) -> AbsenceType:
at = await self._get_type_or_404(type_id, company_id, db)
for field, value in data.model_dump(exclude_none=True).items():
setattr(at, field, value)
return at
async def create_defaults_for_company(self, company_id: UUID, db: AsyncSession) -> None:
"""Standard-Abwesenheitstypen + Standard-Arbeitsplan für ein neues Unternehmen anlegen."""
defaults = [
{
"name": "Urlaub", "color": "#3B82F6", "category": AbsenceCategory.VACATION,
"requires_approval": True, "deducts_vacation": True, "is_paid": True,
},
{
"name": "Krankheit", "color": "#EF4444", "category": AbsenceCategory.SICK,
"requires_approval": False, "deducts_vacation": False, "is_paid": True,
"requires_certificate": True, "certificate_after_days": 3,
},
{
"name": "Freizeitausgleich", "color": "#F59E0B", "category": AbsenceCategory.OVERTIME_COMP,
"requires_approval": True, "deducts_vacation": False,
"affects_overtime_balance": True, "is_paid": True,
},
{
"name": "Weiterbildung", "color": "#8B5CF6", "category": AbsenceCategory.TRAINING,
"requires_approval": True, "deducts_vacation": False, "is_paid": True,
"max_days_per_year": 5,
},
{
"name": "Dienstreise", "color": "#06B6D4", "category": AbsenceCategory.BUSINESS_TRIP,
"requires_approval": True, "deducts_vacation": False, "is_paid": True,
},
{
"name": "Homeoffice", "color": "#10B981", "category": AbsenceCategory.OTHER,
"requires_approval": True, "deducts_vacation": False, "is_paid": True,
},
{
"name": "Sonderurlaub", "color": "#84CC16", "category": AbsenceCategory.VACATION,
"requires_approval": True, "deducts_vacation": True, "is_paid": True,
},
]
for d in defaults:
db.add(AbsenceType(company_id=company_id, **d))
# Standard-Arbeitsplan: MoFr 8h
schedule = WorkSchedule(
company_id=company_id,
name="Vollzeit (40h)",
valid_from=date.today(),
)
db.add(schedule)
await db.flush()
# ── Absences ──────────────────────────────────────────────────────────────
async def list_absences(
self,
company_id: UUID,
current_user: User,
db: AsyncSession,
user_id: UUID | None = None,
type_id: UUID | None = None,
status: AbsenceStatus | None = None,
year: int | None = None,
) -> tuple[int, list[Absence]]:
q = (
select(Absence)
.join(User, Absence.user_id == User.id)
.where(User.company_id == company_id)
)
if current_user.role == UserRole.EMPLOYEE:
q = q.where(Absence.user_id == current_user.id)
elif user_id:
q = q.where(Absence.user_id == user_id)
if type_id:
q = q.where(Absence.type_id == type_id)
if status:
q = q.where(Absence.status == status)
if year:
q = q.where(Absence.start_date >= date(year, 1, 1), Absence.end_date <= date(year, 12, 31))
total = await db.scalar(select(func.count()).select_from(q.subquery())) or 0
result = await db.scalars(q.order_by(Absence.start_date.desc()))
return total, list(result.all())
async def get_by_id(self, absence_id: UUID, current_user: User, db: AsyncSession) -> Absence:
absence = await db.get(Absence, absence_id)
if absence is None:
raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
if absence.user_id != current_user.id and current_user.role == UserRole.EMPLOYEE:
raise HTTPException(status_code=403, detail="Keine Berechtigung.")
return absence
async def create_absence(
self,
data: AbsenceCreate,
current_user: User,
db: AsyncSession,
) -> tuple[Absence, list[str]]:
# AbsenceType validieren
absence_type = await self._get_type_or_404(data.type_id, current_user.company_id, db)
# Arbeitstage berechnen
holidays = await self._get_holiday_dates(
current_user.company_id, data.start_date.year, db
)
working_days = self._calc_working_days(
data.start_date, data.end_date, holidays,
data.half_day_start, data.half_day_end
)
if working_days <= 0:
raise HTTPException(status_code=400, detail="Keine Arbeitstage im ausgewählten Zeitraum.")
# Urlaubskonto prüfen wenn Urlaub abgezogen werden soll
warnings: list[str] = []
if absence_type.deducts_vacation:
balance = await self._get_or_create_balance(current_user.id, data.start_date.year, db)
if balance.remaining_days < working_days:
warnings.append(
f"Urlaubskonto reicht möglicherweise nicht aus: "
f"{balance.remaining_days} Tage verfügbar, {working_days} Tage beantragt."
)
# Überschneidung mit eigenen Abwesenheiten prüfen
overlap = await db.scalar(
select(Absence).where(
and_(
Absence.user_id == current_user.id,
Absence.status != AbsenceStatus.CANCELLED,
Absence.status != AbsenceStatus.REJECTED,
Absence.start_date <= data.end_date,
Absence.end_date >= data.start_date,
)
)
)
if overlap:
warnings.append("Überschneidung mit bestehender Abwesenheit im selben Zeitraum.")
status = AbsenceStatus.PENDING if absence_type.requires_approval else AbsenceStatus.APPROVED
approved_by = None if absence_type.requires_approval else current_user.id
absence = Absence(
user_id=current_user.id,
type_id=data.type_id,
start_date=data.start_date,
end_date=data.end_date,
half_day_start=data.half_day_start,
half_day_end=data.half_day_end,
working_days=working_days,
status=status,
approved_by=approved_by,
substitute_id=data.substitute_id,
note=data.note,
)
db.add(absence)
await db.flush()
# Bei automatischer Genehmigung Konto abziehen
if not absence_type.requires_approval and absence_type.deducts_vacation:
await self._deduct_vacation(current_user.id, data.start_date.year, int(working_days), db)
return absence, warnings
async def update_absence(
self, absence_id: UUID, data: "AbsenceUpdate", current_user: User, db: AsyncSession
) -> Absence:
from app.schemas.absence import AbsenceUpdate
absence = await db.get(Absence, absence_id)
if absence is None:
raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
# Mitarbeiter: nur eigene; Manager: gleiche Company
if current_user.role == UserRole.EMPLOYEE:
if absence.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Keine Berechtigung.")
else:
owner = await db.get(User, absence.user_id)
if owner is None or owner.company_id != current_user.company_id:
raise HTTPException(status_code=403, detail="Zugriff verweigert.")
is_manager = current_user.role in _manager_roles
if absence.status not in (AbsenceStatus.PENDING, AbsenceStatus.APPROVED):
raise HTTPException(status_code=409, detail="Nur ausstehende oder genehmigte Anträge können bearbeitet werden.")
if absence.status == AbsenceStatus.APPROVED and not is_manager:
# Mitarbeiter stellt Änderungswunsch → Begründung Pflicht, Status zurück auf pending
if not data.correction_note or not data.correction_note.strip():
raise HTTPException(status_code=422, detail="Änderungsgrund ist bei genehmigten Anträgen Pflicht.")
if data.type_id is not None:
await self._get_type_or_404(data.type_id, current_user.company_id, db)
absence.type_id = data.type_id
if data.start_date is not None:
absence.start_date = data.start_date
if data.end_date is not None:
absence.end_date = data.end_date
if data.half_day_start is not None:
absence.half_day_start = data.half_day_start
if data.half_day_end is not None:
absence.half_day_end = data.half_day_end
if data.substitute_id is not None:
absence.substitute_id = data.substitute_id
if data.note is not None:
absence.note = data.note
if data.correction_note is not None:
absence.correction_note = data.correction_note.strip() or None
# Genehmigter Antrag: Mitarbeiter-Änderung → zurück auf pending (erneute Genehmigung)
was_approved = absence.status == AbsenceStatus.APPROVED
if was_approved and not is_manager:
absence.status = AbsenceStatus.PENDING
absence.approved_by = None
# Arbeitstage neu berechnen
holiday_dates = await self._get_holiday_dates(current_user.company_id, absence.start_date.year, db)
absence.working_days = Decimal(str(
self._calc_working_days(absence.start_date, absence.end_date,
holiday_dates, absence.half_day_start, absence.half_day_end)
))
# Audit-Log
action = "absence_change_request" if (was_approved and not is_manager) else "absence_updated"
db.add(AuditLog(
user_id=current_user.id,
action=action,
entity_type="absence",
entity_id=absence.id,
old_value={"status": "approved" if was_approved else "pending"},
new_value={
"status": absence.status.value,
"start_date": str(absence.start_date),
"end_date": str(absence.end_date),
"working_days": float(absence.working_days),
"correction_note": absence.correction_note,
},
))
return absence
async def cancel_absence(
self, absence_id: UUID, current_user: User, db: AsyncSession
) -> Absence:
absence = await db.get(Absence, absence_id)
if absence is None:
raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
if absence.user_id != current_user.id:
raise HTTPException(status_code=403, detail="Nur eigene Anträge können storniert werden.")
if absence.status != AbsenceStatus.PENDING:
raise HTTPException(status_code=409, detail="Nur ausstehende Anträge können gelöscht werden.")
absence.status = AbsenceStatus.CANCELLED
# Audit-Log (DSGVO)
db.add(AuditLog(
company_id=current_user.company_id,
user_id=current_user.id,
action="absence_cancelled",
entity_type="absence",
entity_id=absence.id,
old_value={"status": "pending"},
new_value={
"status": "cancelled",
"cancelled_by": str(current_user.id),
"absence_user_id": str(absence.user_id),
"start_date": str(absence.start_date),
"end_date": str(absence.end_date),
"working_days": float(absence.working_days),
},
))
from app.services.caldav_service import caldav_service
asyncio.create_task(caldav_service.sync_removed(absence, db))
return absence
async def approve_absence(
self, absence_id: UUID, current_user: User, db: AsyncSession
) -> Absence:
if current_user.role not in (UserRole.MANAGER, UserRole.HR, UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN):
raise HTTPException(status_code=403, detail="Keine Berechtigung.")
absence = await db.get(Absence, absence_id)
if absence is None:
raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
requester = await db.get(User, absence.user_id)
if requester is None or requester.company_id != current_user.company_id:
raise HTTPException(status_code=403, detail="Zugriff verweigert.")
if absence.user_id == current_user.id:
raise HTTPException(
status_code=409,
detail="Eigene Abwesenheitsanträge können nicht selbst genehmigt werden."
)
if absence.status != AbsenceStatus.PENDING:
raise HTTPException(status_code=409, detail="Nur ausstehende Anträge können genehmigt werden.")
absence.status = AbsenceStatus.APPROVED
absence.approved_by = current_user.id
absence_type = await db.get(AbsenceType, absence.type_id)
# Urlaubskonto abziehen wenn nötig
if absence_type and absence_type.deducts_vacation:
await self._deduct_vacation(absence.user_id, absence.start_date.year, int(absence.working_days), db)
# Überstundenkonto abziehen wenn Freizeitausgleich
if absence_type and absence_type.affects_overtime_balance:
await self._deduct_overtime(absence.user_id, absence.working_days, db)
# Audit-Log (DSGVO)
db.add(AuditLog(
company_id=current_user.company_id,
user_id=current_user.id,
action="absence_approved",
entity_type="absence",
entity_id=absence.id,
old_value={"status": "pending"},
new_value={
"status": "approved",
"approved_by": str(current_user.id),
"approved_by_name": current_user.full_name,
"absence_user_id": str(absence.user_id),
"start_date": str(absence.start_date),
"end_date": str(absence.end_date),
"working_days": float(absence.working_days),
},
))
# CalDAV-Sync (fire & forget Fehler blockieren nicht die Genehmigung)
from app.services.caldav_service import caldav_service
asyncio.create_task(caldav_service.sync_approved(absence, db))
return absence
async def reject_absence(
self, absence_id: UUID, data: AbsenceReject, current_user: User, db: AsyncSession
) -> Absence:
if current_user.role not in (UserRole.MANAGER, UserRole.HR, UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN):
raise HTTPException(status_code=403, detail="Keine Berechtigung.")
absence = await db.get(Absence, absence_id)
if absence is None:
raise HTTPException(status_code=404, detail="Abwesenheit nicht gefunden.")
requester = await db.get(User, absence.user_id)
if requester is None or requester.company_id != current_user.company_id:
raise HTTPException(status_code=403, detail="Zugriff verweigert.")
if absence.status != AbsenceStatus.PENDING:
raise HTTPException(status_code=409, detail="Nur ausstehende Anträge können abgelehnt werden.")
absence.status = AbsenceStatus.REJECTED
absence.approved_by = current_user.id
absence.rejection_reason = data.rejection_reason
# Audit-Log (DSGVO)
db.add(AuditLog(
company_id=current_user.company_id,
user_id=current_user.id,
action="absence_rejected",
entity_type="absence",
entity_id=absence.id,
old_value={"status": "pending"},
new_value={
"status": "rejected",
"rejection_reason": absence.rejection_reason,
"rejected_by": str(current_user.id),
"rejected_by_name": current_user.full_name,
"absence_user_id": str(absence.user_id),
"start_date": str(absence.start_date),
"end_date": str(absence.end_date),
"working_days": float(absence.working_days),
},
))
from app.services.caldav_service import caldav_service
asyncio.create_task(caldav_service.sync_removed(absence, db))
return absence
async def get_calendar(
self,
company_id: UUID,
year: int,
month: int | None,
db: AsyncSession,
) -> list[dict]:
q = (
select(Absence, User, AbsenceType)
.join(User, Absence.user_id == User.id)
.join(AbsenceType, Absence.type_id == AbsenceType.id)
.where(
User.company_id == company_id,
Absence.status.in_([AbsenceStatus.PENDING, AbsenceStatus.APPROVED]),
)
)
if month:
start = date(year, month, 1)
end = date(year, month, 28) + timedelta(days=4)
end = end.replace(day=1) - timedelta(days=1)
q = q.where(Absence.start_date <= end, Absence.end_date >= start)
else:
q = q.where(
Absence.start_date >= date(year, 1, 1),
Absence.end_date <= date(year, 12, 31),
)
result = await db.execute(q.order_by(Absence.start_date))
rows = result.all()
calendar = []
for absence, user, atype in rows:
calendar.append({
"user_id": user.id,
"user_name": user.full_name,
"absence_id": absence.id,
"type_name": atype.name,
"type_color": atype.color,
"start_date": absence.start_date,
"end_date": absence.end_date,
"status": absence.status,
"working_days": absence.working_days,
})
return calendar
# ── Urlaubskonto ──────────────────────────────────────────────────────────
async def get_balance(self, user_id: UUID, year: int, db: AsyncSession) -> VacationBalance:
return await self._get_or_create_balance(user_id, year, db)
async def get_pending_days(self, user_id: UUID, year: int, db: AsyncSession) -> float:
"""Summe der Arbeitstage aus ausstehenden Anträgen die Urlaub abziehen."""
q = (
select(func.sum(Absence.working_days))
.join(AbsenceType, Absence.type_id == AbsenceType.id)
.where(
Absence.user_id == user_id,
Absence.status == AbsenceStatus.PENDING,
AbsenceType.deducts_vacation.is_(True),
func.extract("year", Absence.start_date) == year,
)
)
result = await db.scalar(q)
return float(result or 0)
# ── Feiertage ─────────────────────────────────────────────────────────────
async def list_holidays(
self, year: int, country: str, state: str | None, db: AsyncSession
) -> list[PublicHoliday]:
q = select(PublicHoliday).where(
PublicHoliday.year == year, PublicHoliday.country == country
)
if state:
q = q.where(or_(PublicHoliday.state == state, PublicHoliday.state.is_(None)))
result = await db.scalars(q.order_by(PublicHoliday.date))
return list(result.all())
async def create_holiday(self, data, db: AsyncSession) -> PublicHoliday:
holiday = PublicHoliday(
country=data.country,
state=data.state,
date=data.date,
name=data.name,
year=data.date.year,
)
db.add(holiday)
await db.flush()
return holiday
# ── Helpers ───────────────────────────────────────────────────────────────
async def _get_type_or_404(
self, type_id: UUID, company_id: UUID, db: AsyncSession
) -> AbsenceType:
at = await db.get(AbsenceType, type_id)
if at is None or at.company_id != company_id:
raise HTTPException(status_code=404, detail="Abwesenheitstyp nicht gefunden.")
return at
async def _get_or_create_balance(
self, user_id: UUID, year: int, db: AsyncSession
) -> VacationBalance:
balance = await db.scalar(
select(VacationBalance).where(
VacationBalance.user_id == user_id, VacationBalance.year == year
)
)
if balance is None:
balance = VacationBalance(user_id=user_id, year=year, entitled_days=30)
db.add(balance)
await db.flush()
return balance
async def _deduct_vacation(
self, user_id: UUID, year: int, days: int, db: AsyncSession
) -> None:
balance = await self._get_or_create_balance(user_id, year, db)
balance.used_days += days
async def _deduct_overtime(
self, user_id: UUID, working_days: float, db: AsyncSession
) -> None:
"""Zieht working_days × tägliche Stunden vom Überstundenkonto ab."""
# Stunden/Tag aus Arbeitsplan ermitteln (Fallback: 8h)
user = await db.get(User, user_id)
daily_hours = Decimal("8.00")
if user and user.work_schedule_id:
schedule = await db.get(WorkSchedule, user.work_schedule_id)
if schedule:
working_days_in_week = sum(
1 for h in [schedule.mon_h, schedule.tue_h, schedule.wed_h,
schedule.thu_h, schedule.fri_h, schedule.sat_h, schedule.sun_h]
if h > 0
)
if working_days_in_week > 0:
daily_hours = schedule.weekly_hours / Decimal(working_days_in_week)
hours_to_deduct = Decimal(str(working_days)) * daily_hours
ob = await db.scalar(select(OvertimeBalance).where(OvertimeBalance.user_id == user_id))
if ob is None:
# Erstelle Eintrag mit 0 Überstunden — taken_hours kann negativ werden
company_id = user.company_id if user else None
if not company_id:
return
ob = OvertimeBalance(user_id=user_id, company_id=company_id)
db.add(ob)
await db.flush()
ob.taken_hours += hours_to_deduct
async def _get_holiday_dates(
self, company_id: UUID, year: int, db: AsyncSession
) -> set[date]:
"""Feiertage für die Company-Country holen."""
from app.models.company import Company
from sqlalchemy import or_
company = await db.get(Company, company_id)
country = company.country if company else "DE"
state = company.state if company else None
q = select(PublicHoliday.date).where(
PublicHoliday.year == year,
PublicHoliday.country == country,
)
if state:
q = q.where(or_(PublicHoliday.state == state, PublicHoliday.state.is_(None)))
result = await db.scalars(q)
return set(result.all())
@staticmethod
def _calc_working_days(
start: date,
end: date,
holidays: set[date],
half_day_start: bool,
half_day_end: bool,
) -> float:
count = 0.0
current = start
while current <= end:
if current.weekday() < 5 and current not in holidays:
count += 1.0
current += timedelta(days=1)
# Halbtage abziehen
if half_day_start and start.weekday() < 5 and start not in holidays:
count -= 0.5
if half_day_end and end.weekday() < 5 and end not in holidays and end != start:
count -= 0.5
return max(0.0, count)
absence_service = AbsenceService()