6d74d874b6
ARCHITECTURE ============ Backend: FastAPI + uvicorn (port 8000) - JWT authentication with PAM system users - ZFS CLI wrapper with caching (30-60s TTL) - WebSocket pool status broadcaster (30s interval) - Services: auth, zfs_runner, file_manager, shares, identities, system_info - Routers: pools, datasets, snapshots, shares, identities, navigator, system Frontend: Next.js 15 + TypeScript (static export) - Incremental Static Regeneration (ISR) for weak hardware - Type-safe API client (lib/api.ts) - Dark mode + custom Tailwind theme - Pages: Dashboard, Login, Snapshots, Datasets, Shares, etc. DEPLOYMENT ========== Test Target: 192.168.1.179:8090 (Debian LXC) Production: 10.66.120.3:9090 (Raspberry Pi 4GB ARM64) Updater: Automated Gitea-based deployment (update-test.sh, update-pi.sh) FEATURES COMPLETED ================== Phase 3a: Dashboard Quick Stats (System, CPU, Memory, Storage) - Real-time stats with color-coded progress bars - Responsive grid layout (mobile: 1, tablet: 2, desktop: 4 columns) - ISR-optimized for fast loads on weak hardware REBRANDING ========== Renamed throughout: - Project: 'ZFS Manager' → 'ZMB Webui' - Services: 'zfs-manager' → 'zmb-webui' - Systemd units: zfs-manager-backend → zmb-webui-backend - Configuration files and documentation Co-Authored-By: Patrick <patrick@perlbach24.de>
589 lines
21 KiB
Python
589 lines
21 KiB
Python
"""
|
|
System Information Service
|
|
Hostname, time, updates, CPU, memory, etc.
|
|
"""
|
|
|
|
import subprocess
|
|
import logging
|
|
import socket
|
|
import platform
|
|
from typing import Dict, Any, Optional
|
|
from datetime import datetime
|
|
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class SystemInfo:
    """Read and modify host-level system information.

    Every public method is a static method returning a plain dict. Failures
    are not raised to the caller: they are logged and reported as
    ``{"error": "<message>"}``.
    """

    # Unit-name suffix -> key in the grouped result of get_all_units().
    _UNIT_GROUPS = {
        ".service": "services",
        ".target": "targets",
        ".socket": "sockets",
        ".timer": "timers",
        ".path": "paths",
    }

    # Status markers systemctl may print in front of a unit name.
    _UNIT_MARKERS = ("●", "○", "×", "↻", "*")

    # Name prefixes of the block devices reported by get_disk_io().
    _DISK_PREFIXES = ("sd", "nvme", "hd", "vd", "mmcblk")

    # ------------------------------------------------------------------
    # Private helpers (pure parsing / best-effort command execution)
    # ------------------------------------------------------------------

    @staticmethod
    def _run(cmd, timeout=None):
        """Run *cmd* capturing text output; return None if it cannot be run.

        Used for optional commands where a missing binary or a timeout must
        not abort the calling method.
        """
        try:
            return subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                check=False,
            )
        except (OSError, subprocess.SubprocessError) as e:
            logger.debug(f"Command {cmd[0]} could not be run: {e}")
            return None

    @staticmethod
    def _parse_meminfo(lines) -> Dict[str, int]:
        """Parse /proc/meminfo lines into ``{key: value}``.

        Values carrying a ``kB`` unit are converted to bytes; unit-less
        counters are kept as-is. Malformed lines are skipped.
        """
        meminfo = {}
        for line in lines:
            key, sep, rest = line.partition(":")
            fields = rest.split()
            if not sep or not fields:
                continue
            try:
                value = int(fields[0])
            except ValueError:
                continue
            if len(fields) > 1 and fields[1].lower() == "kb":
                value *= 1024
            meminfo[key.strip()] = value
        return meminfo

    @staticmethod
    def _count_processors(cpuinfo_text: str) -> int:
        """Count the ``processor : N`` entries in /proc/cpuinfo text."""
        return sum(
            1
            for line in cpuinfo_text.split("\n")
            if line.partition(":")[0].strip() == "processor"
        )

    @staticmethod
    def _to_local_naive(iso_string: str) -> datetime:
        """Parse an ISO-8601 string into a naive datetime in local time.

        A trailing ``Z`` is accepted (``fromisoformat`` rejects it before
        Python 3.11). Timezone-aware values are converted to the local zone;
        naive values are taken as local time unchanged.
        """
        text = iso_string.strip()
        if text[-1:] in ("Z", "z"):
            text = text[:-1] + "+00:00"
        dt = datetime.fromisoformat(text)
        if dt.tzinfo is not None:
            dt = dt.astimezone().replace(tzinfo=None)
        return dt

    @staticmethod
    def _operstate(name: str) -> str:
        """Return the upper-cased sysfs operstate of interface *name*."""
        try:
            with open(f"/sys/class/net/{name}/operstate", "r") as f:
                return f.read().strip().upper() or "UNKNOWN"
        except OSError:
            return "UNKNOWN"

    @staticmethod
    def _parse_net_dev(lines) -> list:
        """Parse /proc/net/dev lines into per-interface RX/TX counters."""
        interfaces = []
        # Columns after the colon: 8 RX counters followed by 8 TX counters
        # (bytes, packets, errs, drop, ...).
        for line in lines[2:]:  # the first two lines are headers
            name, sep, stats_str = line.partition(":")
            if not sep:
                continue
            stats = stats_str.split()
            if len(stats) < 16:
                continue
            interfaces.append({
                "name": name.strip(),
                "rx_bytes": int(stats[0]),
                "rx_packets": int(stats[1]),
                "rx_errors": int(stats[2]),
                "rx_drops": int(stats[3]),
                "tx_bytes": int(stats[8]),
                "tx_packets": int(stats[9]),
                "tx_errors": int(stats[10]),
                "tx_drops": int(stats[11]),
            })
        return interfaces

    @staticmethod
    def _parse_diskstats(lines) -> list:
        """Parse /proc/diskstats lines into per-device read/write counters."""
        disks = []
        # Fields: major minor name reads_completed reads_merged reads_sectors
        # reads_time_ms writes_completed writes_merged writes_sectors ...
        for line in lines:
            fields = line.split()
            if len(fields) < 14:
                continue
            name = fields[2]
            # Whitelist of real storage devices; loop/ram/dm/sr/zram devices
            # never match it.
            if not name.startswith(SystemInfo._DISK_PREFIXES):
                continue
            disks.append({
                "name": name,
                "reads_completed": int(fields[3]),
                # Sector counts are converted assuming 512-byte sectors.
                "reads_bytes": int(fields[5]) * 512,
                "writes_completed": int(fields[7]),
                "writes_bytes": int(fields[9]) * 512,
            })
        return disks

    @staticmethod
    def _parse_unit_lines(stdout: str) -> list:
        """Parse ``systemctl list-units`` text output into unit dicts.

        Columns are UNIT LOAD ACTIVE SUB DESCRIPTION. Header and legend lines
        are returned too; callers filter by unit-name suffix.
        """
        rows = []
        for line in stdout.split("\n"):
            parts = line.split()
            # Drop the status marker systemctl may print before the name.
            if parts and parts[0] in SystemInfo._UNIT_MARKERS:
                parts = parts[1:]
            if len(parts) < 2:
                continue
            rows.append({
                "name": parts[0],
                "active": parts[2] if len(parts) > 2 else "unknown",
                "sub": parts[3] if len(parts) > 3 else "",
                "description": " ".join(parts[4:]),
            })
        return rows

    @staticmethod
    def _group_units(items) -> Dict[str, Any]:
        """Group unit dicts by the type suffix of their ``name``."""
        grouped = {key: [] for key in SystemInfo._UNIT_GROUPS.values()}
        for item in items:
            for suffix, key in SystemInfo._UNIT_GROUPS.items():
                if item["name"].endswith(suffix):
                    grouped[key].append(item)
                    break
        return grouped

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    @staticmethod
    def get_hostname() -> Dict[str, str]:
        """Return the hostname stored in /etc/hostname."""
        try:
            with open("/etc/hostname", "r") as f:
                return {"hostname": f.read().strip()}
        except Exception as e:
            logger.error(f"Error getting hostname: {e}")
            return {"error": str(e)}

    @staticmethod
    def set_hostname(hostname: str) -> Dict[str, str]:
        """Write *hostname* to /etc/hostname and apply it via hostnamectl.

        Empty names, names longer than 64 characters and names containing
        whitespace or non-printable characters are rejected with an error.
        """
        try:
            hostname = hostname.strip()
            if (
                not hostname
                or len(hostname) > 64
                or any(c.isspace() or not c.isprintable() for c in hostname)
            ):
                raise ValueError(f"Invalid hostname: {hostname!r}")

            with open("/etc/hostname", "w") as f:
                f.write(hostname + "\n")

            # Best effort: hostnamectl may be absent, which must not turn the
            # already-written file into a reported failure.
            SystemInfo._run(["hostnamectl", "set-hostname", hostname])

            logger.info(f"Set hostname to {hostname}")
            return {"status": "success", "hostname": hostname}
        except Exception as e:
            logger.error(f"Error setting hostname: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_system_info() -> Dict[str, Any]:
        """Return general platform information.

        ``machine_id``, ``model`` and ``domain`` are optional and only
        present when they could be determined.
        """
        try:
            uname = platform.uname()
            info = {
                "hostname": socket.gethostname(),
                "system": uname.system,
                "kernel": uname.release,
                "machine": uname.machine,
                "processor": platform.processor(),
                "python": platform.python_version(),
            }

            try:
                with open("/etc/machine-id", "r") as f:
                    info["machine_id"] = f.read().strip()
            except Exception as e:
                logger.debug(f"machine-id unavailable: {e}")

            model = SystemInfo._run(
                ["dmidecode", "-s", "system-product-name"], timeout=5
            )
            if model is not None and model.returncode == 0:
                info["model"] = model.stdout.strip()

            domain_result = SystemInfo._run(["domainname"], timeout=5)
            if domain_result is not None and domain_result.returncode == 0:
                domain = domain_result.stdout.strip()
                if domain and domain != "(none)":
                    info["domain"] = domain

            return info
        except Exception as e:
            logger.error(f"Error getting system info: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_uptime() -> Dict[str, Any]:
        """Return uptime (seconds, string, d/h/m parts) and boot timestamp."""
        try:
            import time

            with open("/proc/uptime", "r") as f:
                uptime_seconds = int(float(f.read().split()[0]))

            days, remainder = divmod(uptime_seconds, 86400)
            hours = remainder // 3600
            minutes = (uptime_seconds % 3600) // 60

            return {
                "uptime_seconds": uptime_seconds,
                "uptime_string": f"{days}d {hours}h {minutes}m",
                "uptime_formatted": {
                    "days": days,
                    "hours": hours,
                    "minutes": minutes,
                },
                # Boot time as "now minus uptime", in epoch seconds.
                "boot_time": int(time.time() - uptime_seconds),
            }
        except Exception as e:
            logger.error(f"Error getting uptime: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_memory() -> Dict[str, Any]:
        """Return RAM and swap usage in bytes, read from /proc/meminfo."""
        try:
            with open("/proc/meminfo", "r") as f:
                meminfo = SystemInfo._parse_meminfo(f.readlines())

            total = meminfo.get("MemTotal", 0)
            available = meminfo.get("MemAvailable", 0)
            swap_total = meminfo.get("SwapTotal", 0)
            swap_free = meminfo.get("SwapFree", 0)
            return {
                "total": total,
                "available": available,
                "used": total - available,
                "free": meminfo.get("MemFree", 0),
                "swap_total": swap_total,
                "swap_free": swap_free,
                "swap_used": swap_total - swap_free,
            }
        except Exception as e:
            logger.error(f"Error getting memory info: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_cpu_info() -> Dict[str, Any]:
        """Return CPU count and load averages (plus usage % with psutil).

        Without psutil the data comes from /proc and ``percent`` is omitted.
        Load averages are floats rounded to two decimals in both paths.
        """
        try:
            import psutil
        except ImportError:
            logger.debug("psutil not installed, using fallback")
            try:
                with open("/proc/cpuinfo", "r") as f:
                    cpu_count = SystemInfo._count_processors(f.read())
                with open("/proc/loadavg", "r") as f:
                    load_average = [
                        round(float(x), 2) for x in f.read().split()[:3]
                    ]
                return {"count": cpu_count, "load_average": load_average}
            except Exception as e:
                logger.error(f"Error getting CPU info: {e}")
                return {"error": str(e)}

        try:
            import os

            return {
                "count": psutil.cpu_count(),
                # interval=1 blocks for one second while usage is sampled.
                "percent": psutil.cpu_percent(interval=1),
                "load_average": [round(x, 2) for x in os.getloadavg()],
            }
        except Exception as e:
            logger.error(f"Error getting CPU info with psutil: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_time() -> Dict[str, Any]:
        """Return local time as naive ISO string, epoch seconds and zone name."""
        try:
            # One clock reading so all three fields describe the same instant.
            local = datetime.now().astimezone()
            return {
                "iso": local.replace(tzinfo=None).isoformat(),
                "timestamp": int(local.timestamp()),
                "timezone": str(local.tzinfo),
            }
        except Exception as e:
            logger.error(f"Error getting time: {e}")
            return {"error": str(e)}

    @staticmethod
    def set_time(iso_string: str) -> Dict[str, str]:
        """Set the system clock from an ISO-8601 string (requires root).

        Timezone-aware input is converted to local time before it is handed
        to ``date -s``; naive input is interpreted as local time.
        """
        try:
            dt = SystemInfo._to_local_naive(iso_string)

            result = subprocess.run(
                ["date", "-s", dt.strftime("%Y-%m-%d %H:%M:%S")],
                capture_output=True,
                text=True,
                check=False,
            )
            if result.returncode != 0:
                return {"error": result.stderr}

            # Best effort: sync the hardware clock; the command may be absent.
            SystemInfo._run(["hwclock", "--systohc"])

            logger.info(f"Set time to {iso_string}")
            return {"status": "success", "time": iso_string}
        except Exception as e:
            logger.error(f"Error setting time: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_updates() -> Dict[str, Any]:
        """Return the packages listed by ``apt list --upgradable``."""
        try:
            result = subprocess.run(
                ["apt", "list", "--upgradable"],
                capture_output=True,
                text=True,
                timeout=10,
                check=False,
            )
            if result.returncode != 0:
                return {"error": result.stderr}

            packages = []
            # The first stdout line is skipped (presumably apt's "Listing..."
            # banner rather than a package).
            for line in result.stdout.split("\n")[1:]:
                if not line.strip():
                    continue
                parts = line.split("/")
                if len(parts) >= 2:
                    packages.append({
                        "package": parts[0].strip(),
                        # NOTE(review): this is the text between "/" and "[",
                        # which looks like suite/version/arch rather than the
                        # installed version - confirm against the frontend.
                        "current": parts[1].split("[")[0].strip() if "[" in line else "",
                    })

            return {"available": len(packages), "packages": packages}
        except Exception as e:
            logger.error(f"Error checking updates: {e}")
            return {"error": str(e)}

    @staticmethod
    def reboot() -> Dict[str, str]:
        """Start an immediate reboot (requires root); does not wait for it."""
        try:
            subprocess.Popen(["shutdown", "-r", "now"])
            return {"status": "success", "message": "System rebooting..."}
        except Exception as e:
            logger.error(f"Error rebooting: {e}")
            return {"error": str(e)}

    @staticmethod
    def shutdown() -> Dict[str, str]:
        """Start an immediate power-off (requires root); does not wait."""
        try:
            subprocess.Popen(["shutdown", "-h", "now"])
            return {"status": "success", "message": "System shutting down..."}
        except Exception as e:
            logger.error(f"Error shutting down: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_network_info() -> Dict[str, Any]:
        """Return network interfaces with state and addresses.

        Uses ``ip -j addr``; if that command is missing, fails or prints
        invalid JSON, interface names come from /proc/net/dev instead (state
        from sysfs, no addresses).
        """
        try:
            import json

            result = SystemInfo._run(["/usr/sbin/ip", "-j", "addr"], timeout=5)
            if result is not None and result.returncode == 0:
                try:
                    data = json.loads(result.stdout)
                except json.JSONDecodeError:
                    pass
                else:
                    return {
                        "interfaces": [
                            {
                                "name": iface.get("ifname"),
                                "state": iface.get("operstate", "UNKNOWN"),
                                "addresses": [
                                    {
                                        "family": addr.get("family"),
                                        "local": addr.get("local"),
                                    }
                                    for addr in iface.get("addr_info", [])
                                ],
                            }
                            for iface in data
                        ]
                    }

            # Fallback: interface names only.
            with open("/proc/net/dev", "r") as f:
                lines = f.readlines()

            interfaces = []
            for line in lines[2:]:  # the first two lines are headers
                if ":" in line:
                    name = line.split(":")[0].strip()
                    interfaces.append({
                        "name": name,
                        "state": SystemInfo._operstate(name),
                        "addresses": [],
                    })
            return {"interfaces": interfaces}
        except Exception as e:
            logger.error(f"Error getting network info: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_network_traffic() -> Dict[str, Any]:
        """Return cumulative RX/TX counters per interface from /proc/net/dev."""
        try:
            with open("/proc/net/dev", "r") as f:
                lines = f.readlines()
            return {"interfaces": SystemInfo._parse_net_dev(lines)}
        except Exception as e:
            logger.error(f"Error getting network traffic: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_disk_io() -> Dict[str, Any]:
        """Return cumulative read/write counters per disk from /proc/diskstats."""
        try:
            with open("/proc/diskstats", "r") as f:
                lines = f.readlines()
            return {"disks": SystemInfo._parse_diskstats(lines)}
        except Exception as e:
            logger.error(f"Error getting disk I/O: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_services() -> Dict[str, Any]:
        """Return the running systemd services.

        Tries systemctl's JSON output first and falls back to parsing the
        plain-text table.
        """
        try:
            import json

            base_cmd = [
                "/usr/bin/systemctl", "list-units", "--type=service",
                "--state=running", "--no-pager",
            ]
            result = subprocess.run(
                base_cmd + ["--output=json"],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode == 0:
                try:
                    services = json.loads(result.stdout)
                except json.JSONDecodeError:
                    pass
                else:
                    return {
                        "services": [
                            {
                                "name": svc.get("unit"),
                                "state": svc.get("active"),
                                "description": svc.get("description"),
                            }
                            for svc in services
                            if svc.get("unit", "").endswith(".service")
                        ]
                    }

            # Fallback: parse the text table.
            result = subprocess.run(
                base_cmd,
                capture_output=True,
                text=True,
                timeout=10,
            )
            return {
                "services": [
                    {
                        "name": row["name"],
                        "state": "running",
                        "description": row["description"],
                    }
                    for row in SystemInfo._parse_unit_lines(result.stdout)
                    if row["name"].endswith(".service")
                ]
            }
        except Exception as e:
            logger.error(f"Error getting services: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_all_units() -> Dict[str, Any]:
        """Return all systemd units grouped by type.

        The result has the keys ``services``, ``targets``, ``sockets``,
        ``timers`` and ``paths``; units of other types are omitted.
        """
        try:
            import json

            base_cmd = ["/usr/bin/systemctl", "list-units", "--all", "--no-pager"]
            result = subprocess.run(
                base_cmd + ["--output=json"],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode == 0:
                try:
                    units_data = json.loads(result.stdout)
                except json.JSONDecodeError:
                    pass
                else:
                    return SystemInfo._group_units(
                        {
                            "name": unit.get("unit", ""),
                            "active": unit.get("active"),
                            "sub": unit.get("sub"),
                            "description": unit.get("description", ""),
                        }
                        for unit in units_data
                    )

            # Fallback: parse the text table.
            result = subprocess.run(
                base_cmd,
                capture_output=True,
                text=True,
                timeout=10,
            )
            return SystemInfo._group_units(
                SystemInfo._parse_unit_lines(result.stdout)
            )
        except Exception as e:
            logger.error(f"Error getting units: {e}")
            return {"error": str(e)}

    @staticmethod
    def get_journal_logs(limit: int = 20) -> Dict[str, Any]:
        """Return the last *limit* journal lines; empty if journalctl fails."""
        try:
            result = subprocess.run(
                ["/usr/bin/journalctl", "-n", str(limit), "--no-pager", "--output=short"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode != 0:
                return {"logs": []}
            return {
                "logs": [
                    line.strip()
                    for line in result.stdout.split("\n")
                    if line.strip()
                ]
            }
        except Exception as e:
            logger.error(f"Error getting journal logs: {e}")
            return {"error": str(e)}
|
|
|
|
|
|
# Global instance
|
|
# Shared module-level SystemInfo instance created at import time.
system_info = SystemInfo()
|