Files
patrick 654258f13e security: M-2 HttpOnly-Cookie + M-4 TrustedHost-Warning + M-5 TOTP-Lockout + M-7 zentraler get_client_ip()
M-2: Refresh-Token als HttpOnly SameSite=Strict Cookie
- auth.py: _set_refresh_cookie/_delete_refresh_cookie Helpers
- Alle Auth-Endpoints (login, totp/login, refresh, logout) nutzen Cookie
- schemas/auth.py: refresh_token in Request/Response optional
- AuthContext.tsx: kein refresh_token in localStorage
- api/client.ts: credentials:include, kein Token-Body beim Refresh

M-4: TrustedHostMiddleware Warning in Production
- main.py: Startup-Warning wenn is_production + kein ALLOWED_HOSTS

M-5: TOTP-Fehlversuche Redis-Lockout
- auth.py: _check/_record/_clear_totp_lockout; 5 Versuche → 15 min Sperre

M-7: Zentraler get_client_ip()-Helper
- core/dependencies.py: get_client_ip() mit X-Real-IP → X-Forwarded-For → client.host
- hours_payouts.py, absences.py, busylight.py: request.client.host ersetzt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-26 11:25:24 +02:00

169 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Busylight-Integration (Pull-Endpoint + Token-Verwaltung).
- Pull: GET /busylight/users Auth via per-Firma Bearer-Token (SHA-256 in DB)
- Verwaltung: POST/DELETE /companies/me/busylight-token COMPANY_ADMIN/SUPER_ADMIN
"""
from __future__ import annotations
import hashlib
import secrets
from datetime import date, datetime, timezone
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.core.database import get_db
from app.core.dependencies import get_client_ip, require_role
from app.core.limiter import limiter
from app.models.absence import Absence, AbsenceStatus
from app.models.absence_type import AbsenceType
from app.models.audit_log import AuditLog
from app.models.company import Company
from app.models.user import User, UserRole
from app.schemas.busylight import (
BusylightAbsenceItem,
BusylightTokenRotated,
BusylightTokenStatus,
BusylightUserItem,
BusylightUsersResponse,
)
router = APIRouter(tags=["Busylight"])
_admin_roles = (UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)
_pull_bearer = HTTPBearer(auto_error=False)
def _hash_token(token: str) -> str:
return hashlib.sha256(token.encode("utf-8")).hexdigest()
# ── Token-Verwaltung (eingeloggter Admin) ────────────────────────────────────
@router.get("/companies/me/busylight-token", response_model=BusylightTokenStatus)
async def get_busylight_token_status(
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
company = await db.get(Company, current_user.company_id)
return BusylightTokenStatus(
configured=company.busylight_pull_token_hash is not None,
created_at=company.busylight_token_created_at,
)
@router.post("/companies/me/busylight-token/rotate", response_model=BusylightTokenRotated)
async def rotate_busylight_token(
request: Request,
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
company = await db.get(Company, current_user.company_id)
token = secrets.token_urlsafe(32)
company.busylight_pull_token_hash = _hash_token(token)
company.busylight_token_created_at = datetime.now(timezone.utc)
db.add(AuditLog(
company_id=company.id,
user_id=current_user.id,
action="busylight_token_rotated",
entity_type="company",
entity_id=company.id,
ip=get_client_ip(request),
))
await db.commit()
return BusylightTokenRotated(token=token, created_at=company.busylight_token_created_at)
@router.delete("/companies/me/busylight-token", status_code=204)
async def delete_busylight_token(
request: Request,
current_user: User = require_role(*_admin_roles),
db: AsyncSession = Depends(get_db),
):
company = await db.get(Company, current_user.company_id)
if company.busylight_pull_token_hash is None:
return
company.busylight_pull_token_hash = None
company.busylight_token_created_at = None
db.add(AuditLog(
company_id=company.id,
user_id=current_user.id,
action="busylight_token_revoked",
entity_type="company",
entity_id=company.id,
ip=get_client_ip(request),
))
await db.commit()
# ── Pull-Endpoint (busylight liest hier) ─────────────────────────────────────
async def _company_from_token(
credentials: HTTPAuthorizationCredentials | None,
db: AsyncSession,
) -> Company:
if credentials is None or not credentials.credentials:
raise HTTPException(status_code=401, detail="Missing token")
token_hash = _hash_token(credentials.credentials)
company = await db.scalar(
select(Company).where(Company.busylight_pull_token_hash == token_hash)
)
if company is None:
raise HTTPException(status_code=401, detail="Invalid token")
return company
@router.get("/busylight/users", response_model=BusylightUsersResponse)
@limiter.limit("60/minute")
async def list_users_for_busylight(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(_pull_bearer),
db: AsyncSession = Depends(get_db),
):
company = await _company_from_token(credentials, db)
today = date.today()
users = (await db.scalars(
select(User)
.where(
User.company_id == company.id,
User.is_active == True,
User.personnel_number.is_not(None),
)
.order_by(User.last_name, User.first_name)
)).all()
user_ids = [u.id for u in users]
abs_rows = []
if user_ids:
abs_rows = (await db.execute(
select(Absence, AbsenceType)
.join(AbsenceType, Absence.type_id == AbsenceType.id)
.where(
Absence.user_id.in_(user_ids),
Absence.status == AbsenceStatus.APPROVED,
Absence.start_date <= today,
Absence.end_date >= today,
)
)).all()
by_user: dict = {uid: [] for uid in user_ids}
for absence, atype in abs_rows:
by_user[absence.user_id].append(
BusylightAbsenceItem(type=atype.name, category=atype.category.value)
)
items = [
BusylightUserItem(
personnel_number=u.personnel_number,
full_name=u.full_name,
absences_today=by_user[u.id],
)
for u in users
]
return BusylightUsersResponse(date=today, users=items)