Files
zmb-webui/backend/services/system_info.py
T
patrick 12248afa3a Refactor: Container-Erkennung zentralisiert, SMART-Check auf LXC überspringen
Neues Modul services/platform_info.py prüft systemd-detect-virt einmalig
beim Start (statt pro Request). SMART-Abfragen werden in Containern
übersprungen, da /dev/sdX dort meist nicht verfügbar ist.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 20:37:14 +02:00

485 lines
16 KiB
Python

"""
System Information Service
Hostname, time, updates, CPU, memory, etc.
"""
import subprocess
import logging
import socket
import platform
from typing import Dict, Any, Optional
from datetime import datetime
from services.platform_info import IS_CONTAINER
logger = logging.getLogger(__name__)
def get_hostname() -> Dict[str, str]:
    """Read the configured hostname from ``/etc/hostname``.

    Returns:
        ``{"hostname": <text>}`` with surrounding whitespace removed, or
        ``{"error": <message>}`` if the file cannot be read.
    """
    try:
        with open("/etc/hostname", "r") as hostname_file:
            contents = hostname_file.read()
    except Exception as e:
        logger.error(f"Error getting hostname: {e}")
        return {"error": str(e)}
    return {"hostname": contents.strip()}
def _is_valid_hostname(candidate: Any) -> bool:
    """Return True if *candidate* is a dot-separated list of DNS-style labels."""
    if not isinstance(candidate, str) or not 1 <= len(candidate) <= 64:
        return False
    allowed = set(
        "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-"
    )
    for label in candidate.split("."):
        # Empty labels ("a..b"), over-long labels and labels that start or
        # end with a hyphen are rejected.
        if not 1 <= len(label) <= 63:
            return False
        if label[0] == "-" or label[-1] == "-":
            return False
        if not set(label) <= allowed:
            return False
    return True


def set_hostname(hostname: str) -> Dict[str, str]:
    """Persist a new hostname and apply it with ``hostnamectl``.

    The name is validated before anything is written, so whitespace,
    control characters (including newlines) and empty strings can never
    reach ``/etc/hostname``.

    Args:
        hostname: Desired hostname; surrounding whitespace is ignored.

    Returns:
        ``{"status": "success", "hostname": <name>}`` on success, otherwise
        ``{"error": <message>}``.
    """
    cleaned = hostname.strip() if isinstance(hostname, str) else hostname
    if not _is_valid_hostname(cleaned):
        return {"error": f"Invalid hostname: {hostname!r}"}
    try:
        with open("/etc/hostname", "w") as f:
            # Newline-terminated, as is conventional for this file.
            f.write(cleaned + "\n")
        result = subprocess.run(
            ["hostnamectl", "set-hostname", cleaned],
            capture_output=True,
            text=True,
            check=False,
            timeout=10,
        )
        if result.returncode != 0:
            # Still best-effort (the file has been written), but no longer
            # silent: surface why the live hostname was not changed.
            logger.warning(
                f"hostnamectl set-hostname failed: {result.stderr.strip()}"
            )
        logger.info(f"Set hostname to {cleaned}")
        return {"status": "success", "hostname": cleaned}
    except Exception as e:
        logger.error(f"Error setting hostname: {e}")
        return {"error": str(e)}
def get_system_info() -> Dict[str, Any]:
    """Collect basic identification data about the host.

    Always reports hostname, system, kernel release, machine, processor and
    Python version.  ``machine_id``, ``model`` and ``domain`` are optional
    extras that are simply left out when they cannot be determined.

    Returns:
        The info dictionary, or ``{"error": <message>}`` if the mandatory
        part fails.
    """

    def _stdout_of(command):
        """Return the stripped stdout of *command*, or None on any failure."""
        try:
            proc = subprocess.run(
                command, capture_output=True, text=True, timeout=5
            )
        except Exception:
            # Optional data: a missing tool or a timeout is not an error.
            return None
        if proc.returncode != 0:
            return None
        return proc.stdout.strip()

    try:
        kernel_id = platform.uname()
        details = {
            "hostname": socket.gethostname(),
            "system": kernel_id.system,
            "kernel": kernel_id.release,
            "machine": kernel_id.machine,
            "processor": platform.processor(),
            "python": platform.python_version()
        }

        try:
            with open("/etc/machine-id", "r") as id_file:
                details["machine_id"] = id_file.read().strip()
        except Exception:
            # Optional field; skip it when the file is unreadable.
            pass

        model = _stdout_of(["dmidecode", "-s", "system-product-name"])
        if model is not None:
            details["model"] = model

        domain = _stdout_of(["domainname"])
        if domain and domain != "(none)":
            details["domain"] = domain

        return details
    except Exception as e:
        logger.error(f"Error getting system info: {e}")
        return {"error": str(e)}
def get_uptime() -> Dict[str, Any]:
    """Report uptime based on the first value in ``/proc/uptime``.

    Returns:
        A dictionary with ``uptime_seconds`` (whole seconds),
        ``uptime_string`` (``"<d>d <h>h <m>m"``), ``uptime_formatted``
        (days/hours/minutes) and ``boot_time`` (current time minus uptime,
        as an integer), or ``{"error": <message>}`` on failure.
    """
    try:
        import time

        with open("/proc/uptime", "r") as uptime_file:
            first_field = uptime_file.read().split()[0]
        elapsed = int(float(first_field))

        # Break the total down into days, hours and minutes.
        day_count, remainder = divmod(elapsed, 86400)
        hour_count, remainder = divmod(remainder, 3600)
        minute_count = remainder // 60

        booted_at = time.time() - elapsed
        breakdown = {
            "days": day_count,
            "hours": hour_count,
            "minutes": minute_count,
        }
        return {
            "uptime_seconds": elapsed,
            "uptime_string": f"{day_count}d {hour_count}h {minute_count}m",
            "uptime_formatted": breakdown,
            "boot_time": int(booted_at)
        }
    except Exception as e:
        logger.error(f"Error getting uptime: {e}")
        return {"error": str(e)}
def get_memory() -> Dict[str, Any]:
    """Summarise RAM and swap usage from ``/proc/meminfo``.

    Lines that do not look like ``Key: <integer> [unit]`` are skipped rather
    than aborting the whole read, and only values carrying the ``kB`` unit
    are scaled to bytes (plain counters are stored unchanged).

    Returns:
        A dictionary with ``total``, ``available``, ``used``, ``free``,
        ``swap_total``, ``swap_free`` and ``swap_used`` in bytes (0 for keys
        missing from the file), or ``{"error": <message>}`` on failure.
    """
    try:
        meminfo = {}
        with open("/proc/meminfo", "r") as f:
            for line in f:
                key, separator, rest = line.partition(":")
                fields = rest.split()
                if not separator or not fields:
                    continue
                try:
                    amount = int(fields[0])
                except ValueError:
                    continue
                if len(fields) > 1 and fields[1] == "kB":
                    amount *= 1024
                meminfo[key.strip()] = amount

        total = meminfo.get("MemTotal", 0)
        available = meminfo.get("MemAvailable", 0)
        swap_total = meminfo.get("SwapTotal", 0)
        swap_free = meminfo.get("SwapFree", 0)
        return {
            "total": total,
            "available": available,
            "used": total - available,
            "free": meminfo.get("MemFree", 0),
            "swap_total": swap_total,
            "swap_free": swap_free,
            "swap_used": swap_total - swap_free
        }
    except Exception as e:
        logger.error(f"Error getting memory info: {e}")
        return {"error": str(e)}
def get_cpu_info() -> Dict[str, Any]:
    """Report CPU count and load, preferring psutil when it is installed.

    With psutil the result contains ``count``, ``percent`` (sampled over a
    one-second interval, so the call blocks for that long) and
    ``load_average``.  Without psutil the data comes from ``/proc/cpuinfo``
    and ``/proc/loadavg`` and ``percent`` is omitted.

    ``load_average`` is a list of three floats on both paths.

    Returns:
        The CPU dictionary, or ``{"error": <message>}`` on failure.
    """
    try:
        import psutil
    except ImportError:
        logger.debug("psutil not installed, using fallback")
        try:
            with open("/proc/cpuinfo", "r") as f:
                # Count only lines that *start* with "processor"; a plain
                # substring count also matches model names such as
                # "Common KVM processor" and over-reports the CPU count.
                cpu_count = sum(
                    1 for line in f if line.startswith("processor")
                )
            with open("/proc/loadavg") as f:
                load_fields = f.read().split()[:3]
            return {
                "count": cpu_count,
                "load_average": [float(value) for value in load_fields],
            }
        except Exception as e:
            logger.error(f"Error getting CPU info: {e}")
            return {"error": str(e)}

    try:
        import os

        return {
            "count": psutil.cpu_count(),
            "percent": psutil.cpu_percent(interval=1),
            "load_average": [round(x, 2) for x in os.getloadavg()]
        }
    except Exception as e:
        logger.error(f"Error getting CPU info with psutil: {e}")
        return {"error": str(e)}
def get_time() -> Dict[str, Any]:
    """Return the current local time in several representations.

    All three values are derived from a single clock reading.

    Returns:
        ``iso`` (naive local time in ISO 8601 format), ``timestamp``
        (integer seconds since the epoch) and ``timezone`` (string form of
        the local tzinfo), or ``{"error": <message>}`` on failure.
    """
    try:
        now = datetime.now()
        # astimezone() on a naive datetime attaches the system's local zone.
        local_zone = now.astimezone().tzinfo
        return {
            "iso": now.isoformat(),
            "timestamp": int(now.timestamp()),
            "timezone": str(local_zone)
        }
    except Exception as e:
        logger.error(f"Error getting time: {e}")
        return {"error": str(e)}
def set_time(iso_string: str) -> Dict[str, str]:
    """Set the system clock from an ISO 8601 string.

    The timestamp is handed to ``date -s`` as ``YYYY-MM-DD HH:MM:SS``.  A
    timezone-aware input is first converted to local time so that its UTC
    offset is honoured instead of being silently dropped; a naive input is
    passed through as given.  Syncing the hardware clock afterwards is
    best-effort: a failure there is logged but does not turn an already
    applied time change into an error.

    Args:
        iso_string: Date/time accepted by ``datetime.fromisoformat``.

    Returns:
        ``{"status": "success", "time": iso_string}`` on success, otherwise
        ``{"error": <message>}``.
    """
    try:
        dt = datetime.fromisoformat(iso_string)
        if dt.tzinfo is not None:
            # Convert to local time, then drop the offset for formatting.
            dt = dt.astimezone().replace(tzinfo=None)

        result = subprocess.run(
            ["date", "-s", dt.strftime("%Y-%m-%d %H:%M:%S")],
            capture_output=True, text=True, check=False, timeout=10
        )
        if result.returncode != 0:
            return {"error": result.stderr}

        try:
            hw_result = subprocess.run(
                ["hwclock", "--systohc"],
                capture_output=True, text=True, check=False, timeout=10
            )
            if hw_result.returncode != 0:
                logger.warning(
                    f"hwclock --systohc failed: {hw_result.stderr.strip()}"
                )
        except (OSError, subprocess.SubprocessError) as hw_error:
            # hwclock may be missing or unusable; the system time is
            # already set, so only warn.
            logger.warning(f"Could not sync hardware clock: {hw_error}")

        logger.info(f"Set time to {iso_string}")
        return {"status": "success", "time": iso_string}
    except Exception as e:
        logger.error(f"Error setting time: {e}")
        return {"error": str(e)}
def get_updates() -> Dict[str, Any]:
    """List upgradable packages as reported by ``apt list --upgradable``.

    The first output line is skipped; every remaining non-blank line that
    contains a ``/`` yields one entry.  ``package`` is the text before the
    first ``/``.  ``current`` is the text between the first ``/`` and the
    first ``[`` (empty when the line has no ``[``).

    NOTE(review): for lines shaped like
    ``name/suite version arch [upgradable from: old]`` the ``current`` field
    would hold "suite version arch" rather than the installed version --
    confirm this is what consumers expect.

    Returns:
        ``{"available": <count>, "packages": [...]}``, or
        ``{"error": <message>}`` if apt fails or cannot be run.
    """
    try:
        listing = subprocess.run(
            ["apt", "list", "--upgradable"],
            capture_output=True, text=True, timeout=10, check=False
        )
        if listing.returncode != 0:
            return {"error": listing.stderr}

        upgradable = []
        for entry in listing.stdout.split("\n")[1:]:
            if not entry.strip():
                continue
            pieces = entry.split("/")
            if len(pieces) < 2:
                continue
            if "[" in entry:
                detail = pieces[1].split("[")[0].strip()
            else:
                detail = ""
            upgradable.append({
                "package": pieces[0].strip(),
                "current": detail
            })
        return {"available": len(upgradable), "packages": upgradable}
    except Exception as e:
        logger.error(f"Error checking updates: {e}")
        return {"error": str(e)}
def reboot() -> Dict[str, str]:
    """Start ``shutdown -r now`` without waiting for it to finish.

    Returns:
        A success message once the command has been launched, or
        ``{"error": <message>}`` if launching it raised.
    """
    command = ["shutdown", "-r", "now"]
    try:
        subprocess.Popen(command)
    except Exception as e:
        logger.error(f"Error rebooting: {e}")
        return {"error": str(e)}
    return {"status": "success", "message": "System rebooting..."}
def shutdown() -> Dict[str, str]:
    """Start ``shutdown -h now`` without waiting for it to finish.

    Returns:
        A success message once the command has been launched, or
        ``{"error": <message>}`` if launching it raised.
    """
    command = ["shutdown", "-h", "now"]
    try:
        subprocess.Popen(command)
    except Exception as e:
        logger.error(f"Error shutting down: {e}")
        return {"error": str(e)}
    return {"status": "success", "message": "System shutting down..."}
def _interfaces_from_ip_json() -> Optional[list]:
    """Return interfaces parsed from ``ip -j addr``, or None if unavailable."""
    import json

    try:
        result = subprocess.run(
            ["/usr/sbin/ip", "-j", "addr"],
            capture_output=True, text=True, timeout=5
        )
    except (OSError, subprocess.SubprocessError):
        # Binary missing, not executable, or timed out: let the caller use
        # the /proc fallback instead of reporting an error.
        return None
    if result.returncode != 0:
        return None
    try:
        data = json.loads(result.stdout)
    except json.JSONDecodeError:
        return None
    return [
        {
            "name": iface.get("ifname"),
            "state": iface.get("operstate", "UNKNOWN"),
            "addresses": [
                {"family": addr.get("family"), "local": addr.get("local")}
                for addr in iface.get("addr_info", [])
            ],
        }
        for iface in data
    ]


def _interfaces_from_proc() -> list:
    """Return bare interface names from ``/proc/net/dev`` (no addresses)."""
    with open("/proc/net/dev", "r") as f:
        lines = f.readlines()
    # The first two lines are skipped; state is reported as "UP" because
    # this source carries no state information.
    return [
        {"name": line.split(":")[0].strip(), "state": "UP", "addresses": []}
        for line in lines[2:]
        if ":" in line
    ]


def get_network_info() -> Dict[str, Any]:
    """List network interfaces with their state and addresses.

    ``ip -j addr`` is the preferred source.  When it cannot be run, exits
    non-zero, or prints something that is not JSON, the interface names are
    read from ``/proc/net/dev`` instead (with state ``"UP"`` and no
    addresses).

    Returns:
        ``{"interfaces": [{"name", "state", "addresses"}, ...]}``, or
        ``{"error": <message>}`` if both sources fail.
    """
    try:
        interfaces = _interfaces_from_ip_json()
        if interfaces is None:
            interfaces = _interfaces_from_proc()
        return {"interfaces": interfaces}
    except Exception as e:
        logger.error(f"Error getting network info: {e}")
        return {"error": str(e)}
def get_network_traffic() -> Dict[str, Any]:
    """Read per-interface counters from ``/proc/net/dev``.

    The first two lines are skipped.  For every remaining line that
    contains a ``:`` and has at least 16 values after it, columns 0-3 are
    reported as the rx bytes/packets/errors/drops counters and columns 8-11
    as the corresponding tx counters.

    Returns:
        ``{"interfaces": [...]}``, or ``{"error": <message>}`` on failure.
    """
    # Output key -> column index within the statistics part of a line.
    counter_columns = (
        ("rx_bytes", 0),
        ("rx_packets", 1),
        ("rx_errors", 2),
        ("rx_drops", 3),
        ("tx_bytes", 8),
        ("tx_packets", 9),
        ("tx_errors", 10),
        ("tx_drops", 11),
    )
    try:
        with open("/proc/net/dev", "r") as dev_file:
            rows = dev_file.readlines()

        collected = []
        for row in rows[2:]:
            if ":" not in row:
                continue
            label, counters = row.split(":")
            values = counters.split()
            if len(values) < 16:
                continue
            record = {"name": label.strip()}
            for key, column in counter_columns:
                record[key] = int(values[column])
            collected.append(record)
        return {"interfaces": collected}
    except Exception as e:
        logger.error(f"Error getting network traffic: {e}")
        return {"error": str(e)}
def get_disk_io() -> Dict[str, Any]:
    """Read per-device I/O counters from ``/proc/diskstats``.

    When ``IS_CONTAINER`` is set an empty list is returned without reading
    anything.  Otherwise only devices whose name starts with ``sd``,
    ``nvme``, ``hd`` or ``vd`` are reported; that allow-list already
    excludes ``dm-``, ``loop``, ``ram``, ``sr`` and ``zram`` devices.

    Columns 5 and 9 are multiplied by 512 to produce the ``*_bytes``
    values.

    Returns:
        ``{"disks": [...]}``, or ``{"error": <message>}`` on failure.
    """
    if IS_CONTAINER:
        return {"disks": []}

    wanted_prefixes = ("sd", "nvme", "hd", "vd")
    try:
        with open("/proc/diskstats", "r") as stats_file:
            rows = [raw.split() for raw in stats_file.readlines()]

        collected = []
        for columns in rows:
            if len(columns) < 14:
                continue
            device = columns[2]
            if not device.startswith(wanted_prefixes):
                continue
            collected.append({
                "name": device,
                "reads_completed": int(columns[3]),
                "reads_bytes": int(columns[5]) * 512,
                "writes_completed": int(columns[7]),
                "writes_bytes": int(columns[9]) * 512
            })
        return {"disks": collected}
    except Exception as e:
        logger.error(f"Error getting disk I/O: {e}")
        return {"error": str(e)}
def get_services() -> Dict[str, Any]:
    """List running systemd services.

    ``systemctl list-units --output=json`` is tried first.  If its output
    is not valid JSON the plain table is parsed instead, requested with
    ``--plain --no-legend`` so that every line is exactly
    ``UNIT LOAD ACTIVE SUB DESCRIPTION`` with no header, footer or status
    bullets.

    Both paths report ``state`` from the ACTIVE column/field, and the
    description no longer includes the LOAD/ACTIVE/SUB columns.

    Returns:
        ``{"services": [{"name", "state", "description"}, ...]}``, or
        ``{"error": <message>}`` if systemctl cannot be run.
    """
    import json

    base_command = [
        "/usr/bin/systemctl", "list-units", "--type=service",
        "--state=running", "--no-pager",
    ]
    try:
        result = subprocess.run(
            base_command + ["--output=json"],
            capture_output=True, text=True, timeout=10
        )
        if result.returncode == 0:
            try:
                services = json.loads(result.stdout)
                return {
                    "services": [
                        {
                            "name": svc.get("unit"),
                            "state": svc.get("active"),
                            "description": svc.get("description"),
                        }
                        for svc in services
                        if svc.get("unit", "").endswith(".service")
                    ]
                }
            except json.JSONDecodeError:
                # Presumably a systemctl without JSON support for this
                # verb; fall through to the text parser.
                pass

        result = subprocess.run(
            base_command + ["--plain", "--no-legend"],
            capture_output=True, text=True, timeout=10
        )
        services = []
        for line in result.stdout.split("\n"):
            parts = line.split()
            # Need at least UNIT, LOAD, ACTIVE and SUB.
            if len(parts) < 4 or not parts[0].endswith(".service"):
                continue
            services.append({
                "name": parts[0],
                "state": parts[2],
                "description": " ".join(parts[4:])
            })
        return {"services": services}
    except Exception as e:
        logger.error(f"Error getting services: {e}")
        return {"error": str(e)}
def get_all_units() -> Dict[str, Any]:
    """List all systemd units grouped by type.

    Units are sorted into ``services``, ``targets``, ``sockets``,
    ``timers`` and ``paths`` by name suffix; other unit types are ignored.
    ``systemctl list-units --all --output=json`` is tried first.  If its
    output is not valid JSON the plain table is parsed instead, requested
    with ``--plain --no-legend`` so that every line is exactly
    ``UNIT LOAD ACTIVE SUB DESCRIPTION``.

    In the text fallback ``active`` comes from the ACTIVE column, ``sub``
    from the SUB column and the description from everything after that,
    matching the fields used on the JSON path.

    Returns:
        The grouped dictionary, each item having ``name``, ``active``,
        ``sub`` and ``description``; or ``{"error": <message>}`` if
        systemctl cannot be run.
    """
    import json

    bucket_by_suffix = {
        ".service": "services",
        ".target": "targets",
        ".socket": "sockets",
        ".timer": "timers",
        ".path": "paths",
    }
    units = {bucket: [] for bucket in bucket_by_suffix.values()}

    def _add(name, active, sub, description):
        """File one unit under the bucket matching its suffix, if any."""
        dot = name.rfind(".")
        bucket = bucket_by_suffix.get(name[dot:]) if dot != -1 else None
        if bucket is not None:
            units[bucket].append({
                "name": name,
                "active": active,
                "sub": sub,
                "description": description
            })

    base_command = ["/usr/bin/systemctl", "list-units", "--all", "--no-pager"]
    try:
        result = subprocess.run(
            base_command + ["--output=json"],
            capture_output=True, text=True, timeout=10
        )
        if result.returncode == 0:
            try:
                for unit in json.loads(result.stdout):
                    _add(
                        unit.get("unit", ""),
                        unit.get("active"),
                        unit.get("sub"),
                        unit.get("description", ""),
                    )
                return units
            except json.JSONDecodeError:
                # Presumably a systemctl without JSON support for this
                # verb; fall through to the text parser.
                pass

        result = subprocess.run(
            base_command + ["--plain", "--no-legend"],
            capture_output=True, text=True, timeout=10
        )
        for line in result.stdout.split("\n"):
            parts = line.split()
            # Need at least UNIT, LOAD, ACTIVE and SUB.
            if len(parts) < 4:
                continue
            _add(parts[0], parts[2], parts[3], " ".join(parts[4:]))
        return units
    except Exception as e:
        logger.error(f"Error getting units: {e}")
        return {"error": str(e)}
def get_disk_usage() -> Dict[str, Any]:
    """Report filesystem usage from ``df -P``.

    tmpfs, devtmpfs, squashfs and overlay filesystems are excluded via
    ``-x``.  Each line is split into at most six columns so that a mount
    point containing spaces is kept whole.  Lines whose numeric columns
    cannot be parsed are skipped.

    Returns:
        ``{"filesystems": [...]}`` where each entry has ``filesystem``,
        ``mountpoint``, ``total``/``used``/``available`` (the df values
        multiplied by 1024) and ``capacity`` (integer percent); or
        ``{"error": <message>}`` if df fails or cannot be run.
    """
    try:
        result = subprocess.run(
            ["/usr/bin/df", "-P", "-x", "tmpfs", "-x", "devtmpfs",
             "-x", "squashfs", "-x", "overlay"],
            capture_output=True, text=True, timeout=5
        )
        if result.returncode != 0:
            return {"error": result.stderr}

        filesystems = []
        for line in result.stdout.strip().split("\n")[1:]:
            # maxsplit=5 keeps everything after the fifth column together.
            parts = line.split(None, 5)
            if len(parts) < 6:
                continue
            try:
                filesystems.append({
                    "filesystem": parts[0],
                    "mountpoint": parts[5],
                    "total": int(parts[1]) * 1024,
                    "used": int(parts[2]) * 1024,
                    "available": int(parts[3]) * 1024,
                    "capacity": int(parts[4].rstrip("%")),
                })
            except ValueError:
                continue
        return {"filesystems": filesystems}
    except Exception as e:
        logger.error(f"Error getting disk usage: {e}")
        return {"error": str(e)}
def get_journal_logs(limit: int = 20) -> Dict[str, Any]:
    """Fetch the most recent journal entries via ``journalctl``.

    Args:
        limit: Value passed to ``journalctl -n``.

    Returns:
        ``{"logs": [...]}`` with each non-blank output line stripped of
        surrounding whitespace.  The list is empty when journalctl exits
        non-zero.  ``{"error": <message>}`` is returned if running the
        command raised.
    """
    try:
        completed = subprocess.run(
            ["/usr/bin/journalctl", "-n", str(limit), "--no-pager", "--output=short"],
            capture_output=True, text=True, timeout=5
        )
        if completed.returncode != 0:
            return {"logs": []}

        entries = []
        for raw_line in completed.stdout.split("\n"):
            trimmed = raw_line.strip()
            if trimmed:
                entries.append(trimmed)
        return {"logs": entries}
    except Exception as e:
        logger.error(f"Error getting journal logs: {e}")
        return {"error": str(e)}