""" ZFS Command Runner – Wrapper für zpool/zfs CLI Commands """ import subprocess import json import logging import glob import os import re from typing import Dict, List, Any, Optional, Tuple from datetime import datetime, timedelta logger = logging.getLogger(__name__) _TIMEOUT = 5 # ── ZFS availability ────────────────────────────────────────────────────────── def _probe_zfs() -> bool: try: r = subprocess.run(["zpool", "list"], capture_output=True, timeout=3) return r.returncode == 0 except (FileNotFoundError, subprocess.TimeoutExpired, OSError): return False ZFS_AVAILABLE = _probe_zfs() if not ZFS_AVAILABLE: logger.info("ZFS not available on this system (pools/snapshots disabled)") # ── TTL cache ───────────────────────────────────────────────────────────────── _cache: Dict[str, Tuple[Any, datetime]] = {} def _cache_get(key: str) -> Optional[Any]: if key not in _cache: return None data, expires_at = _cache[key] if datetime.now() > expires_at: return None return data def _cache_set(key: str, data: Any, ttl: int = 60) -> None: _cache[key] = (data, datetime.now() + timedelta(seconds=ttl)) def clear_cache() -> None: _cache.clear() logger.info("ZFS cache cleared") # ── Subprocess helper ───────────────────────────────────────────────────────── def _run(cmd: List[str], timeout: int = _TIMEOUT) -> Tuple[str, str, int]: try: r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=False) return r.stdout, r.stderr, r.returncode except subprocess.TimeoutExpired: logger.error(f"Command timeout after {timeout}s: {' '.join(cmd)}") return "", f"Command timeout after {timeout}s", -1 except FileNotFoundError: logger.debug(f"Command not found: {' '.join(cmd)}") return "", f"Command not found: {cmd[0]}", -1 except Exception as e: logger.error(f"Command execution error: {e}") return "", str(e), -1 # ── Pool operations ─────────────────────────────────────────────────────────── def list_pools() -> List[Dict[str, Any]]: cached = _cache_get("pool_status") if cached is not None: return cached if not ZFS_AVAILABLE: return [] stdout, stderr, rc = _run(["zpool", "list", "-H", "-p"]) if rc != 0: logger.error(f"zpool list failed: {stderr}") return [] pools = [] for line in stdout.strip().split("\n"): if not line: continue parts = line.split() if len(parts) < 10: continue pools.append({ "name": parts[0], "size": int(parts[1]), "alloc": int(parts[2]), "free": int(parts[3]), "fragmentation": f"{parts[6]}%", "capacity": f"{parts[7]}%", "health": parts[9], }) _cache_set("pool_status", pools, ttl=30) return pools def get_disk_id_map() -> Dict[str, str]: mapping: Dict[str, str] = {} for pattern in ["ata-", "nvme-", "scsi-", "wwn-"]: for link in glob.glob(f"/dev/disk/by-id/{pattern}*"): if "-part" in os.path.basename(link): continue try: target = os.path.realpath(link) disk = os.path.basename(target) if disk not in mapping: mapping[disk] = os.path.basename(link) except OSError: continue return mapping def _annotate_disk_ids(vdevs: List[Dict], disk_id_map: Dict[str, str]) -> None: for vdev in vdevs: children = vdev.get("children", []) if not children: name = vdev.get("name", "") base = re.sub(r'\d+$', '', name) if name[-1:].isdigit() else name vdev["disk_id"] = disk_id_map.get(name) or disk_id_map.get(base) else: _annotate_disk_ids(children, disk_id_map) def _parse_vdev_tree(config_lines: List[str]) -> List[Dict[str, Any]]: roots: List[Dict] = [] stack: List[tuple] = [] for line in config_lines: if not line.strip() or line.strip().startswith("NAME"): continue indent = len(line) - len(line.lstrip()) parts = line.split() if not parts: continue vdev: Dict[str, Any] = { "name": parts[0], "state": parts[1] if len(parts) > 1 else "UNKNOWN", "read": int(parts[2]) if len(parts) > 2 else 0, "write": int(parts[3]) if len(parts) > 3 else 0, "cksum": int(parts[4]) if len(parts) > 4 else 0, "children": [], } while stack and stack[-1][0] >= indent: stack.pop() if stack: stack[-1][1]["children"].append(vdev) else: roots.append(vdev) stack.append((indent, vdev)) return roots def get_pool_status(pool_name: str) -> Dict[str, Any]: if not ZFS_AVAILABLE: return {} stdout, stderr, rc = _run(["zpool", "status", pool_name]) if rc != 0: logger.error(f"zpool status failed for {pool_name}: {stderr}") return {} status: Dict[str, Any] = {"name": pool_name, "state": None, "scan": None, "errors": None, "vdevs": []} in_config = False config_lines: List[str] = [] for line in stdout.split("\n"): stripped = line.strip() if stripped.startswith("state:"): status["state"] = stripped.split(":", 1)[1].strip() elif stripped.startswith("scan:"): status["scan"] = stripped.split(":", 1)[1].strip() elif stripped.startswith("errors:"): status["errors"] = stripped.split(":", 1)[1].strip() in_config = False elif stripped == "config:": in_config = True elif in_config and (stripped or config_lines): config_lines.append(line) if config_lines: parsed = _parse_vdev_tree(config_lines) status["vdevs"] = parsed[0]["children"] if parsed and parsed[0]["name"] == pool_name else parsed _annotate_disk_ids(status["vdevs"], get_disk_id_map()) return status def scrub_pool(pool_name: str) -> Dict[str, str]: stdout, stderr, rc = _run(["zpool", "scrub", pool_name]) if rc != 0: logger.error(f"zpool scrub failed for {pool_name}: {stderr}") return {"status": "error", "message": stderr} return {"status": "success", "message": f"Scrub started for {pool_name}"} def get_smart_info(disk: str) -> Dict[str, Any]: stdout, _, rc = _run(["smartctl", "-A", "-i", "--json", f"/dev/{disk}"]) if not stdout: return {} try: data = json.loads(stdout) except json.JSONDecodeError: return {} attrs = {a["name"]: a for a in data.get("ata_smart_attributes", {}).get("table", [])} return { "model": data.get("model_name") or data.get("model_family", ""), "serial": data.get("serial_number", ""), "protocol": data.get("device", {}).get("protocol", ""), "power_on_hours": data.get("power_on_time", {}).get("hours"), "temperature": data.get("temperature", {}).get("current"), "passed": data.get("smart_status", {}).get("passed"), "reallocated_sectors": attrs.get("Reallocated_Sector_Ct", {}).get("raw", {}).get("value", 0), "pending_sectors": attrs.get("Current_Pending_Sector", {}).get("raw", {}).get("value", 0), "uncorrectable": attrs.get("Offline_Uncorrectable", {}).get("raw", {}).get("value", 0), } # ── Dataset operations ──────────────────────────────────────────────────────── def list_datasets(pool_name: str, max_depth: int = 2) -> List[Dict[str, Any]]: cached = _cache_get("datasets") if cached and cached.get(pool_name): return cached[pool_name] if not ZFS_AVAILABLE: return [] stdout, stderr, rc = _run([ "zfs", "list", "-d", str(max_depth), "-H", "-p", "-o", "name,used,avail,refer,mountpoint,type", pool_name ]) if rc != 0: logger.error(f"zfs list failed for {pool_name}: {stderr}") return [] datasets = [] for line in stdout.strip().split("\n"): if not line: continue parts = line.split("\t") if len(parts) < 6: continue datasets.append({ "name": parts[0], "used": int(parts[1]), "avail": int(parts[2]), "refer": int(parts[3]), "mountpoint": parts[4], "type": parts[5], }) pool_cache = cached or {} pool_cache[pool_name] = datasets _cache_set("datasets", pool_cache, ttl=60) return datasets def create_dataset(dataset_name: str, props: Optional[Dict[str, str]] = None) -> Dict[str, str]: cmd = ["zfs", "create"] if props: for key, val in props.items(): cmd.extend(["-o", f"{key}={val}"]) cmd.append(dataset_name) stdout, stderr, rc = _run(cmd) if rc != 0: logger.error(f"zfs create failed: {stderr}") return {"status": "error", "message": stderr} return {"status": "success", "message": f"Dataset {dataset_name} created"} def set_dataset_properties(dataset_name: str, props: Dict[str, str]) -> Dict[str, str]: errors = [] for key, value in props.items(): if value is None: continue _, stderr, rc = _run(["zfs", "set", f"{key}={value}", dataset_name]) if rc != 0: errors.append(f"{key}: {stderr.strip()}") if errors: return {"status": "error", "message": "; ".join(errors)} return {"status": "success", "message": f"Properties updated for {dataset_name}"} def destroy_dataset(dataset_name: str, recursive: bool = False) -> Dict[str, str]: cmd = ["zfs", "destroy"] + (["-r"] if recursive else []) + [dataset_name] stdout, stderr, rc = _run(cmd) if rc != 0: logger.error(f"zfs destroy failed: {stderr}") return {"status": "error", "message": stderr} return {"status": "success", "message": f"Dataset {dataset_name} destroyed"} # ── Snapshot operations ─────────────────────────────────────────────────────── def list_snapshots(dataset_name: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]: cache_key = f"snapshots:{dataset_name or '*'}" cached = _cache_get(cache_key) if cached is not None: return cached if not ZFS_AVAILABLE: return [] if dataset_name: cmd = ["zfs", "list", "-t", "snapshot", "-r", "-H", "-p", "-o", "name,used,referenced,creation", dataset_name] else: cmd = ["zfs", "list", "-t", "snapshot", "-H", "-p", "-o", "name,used,referenced,creation"] stdout, stderr, rc = _run(cmd) if rc != 0: logger.error(f"zfs list snapshots failed: {stderr}") return [] snapshots = [] for line in stdout.strip().split("\n"): if not line: continue parts = line.split("\t") if len(parts) < 4: continue snapshots.append({ "name": parts[0], "used": int(parts[1]), "referenced": int(parts[2]), "creation": int(parts[3]), }) snapshots.sort(key=lambda x: x["creation"], reverse=True) snapshots = snapshots[:limit] _cache_set(cache_key, snapshots, ttl=60) return snapshots def create_snapshot(dataset_name: str, snapshot_name: Optional[str] = None) -> Dict[str, str]: if not snapshot_name: snapshot_name = datetime.now().strftime("%Y%m%d-%H%M%S") full_name = f"{dataset_name}@{snapshot_name}" stdout, stderr, rc = _run(["zfs", "snapshot", full_name]) if rc != 0: logger.error(f"zfs snapshot failed: {stderr}") return {"status": "error", "message": stderr} return {"status": "success", "message": f"Snapshot {full_name} created"} def destroy_snapshot(snapshot_name: str, recursive: bool = False) -> Dict[str, str]: cmd = ["zfs", "destroy"] + (["-r"] if recursive else []) + [snapshot_name] stdout, stderr, rc = _run(cmd) if rc != 0: logger.error(f"zfs destroy snapshot failed: {stderr}") return {"status": "error", "message": stderr} return {"status": "success", "message": f"Snapshot {snapshot_name} destroyed"} def rollback_snapshot(snapshot_name: str) -> Dict[str, str]: stdout, stderr, rc = _run(["zfs", "rollback", "-r", snapshot_name]) if rc != 0: logger.error(f"zfs rollback failed: {stderr}") return {"status": "error", "message": stderr} return {"status": "success", "message": f"Rolled back to {snapshot_name}"} # Backward-compat shim so existing routers keep working unchanged class _ZFSRunnerShim: run_command = staticmethod(_run) list_pools = staticmethod(list_pools) get_pool_status = staticmethod(get_pool_status) scrub_pool = staticmethod(scrub_pool) get_disk_id_map = staticmethod(get_disk_id_map) get_smart_info = staticmethod(get_smart_info) list_datasets = staticmethod(list_datasets) create_dataset = staticmethod(create_dataset) set_dataset_properties = staticmethod(set_dataset_properties) destroy_dataset = staticmethod(destroy_dataset) list_snapshots = staticmethod(list_snapshots) create_snapshot = staticmethod(create_snapshot) destroy_snapshot = staticmethod(destroy_snapshot) rollback_snapshot = staticmethod(rollback_snapshot) clear_cache = staticmethod(clear_cache) zfs_runner = _ZFSRunnerShim()