Files
timemaster/backend/app/routers/audit.py
T
sysops 1fedd683e0 Initial commit – TimeMaster Zeiterfassung & HR-Tool
Stand: agent-06 (Audit-Log), agent-05 (Krankmeldung), agent-07 Phase 1 (Personalnummer),
Busylight-Pull-Integration, TOTP/2FA, Abwesenheiten, Zeiterfassung, Kiosk-Grundgerüst.
Migrations 0001–0023 deployed auf 192.168.1.137 + .164.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-23 20:03:27 +02:00

120 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""AuditLog-Endpoint nur für COMPANY_ADMIN und SUPER_ADMIN, company-isoliert."""
from datetime import datetime
from uuid import UUID
from fastapi import APIRouter, Depends, Query
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload
from app.core.database import get_db
from app.core.dependencies import require_role
from app.models.audit_log import AuditLog
from app.models.user import User, UserRole
from app.schemas.audit_log import AuditLogEntry, AuditLogListResponse
# Router that the audit-log endpoints below register on; tagged "Audit Log".
router = APIRouter(tags=["Audit Log"])
# Roles handed to require_role() by the endpoints below.
_admin_roles = (UserRole.COMPANY_ADMIN, UserRole.SUPER_ADMIN)
def _escape_like(term: str, escape: str = "/") -> str:
    """Escape LIKE wildcards in *term* so that it is matched literally.

    The escape character itself is doubled first, then ``%`` and ``_`` are
    prefixed with it. The same *escape* must be passed to ``like``/``ilike``.
    """
    return (
        term.replace(escape, escape * 2)
        .replace("%", f"{escape}%")
        .replace("_", f"{escape}_")
    )


def _display_name(first: str | None, last: str | None) -> str | None:
    """Join the non-empty name parts with a space; ``None`` if nothing is left."""
    return " ".join(part for part in (first, last) if part).strip() or None


@router.get("/audit-logs", response_model=AuditLogListResponse)
async def list_audit_logs(
    user_id: UUID | None = Query(None),
    action: str | None = Query(None),
    entity_type: str | None = Query(None),
    date_from: datetime | None = Query(None),
    date_to: datetime | None = Query(None),
    limit: int = Query(50, ge=1, le=500),
    offset: int = Query(0, ge=0),
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Return one page of audit-log entries, newest first.

    Args:
        user_id: Only entries whose ``user_id`` equals this value.
        action: Case-insensitive substring that ``action`` must contain.
        entity_type: Only entries with exactly this ``entity_type``.
        date_from: Inclusive lower bound on ``created_at``.
        date_to: Inclusive upper bound on ``created_at``.
        limit: Page size (1-500).
        offset: Number of matching rows to skip.
        current_user: Caller, resolved through ``require_role``.
        db: Database session.

    Returns:
        ``AuditLogListResponse`` where ``total`` counts every row matching the
        filters (ignoring ``limit``/``offset``) and ``items`` holds the page.
    """
    filters = []
    # SUPER_ADMIN sees all companies; everyone else only their own.
    if current_user.role != UserRole.SUPER_ADMIN:
        filters.append(AuditLog.company_id == current_user.company_id)

    if user_id is not None:
        filters.append(AuditLog.user_id == user_id)
    if action:
        # Escape % and _ so the user's text is a literal substring, not a pattern.
        pattern = f"%{_escape_like(action)}%"
        filters.append(AuditLog.action.ilike(pattern, escape="/"))
    if entity_type:
        filters.append(AuditLog.entity_type == entity_type)
    if date_from is not None:
        filters.append(AuditLog.created_at >= date_from)
    if date_to is not None:
        filters.append(AuditLog.created_at <= date_to)

    count_q = select(func.count()).select_from(AuditLog).where(*filters)
    total = await db.scalar(count_q) or 0

    rows_q = (
        select(AuditLog, User.first_name, User.last_name)
        # Outer join keeps entries whose user row is missing or unset.
        .outerjoin(User, AuditLog.user_id == User.id)
        .where(*filters)
        # id breaks ties between equal timestamps so that LIMIT/OFFSET paging
        # neither repeats nor skips rows.
        .order_by(AuditLog.created_at.desc(), AuditLog.id.desc())
        .limit(limit)
        .offset(offset)
    )
    rows = (await db.execute(rows_q)).all()

    items = [
        AuditLogEntry(
            id=log.id,
            user_id=log.user_id,
            # Never interpolate a missing name part as the text "None".
            user_name=_display_name(first, last),
            action=log.action,
            entity_type=log.entity_type,
            entity_id=log.entity_id,
            old_value=log.old_value,
            new_value=log.new_value,
            ip_address=log.ip,
            created_at=log.created_at,
        )
        for log, first, last in rows
    ]
    return AuditLogListResponse(total=total, items=items)
@router.get("/audit-logs/actions", response_model=list[str])
async def list_audit_actions(
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Return all distinct action values present, for the filter dropdown.

    SUPER_ADMIN gets the values across all companies; any other caller gets
    only those recorded for their own company. Values are ordered by the
    database on the action column.
    """
    conditions = []
    if current_user.role != UserRole.SUPER_ADMIN:
        conditions.append(AuditLog.company_id == current_user.company_id)

    stmt = (
        select(AuditLog.action)
        .where(*conditions)
        .distinct()
        .order_by(AuditLog.action)
    )
    outcome = await db.execute(stmt)
    # Single-column select: scalars() yields the column value of each row.
    return list(outcome.scalars().all())
@router.get("/audit-logs/entity-types", response_model=list[str])
async def list_entity_types(
    current_user: User = require_role(*_admin_roles),
    db: AsyncSession = Depends(get_db),
):
    """Return all distinct non-null entity types present, for the filter dropdown.

    SUPER_ADMIN gets the values across all companies; any other caller gets
    only those recorded for their own company. Values are ordered by the
    database on the entity_type column.
    """
    conditions = []
    if current_user.role != UserRole.SUPER_ADMIN:
        conditions.append(AuditLog.company_id == current_user.company_id)
    # Rows without an entity type are excluded for every caller.
    conditions.append(AuditLog.entity_type.isnot(None))

    stmt = (
        select(AuditLog.entity_type)
        .where(*conditions)
        .distinct()
        .order_by(AuditLog.entity_type)
    )
    outcome = await db.execute(stmt)
    # Single-column select: scalars() yields the column value of each row.
    return list(outcome.scalars().all())