Files
timemaster/backend/app/services/report_service.py
T
sysops 1fedd683e0 Initial commit – TimeMaster Zeiterfassung & HR-Tool
Stand: agent-06 (Audit-Log), agent-05 (Krankmeldung), agent-07 Phase 1 (Personalnummer),
Busylight-Pull-Integration, TOTP/2FA, Abwesenheiten, Zeiterfassung, Kiosk-Grundgerüst.
Migrations 0001–0023 deployed auf 192.168.1.137 + .164.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 20:03:27 +02:00

1187 lines
54 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import csv
import io
from collections import defaultdict
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from uuid import UUID
from sqlalchemy import distinct, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.absence import Absence, AbsenceStatus
from app.models.absence_type import AbsenceType
from app.models.company import Company
from app.models.department import Department
from app.models.overtime_balance import OvertimeBalance
from app.models.time_entry import EntryStatus, TimeEntry
from app.models.user import User, UserRole
from app.models.vacation_balance import VacationBalance
from app.models.work_schedule import WorkSchedule
from app.schemas.report import (
AbsenceReport,
AbsenceReportRow,
CompanyDashboard,
DayEntry,
EmployeeDashboard,
HoursBreakdown,
OvertimeDay,
OvertimeReport,
OvertimeReportDetailed,
OvertimeReportRow,
OvertimeReportRowDetailed,
OvertimeWeek,
TeamDashboard,
TeamMemberStatus,
TimeReport,
TimeReportRow,
UpcomingAbsence,
)
from app.services.holiday_service import get_holidays_set
# Fallback used when no work schedule is assigned:
# weekday index (0 = Monday … 6 = Sunday) -> expected hours for that day.
_DEFAULT_DAILY_HOURS: dict[int, float] = {0: 8.0, 1: 8.0, 2: 8.0, 3: 8.0, 4: 8.0, 5: 0.0, 6: 0.0}
# ---------------------------------------------------------------------------
# §3b EStG Stunden-Kategorisierung
# ---------------------------------------------------------------------------
def _categorize_hours(
    entry_date: date,
    start: time,
    end: time,
    break_minutes: int,
    holidays: dict[date, tuple[str, bool]],
) -> HoursBreakdown:
    """Split the worked minutes of one shift into §3b EStG surcharge categories.

    Every minute between ``start`` and ``end`` is put into exactly one
    category. When several could apply, the first match in this order wins:
    holiday with a truthy flag, other holiday, Sunday, night 00:00-04:00,
    night 20:00-06:00, normal. Breaks are not placed on the timeline; every
    category is scaled by the same net/gross ratio instead.

    Args:
        entry_date: Calendar day on which the shift starts.
        start: Start time of the shift.
        end: End time of the shift. A value at or before ``start`` is treated
            as ending on the following day.
        break_minutes: Break time in minutes, deducted proportionally.
        holidays: Mapping of holiday date to ``(name, flag)``. A truthy flag
            routes the minutes to ``holiday_150_hours``, a falsy one to
            ``holiday_125_hours``.

    Returns:
        Hours per category, each rounded to two decimals. ``holiday_name`` is
        the holiday on the start day or, failing that, the holiday on the
        following day when the shift actually reaches into that day.
    """
    minutes_per_day = 24 * 60

    # Build the minute timeline; an end at or before the start means the shift
    # runs past midnight, so the end is pushed into the next day.
    start_m = start.hour * 60 + start.minute
    end_m = end.hour * 60 + end.minute
    if end_m <= start_m:
        end_m += minutes_per_day
    gross_minutes = end_m - start_m  # always >= 1 after the adjustment above

    # Distribute the break proportionally over all categories.
    net_factor = max(0.0, (gross_minutes - break_minutes) / gross_minutes)

    # The timeline covers at most the start day and the day after, so the
    # holiday lookup and the Sunday test are done once per day, not per minute.
    next_date = entry_date + timedelta(days=1)
    start_day_holiday = holidays.get(entry_date)
    next_day_holiday = holidays.get(next_date)
    day_facts = (
        (start_day_holiday, entry_date.weekday() == 6),
        (next_day_holiday, next_date.weekday() == 6),
    )

    cats = {"normal": 0, "night_25": 0, "night_40": 0, "sunday": 0,
            "holiday_125": 0, "holiday_150": 0}
    for abs_min in range(start_m, end_m):
        # day_index is 0 for the start day and 1 after midnight.
        day_index, minute_of_day = divmod(abs_min, minutes_per_day)
        minute_holiday, minute_is_sunday = day_facts[day_index]

        # First match wins (highest surcharge first).
        if minute_holiday and minute_holiday[1]:
            cat = "holiday_150"
        elif minute_holiday:
            cat = "holiday_125"
        elif minute_is_sunday:
            cat = "sunday"
        elif 0 <= minute_of_day < 4 * 60:
            # 00:00-04:00.
            # NOTE(review): §3b(3) EStG appears to grant the 40 % rate only
            # when the night work began before midnight; this applies it to
            # every minute in the window — confirm that is intended.
            cat = "night_40"
        elif minute_of_day >= 20 * 60 or minute_of_day < 6 * 60:
            # 20:00-24:00 and 04:00-06:00 (00:00-04:00 was handled above).
            cat = "night_25"
        else:
            cat = "normal"
        cats[cat] += 1

    def to_h(mins: int) -> float:
        """Convert gross minutes of one category to net hours."""
        return round(mins * net_factor / 60, 2)

    # Only report the next day's holiday if at least one minute falls on that
    # day. The previous `end < start` test disagreed with the `<=` rollover
    # above and was wrong for shifts ending exactly at 00:00 and for 24h shifts.
    reaches_next_day = end_m > minutes_per_day
    if start_day_holiday:
        holiday_name = start_day_holiday[0]
    elif reaches_next_day and next_day_holiday:
        holiday_name = next_day_holiday[0]
    else:
        holiday_name = None

    return HoursBreakdown(
        normal_hours=to_h(cats["normal"]),
        night_25_hours=to_h(cats["night_25"]),
        night_40_hours=to_h(cats["night_40"]),
        sunday_hours=to_h(cats["sunday"]),
        holiday_125_hours=to_h(cats["holiday_125"]),
        holiday_150_hours=to_h(cats["holiday_150"]),
        holiday_name=holiday_name,
    )
def _schedule_daily(schedule: WorkSchedule | None) -> dict[int, float]:
    """Return the expected hours per weekday as ``{weekday: hours}`` (0 = Mon … 6 = Sun).

    Without a schedule the module-level default plan is returned.
    """
    if schedule is None:
        return _DEFAULT_DAILY_HOURS
    # Attribute names in weekday order, so the index doubles as date.weekday().
    weekday_columns = ("mon_h", "tue_h", "wed_h", "thu_h", "fri_h", "sat_h", "sun_h")
    return {
        weekday: float(getattr(schedule, column))
        for weekday, column in enumerate(weekday_columns)
    }
def _expected_hours(schedule: WorkSchedule | None, date_from: date, date_to: date) -> float:
    """Sum the scheduled target hours for every day from ``date_from`` to ``date_to`` inclusive.

    Returns 0.0 when ``date_to`` lies before ``date_from``.
    """
    hours_by_weekday = _schedule_daily(schedule)
    span_days = (date_to - date_from).days + 1
    total = 0.0
    for offset in range(span_days):
        day = date_from + timedelta(days=offset)
        total += hours_by_weekday.get(day.weekday(), 0.0)
    return total
async def _load_schedule(user: User, db: AsyncSession) -> WorkSchedule | None:
    """Load the user's work schedule, falling back to a company schedule.

    If the user has a ``work_schedule_id``, that schedule is fetched by primary
    key. Otherwise the company schedule with the earliest ``valid_from`` that
    is not in the future is returned, or ``None`` if there is none.
    """
    assigned_id = user.work_schedule_id
    if assigned_id:
        return await db.get(WorkSchedule, assigned_id)

    # NOTE(review): ascending order picks the oldest schedule that has already
    # started — confirm this is intended rather than the most recent one.
    fallback_stmt = (
        select(WorkSchedule)
        .where(
            WorkSchedule.company_id == user.company_id,
            WorkSchedule.valid_from <= date.today(),
        )
        .order_by(WorkSchedule.valid_from)
        .limit(1)
    )
    candidates = await db.scalars(fallback_stmt)
    return candidates.first()
async def _get_or_create_overtime_balance(user: User, db: AsyncSession) -> OvertimeBalance:
    """Return the user's overtime balance row, creating and flushing a new one if none exists."""
    lookup = select(OvertimeBalance).where(OvertimeBalance.user_id == user.id)
    existing = await db.scalar(lookup)
    if existing:
        return existing

    created = OvertimeBalance(user_id=user.id, company_id=user.company_id)
    db.add(created)
    # Flush (not commit): the row is sent to the database, the caller commits.
    await db.flush()
    return created
async def _recalculate_overtime_balance(
    user: User, schedule: WorkSchedule | None, db: AsyncSession
) -> OvertimeBalance:
    """Recompute the overtime balance from all approved, closed time entries.

    Worked hours of every approved entry with an end time are summed and
    compared with the scheduled hours between the earliest and the latest
    entry date. A negative difference is stored as zero. The balance row is
    updated in the session but not committed here.

    Args:
        user: User whose balance is recalculated.
        schedule: Work schedule used for the expected hours; ``None`` falls
            back to the default plan of ``_expected_hours``.
        db: Session used for the queries.

    Returns:
        The updated (possibly newly created) ``OvertimeBalance`` row.
    """
    # Local import: the module header only imports date/datetime/time/timedelta.
    from datetime import timezone

    entries = list(await db.scalars(
        select(TimeEntry).where(
            TimeEntry.user_id == user.id,
            TimeEntry.end_time.isnot(None),
            TimeEntry.status == EntryStatus.APPROVED,
        )
    ))
    bal = await _get_or_create_overtime_balance(user, db)

    if entries:
        date_from = min(e.date for e in entries)
        date_to = max(e.date for e in entries)
        expected = _expected_hours(schedule, date_from, date_to)
        worked = sum(e.worked_hours or 0.0 for e in entries)
        overtime = max(0.0, worked - expected)
        bal.total_hours = Decimal(str(round(overtime, 2)))
    else:
        bal.total_hours = Decimal("0")

    # datetime.utcnow() is deprecated since Python 3.12; this yields the same
    # naive UTC timestamp the column received before.
    bal.last_calculated = datetime.now(timezone.utc).replace(tzinfo=None)
    return bal
def _check_arbzg_day(entry: TimeEntry) -> list[str]:
    """Check a single time entry against the ArbZG daily limits.

    Two checks are made: net working time above 10 hours (§3) and too short a
    break for a presence of more than 9 hours (45 min) or more than 6 hours
    (30 min) (§4). An end time at or before the start time is treated as
    ending on the following day.

    Args:
        entry: Time entry providing ``start_time``, ``end_time`` and
            ``break_minutes``.

    Returns:
        The warning texts for this entry; empty when the entry is still open
        or nothing is violated.
    """
    if entry.end_time is None:
        return []  # entry still open, nothing to check yet

    start_m = entry.start_time.hour * 60 + entry.start_time.minute
    end_m = entry.end_time.hour * 60 + entry.end_time.minute
    if end_m <= start_m:
        end_m += 24 * 60  # shift runs past midnight

    presence = end_m - start_m  # minutes between start and end, breaks included
    worked_h = (presence - entry.break_minutes) / 60

    warnings: list[str] = []
    if worked_h > 10:
        warnings.append(f"Maximale Arbeitszeit von 10 Stunden überschritten ({worked_h:.1f}h) ArbZG §3")

    # The thresholds are strict ("more than" 6h / 9h), matching the warning
    # texts; the previous `>=` also warned at exactly 6h and exactly 9h.
    # NOTE(review): the statute seems to refer to working time rather than
    # presence — confirm whether `presence` is the intended basis here.
    if presence > 9 * 60 and entry.break_minutes < 45:
        warnings.append("Bei >9h Anwesenheit mind. 45 min Pause erforderlich ArbZG §4")
    elif presence > 6 * 60 and entry.break_minutes < 30:
        warnings.append("Bei >6h Anwesenheit mind. 30 min Pause erforderlich ArbZG §4")
    return warnings
class ReportService:
# ── Employee Dashboard ───────────────────────────────────────────────────
async def employee_dashboard(self, user: User, db: AsyncSession) -> EmployeeDashboard:
    """Assemble the personal dashboard for one employee.

    Covers today's open time entry, worked versus scheduled hours for the
    current week (Monday through today), this year's vacation balance, the
    number of pending absence requests and the overtime balance.
    """
    today = date.today()
    schedule = await _load_schedule(user, db)

    # Base query shared by the two time-entry lookups below.
    own_entries = select(TimeEntry).where(TimeEntry.user_id == user.id)

    open_entry = await db.scalar(
        own_entries.where(
            TimeEntry.date == today,
            TimeEntry.end_time.is_(None),
        )
    )

    week_start = today - timedelta(days=today.weekday())
    closed_this_week = list(await db.scalars(
        own_entries.where(
            TimeEntry.date >= week_start,
            TimeEntry.date <= today,
            TimeEntry.end_time.isnot(None),
        )
    ))
    week_hours_worked = sum(e.worked_hours or 0.0 for e in closed_this_week)
    week_expected = _expected_hours(schedule, week_start, today)

    balance = await db.scalar(
        select(VacationBalance).where(
            VacationBalance.user_id == user.id,
            VacationBalance.year == today.year,
        )
    )

    pending_stmt = select(func.count()).select_from(Absence).where(
        Absence.user_id == user.id,
        Absence.status == AbsenceStatus.PENDING,
    )
    pending_absences = await db.scalar(pending_stmt) or 0

    overtime_bal = await db.scalar(
        select(OvertimeBalance).where(OvertimeBalance.user_id == user.id)
    )
    if overtime_bal:
        overtime_balance_hours = float(overtime_bal.available_hours)
    else:
        overtime_balance_hours = None

    # Hours since clock-in for a still-open entry, net of breaks, never negative.
    hours_today = None
    if open_entry:
        clock = datetime.now().time()
        started_at = open_entry.start_time
        elapsed = (clock.hour * 60 + clock.minute) - (started_at.hour * 60 + started_at.minute)
        hours_today = round(max(0, elapsed - open_entry.break_minutes) / 60.0, 2)

    return EmployeeDashboard(
        today_open=open_entry is not None,
        today_start=open_entry.start_time if open_entry else None,
        today_hours_so_far=hours_today,
        week_hours_worked=round(week_hours_worked, 2),
        week_hours_expected=round(week_expected, 2),
        week_overtime=round(week_hours_worked - week_expected, 2),
        vacation_remaining_days=balance.remaining_days if balance else None,
        vacation_used_days=int(balance.used_days) if balance else 0,
        vacation_entitled_days=int(balance.entitled_days) if balance else 0,
        pending_absences=pending_absences,
        overtime_balance_hours=overtime_balance_hours,
        schedule_name=schedule.name if schedule else None,
    )
# ── Team Dashboard ───────────────────────────────────────────────────────
async def team_dashboard(self, current_user: User, db: AsyncSession) -> TeamDashboard:
    """Summarise today's attendance for all active users of the caller's company.

    Each member is classified as ``present`` (has a time entry dated today),
    ``on_leave`` (no entry, but an approved absence covering today) or
    ``absent``. The counts of pending time entries and pending absences of
    those users are included as well.
    """
    today = date.today()

    staff_stmt = (
        select(User, Department.name.label("dept_name"))
        .outerjoin(Department, User.department_id == Department.id)
        .where(User.company_id == current_user.company_id, User.is_active == True)  # noqa: E712
    )
    staff = [(row[0], row[1]) for row in (await db.execute(staff_stmt)).all()]
    member_ids = [member.id for member, _ in staff]
    if not member_ids:
        return TeamDashboard(
            present_count=0, on_leave_count=0, absent_count=0,
            pending_time_approvals=0, pending_absence_approvals=0, members=[],
        )

    # One entry per user: the first one seen, replaced by any still-open entry.
    entry_result = await db.execute(
        select(TimeEntry).where(
            TimeEntry.user_id.in_(member_ids),
            TimeEntry.date == today,
        )
    )
    today_entries: dict[UUID, TimeEntry] = {}
    for entry in entry_result.scalars().all():
        is_open = entry.end_time is None
        if is_open or entry.user_id not in today_entries:
            today_entries[entry.user_id] = entry

    absence_result = await db.execute(
        select(Absence, AbsenceType.name.label("type_name"))
        .join(AbsenceType, Absence.type_id == AbsenceType.id)
        .where(
            Absence.user_id.in_(member_ids),
            Absence.start_date <= today,
            Absence.end_date >= today,
            Absence.status == AbsenceStatus.APPROVED,
        )
    )
    today_absences: dict[UUID, tuple[Absence, str]] = {
        absence.user_id: (absence, type_name)
        for absence, type_name in absence_result.all()
    }

    pending_time = await db.scalar(
        select(func.count()).select_from(TimeEntry).where(
            TimeEntry.user_id.in_(member_ids),
            TimeEntry.status == EntryStatus.PENDING,
        )
    ) or 0
    pending_absence = await db.scalar(
        select(func.count()).select_from(Absence).where(
            Absence.user_id.in_(member_ids),
            Absence.status == AbsenceStatus.PENDING,
        )
    ) or 0

    counts = {"present": 0, "on_leave": 0, "absent": 0}
    members = []
    for member, dept_name in staff:
        entry = today_entries.get(member.id)
        abs_data = today_absences.get(member.id)

        time_in = None
        hours_today = None
        if entry:
            status = "present"
            time_in = entry.start_time
            # Worked hours are only reported once the entry is closed.
            hours_today = entry.worked_hours if entry.end_time else None
        elif abs_data:
            status = "on_leave"
        else:
            status = "absent"
        counts[status] += 1

        members.append(TeamMemberStatus(
            user_id=member.id,
            user_name=member.full_name,
            department=dept_name,
            status=status,
            absence_type=abs_data[1] if abs_data else None,
            time_in=time_in,
            hours_today=hours_today,
        ))

    return TeamDashboard(
        present_count=counts["present"],
        on_leave_count=counts["on_leave"],
        absent_count=counts["absent"],
        pending_time_approvals=pending_time,
        pending_absence_approvals=pending_absence,
        members=members,
    )
# ── Company Dashboard ────────────────────────────────────────────────────
async def company_dashboard(self, current_user: User, db: AsyncSession) -> CompanyDashboard:
    """Build the company-wide dashboard for the caller's company.

    Reports head count and today's attendance, worked versus scheduled hours
    from the first of the month through today, pending approvals, and approved
    absences starting within the next 14 days. Only active users are included.
    """
    today = date.today()
    month_start = today.replace(day=1)

    users = list(await db.scalars(
        select(User).where(
            User.company_id == current_user.company_id,
            User.is_active == True,  # noqa: E712
        )
    ))
    user_ids = [u.id for u in users]
    total_employees = len(user_ids)
    if not user_ids:
        return CompanyDashboard(
            total_employees=0, active_today=0, attendance_rate=0.0,
            month_hours_worked=0.0, month_hours_expected=0.0, month_overtime=0.0,
            pending_time_approvals=0, pending_absence_approvals=0, upcoming_absences=[],
        )

    active_today = await db.scalar(
        select(func.count(distinct(TimeEntry.user_id))).where(
            TimeEntry.user_id.in_(user_ids),
            TimeEntry.date == today,
        )
    ) or 0
    pending_time = await db.scalar(
        select(func.count()).select_from(TimeEntry).where(
            TimeEntry.user_id.in_(user_ids),
            TimeEntry.status == EntryStatus.PENDING,
        )
    ) or 0
    pending_absence = await db.scalar(
        select(func.count()).select_from(Absence).where(
            Absence.user_id.in_(user_ids),
            Absence.status == AbsenceStatus.PENDING,
        )
    ) or 0

    month_entries = list(await db.scalars(
        select(TimeEntry).where(
            TimeEntry.user_id.in_(user_ids),
            TimeEntry.date >= month_start,
            TimeEntry.date <= today,
            TimeEntry.end_time.isnot(None),
        )
    ))
    month_hours = sum(e.worked_hours or 0.0 for e in month_entries)

    # Expected hours: use each user's own schedule. The query is ordered so
    # the fallback schedule is deterministic and matches _load_schedule
    # (earliest valid_from); without ORDER BY the "first" row was arbitrary.
    company_schedules = list(await db.scalars(
        select(WorkSchedule)
        .where(
            WorkSchedule.company_id == current_user.company_id,
            WorkSchedule.valid_from <= today,
        )
        .order_by(WorkSchedule.valid_from)
    ))
    schedules_by_id = {s.id: s for s in company_schedules}
    default_schedule = company_schedules[0] if company_schedules else None

    # Expected hours depend only on the schedule, so compute them once per
    # schedule instead of once per user.
    expected_by_schedule: dict[object, float] = {}
    month_expected = 0.0
    for user in users:
        sched = schedules_by_id.get(user.work_schedule_id, default_schedule)
        cache_key = sched.id if sched is not None else None
        if cache_key not in expected_by_schedule:
            expected_by_schedule[cache_key] = _expected_hours(sched, month_start, today)
        month_expected += expected_by_schedule[cache_key]

    upcoming_end = today + timedelta(days=14)
    result = await db.execute(
        select(
            Absence,
            User.first_name.label("first_name"),
            User.last_name.label("last_name"),
            AbsenceType.name.label("type_name"),
        )
        .join(User, Absence.user_id == User.id)
        .join(AbsenceType, Absence.type_id == AbsenceType.id)
        .where(
            Absence.user_id.in_(user_ids),
            Absence.start_date >= today,
            Absence.start_date <= upcoming_end,
            Absence.status == AbsenceStatus.APPROVED,
        )
        .order_by(Absence.start_date)
    )
    upcoming = [
        UpcomingAbsence(
            user_id=absence.user_id,
            user_name=f"{first_name} {last_name}",
            absence_type=type_name,
            start_date=absence.start_date,
            end_date=absence.end_date,
            working_days=float(absence.working_days),
        )
        for absence, first_name, last_name, type_name in result.all()
    ]

    return CompanyDashboard(
        total_employees=total_employees,
        active_today=active_today,
        attendance_rate=round(active_today / total_employees * 100, 1) if total_employees else 0.0,
        month_hours_worked=round(month_hours, 2),
        month_hours_expected=round(month_expected, 2),
        month_overtime=round(month_hours - month_expected, 2),
        pending_time_approvals=pending_time,
        pending_absence_approvals=pending_absence,
        upcoming_absences=upcoming,
    )
# ── Time Report ──────────────────────────────────────────────────────────
async def time_report(
    self,
    company_id: UUID,
    current_user: User,
    db: AsyncSession,
    date_from: date,
    date_to: date,
    user_id: UUID | None = None,
) -> TimeReport:
    """List all time entries of a company within a date range.

    Args:
        company_id: Company whose users are reported.
        current_user: Caller; users with the EMPLOYEE role only ever get
            their own entries, regardless of ``user_id``.
        db: Session used for the queries.
        date_from: First entry date included.
        date_to: Last entry date included.
        user_id: Optional restriction to one user (ignored for employees).

    Returns:
        The report rows ordered by date and name, with the summed worked
        hours. If the company has a ``state`` set, every closed entry also
        carries a surcharge breakdown.
    """
    stmt = (
        select(
            TimeEntry,
            User.first_name.label("first_name"),
            User.last_name.label("last_name"),
            User.personnel_number.label("personnel_number"),
            Department.name.label("dept_name"),
        )
        .join(User, TimeEntry.user_id == User.id)
        .outerjoin(Department, User.department_id == Department.id)
        .where(
            User.company_id == company_id,
            TimeEntry.date >= date_from,
            TimeEntry.date <= date_to,
        )
        .order_by(TimeEntry.date, User.last_name, User.first_name)
    )
    if current_user.role == UserRole.EMPLOYEE:
        stmt = stmt.where(TimeEntry.user_id == current_user.id)
    elif user_id:
        stmt = stmt.where(TimeEntry.user_id == user_id)

    # The company's state selects the holiday calendar.
    company = await db.get(Company, company_id)
    state = company.state if company else None
    holidays: dict[date, tuple[str, bool]] = {}
    if state:
        # Load one extra day: a night shift starting on date_to runs into the
        # following day, whose holiday status is needed for the minutes after
        # midnight.
        holidays = await get_holidays_set(
            date_from, date_to + timedelta(days=1), state, db
        )

    result = await db.execute(stmt)
    rows = []
    total_hours = 0.0
    for entry, first_name, last_name, personnel_number, dept_name in result.all():
        total_hours += entry.worked_hours or 0.0

        breakdown = None
        if state and entry.end_time is not None:
            breakdown = _categorize_hours(
                entry.date, entry.start_time, entry.end_time,
                entry.break_minutes, holidays,
            )

        rows.append(TimeReportRow(
            date=entry.date,
            user_id=entry.user_id,
            user_name=f"{first_name} {last_name}",
            personnel_number=personnel_number,
            department=dept_name,
            start_time=entry.start_time,
            end_time=entry.end_time,
            break_minutes=entry.break_minutes,
            worked_hours=entry.worked_hours,
            status=entry.status.value,
            source=entry.source.value,
            note=entry.note,
            breakdown=breakdown,
        ))

    return TimeReport(
        date_from=date_from, date_to=date_to,
        total_rows=len(rows), total_hours=round(total_hours, 2), rows=rows,
    )
# ── Absence Report ───────────────────────────────────────────────────────
async def absence_report(
    self,
    company_id: UUID,
    current_user: User,
    db: AsyncSession,
    date_from: date,
    date_to: date,
    user_id: UUID | None = None,
) -> AbsenceReport:
    """List all absences of a company that overlap the given date range.

    Users with the EMPLOYEE role only get their own absences; other callers
    may narrow the report to one user via ``user_id``. Rows are ordered by
    start date and last name; ``total_days`` sums their working days.
    """
    columns = (
        Absence,
        User.first_name.label("first_name"),
        User.last_name.label("last_name"),
        User.personnel_number.label("personnel_number"),
        Department.name.label("dept_name"),
        AbsenceType.name.label("type_name"),
    )
    # Overlap test: the absence starts before the range ends and ends after it starts.
    overlaps_range = (
        User.company_id == company_id,
        Absence.start_date <= date_to,
        Absence.end_date >= date_from,
    )
    stmt = (
        select(*columns)
        .join(User, Absence.user_id == User.id)
        .outerjoin(Department, User.department_id == Department.id)
        .join(AbsenceType, Absence.type_id == AbsenceType.id)
        .where(*overlaps_range)
        .order_by(Absence.start_date, User.last_name)
    )
    if current_user.role == UserRole.EMPLOYEE:
        stmt = stmt.where(Absence.user_id == current_user.id)
    elif user_id:
        stmt = stmt.where(Absence.user_id == user_id)

    fetched = await db.execute(stmt)
    report_rows = []
    total_days = 0.0
    for absence, first_name, last_name, personnel_number, dept_name, type_name in fetched.all():
        working_days = float(absence.working_days)
        total_days += working_days
        report_rows.append(
            AbsenceReportRow(
                user_id=absence.user_id,
                user_name=f"{first_name} {last_name}",
                personnel_number=personnel_number,
                department=dept_name,
                absence_type=type_name,
                start_date=absence.start_date,
                end_date=absence.end_date,
                working_days=working_days,
                status=absence.status.value,
                note=absence.note,
            )
        )

    return AbsenceReport(
        date_from=date_from,
        date_to=date_to,
        total_rows=len(report_rows),
        total_days=total_days,
        rows=report_rows,
    )
# ── Overtime Report ──────────────────────────────────────────────────────
async def overtime_report(
    self,
    company_id: UUID,
    current_user: User,
    db: AsyncSession,
    date_from: date,
    date_to: date,
    user_id: UUID | None = None,
) -> OvertimeReport:
    """Compare worked and scheduled hours per active user for a date range.

    Args:
        company_id: Company whose active users are reported.
        current_user: Caller; users with the EMPLOYEE role only get their
            own row, regardless of ``user_id``.
        db: Session used for the queries.
        date_from: First day of the range.
        date_to: Last day of the range.
        user_id: Optional restriction to one user (ignored for employees).

    Returns:
        One row per user with worked hours (closed entries only), scheduled
        hours and their difference, plus the summed difference.
    """
    stmt = (
        select(User, Department.name.label("dept_name"))
        .outerjoin(Department, User.department_id == Department.id)
        .where(User.company_id == company_id, User.is_active == True)  # noqa: E712
    )
    if current_user.role == UserRole.EMPLOYEE:
        stmt = stmt.where(User.id == current_user.id)
    elif user_id is not None:
        stmt = stmt.where(User.id == user_id)

    result = await db.execute(stmt)
    users_with_dept = [(row[0], row[1]) for row in result.all()]
    if not users_with_dept:
        return OvertimeReport(
            date_from=date_from, date_to=date_to,
            total_employees=0, total_overtime=0.0, rows=[],
        )

    user_ids = [u.id for u, _ in users_with_dept]
    entries = list(await db.scalars(
        select(TimeEntry).where(
            TimeEntry.user_id.in_(user_ids),
            TimeEntry.date >= date_from,
            TimeEntry.date <= date_to,
            TimeEntry.end_time.isnot(None),
        )
    ))
    entries_by_user: dict[UUID, list[TimeEntry]] = defaultdict(list)
    for entry in entries:
        entries_by_user[entry.user_id].append(entry)

    # Ordered so the fallback schedule is deterministic and matches
    # _load_schedule (earliest valid_from); without ORDER BY the "first" row
    # was arbitrary.
    all_schedules = list(await db.scalars(
        select(WorkSchedule)
        .where(
            WorkSchedule.company_id == company_id,
            WorkSchedule.valid_from <= date_to,
        )
        .order_by(WorkSchedule.valid_from)
    ))
    schedules_by_id = {s.id: s for s in all_schedules}
    default_schedule = all_schedules[0] if all_schedules else None

    # Expected hours depend only on the schedule: compute once per schedule.
    expected_by_schedule: dict[object, float] = {}
    rows = []
    total_overtime = 0.0
    for user, dept_name in users_with_dept:
        sched = schedules_by_id.get(user.work_schedule_id, default_schedule)
        cache_key = sched.id if sched is not None else None
        if cache_key not in expected_by_schedule:
            expected_by_schedule[cache_key] = _expected_hours(sched, date_from, date_to)
        expected = expected_by_schedule[cache_key]

        hours_worked = sum(
            e.worked_hours or 0.0 for e in entries_by_user.get(user.id, [])
        )
        overtime = hours_worked - expected
        total_overtime += overtime
        rows.append(OvertimeReportRow(
            user_id=user.id,
            user_name=user.full_name,
            personnel_number=user.personnel_number,
            department=dept_name,
            hours_worked=round(hours_worked, 2),
            hours_expected=round(expected, 2),
            overtime_hours=round(overtime, 2),
        ))

    return OvertimeReport(
        date_from=date_from, date_to=date_to,
        total_employees=len(rows),
        total_overtime=round(total_overtime, 2),
        rows=rows,
    )
# ── Overtime Detail Report ────────────────────────────────────────────────
async def overtime_report_detail(
    self,
    company_id: UUID,
    current_user: User,
    db: AsyncSession,
    date_from: date,
    date_to: date,
    user_id: UUID | None = None,
) -> OvertimeReportDetailed:
    """Build the extended overtime report with a per-week and per-day breakdown.

    Args:
        company_id: Company whose active users are reported.
        current_user: Caller; users with the EMPLOYEE role only get their
            own row, regardless of ``user_id``.
        db: Session used for the queries.
        date_from: First day of the range.
        date_to: Last day of the range.
        user_id: Optional restriction to one user (ignored for employees).

    Returns:
        One row per user containing ISO weeks, days and single entries with
        ArbZG warnings, the number of days with more than 10 worked hours
        and, if the company has a ``state`` set, the aggregated surcharge
        hours.
    """
    stmt = (
        select(User, Department.name.label("dept_name"))
        .outerjoin(Department, User.department_id == Department.id)
        .where(User.company_id == company_id, User.is_active == True)  # noqa: E712
    )
    if current_user.role == UserRole.EMPLOYEE:
        stmt = stmt.where(User.id == current_user.id)
    elif user_id is not None:
        stmt = stmt.where(User.id == user_id)

    result = await db.execute(stmt)
    users_with_dept = [(row[0], row[1]) for row in result.all()]
    if not users_with_dept:
        return OvertimeReportDetailed(
            date_from=date_from, date_to=date_to,
            total_employees=0, total_overtime=0.0, rows=[],
        )

    user_ids = [u.id for u, _ in users_with_dept]
    entries = list(await db.scalars(
        select(TimeEntry).where(
            TimeEntry.user_id.in_(user_ids),
            TimeEntry.date >= date_from,
            TimeEntry.date <= date_to,
            TimeEntry.end_time.isnot(None),
        )
    ))
    entries_by_user: dict[UUID, dict[date, list[TimeEntry]]] = defaultdict(lambda: defaultdict(list))
    for entry in entries:
        entries_by_user[entry.user_id][entry.date].append(entry)

    # Ordered so the fallback schedule is deterministic and matches
    # _load_schedule (earliest valid_from).
    all_schedules = list(await db.scalars(
        select(WorkSchedule)
        .where(
            WorkSchedule.company_id == company_id,
            WorkSchedule.valid_from <= date_to,
        )
        .order_by(WorkSchedule.valid_from)
    ))
    schedules_by_id = {s.id: s for s in all_schedules}
    default_schedule = all_schedules[0] if all_schedules else None

    company = await db.get(Company, company_id)
    state = company.state if company else None
    holidays: dict[date, tuple[str, bool]] = {}
    if state:
        # One extra day so night shifts starting on date_to see a holiday on
        # the following day.
        holidays = await get_holidays_set(
            date_from, date_to + timedelta(days=1), state, db
        )

    # Group the days of the range by ISO week. The key includes the ISO year:
    # keyed by week number alone, weeks of different years were merged and
    # ranges crossing New Year were sorted out of order. The grouping does not
    # depend on the user, so it is built once.
    weeks_map: dict[tuple[int, int], list[date]] = defaultdict(list)
    current = date_from
    while current <= date_to:
        iso_year, iso_week, _ = current.isocalendar()
        weeks_map[(iso_year, iso_week)].append(current)
        current += timedelta(days=1)
    weeks = sorted(weeks_map.items())

    weekday_short = ["Mo", "Di", "Mi", "Do", "Fr", "Sa", "So"]
    breakdown_fields = ("normal_hours", "night_25_hours", "night_40_hours",
                        "sunday_hours", "holiday_125_hours", "holiday_150_hours")

    rows = []
    total_overtime = 0.0
    for user, dept_name in users_with_dept:
        sched = schedules_by_id.get(user.work_schedule_id, default_schedule)
        daily_expected = _schedule_daily(sched)
        entries_by_day = entries_by_user[user.id]

        user_weeks = []
        total_worked = 0.0
        total_expected_user = 0.0
        arbzg_violations = 0
        # Surcharge hours summed over all entries of this user.
        agg_breakdown = dict.fromkeys(breakdown_fields, 0.0)

        for (_iso_year, week_nr), week_days in weeks:
            week_days_list: list[OvertimeDay] = []
            w_worked = w_expected = 0.0

            for d in week_days:
                exp = daily_expected.get(d.weekday(), 0.0)
                w_expected += exp

                day_worked = 0.0
                day_entry_objs: list[DayEntry] = []
                for entry in entries_by_day.get(d, []):
                    worked = entry.worked_hours or 0.0
                    day_worked += worked
                    warnings = _check_arbzg_day(entry)

                    breakdown = None
                    if state and entry.end_time:
                        breakdown = _categorize_hours(
                            d, entry.start_time, entry.end_time,
                            entry.break_minutes, holidays,
                        )
                        for k in breakdown_fields:
                            agg_breakdown[k] += getattr(breakdown, k)

                    day_entry_objs.append(DayEntry(
                        start_time=entry.start_time,
                        end_time=entry.end_time,
                        break_minutes=entry.break_minutes,
                        hours_worked=round(worked, 2),
                        status=entry.status.value,
                        arbzg_warnings=warnings,
                        breakdown=breakdown,
                    ))

                # Count the day as an ArbZG violation when all entries of the
                # day together exceed 10 hours.
                if day_worked > 10:
                    arbzg_violations += 1
                w_worked += day_worked

                # Days without entries yield 0 worked hours and -exp overtime.
                week_days_list.append(OvertimeDay(
                    date=d, weekday=weekday_short[d.weekday()],
                    hours_worked=round(day_worked, 2),
                    hours_expected=round(exp, 2),
                    overtime=round(day_worked - exp, 2),
                    entries=day_entry_objs,
                ))

            user_weeks.append(OvertimeWeek(
                week_nr=week_nr,
                week_start=week_days[0],
                week_end=week_days[-1],
                hours_worked=round(w_worked, 2),
                hours_expected=round(w_expected, 2),
                overtime=round(w_worked - w_expected, 2),
                days=week_days_list,
            ))
            total_worked += w_worked
            total_expected_user += w_expected

        user_overtime = total_worked - total_expected_user
        total_overtime += user_overtime

        special = None
        if state:
            # Round the sums like every other total to drop float noise.
            special = HoursBreakdown(
                **{k: round(v, 2) for k, v in agg_breakdown.items()},
                holiday_name=None,
            )

        rows.append(OvertimeReportRowDetailed(
            user_id=user.id,
            user_name=user.full_name,
            department=dept_name,
            hours_worked=round(total_worked, 2),
            hours_expected=round(total_expected_user, 2),
            overtime_hours=round(user_overtime, 2),
            weeks=user_weeks,
            arbzg_violation_days=arbzg_violations,
            special_hours_total=special,
        ))

    return OvertimeReportDetailed(
        date_from=date_from, date_to=date_to,
        total_employees=len(rows),
        total_overtime=round(total_overtime, 2),
        rows=rows,
    )
# ── Überstundenguthaben neu berechnen ─────────────────────────────────────
async def recalculate_overtime_balance(self, user: User, db: AsyncSession) -> OvertimeBalance:
    """Recompute the user's overtime balance, commit the session and return the updated row."""
    balance = await _recalculate_overtime_balance(
        user,
        await _load_schedule(user, db),
        db,
    )
    await db.commit()
    return balance
# ── Export ───────────────────────────────────────────────────────────────
@staticmethod
def _pdf_base(title: str, period: str, rows_html: str, summary_html: str = "") -> str:
return f"""<!DOCTYPE html>
<html lang="de">
<head>
<meta charset="utf-8">
<style>
@page {{ margin: 1.5cm 2cm; @bottom-right {{ content: "Seite " counter(page) " von " counter(pages); font-size: 9px; color: #6b7280; }} }}
body {{ font-family: Arial, Helvetica, sans-serif; font-size: 10px; color: #111827; line-height: 1.4; }}
h1 {{ font-size: 16px; font-weight: bold; margin: 0 0 2px; color: #1e40af; }}
.period {{ font-size: 10px; color: #6b7280; margin-bottom: 12px; }}
.summary {{ display: flex; gap: 20px; margin-bottom: 14px; background: #f8fafc; border: 1px solid #e2e8f0; border-radius: 6px; padding: 10px 14px; }}
.kpi {{ }}
.kpi-label {{ font-size: 9px; color: #6b7280; text-transform: uppercase; letter-spacing: 0.05em; }}
.kpi-value {{ font-size: 14px; font-weight: bold; color: #1e293b; }}
table {{ width: 100%; border-collapse: collapse; margin-top: 4px; }}
thead th {{ background: #1e40af; color: white; padding: 6px 8px; text-align: left; font-size: 9px; text-transform: uppercase; letter-spacing: 0.04em; font-weight: 600; }}
thead th.right {{ text-align: right; }}
tbody tr:nth-child(even) {{ background: #f8fafc; }}
tbody tr.group-header {{ background: #dbeafe; font-weight: bold; }}
tbody tr.warning {{ background: #fef2f2; }}
tbody tr.sonder {{ background: #fffbeb; font-size: 9px; color: #92400e; }}
tbody tr.week-header {{ background: #eff6ff; font-weight: 600; }}
tbody tr.no-entry {{ color: #9ca3af; }}
td {{ padding: 5px 8px; border-bottom: 1px solid #f1f5f9; vertical-align: top; }}
td.right {{ text-align: right; }}
td.bold {{ font-weight: 600; }}
.plus {{ color: #16a34a; font-weight: bold; }}
.minus {{ color: #dc2626; font-weight: bold; }}
.badge {{ display: inline-block; padding: 1px 5px; border-radius: 9px; font-size: 8px; font-weight: 600; }}
.badge-green {{ background: #dcfce7; color: #15803d; }}
.badge-yellow {{ background: #fef9c3; color: #a16207; }}
.badge-red {{ background: #fee2e2; color: #b91c1c; }}
.badge-gray {{ background: #f1f5f9; color: #475569; }}
tfoot td {{ background: #1e293b; color: white; font-weight: bold; padding: 6px 8px; font-size: 10px; }}
tfoot td.right {{ text-align: right; }}
.generated {{ font-size: 8px; color: #9ca3af; margin-top: 12px; }}
</style>
</head>
<body>
<h1>{title}</h1>
<div class="period">{period}</div>
{summary_html}
{rows_html}
<div class="generated">Erstellt: {__import__('datetime').datetime.now().strftime('%d.%m.%Y %H:%M')} · TimeMaster</div>
</body>
</html>"""
def to_pdf(self, html: str) -> bytes:
    """Render an HTML document string to PDF bytes with WeasyPrint."""
    # Imported inside the method, so WeasyPrint is only loaded when a PDF is requested.
    from weasyprint import HTML

    document = HTML(string=html)
    return document.write_pdf()
def time_report_to_pdf(self, report: "TimeReport", company_name: str = "") -> bytes:
    """Render a time report as a PDF, grouped by employee.

    Args:
        report: Report whose rows are rendered; rows are grouped by
            ``user_id`` in order of first appearance.
        company_name: Currently unused; kept for interface compatibility.

    Returns:
        The PDF document as bytes.
    """
    # Local import keeps the name ``html`` free for the document variable below.
    from html import escape

    def fmt_h(h: float) -> str:
        """Format decimal hours as 'Hh MMm'."""
        # Round on total minutes so 7.9999h becomes '8h 00m', not '7h 60m'.
        hrs, mins = divmod(round(abs(h) * 60), 60)
        sign = "-" if h < 0 and (hrs or mins) else ""
        return f"{sign}{hrs}h {mins:02d}m"

    def fmt_t(t) -> str:
        """Format a time as HH:MM; empty string when missing."""
        return str(t)[:5] if t else ""

    status_label = {"pending": "Prüfung", "approved": "Genehmigt", "rejected": "Abgelehnt", "open": "Offen"}
    status_cls = {"pending": "badge-yellow", "approved": "badge-green", "rejected": "badge-red", "open": "badge-yellow"}
    weekday_short = ["Mo", "Di", "Mi", "Do", "Fr", "Sa", "So"]

    groups: dict = defaultdict(list)
    for r in report.rows:
        groups[r.user_id].append(r)

    # Names, departments and notes are user-controlled: escape them before
    # they are placed into the markup handed to the PDF renderer.
    parts = [
        "<table><thead><tr><th>Datum</th><th>Mitarbeiter</th><th>Beginn</th><th>Ende</th>"
        "<th class='right'>Pause</th><th class='right'>Netto</th><th>Status</th><th>Notiz</th></tr></thead><tbody>"
    ]
    for rows in groups.values():
        head = rows[0]
        sub = sum(r.worked_hours or 0 for r in rows)
        dept = " · " + escape(str(head.department)) if head.department else ""
        parts.append(
            f"<tr class='group-header'><td colspan='8'>{escape(str(head.user_name))}{dept}"
            f" &nbsp;&nbsp; {fmt_h(sub)} gesamt</td></tr>"
        )
        for r in rows:
            d = date.fromisoformat(str(r.date))
            day = weekday_short[d.weekday()]
            badge = status_cls.get(r.status, "badge-gray")
            label = escape(str(status_label.get(r.status, r.status)))
            note = escape(str(r.note)) if r.note else ""
            parts.append(
                f"<tr><td>{day} {d.strftime('%d.%m.')}</td><td></td>"
                f"<td>{fmt_t(r.start_time)}</td><td>{fmt_t(r.end_time)}</td>"
                f"<td class='right'>{r.break_minutes} min</td>"
                f"<td class='right bold'>{fmt_h(r.worked_hours or 0)}</td>"
                f"<td><span class='badge {badge}'>{label}</span></td>"
                f"<td>{note}</td></tr>"
            )
    parts.append(
        f"</tbody><tfoot><tr><td colspan='5' class='right'>Gesamt</td>"
        f"<td class='right'>{fmt_h(report.total_hours)}</td><td colspan='2'></td></tr></tfoot></table>"
    )
    rows_html = "".join(parts)

    summary = (
        f"<div class='summary'><div class='kpi'><div class='kpi-label'>Einträge</div>"
        f"<div class='kpi-value'>{report.total_rows}</div></div>"
        f"<div class='kpi'><div class='kpi-label'>Gesamt-Stunden</div>"
        f"<div class='kpi-value'>{fmt_h(report.total_hours)}</div></div>"
        f"<div class='kpi'><div class='kpi-label'>Mitarbeiter</div>"
        f"<div class='kpi-value'>{len(groups)}</div></div></div>"
    )
    # Date range with an explicit separator between the two dates.
    period = f"{report.date_from.strftime('%d.%m.%Y')} – {report.date_to.strftime('%d.%m.%Y')}"
    html = self._pdf_base("Zeiterfassungsbericht", period, rows_html, summary)
    return self.to_pdf(html)
def absence_report_to_pdf(self, report: "AbsenceReport") -> bytes:
    """Render an absence report as a PDF, grouped by employee.

    Args:
        report: Report whose rows are rendered; rows are grouped by
            ``user_id`` in order of first appearance.

    Returns:
        The PDF document as bytes.
    """
    # Local import keeps the name ``html`` free for the document variable below.
    from html import escape

    def fmt_d(value) -> str:
        """Format a date (or ISO date string) as DD.MM.YYYY."""
        return date.fromisoformat(str(value)).strftime('%d.%m.%Y')

    groups: dict = defaultdict(list)
    for r in report.rows:
        groups[r.user_id].append(r)

    status_label = {"pending": "Prüfung", "approved": "Genehmigt", "rejected": "Abgelehnt", "cancelled": "Storniert"}
    status_cls = {"pending": "badge-yellow", "approved": "badge-green", "rejected": "badge-red", "cancelled": "badge-gray"}

    # Names, departments, absence types and notes come from user data: escape
    # them before they are placed into the markup handed to the PDF renderer.
    parts = [
        "<table><thead><tr><th>Mitarbeiter</th><th>Art</th><th>Von</th><th>Bis</th>"
        "<th class='right'>Tage</th><th>Status</th><th>Notiz</th></tr></thead><tbody>"
    ]
    for rows in groups.values():
        head = rows[0]
        sub = sum(r.working_days for r in rows)
        dept = " · " + escape(str(head.department)) if head.department else ""
        parts.append(
            f"<tr class='group-header'><td colspan='7'>{escape(str(head.user_name))}{dept}"
            f" &nbsp;&nbsp; {sub:.1f} Tage</td></tr>"
        )
        for r in rows:
            badge = status_cls.get(r.status, "badge-gray")
            label = escape(str(status_label.get(r.status, r.status)))
            note = escape(str(r.note)) if r.note else ""
            parts.append(
                f"<tr><td></td><td>{escape(str(r.absence_type))}</td>"
                f"<td>{fmt_d(r.start_date)}</td><td>{fmt_d(r.end_date)}</td>"
                f"<td class='right bold'>{r.working_days}</td>"
                f"<td><span class='badge {badge}'>{label}</span></td>"
                f"<td>{note}</td></tr>"
            )
    parts.append(
        f"</tbody><tfoot><tr><td colspan='4' class='right'>Gesamt Arbeitstage</td>"
        f"<td class='right'>{report.total_days:.1f}</td><td colspan='2'></td></tr></tfoot></table>"
    )
    rows_html = "".join(parts)

    summary = (
        f"<div class='summary'><div class='kpi'><div class='kpi-label'>Anträge</div>"
        f"<div class='kpi-value'>{report.total_rows}</div></div>"
        f"<div class='kpi'><div class='kpi-label'>Arbeitstage</div>"
        f"<div class='kpi-value'>{report.total_days:.1f}</div></div>"
        f"<div class='kpi'><div class='kpi-label'>Mitarbeiter</div>"
        f"<div class='kpi-value'>{len(groups)}</div></div></div>"
    )
    # Date range with an explicit separator between the two dates.
    period = f"{report.date_from.strftime('%d.%m.%Y')} – {report.date_to.strftime('%d.%m.%Y')}"
    html = self._pdf_base("Abwesenheitsbericht", period, rows_html, summary)
    return self.to_pdf(html)
def overtime_report_to_pdf(self, report: "OvertimeReport") -> bytes:
    """Render the per-employee overtime summary as a PDF.

    One table line per report row with expected hours, worked hours and the
    overtime difference; the footer sums expected and worked hours over all
    rows and shows ``report.total_overtime``. Three KPI tiles show the
    employee count, the total overtime and the reporting period.

    Employee names and departments are HTML-escaped before being embedded.

    Args:
        report: Report exposing ``rows``, ``total_employees``,
            ``total_overtime``, ``date_from`` and ``date_to``.

    Returns:
        Whatever ``self.to_pdf`` produces for the assembled HTML.
    """
    from html import escape

    def fmt_h(h: float) -> str:
        # Round once on total minutes. Rounding the minute part on its own
        # can yield "1h 60m" for values such as 1.9999 hours.
        total_minutes = round(abs(h) * 60)
        hrs, mins = divmod(total_minutes, 60)
        # No minus sign for values that round to zero (avoids "-0h 00m").
        minus = "-" if h < 0 and total_minutes else ""
        return f"{minus}{hrs}h {mins:02d}m"

    parts = [
        "<table><thead><tr><th>Mitarbeiter</th><th>Abteilung</th><th class='right'>Soll</th><th class='right'>Ist</th><th class='right'>Überstunden</th></tr></thead><tbody>"
    ]
    for r in report.rows:
        ot_cls = "plus" if r.overtime_hours > 0 else "minus" if r.overtime_hours < 0 else ""
        sign = "+" if r.overtime_hours > 0 else ""
        parts.append(
            f"<tr><td class='bold'>{escape(f'{r.user_name}')}</td>"
            f"<td>{escape(r.department or '')}</td>"
            f"<td class='right'>{fmt_h(r.hours_expected)}</td>"
            f"<td class='right'>{fmt_h(r.hours_worked)}</td>"
            f"<td class='right'><span class='{ot_cls}'>{sign}{fmt_h(r.overtime_hours)}</span></td></tr>"
        )

    total_sign = "+" if report.total_overtime > 0 else ""
    total_expected = sum(r.hours_expected for r in report.rows)
    total_worked = sum(r.hours_worked for r in report.rows)
    parts.append(
        f"</tbody><tfoot><tr><td colspan='2'></td><td class='right'>{fmt_h(total_expected)}</td><td class='right'>{fmt_h(total_worked)}</td><td class='right'>{total_sign}{fmt_h(report.total_overtime)}</td></tr></tfoot></table>"
    )
    rows_html = "".join(parts)

    period = f"{report.date_from.strftime('%d.%m.%Y')} {report.date_to.strftime('%d.%m.%Y')}"
    summary = f"<div class='summary'><div class='kpi'><div class='kpi-label'>Mitarbeiter</div><div class='kpi-value'>{report.total_employees}</div></div><div class='kpi'><div class='kpi-label'>Gesamt-Überstunden</div><div class='kpi-value'>{total_sign}{fmt_h(report.total_overtime)}</div></div><div class='kpi'><div class='kpi-label'>Periode</div><div class='kpi-value' style='font-size:11px'>{period}</div></div></div>"
    document = self._pdf_base("Überstundenbericht", period, rows_html, summary)
    return self.to_pdf(document)
def to_csv(self, rows: list[dict]) -> str:
    """Serialise a list of uniform dicts to CSV text.

    The keys of the first dict become the header line. An empty input
    yields an empty string (no header).
    """
    buffer = io.StringIO()
    if rows:
        header = [*rows[0]]
        sheet = csv.DictWriter(buffer, fieldnames=header)
        sheet.writeheader()
        for record in rows:
            sheet.writerow(record)
    return buffer.getvalue()
def to_xlsx(self, rows: list[dict], sheet_name: str = "Report") -> bytes:
    """Serialise a list of uniform dicts to an XLSX workbook.

    The active sheet is titled ``sheet_name``. The keys of the first dict
    form the header line; every value is written as text, with ``None``
    written as an empty string.
    """
    from openpyxl import Workbook

    workbook = Workbook()
    sheet = workbook.active
    sheet.title = sheet_name
    if rows:
        sheet.append([*rows[0]])
        for record in rows:
            cells = ["" if value is None else str(value) for value in record.values()]
            sheet.append(cells)

    buffer = io.BytesIO()
    workbook.save(buffer)
    return buffer.getvalue()
@staticmethod
def _time_rows_to_dicts(rows: list[TimeReportRow]) -> list[dict]:
    """Flatten time report rows into export dicts with German column names.

    Falsy personnel number, department, end time and note are exported as
    empty strings; worked hours are exported as an empty string only when
    they are ``None``.
    """
    records = []
    for entry in rows:
        end_text = str(entry.end_time) if entry.end_time else ""
        hours = "" if entry.worked_hours is None else entry.worked_hours
        record = {}
        record["Datum"] = str(entry.date)
        record["Mitarbeiter"] = entry.user_name
        record["Personalnr"] = entry.personnel_number or ""
        record["Abteilung"] = entry.department or ""
        record["Beginn"] = str(entry.start_time)
        record["Ende"] = end_text
        record["Pause (Min)"] = entry.break_minutes
        record["Arbeitsstunden"] = hours
        record["Status"] = entry.status
        record["Quelle"] = entry.source
        record["Notiz"] = entry.note or ""
        records.append(record)
    return records
@staticmethod
def _absence_rows_to_dicts(rows: list[AbsenceReportRow]) -> list[dict]:
    """Flatten absence report rows into export dicts with German column names.

    Falsy personnel number, department and note are exported as empty
    strings; start and end date are exported via ``str()``.
    """
    records = []
    for item in rows:
        record = {}
        record["Mitarbeiter"] = item.user_name
        record["Personalnr"] = item.personnel_number or ""
        record["Abteilung"] = item.department or ""
        record["Abwesenheitstyp"] = item.absence_type
        record["Von"] = str(item.start_date)
        record["Bis"] = str(item.end_date)
        record["Arbeitstage"] = item.working_days
        record["Status"] = item.status
        record["Notiz"] = item.note or ""
        records.append(record)
    return records
@staticmethod
def _overtime_rows_to_dicts(rows: list[OvertimeReportRow]) -> list[dict]:
    """Flatten overtime summary rows into export dicts with German column names.

    Falsy personnel number and department are exported as empty strings;
    the hour values are passed through unchanged.
    """
    records = []
    for item in rows:
        record = {}
        record["Mitarbeiter"] = item.user_name
        record["Personalnr"] = item.personnel_number or ""
        record["Abteilung"] = item.department or ""
        record["Gearbeitet (h)"] = item.hours_worked
        record["Soll (h)"] = item.hours_expected
        record["Überstunden (h)"] = item.overtime_hours
        records.append(record)
    return records
@staticmethod
def _overtime_detail_to_dicts(report: "OvertimeReportDetailed") -> list[dict]:
    """Flatten the detailed overtime report for CSV/Excel export.

    Produces one dict per time entry. A day without entries still produces
    one dict with empty time columns so that it stays visible in the
    export. All dicts share the same keys in the same order.

    For a day with exactly one entry the day's expected hours and
    difference are put on that line. For a day with several entries these
    two columns are left empty on every line of that day.

    Start and end times are cut to ``HH:MM``; a missing time (for example
    an entry without an end) is exported as an empty string.

    Args:
        report: Detailed report whose ``rows`` hold users with ``weeks``,
            each week with ``days``, each day with ``entries``.

    Returns:
        The flat list of export dicts with German column names.
    """
    status_de = {"approved": "Genehmigt", "pending": "Prüfung", "rejected": "Abgelehnt"}

    def hhmm(value) -> str:
        # str(None)[:5] would export the literal text "None".
        return str(value)[:5] if value is not None else ""

    rows = []
    for user in report.rows:
        for week in user.weeks:
            for day in week.days:
                # Columns shared by every line of this day.
                common = {
                    "Mitarbeiter": user.user_name,
                    "Abteilung": user.department or "",
                    "KW": week.week_nr,
                    "Datum": day.date.strftime("%d.%m.%Y"),
                    "Wochentag": day.weekday,
                }
                if not day.entries:
                    rows.append({
                        **common,
                        "Beginn": "", "Ende": "", "Pause (Min)": "",
                        "Ist (h)": 0,
                        "Soll (h)": day.hours_expected,
                        "Diff (h)": round(day.overtime, 2),
                        "Status": "",
                        "ArbZG-Warnungen": "",
                    })
                    continue
                single = len(day.entries) == 1
                for entry in day.entries:
                    rows.append({
                        **common,
                        "Beginn": hhmm(entry.start_time),
                        "Ende": hhmm(entry.end_time),
                        "Pause (Min)": entry.break_minutes,
                        "Ist (h)": entry.hours_worked,
                        "Soll (h)": day.hours_expected if single else "",
                        "Diff (h)": round(day.overtime, 2) if single else "",
                        "Status": status_de.get(entry.status, entry.status),
                        "ArbZG-Warnungen": "; ".join(entry.arbzg_warnings),
                    })
    return rows
def overtime_detail_to_pdf(self, report: "OvertimeReportDetailed") -> bytes:
    """Render the detailed (per day and entry) overtime report as a PDF.

    For every user one table is emitted: a header line with the user's
    totals, then per week a summary line followed by one line per time
    entry. Days with several entries get an additional "Tagessumme" line
    carrying the day totals; days without entries are shown as
    "kein Eintrag".

    Names, departments and ArbZG warning texts are HTML-escaped before
    being embedded.

    Args:
        report: Detailed report exposing ``rows`` (users with ``weeks`` →
            ``days`` → ``entries``), ``total_employees``,
            ``total_overtime``, ``date_from`` and ``date_to``.

    Returns:
        Whatever ``self.to_pdf`` produces for the assembled HTML.
    """
    from html import escape

    def fmt_h(h: float) -> str:
        # Round once on total minutes. Rounding the minute part on its own
        # can yield "1h 60m" for values such as 1.9999 hours.
        total_minutes = round(abs(h) * 60)
        hrs, mins = divmod(total_minutes, 60)
        # No minus sign for values that round to zero (avoids "-0h 00m").
        minus = "-" if h < 0 and total_minutes else ""
        return f"{minus}{hrs}h {mins:02d}m"

    def fmt_t(t) -> str:
        return str(t)[:5] if t else ""

    def ot_span(value) -> str:
        # Signed difference: "plus" above zero, "minus" below, no class at zero.
        cls = "plus" if value > 0 else "minus" if value < 0 else ""
        plus = "+" if value > 0 else ""
        return f"<span class='{cls}'>{plus}{fmt_h(value)}</span>"

    parts = []
    for user in report.rows:
        sign = "+" if user.overtime_hours > 0 else ""
        color = '#16a34a' if user.overtime_hours >= 0 else '#dc2626'
        dept = f" · {escape(user.department)}" if user.department else ""
        arbzg_badge = (
            f"&nbsp;<span class='badge badge-red'>{user.arbzg_violation_days}× ArbZG</span>"
            if user.arbzg_violation_days > 0 else ""
        )
        parts.append("<table style='margin-bottom:16px'>")
        parts.append(
            f"<thead><tr><th colspan='9'>{escape(f'{user.user_name}')}{dept}&nbsp;&nbsp;Soll {fmt_h(user.hours_expected)} · Ist {fmt_h(user.hours_worked)} · <span style=\"color:{color}\">{sign}{fmt_h(user.overtime_hours)}</span>{arbzg_badge}</th></tr>"
        )
        parts.append(
            "<tr><th>KW</th><th>Tag</th><th>Datum</th><th class='right'>Beginn</th><th class='right'>Ende</th><th class='right'>Pause</th><th class='right'>Ist</th><th class='right'>Soll</th><th class='right'>Diff</th></tr></thead><tbody>"
        )
        for week in user.weeks:
            parts.append(
                f"<tr class='week-header'><td>KW {week.week_nr}</td><td colspan='5'>{week.week_start.strftime('%d.%m.')} {week.week_end.strftime('%d.%m.%Y')}</td><td class='right'>{fmt_h(week.hours_worked)}</td><td class='right'>{fmt_h(week.hours_expected)}</td><td class='right'>{ot_span(week.overtime)}</td></tr>"
            )
            for day in week.days:
                if not day.entries:
                    has_target = day.hours_expected > 0
                    soll_cell = fmt_h(day.hours_expected) if has_target else ""
                    diff_cell = fmt_h(day.overtime) if has_target else ""
                    parts.append(
                        f"<tr class='no-entry'><td></td><td>{day.weekday}</td><td>{day.date.strftime('%d.%m.')}</td><td colspan='4' style='color:#d1d5db'>kein Eintrag</td><td class='right'>{soll_cell}</td><td class='right'><span class='minus'>{diff_cell}</span></td></tr>"
                    )
                    continue

                single = len(day.entries) == 1
                for ei, entry in enumerate(day.entries):
                    # Weekday, date and target hours only on the day's first line.
                    show_day = ei == 0
                    weekday_cell = day.weekday if show_day else ""
                    date_cell = day.date.strftime('%d.%m.') if show_day else ""
                    soll_str = fmt_h(day.hours_expected) if (show_day and day.hours_expected > 0) else ""
                    # With several entries the difference goes on the "Tagessumme" line.
                    diff_str = ot_span(day.overtime) if (show_day and single) else ""
                    warn = " ".join(
                        f"<br/><small style='color:#dc2626'>⚠ {escape(f'{w}')}</small>"
                        for w in entry.arbzg_warnings
                    )
                    parts.append(
                        f"<tr><td></td><td>{weekday_cell}</td><td>{date_cell}</td><td class='right'>{fmt_t(entry.start_time)}</td><td class='right'>{fmt_t(entry.end_time)}</td><td class='right'>{entry.break_minutes} min</td><td class='right bold'>{fmt_h(entry.hours_worked)}{warn}</td><td class='right'>{soll_str}</td><td class='right'>{diff_str}</td></tr>"
                    )
                if not single:
                    parts.append(
                        f"<tr style='background:#f0f9ff'><td colspan='6' class='right' style='font-size:9px;color:#64748b'>Tagessumme</td><td class='right bold'>{fmt_h(day.hours_worked)}</td><td class='right'>{fmt_h(day.hours_expected)}</td><td class='right'>{ot_span(day.overtime)}</td></tr>"
                    )
        parts.append("</tbody></table>")
    rows_html = "".join(parts)

    sign = "+" if report.total_overtime > 0 else ""
    period = f"{report.date_from.strftime('%d.%m.%Y')} {report.date_to.strftime('%d.%m.%Y')}"
    summary = f"<div class='summary'><div class='kpi'><div class='kpi-label'>Mitarbeiter</div><div class='kpi-value'>{report.total_employees}</div></div><div class='kpi'><div class='kpi-label'>Gesamt-Überstunden</div><div class='kpi-value'>{sign}{fmt_h(report.total_overtime)}</div></div><div class='kpi'><div class='kpi-label'>Periode</div><div class='kpi-value' style='font-size:11px'>{period}</div></div></div>"
    document = self._pdf_base("Überstundenbericht Detailansicht", period, rows_html, summary)
    return self.to_pdf(document)
# Module-level ReportService instance, created once when this module is imported.
report_service = ReportService()