"""Feiertags-Service: berechnet und befüllt deutsche Feiertage per Bundesland.""" from __future__ import annotations import uuid from datetime import date, timedelta from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession from app.models.public_holiday import PublicHoliday # --------------------------------------------------------------------------- # Osterformel (Gauß/Anonymous) # --------------------------------------------------------------------------- def _easter(year: int) -> date: """Berechnet Ostersonntag nach der anonymen Gregorianischen Osterformel.""" a = year % 19 b = year // 100 c = year % 100 d = b // 4 e = b % 4 f = (b + 8) // 25 g = (b - f + 1) // 3 h = (19 * a + b - d - g + 15) % 30 i = c // 4 k = c % 4 l = (32 + 2 * e + 2 * i - h - k) % 7 m = (a + 11 * h + 22 * l) // 451 month = (h + l - 7 * m + 114) // 31 day = ((h + l - 7 * m + 114) % 31) + 1 return date(year, month, day) # --------------------------------------------------------------------------- # Feiertage berechnen # --------------------------------------------------------------------------- def _holidays_for_state(year: int, state: str) -> list[tuple[date, str, bool]]: """ Gibt Liste von (date, name, is_high_rate) für das Bundesland zurück. is_high_rate = True → 150% Zuschlag nach §3b EStG """ easter = _easter(year) holidays: list[tuple[date, str, bool]] = [] def add(d: date, name: str, high: bool = False) -> None: holidays.append((d, name, high)) # ── Bundesweit gültige Feiertage ──────────────────────────────────────── add(date(year, 1, 1), "Neujahr") add(easter - timedelta(days=2), "Karfreitag") add(easter, "Ostersonntag") add(easter + timedelta(days=1), "Ostermontag") add(date(year, 5, 1), "Tag der Arbeit", high=True) add(easter + timedelta(days=39), "Christi Himmelfahrt") add(easter + timedelta(days=49), "Pfingstsonntag") add(easter + timedelta(days=50), "Pfingstmontag") add(date(year, 10, 3), "Tag der Deutschen Einheit") add(date(year, 12, 25), "1. Weihnachtstag", high=True) add(date(year, 12, 26), "2. Weihnachtstag", high=True) # ── Heilige Drei Könige: BY, BW, ST ──────────────────────────────────── if state in ("BY", "BW", "ST"): add(date(year, 1, 6), "Heilige Drei Könige") # ── Frauentag: BE (ab 2019) ───────────────────────────────────────────── if state == "BE" and year >= 2019: add(date(year, 3, 8), "Internationaler Frauentag") # ── Gründonnerstag: BY (nur Schulen) – nicht als gesetzlicher Feiertag # ── Fronleichnam: BW, BY, HE, NW, RP, SL (+ Teile ST, TH) ───────────── if state in ("BW", "BY", "HE", "NW", "RP", "SL"): add(easter + timedelta(days=60), "Fronleichnam") # ── Mariä Himmelfahrt: BY (kath. Gemeinden), SL ───────────────────────── if state in ("BY", "SL"): add(date(year, 8, 15), "Mariä Himmelfahrt") # ── Weltkindertag: TH (ab 2019) ───────────────────────────────────────── if state == "TH" and year >= 2019: add(date(year, 9, 20), "Weltkindertag") # ── Reformationstag: BB, HB, HH, MV, NI, SH, SN, ST, TH ─────────────── if state in ("BB", "HB", "HH", "MV", "NI", "SH", "SN", "ST", "TH"): add(date(year, 10, 31), "Reformationstag") # ── Allerheiligen: BW, BY, NW, RP, SL ─────────────────────────────────── if state in ("BW", "BY", "NW", "RP", "SL"): add(date(year, 11, 1), "Allerheiligen") # ── Buß- und Bettag: SN ────────────────────────────────────────────────── if state == "SN": # Mittwoch vor dem 23. November nov23 = date(year, 11, 23) bbt = nov23 - timedelta(days=(nov23.weekday() + 3) % 7 + 1) if bbt.weekday() != 2: # Fallback: letzter Mittwoch vor 23.11. bbt = nov23 - timedelta(days=(nov23.weekday() - 2) % 7 + 7) add(bbt, "Buß- und Bettag") return holidays # --------------------------------------------------------------------------- # DB-Funktionen # --------------------------------------------------------------------------- async def ensure_holidays_for_year(year: int, state: str, db: AsyncSession) -> int: """ Stellt sicher dass Feiertage für (year, state) in der DB vorhanden sind. Löscht ggf. alte Einträge und schreibt neu. Gibt Anzahl geschriebener Einträge zurück. """ # Löschen falls schon vorhanden (refresh) await db.execute( delete(PublicHoliday).where( PublicHoliday.country == "DE", PublicHoliday.state == state, PublicHoliday.year == year, ) ) holidays = _holidays_for_state(year, state) for d, name, high in holidays: db.add(PublicHoliday( id=uuid.uuid4(), country="DE", state=state, date=d, name=name, year=year, is_high_rate=high, )) await db.flush() return len(holidays) async def get_holidays_set( date_from: date, date_to: date, state: str, db: AsyncSession, ) -> dict[date, tuple[str, bool]]: """ Gibt dict {date: (name, is_high_rate)} für den Zeitraum zurück. Befüllt fehlende Jahre automatisch. """ years = set(range(date_from.year, date_to.year + 1)) # Prüfen welche Jahre schon in der DB sind existing_years_q = await db.execute( select(PublicHoliday.year).where( PublicHoliday.country == "DE", PublicHoliday.state == state, ).distinct() ) existing_years = {r[0] for r in existing_years_q.all()} for year in years - existing_years: await ensure_holidays_for_year(year, state, db) result_q = await db.execute( select(PublicHoliday).where( PublicHoliday.country == "DE", PublicHoliday.state == state, PublicHoliday.date >= date_from, PublicHoliday.date <= date_to, ) ) return {h.date: (h.name, h.is_high_rate) for h in result_q.scalars().all()}