Files
timemaster/backend/cli.py
T
patrick e83a3fbbdd fix: agent-08 Kiosk-Härtung + 24h-Zeiteintrag-Bug
- fix: worked_minutes nutzt jetzt Sekunden statt Minuten für Overnight-Vergleich
  (end < start statt end <= start) – verhindert 24h-Anzeige bei Schnell-Stempel
  in derselben Minute (z.B. 23:34:46 → 23:34:48)
- fix: _check_arbzg() gleicher Sec-basierter Fix
- fix: KioskDeviceStatus Enum values_callable → kiosk list crasht nicht mehr
- feat: kiosk rotate-key CLI-Kommando (Status→pending, Re-Enrollment)
- feat: Kiosk-Settings in CompanyOut/CompanyUpdate Schema (require_approval,
  track_current_user, heartbeat_interval_sec)
- feat: Kiosk-Terminal-Einstellungsblock in CompanySettingsPage (🖥️)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-05-25 01:42:08 +02:00

608 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
TimeMaster CLI Kiosk-Geräteverwaltung
Verwendung (auf dem Server):
cd /opt/timemaster/backend
source venv/bin/activate
python cli.py kiosk list
python cli.py kiosk add --company "Acme GmbH" --name "Eingang" --pubkey ~/.ssh/kiosk.pub
"""
import asyncio
import base64
import hashlib
import ipaddress
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import typer
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich import box
# ── Project imports ───────────────────────────────────────────────────────────
# Make sure the directory containing this file (backend/) is on sys.path so
# that the app.* packages imported further down can be resolved.
_HERE = Path(__file__).parent.resolve()
if not any(entry == str(_HERE) for entry in sys.path):
    sys.path.insert(0, str(_HERE))

# Minimal hand-rolled loader for a .env file next to this script. It runs
# before the settings import below so that values such as DATABASE_URL are
# already present in the process environment at that point.
_env_file = _HERE / ".env"
if _env_file.exists():
    import os

    for _entry in map(str.strip, _env_file.read_text().splitlines()):
        # Skip blank lines, comment lines and anything that is not KEY=VALUE.
        if not _entry or _entry.startswith("#") or "=" not in _entry:
            continue
        _name, _, _value = _entry.partition("=")
        # Variables already set in the real environment take precedence.
        os.environ.setdefault(_name.strip(), _value.strip().strip('"').strip("'"))
from sqlalchemy import select, text, update
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.core.config import settings
from app.models.company import Company
from app.models.kiosk_device import KioskDevice, KioskDeviceStatus
# ── Typer apps ────────────────────────────────────────────────────────────────
# Root CLI application. Invoking it without arguments prints the help text;
# shell-completion installation options are disabled.
app = typer.Typer(
name="timemaster",
help="TimeMaster CLI Verwaltungstool für den Server",
no_args_is_help=True,
add_completion=False,
)
# Sub-application holding all kiosk device commands.
kiosk_app = typer.Typer(
help="Kiosk-Geräteverwaltung",
no_args_is_help=True,
)
# Mount the kiosk commands as `<cli> kiosk <command>`.
app.add_typer(kiosk_app, name="kiosk")
# Regular output goes to stdout, error messages go to stderr.
console = Console()
err_console = Console(stderr=True)
# ── DB helpers ────────────────────────────────────────────────────────────────
def _make_engine():
    """Create an async engine for ``settings.database_url``.

    A new engine is built for every session and nothing in this file ever
    disposes it, while each ``_run()`` call drives its coroutine with a
    separate ``asyncio.run()`` event loop. With a queue pool, closing the
    session would only hand the connection back to the pool, leaving it open
    (and bound to an event loop that is about to be closed). ``NullPool``
    makes the connection really close when the session releases it.

    Returns:
        The engine returned by ``create_async_engine``.
    """
    # Function-local import: keeps the fix self-contained without touching
    # the file-level import block (sqlalchemy is already a dependency here).
    from sqlalchemy.pool import NullPool

    return create_async_engine(settings.database_url, poolclass=NullPool)
async def _open_session() -> AsyncSession:
    """Open a new ``AsyncSession`` with the row-level-security bypass enabled.

    The bypass is switched on with ``SET LOCAL app.bypass_rls = 'on'``.
    NOTE(review): ``SET LOCAL`` is presumably PostgreSQL's transaction-scoped
    form, i.e. the setting only lasts until the first commit/rollback on this
    session - confirm against the database's RLS policies.

    Returns:
        An open session; the caller is responsible for closing it.

    Raises:
        Whatever the driver raises if connecting or the ``SET LOCAL`` fails.
        In that case the half-opened session is closed before re-raising, so
        no connection is left dangling.
    """
    session_factory = async_sessionmaker(
        _make_engine(), class_=AsyncSession, expire_on_commit=False
    )
    session = session_factory()
    try:
        await session.execute(text("SET LOCAL app.bypass_rls = 'on'"))
    except BaseException:
        # The caller never receives the session on failure, so it could not
        # close it - do it here.
        await session.close()
        raise
    return session
def _run(coro):
    """Run *coro* to completion with ``asyncio.run`` and return its result.

    Any exception is reported as a one-line error on stderr and converted
    into ``typer.Exit(1)`` so the CLI exits with a non-zero status instead of
    a traceback.

    Args:
        coro: The coroutine object to execute.

    Returns:
        Whatever the coroutine returns.

    Raises:
        typer.Exit: With code 1 when the coroutine raised; an Exit raised by
            the coroutine itself is passed through unchanged.
    """
    try:
        return asyncio.run(coro)
    except typer.Exit:
        # A deliberate exit request is not an error - keep its exit code.
        raise
    except Exception as exc:
        # Imported lazily: only needed on the error path.
        from rich.markup import escape

        # Exception texts can contain square brackets; without escaping,
        # Rich would interpret them as markup tags and drop or reject them.
        err_console.print(f"[bold red]Fehler:[/bold red] {escape(str(exc))}")
        raise typer.Exit(1) from exc
# ── Public-key fingerprint (SHA-256, formatted like ssh-keygen -l -E sha256) ──
def _pubkey_fingerprint(pubkey_str: str) -> str:
    """Return the SHA-256 fingerprint of a public key as ``SHA256:<base64>``.

    The Base64 payload of the key is decoded, hashed with SHA-256 and the
    digest is Base64-encoded again with trailing ``=`` padding removed.

    Two input shapes are accepted:

    * an OpenSSH one-liner (``ssh-ed25519 AAAA... comment``) - the second
      whitespace-separated field is the payload;
    * a block starting with ``-----BEGIN`` - every line that is not a
      ``-----`` delimiter is concatenated to form the payload.

    Decoding is strict: a payload containing non-Base64 characters, or one
    that decodes to nothing, is rejected instead of being silently hashed.

    Args:
        pubkey_str: The textual public key.

    Returns:
        The fingerprint string, or ``"(Fingerprint nicht berechenbar)"`` when
        the key cannot be parsed.
    """
    pubkey_str = pubkey_str.strip()
    try:
        if pubkey_str.startswith("-----BEGIN"):
            stripped_lines = (ln.strip() for ln in pubkey_str.splitlines())
            encoded = "".join(
                ln for ln in stripped_lines if not ln.startswith("-----")
            )
        else:
            fields = pubkey_str.split()
            if len(fields) < 2:
                raise ValueError("Ungültiges Public-Key-Format")
            encoded = fields[1]
        # validate=True: without it b64decode drops invalid characters and
        # would happily "decode" garbage. binascii.Error is a ValueError.
        raw = base64.b64decode(encoded, validate=True)
        if not raw:
            raise ValueError("Ungültiges Public-Key-Format")
    except ValueError:
        return "(Fingerprint nicht berechenbar)"
    digest = hashlib.sha256(raw).digest()
    return "SHA256:" + base64.b64encode(digest).decode().rstrip("=")
# ── Heartbeat status helper ───────────────────────────────────────────────────
def _heartbeat_label(last_hb: Optional[datetime]) -> str:
    """Render the last heartbeat as a Rich-markup status label.

    Args:
        last_hb: Time of the last heartbeat, or ``None`` if there never was
            one. A naive value is treated as UTC.

    Returns:
        ``nie`` (dimmed) for ``None``; otherwise ``online`` when the heartbeat
        is less than 90 seconds old, ``stale`` below 300 seconds and
        ``offline`` beyond that, each followed by the formatted timestamp.
    """
    if last_hb is None:
        return "[dim]nie[/dim]"

    seen = last_hb if last_hb.tzinfo is not None else last_hb.replace(tzinfo=timezone.utc)
    age_seconds = (datetime.now(timezone.utc) - seen).total_seconds()

    if age_seconds < 90:
        state = "[green]online[/green]"
    elif age_seconds < 300:
        state = "[yellow]stale[/yellow]"
    else:
        state = "[red]offline[/red]"
    return f"{state} ({seen.strftime('%d.%m.%Y %H:%M:%S')})"
def _status_icon(status: KioskDeviceStatus) -> str:
    """Return the coloured Rich-markup label for a device status.

    APPROVED, PENDING and REVOKED map to green, yellow and red labels; any
    other status falls back to its plain ``.value``.
    """
    # Read the fallback up front, mirroring the eager default of a dict.get().
    plain = status.value
    styled = (
        (KioskDeviceStatus.APPROVED, "[green]approved[/green]"),
        (KioskDeviceStatus.PENDING, "[yellow]pending[/yellow]"),
        (KioskDeviceStatus.REVOKED, "[red]revoked[/red]"),
    )
    for member, markup in styled:
        if member == status:
            return markup
    return plain
# ── CIDR validation ───────────────────────────────────────────────────────────
def _validate_ip_whitelist(ip_whitelist: Optional[str]) -> None:
    """Check that every entry of a comma-separated CIDR list is parseable.

    Args:
        ip_whitelist: Comma-separated networks, or ``None``/empty for "no
            whitelist". Surrounding whitespace and empty entries are ignored.

    Raises:
        typer.BadParameter: For the first entry that ``ipaddress.ip_network``
            (non-strict mode) rejects.
    """
    if not ip_whitelist:
        return
    trimmed = (part.strip() for part in ip_whitelist.split(","))
    for entry in filter(None, trimmed):
        try:
            ipaddress.ip_network(entry, strict=False)
        except ValueError:
            raise typer.BadParameter(
                f"Ungültige CIDR-Notation: '{entry}'. "
                "Beispiel: 10.0.0.0/24,192.168.1.0/24"
            )
# ── Company lookup ────────────────────────────────────────────────────────────
async def _find_company(session: AsyncSession, company_name: str) -> Company:
    """Find exactly one company by case-insensitive substring match.

    ``%``, ``_`` and ``\\`` in *company_name* are escaped so they are matched
    literally instead of acting as LIKE wildcards. If several companies
    contain the search text but exactly one of them equals it (ignoring
    case), that one is returned; otherwise a company whose name is a
    substring of another company's name could never be selected.

    Args:
        session: Open database session used for the query.
        company_name: Full or partial company name.

    Returns:
        The single matching ``Company``.

    Raises:
        typer.BadParameter: If no company matches, or several match and none
            of them is a unique exact match.
    """
    escaped = (
        company_name.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    result = await session.execute(
        select(Company).where(Company.name.ilike(f"%{escaped}%", escape="\\"))
    )
    companies = result.scalars().all()
    if not companies:
        raise typer.BadParameter(
            f"Keine Firma mit dem Namen '{company_name}' gefunden."
        )
    if len(companies) > 1:
        wanted = company_name.casefold()
        exact = [c for c in companies if c.name.casefold() == wanted]
        if len(exact) == 1:
            return exact[0]
        names = ", ".join(c.name for c in companies)
        raise typer.BadParameter(
            f"Mehrere Firmen gefunden: {names}\n"
            "Bitte den Namen genauer angeben."
        )
    return companies[0]
# ── Device lookup ─────────────────────────────────────────────────────────────
async def _find_device(session: AsyncSession, device_id: str) -> KioskDevice:
    """Load a kiosk device by its UUID string.

    Args:
        session: Open database session used for the query.
        device_id: The device's UUID in textual form.

    Returns:
        The matching ``KioskDevice``.

    Raises:
        typer.BadParameter: If *device_id* is not a valid UUID or no device
            with that ID exists.
    """
    try:
        parsed_id = uuid.UUID(device_id)
    except ValueError:
        raise typer.BadParameter(f"Ungültige UUID: '{device_id}'")

    stmt = select(KioskDevice).where(KioskDevice.id == parsed_id)
    found = (await session.execute(stmt)).scalar_one_or_none()
    if found is not None:
        return found
    raise typer.BadParameter(f"Kein Gerät mit ID '{device_id}' gefunden.")
# ── Subcommand: kiosk add ─────────────────────────────────────────────────────
@kiosk_app.command("add")
def kiosk_add(
    company: str = typer.Option(..., "--company", help="Firmenname (Teilübereinstimmung möglich)"),
    name: str = typer.Option(..., "--name", help="Name des Kiosk-Geräts, z.B. 'Eingang Berlin'"),
    location: Optional[str] = typer.Option(None, "--location", help="Standort-Beschreibung"),
    pubkey: Optional[Path] = typer.Option(None, "--pubkey", help="Pfad zur Public-Key-Datei (OpenSSH oder PEM)"),
    ip_whitelist: Optional[str] = typer.Option(None, "--ip-whitelist", help="CIDR-Liste, z.B. '10.0.0.0/24,192.168.1.0/24'"),
):
    """Neues Kiosk-Gerät registrieren (Status: pending)."""
    # The docstring above is the command's CLI help text and therefore stays
    # in the tool's user-facing language.
    #
    # Flow: validate the inputs, insert a KioskDevice in status PENDING for
    # the company matched by --company, then print a summary panel.
    # Exits with code 1 if the key file is missing/empty or the DB step fails.

    # Validate everything that does not need the database first.
    _validate_ip_whitelist(ip_whitelist)
    pubkey_str: Optional[str] = None
    if pubkey is not None:
        # is_file() rather than exists(): a directory path must be rejected
        # here instead of crashing in read_text().
        if not pubkey.is_file():
            err_console.print(f"[bold red]Fehler:[/bold red] Public-Key-Datei nicht gefunden: {pubkey}")
            raise typer.Exit(1)
        pubkey_str = pubkey.read_text().strip()
        if not pubkey_str:
            err_console.print("[bold red]Fehler:[/bold red] Public-Key-Datei ist leer.")
            raise typer.Exit(1)

    async def _add():
        session = await _open_session()
        try:
            firm = await _find_company(session, company)
            device = KioskDevice(
                company_id=firm.id,
                name=name,
                location=location,
                status=KioskDeviceStatus.PENDING,
                public_key=pubkey_str,
                # NOTE(review): recorded as ed25519 regardless of the key
                # type actually supplied (or of no key at all) - confirm.
                key_algorithm="ed25519",
                ip_whitelist=ip_whitelist,
            )
            session.add(device)
            # Flush and refresh BEFORE committing: _open_session enables the
            # RLS bypass via SET LOCAL, which presumably only lasts for the
            # current transaction. A refresh after commit would run in a new
            # transaction without the bypass and could fail even though the
            # row was already stored.
            await session.flush()
            await session.refresh(device)
            await session.commit()
            return device, firm
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

    device, firm = _run(_add())
    console.print()
    console.print(Panel(
        f"[bold green]✓ Gerät erfolgreich angelegt[/bold green]\n\n"
        f" [bold]Name:[/bold] {device.name}\n"
        f" [bold]ID:[/bold] {device.id}\n"
        f" [bold]Firma:[/bold] {firm.name}\n"
        f" [bold]Standort:[/bold] {device.location or ''}\n"
        f" [bold]Status:[/bold] {_status_icon(device.status)}\n"
        + (f" [bold]Fingerprint:[/bold] {_pubkey_fingerprint(device.public_key)}\n" if device.public_key else " [bold]Public Key:[/bold] [dim]nicht gesetzt[/dim]\n")
        + (f" [bold]IP-Whitelist:[/bold] {device.ip_whitelist}\n" if device.ip_whitelist else ""),
        title="Kiosk-Gerät angelegt",
        border_style="green",
    ))
    console.print()
    console.print("[yellow]Hinweis:[/yellow] Das Gerät wartet auf Admin-Freigabe im Web-Interface.")
    if not pubkey_str:
        console.print("[yellow]Hinweis:[/yellow] Kein Public Key gesetzt. Der Enrollment-Flow muss im Browser abgeschlossen werden.")
    console.print()
# ── Subcommand: kiosk list ────────────────────────────────────────────────────
@kiosk_app.command("list")
def kiosk_list(
    company: Optional[str] = typer.Option(None, "--company", help="Nur Geräte dieser Firma anzeigen"),
    status: Optional[str] = typer.Option(None, "--status", help="Filter: pending|approved|revoked"),
):
    """Alle registrierten Kiosk-Geräte auflisten."""
    # The docstring above is the command's CLI help text and therefore stays
    # in the tool's user-facing language.
    #
    # Prints a table of devices joined with their company, optionally
    # filtered by status and by a case-insensitive company-name substring,
    # ordered by company name and device name.

    # Translate the --status text into the enum; reject unknown values early.
    status_filter: Optional[KioskDeviceStatus] = None
    if status is not None:
        try:
            status_filter = KioskDeviceStatus(status.lower())
        except ValueError:
            err_console.print(
                f"[bold red]Fehler:[/bold red] Ungültiger Status '{status}'. "
                "Erlaubt: pending, approved, revoked"
            )
            raise typer.Exit(1)

    async def _list():
        session = await _open_session()
        try:
            q = select(KioskDevice, Company).join(Company, KioskDevice.company_id == Company.id)
            if status_filter is not None:
                q = q.where(KioskDevice.status == status_filter)
            if company:
                q = q.where(Company.name.ilike(f"%{company}%"))
            q = q.order_by(Company.name, KioskDevice.name)
            result = await session.execute(q)
            return result.all()
        finally:
            await session.close()

    rows = _run(_list())
    if not rows:
        console.print("[dim]Keine Geräte gefunden.[/dim]")
        return

    table = Table(
        show_header=True,
        header_style="bold cyan",
        box=box.ROUNDED,
        show_lines=False,
    )
    table.add_column("ID", style="dim", min_width=8, max_width=36, no_wrap=True)
    table.add_column("Firma", min_width=10)
    table.add_column("Name", min_width=12)
    table.add_column("Standort")
    table.add_column("Status", min_width=10)
    table.add_column("Heartbeat", min_width=14)
    table.add_column("Key-Algo", justify="center")
    for device, firm in rows:
        # Abbreviated UUID: first 8 characters plus an ellipsis, matching how
        # the approve/revoke/rotate-key messages show IDs. (Previously an
        # empty string was appended, so the truncation was not visible.)
        short_id = f"{str(device.id)[:8]}…"
        table.add_row(
            short_id,
            firm.name,
            device.name,
            device.location or "",
            _status_icon(device.status),
            _heartbeat_label(device.last_heartbeat_at),
            device.key_algorithm or "",
        )
    console.print()
    console.print(table)
    console.print(f" [dim]{len(rows)} Gerät(e)[/dim]")
    console.print()
# ── Subcommand: kiosk approve ─────────────────────────────────────────────────
@kiosk_app.command("approve")
def kiosk_approve(
    device_id: str = typer.Argument(..., help="UUID des Kiosk-Geräts"),
):
    """Kiosk-Gerät freigeben (Status: pending → approved)."""
    # The docstring above is the command's CLI help text and therefore stays
    # in the tool's user-facing language.
    #
    # Sets the device's status to APPROVED and reports the previous status;
    # a device that is already approved is left untouched.

    async def _approve():
        # Returns (device, previous_status, changed).
        session = await _open_session()
        try:
            target = await _find_device(session, device_id)
            previous = target.status
            if previous == KioskDeviceStatus.APPROVED:
                return target, previous, False
            target.status = KioskDeviceStatus.APPROVED
            await session.commit()
            return target, previous, True
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

    device, previous, changed = _run(_approve())
    if not changed:
        console.print(f"[yellow]Info:[/yellow] Gerät '{device.name}' ist bereits [green]approved[/green].")
        return
    console.print(
        f"[bold green]✓[/bold green] Gerät [bold]{device.name}[/bold] "
        f"({str(device.id)[:8]}…) wurde [green]freigegeben[/green] "
        f"(vorher: {previous.value})."
    )
# ── Subcommand: kiosk revoke ──────────────────────────────────────────────────
@kiosk_app.command("revoke")
def kiosk_revoke(
    device_id: str = typer.Argument(..., help="UUID des Kiosk-Geräts"),
    yes: bool = typer.Option(False, "--yes", "-y", help="Ohne Bestätigung sperren"),
):
    """Kiosk-Gerät sperren (Status → revoked)."""
    # The docstring above is the command's CLI help text and therefore stays
    # in the tool's user-facing language.
    #
    # Sets the device's status to REVOKED. Unless --yes is given, the device
    # is looked up first and the user must confirm. A device that is already
    # revoked is reported as such without asking for confirmation.

    async def _peek():
        # Read-only lookup used for the confirmation dialog.
        session = await _open_session()
        try:
            return await _find_device(session, device_id)
        finally:
            await session.close()

    async def _revoke():
        session = await _open_session()
        try:
            device = await _find_device(session, device_id)
            if device.status == KioskDeviceStatus.REVOKED:
                return device, "already_revoked"
            old_status = device.status
            device.status = KioskDeviceStatus.REVOKED
            await session.commit()
            return device, old_status
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

    # Ask for confirmation unless --yes was passed.
    if not yes:
        device = _run(_peek())
        if device.status == KioskDeviceStatus.REVOKED:
            # Nothing to do - don't make the user confirm a no-op.
            console.print(f"[yellow]Info:[/yellow] Gerät '{device.name}' ist bereits [red]revoked[/red].")
            return
        confirm = typer.confirm(
            f"Gerät '{device.name}' ({str(device.id)[:8]}…) wirklich sperren?"
        )
        if not confirm:
            console.print("[dim]Abgebrochen.[/dim]")
            raise typer.Exit(0)

    device, old_status = _run(_revoke())
    if old_status == "already_revoked":
        console.print(f"[yellow]Info:[/yellow] Gerät '{device.name}' ist bereits [red]revoked[/red].")
    else:
        console.print(
            f"[bold red]✓[/bold red] Gerät [bold]{device.name}[/bold] "
            f"({str(device.id)[:8]}…) wurde [red]gesperrt[/red] "
            f"(vorher: {old_status.value})."
        )
# ── Subcommand: kiosk rotate-key ──────────────────────────────────────────────
@kiosk_app.command("rotate-key")
def kiosk_rotate_key(
    device_id: str = typer.Argument(..., help="UUID des Kiosk-Geräts"),
    pubkey: Optional[Path] = typer.Option(None, "--pubkey", help="Pfad zur neuen Public-Key-Datei (optional)"),
    yes: bool = typer.Option(False, "--yes", "-y", help="Ohne Bestätigung fortfahren"),
):
    """Ed25519-Schlüssel eines Kiosk-Geräts rotieren (Status → pending, Re-Enrollment erforderlich)."""
    # The docstring above is the command's CLI help text and therefore stays
    # in the tool's user-facing language.
    #
    # Replaces the stored public key with the one from --pubkey (or clears it
    # when --pubkey is omitted), clears the enrollment token fields and sets
    # the status back to PENDING. Asks for confirmation unless --yes is given.

    # Read the new public key, if one was supplied.
    new_pubkey_str: Optional[str] = None
    if pubkey is not None:
        # is_file() rather than exists(): a directory path must be rejected
        # here instead of crashing in read_text().
        if not pubkey.is_file():
            err_console.print(f"[bold red]Fehler:[/bold red] Public-Key-Datei nicht gefunden: {pubkey}")
            raise typer.Exit(1)
        new_pubkey_str = pubkey.read_text().strip()
        if not new_pubkey_str:
            err_console.print("[bold red]Fehler:[/bold red] Public-Key-Datei ist leer.")
            raise typer.Exit(1)

    # Load the device up front so the confirmation dialog can name it.
    if not yes:
        async def _peek():
            session = await _open_session()
            try:
                return await _find_device(session, device_id)
            finally:
                await session.close()

        device_peek = _run(_peek())
        console.print(
            f"[yellow]Achtung:[/yellow] Gerät [bold]{device_peek.name}[/bold] "
            f"({str(device_peek.id)[:8]}…) wird auf [yellow]pending[/yellow] gesetzt.\n"
            "Der aktuelle Schlüssel wird gelöscht. Re-Enrollment ist erforderlich."
        )
        confirm = typer.confirm("Schlüssel wirklich rotieren?")
        if not confirm:
            console.print("[dim]Abgebrochen.[/dim]")
            raise typer.Exit(0)

    async def _rotate():
        session = await _open_session()
        try:
            device = await _find_device(session, device_id)
            device.public_key = new_pubkey_str
            device.enrollment_token_hash = None
            device.enrollment_expires_at = None
            device.status = KioskDeviceStatus.PENDING
            # Flush and refresh BEFORE committing: _open_session enables the
            # RLS bypass via SET LOCAL, which presumably only lasts for the
            # current transaction. A refresh after commit would run in a new
            # transaction without the bypass and could fail even though the
            # change was already stored.
            await session.flush()
            await session.refresh(device)
            await session.commit()
            return device
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()

    device = _run(_rotate())
    console.print()
    if new_pubkey_str:
        fingerprint = _pubkey_fingerprint(new_pubkey_str)
        console.print(
            f"[bold green]✓[/bold green] Schlüssel für Gerät [bold]{device.name}[/bold] "
            f"({str(device.id)[:8]}…) rotiert.\n"
            f" [bold]Neuer Fingerprint:[/bold] {fingerprint}\n"
            f" [bold]Status:[/bold] {_status_icon(device.status)}"
        )
    else:
        console.print(
            f"[bold green]✓[/bold green] Public Key für Gerät [bold]{device.name}[/bold] "
            f"({str(device.id)[:8]}…) wurde gelöscht.\n"
            f" [bold]Status:[/bold] {_status_icon(device.status)}\n"
            " [yellow]Hinweis:[/yellow] Bitte Public Key per Enrollment-Flow neu setzen."
        )
    console.print()
# ── Subcommand: kiosk info ────────────────────────────────────────────────────
@kiosk_app.command("info")
def kiosk_info(
    device_id: str = typer.Argument(..., help="UUID des Kiosk-Geräts"),
):
    """Detailinfo zu einem Kiosk-Gerät anzeigen."""
    # The docstring above is the command's CLI help text and therefore stays
    # in the tool's user-facing language.
    #
    # Loads the device and its company and prints all displayed fields in a
    # panel. Read-only: nothing is written to the database.

    def _fmt_utc(ts: Optional[datetime]) -> str:
        # Format a timestamp for display, or "" when it is not set. Naive
        # values are taken to be UTC; aware values are converted to UTC so
        # the "UTC" suffix is always true.
        if not ts:
            return ""
        if ts.tzinfo is None:
            ts = ts.replace(tzinfo=timezone.utc)
        return ts.astimezone(timezone.utc).strftime("%d.%m.%Y %H:%M:%S UTC")

    async def _info():
        session = await _open_session()
        try:
            device = await _find_device(session, device_id)
            # Load the owning company separately; it may legitimately be
            # missing, in which case the raw company_id is shown instead.
            result = await session.execute(
                select(Company).where(Company.id == device.company_id)
            )
            firm = result.scalar_one_or_none()
            return device, firm
        finally:
            await session.close()

    device, firm = _run(_info())

    fingerprint = ""
    pubkey_preview = ""
    if device.public_key:
        fingerprint = _pubkey_fingerprint(device.public_key)
        # Show at most the first 60 characters of the key; mark a cut-off
        # preview with an ellipsis so it is not mistaken for the whole key.
        key_stripped = device.public_key.strip()
        pubkey_preview = (key_stripped[:60] + "…") if len(key_stripped) > 60 else key_stripped

    created_str = _fmt_utc(device.created_at)
    enrollment_str = _fmt_utc(device.enrollment_expires_at)

    lines = [
        f" [bold]ID:[/bold] {device.id}",
        f" [bold]Firma:[/bold] {firm.name if firm else str(device.company_id)}",
        f" [bold]Name:[/bold] {device.name}",
        f" [bold]Standort:[/bold] {device.location or ''}",
        f" [bold]Status:[/bold] {_status_icon(device.status)}",
        f" [bold]Key-Algorithmus:[/bold] {device.key_algorithm or ''}",
        f" [bold]Public Key:[/bold] {pubkey_preview}",
        f" [bold]Key-Fingerprint:[/bold] {fingerprint}",
        f" [bold]IP-Whitelist:[/bold] {device.ip_whitelist or ''}",
        f" [bold]Heartbeat:[/bold] {_heartbeat_label(device.last_heartbeat_at)}",
        f" [bold]Client-Version:[/bold] {device.client_version or ''}",
        f" [bold]Offline-Queue:[/bold] {device.offline_queue_size}",
        f" [bold]Aktueller User:[/bold] {device.current_user_id or ''}",
        f" [bold]Enrollment läuft ab:[/bold] {enrollment_str}",
        f" [bold]Angelegt am:[/bold] {created_str}",
    ]
    console.print()
    console.print(Panel(
        "\n".join(lines),
        title=f"Kiosk-Gerät: {device.name}",
        border_style="cyan",
    ))
    console.print()
# ── Entry point ───────────────────────────────────────────────────────────────
# Run the Typer application only when this file is executed as a script
# (e.g. `python cli.py kiosk list`), not when it is imported.
if __name__ == "__main__":
    app()