"""
System Information Service
Hostname, time, updates, CPU, memory, etc.

Every public function returns a plain dict. When an operation fails, the
exception is logged and the returned dict contains a single "error" key
holding the error text.
"""

import json
import logging
import os
import platform
import socket
import subprocess
import time
from datetime import datetime
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# Maps a systemd unit-name suffix to the result bucket used by get_all_units().
_UNIT_BUCKETS = {
    ".service": "services",
    ".target": "targets",
    ".socket": "sockets",
    ".timer": "timers",
    ".path": "paths",
}

# Status glyphs that systemctl may print in front of a unit name in its
# human-readable table; they must be dropped before reading the columns.
_UNIT_MARKERS = ("\u25cf", "\u25cb", "\u00d7", "*")


def _best_effort_output(cmd: List[str], timeout: float = 5) -> Optional[str]:
    """Run *cmd* and return its stripped stdout, or None on any failure.

    "Failure" means the command could not be started, timed out, produced
    undecodable output, or exited with a non-zero status. Used for optional
    probes where a missing tool must not abort the caller.
    """
    try:
        result = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout
        )
    except (OSError, subprocess.SubprocessError, ValueError) as e:
        logger.debug("Command %s unavailable: %s", cmd, e)
        return None
    if result.returncode != 0:
        return None
    return result.stdout.strip()


def _parse_unit_table(text: str) -> List[Dict[str, str]]:
    """Parse the plain-text table printed by ``systemctl list-units``.

    The table columns are UNIT, LOAD, ACTIVE, SUB, DESCRIPTION. Rows with
    fewer than two columns are skipped. Header and footer lines are returned
    like any other row; callers filter them out by unit-name suffix.
    """
    rows = []
    for line in text.split("\n"):
        tokens = line.split()
        if tokens and tokens[0] in _UNIT_MARKERS:
            tokens = tokens[1:]
        if len(tokens) < 2:
            continue
        rows.append({
            "name": tokens[0],
            "active": tokens[2] if len(tokens) > 2 else "unknown",
            "sub": tokens[3] if len(tokens) > 3 else "",
            "description": " ".join(tokens[4:]),
        })
    return rows


def _unit_bucket(name: str) -> Optional[str]:
    """Return the get_all_units() bucket for a unit name, or None if untracked."""
    _, dot, suffix = name.rpartition(".")
    return _UNIT_BUCKETS.get(dot + suffix)


def get_hostname() -> Dict[str, str]:
    """Return ``{"hostname": ...}`` read from /etc/hostname."""
    try:
        with open("/etc/hostname", "r") as f:
            hostname = f.read().strip()
        return {"hostname": hostname}
    except Exception as e:
        logger.error("Error getting hostname: %s", e)
        return {"error": str(e)}


def set_hostname(hostname: str) -> Dict[str, str]:
    """Write *hostname* to /etc/hostname and apply it with ``hostnamectl``.

    A non-zero exit from ``hostnamectl`` is logged as a warning but does not
    change the returned status, because the file has already been written.
    """
    try:
        with open("/etc/hostname", "w") as f:
            f.write(hostname)
        result = subprocess.run(
            ["hostnamectl", "set-hostname", hostname],
            capture_output=True,
            text=True,
            check=False,
        )
        if result.returncode != 0:
            logger.warning(
                "hostnamectl exited with %s: %s",
                result.returncode,
                result.stderr.strip(),
            )
        logger.info("Set hostname to %s", hostname)
        return {"status": "success", "hostname": hostname}
    except Exception as e:
        logger.error("Error setting hostname: %s", e)
        return {"error": str(e)}


def get_system_info() -> Dict[str, Any]:
    """Return basic platform facts plus optional machine id, model and domain.

    The optional keys ("machine_id", "model", "domain") are only present
    when the corresponding file or command is available.
    """
    try:
        uname = platform.uname()
        info = {
            "hostname": socket.gethostname(),
            "system": uname.system,
            "kernel": uname.release,
            "machine": uname.machine,
            "processor": platform.processor(),
            "python": platform.python_version(),
        }

        try:
            with open("/etc/machine-id", "r") as f:
                info["machine_id"] = f.read().strip()
        except (OSError, ValueError):
            pass  # optional: not every system has a machine id

        model = _best_effort_output(["dmidecode", "-s", "system-product-name"])
        if model is not None:
            info["model"] = model

        domain = _best_effort_output(["domainname"])
        if domain and domain != "(none)":
            info["domain"] = domain

        return info
    except Exception as e:
        logger.error("Error getting system info: %s", e)
        return {"error": str(e)}


def get_uptime() -> Dict[str, Any]:
    """Return uptime from /proc/uptime as seconds, a string and a breakdown.

    "boot_time" is the current epoch time minus the uptime, as an int.
    """
    try:
        with open("/proc/uptime", "r") as f:
            uptime_seconds = int(float(f.read().split()[0]))

        days = uptime_seconds // 86400
        hours = (uptime_seconds % 86400) // 3600
        minutes = (uptime_seconds % 3600) // 60
        boot_timestamp = time.time() - uptime_seconds

        return {
            "uptime_seconds": uptime_seconds,
            "uptime_string": f"{days}d {hours}h {minutes}m",
            "uptime_formatted": {"days": days, "hours": hours, "minutes": minutes},
            "boot_time": int(boot_timestamp),
        }
    except Exception as e:
        logger.error("Error getting uptime: %s", e)
        return {"error": str(e)}


def get_memory() -> Dict[str, Any]:
    """Return RAM and swap figures in bytes, parsed from /proc/meminfo.

    "used" is MemTotal minus MemAvailable; "swap_used" is SwapTotal minus
    SwapFree. Keys missing from /proc/meminfo count as 0.
    """
    try:
        meminfo: Dict[str, int] = {}
        with open("/proc/meminfo", "r") as f:
            for line in f:
                key, sep, rest = line.partition(":")
                fields = rest.split()
                if not sep or not fields:
                    continue  # tolerate lines that are not "Key: value"
                try:
                    amount = int(fields[0])
                except ValueError:
                    continue
                # Only values tagged "kB" are sizes; unit-less entries are
                # plain counters and must not be scaled.
                if len(fields) > 1 and fields[1] == "kB":
                    amount *= 1024
                meminfo[key.strip()] = amount

        total = meminfo.get("MemTotal", 0)
        available = meminfo.get("MemAvailable", 0)
        swap_total = meminfo.get("SwapTotal", 0)
        swap_free = meminfo.get("SwapFree", 0)
        return {
            "total": total,
            "available": available,
            "used": total - available,
            "free": meminfo.get("MemFree", 0),
            "swap_total": swap_total,
            "swap_free": swap_free,
            "swap_used": swap_total - swap_free,
        }
    except Exception as e:
        logger.error("Error getting memory info: %s", e)
        return {"error": str(e)}


def _cpu_info_from_proc() -> Dict[str, Any]:
    """Fallback for get_cpu_info(): read /proc/cpuinfo and /proc/loadavg."""
    try:
        with open("/proc/cpuinfo", "r") as f:
            # Count only "processor : N" entries. A substring count would also
            # match model names that contain the word "processor".
            cpu_count = sum(
                1 for line in f if line.partition(":")[0].strip() == "processor"
            )
        with open("/proc/loadavg") as f:
            # Floats, so the shape matches the psutil-based code path.
            load = [float(x) for x in f.read().split()[:3]]
        return {"count": cpu_count, "load_average": load}
    except Exception as e:
        logger.error("Error getting CPU info: %s", e)
        return {"error": str(e)}


def get_cpu_info() -> Dict[str, Any]:
    """Return CPU count and load averages, plus utilisation if psutil exists.

    With psutil installed the result has "count", "percent" (sampled over a
    one-second interval, so the call blocks for that long) and
    "load_average". Without psutil it has "count" and "load_average" only.
    """
    try:
        import psutil
    except ImportError:
        logger.debug("psutil not installed, using fallback")
        return _cpu_info_from_proc()

    try:
        return {
            "count": psutil.cpu_count(),
            "percent": psutil.cpu_percent(interval=1),
            "load_average": [round(x, 2) for x in os.getloadavg()],
        }
    except Exception as e:
        logger.error("Error getting CPU info with psutil: %s", e)
        return {"error": str(e)}


def get_time() -> Dict[str, Any]:
    """Return the current local time.

    Keys: "iso" (naive local ISO-8601 string), "timestamp" (int epoch
    seconds) and "timezone" (string form of the local tzinfo).
    """
    try:
        now = datetime.now()
        return {
            "iso": now.isoformat(),
            "timestamp": int(now.timestamp()),
            "timezone": str(now.astimezone().tzinfo),
        }
    except Exception as e:
        logger.error("Error getting time: %s", e)
        return {"error": str(e)}


def set_time(iso_string: str) -> Dict[str, str]:
    """Set the system clock from an ISO-8601 string.

    A naive timestamp is taken as local wall-clock time. A timestamp with a
    UTC offset (or a trailing "Z") is converted to local time first, because
    ``date -s`` is handed a string without any zone information.
    Syncing the hardware clock afterwards is best effort.
    """
    try:
        text = iso_string.strip()
        # fromisoformat() only accepts a literal "Z" on newer Python versions.
        if text[-1:] in ("Z", "z"):
            text = text[:-1] + "+00:00"
        dt = datetime.fromisoformat(text)
        if dt.tzinfo is not None:
            dt = dt.astimezone()

        result = subprocess.run(
            ["date", "-s", dt.strftime("%Y-%m-%d %H:%M:%S")],
            capture_output=True,
            text=True,
            check=False,
        )
        if result.returncode != 0:
            return {"error": result.stderr}

        try:
            subprocess.run(["hwclock", "--systohc"], check=False)
        except OSError as e:
            # The system time is already set; a missing hwclock is not fatal.
            logger.warning("Could not sync hardware clock: %s", e)

        logger.info("Set time to %s", iso_string)
        return {"status": "success", "time": iso_string}
    except Exception as e:
        logger.error("Error setting time: %s", e)
        return {"error": str(e)}


def get_updates() -> Dict[str, Any]:
    """List upgradable packages as reported by ``apt list --upgradable``.

    Returns ``{"available": <count>, "packages": [...]}``. The first output
    line is skipped as a header.
    NOTE(review): "current" holds the text between the first "/" and the
    first "[" of each line, not just a version; kept as is for compatibility.
    """
    try:
        result = subprocess.run(
            ["apt", "list", "--upgradable"],
            capture_output=True,
            text=True,
            timeout=10,
            check=False,
        )
        if result.returncode != 0:
            return {"error": result.stderr}

        packages = []
        for line in result.stdout.split("\n")[1:]:
            if not line.strip():
                continue
            parts = line.split("/")
            if len(parts) < 2:
                continue
            packages.append({
                "package": parts[0].strip(),
                "current": parts[1].split("[")[0].strip() if "[" in line else "",
            })

        return {"available": len(packages), "packages": packages}
    except Exception as e:
        logger.error("Error checking updates: %s", e)
        return {"error": str(e)}


def reboot() -> Dict[str, str]:
    """Start ``shutdown -r now`` without waiting for it to finish."""
    try:
        subprocess.Popen(["shutdown", "-r", "now"])
        return {"status": "success", "message": "System rebooting..."}
    except Exception as e:
        logger.error("Error rebooting: %s", e)
        return {"error": str(e)}


def shutdown() -> Dict[str, str]:
    """Start ``shutdown -h now`` without waiting for it to finish."""
    try:
        subprocess.Popen(["shutdown", "-h", "now"])
        return {"status": "success", "message": "System shutting down..."}
    except Exception as e:
        logger.error("Error shutting down: %s", e)
        return {"error": str(e)}


def get_network_info() -> Dict[str, Any]:
    """Return network interfaces with their state and addresses.

    Uses ``ip -j addr`` when it is available and yields valid JSON.
    Otherwise falls back to the interface names in /proc/net/dev, reported
    with state "UP" and no addresses.
    """
    try:
        raw = _best_effort_output(["/usr/sbin/ip", "-j", "addr"])
        if raw is not None:
            try:
                return {
                    "interfaces": [
                        {
                            "name": iface.get("ifname"),
                            "state": iface.get("operstate", "UNKNOWN"),
                            "addresses": [
                                {
                                    "family": addr.get("family"),
                                    "local": addr.get("local"),
                                }
                                for addr in iface.get("addr_info", [])
                            ],
                        }
                        for iface in json.loads(raw)
                    ]
                }
            except json.JSONDecodeError:
                pass  # fall through to /proc/net/dev

        with open("/proc/net/dev", "r") as f:
            rows = f.readlines()[2:]  # first two lines are column headers
        return {
            "interfaces": [
                {"name": row.split(":")[0].strip(), "state": "UP", "addresses": []}
                for row in rows
                if ":" in row
            ]
        }
    except Exception as e:
        logger.error("Error getting network info: %s", e)
        return {"error": str(e)}


def get_network_traffic() -> Dict[str, Any]:
    """Return per-interface RX/TX counters parsed from /proc/net/dev."""
    try:
        with open("/proc/net/dev", "r") as f:
            rows = f.readlines()[2:]  # first two lines are column headers

        interfaces = []
        for row in rows:
            name, sep, stats_str = row.partition(":")
            if not sep:
                continue
            stats = stats_str.split()
            if len(stats) < 16:
                continue
            interfaces.append({
                "name": name.strip(),
                "rx_bytes": int(stats[0]),
                "rx_packets": int(stats[1]),
                "rx_errors": int(stats[2]),
                "rx_drops": int(stats[3]),
                "tx_bytes": int(stats[8]),
                "tx_packets": int(stats[9]),
                "tx_errors": int(stats[10]),
                "tx_drops": int(stats[11]),
            })
        return {"interfaces": interfaces}
    except Exception as e:
        logger.error("Error getting network traffic: %s", e)
        return {"error": str(e)}


def _is_container() -> bool:
    """Return True when running inside a container.

    ``--container`` restricts detection to container technologies. Without
    it, systemd-detect-virt also reports full virtual machines.
    """
    try:
        r = subprocess.run(
            ["/usr/bin/systemd-detect-virt", "--container"],
            capture_output=True,
            text=True,
            timeout=3,
        )
        return r.returncode == 0 and r.stdout.strip() not in ("none", "")
    except Exception:
        return False


def get_disk_io() -> Dict[str, Any]:
    """Return read/write counters for sd*, nvme*, hd* and vd* block devices.

    Byte counts are the sector counts from /proc/diskstats multiplied by 512.
    Inside a container an empty list is returned.
    """
    if _is_container():
        return {"disks": []}
    try:
        disks = []
        with open("/proc/diskstats", "r") as f:
            for line in f:
                fields = line.split()
                if len(fields) < 14:
                    continue
                name = fields[2]
                if not name.startswith(("sd", "nvme", "hd", "vd")):
                    continue
                disks.append({
                    "name": name,
                    "reads_completed": int(fields[3]),
                    "reads_bytes": int(fields[5]) * 512,
                    "writes_completed": int(fields[7]),
                    "writes_bytes": int(fields[9]) * 512,
                })
        return {"disks": disks}
    except Exception as e:
        logger.error("Error getting disk I/O: %s", e)
        return {"error": str(e)}


def get_services() -> Dict[str, Any]:
    """Return the running systemd services.

    Tries ``systemctl ... --output=json`` first. If that exits non-zero or
    does not yield JSON, the plain-text table is parsed instead.
    """
    base_cmd = [
        "/usr/bin/systemctl", "list-units", "--type=service",
        "--state=running", "--no-pager",
    ]
    try:
        result = subprocess.run(
            base_cmd + ["--output=json"],
            capture_output=True, text=True, timeout=10,
        )
        if result.returncode == 0:
            try:
                return {
                    "services": [
                        {
                            "name": svc.get("unit"),
                            "state": svc.get("active"),
                            "description": svc.get("description"),
                        }
                        for svc in json.loads(result.stdout)
                        if svc.get("unit", "").endswith(".service")
                    ]
                }
            except json.JSONDecodeError:
                pass  # fall through to the plain-text table

        result = subprocess.run(
            base_cmd, capture_output=True, text=True, timeout=10
        )
        return {
            "services": [
                {
                    "name": row["name"],
                    "state": "running",  # guaranteed by --state=running
                    "description": row["description"],
                }
                for row in _parse_unit_table(result.stdout)
                if row["name"].endswith(".service")
            ]
        }
    except Exception as e:
        logger.error("Error getting services: %s", e)
        return {"error": str(e)}


def get_all_units() -> Dict[str, Any]:
    """Return all systemd units grouped by type.

    The result has the keys "services", "targets", "sockets", "timers" and
    "paths". Each entry carries "name", "active", "sub" and "description".
    Units of other types are ignored. JSON output is preferred; the
    plain-text table is the fallback.
    """
    base_cmd = ["/usr/bin/systemctl", "list-units", "--all", "--no-pager"]
    try:
        rows = None
        result = subprocess.run(
            base_cmd + ["--output=json"],
            capture_output=True, text=True, timeout=10,
        )
        if result.returncode == 0:
            try:
                rows = [
                    {
                        "name": unit.get("unit", ""),
                        "active": unit.get("active"),
                        "sub": unit.get("sub"),
                        "description": unit.get("description", ""),
                    }
                    for unit in json.loads(result.stdout)
                ]
            except json.JSONDecodeError:
                rows = None

        if rows is None:
            result = subprocess.run(
                base_cmd, capture_output=True, text=True, timeout=10
            )
            rows = _parse_unit_table(result.stdout)

        units: Dict[str, Any] = {bucket: [] for bucket in _UNIT_BUCKETS.values()}
        for row in rows:
            bucket = _unit_bucket(row["name"])
            if bucket is not None:
                units[bucket].append(row)
        return units
    except Exception as e:
        logger.error("Error getting units: %s", e)
        return {"error": str(e)}


def get_disk_usage() -> Dict[str, Any]:
    """Return filesystem usage from ``df -P`` in bytes.

    tmpfs, devtmpfs, squashfs and overlay filesystems are excluded. Rows
    whose numeric columns cannot be parsed are skipped.
    """
    try:
        result = subprocess.run(
            [
                "/usr/bin/df", "-P",
                "-x", "tmpfs", "-x", "devtmpfs", "-x", "squashfs", "-x", "overlay",
            ],
            capture_output=True, text=True, timeout=5,
        )
        if result.returncode != 0:
            return {"error": result.stderr}

        filesystems = []
        for line in result.stdout.strip().split("\n")[1:]:
            # Split at most five times so a mount point containing spaces
            # stays intact in the last column.
            parts = line.split(None, 5)
            if len(parts) < 6:
                continue
            try:
                filesystems.append({
                    "filesystem": parts[0],
                    "mountpoint": parts[5],
                    "total": int(parts[1]) * 1024,
                    "used": int(parts[2]) * 1024,
                    "available": int(parts[3]) * 1024,
                    "capacity": int(parts[4].rstrip("%")),
                })
            except (ValueError, IndexError):
                continue
        return {"filesystems": filesystems}
    except Exception as e:
        logger.error("Error getting disk usage: %s", e)
        return {"error": str(e)}


def get_journal_logs(limit: int = 20) -> Dict[str, Any]:
    """Return the last *limit* journal lines as ``{"logs": [...]}``.

    A non-zero exit from journalctl yields an empty list.
    """
    try:
        result = subprocess.run(
            [
                "/usr/bin/journalctl", "-n", str(limit),
                "--no-pager", "--output=short",
            ],
            capture_output=True, text=True, timeout=5,
        )
        if result.returncode == 0:
            logs = [
                line.strip() for line in result.stdout.split("\n") if line.strip()
            ]
            return {"logs": logs}
        return {"logs": []}
    except Exception as e:
        logger.error("Error getting journal logs: %s", e)
        return {"error": str(e)}