import csv
import io
from collections import defaultdict
from datetime import date, datetime, time, timedelta
from decimal import Decimal
from uuid import UUID
from sqlalchemy import distinct, func, select
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.absence import Absence, AbsenceStatus
from app.models.absence_type import AbsenceType
from app.models.company import Company
from app.models.department import Department
from app.models.overtime_balance import OvertimeBalance
from app.models.time_entry import EntryStatus, TimeEntry
from app.models.user import User, UserRole
from app.models.vacation_balance import VacationBalance
from app.models.work_schedule import WorkSchedule
from app.schemas.report import (
AbsenceReport,
AbsenceReportRow,
CompanyDashboard,
DayEntry,
EmployeeDashboard,
HoursBreakdown,
OvertimeDay,
OvertimeReport,
OvertimeReportDetailed,
OvertimeReportRow,
OvertimeReportRowDetailed,
OvertimeWeek,
TeamDashboard,
TeamMemberStatus,
TimeReport,
TimeReportRow,
UpcomingAbsence,
)
from app.services.holiday_service import get_holidays_set
# Fallback wenn kein Arbeitsplan zugewiesen
_DEFAULT_DAILY_HOURS = {0: 8.0, 1: 8.0, 2: 8.0, 3: 8.0, 4: 8.0, 5: 0.0, 6: 0.0}
# ---------------------------------------------------------------------------
# §3b EStG Stunden-Kategorisierung
# ---------------------------------------------------------------------------
def _categorize_hours(
entry_date: date,
start: time,
end: time,
break_minutes: int,
holidays: dict[date, tuple[str, bool]],
) -> HoursBreakdown:
"""
Teilt die gearbeiteten Minuten einer Schicht in §3b EStG-Kategorien auf.
Jede Minute bekommt genau eine Kategorie (höchster Zuschlag gewinnt).
Pausen werden proportional abgezogen.
"""
# Minuten-Timeline aufbauen (Ende ggf. +24h für Nachtschichten)
start_m = start.hour * 60 + start.minute
end_m = end.hour * 60 + end.minute
if end_m <= start_m:
end_m += 24 * 60 # Nachtschicht über Mitternacht
gross_minutes = end_m - start_m
if gross_minutes <= 0:
return HoursBreakdown(
normal_hours=0, night_25_hours=0, night_40_hours=0,
sunday_hours=0, holiday_125_hours=0, holiday_150_hours=0,
holiday_name=None,
)
# Pausen proportional verteilen
net_factor = max(0.0, (gross_minutes - break_minutes) / gross_minutes)
holiday_info = holidays.get(entry_date)
is_sunday = entry_date.weekday() == 6
cats = {"normal": 0, "night_25": 0, "night_40": 0, "sunday": 0,
"holiday_125": 0, "holiday_150": 0}
for offset in range(gross_minutes):
# Absolute Minute des Tages (kann >1440 sein bei Nachtschicht)
abs_min = start_m + offset
# Wochentag dieser Minute (Mitternacht kann Folgetag sein)
minute_date = entry_date + timedelta(days=abs_min // (24 * 60))
minute_mod = abs_min % (24 * 60) # Minute im Tag 0–1439
minute_is_sunday = minute_date.weekday() == 6
minute_holiday = holidays.get(minute_date)
# Kategorie bestimmen: höchster Zuschlag gewinnt
if minute_holiday and minute_holiday[1]:
cat = "holiday_150"
elif minute_holiday:
cat = "holiday_125"
elif minute_is_sunday:
cat = "sunday"
elif 0 <= minute_mod < 4 * 60: # 00:00–04:00
cat = "night_40"
elif minute_mod >= 20 * 60 or minute_mod < 6 * 60: # 20:00–24:00 + 04:00–06:00 bereits über night_40
# 04:00–06:00 oder 20:00–24:00 → 25%
cat = "night_25"
else:
cat = "normal"
cats[cat] += 1
def to_h(mins: int) -> float:
return round(mins * net_factor / 60, 2)
return HoursBreakdown(
normal_hours=to_h(cats["normal"]),
night_25_hours=to_h(cats["night_25"]),
night_40_hours=to_h(cats["night_40"]),
sunday_hours=to_h(cats["sunday"]),
holiday_125_hours=to_h(cats["holiday_125"]),
holiday_150_hours=to_h(cats["holiday_150"]),
holiday_name=holiday_info[0] if holiday_info else (
holidays.get(entry_date + timedelta(days=1), (None,))[0]
if (end.hour * 60 + end.minute) < (start.hour * 60 + start.minute) else None
),
)
def _schedule_daily(schedule: WorkSchedule | None) -> dict[int, float]:
"""Gibt ein Mapping {weekday: stunden} zurück (0=Mo … 6=So)."""
if schedule is None:
return _DEFAULT_DAILY_HOURS
return {
0: float(schedule.mon_h),
1: float(schedule.tue_h),
2: float(schedule.wed_h),
3: float(schedule.thu_h),
4: float(schedule.fri_h),
5: float(schedule.sat_h),
6: float(schedule.sun_h),
}
def _expected_hours(schedule: WorkSchedule | None, date_from: date, date_to: date) -> float:
"""Soll-Stunden im Zeitraum basierend auf dem Arbeitsplan."""
daily = _schedule_daily(schedule)
total = 0.0
current = date_from
while current <= date_to:
total += daily.get(current.weekday(), 0.0)
current += timedelta(days=1)
return total
async def _load_schedule(user: User, db: AsyncSession) -> WorkSchedule | None:
"""Lädt den Arbeitsplan des Users. Fallback: erster gültiger Firmen-Plan."""
if user.work_schedule_id:
return await db.get(WorkSchedule, user.work_schedule_id)
result = await db.scalars(
select(WorkSchedule)
.where(
WorkSchedule.company_id == user.company_id,
WorkSchedule.valid_from <= date.today(),
)
.order_by(WorkSchedule.valid_from)
.limit(1)
)
return result.first()
async def _get_or_create_overtime_balance(user: User, db: AsyncSession) -> OvertimeBalance:
bal = await db.scalar(
select(OvertimeBalance).where(OvertimeBalance.user_id == user.id)
)
if not bal:
bal = OvertimeBalance(user_id=user.id, company_id=user.company_id)
db.add(bal)
await db.flush()
return bal
async def _recalculate_overtime_balance(
user: User, schedule: WorkSchedule | None, db: AsyncSession
) -> OvertimeBalance:
"""Berechnet das Überstundenguthaben neu aus allen genehmigten Zeiteinträgen.
Sondervertretungs-Faktoren (mode=fza|both) werden berücksichtigt:
Für jeden Zeiteintrag wird geprüft ob der Tag in einem aktiven Sondervertretungs-
Zeitraum liegt; falls ja wird worked_hours mit dem Faktor multipliziert.
"""
from app.models.special_assignment import AssignmentMode, SpecialAssignment
entries = list(await db.scalars(
select(TimeEntry).where(
TimeEntry.user_id == user.id,
TimeEntry.end_time.isnot(None),
TimeEntry.status == EntryStatus.APPROVED,
)
))
bal = await _get_or_create_overtime_balance(user, db)
if not entries:
bal.total_hours = Decimal("0")
bal.last_calculated = datetime.utcnow()
return bal
# Sondervertretungs-Zuweisungen laden (nur FZA-relevante)
date_from_all = min(e.date for e in entries)
date_to_all = max(e.date for e in entries)
special_assignments = list(await db.scalars(
select(SpecialAssignment).where(
SpecialAssignment.user_id == user.id,
SpecialAssignment.mode.in_([AssignmentMode.fza, AssignmentMode.both]),
SpecialAssignment.date_from <= date_to_all,
SpecialAssignment.date_to >= date_from_all,
)
))
def _fza_factor(entry_date: date) -> float:
"""Gibt den Faktor für einen Tag zurück (1.0 wenn keine Zuweisung aktiv)."""
for sa in special_assignments:
if sa.date_from <= entry_date <= sa.date_to:
return float(sa.factor)
return 1.0
expected = _expected_hours(schedule, date_from_all, date_to_all)
worked = sum((e.worked_hours or 0.0) * _fza_factor(e.date) for e in entries)
overtime = max(0.0, worked - expected)
bal.total_hours = Decimal(str(round(overtime, 2)))
bal.last_calculated = datetime.utcnow()
return bal
def _check_arbzg_day(entry: TimeEntry) -> list[str]:
"""ArbZG-Prüfung für einen einzelnen Zeiteintrag."""
if entry.end_time is None:
return []
start_m = entry.start_time.hour * 60 + entry.start_time.minute
end_m = entry.end_time.hour * 60 + entry.end_time.minute
if end_m <= start_m:
end_m += 24 * 60
total = end_m - start_m
worked = total - entry.break_minutes
worked_h = worked / 60
warnings = []
if worked_h > 10:
warnings.append(f"Maximale Arbeitszeit von 10 Stunden überschritten ({worked_h:.1f}h) – ArbZG §3")
if total >= 9 * 60 and entry.break_minutes < 45:
warnings.append("Bei >9h Anwesenheit mind. 45 min Pause erforderlich – ArbZG §4")
elif total >= 6 * 60 and entry.break_minutes < 30:
warnings.append("Bei >6h Anwesenheit mind. 30 min Pause erforderlich – ArbZG §4")
return warnings
class ReportService:
# ── Employee Dashboard ───────────────────────────────────────────────────
async def employee_dashboard(self, user: User, db: AsyncSession) -> EmployeeDashboard:
today = date.today()
schedule = await _load_schedule(user, db)
open_entry = await db.scalar(
select(TimeEntry).where(
TimeEntry.user_id == user.id,
TimeEntry.date == today,
TimeEntry.end_time.is_(None),
)
)
week_start = today - timedelta(days=today.weekday())
week_entries = list(await db.scalars(
select(TimeEntry).where(
TimeEntry.user_id == user.id,
TimeEntry.date >= week_start,
TimeEntry.date <= today,
TimeEntry.end_time.isnot(None),
)
))
week_hours_worked = sum(e.worked_hours or 0.0 for e in week_entries)
week_expected = _expected_hours(schedule, week_start, today)
balance = await db.scalar(
select(VacationBalance).where(
VacationBalance.user_id == user.id,
VacationBalance.year == today.year,
)
)
pending_absences = await db.scalar(
select(func.count()).select_from(Absence).where(
Absence.user_id == user.id,
Absence.status == AbsenceStatus.PENDING,
)
) or 0
overtime_bal = await db.scalar(
select(OvertimeBalance).where(OvertimeBalance.user_id == user.id)
)
overtime_balance_hours = float(overtime_bal.available_hours) if overtime_bal else None
hours_today = None
if open_entry:
now = datetime.now().time()
start_mins = open_entry.start_time.hour * 60 + open_entry.start_time.minute
now_mins = now.hour * 60 + now.minute
hours_today = round(
max(0, now_mins - start_mins - open_entry.break_minutes) / 60.0, 2
)
return EmployeeDashboard(
today_open=open_entry is not None,
today_start=open_entry.start_time if open_entry else None,
today_hours_so_far=hours_today,
week_hours_worked=round(week_hours_worked, 2),
week_hours_expected=round(week_expected, 2),
week_overtime=round(week_hours_worked - week_expected, 2),
vacation_remaining_days=balance.remaining_days if balance else None,
vacation_used_days=int(balance.used_days) if balance else 0,
vacation_entitled_days=int(balance.entitled_days) if balance else 0,
pending_absences=pending_absences,
overtime_balance_hours=overtime_balance_hours,
schedule_name=schedule.name if schedule else None,
)
# ── Team Dashboard ───────────────────────────────────────────────────────
async def team_dashboard(self, current_user: User, db: AsyncSession) -> TeamDashboard:
today = date.today()
result = await db.execute(
select(User, Department.name.label("dept_name"))
.outerjoin(Department, User.department_id == Department.id)
.where(User.company_id == current_user.company_id, User.is_active == True)
)
users_with_dept = [(row[0], row[1]) for row in result.all()]
user_ids = [u.id for u, _ in users_with_dept]
if not user_ids:
return TeamDashboard(
present_count=0, on_leave_count=0, absent_count=0,
pending_time_approvals=0, pending_absence_approvals=0, members=[],
)
result = await db.execute(
select(TimeEntry).where(
TimeEntry.user_id.in_(user_ids),
TimeEntry.date == today,
)
)
today_entries: dict[UUID, TimeEntry] = {}
for entry in result.scalars().all():
if entry.user_id not in today_entries or entry.end_time is None:
today_entries[entry.user_id] = entry
result = await db.execute(
select(Absence, AbsenceType.name.label("type_name"))
.join(AbsenceType, Absence.type_id == AbsenceType.id)
.where(
Absence.user_id.in_(user_ids),
Absence.start_date <= today,
Absence.end_date >= today,
Absence.status == AbsenceStatus.APPROVED,
)
)
today_absences: dict[UUID, tuple[Absence, str]] = {}
for absence, type_name in result.all():
today_absences[absence.user_id] = (absence, type_name)
pending_time = await db.scalar(
select(func.count()).select_from(TimeEntry).where(
TimeEntry.user_id.in_(user_ids),
TimeEntry.status == EntryStatus.PENDING,
)
) or 0
pending_absence = await db.scalar(
select(func.count()).select_from(Absence).where(
Absence.user_id.in_(user_ids),
Absence.status == AbsenceStatus.PENDING,
)
) or 0
present = on_leave = absent = 0
members = []
for user, dept_name in users_with_dept:
entry = today_entries.get(user.id)
abs_data = today_absences.get(user.id)
if entry:
status = "present"; present += 1
time_in = entry.start_time
hours_today = entry.worked_hours if entry.end_time else None
elif abs_data:
status = "on_leave"; on_leave += 1
time_in = None; hours_today = None
else:
status = "absent"; absent += 1
time_in = None; hours_today = None
members.append(TeamMemberStatus(
user_id=user.id,
user_name=user.full_name,
department=dept_name,
status=status,
absence_type=abs_data[1] if abs_data else None,
time_in=time_in,
hours_today=hours_today,
))
return TeamDashboard(
present_count=present,
on_leave_count=on_leave,
absent_count=absent,
pending_time_approvals=pending_time,
pending_absence_approvals=pending_absence,
members=members,
)
# ── Company Dashboard ────────────────────────────────────────────────────
async def company_dashboard(self, current_user: User, db: AsyncSession) -> CompanyDashboard:
today = date.today()
month_start = today.replace(day=1)
users = list(await db.scalars(
select(User).where(
User.company_id == current_user.company_id,
User.is_active == True,
)
))
user_ids = [u.id for u in users]
total_employees = len(user_ids)
if not user_ids:
return CompanyDashboard(
total_employees=0, active_today=0, attendance_rate=0.0,
month_hours_worked=0.0, month_hours_expected=0.0, month_overtime=0.0,
pending_time_approvals=0, pending_absence_approvals=0, upcoming_absences=[],
)
active_today = await db.scalar(
select(func.count(distinct(TimeEntry.user_id))).where(
TimeEntry.user_id.in_(user_ids),
TimeEntry.date == today,
)
) or 0
pending_time = await db.scalar(
select(func.count()).select_from(TimeEntry).where(
TimeEntry.user_id.in_(user_ids),
TimeEntry.status == EntryStatus.PENDING,
)
) or 0
pending_absence = await db.scalar(
select(func.count()).select_from(Absence).where(
Absence.user_id.in_(user_ids),
Absence.status == AbsenceStatus.PENDING,
)
) or 0
month_entries = list(await db.scalars(
select(TimeEntry).where(
TimeEntry.user_id.in_(user_ids),
TimeEntry.date >= month_start,
TimeEntry.date <= today,
TimeEntry.end_time.isnot(None),
)
))
month_hours = sum(e.worked_hours or 0.0 for e in month_entries)
# Soll-Stunden: pro User den echten Arbeitsplan nutzen
company_schedules = list(await db.scalars(
select(WorkSchedule).where(
WorkSchedule.company_id == current_user.company_id,
WorkSchedule.valid_from <= today,
)
))
default_schedule = company_schedules[0] if company_schedules else None
month_expected = 0.0
for user in users:
sched = next(
(s for s in company_schedules if s.id == user.work_schedule_id),
default_schedule,
)
month_expected += _expected_hours(sched, month_start, today)
upcoming_end = today + timedelta(days=14)
result = await db.execute(
select(
Absence,
User.first_name.label("first_name"),
User.last_name.label("last_name"),
AbsenceType.name.label("type_name"),
)
.join(User, Absence.user_id == User.id)
.join(AbsenceType, Absence.type_id == AbsenceType.id)
.where(
Absence.user_id.in_(user_ids),
Absence.start_date >= today,
Absence.start_date <= upcoming_end,
Absence.status == AbsenceStatus.APPROVED,
)
.order_by(Absence.start_date)
)
upcoming = [
UpcomingAbsence(
user_id=absence.user_id,
user_name=f"{first_name} {last_name}",
absence_type=type_name,
start_date=absence.start_date,
end_date=absence.end_date,
working_days=float(absence.working_days),
)
for absence, first_name, last_name, type_name in result.all()
]
return CompanyDashboard(
total_employees=total_employees,
active_today=active_today,
attendance_rate=round(active_today / total_employees * 100, 1) if total_employees else 0.0,
month_hours_worked=round(month_hours, 2),
month_hours_expected=round(month_expected, 2),
month_overtime=round(month_hours - month_expected, 2),
pending_time_approvals=pending_time,
pending_absence_approvals=pending_absence,
upcoming_absences=upcoming,
)
# ── Time Report ──────────────────────────────────────────────────────────
async def time_report(
self,
company_id: UUID,
current_user: User,
db: AsyncSession,
date_from: date,
date_to: date,
user_id: UUID | None = None,
) -> TimeReport:
stmt = (
select(
TimeEntry,
User.first_name.label("first_name"),
User.last_name.label("last_name"),
User.personnel_number.label("personnel_number"),
Department.name.label("dept_name"),
)
.join(User, TimeEntry.user_id == User.id)
.outerjoin(Department, User.department_id == Department.id)
.where(
User.company_id == company_id,
TimeEntry.date >= date_from,
TimeEntry.date <= date_to,
)
.order_by(TimeEntry.date, User.last_name, User.first_name)
)
if current_user.role == UserRole.EMPLOYEE:
stmt = stmt.where(TimeEntry.user_id == current_user.id)
elif user_id:
stmt = stmt.where(TimeEntry.user_id == user_id)
# Bundesland der Firma für Feiertagskalender laden
company = await db.get(Company, company_id)
state = company.state if company else None
holidays: dict[date, tuple[str, bool]] = {}
if state:
holidays = await get_holidays_set(date_from, date_to, state, db)
result = await db.execute(stmt)
rows = []
total_hours = 0.0
for entry, first_name, last_name, personnel_number, dept_name in result.all():
wh = entry.worked_hours or 0.0
total_hours += wh
breakdown = None
if state and entry.end_time is not None:
breakdown = _categorize_hours(
entry.date, entry.start_time, entry.end_time,
entry.break_minutes, holidays,
)
rows.append(TimeReportRow(
date=entry.date,
user_id=entry.user_id,
user_name=f"{first_name} {last_name}",
personnel_number=personnel_number,
department=dept_name,
start_time=entry.start_time,
end_time=entry.end_time,
break_minutes=entry.break_minutes,
worked_hours=entry.worked_hours,
status=entry.status.value,
source=entry.source.value,
note=entry.note,
breakdown=breakdown,
))
return TimeReport(
date_from=date_from, date_to=date_to,
total_rows=len(rows), total_hours=round(total_hours, 2), rows=rows,
)
# ── Absence Report ───────────────────────────────────────────────────────
async def absence_report(
self,
company_id: UUID,
current_user: User,
db: AsyncSession,
date_from: date,
date_to: date,
user_id: UUID | None = None,
) -> AbsenceReport:
stmt = (
select(
Absence,
User.first_name.label("first_name"),
User.last_name.label("last_name"),
User.personnel_number.label("personnel_number"),
Department.name.label("dept_name"),
AbsenceType.name.label("type_name"),
)
.join(User, Absence.user_id == User.id)
.outerjoin(Department, User.department_id == Department.id)
.join(AbsenceType, Absence.type_id == AbsenceType.id)
.where(
User.company_id == company_id,
Absence.start_date <= date_to,
Absence.end_date >= date_from,
)
.order_by(Absence.start_date, User.last_name)
)
if current_user.role == UserRole.EMPLOYEE:
stmt = stmt.where(Absence.user_id == current_user.id)
elif user_id:
stmt = stmt.where(Absence.user_id == user_id)
result = await db.execute(stmt)
rows = []
total_days = 0.0
for absence, first_name, last_name, personnel_number, dept_name, type_name in result.all():
days = float(absence.working_days)
total_days += days
rows.append(AbsenceReportRow(
user_id=absence.user_id,
user_name=f"{first_name} {last_name}",
personnel_number=personnel_number,
department=dept_name,
absence_type=type_name,
start_date=absence.start_date,
end_date=absence.end_date,
working_days=days,
status=absence.status.value,
note=absence.note,
))
return AbsenceReport(
date_from=date_from, date_to=date_to,
total_rows=len(rows), total_days=total_days, rows=rows,
)
# ── Overtime Report ──────────────────────────────────────────────────────
async def overtime_report(
self,
company_id: UUID,
current_user: User,
db: AsyncSession,
date_from: date,
date_to: date,
user_id: UUID | None = None,
) -> OvertimeReport:
stmt = (
select(User, Department.name.label("dept_name"))
.outerjoin(Department, User.department_id == Department.id)
.where(User.company_id == company_id, User.is_active == True)
)
if current_user.role == UserRole.EMPLOYEE:
stmt = stmt.where(User.id == current_user.id)
elif user_id is not None:
stmt = stmt.where(User.id == user_id)
result = await db.execute(stmt)
users_with_dept = [(row[0], row[1]) for row in result.all()]
if not users_with_dept:
return OvertimeReport(
date_from=date_from, date_to=date_to,
total_employees=0, total_overtime=0.0, rows=[],
)
user_ids = [u.id for u, _ in users_with_dept]
entries = list(await db.scalars(
select(TimeEntry).where(
TimeEntry.user_id.in_(user_ids),
TimeEntry.date >= date_from,
TimeEntry.date <= date_to,
TimeEntry.end_time.isnot(None),
)
))
entries_by_user: dict[UUID, list[TimeEntry]] = defaultdict(list)
for entry in entries:
entries_by_user[entry.user_id].append(entry)
all_schedules = list(await db.scalars(
select(WorkSchedule).where(
WorkSchedule.company_id == company_id,
WorkSchedule.valid_from <= date_to,
)
))
default_schedule = all_schedules[0] if all_schedules else None
rows = []
total_overtime = 0.0
for user, dept_name in users_with_dept:
sched = next(
(s for s in all_schedules if s.id == user.work_schedule_id),
default_schedule,
)
expected = _expected_hours(sched, date_from, date_to)
hours_worked = sum(
e.worked_hours or 0.0 for e in entries_by_user.get(user.id, [])
)
overtime = hours_worked - expected
total_overtime += overtime
rows.append(OvertimeReportRow(
user_id=user.id,
user_name=user.full_name,
personnel_number=user.personnel_number,
department=dept_name,
hours_worked=round(hours_worked, 2),
hours_expected=round(expected, 2),
overtime_hours=round(overtime, 2),
))
return OvertimeReport(
date_from=date_from, date_to=date_to,
total_employees=len(rows),
total_overtime=round(total_overtime, 2),
rows=rows,
)
# ── Overtime Detail Report ────────────────────────────────────────────────
async def overtime_report_detail(
self,
company_id: UUID,
current_user: User,
db: AsyncSession,
date_from: date,
date_to: date,
user_id: UUID | None = None,
) -> OvertimeReportDetailed:
"""Erweiterter Überstundenbericht mit Wochen- und Tagesaufschlüsselung."""
stmt = (
select(User, Department.name.label("dept_name"))
.outerjoin(Department, User.department_id == Department.id)
.where(User.company_id == company_id, User.is_active == True)
)
if current_user.role == UserRole.EMPLOYEE:
stmt = stmt.where(User.id == current_user.id)
elif user_id is not None:
stmt = stmt.where(User.id == user_id)
result = await db.execute(stmt)
users_with_dept = [(row[0], row[1]) for row in result.all()]
if not users_with_dept:
return OvertimeReportDetailed(
date_from=date_from, date_to=date_to,
total_employees=0, total_overtime=0.0, rows=[],
)
user_ids = [u.id for u, _ in users_with_dept]
entries = list(await db.scalars(
select(TimeEntry).where(
TimeEntry.user_id.in_(user_ids),
TimeEntry.date >= date_from,
TimeEntry.date <= date_to,
TimeEntry.end_time.isnot(None),
)
))
entries_by_user: dict[UUID, dict[date, list[TimeEntry]]] = defaultdict(lambda: defaultdict(list))
for entry in entries:
entries_by_user[entry.user_id][entry.date].append(entry)
all_schedules = list(await db.scalars(
select(WorkSchedule).where(
WorkSchedule.company_id == company_id,
WorkSchedule.valid_from <= date_to,
)
))
default_schedule = all_schedules[0] if all_schedules else None
company = await db.get(Company, company_id)
state = company.state if company else None
holidays: dict[date, tuple[str, bool]] = {}
if state:
from app.services.holiday_service import get_holidays_set
holidays = await get_holidays_set(date_from, date_to, state, db)
weekday_short = ["Mo", "Di", "Mi", "Do", "Fr", "Sa", "So"]
rows = []
total_overtime = 0.0
for user, dept_name in users_with_dept:
sched = next((s for s in all_schedules if s.id == user.work_schedule_id), default_schedule)
daily_expected = _schedule_daily(sched)
# Wochen ermitteln (ISO)
weeks_map: dict[int, list[date]] = defaultdict(list)
current = date_from
while current <= date_to:
weeks_map[current.isocalendar()[1]].append(current)
current += timedelta(days=1)
user_weeks = []
total_worked = 0.0
total_expected_user = 0.0
arbzg_violations = 0
# Aggregierte Sonderstunden
agg_breakdown = dict(normal_hours=0.0, night_25_hours=0.0, night_40_hours=0.0,
sunday_hours=0.0, holiday_125_hours=0.0, holiday_150_hours=0.0, holiday_name=None)
for week_nr, week_days in sorted(weeks_map.items()):
week_days_list: list[OvertimeDay] = []
w_worked = w_expected = 0.0
for d in week_days:
exp = daily_expected.get(d.weekday(), 0.0)
day_entries = entries_by_user[user.id].get(d, [])
w_expected += exp
if day_entries:
day_worked = 0.0
day_entry_objs: list[DayEntry] = []
for entry in day_entries:
worked = entry.worked_hours or 0.0
day_worked += worked
warnings = _check_arbzg_day(entry)
breakdown = None
if state and entry.end_time:
breakdown = _categorize_hours(
d, entry.start_time, entry.end_time,
entry.break_minutes, holidays,
)
for k in ["normal_hours", "night_25_hours", "night_40_hours",
"sunday_hours", "holiday_125_hours", "holiday_150_hours"]:
agg_breakdown[k] += getattr(breakdown, k)
day_entry_objs.append(DayEntry(
start_time=entry.start_time,
end_time=entry.end_time,
break_minutes=entry.break_minutes,
hours_worked=round(worked, 2),
status=entry.status.value,
arbzg_warnings=warnings,
breakdown=breakdown,
))
# ArbZG-Tagesverletzung zählen (Summe aller Einträge des Tages)
if day_worked > 10:
arbzg_violations += 1
w_worked += day_worked
week_days_list.append(OvertimeDay(
date=d, weekday=weekday_short[d.weekday()],
hours_worked=round(day_worked, 2),
hours_expected=round(exp, 2),
overtime=round(day_worked - exp, 2),
entries=day_entry_objs,
))
else:
week_days_list.append(OvertimeDay(
date=d, weekday=weekday_short[d.weekday()],
hours_worked=0.0, hours_expected=round(exp, 2),
overtime=round(-exp, 2),
entries=[],
))
user_weeks.append(OvertimeWeek(
week_nr=week_nr,
week_start=week_days[0],
week_end=week_days[-1],
hours_worked=round(w_worked, 2),
hours_expected=round(w_expected, 2),
overtime=round(w_worked - w_expected, 2),
days=week_days_list,
))
total_worked += w_worked
total_expected_user += w_expected
user_overtime = total_worked - total_expected_user
total_overtime += user_overtime
special = HoursBreakdown(**agg_breakdown) if state else None
rows.append(OvertimeReportRowDetailed(
user_id=user.id,
user_name=user.full_name,
department=dept_name,
hours_worked=round(total_worked, 2),
hours_expected=round(total_expected_user, 2),
overtime_hours=round(user_overtime, 2),
weeks=user_weeks,
arbzg_violation_days=arbzg_violations,
special_hours_total=special,
))
return OvertimeReportDetailed(
date_from=date_from, date_to=date_to,
total_employees=len(rows),
total_overtime=round(total_overtime, 2),
rows=rows,
)
# ── Überstundenguthaben neu berechnen ─────────────────────────────────────
async def recalculate_overtime_balance(self, user: User, db: AsyncSession) -> OvertimeBalance:
schedule = await _load_schedule(user, db)
bal = await _recalculate_overtime_balance(user, schedule, db)
await db.commit()
return bal
# ── Export ───────────────────────────────────────────────────────────────
@staticmethod
def _pdf_base(title: str, period: str, rows_html: str, summary_html: str = "") -> str:
return f"""
{title}
{period}
{summary_html}
{rows_html}
Erstellt: {__import__('datetime').datetime.now().strftime('%d.%m.%Y %H:%M')} · TimeMaster
"""
def to_pdf(self, html: str) -> bytes:
from weasyprint import HTML
return HTML(string=html).write_pdf()
def time_report_to_pdf(self, report: "TimeReport", company_name: str = "") -> bytes:
def fmt_h(h: float) -> str:
hrs = int(abs(h)); mins = round((abs(h) - hrs) * 60)
return f"{'-' if h < 0 else ''}{hrs}h {mins:02d}m"
def fmt_t(t) -> str:
return str(t)[:5] if t else "–"
status_label = {"pending": "Prüfung", "approved": "Genehmigt", "rejected": "Abgelehnt", "open": "Offen"}
status_cls = {"pending": "badge-yellow", "approved": "badge-green", "rejected": "badge-red", "open": "badge-yellow"}
from collections import defaultdict as dd
groups: dict = dd(list)
for r in report.rows:
groups[r.user_id].append(r)
rows_html = "| Datum | Mitarbeiter | Beginn | Ende | Pause | Netto | Status | Notiz |
"
for rows in groups.values():
sub = sum(r.worked_hours or 0 for r in rows)
rows_html += f""
for r in rows:
d = __import__('datetime').date.fromisoformat(str(r.date))
day = ["Mo","Di","Mi","Do","Fr","Sa","So"][d.weekday()]
rows_html += f"| {day} {d.strftime('%d.%m.')} | | {fmt_t(r.start_time)} | {fmt_t(r.end_time)} | {r.break_minutes} min | {fmt_h(r.worked_hours or 0)} | {status_label.get(r.status, r.status)} | {r.note or ''} |
"
rows_html += f"| Gesamt | {fmt_h(report.total_hours)} | |
"
summary = f"Einträge
{report.total_rows}
Gesamt-Stunden
{fmt_h(report.total_hours)}
"
period = f"{report.date_from.strftime('%d.%m.%Y')} – {report.date_to.strftime('%d.%m.%Y')}"
html = self._pdf_base("Zeiterfassungsbericht", period, rows_html, summary)
return self.to_pdf(html)
def absence_report_to_pdf(self, report: "AbsenceReport") -> bytes:
from collections import defaultdict as dd
groups: dict = dd(list)
for r in report.rows:
groups[r.user_id].append(r)
status_label = {"pending": "Prüfung", "approved": "Genehmigt", "rejected": "Abgelehnt", "cancelled": "Storniert"}
status_cls = {"pending": "badge-yellow", "approved": "badge-green", "rejected": "badge-red", "cancelled": "badge-gray"}
rows_html = "| Mitarbeiter | Art | Von | Bis | Tage | Status | Notiz |
"
for rows in groups.values():
sub = sum(r.working_days for r in rows)
rows_html += f""
for r in rows:
rows_html += f" | {r.absence_type} | {__import__('datetime').date.fromisoformat(str(r.start_date)).strftime('%d.%m.%Y')} | {__import__('datetime').date.fromisoformat(str(r.end_date)).strftime('%d.%m.%Y')} | {r.working_days} | {status_label.get(r.status, r.status)} | {r.note or ''} |
"
rows_html += f"| Gesamt Arbeitstage | {report.total_days:.1f} | |
"
summary = f"Anträge
{report.total_rows}
Arbeitstage
{report.total_days:.1f}
"
period = f"{report.date_from.strftime('%d.%m.%Y')} – {report.date_to.strftime('%d.%m.%Y')}"
html = self._pdf_base("Abwesenheitsbericht", period, rows_html, summary)
return self.to_pdf(html)
def overtime_report_to_pdf(self, report: "OvertimeReport") -> bytes:
def fmt_h(h: float) -> str:
hrs = int(abs(h)); mins = round((abs(h) - hrs) * 60)
return f"{'-' if h < 0 else ''}{hrs}h {mins:02d}m"
rows_html = "| Mitarbeiter | Abteilung | Soll | Ist | Überstunden |
"
for r in report.rows:
ot_cls = "plus" if r.overtime_hours > 0 else "minus" if r.overtime_hours < 0 else ""
sign = "+" if r.overtime_hours > 0 else ""
rows_html += f"| {r.user_name} | {r.department or '–'} | {fmt_h(r.hours_expected)} | {fmt_h(r.hours_worked)} | {sign}{fmt_h(r.overtime_hours)} |
"
total_sign = "+" if report.total_overtime > 0 else ""
rows_html += f" | {fmt_h(sum(r.hours_expected for r in report.rows))} | {fmt_h(sum(r.hours_worked for r in report.rows))} | {total_sign}{fmt_h(report.total_overtime)} |
"
summary = f"Mitarbeiter
{report.total_employees}
Gesamt-Überstunden
{'+'if report.total_overtime>0 else ''}{fmt_h(report.total_overtime)}
Periode
{report.date_from.strftime('%d.%m.%Y')} – {report.date_to.strftime('%d.%m.%Y')}
"
period = f"{report.date_from.strftime('%d.%m.%Y')} – {report.date_to.strftime('%d.%m.%Y')}"
html = self._pdf_base("Überstundenbericht", period, rows_html, summary)
return self.to_pdf(html)
def to_csv(self, rows: list[dict]) -> str:
if not rows:
return ""
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=list(rows[0].keys()))
writer.writeheader()
writer.writerows(rows)
return output.getvalue()
def to_xlsx(self, rows: list[dict], sheet_name: str = "Report") -> bytes:
from openpyxl import Workbook
wb = Workbook()
ws = wb.active
ws.title = sheet_name
if rows:
ws.append(list(rows[0].keys()))
for row in rows:
ws.append([str(v) if v is not None else "" for v in row.values()])
output = io.BytesIO()
wb.save(output)
return output.getvalue()
@staticmethod
def _time_rows_to_dicts(rows: list[TimeReportRow]) -> list[dict]:
return [
{
"Datum": str(r.date), "Mitarbeiter": r.user_name,
"Personalnr": r.personnel_number or "",
"Abteilung": r.department or "", "Beginn": str(r.start_time),
"Ende": str(r.end_time) if r.end_time else "",
"Pause (Min)": r.break_minutes,
"Arbeitsstunden": r.worked_hours if r.worked_hours is not None else "",
"Status": r.status, "Quelle": r.source, "Notiz": r.note or "",
}
for r in rows
]
@staticmethod
def _absence_rows_to_dicts(rows: list[AbsenceReportRow]) -> list[dict]:
return [
{
"Mitarbeiter": r.user_name,
"Personalnr": r.personnel_number or "",
"Abteilung": r.department or "",
"Abwesenheitstyp": r.absence_type, "Von": str(r.start_date),
"Bis": str(r.end_date), "Arbeitstage": r.working_days,
"Status": r.status, "Notiz": r.note or "",
}
for r in rows
]
@staticmethod
def _overtime_rows_to_dicts(rows: list[OvertimeReportRow]) -> list[dict]:
return [
{
"Mitarbeiter": r.user_name,
"Personalnr": r.personnel_number or "",
"Abteilung": r.department or "",
"Gearbeitet (h)": r.hours_worked, "Soll (h)": r.hours_expected,
"Überstunden (h)": r.overtime_hours,
}
for r in rows
]
@staticmethod
def _overtime_detail_to_dicts(report: "OvertimeReportDetailed") -> list[dict]:
"""Flache Tabelle: eine Zeile pro Tageseintrag für CSV/Excel."""
status_de = {"approved": "Genehmigt", "pending": "Prüfung", "rejected": "Abgelehnt"}
rows = []
for user in report.rows:
for week in user.weeks:
for day in week.days:
if day.entries:
for entry in day.entries:
rows.append({
"Mitarbeiter": user.user_name,
"Abteilung": user.department or "",
"KW": week.week_nr,
"Datum": day.date.strftime("%d.%m.%Y"),
"Wochentag": day.weekday,
"Beginn": str(entry.start_time)[:5],
"Ende": str(entry.end_time)[:5],
"Pause (Min)": entry.break_minutes,
"Ist (h)": entry.hours_worked,
"Soll (h)": day.hours_expected if len(day.entries) == 1 else "",
"Diff (h)": round(day.overtime, 2) if len(day.entries) == 1 else "",
"Status": status_de.get(entry.status, entry.status),
"ArbZG-Warnungen": "; ".join(entry.arbzg_warnings),
})
else:
rows.append({
"Mitarbeiter": user.user_name,
"Abteilung": user.department or "",
"KW": week.week_nr,
"Datum": day.date.strftime("%d.%m.%Y"),
"Wochentag": day.weekday,
"Beginn": "", "Ende": "", "Pause (Min)": "",
"Ist (h)": 0,
"Soll (h)": day.hours_expected,
"Diff (h)": round(day.overtime, 2),
"Status": "–",
"ArbZG-Warnungen": "",
})
return rows
def overtime_detail_to_pdf(self, report: "OvertimeReportDetailed") -> bytes:
def fmt_h(h: float) -> str:
hrs = int(abs(h)); mins = round((abs(h) - hrs) * 60)
return f"{'-' if h < 0 else ''}{hrs}h {mins:02d}m"
def fmt_t(t) -> str:
return str(t)[:5] if t else "–"
status_de = {"approved": "Genehmigt", "pending": "Prüfung", "rejected": "Abgelehnt"}
status_cls = {"approved": "badge-green", "pending": "badge-yellow", "rejected": "badge-red"}
rows_html = ""
for user in report.rows:
sign = "+" if user.overtime_hours > 0 else ""
ot_cls = "plus" if user.overtime_hours > 0 else "minus" if user.overtime_hours < 0 else ""
arbzg_badge = f" {user.arbzg_violation_days}× ArbZG" if user.arbzg_violation_days > 0 else ""
rows_html += f""
rows_html += f"| {user.user_name}{' · ' + user.department if user.department else ''} Soll {fmt_h(user.hours_expected)} · Ist {fmt_h(user.hours_worked)} · = 0 else '#dc2626'}\">{sign}{fmt_h(user.overtime_hours)}{arbzg_badge} |
"
rows_html += "| KW | Tag | Datum | Beginn | Ende | Pause | Ist | Soll | Diff |
"
for week in user.weeks:
w_sign = "+" if week.overtime > 0 else ""
w_cls = "plus" if week.overtime > 0 else "minus" if week.overtime < 0 else ""
rows_html += f""
for day in week.days:
if day.entries:
for ei, entry in enumerate(day.entries):
show_day = ei == 0
soll_str = fmt_h(day.hours_expected) if (show_day and day.hours_expected > 0) else ""
diff_str = ""
if show_day and len(day.entries) == 1:
diff_str = f" 0 else 'minus' if day.overtime < 0 else ''}'>{'+' if day.overtime > 0 else ''}{fmt_h(day.overtime)}"
warn = " ".join(f"
⚠ {w}" for w in entry.arbzg_warnings)
rows_html += f"| {'↳' if not show_day else ''} | {day.weekday if show_day else ''} | {day.date.strftime('%d.%m.') if show_day else ''} | {fmt_t(entry.start_time)} | {fmt_t(entry.end_time)} | {entry.break_minutes} min | {fmt_h(entry.hours_worked)}{warn} | {soll_str} | {diff_str} |
"
if len(day.entries) > 1:
d_sign = "+" if day.overtime > 0 else ""
rows_html += f"| Tagessumme | {fmt_h(day.hours_worked)} | {fmt_h(day.hours_expected)} | 0 else 'minus'}'>{d_sign}{fmt_h(day.overtime)} |
"
else:
rows_html += f" | {day.weekday} | {day.date.strftime('%d.%m.')} | kein Eintrag | {fmt_h(day.hours_expected) if day.hours_expected > 0 else '–'} | {fmt_h(day.overtime) if day.hours_expected > 0 else ''} |
"
rows_html += f"
"
sign = "+" if report.total_overtime > 0 else ""
summary = f"Mitarbeiter
{report.total_employees}
Gesamt-Überstunden
{sign}{fmt_h(report.total_overtime)}
Periode
{report.date_from.strftime('%d.%m.%Y')} – {report.date_to.strftime('%d.%m.%Y')}
"
period = f"{report.date_from.strftime('%d.%m.%Y')} – {report.date_to.strftime('%d.%m.%Y')}"
html = self._pdf_base("Überstundenbericht – Detailansicht", period, rows_html, summary)
return self.to_pdf(html)
report_service = ReportService()