12248afa3a
Neues Modul services/platform_info.py prüft systemd-detect-virt einmalig beim Start (statt pro Request). SMART-Abfragen werden in Containern übersprungen, da /dev/sdX dort meist nicht verfügbar ist. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
365 lines
14 KiB
Python
365 lines
14 KiB
Python
"""
|
||
ZFS Command Runner – Wrapper für zpool/zfs CLI Commands
|
||
"""
|
||
|
||
import subprocess
|
||
import json
|
||
import logging
|
||
import glob
|
||
import os
|
||
import re
|
||
from typing import Dict, List, Any, Optional, Tuple
|
||
from datetime import datetime, timedelta
|
||
|
||
from services.platform_info import IS_CONTAINER
|
||
|
||
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default timeout (in seconds) for commands executed through _run().
_TIMEOUT: int = 5
|
||
|
||
# ── ZFS availability ──────────────────────────────────────────────────────────
|
||
|
||
def _probe_zfs() -> bool:
    """Report whether ``zpool list`` can be executed successfully.

    Returns:
        True if the command ran and exited with status 0; False if it exited
        non-zero, did not finish within 3 seconds, or could not be started.
    """
    probe_cmd = ["zpool", "list"]
    try:
        completed = subprocess.run(probe_cmd, capture_output=True, timeout=3)
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        # Missing binary, hung command, or another OS-level launch failure.
        return False
    return completed.returncode == 0
|
||
|
||
# Probed once when the module is imported; functions in this module check the
# flag instead of re-running the probe.
ZFS_AVAILABLE: bool = _probe_zfs()
if not ZFS_AVAILABLE:
    logger.info("ZFS not available on this system (pools/snapshots disabled)")
|
||
|
||
# ── TTL cache ─────────────────────────────────────────────────────────────────
|
||
|
||
# In-process TTL cache: key -> (cached value, expiry timestamp).
_cache: Dict[str, Tuple[Any, datetime]] = dict()
|
||
|
||
def _cache_get(key: str) -> Optional[Any]:
    """Return the cached value for *key*, or None when absent or expired.

    An expired entry is removed as part of the lookup so that keys which are
    never requested again do not stay in the cache indefinitely.

    Args:
        key: Cache key to look up.

    Returns:
        The stored value, or None if there is no entry or its expiry
        timestamp lies in the past.
    """
    entry = _cache.get(key)
    if entry is None:
        return None
    data, expires_at = entry
    if datetime.now() > expires_at:
        # Evict the stale entry instead of leaving it behind.
        _cache.pop(key, None)
        return None
    return data
|
||
|
||
def _cache_set(key: str, data: Any, ttl: int = 60) -> None:
    """Store *data* under *key*, valid for *ttl* seconds from now.

    Args:
        key: Cache key; an existing entry with the same key is overwritten.
        data: Value to cache.
        ttl: Lifetime of the entry in seconds.
    """
    expires_at = datetime.now() + timedelta(seconds=ttl)
    _cache[key] = (data, expires_at)
|
||
|
||
def clear_cache() -> None:
    """Drop every cached entry and log that the cache was cleared."""
    _cache.clear()
    logger.info("ZFS cache cleared")
|
||
|
||
# ── Subprocess helper ─────────────────────────────────────────────────────────
|
||
|
||
def _run(cmd: List[str], timeout: int = _TIMEOUT) -> Tuple[str, str, int]:
    """Execute *cmd* and return ``(stdout, stderr, returncode)``.

    Exceptions raised while running the command are not propagated; they are
    logged and reported as ``("", <message>, -1)``.

    Args:
        cmd: Argument vector; ``cmd[0]`` is the executable.
        timeout: Maximum run time in seconds.

    Returns:
        Captured stdout and stderr as text plus the exit status, or an empty
        stdout, an error message and ``-1`` on timeout, missing executable or
        any other exception.
    """
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=timeout,
            check=False,
        )
    except subprocess.TimeoutExpired:
        logger.error(f"Command timeout after {timeout}s: {' '.join(cmd)}")
        return "", f"Command timeout after {timeout}s", -1
    except FileNotFoundError:
        # Logged at debug level only, unlike the other failure paths.
        logger.debug(f"Command not found: {' '.join(cmd)}")
        return "", f"Command not found: {cmd[0]}", -1
    except Exception as exc:
        logger.error(f"Command execution error: {exc}")
        return "", str(exc), -1
    else:
        return proc.stdout, proc.stderr, proc.returncode
|
||
|
||
# ── Pool operations ───────────────────────────────────────────────────────────
|
||
|
||
def list_pools() -> List[Dict[str, Any]]:
    """Return one summary dict per pool reported by ``zpool list -H -p``.

    Results are cached under ``"pool_status"`` for 30 seconds. The cache is
    consulted before the availability flag.

    Returns:
        A list of dicts with the keys ``name``, ``size``, ``alloc``, ``free``
        (integers), ``fragmentation``, ``capacity`` (strings with a trailing
        ``%``) and ``health``. An empty list is returned, and not cached, when
        ZFS is unavailable or the command fails.
    """
    hit = _cache_get("pool_status")
    if hit is not None:
        return hit
    if not ZFS_AVAILABLE:
        return []

    out, err, code = _run(["zpool", "list", "-H", "-p"])
    if code != 0:
        logger.error(f"zpool list failed: {err}")
        return []

    # Rows with fewer than 10 whitespace-separated columns are ignored.
    # NOTE(review): indices 6, 7 and 9 are read as fragmentation, capacity and
    # health, which assumes the default `zpool list` column order -- confirm.
    rows = (row.split() for row in out.strip().split("\n") if row)
    pools = [
        {
            "name": cols[0],
            "size": int(cols[1]),
            "alloc": int(cols[2]),
            "free": int(cols[3]),
            "fragmentation": f"{cols[6]}%",
            "capacity": f"{cols[7]}%",
            "health": cols[9],
        }
        for cols in rows
        if len(cols) >= 10
    ]
    _cache_set("pool_status", pools, ttl=30)
    return pools
|
||
|
||
|
||
def get_disk_id_map() -> Dict[str, str]:
    """Map kernel disk names to a ``/dev/disk/by-id`` link name.

    Links whose name starts with ``ata-``, ``nvme-``, ``scsi-`` or ``wwn-``
    are scanned in that order; names containing ``-part`` are skipped. The
    first link found for a disk wins.

    Returns:
        Dict from the basename of the resolved link target (e.g. ``sda``) to
        the basename of the link itself.
    """
    id_by_disk: Dict[str, str] = {}
    for prefix in ("ata-", "nvme-", "scsi-", "wwn-"):
        for link_path in glob.glob(f"/dev/disk/by-id/{prefix}*"):
            link_name = os.path.basename(link_path)
            if "-part" in link_name:
                continue
            try:
                disk = os.path.basename(os.path.realpath(link_path))
            except OSError:
                continue
            # Keep the first id seen for each disk.
            id_by_disk.setdefault(disk, link_name)
    return id_by_disk
|
||
|
||
|
||
def _annotate_disk_ids(vdevs: List[Dict], disk_id_map: Dict[str, str]) -> None:
    """Attach a ``disk_id`` entry to every leaf vdev, in place.

    Nodes with children are only recursed into. For a leaf, the id is looked
    up under these candidate disk names, in order:

    1. the vdev name itself,
    2. the name with trailing digits removed (``sda1`` -> ``sda``),
    3. the name with a ``p<N>`` partition suffix removed when the disk name
       itself ends in a digit (``nvme0n1p1`` -> ``nvme0n1``).

    ``disk_id`` is set to None when no candidate has an entry.

    Args:
        vdevs: Vdev nodes as produced by the tree parser; modified in place.
        disk_id_map: Mapping from kernel disk name to by-id link name.
    """
    for vdev in vdevs:
        children = vdev.get("children", [])
        if children:
            _annotate_disk_ids(children, disk_id_map)
            continue

        name = vdev.get("name", "")
        candidates = [name]
        if name[-1:].isdigit():
            candidates.append(re.sub(r'\d+$', '', name))
            # Stripping digits alone turns "nvme0n1p1" into "nvme0n1p", which
            # never matches; also drop the "p" separator for such names.
            numbered_disk = re.fullmatch(r'(.+\d)p\d+', name)
            if numbered_disk:
                candidates.append(numbered_disk.group(1))

        vdev["disk_id"] = next(
            (disk_id_map[c] for c in candidates if disk_id_map.get(c)),
            None,
        )
|
||
|
||
|
||
def _parse_vdev_tree(config_lines: List[str]) -> List[Dict[str, Any]]:
    """Build a nested vdev tree from the ``config:`` rows of ``zpool status``.

    Nesting is derived purely from leading whitespace: a row becomes a child
    of the closest preceding row with a smaller indent. Blank rows and rows
    starting with ``NAME`` (the column header) are skipped.

    Counter columns that are not plain integers no longer raise: abbreviated
    values such as ``1.50K`` are expanded, and free text (for example
    ``sde INUSE currently in use``) counts as 0.

    Args:
        config_lines: Raw, unstripped lines of the config section.

    Returns:
        The top-level nodes. Each node is a dict with ``name``, ``state``
        (``"UNKNOWN"`` when the row has no second column), the integer
        counters ``read``, ``write`` and ``cksum`` (0 when missing) and a
        ``children`` list.
    """
    # NOTE(review): abbreviated counters are expanded assuming 1024-based
    # units, so the result is an approximation -- confirm the base.
    unit_powers = {"K": 1, "M": 2, "G": 3, "T": 4, "P": 5, "E": 6}

    def to_count(columns: List[str], index: int) -> int:
        """Return column *index* as an int, or 0 if absent or unparseable."""
        if len(columns) <= index:
            return 0
        token = columns[index]
        try:
            return int(token)
        except ValueError:
            pass
        power = unit_powers.get(token[-1:])
        if power is None:
            return 0
        try:
            return int(float(token[:-1]) * 1024 ** power)
        except (ValueError, OverflowError):
            return 0

    roots: List[Dict] = []
    # Stack of (indent, node) describing the path to the current row.
    stack: List[tuple] = []
    for line in config_lines:
        stripped = line.strip()
        if not stripped or stripped.startswith("NAME"):
            continue
        indent = len(line) - len(line.lstrip())
        parts = line.split()
        if not parts:
            continue
        vdev: Dict[str, Any] = {
            "name": parts[0],
            "state": parts[1] if len(parts) > 1 else "UNKNOWN",
            "read": to_count(parts, 2),
            "write": to_count(parts, 3),
            "cksum": to_count(parts, 4),
            "children": [],
        }
        # Unwind to the nearest ancestor that is indented less than this row.
        while stack and stack[-1][0] >= indent:
            stack.pop()
        if stack:
            stack[-1][1]["children"].append(vdev)
        else:
            roots.append(vdev)
        stack.append((indent, vdev))
    return roots
|
||
|
||
|
||
def get_pool_status(pool_name: str) -> Dict[str, Any]:
    """Parse ``zpool status <pool_name>`` into a status dict.

    Args:
        pool_name: Name of the pool to query.

    Returns:
        ``{}`` when ZFS is unavailable or the command fails. Otherwise a dict
        with ``name``, ``state``, ``scan``, ``errors`` (the text after the
        corresponding ``<field>:`` prefix, or None if the field is missing)
        and ``vdevs``: the pool's vdev tree with ``disk_id`` added to leaves.
        Rows at the same level as the pool row (such as ``logs``, ``cache``
        or ``spares`` groups) are included after the pool's own vdevs.
    """
    if not ZFS_AVAILABLE:
        return {}
    stdout, stderr, rc = _run(["zpool", "status", pool_name])
    if rc != 0:
        logger.error(f"zpool status failed for {pool_name}: {stderr}")
        return {}

    status: Dict[str, Any] = {"name": pool_name, "state": None, "scan": None, "errors": None, "vdevs": []}
    in_config = False
    config_lines: List[str] = []
    for line in stdout.split("\n"):
        stripped = line.strip()
        if stripped.startswith("state:"):
            status["state"] = stripped.split(":", 1)[1].strip()
        elif stripped.startswith("scan:"):
            status["scan"] = stripped.split(":", 1)[1].strip()
        elif stripped.startswith("errors:"):
            status["errors"] = stripped.split(":", 1)[1].strip()
            # The errors line ends the config section.
            in_config = False
        elif stripped == "config:":
            in_config = True
        elif in_config and (stripped or config_lines):
            # Blank lines before the first config row are skipped; the
            # unstripped line is kept because indentation encodes nesting.
            config_lines.append(line)

    if config_lines:
        parsed = _parse_vdev_tree(config_lines)
        if parsed and parsed[0]["name"] == pool_name:
            # The first root is the pool row itself; report its children and
            # keep any sibling roots instead of silently dropping them.
            status["vdevs"] = parsed[0]["children"] + parsed[1:]
        else:
            status["vdevs"] = parsed
        _annotate_disk_ids(status["vdevs"], get_disk_id_map())
    return status
|
||
|
||
|
||
def scrub_pool(pool_name: str) -> Dict[str, str]:
    """Run ``zpool scrub <pool_name>`` and report the outcome.

    Returns:
        ``{"status": "success", "message": ...}`` when the command exits with
        0, otherwise ``{"status": "error", "message": <stderr>}``.
    """
    _, err, code = _run(["zpool", "scrub", pool_name])
    if code == 0:
        return {"status": "success", "message": f"Scrub started for {pool_name}"}
    logger.error(f"zpool scrub failed for {pool_name}: {err}")
    return {"status": "error", "message": err}
|
||
|
||
|
||
def get_smart_info(disk: str) -> Dict[str, Any]:
    """Collect selected SMART fields for ``/dev/<disk>`` via ``smartctl``.

    Args:
        disk: Device name relative to ``/dev`` (e.g. ``sda``).

    Returns:
        ``{"available": False, "reason": "container"}`` when running in a
        container; ``{}`` when smartctl produced no output or invalid JSON;
        otherwise a dict with ``model``, ``serial``, ``protocol``,
        ``power_on_hours``, ``temperature``, ``passed`` and the raw values of
        three ATA attributes (0 when the attribute is not reported).
    """
    if IS_CONTAINER:
        return {"available": False, "reason": "container"}

    # Only stdout is inspected; the exit status is deliberately ignored.
    raw_json, _, _ = _run(["smartctl", "-A", "-i", "--json", f"/dev/{disk}"])
    if not raw_json:
        return {}
    try:
        report = json.loads(raw_json)
    except json.JSONDecodeError:
        return {}

    # Index the ATA attribute table by attribute name.
    attrs = {}
    for entry in report.get("ata_smart_attributes", {}).get("table", []):
        attrs[entry["name"]] = entry

    def raw_value(attr_name: str) -> Any:
        """Return the raw value of *attr_name*, or 0 when it is missing."""
        return attrs.get(attr_name, {}).get("raw", {}).get("value", 0)

    return {
        "model": report.get("model_name") or report.get("model_family", ""),
        "serial": report.get("serial_number", ""),
        "protocol": report.get("device", {}).get("protocol", ""),
        "power_on_hours": report.get("power_on_time", {}).get("hours"),
        "temperature": report.get("temperature", {}).get("current"),
        "passed": report.get("smart_status", {}).get("passed"),
        "reallocated_sectors": raw_value("Reallocated_Sector_Ct"),
        "pending_sectors": raw_value("Current_Pending_Sector"),
        "uncorrectable": raw_value("Offline_Uncorrectable"),
    }
|
||
|
||
# ── Dataset operations ────────────────────────────────────────────────────────
|
||
|
||
def list_datasets(pool_name: str, max_depth: int = 2) -> List[Dict[str, Any]]:
    """List the datasets of *pool_name* down to *max_depth* levels.

    Results are cached for 60 seconds per ``(pool_name, max_depth)`` pair, so
    a call with a different depth never returns data gathered for another
    depth, and caching one pool does not touch the entries of other pools.

    Args:
        pool_name: Pool (or dataset) passed to ``zfs list``.
        max_depth: Value for the ``-d`` option of ``zfs list``.

    Returns:
        A list of dicts with ``name``, ``used``, ``avail``, ``refer``
        (integers), ``mountpoint`` and ``type``. An empty list is returned,
        and not cached, when ZFS is unavailable or the command fails.
    """
    cache_key = f"datasets:{pool_name}:{max_depth}"
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached
    if not ZFS_AVAILABLE:
        return []

    stdout, stderr, rc = _run([
        "zfs", "list", "-d", str(max_depth), "-H", "-p",
        "-o", "name,used,avail,refer,mountpoint,type", pool_name
    ])
    if rc != 0:
        logger.error(f"zfs list failed for {pool_name}: {stderr}")
        return []

    datasets = []
    for line in stdout.strip().split("\n"):
        if not line:
            continue
        # Fields are split on tabs only; rows with fewer than 6 are ignored.
        parts = line.split("\t")
        if len(parts) < 6:
            continue
        datasets.append({
            "name": parts[0], "used": int(parts[1]), "avail": int(parts[2]),
            "refer": int(parts[3]), "mountpoint": parts[4], "type": parts[5],
        })

    _cache_set(cache_key, datasets, ttl=60)
    return datasets
|
||
|
||
|
||
def create_dataset(dataset_name: str, props: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Create *dataset_name* with ``zfs create``.

    Args:
        dataset_name: Full name of the dataset to create.
        props: Optional properties; each becomes a ``-o key=value`` option.

    Returns:
        ``{"status": "success", "message": ...}`` when the command exits with
        0, otherwise ``{"status": "error", "message": <stderr>}``.
    """
    option_args: List[str] = []
    for key, val in (props or {}).items():
        option_args += ["-o", f"{key}={val}"]

    _, err, code = _run(["zfs", "create", *option_args, dataset_name])
    if code == 0:
        return {"status": "success", "message": f"Dataset {dataset_name} created"}
    logger.error(f"zfs create failed: {err}")
    return {"status": "error", "message": err}
|
||
|
||
|
||
def set_dataset_properties(dataset_name: str, props: Dict[str, str]) -> Dict[str, str]:
    """Apply *props* to *dataset_name*, one ``zfs set`` call per property.

    Properties whose value is None are skipped. All remaining properties are
    attempted even if an earlier one fails.

    Returns:
        ``{"status": "success", "message": ...}`` if every call exited with 0,
        otherwise ``{"status": "error", "message": ...}`` with the collected
        ``<key>: <stderr>`` texts joined by ``"; "``.
    """
    failures: List[str] = []
    for prop, val in props.items():
        if val is not None:
            _, err, code = _run(["zfs", "set", f"{prop}={val}", dataset_name])
            if code != 0:
                failures.append(f"{prop}: {err.strip()}")

    if not failures:
        return {"status": "success", "message": f"Properties updated for {dataset_name}"}
    return {"status": "error", "message": "; ".join(failures)}
|
||
|
||
|
||
def destroy_dataset(dataset_name: str, recursive: bool = False) -> Dict[str, str]:
    """Destroy *dataset_name* with ``zfs destroy``.

    Args:
        dataset_name: Full name of the dataset to destroy.
        recursive: When true, ``-r`` is passed to ``zfs destroy``.

    Returns:
        ``{"status": "success", "message": ...}`` when the command exits with
        0, otherwise ``{"status": "error", "message": <stderr>}``.
    """
    cmd = ["zfs", "destroy"]
    if recursive:
        cmd.append("-r")
    cmd.append(dataset_name)

    _, err, code = _run(cmd)
    if code == 0:
        return {"status": "success", "message": f"Dataset {dataset_name} destroyed"}
    logger.error(f"zfs destroy failed: {err}")
    return {"status": "error", "message": err}
|
||
|
||
# ── Snapshot operations ───────────────────────────────────────────────────────
|
||
|
||
def list_snapshots(dataset_name: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
    """List snapshots, newest first, truncated to *limit* entries.

    The complete sorted list is cached for 60 seconds per dataset and *limit*
    is applied on every call, so a cached result obtained with a small limit
    cannot shorten the answer to a later call that asks for more.

    Args:
        dataset_name: Restrict the listing to this dataset (queried with
            ``-r``); all snapshots are listed when omitted.
        limit: Maximum number of snapshots returned.

    Returns:
        A new list of dicts with ``name``, ``used``, ``referenced`` and
        ``creation`` (integers apart from the name), sorted by ``creation``
        in descending order. An empty list is returned, and not cached, when
        ZFS is unavailable or the command fails.
    """
    cache_key = f"snapshots:{dataset_name or '*'}"
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached[:limit]
    if not ZFS_AVAILABLE:
        return []

    if dataset_name:
        cmd = ["zfs", "list", "-t", "snapshot", "-r", "-H", "-p",
               "-o", "name,used,referenced,creation", dataset_name]
    else:
        cmd = ["zfs", "list", "-t", "snapshot", "-H", "-p",
               "-o", "name,used,referenced,creation"]
    stdout, stderr, rc = _run(cmd)
    if rc != 0:
        logger.error(f"zfs list snapshots failed: {stderr}")
        return []

    snapshots = []
    for line in stdout.strip().split("\n"):
        if not line:
            continue
        parts = line.split("\t")
        if len(parts) < 4:
            continue
        snapshots.append({
            "name": parts[0], "used": int(parts[1]),
            "referenced": int(parts[2]), "creation": int(parts[3]),
        })
    snapshots.sort(key=lambda x: x["creation"], reverse=True)

    # Cache the untruncated list; the slice below hands out a copy.
    _cache_set(cache_key, snapshots, ttl=60)
    return snapshots[:limit]
|
||
|
||
|
||
def create_snapshot(dataset_name: str, snapshot_name: Optional[str] = None) -> Dict[str, str]:
    """Create the snapshot ``<dataset_name>@<snapshot_name>``.

    Args:
        dataset_name: Dataset to snapshot.
        snapshot_name: Name after the ``@``; when empty or omitted, the
            current local time formatted as ``%Y%m%d-%H%M%S`` is used.

    Returns:
        ``{"status": "success", "message": ...}`` when the command exits with
        0, otherwise ``{"status": "error", "message": <stderr>}``.
    """
    label = snapshot_name or datetime.now().strftime("%Y%m%d-%H%M%S")
    full_name = f"{dataset_name}@{label}"

    _, err, code = _run(["zfs", "snapshot", full_name])
    if code == 0:
        return {"status": "success", "message": f"Snapshot {full_name} created"}
    logger.error(f"zfs snapshot failed: {err}")
    return {"status": "error", "message": err}
|
||
|
||
|
||
def destroy_snapshot(snapshot_name: str, recursive: bool = False) -> Dict[str, str]:
    """Destroy *snapshot_name* with ``zfs destroy``.

    Args:
        snapshot_name: Name passed to ``zfs destroy`` as-is.
        recursive: When true, ``-r`` is passed to ``zfs destroy``.

    Returns:
        ``{"status": "success", "message": ...}`` when the command exits with
        0, otherwise ``{"status": "error", "message": <stderr>}``.
    """
    cmd = ["zfs", "destroy"]
    if recursive:
        cmd.append("-r")
    cmd.append(snapshot_name)

    _, err, code = _run(cmd)
    if code == 0:
        return {"status": "success", "message": f"Snapshot {snapshot_name} destroyed"}
    logger.error(f"zfs destroy snapshot failed: {err}")
    return {"status": "error", "message": err}
|
||
|
||
|
||
def rollback_snapshot(snapshot_name: str) -> Dict[str, str]:
    """Run ``zfs rollback -r <snapshot_name>`` and report the outcome.

    Returns:
        ``{"status": "success", "message": ...}`` when the command exits with
        0, otherwise ``{"status": "error", "message": <stderr>}``.
    """
    _, err, code = _run(["zfs", "rollback", "-r", snapshot_name])
    if code == 0:
        return {"status": "success", "message": f"Rolled back to {snapshot_name}"}
    logger.error(f"zfs rollback failed: {err}")
    return {"status": "error", "message": err}
|
||
|
||
|
||
# Backward-compat shim so existing routers keep working unchanged
|
||
class _ZFSRunnerShim:
    """Expose the module-level functions as static methods of one object.

    Every attribute is a ``staticmethod`` wrapper around the function of the
    same name, except ``run_command``, which wraps ``_run``.
    """

    # Low-level command execution and cache control
    run_command = staticmethod(_run)
    clear_cache = staticmethod(clear_cache)

    # Pools and disks
    list_pools = staticmethod(list_pools)
    get_pool_status = staticmethod(get_pool_status)
    scrub_pool = staticmethod(scrub_pool)
    get_disk_id_map = staticmethod(get_disk_id_map)
    get_smart_info = staticmethod(get_smart_info)

    # Datasets
    list_datasets = staticmethod(list_datasets)
    create_dataset = staticmethod(create_dataset)
    set_dataset_properties = staticmethod(set_dataset_properties)
    destroy_dataset = staticmethod(destroy_dataset)

    # Snapshots
    list_snapshots = staticmethod(list_snapshots)
    create_snapshot = staticmethod(create_snapshot)
    destroy_snapshot = staticmethod(destroy_snapshot)
    rollback_snapshot = staticmethod(rollback_snapshot)


# Shared instance of the shim.
zfs_runner = _ZFSRunnerShim()
|