"""
ZFS Command Runner – wrapper around the zpool/zfs CLI commands.
"""

import subprocess
import json
import logging
import glob
import os
import re
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta

from services.platform_info import IS_CONTAINER

logger = logging.getLogger(__name__)

_TIMEOUT = 5


# ── ZFS availability ──────────────────────────────────────────────────────────

def _probe_zfs() -> bool:
    """Return True if ``zpool list`` runs and exits with status 0."""
    try:
        r = subprocess.run(["zpool", "list"], capture_output=True, timeout=3)
        return r.returncode == 0
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        return False


# Probed once at import time; the read operations below return empty results
# when ZFS is not available.
ZFS_AVAILABLE = _probe_zfs()
if not ZFS_AVAILABLE:
    logger.info("ZFS not available on this system (pools/snapshots disabled)")


# ── TTL cache ─────────────────────────────────────────────────────────────────

# key -> (cached value, absolute expiry time)
_cache: Dict[str, Tuple[Any, datetime]] = {}


def _cache_get(key: str) -> Optional[Any]:
    """Return the cached value for *key*, or None if it is missing or expired.

    Expired entries are evicted on access so stale data does not linger.
    """
    entry = _cache.get(key)
    if entry is None:
        return None
    data, expires_at = entry
    if datetime.now() > expires_at:
        _cache.pop(key, None)
        return None
    return data


def _cache_set(key: str, data: Any, ttl: int = 60) -> None:
    """Store *data* under *key* for *ttl* seconds."""
    _cache[key] = (data, datetime.now() + timedelta(seconds=ttl))


def clear_cache() -> None:
    """Drop every cached pool/dataset/snapshot listing."""
    _cache.clear()
    logger.info("ZFS cache cleared")


def _invalidate_cache() -> None:
    """Silently drop cached listings after a successful mutating command.

    Without this, a listing fetched just before e.g. ``zfs create`` would keep
    being served, without the change, until its TTL expires.
    """
    _cache.clear()


# ── Subprocess helper ─────────────────────────────────────────────────────────

def _run(cmd: List[str], timeout: int = _TIMEOUT) -> Tuple[str, str, int]:
    """Run *cmd* (argument list, no shell) and return ``(stdout, stderr, rc)``.

    Never raises: a timeout, a missing binary, or any other execution error is
    logged and reported as ``("", <message>, -1)``.
    """
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=False)
        return r.stdout, r.stderr, r.returncode
    except subprocess.TimeoutExpired:
        logger.error(f"Command timeout after {timeout}s: {' '.join(cmd)}")
        return "", f"Command timeout after {timeout}s", -1
    except FileNotFoundError:
        logger.debug(f"Command not found: {' '.join(cmd)}")
        return "", f"Command not found: {cmd[0]}", -1
    except Exception as e:  # boundary: callers expect a tuple, never an exception
        logger.error(f"Command execution error: {e}")
        return "", str(e), -1


# ── Parsing helpers ───────────────────────────────────────────────────────────

# 1024-based multipliers for the human-readable counts ZFS prints without -p.
_COUNT_SUFFIXES = {
    "K": 1 << 10,
    "M": 1 << 20,
    "G": 1 << 30,
    "T": 1 << 40,
    "P": 1 << 50,
    "E": 1 << 60,
}


def _parse_count(value: str) -> int:
    """Parse a numeric field from zpool/zfs output into an int.

    Accepts exact integers ("42"), suffixed human-readable values such as
    "1.2K" (as printed by ``zpool status`` without -p), and returns 0 for "-"
    or anything else unparsable so one odd field cannot abort a whole listing.
    """
    try:
        return int(value)
    except ValueError:
        pass
    multiplier = _COUNT_SUFFIXES.get(value[-1:].upper())
    if multiplier is not None:
        try:
            return int(float(value[:-1]) * multiplier)
        except ValueError:
            pass
    return 0


# ── Pool operations ───────────────────────────────────────────────────────────

# Explicit column list: the default `zpool list` layout differs between ZFS
# releases (e.g. CKPOINT is missing on older ones), which would shift every
# positional index after it.
_POOL_COLUMNS = "name,size,allocated,free,fragmentation,capacity,health"


def list_pools() -> List[Dict[str, Any]]:
    """Return one dict per imported pool; results are cached for 30 s.

    Keys: name, size/alloc/free (exact values from ``-p``), fragmentation and
    capacity (as "<n>%" strings), health. Returns [] if ZFS is unavailable or
    the command fails.
    """
    cached = _cache_get("pool_status")
    if cached is not None:
        return cached
    if not ZFS_AVAILABLE:
        return []
    stdout, stderr, rc = _run(["zpool", "list", "-H", "-p", "-o", _POOL_COLUMNS])
    if rc != 0:
        logger.error(f"zpool list failed: {stderr}")
        return []
    pools = []
    for line in stdout.splitlines():
        parts = line.split("\t")  # -H emits tab-separated fields
        if len(parts) < 7:
            continue
        name, size, alloc, free, frag, cap, health = parts[:7]
        pools.append({
            "name": name,
            # Unavailable pools may report "-" for sizes; _parse_count maps that to 0.
            "size": _parse_count(size),
            "alloc": _parse_count(alloc),
            "free": _parse_count(free),
            "fragmentation": f"{frag}%",
            "capacity": f"{cap}%",
            "health": health,
        })
    _cache_set("pool_status", pools, ttl=30)
    return pools


def get_disk_id_map() -> Dict[str, str]:
    """Map kernel disk names (e.g. "sda") to a /dev/disk/by-id link name.

    Link prefixes are tried in priority order ata-, nvme-, scsi-, wwn-; the
    first match per disk wins. Partition links ("-part") are skipped.
    """
    mapping: Dict[str, str] = {}
    for pattern in ["ata-", "nvme-", "scsi-", "wwn-"]:
        # sorted() makes the chosen link deterministic within one prefix.
        for link in sorted(glob.glob(f"/dev/disk/by-id/{pattern}*")):
            if "-part" in os.path.basename(link):
                continue
            try:
                target = os.path.realpath(link)
                disk = os.path.basename(target)
                if disk not in mapping:
                    mapping[disk] = os.path.basename(link)
            except OSError:
                continue
    return mapping


def _disk_base_name(name: str) -> str:
    """Strip a partition suffix from a kernel device name.

    sda1 -> sda, nvme0n1p2 -> nvme0n1, mmcblk0p1 -> mmcblk0. Names that do not
    end in a digit are returned unchanged.
    """
    if not name[-1:].isdigit():
        return name
    # Devices whose own name ends in a digit use a "p<N>" partition suffix.
    match = re.match(r"^(.*\d)p\d+$", name)
    if match:
        return match.group(1)
    return re.sub(r"\d+$", "", name)


def _annotate_disk_ids(vdevs: List[Dict], disk_id_map: Dict[str, str]) -> None:
    """Recursively set ``disk_id`` on every leaf vdev (in place).

    The exact vdev name is looked up first, then its parent disk name (so a
    partition vdev resolves to the whole disk's by-id link). None if neither
    is found.
    """
    for vdev in vdevs:
        children = vdev.get("children", [])
        if children:
            _annotate_disk_ids(children, disk_id_map)
            continue
        name = vdev.get("name", "")
        vdev["disk_id"] = disk_id_map.get(name) or disk_id_map.get(_disk_base_name(name))


def _parse_vdev_tree(config_lines: List[str]) -> List[Dict[str, Any]]:
    """Build a vdev tree from the ``config:`` section of ``zpool status``.

    Nesting is derived from each line's leading whitespace: a line becomes a
    child of the nearest preceding line with a smaller indent. Returns the
    top-level entries (the pool itself plus any logs/cache/spares sections).
    """
    roots: List[Dict] = []
    stack: List[Tuple[int, Dict[str, Any]]] = []
    for line in config_lines:
        stripped = line.strip()
        if not stripped or stripped.startswith("NAME"):
            continue
        indent = len(line) - len(line.lstrip())
        parts = stripped.split()
        vdev: Dict[str, Any] = {
            "name": parts[0],
            "state": parts[1] if len(parts) > 1 else "UNKNOWN",
            # Counts may be human-readable ("1.2K"), not plain ints.
            "read": _parse_count(parts[2]) if len(parts) > 2 else 0,
            "write": _parse_count(parts[3]) if len(parts) > 3 else 0,
            "cksum": _parse_count(parts[4]) if len(parts) > 4 else 0,
            "children": [],
        }
        while stack and stack[-1][0] >= indent:
            stack.pop()
        if stack:
            stack[-1][1]["children"].append(vdev)
        else:
            roots.append(vdev)
        stack.append((indent, vdev))
    return roots


def get_pool_status(pool_name: str) -> Dict[str, Any]:
    """Parse ``zpool status <pool>`` into state/scan/errors and a vdev tree.

    Returns {} if ZFS is unavailable or the command fails.
    """
    if not ZFS_AVAILABLE:
        return {}
    stdout, stderr, rc = _run(["zpool", "status", pool_name])
    if rc != 0:
        logger.error(f"zpool status failed for {pool_name}: {stderr}")
        return {}

    status: Dict[str, Any] = {"name": pool_name, "state": None, "scan": None, "errors": None, "vdevs": []}
    in_config = False
    config_lines: List[str] = []
    for line in stdout.split("\n"):
        stripped = line.strip()
        if stripped.startswith("state:"):
            status["state"] = stripped.split(":", 1)[1].strip()
        elif stripped.startswith("scan:"):
            status["scan"] = stripped.split(":", 1)[1].strip()
        elif stripped.startswith("errors:"):
            status["errors"] = stripped.split(":", 1)[1].strip()
            in_config = False  # "errors:" follows the config section
        elif stripped == "config:":
            in_config = True
        elif in_config and (stripped or config_lines):
            config_lines.append(line)

    if config_lines:
        roots = _parse_vdev_tree(config_lines)
        if roots and roots[0]["name"] == pool_name:
            # The first root is the pool itself; auxiliary sections (logs,
            # cache, spares, ...) are printed as further roots at the same
            # indent and must be kept, not dropped.
            vdevs = roots[0]["children"] + roots[1:]
        else:
            vdevs = roots
        status["vdevs"] = vdevs
        _annotate_disk_ids(vdevs, get_disk_id_map())
    return status


def scrub_pool(pool_name: str) -> Dict[str, str]:
    """Start a scrub on *pool_name*; returns a status/message dict."""
    stdout, stderr, rc = _run(["zpool", "scrub", pool_name])
    if rc != 0:
        logger.error(f"zpool scrub failed for {pool_name}: {stderr}")
        return {"status": "error", "message": stderr}
    return {"status": "success", "message": f"Scrub started for {pool_name}"}


def get_smart_info(disk: str) -> Dict[str, Any]:
    """Return selected SMART data for /dev/<disk> via ``smartctl --json``.

    Returns {"available": False, "reason": "container"} inside a container and
    {} when smartctl produced no parsable JSON.
    """
    if IS_CONTAINER:
        return {"available": False, "reason": "container"}
    # The return code is deliberately ignored: per smartctl(8) it is a bit mask
    # that can be non-zero even when the JSON output is usable.
    stdout, _, rc = _run(["smartctl", "-A", "-i", "--json", f"/dev/{disk}"])
    if not stdout:
        return {}
    try:
        data = json.loads(stdout)
    except json.JSONDecodeError:
        return {}
    attrs = {a["name"]: a for a in data.get("ata_smart_attributes", {}).get("table", [])}
    return {
        "model": data.get("model_name") or data.get("model_family", ""),
        "serial": data.get("serial_number", ""),
        "protocol": data.get("device", {}).get("protocol", ""),
        "power_on_hours": data.get("power_on_time", {}).get("hours"),
        "temperature": data.get("temperature", {}).get("current"),
        "passed": data.get("smart_status", {}).get("passed"),
        "reallocated_sectors": attrs.get("Reallocated_Sector_Ct", {}).get("raw", {}).get("value", 0),
        "pending_sectors": attrs.get("Current_Pending_Sector", {}).get("raw", {}).get("value", 0),
        "uncorrectable": attrs.get("Offline_Uncorrectable", {}).get("raw", {}).get("value", 0),
    }


# ── Dataset operations ────────────────────────────────────────────────────────

def list_datasets(pool_name: str, max_depth: int = 2) -> List[Dict[str, Any]]:
    """List filesystems/volumes under *pool_name* down to *max_depth* levels.

    Results are cached for 60 s per (pool, depth) so a request with a
    different depth never receives another depth's listing.
    """
    cache_key = f"datasets:{pool_name}:{max_depth}"
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached
    if not ZFS_AVAILABLE:
        return []
    stdout, stderr, rc = _run([
        "zfs", "list", "-d", str(max_depth), "-H", "-p",
        "-o", "name,used,avail,refer,mountpoint,type", pool_name,
    ])
    if rc != 0:
        logger.error(f"zfs list failed for {pool_name}: {stderr}")
        return []
    datasets = []
    for line in stdout.strip().split("\n"):
        if not line:
            continue
        parts = line.split("\t")
        if len(parts) < 6:
            continue
        datasets.append({
            "name": parts[0],
            "used": int(parts[1]),
            "avail": int(parts[2]),
            "refer": int(parts[3]),
            "mountpoint": parts[4],
            "type": parts[5],
        })
    _cache_set(cache_key, datasets, ttl=60)
    return datasets


def create_dataset(dataset_name: str, props: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Create *dataset_name*, passing each entry of *props* as ``-o key=val``."""
    cmd = ["zfs", "create"]
    if props:
        for key, val in props.items():
            cmd.extend(["-o", f"{key}={val}"])
    cmd.append(dataset_name)
    stdout, stderr, rc = _run(cmd)
    if rc != 0:
        logger.error(f"zfs create failed: {stderr}")
        return {"status": "error", "message": stderr}
    _invalidate_cache()
    return {"status": "success", "message": f"Dataset {dataset_name} created"}


def set_dataset_properties(dataset_name: str, props: Dict[str, str]) -> Dict[str, str]:
    """Run ``zfs set key=value`` for each non-None entry of *props*.

    Every property is attempted even if an earlier one fails; all failures
    are joined into a single error message.
    """
    errors = []
    for key, value in props.items():
        if value is None:
            continue
        _, stderr, rc = _run(["zfs", "set", f"{key}={value}", dataset_name])
        if rc != 0:
            errors.append(f"{key}: {stderr.strip()}")
    # Some properties may have been applied even if others failed.
    _invalidate_cache()
    if errors:
        logger.error(f"zfs set failed for {dataset_name}: {'; '.join(errors)}")
        return {"status": "error", "message": "; ".join(errors)}
    return {"status": "success", "message": f"Properties updated for {dataset_name}"}


def destroy_dataset(dataset_name: str, recursive: bool = False) -> Dict[str, str]:
    """Destroy *dataset_name* (with ``-r`` when *recursive*)."""
    cmd = ["zfs", "destroy"] + (["-r"] if recursive else []) + [dataset_name]
    stdout, stderr, rc = _run(cmd)
    if rc != 0:
        logger.error(f"zfs destroy failed: {stderr}")
        return {"status": "error", "message": stderr}
    _invalidate_cache()
    return {"status": "success", "message": f"Dataset {dataset_name} destroyed"}


# ── Snapshot operations ───────────────────────────────────────────────────────

def list_snapshots(dataset_name: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
    """Return up to *limit* snapshots, newest first.

    Snapshots of *dataset_name* and its descendants are listed, or all
    snapshots when it is None. The full sorted list is cached for 60 s and
    *limit* is applied afterwards, so calls with different limits share the
    cache correctly (and callers get a fresh list they may mutate).
    """
    cache_key = f"snapshots:{dataset_name or '*'}"
    snapshots = _cache_get(cache_key)
    if snapshots is None:
        if not ZFS_AVAILABLE:
            return []
        if dataset_name:
            cmd = ["zfs", "list", "-t", "snapshot", "-r", "-H", "-p",
                   "-o", "name,used,referenced,creation", dataset_name]
        else:
            cmd = ["zfs", "list", "-t", "snapshot", "-H", "-p",
                   "-o", "name,used,referenced,creation"]
        stdout, stderr, rc = _run(cmd)
        if rc != 0:
            logger.error(f"zfs list snapshots failed: {stderr}")
            return []
        snapshots = []
        for line in stdout.strip().split("\n"):
            if not line:
                continue
            parts = line.split("\t")
            if len(parts) < 4:
                continue
            snapshots.append({
                "name": parts[0],
                "used": int(parts[1]),
                "referenced": int(parts[2]),
                "creation": int(parts[3]),
            })
        snapshots.sort(key=lambda x: x["creation"], reverse=True)
        _cache_set(cache_key, snapshots, ttl=60)
    return snapshots[:limit]


def create_snapshot(dataset_name: str, snapshot_name: Optional[str] = None) -> Dict[str, str]:
    """Create ``dataset@snapshot``; the name defaults to a YYYYmmdd-HHMMSS timestamp."""
    if not snapshot_name:
        snapshot_name = datetime.now().strftime("%Y%m%d-%H%M%S")
    full_name = f"{dataset_name}@{snapshot_name}"
    stdout, stderr, rc = _run(["zfs", "snapshot", full_name])
    if rc != 0:
        logger.error(f"zfs snapshot failed: {stderr}")
        return {"status": "error", "message": stderr}
    _invalidate_cache()
    return {"status": "success", "message": f"Snapshot {full_name} created"}


def destroy_snapshot(snapshot_name: str, recursive: bool = False) -> Dict[str, str]:
    """Destroy the snapshot *snapshot_name* (``dataset@snap``).

    Names without "@" are rejected: passing a plain dataset name to
    ``zfs destroy`` (especially with ``-r``) would destroy the dataset itself.
    """
    if "@" not in snapshot_name:
        message = f"Not a snapshot name: {snapshot_name}"
        logger.error(f"zfs destroy snapshot refused: {message}")
        return {"status": "error", "message": message}
    cmd = ["zfs", "destroy"] + (["-r"] if recursive else []) + [snapshot_name]
    stdout, stderr, rc = _run(cmd)
    if rc != 0:
        logger.error(f"zfs destroy snapshot failed: {stderr}")
        return {"status": "error", "message": stderr}
    _invalidate_cache()
    return {"status": "success", "message": f"Snapshot {snapshot_name} destroyed"}


def rollback_snapshot(snapshot_name: str) -> Dict[str, str]:
    """Roll back to *snapshot_name* with ``-r``, discarding newer snapshots."""
    stdout, stderr, rc = _run(["zfs", "rollback", "-r", snapshot_name])
    if rc != 0:
        logger.error(f"zfs rollback failed: {stderr}")
        return {"status": "error", "message": stderr}
    _invalidate_cache()
    return {"status": "success", "message": f"Rolled back to {snapshot_name}"}


# Backward-compat shim so existing routers keep working unchanged
class _ZFSRunnerShim:
    run_command = staticmethod(_run)
    list_pools = staticmethod(list_pools)
    get_pool_status = staticmethod(get_pool_status)
    scrub_pool = staticmethod(scrub_pool)
    get_disk_id_map = staticmethod(get_disk_id_map)
    get_smart_info = staticmethod(get_smart_info)
    list_datasets = staticmethod(list_datasets)
    create_dataset = staticmethod(create_dataset)
    set_dataset_properties = staticmethod(set_dataset_properties)
    destroy_dataset = staticmethod(destroy_dataset)
    list_snapshots = staticmethod(list_snapshots)
    create_snapshot = staticmethod(create_snapshot)
    destroy_snapshot = staticmethod(destroy_snapshot)
    rollback_snapshot = staticmethod(rollback_snapshot)
    clear_cache = staticmethod(clear_cache)


zfs_runner = _ZFSRunnerShim()