Refactor: Java-Klassen aus Services entfernt + kritische Bugs gefixt

- AuthService, SystemInfo, IdentitiesManager Klassen → Modul-Funktionen
- grp.getall() → grp.getgrall() (Bug: Methode existierte nie)
- open('/proc/loadavg') ohne context manager gefixt (File-Handle-Leak)
- rx_packets/tx_packets null-check im Frontend (toLocaleString auf undefined)
- PoolCard onClick: /pools/{name} → /zfs (Route existierte nicht, löste Seitenreload aus)
- Alle Router-Imports auf Modul-Aliase umgestellt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-05 14:11:32 +02:00
parent ce78f0ae95
commit f49793e6f2
12 changed files with 853 additions and 1169 deletions
+402 -546
View File
@@ -13,576 +13,432 @@ from datetime import datetime
logger = logging.getLogger(__name__)
class SystemInfo:
"""Get system information"""
def get_hostname() -> Dict[str, str]:
try:
with open("/etc/hostname", "r") as f:
hostname = f.read().strip()
return {"hostname": hostname}
except Exception as e:
logger.error(f"Error getting hostname: {e}")
return {"error": str(e)}
@staticmethod
def get_hostname() -> Dict[str, str]:
"""Get system hostname"""
try:
with open("/etc/hostname", "r") as f:
hostname = f.read().strip()
return {"hostname": hostname}
except Exception as e:
logger.error(f"Error getting hostname: {e}")
return {"error": str(e)}
@staticmethod
def set_hostname(hostname: str) -> Dict[str, str]:
"""Set system hostname"""
try:
with open("/etc/hostname", "w") as f:
f.write(hostname)
def set_hostname(hostname: str) -> Dict[str, str]:
try:
with open("/etc/hostname", "w") as f:
f.write(hostname)
subprocess.run(
["hostnamectl", "set-hostname", hostname],
capture_output=True,
check=False
)
logger.info(f"Set hostname to {hostname}")
return {"status": "success", "hostname": hostname}
except Exception as e:
logger.error(f"Error setting hostname: {e}")
return {"error": str(e)}
# Also update hostnamectl if available
subprocess.run(
["hostnamectl", "set-hostname", hostname],
capture_output=True,
check=False
)
logger.info(f"Set hostname to {hostname}")
return {"status": "success", "hostname": hostname}
except Exception as e:
logger.error(f"Error setting hostname: {e}")
return {"error": str(e)}
@staticmethod
def get_system_info() -> Dict[str, Any]:
"""Get general system information"""
try:
uname = platform.uname()
info = {
"hostname": socket.gethostname(),
"system": uname.system,
"kernel": uname.release,
"machine": uname.machine,
"processor": platform.processor(),
"python": platform.python_version()
}
# Get machine ID
try:
with open("/etc/machine-id", "r") as f:
info["machine_id"] = f.read().strip()
except:
pass
# Get hardware model
try:
result = subprocess.run(
["dmidecode", "-s", "system-product-name"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
info["model"] = result.stdout.strip()
except:
pass
# Get domain name
try:
result = subprocess.run(
["domainname"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
domain = result.stdout.strip()
if domain and domain != "(none)":
info["domain"] = domain
except:
pass
return info
except Exception as e:
logger.error(f"Error getting system info: {e}")
return {"error": str(e)}
@staticmethod
def get_uptime() -> Dict[str, Any]:
"""Get system uptime and boot time"""
try:
import time
with open("/proc/uptime", "r") as f:
uptime_seconds = int(float(f.read().split()[0]))
days = uptime_seconds // 86400
hours = (uptime_seconds % 86400) // 3600
minutes = (uptime_seconds % 3600) // 60
# Calculate boot timestamp
current_time = time.time()
boot_timestamp = current_time - uptime_seconds
return {
"uptime_seconds": uptime_seconds,
"uptime_string": f"{days}d {hours}h {minutes}m",
"uptime_formatted": {
"days": days,
"hours": hours,
"minutes": minutes
},
"boot_time": int(boot_timestamp)
}
except Exception as e:
logger.error(f"Error getting uptime: {e}")
return {"error": str(e)}
@staticmethod
def get_memory() -> Dict[str, Any]:
"""Get memory usage"""
try:
with open("/proc/meminfo", "r") as f:
lines = f.readlines()
meminfo = {}
for line in lines:
key, value = line.split(":")
meminfo[key.strip()] = int(value.split()[0]) * 1024 # Convert to bytes
return {
"total": meminfo.get("MemTotal", 0),
"available": meminfo.get("MemAvailable", 0),
"used": meminfo.get("MemTotal", 0) - meminfo.get("MemAvailable", 0),
"free": meminfo.get("MemFree", 0),
"swap_total": meminfo.get("SwapTotal", 0),
"swap_free": meminfo.get("SwapFree", 0),
"swap_used": meminfo.get("SwapTotal", 0) - meminfo.get("SwapFree", 0)
}
except Exception as e:
logger.error(f"Error getting memory info: {e}")
return {"error": str(e)}
@staticmethod
def get_cpu_info() -> Dict[str, Any]:
"""Get CPU information"""
try:
import psutil
except ImportError:
logger.debug("psutil not installed, using fallback")
try:
with open("/proc/cpuinfo", "r") as f:
cpuinfo_text = f.read()
cpu_count = cpuinfo_text.count("processor")
return {
"count": cpu_count,
"load_average": open("/proc/loadavg").read().split()[:3]
}
except Exception as e:
logger.error(f"Error getting CPU info: {e}")
return {"error": str(e)}
def get_system_info() -> Dict[str, Any]:
try:
uname = platform.uname()
info = {
"hostname": socket.gethostname(),
"system": uname.system,
"kernel": uname.release,
"machine": uname.machine,
"processor": platform.processor(),
"python": platform.python_version()
}
try:
return {
"count": psutil.cpu_count(),
"percent": psutil.cpu_percent(interval=1),
"load_average": [round(x, 2) for x in __import__("os").getloadavg()]
}
except Exception as e:
logger.error(f"Error getting CPU info with psutil: {e}")
return {"error": str(e)}
with open("/etc/machine-id", "r") as f:
info["machine_id"] = f.read().strip()
except Exception:
pass
@staticmethod
def get_time() -> Dict[str, str]:
"""Get system time"""
try:
now = datetime.now()
return {
"iso": now.isoformat(),
"timestamp": int(now.timestamp()),
"timezone": datetime.now().astimezone().tzinfo.__str__()
}
except Exception as e:
logger.error(f"Error getting time: {e}")
return {"error": str(e)}
@staticmethod
def set_time(iso_string: str) -> Dict[str, str]:
"""Set system time (requires root)"""
try:
dt = datetime.fromisoformat(iso_string)
result = subprocess.run(
["date", "-s", dt.strftime("%Y-%m-%d %H:%M:%S")],
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
return {"error": result.stderr}
# Sync hardware clock
subprocess.run(["hwclock", "--systohc"], check=False)
logger.info(f"Set time to {iso_string}")
return {"status": "success", "time": iso_string}
except Exception as e:
logger.error(f"Error setting time: {e}")
return {"error": str(e)}
@staticmethod
def get_updates() -> Dict[str, Any]:
"""Check available updates"""
try:
result = subprocess.run(
["apt", "list", "--upgradable"],
capture_output=True,
text=True,
timeout=10,
check=False
["dmidecode", "-s", "system-product-name"],
capture_output=True, text=True, timeout=5
)
if result.returncode != 0:
return {"error": result.stderr}
packages = []
for line in result.stdout.split("\n")[1:]:
if line.strip():
parts = line.split("/")
if len(parts) >= 2:
packages.append({
"package": parts[0].strip(),
"current": parts[1].split("[")[0].strip() if "[" in line else ""
})
return {
"available": len(packages),
"packages": packages
}
except Exception as e:
logger.error(f"Error checking updates: {e}")
return {"error": str(e)}
@staticmethod
def reboot() -> Dict[str, str]:
"""Reboot system (requires root)"""
try:
subprocess.Popen(["shutdown", "-r", "now"])
return {"status": "success", "message": "System rebooting..."}
except Exception as e:
logger.error(f"Error rebooting: {e}")
return {"error": str(e)}
@staticmethod
def shutdown() -> Dict[str, str]:
"""Shutdown system (requires root)"""
try:
subprocess.Popen(["shutdown", "-h", "now"])
return {"status": "success", "message": "System shutting down..."}
except Exception as e:
logger.error(f"Error shutting down: {e}")
return {"error": str(e)}
@staticmethod
def get_network_info() -> Dict[str, Any]:
"""Get network interface information"""
try:
# Try ip -j addr (JSON output) first
result = subprocess.run(
["/usr/sbin/ip", "-j", "addr"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
import json
info["model"] = result.stdout.strip()
except Exception:
pass
try:
result = subprocess.run(
["domainname"], capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
domain = result.stdout.strip()
if domain and domain != "(none)":
info["domain"] = domain
except Exception:
pass
return info
except Exception as e:
logger.error(f"Error getting system info: {e}")
return {"error": str(e)}
def get_uptime() -> Dict[str, Any]:
try:
import time
with open("/proc/uptime", "r") as f:
uptime_seconds = int(float(f.read().split()[0]))
days = uptime_seconds // 86400
hours = (uptime_seconds % 86400) // 3600
minutes = (uptime_seconds % 3600) // 60
boot_timestamp = time.time() - uptime_seconds
return {
"uptime_seconds": uptime_seconds,
"uptime_string": f"{days}d {hours}h {minutes}m",
"uptime_formatted": {"days": days, "hours": hours, "minutes": minutes},
"boot_time": int(boot_timestamp)
}
except Exception as e:
logger.error(f"Error getting uptime: {e}")
return {"error": str(e)}
def get_memory() -> Dict[str, Any]:
try:
with open("/proc/meminfo", "r") as f:
lines = f.readlines()
meminfo = {}
for line in lines:
key, value = line.split(":")
meminfo[key.strip()] = int(value.split()[0]) * 1024
return {
"total": meminfo.get("MemTotal", 0),
"available": meminfo.get("MemAvailable", 0),
"used": meminfo.get("MemTotal", 0) - meminfo.get("MemAvailable", 0),
"free": meminfo.get("MemFree", 0),
"swap_total": meminfo.get("SwapTotal", 0),
"swap_free": meminfo.get("SwapFree", 0),
"swap_used": meminfo.get("SwapTotal", 0) - meminfo.get("SwapFree", 0)
}
except Exception as e:
logger.error(f"Error getting memory info: {e}")
return {"error": str(e)}
def get_cpu_info() -> Dict[str, Any]:
try:
import psutil
except ImportError:
logger.debug("psutil not installed, using fallback")
try:
with open("/proc/cpuinfo", "r") as f:
cpuinfo_text = f.read()
cpu_count = cpuinfo_text.count("processor")
with open("/proc/loadavg") as f:
load = f.read()
return {"count": cpu_count, "load_average": load.split()[:3]}
except Exception as e:
logger.error(f"Error getting CPU info: {e}")
return {"error": str(e)}
try:
return {
"count": psutil.cpu_count(),
"percent": psutil.cpu_percent(interval=1),
"load_average": [round(x, 2) for x in __import__("os").getloadavg()]
}
except Exception as e:
logger.error(f"Error getting CPU info with psutil: {e}")
return {"error": str(e)}
def get_time() -> Dict[str, str]:
try:
now = datetime.now()
return {
"iso": now.isoformat(),
"timestamp": int(now.timestamp()),
"timezone": datetime.now().astimezone().tzinfo.__str__()
}
except Exception as e:
logger.error(f"Error getting time: {e}")
return {"error": str(e)}
def set_time(iso_string: str) -> Dict[str, str]:
try:
dt = datetime.fromisoformat(iso_string)
result = subprocess.run(
["date", "-s", dt.strftime("%Y-%m-%d %H:%M:%S")],
capture_output=True, text=True, check=False
)
if result.returncode != 0:
return {"error": result.stderr}
subprocess.run(["hwclock", "--systohc"], check=False)
logger.info(f"Set time to {iso_string}")
return {"status": "success", "time": iso_string}
except Exception as e:
logger.error(f"Error setting time: {e}")
return {"error": str(e)}
def get_updates() -> Dict[str, Any]:
try:
result = subprocess.run(
["apt", "list", "--upgradable"],
capture_output=True, text=True, timeout=10, check=False
)
if result.returncode != 0:
return {"error": result.stderr}
packages = []
for line in result.stdout.split("\n")[1:]:
if line.strip():
parts = line.split("/")
if len(parts) >= 2:
packages.append({
"package": parts[0].strip(),
"current": parts[1].split("[")[0].strip() if "[" in line else ""
})
return {"available": len(packages), "packages": packages}
except Exception as e:
logger.error(f"Error checking updates: {e}")
return {"error": str(e)}
def reboot() -> Dict[str, str]:
try:
subprocess.Popen(["shutdown", "-r", "now"])
return {"status": "success", "message": "System rebooting..."}
except Exception as e:
logger.error(f"Error rebooting: {e}")
return {"error": str(e)}
def shutdown() -> Dict[str, str]:
try:
subprocess.Popen(["shutdown", "-h", "now"])
return {"status": "success", "message": "System shutting down..."}
except Exception as e:
logger.error(f"Error shutting down: {e}")
return {"error": str(e)}
def get_network_info() -> Dict[str, Any]:
try:
result = subprocess.run(
["/usr/sbin/ip", "-j", "addr"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
import json
try:
data = json.loads(result.stdout)
interfaces = []
try:
data = json.loads(result.stdout)
for iface in data:
addr_info = []
for addr in iface.get("addr_info", []):
addr_info.append({
"family": addr.get("family"),
"local": addr.get("local")
})
interfaces.append({
"name": iface.get("ifname"),
"state": iface.get("operstate", "UNKNOWN"),
"addresses": addr_info
})
return {"interfaces": interfaces}
except json.JSONDecodeError:
pass
for iface in data:
addr_info = [
{"family": addr.get("family"), "local": addr.get("local")}
for addr in iface.get("addr_info", [])
]
interfaces.append({
"name": iface.get("ifname"),
"state": iface.get("operstate", "UNKNOWN"),
"addresses": addr_info
})
return {"interfaces": interfaces}
except json.JSONDecodeError:
pass
# Fallback: read from /proc/net/dev
with open("/proc/net/dev", "r") as f:
lines = f.readlines()
with open("/proc/net/dev", "r") as f:
lines = f.readlines()
interfaces = []
for line in lines[2:]: # Skip header lines
if ":" in line:
name = line.split(":")[0].strip()
interfaces = []
for line in lines[2:]:
if ":" in line:
name = line.split(":")[0].strip()
interfaces.append({"name": name, "state": "UP", "addresses": []})
return {"interfaces": interfaces}
except Exception as e:
logger.error(f"Error getting network info: {e}")
return {"error": str(e)}
def get_network_traffic() -> Dict[str, Any]:
try:
with open("/proc/net/dev", "r") as f:
lines = f.readlines()
interfaces = []
for line in lines[2:]:
if ":" in line:
name, stats_str = line.split(":")
name = name.strip()
stats = stats_str.split()
if len(stats) >= 16:
interfaces.append({
"name": name,
"state": "UP",
"addresses": []
"rx_bytes": int(stats[0]),
"rx_packets": int(stats[1]),
"rx_errors": int(stats[2]),
"rx_drops": int(stats[3]),
"tx_bytes": int(stats[8]),
"tx_packets": int(stats[9]),
"tx_errors": int(stats[10]),
"tx_drops": int(stats[11])
})
return {"interfaces": interfaces}
except Exception as e:
logger.error(f"Error getting network traffic: {e}")
return {"error": str(e)}
return {"interfaces": interfaces}
except Exception as e:
logger.error(f"Error getting network info: {e}")
return {"error": str(e)}
@staticmethod
def get_network_traffic() -> Dict[str, Any]:
"""Get network interface traffic (RX/TX bytes)"""
try:
with open("/proc/net/dev", "r") as f:
lines = f.readlines()
def get_disk_io() -> Dict[str, Any]:
try:
with open("/proc/diskstats", "r") as f:
lines = f.readlines()
interfaces = []
# /proc/net/dev format (after colon):
# RX bytes, RX packets, RX errors, RX drops, RX fifo, RX frame, RX compressed, RX multicast,
# TX bytes, TX packets, TX errors, TX drops, TX fifo, TX collisions, TX carrier, TX compressed
for line in lines[2:]: # Skip header lines
if ":" in line:
name, stats_str = line.split(":")
name = name.strip()
stats = stats_str.split()
if len(stats) >= 16:
interfaces.append({
"name": name,
"rx_bytes": int(stats[0]),
"rx_packets": int(stats[1]),
"rx_errors": int(stats[2]),
"rx_drops": int(stats[3]),
"tx_bytes": int(stats[8]),
"tx_packets": int(stats[9]),
"tx_errors": int(stats[10]),
"tx_drops": int(stats[11])
})
return {"interfaces": interfaces}
except Exception as e:
logger.error(f"Error getting network traffic: {e}")
return {"error": str(e)}
@staticmethod
def get_disk_io() -> Dict[str, Any]:
"""Get disk I/O statistics (read/write operations and bytes)"""
try:
with open("/proc/diskstats", "r") as f:
lines = f.readlines()
disks = []
# /proc/diskstats format:
# major minor name reads_completed reads_merged reads_sectors reads_time_ms
# writes_completed writes_merged writes_sectors writes_time_ms in_progress io_time_ms weighted_io_time_ms
for line in lines:
fields = line.split()
if len(fields) >= 14:
major = int(fields[0])
minor = int(fields[1])
name = fields[2]
# Skip loop devices, ram disks, and other virtual disks
if name.startswith(('dm-', 'loop', 'ram', 'sr', 'zram')):
continue
# Only include actual storage devices (sda, sdb, nvme0n1, etc.)
if not any(name.startswith(prefix) for prefix in ['sd', 'nvme', 'hd', 'vd']):
continue
reads_completed = int(fields[3])
reads_sectors = int(fields[5])
writes_completed = int(fields[7])
writes_sectors = int(fields[9])
# Sectors are typically 512 bytes
reads_bytes = reads_sectors * 512
writes_bytes = writes_sectors * 512
disks.append({
"name": name,
"reads_completed": reads_completed,
"reads_bytes": reads_bytes,
"writes_completed": writes_completed,
"writes_bytes": writes_bytes
})
return {"disks": disks}
except Exception as e:
logger.error(f"Error getting disk I/O: {e}")
return {"error": str(e)}
@staticmethod
def get_services() -> Dict[str, Any]:
"""Get running systemd services"""
try:
result = subprocess.run(
["/usr/bin/systemctl", "list-units", "--type=service", "--state=running", "--no-pager", "--output=json"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
import json
try:
services = json.loads(result.stdout)
return {
"services": [
{
"name": svc.get("unit"),
"state": svc.get("active"),
"description": svc.get("description")
}
for svc in services if svc.get("unit", "").endswith(".service")
]
}
except json.JSONDecodeError:
pass
# Fallback: parse text output
result = subprocess.run(
["/usr/bin/systemctl", "list-units", "--type=service", "--state=running", "--no-pager"],
capture_output=True,
text=True,
timeout=10
)
services = []
for line in result.stdout.split("\n")[1:]:
if line.strip() and ".service" in line:
parts = line.split()
if len(parts) >= 2:
services.append({
"name": parts[0],
"state": "running",
"description": " ".join(parts[2:]) if len(parts) > 2 else ""
})
return {"services": services}
except Exception as e:
logger.error(f"Error getting services: {e}")
return {"error": str(e)}
@staticmethod
def get_all_units() -> Dict[str, Any]:
"""Get all systemd units (services, targets, sockets, timers, paths)"""
try:
# Get all units without filtering by state
result = subprocess.run(
["/usr/bin/systemctl", "list-units", "--all", "--no-pager", "--output=json"],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
import json
try:
units_data = json.loads(result.stdout)
units = {
"services": [],
"targets": [],
"sockets": [],
"timers": [],
"paths": []
}
for unit in units_data:
name = unit.get("unit", "")
item = {
"name": name,
"active": unit.get("active"), # active/inactive
"sub": unit.get("sub"), # sub-state like "running", "exited", "enabled", etc.
"description": unit.get("description", "")
}
if name.endswith(".service"):
units["services"].append(item)
elif name.endswith(".target"):
units["targets"].append(item)
elif name.endswith(".socket"):
units["sockets"].append(item)
elif name.endswith(".timer"):
units["timers"].append(item)
elif name.endswith(".path"):
units["paths"].append(item)
return units
except json.JSONDecodeError:
pass
# Fallback: parse text output
result = subprocess.run(
["/usr/bin/systemctl", "list-units", "--all", "--no-pager"],
capture_output=True,
text=True,
timeout=10
)
units = {
"services": [],
"targets": [],
"sockets": [],
"timers": [],
"paths": []
}
for line in result.stdout.split("\n")[1:]:
if not line.strip():
disks = []
for line in lines:
fields = line.split()
if len(fields) >= 14:
name = fields[2]
if name.startswith(('dm-', 'loop', 'ram', 'sr', 'zram')):
continue
parts = line.split()
if len(parts) < 2:
if not any(name.startswith(p) for p in ['sd', 'nvme', 'hd', 'vd']):
continue
name = parts[0]
active = parts[1] if len(parts) > 1 else "unknown"
description = " ".join(parts[3:]) if len(parts) > 3 else ""
item = {
disks.append({
"name": name,
"active": active,
"sub": parts[2] if len(parts) > 2 else "",
"description": description
"reads_completed": int(fields[3]),
"reads_bytes": int(fields[5]) * 512,
"writes_completed": int(fields[7]),
"writes_bytes": int(fields[9]) * 512
})
return {"disks": disks}
except Exception as e:
logger.error(f"Error getting disk I/O: {e}")
return {"error": str(e)}
def get_services() -> Dict[str, Any]:
try:
result = subprocess.run(
["/usr/bin/systemctl", "list-units", "--type=service", "--state=running",
"--no-pager", "--output=json"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
import json
try:
services = json.loads(result.stdout)
return {
"services": [
{"name": svc.get("unit"), "state": svc.get("active"), "description": svc.get("description")}
for svc in services if svc.get("unit", "").endswith(".service")
]
}
except json.JSONDecodeError:
pass
if name.endswith(".service"):
units["services"].append(item)
elif name.endswith(".target"):
units["targets"].append(item)
elif name.endswith(".socket"):
units["sockets"].append(item)
elif name.endswith(".timer"):
units["timers"].append(item)
elif name.endswith(".path"):
units["paths"].append(item)
return units
except Exception as e:
logger.error(f"Error getting units: {e}")
return {"error": str(e)}
@staticmethod
def get_journal_logs(limit: int = 20) -> Dict[str, Any]:
"""Get recent journal logs"""
try:
result = subprocess.run(
["/usr/bin/journalctl", "-n", str(limit), "--no-pager", "--output=short"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
logs = [line.strip() for line in result.stdout.split("\n") if line.strip()]
return {"logs": logs}
return {"logs": []}
except Exception as e:
logger.error(f"Error getting journal logs: {e}")
return {"error": str(e)}
result = subprocess.run(
["/usr/bin/systemctl", "list-units", "--type=service", "--state=running", "--no-pager"],
capture_output=True, text=True, timeout=10
)
services = []
for line in result.stdout.split("\n")[1:]:
if line.strip() and ".service" in line:
parts = line.split()
if len(parts) >= 2:
services.append({
"name": parts[0],
"state": "running",
"description": " ".join(parts[2:]) if len(parts) > 2 else ""
})
return {"services": services}
except Exception as e:
logger.error(f"Error getting services: {e}")
return {"error": str(e)}
# Global instance
system_info = SystemInfo()
def get_all_units() -> Dict[str, Any]:
units = {"services": [], "targets": [], "sockets": [], "timers": [], "paths": []}
try:
result = subprocess.run(
["/usr/bin/systemctl", "list-units", "--all", "--no-pager", "--output=json"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
import json
try:
for unit in json.loads(result.stdout):
name = unit.get("unit", "")
item = {
"name": name,
"active": unit.get("active"),
"sub": unit.get("sub"),
"description": unit.get("description", "")
}
if name.endswith(".service"):
units["services"].append(item)
elif name.endswith(".target"):
units["targets"].append(item)
elif name.endswith(".socket"):
units["sockets"].append(item)
elif name.endswith(".timer"):
units["timers"].append(item)
elif name.endswith(".path"):
units["paths"].append(item)
return units
except json.JSONDecodeError:
pass
result = subprocess.run(
["/usr/bin/systemctl", "list-units", "--all", "--no-pager"],
capture_output=True, text=True, timeout=10
)
for line in result.stdout.split("\n")[1:]:
if not line.strip():
continue
parts = line.split()
if len(parts) < 2:
continue
name = parts[0]
item = {
"name": name,
"active": parts[1] if len(parts) > 1 else "unknown",
"sub": parts[2] if len(parts) > 2 else "",
"description": " ".join(parts[3:]) if len(parts) > 3 else ""
}
if name.endswith(".service"):
units["services"].append(item)
elif name.endswith(".target"):
units["targets"].append(item)
elif name.endswith(".socket"):
units["sockets"].append(item)
elif name.endswith(".timer"):
units["timers"].append(item)
elif name.endswith(".path"):
units["paths"].append(item)
return units
except Exception as e:
logger.error(f"Error getting units: {e}")
return {"error": str(e)}
def get_journal_logs(limit: int = 20) -> Dict[str, Any]:
try:
result = subprocess.run(
["/usr/bin/journalctl", "-n", str(limit), "--no-pager", "--output=short"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
logs = [line.strip() for line in result.stdout.split("\n") if line.strip()]
return {"logs": logs}
return {"logs": []}
except Exception as e:
logger.error(f"Error getting journal logs: {e}")
return {"error": str(e)}