feat(kiosk): Stufe 2 – Ed25519-Auth, CLI-Tool, neue KioskDevicesPage

2A – Backend Ed25519-Verifizierung:
- app/core/kiosk_security.py (NEU): verify_kiosk_request() Dependency
  - Timestamp-Check (30s Drift), Nonce-Cache (Redis/In-Memory), IP-Whitelist
  - Ed25519-Signatur über METHOD+PATH+TIMESTAMP+NONCE+sha256(BODY)
  - PEM + OpenSSH Key-Format unterstützt
- app/routers/kiosk.py: approve/revoke Endpunkte, POST /heartbeat (Ed25519-signiert)
- app/services/kiosk_service.py: token-basierte Methoden entfernt, approve/revoke/heartbeat
- app/schemas/kiosk.py: KioskDeviceOut mit heartbeat_status, HeartbeatRequest/Response

2B – CLI-Tool:
- cli.py (NEU, 529 Zeilen): Typer-CLI mit kiosk add/list/approve/revoke/info
  - Public-Key-Fingerprint (SHA256), Rich-Tabellen, CIDR-Validierung
  - Direkter DB-Zugriff mit RLS-Bypass

2C – Frontend:
- KioskDevicesPage.tsx: Zwei-Tab-Layout (Wartet/Aktiv), Status-Ampel,
  Auto-Refresh 30s, Ed25519-Workflow (kein Token mehr)
- Layout.tsx: KioskHealthBadge (online/total, 30s Refresh, nur COMPANY_ADMIN)

requirements.txt: typer>=0.12.0, rich>=13.7.0

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-24 12:13:46 +02:00
parent 981bde3dc1
commit 0f83d13c0c
10 changed files with 1438 additions and 226 deletions
+529
View File
@@ -0,0 +1,529 @@
"""
TimeMaster CLI Kiosk-Geräteverwaltung
Verwendung (auf dem Server):
cd /opt/timemaster/backend
source venv/bin/activate
python cli.py kiosk list
python cli.py kiosk add --company "Acme GmbH" --name "Eingang" --pubkey ~/.ssh/kiosk.pub
"""
import asyncio
import base64
import hashlib
import ipaddress
import sys
import uuid
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
import typer
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich import box
# ── Projekt-Imports ───────────────────────────────────────────────────────────
# Sicherstellen, dass das backend/-Verzeichnis im Python-Pfad ist,
# damit app.* importiert werden kann.
_HERE = Path(__file__).parent.resolve()
if str(_HERE) not in sys.path:
sys.path.insert(0, str(_HERE))
# .env im selben Verzeichnis laden, bevor pydantic-settings greift
_env_file = _HERE / ".env"
if _env_file.exists():
# Minimales manuelles Laden, damit DATABASE_URL vor dem Settings-Import steht
import os
for line in _env_file.read_text().splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, _, val = line.partition("=")
key = key.strip()
val = val.strip().strip('"').strip("'")
if key not in os.environ:
os.environ[key] = val
from sqlalchemy import select, text, update
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
from app.core.config import settings
from app.models.company import Company
from app.models.kiosk_device import KioskDevice, KioskDeviceStatus
# ── Typer Apps ────────────────────────────────────────────────────────────────
app = typer.Typer(
name="timemaster",
help="TimeMaster CLI Verwaltungstool für den Server",
no_args_is_help=True,
add_completion=False,
)
kiosk_app = typer.Typer(
help="Kiosk-Geräteverwaltung",
no_args_is_help=True,
)
app.add_typer(kiosk_app, name="kiosk")
console = Console()
err_console = Console(stderr=True)
# ── DB-Hilfsfunktionen ────────────────────────────────────────────────────────
def _make_engine():
"""Erstellt einen AsyncEngine auf Basis der aktuellen DATABASE_URL."""
db_url = settings.database_url
return create_async_engine(db_url, pool_pre_ping=True, pool_size=2, max_overflow=2)
async def _open_session() -> AsyncSession:
"""Gibt eine neue AsyncSession zurück mit deaktiviertem RLS."""
engine = _make_engine()
Session = async_sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
session = Session()
await session.execute(text("SET LOCAL app.bypass_rls = 'on'"))
return session
def _run(coro):
"""Blockierendes asyncio.run() mit sauberem Fehler-Handling."""
try:
return asyncio.run(coro)
except Exception as exc:
err_console.print(f"[bold red]Fehler:[/bold red] {exc}")
raise typer.Exit(1)
# ── Public-Key-Fingerprint (SHA-256, wie ssh-keygen -l -E sha256) ─────────────
def _pubkey_fingerprint(pubkey_str: str) -> str:
"""
Berechnet SHA-256-Fingerprint eines OpenSSH-Public-Keys.
Format: SHA256:<base64> (ohne trailing =, wie ssh-keygen -l -E sha256)
Funktioniert für 'ssh-ed25519 AAAA...' und PEM-Blöcke.
"""
pubkey_str = pubkey_str.strip()
try:
if pubkey_str.startswith("-----BEGIN"):
# PEM → binär
lines = [l for l in pubkey_str.splitlines() if not l.startswith("-----")]
raw = base64.b64decode("".join(lines))
else:
# OpenSSH-Einzeiler: zweites Feld ist der Base64-Teil
parts = pubkey_str.split()
if len(parts) < 2:
raise ValueError("Ungültiges Public-Key-Format")
raw = base64.b64decode(parts[1])
digest = hashlib.sha256(raw).digest()
b64 = base64.b64encode(digest).decode().rstrip("=")
return f"SHA256:{b64}"
except Exception:
return "(Fingerprint nicht berechenbar)"
# ── Heartbeat-Status-Hilfsfunktion ────────────────────────────────────────────
def _heartbeat_label(last_hb: Optional[datetime]) -> str:
if last_hb is None:
return "[dim]nie[/dim]"
now = datetime.now(timezone.utc)
if last_hb.tzinfo is None:
last_hb = last_hb.replace(tzinfo=timezone.utc)
delta = (now - last_hb).total_seconds()
ts = last_hb.strftime("%d.%m.%Y %H:%M:%S")
if delta < 90:
return f"[green]online[/green] ({ts})"
elif delta < 300:
return f"[yellow]stale[/yellow] ({ts})"
else:
return f"[red]offline[/red] ({ts})"
def _status_icon(status: KioskDeviceStatus) -> str:
return {
KioskDeviceStatus.APPROVED: "[green]approved[/green]",
KioskDeviceStatus.PENDING: "[yellow]pending[/yellow]",
KioskDeviceStatus.REVOKED: "[red]revoked[/red]",
}.get(status, status.value)
# ── CIDR-Validierung ──────────────────────────────────────────────────────────
def _validate_ip_whitelist(ip_whitelist: Optional[str]) -> None:
"""Wirft typer.BadParameter bei ungültiger CIDR-Notation."""
if not ip_whitelist:
return
for cidr in ip_whitelist.split(","):
cidr = cidr.strip()
if not cidr:
continue
try:
ipaddress.ip_network(cidr, strict=False)
except ValueError:
raise typer.BadParameter(
f"Ungültige CIDR-Notation: '{cidr}'. "
"Beispiel: 10.0.0.0/24,192.168.1.0/24"
)
# ── Firma suchen ──────────────────────────────────────────────────────────────
async def _find_company(session: AsyncSession, company_name: str) -> Company:
"""
Sucht Firma case-insensitiv. Fehler wenn 0 oder >1 Treffer.
"""
result = await session.execute(
select(Company).where(Company.name.ilike(f"%{company_name}%"))
)
companies = result.scalars().all()
if not companies:
raise typer.BadParameter(
f"Keine Firma mit dem Namen '{company_name}' gefunden."
)
if len(companies) > 1:
names = ", ".join(c.name for c in companies)
raise typer.BadParameter(
f"Mehrere Firmen gefunden: {names}\n"
"Bitte den Namen genauer angeben."
)
return companies[0]
# ── Gerät suchen ──────────────────────────────────────────────────────────────
async def _find_device(session: AsyncSession, device_id: str) -> KioskDevice:
try:
dev_uuid = uuid.UUID(device_id)
except ValueError:
raise typer.BadParameter(f"Ungültige UUID: '{device_id}'")
result = await session.execute(
select(KioskDevice).where(KioskDevice.id == dev_uuid)
)
device = result.scalar_one_or_none()
if device is None:
raise typer.BadParameter(f"Kein Gerät mit ID '{device_id}' gefunden.")
return device
# ── Subcommand: kiosk add ─────────────────────────────────────────────────────
@kiosk_app.command("add")
def kiosk_add(
company: str = typer.Option(..., "--company", help="Firmenname (Teilübereinstimmung möglich)"),
name: str = typer.Option(..., "--name", help="Name des Kiosk-Geräts, z.B. 'Eingang Berlin'"),
location: Optional[str] = typer.Option(None, "--location", help="Standort-Beschreibung"),
pubkey: Optional[Path] = typer.Option(None, "--pubkey", help="Pfad zur Public-Key-Datei (OpenSSH oder PEM)"),
ip_whitelist: Optional[str] = typer.Option(None, "--ip-whitelist", help="CIDR-Liste, z.B. '10.0.0.0/24,192.168.1.0/24'"),
):
"""Neues Kiosk-Gerät registrieren (Status: pending)."""
# Validierungen vor DB-Zugriff
_validate_ip_whitelist(ip_whitelist)
pubkey_str: Optional[str] = None
if pubkey is not None:
if not pubkey.exists():
err_console.print(f"[bold red]Fehler:[/bold red] Public-Key-Datei nicht gefunden: {pubkey}")
raise typer.Exit(1)
pubkey_str = pubkey.read_text().strip()
if not pubkey_str:
err_console.print("[bold red]Fehler:[/bold red] Public-Key-Datei ist leer.")
raise typer.Exit(1)
async def _add():
session = await _open_session()
try:
firm = await _find_company(session, company)
device = KioskDevice(
company_id=firm.id,
name=name,
location=location,
status=KioskDeviceStatus.PENDING,
public_key=pubkey_str,
key_algorithm="ed25519",
ip_whitelist=ip_whitelist,
)
session.add(device)
await session.commit()
await session.refresh(device)
return device, firm
except Exception:
await session.rollback()
raise
finally:
await session.close()
device, firm = _run(_add())
console.print()
console.print(Panel(
f"[bold green]✓ Gerät erfolgreich angelegt[/bold green]\n\n"
f" [bold]Name:[/bold] {device.name}\n"
f" [bold]ID:[/bold] {device.id}\n"
f" [bold]Firma:[/bold] {firm.name}\n"
f" [bold]Standort:[/bold] {device.location or ''}\n"
f" [bold]Status:[/bold] {_status_icon(device.status)}\n"
+ (f" [bold]Fingerprint:[/bold] {_pubkey_fingerprint(device.public_key)}\n" if device.public_key else " [bold]Public Key:[/bold] [dim]nicht gesetzt[/dim]\n")
+ (f" [bold]IP-Whitelist:[/bold] {device.ip_whitelist}\n" if device.ip_whitelist else ""),
title="Kiosk-Gerät angelegt",
border_style="green",
))
console.print()
console.print("[yellow]Hinweis:[/yellow] Das Gerät wartet auf Admin-Freigabe im Web-Interface.")
if not pubkey_str:
console.print("[yellow]Hinweis:[/yellow] Kein Public Key gesetzt. Der Enrollment-Flow muss im Browser abgeschlossen werden.")
console.print()
# ── Subcommand: kiosk list ────────────────────────────────────────────────────
@kiosk_app.command("list")
def kiosk_list(
company: Optional[str] = typer.Option(None, "--company", help="Nur Geräte dieser Firma anzeigen"),
status: Optional[str] = typer.Option(None, "--status", help="Filter: pending|approved|revoked"),
):
"""Alle registrierten Kiosk-Geräte auflisten."""
# Status-Enum validieren
status_filter: Optional[KioskDeviceStatus] = None
if status is not None:
try:
status_filter = KioskDeviceStatus(status.lower())
except ValueError:
err_console.print(
f"[bold red]Fehler:[/bold red] Ungültiger Status '{status}'. "
"Erlaubt: pending, approved, revoked"
)
raise typer.Exit(1)
async def _list():
session = await _open_session()
try:
q = select(KioskDevice, Company).join(Company, KioskDevice.company_id == Company.id)
if status_filter is not None:
q = q.where(KioskDevice.status == status_filter)
if company:
q = q.where(Company.name.ilike(f"%{company}%"))
q = q.order_by(Company.name, KioskDevice.name)
result = await session.execute(q)
return result.all()
finally:
await session.close()
rows = _run(_list())
if not rows:
console.print("[dim]Keine Geräte gefunden.[/dim]")
return
table = Table(
show_header=True,
header_style="bold cyan",
box=box.ROUNDED,
show_lines=False,
)
table.add_column("ID", style="dim", min_width=8, max_width=36, no_wrap=True)
table.add_column("Firma", min_width=10)
table.add_column("Name", min_width=12)
table.add_column("Standort")
table.add_column("Status", min_width=10)
table.add_column("Heartbeat", min_width=14)
table.add_column("Key-Algo", justify="center")
for device, firm in rows:
# UUID kurz anzeigen (erste 8 Zeichen + ...)
short_id = str(device.id)[:8] + ""
table.add_row(
short_id,
firm.name,
device.name,
device.location or "",
_status_icon(device.status),
_heartbeat_label(device.last_heartbeat_at),
device.key_algorithm or "",
)
console.print()
console.print(table)
console.print(f" [dim]{len(rows)} Gerät(e)[/dim]")
console.print()
# ── Subcommand: kiosk approve ─────────────────────────────────────────────────
@kiosk_app.command("approve")
def kiosk_approve(
device_id: str = typer.Argument(..., help="UUID des Kiosk-Geräts"),
):
"""Kiosk-Gerät freigeben (Status: pending → approved)."""
async def _approve():
session = await _open_session()
try:
device = await _find_device(session, device_id)
if device.status == KioskDeviceStatus.APPROVED:
return device, "already_approved"
old_status = device.status
device.status = KioskDeviceStatus.APPROVED
await session.commit()
return device, old_status
except Exception:
await session.rollback()
raise
finally:
await session.close()
device, old_status = _run(_approve())
if old_status == "already_approved":
console.print(f"[yellow]Info:[/yellow] Gerät '{device.name}' ist bereits [green]approved[/green].")
else:
console.print(
f"[bold green]✓[/bold green] Gerät [bold]{device.name}[/bold] "
f"({str(device.id)[:8]}…) wurde [green]freigegeben[/green] "
f"(vorher: {old_status.value})."
)
# ── Subcommand: kiosk revoke ──────────────────────────────────────────────────
@kiosk_app.command("revoke")
def kiosk_revoke(
device_id: str = typer.Argument(..., help="UUID des Kiosk-Geräts"),
yes: bool = typer.Option(False, "--yes", "-y", help="Ohne Bestätigung sperren"),
):
"""Kiosk-Gerät sperren (Status → revoked)."""
async def _get():
session = await _open_session()
try:
return await _find_device(session, device_id), session
except Exception:
await session.close()
raise
async def _revoke():
session = await _open_session()
try:
device = await _find_device(session, device_id)
if device.status == KioskDeviceStatus.REVOKED:
return device, "already_revoked"
old_status = device.status
device.status = KioskDeviceStatus.REVOKED
await session.commit()
return device, old_status
except Exception:
await session.rollback()
raise
finally:
await session.close()
# Bestätigung einholen, wenn --yes nicht gesetzt
if not yes:
async def _peek():
session = await _open_session()
try:
return await _find_device(session, device_id)
finally:
await session.close()
device = _run(_peek())
confirm = typer.confirm(
f"Gerät '{device.name}' ({str(device.id)[:8]}…) wirklich sperren?"
)
if not confirm:
console.print("[dim]Abgebrochen.[/dim]")
raise typer.Exit(0)
device, old_status = _run(_revoke())
if old_status == "already_revoked":
console.print(f"[yellow]Info:[/yellow] Gerät '{device.name}' ist bereits [red]revoked[/red].")
else:
console.print(
f"[bold red]✓[/bold red] Gerät [bold]{device.name}[/bold] "
f"({str(device.id)[:8]}…) wurde [red]gesperrt[/red] "
f"(vorher: {old_status.value})."
)
# ── Subcommand: kiosk info ────────────────────────────────────────────────────
@kiosk_app.command("info")
def kiosk_info(
device_id: str = typer.Argument(..., help="UUID des Kiosk-Geräts"),
):
"""Detailinfo zu einem Kiosk-Gerät anzeigen."""
async def _info():
session = await _open_session()
try:
device = await _find_device(session, device_id)
# Firma nachladen
result = await session.execute(
select(Company).where(Company.id == device.company_id)
)
firm = result.scalar_one_or_none()
return device, firm
finally:
await session.close()
device, firm = _run(_info())
fingerprint = ""
pubkey_preview = ""
if device.public_key:
fingerprint = _pubkey_fingerprint(device.public_key)
# Ersten 60 Zeichen des Keys als Vorschau
key_stripped = device.public_key.strip()
pubkey_preview = (key_stripped[:60] + "") if len(key_stripped) > 60 else key_stripped
created_str = ""
if device.created_at:
ts = device.created_at
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
created_str = ts.strftime("%d.%m.%Y %H:%M:%S UTC")
enrollment_str = ""
if device.enrollment_expires_at:
ts = device.enrollment_expires_at
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
enrollment_str = ts.strftime("%d.%m.%Y %H:%M:%S UTC")
lines = [
f" [bold]ID:[/bold] {device.id}",
f" [bold]Firma:[/bold] {firm.name if firm else str(device.company_id)}",
f" [bold]Name:[/bold] {device.name}",
f" [bold]Standort:[/bold] {device.location or ''}",
f" [bold]Status:[/bold] {_status_icon(device.status)}",
f" [bold]Key-Algorithmus:[/bold] {device.key_algorithm or ''}",
f" [bold]Public Key:[/bold] {pubkey_preview}",
f" [bold]Key-Fingerprint:[/bold] {fingerprint}",
f" [bold]IP-Whitelist:[/bold] {device.ip_whitelist or ''}",
f" [bold]Heartbeat:[/bold] {_heartbeat_label(device.last_heartbeat_at)}",
f" [bold]Client-Version:[/bold] {device.client_version or ''}",
f" [bold]Offline-Queue:[/bold] {device.offline_queue_size}",
f" [bold]Aktueller User:[/bold] {device.current_user_id or ''}",
f" [bold]Enrollment läuft ab:[/bold] {enrollment_str}",
f" [bold]Angelegt am:[/bold] {created_str}",
]
console.print()
console.print(Panel(
"\n".join(lines),
title=f"Kiosk-Gerät: {device.name}",
border_style="cyan",
))
console.print()
# ── Entry Point ───────────────────────────────────────────────────────────────
if __name__ == "__main__":
app()