Files
patrick 12248afa3a Refactor: Container-Erkennung zentralisiert, SMART-Check auf LXC überspringen
Neues Modul services/platform_info.py prüft systemd-detect-virt einmalig
beim Start (statt pro Request). SMART-Abfragen werden in Containern
übersprungen, da /dev/sdX dort meist nicht verfügbar ist.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-15 20:37:14 +02:00

365 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ZFS command runner: a wrapper around the zpool/zfs CLI commands.
"""
import subprocess
import json
import logging
import glob
import os
import re
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
from services.platform_info import IS_CONTAINER
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Default timeout, in seconds, for commands executed through _run().
_TIMEOUT = 5
# ── ZFS availability ──────────────────────────────────────────────────────────
def _probe_zfs() -> bool:
    """Report whether ``zpool list`` runs successfully on this host.

    Returns:
        True when the command exits with status 0. False when it exits with
        any other status, when the binary cannot be found, when it does not
        finish within 3 seconds, or when launching it raises another OSError.
    """
    probe_cmd = ["zpool", "list"]
    try:
        result = subprocess.run(probe_cmd, capture_output=True, timeout=3)
    except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
        return False
    return result.returncode == 0
# Evaluated once at import time; the functions in this module that read ZFS
# state consult this flag instead of probing again on every call.
ZFS_AVAILABLE = _probe_zfs()
if not ZFS_AVAILABLE:
    logger.info("ZFS not available on this system (pools/snapshots disabled)")
# ── TTL cache ─────────────────────────────────────────────────────────────────
# In-memory TTL cache: key -> (cached value, expiry timestamp).
_cache: Dict[str, Tuple[Any, datetime]] = {}


def _cache_get(key: str) -> Optional[Any]:
    """Return the cached value for *key*, or None when absent or expired.

    An expired entry is removed as soon as it is looked up, so stale values
    do not stay in memory until the same key happens to be written again.

    Args:
        key: Cache key previously used with ``_cache_set``.

    Returns:
        The stored value, or None if there is no unexpired entry.
    """
    entry = _cache.get(key)
    if entry is None:
        return None
    data, expires_at = entry
    if datetime.now() > expires_at:
        # pop() with a default tolerates the key vanishing in the meantime.
        _cache.pop(key, None)
        return None
    return data
def _cache_set(key: str, data: Any, ttl: int = 60) -> None:
    """Store *data* under *key* with an expiry *ttl* seconds from now.

    Args:
        key: Cache key.
        data: Value to cache.
        ttl: Lifetime of the entry in seconds.
    """
    expires_at = datetime.now() + timedelta(seconds=ttl)
    _cache[key] = (data, expires_at)
def clear_cache() -> None:
    """Remove every entry from the module cache and log that it happened."""
    _cache.clear()
    logger.info("ZFS cache cleared")
# ── Subprocess helper ─────────────────────────────────────────────────────────
def _run(cmd: List[str], timeout: int = _TIMEOUT) -> Tuple[str, str, int]:
    """Execute *cmd* and return ``(stdout, stderr, returncode)``.

    The command is run without a shell and its output is captured as text.
    This function does not raise: every failure to run the command is
    reported through the return value with a return code of -1.

    Args:
        cmd: Program and arguments as a list.
        timeout: Maximum run time in seconds.

    Returns:
        ``(stdout, stderr, returncode)`` of the finished process, or
        ``("", <error message>, -1)`` on timeout, missing binary or any
        other exception.
    """
    try:
        proc = subprocess.run(
            cmd, capture_output=True, text=True, timeout=timeout, check=False
        )
    except subprocess.TimeoutExpired:
        logger.error(f"Command timeout after {timeout}s: {' '.join(cmd)}")
        return "", f"Command timeout after {timeout}s", -1
    except FileNotFoundError:
        # Logged at debug level only, unlike the other failure paths.
        logger.debug(f"Command not found: {' '.join(cmd)}")
        return "", f"Command not found: {cmd[0]}", -1
    except Exception as e:
        logger.error(f"Command execution error: {e}")
        return "", str(e), -1
    return proc.stdout, proc.stderr, proc.returncode
# ── Pool operations ───────────────────────────────────────────────────────────
def list_pools() -> List[Dict[str, Any]]:
    """Return one summary dict per pool reported by ``zpool list -H -p``.

    Results are cached under the key ``pool_status`` for 30 seconds. A cached
    value is returned before the ZFS availability flag is checked.

    Returns:
        A list of dicts with the keys ``name``, ``size``, ``alloc``, ``free``
        (integers), ``fragmentation``, ``capacity`` (the raw column text with
        ``%`` appended) and ``health``. An empty list is returned when ZFS is
        unavailable or the command fails.
    """
    cached = _cache_get("pool_status")
    if cached is not None:
        return cached
    if not ZFS_AVAILABLE:
        return []

    stdout, stderr, rc = _run(["zpool", "list", "-H", "-p"])
    if rc != 0:
        logger.error(f"zpool list failed: {stderr}")
        return []

    # Rows with fewer than 10 whitespace-separated columns (including empty
    # lines) are skipped.
    rows = (row.split() for row in stdout.strip().split("\n"))
    pools = [
        {
            "name": cols[0],
            "size": int(cols[1]),
            "alloc": int(cols[2]),
            "free": int(cols[3]),
            "fragmentation": f"{cols[6]}%",
            "capacity": f"{cols[7]}%",
            "health": cols[9],
        }
        for cols in rows
        if len(cols) >= 10
    ]
    _cache_set("pool_status", pools, ttl=30)
    return pools
def get_disk_id_map() -> Dict[str, str]:
    """Map resolved device names to a ``/dev/disk/by-id`` link name.

    Links are scanned by prefix in the order ``ata-``, ``nvme-``, ``scsi-``,
    ``wwn-``; the first link found for a device wins. Links whose name
    contains ``-part`` are ignored.

    Returns:
        Dict from the basename of the link target (e.g. the ``sda`` in
        ``/dev/sda``) to the basename of the by-id link.
    """
    by_device: Dict[str, str] = {}
    for prefix in ("ata-", "nvme-", "scsi-", "wwn-"):
        for link_path in glob.glob(f"/dev/disk/by-id/{prefix}*"):
            link_name = os.path.basename(link_path)
            if "-part" in link_name:
                continue
            try:
                device = os.path.basename(os.path.realpath(link_path))
            except OSError:
                continue
            # Keep the first id seen for each device.
            by_device.setdefault(device, link_name)
    return by_device
def _annotate_disk_ids(vdevs: List[Dict], disk_id_map: Dict[str, str]) -> None:
    """Set ``disk_id`` on every leaf vdev of the tree, in place.

    A leaf is a vdev without children. Its name is looked up in
    *disk_id_map* using these candidates, in order:

    1. the name itself,
    2. the name with trailing digits removed (``sda1`` -> ``sda``),
    3. for names of the form ``<disk>p<N>`` where ``<disk>`` ends in a digit
       (``nvme0n1p1``, ``mmcblk0p1``), the ``<disk>`` part. Stripping only
       the digits would leave ``nvme0n1p``, which is not a disk name.

    Args:
        vdevs: List of vdev dicts with ``name`` and ``children`` keys.
        disk_id_map: Mapping from device name to by-id name.

    Returns:
        None. ``disk_id`` is set to the first match, or None when nothing
        matches. Non-leaf vdevs are left untouched.
    """
    for vdev in vdevs:
        children = vdev.get("children", [])
        if children:
            _annotate_disk_ids(children, disk_id_map)
            continue

        name = vdev.get("name", "")
        candidates = [name]
        if name[-1:].isdigit():
            candidates.append(re.sub(r"\d+$", "", name))
            partition = re.match(r"^(.+\d)p\d+$", name)
            if partition:
                candidates.append(partition.group(1))

        vdev["disk_id"] = next(
            (disk_id_map[c] for c in candidates if disk_id_map.get(c)), None
        )
def _parse_error_count(token: str) -> int:
    """Convert one READ/WRITE/CKSUM column token to an integer.

    Plain integers are returned as-is. Abbreviated values such as ``1.5K``
    are expanded with a 1024-based multiplier, so the result is approximate.
    NOTE(review): base 1024 assumed for zpool's abbreviated counts; confirm
    against the OpenZFS version in use. Any other text (for example the
    words of a trailing status message) yields 0.
    """
    try:
        return int(token)
    except ValueError:
        pass
    match = re.fullmatch(r"(\d+(?:\.\d+)?)([KMGTPE])", token)
    if match is None:
        return 0
    exponent = "KMGTPE".index(match.group(2)) + 1
    return int(float(match.group(1)) * 1024 ** exponent)


def _parse_vdev_tree(config_lines: List[str]) -> List[Dict[str, Any]]:
    """Build a vdev tree from the ``config:`` section of ``zpool status``.

    Nesting is derived from the leading whitespace of each line: a line
    becomes a child of the closest preceding line with a smaller indent.
    Blank lines and the ``NAME ...`` header line are skipped.

    Args:
        config_lines: Raw (unstripped) lines of the config section.

    Returns:
        The top-level vdev dicts. Each dict has the keys ``name``, ``state``
        (``"UNKNOWN"`` when the column is missing), ``read``, ``write``,
        ``cksum`` (0 when missing or not numeric) and ``children``.
    """
    roots: List[Dict] = []
    # Stack of (indent, vdev) for the currently open ancestors.
    stack: List[tuple] = []
    for line in config_lines:
        stripped = line.strip()
        if not stripped or stripped.startswith("NAME"):
            continue
        indent = len(line) - len(line.lstrip())
        parts = line.split()

        # Columns 2-4 are the error counters; pad when a line is shorter.
        counts = [_parse_error_count(tok) for tok in parts[2:5]]
        counts += [0] * (3 - len(counts))

        vdev: Dict[str, Any] = {
            "name": parts[0],
            "state": parts[1] if len(parts) > 1 else "UNKNOWN",
            "read": counts[0],
            "write": counts[1],
            "cksum": counts[2],
            "children": [],
        }

        # Close every open vdev that is not an ancestor of this line.
        while stack and stack[-1][0] >= indent:
            stack.pop()
        if stack:
            stack[-1][1]["children"].append(vdev)
        else:
            roots.append(vdev)
        stack.append((indent, vdev))
    return roots
def get_pool_status(pool_name: str) -> Dict[str, Any]:
    """Parse ``zpool status <pool_name>`` into a status dict.

    Args:
        pool_name: Name of the pool to query.

    Returns:
        A dict with the keys ``name``, ``state``, ``scan``, ``errors`` and
        ``vdevs``. When the first parsed root is the pool itself, ``vdevs``
        holds that root's children; otherwise it holds all parsed roots.
        Leaf vdevs are annotated with ``disk_id``. An empty dict is returned
        when ZFS is unavailable or the command fails.
    """
    if not ZFS_AVAILABLE:
        return {}

    stdout, stderr, rc = _run(["zpool", "status", pool_name])
    if rc != 0:
        logger.error(f"zpool status failed for {pool_name}: {stderr}")
        return {}

    status: Dict[str, Any] = {
        "name": pool_name,
        "state": None,
        "scan": None,
        "errors": None,
        "vdevs": [],
    }
    config_lines: List[str] = []
    in_config = False
    for raw_line in stdout.split("\n"):
        text = raw_line.strip()
        field = next(
            (name for name in ("state", "scan", "errors") if text.startswith(f"{name}:")),
            None,
        )
        if field is not None:
            status[field] = text.split(":", 1)[1].strip()
            if field == "errors":
                # The "errors:" line ends the config section.
                in_config = False
        elif text == "config:":
            in_config = True
        elif in_config and (text or config_lines):
            # Blank lines before the first config row are dropped; raw lines
            # are kept because the tree parser needs the indentation.
            config_lines.append(raw_line)

    if config_lines:
        roots = _parse_vdev_tree(config_lines)
        if roots and roots[0]["name"] == pool_name:
            status["vdevs"] = roots[0]["children"]
        else:
            status["vdevs"] = roots
        _annotate_disk_ids(status["vdevs"], get_disk_id_map())
    return status
def scrub_pool(pool_name: str) -> Dict[str, str]:
    """Run ``zpool scrub`` on *pool_name*.

    Returns:
        ``{"status": "success", "message": ...}`` when the command exits
        with 0, otherwise ``{"status": "error", "message": <stderr>}``.
    """
    _, stderr, rc = _run(["zpool", "scrub", pool_name])
    if rc == 0:
        return {"status": "success", "message": f"Scrub started for {pool_name}"}
    logger.error(f"zpool scrub failed for {pool_name}: {stderr}")
    return {"status": "error", "message": stderr}
def get_smart_info(disk: str) -> Dict[str, Any]:
    """Read SMART data for ``/dev/<disk>`` via ``smartctl --json``.

    The exit status of smartctl is not inspected; only its stdout is used.

    Args:
        disk: Device name below ``/dev``.

    Returns:
        ``{"available": False, "reason": "container"}`` when running in a
        container. An empty dict when smartctl printed nothing or its output
        is not valid JSON. Otherwise a dict with ``model``, ``serial``,
        ``protocol``, ``power_on_hours``, ``temperature``, ``passed``,
        ``reallocated_sectors``, ``pending_sectors`` and ``uncorrectable``;
        the three sector counters default to 0 when the attribute is absent.
    """
    if IS_CONTAINER:
        return {"available": False, "reason": "container"}

    stdout, _, _ = _run(["smartctl", "-A", "-i", "--json", f"/dev/{disk}"])
    if not stdout:
        return {}
    try:
        data = json.loads(stdout)
    except json.JSONDecodeError:
        return {}

    table = data.get("ata_smart_attributes", {}).get("table", [])
    attrs = {row["name"]: row for row in table}

    def raw_value(attr_name: str) -> Any:
        # Raw value of a named attribute, 0 when it is not reported.
        return attrs.get(attr_name, {}).get("raw", {}).get("value", 0)

    return {
        "model": data.get("model_name") or data.get("model_family", ""),
        "serial": data.get("serial_number", ""),
        "protocol": data.get("device", {}).get("protocol", ""),
        "power_on_hours": data.get("power_on_time", {}).get("hours"),
        "temperature": data.get("temperature", {}).get("current"),
        "passed": data.get("smart_status", {}).get("passed"),
        "reallocated_sectors": raw_value("Reallocated_Sector_Ct"),
        "pending_sectors": raw_value("Current_Pending_Sector"),
        "uncorrectable": raw_value("Offline_Uncorrectable"),
    }
# ── Dataset operations ────────────────────────────────────────────────────────
def list_datasets(pool_name: str, max_depth: int = 2) -> List[Dict[str, Any]]:
    """List the datasets of *pool_name* down to *max_depth* levels.

    Results are cached for 60 seconds under a key that includes both the
    pool name and the depth. A request with a different depth is therefore
    never answered from a shallower cached result, and refreshing one pool
    does not extend the lifetime of another pool's cached entry.

    Args:
        pool_name: Pool (or dataset) passed to ``zfs list``.
        max_depth: Value for the ``-d`` option of ``zfs list``.

    Returns:
        A list of dicts with ``name``, ``used``, ``avail``, ``refer``
        (integers), ``mountpoint`` and ``type``. An empty list is returned
        when ZFS is unavailable or the command fails.
    """
    cache_key = f"datasets:{pool_name}:{max_depth}"
    cached = _cache_get(cache_key)
    if cached is not None:
        return cached
    if not ZFS_AVAILABLE:
        return []

    stdout, stderr, rc = _run([
        "zfs", "list", "-d", str(max_depth), "-H", "-p",
        "-o", "name,used,avail,refer,mountpoint,type", pool_name,
    ])
    if rc != 0:
        logger.error(f"zfs list failed for {pool_name}: {stderr}")
        return []

    datasets = []
    for line in stdout.strip().split("\n"):
        if not line:
            continue
        # Split on tabs only, so a mountpoint containing spaces stays intact.
        parts = line.split("\t")
        if len(parts) < 6:
            continue
        datasets.append({
            "name": parts[0],
            "used": int(parts[1]),
            "avail": int(parts[2]),
            "refer": int(parts[3]),
            "mountpoint": parts[4],
            "type": parts[5],
        })

    _cache_set(cache_key, datasets, ttl=60)
    return datasets
def create_dataset(dataset_name: str, props: Optional[Dict[str, str]] = None) -> Dict[str, str]:
    """Create *dataset_name* with ``zfs create``.

    Args:
        dataset_name: Full name of the dataset to create.
        props: Optional properties; each pair is passed as ``-o key=value``.

    Returns:
        ``{"status": "success", "message": ...}`` on exit code 0, otherwise
        ``{"status": "error", "message": <stderr>}``.
    """
    option_args: List[str] = []
    for key, val in (props or {}).items():
        option_args += ["-o", f"{key}={val}"]

    _, stderr, rc = _run(["zfs", "create", *option_args, dataset_name])
    if rc == 0:
        return {"status": "success", "message": f"Dataset {dataset_name} created"}
    logger.error(f"zfs create failed: {stderr}")
    return {"status": "error", "message": stderr}
def set_dataset_properties(dataset_name: str, props: Dict[str, str]) -> Dict[str, str]:
    """Apply each property in *props* with a separate ``zfs set`` call.

    Properties whose value is None are skipped. A failing property does not
    stop the remaining ones from being applied.

    Returns:
        ``{"status": "success", ...}`` when every call succeeded, otherwise
        ``{"status": "error", "message": ...}`` where the message joins one
        ``key: stderr`` entry per failed property with ``"; "``.
    """
    failures = []
    for key, value in props.items():
        if value is not None:
            _, stderr, rc = _run(["zfs", "set", f"{key}={value}", dataset_name])
            if rc != 0:
                failures.append(f"{key}: {stderr.strip()}")

    if not failures:
        return {"status": "success", "message": f"Properties updated for {dataset_name}"}
    return {"status": "error", "message": "; ".join(failures)}
def destroy_dataset(dataset_name: str, recursive: bool = False) -> Dict[str, str]:
    """Destroy *dataset_name* with ``zfs destroy``.

    Args:
        dataset_name: Name passed to ``zfs destroy``.
        recursive: When true, ``-r`` is added to the command.

    Returns:
        ``{"status": "success", "message": ...}`` on exit code 0, otherwise
        ``{"status": "error", "message": <stderr>}``.
    """
    cmd = ["zfs", "destroy"]
    if recursive:
        cmd.append("-r")
    cmd.append(dataset_name)

    _, stderr, rc = _run(cmd)
    if rc == 0:
        return {"status": "success", "message": f"Dataset {dataset_name} destroyed"}
    logger.error(f"zfs destroy failed: {stderr}")
    return {"status": "error", "message": stderr}
# ── Snapshot operations ───────────────────────────────────────────────────────
def list_snapshots(dataset_name: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
    """Return the newest snapshots, optionally restricted to one dataset.

    The complete, sorted snapshot list is cached for 60 seconds per dataset
    and *limit* is applied on return. Caching the already truncated list
    would make a later call with a larger limit receive too few entries.

    Args:
        dataset_name: When given, snapshots are listed recursively (``-r``)
            below this dataset; otherwise ``zfs list`` runs without a
            dataset argument.
        limit: Maximum number of snapshots to return.

    Returns:
        Dicts with ``name``, ``used``, ``referenced`` and ``creation`` (all
        but the name are integers), sorted by ``creation`` descending. An
        empty list is returned when ZFS is unavailable or the command fails.
    """
    cache_key = f"snapshots:{dataset_name or '*'}"
    snapshots = _cache_get(cache_key)
    if snapshots is None:
        if not ZFS_AVAILABLE:
            return []

        cmd = ["zfs", "list", "-t", "snapshot"]
        if dataset_name:
            cmd.append("-r")
        cmd += ["-H", "-p", "-o", "name,used,referenced,creation"]
        if dataset_name:
            cmd.append(dataset_name)

        stdout, stderr, rc = _run(cmd)
        if rc != 0:
            logger.error(f"zfs list snapshots failed: {stderr}")
            return []

        snapshots = []
        for line in stdout.strip().split("\n"):
            if not line:
                continue
            parts = line.split("\t")
            if len(parts) < 4:
                continue
            snapshots.append({
                "name": parts[0],
                "used": int(parts[1]),
                "referenced": int(parts[2]),
                "creation": int(parts[3]),
            })
        snapshots.sort(key=lambda snap: snap["creation"], reverse=True)
        _cache_set(cache_key, snapshots, ttl=60)

    # Slicing also hands the caller a copy, so the cached list stays intact.
    return snapshots[:limit]
def create_snapshot(dataset_name: str, snapshot_name: Optional[str] = None) -> Dict[str, str]:
    """Create the snapshot ``<dataset_name>@<snapshot_name>``.

    Args:
        dataset_name: Dataset to snapshot.
        snapshot_name: Snapshot label; when empty or None, the current local
            time formatted as ``%Y%m%d-%H%M%S`` is used.

    Returns:
        ``{"status": "success", "message": ...}`` on exit code 0, otherwise
        ``{"status": "error", "message": <stderr>}``.
    """
    label = snapshot_name or datetime.now().strftime("%Y%m%d-%H%M%S")
    full_name = f"{dataset_name}@{label}"

    _, stderr, rc = _run(["zfs", "snapshot", full_name])
    if rc == 0:
        return {"status": "success", "message": f"Snapshot {full_name} created"}
    logger.error(f"zfs snapshot failed: {stderr}")
    return {"status": "error", "message": stderr}
def destroy_snapshot(snapshot_name: str, recursive: bool = False) -> Dict[str, str]:
    """Destroy the snapshot *snapshot_name* with ``zfs destroy``.

    ``zfs destroy`` also accepts plain dataset names, so a name without
    ``@`` would delete a whole dataset (and, with ``-r``, everything below
    it). Such names are rejected before any command is run.

    Args:
        snapshot_name: Full snapshot name in the form ``dataset@snapshot``.
        recursive: When true, ``-r`` is added to the command.

    Returns:
        ``{"status": "success", "message": ...}`` on exit code 0, otherwise
        ``{"status": "error", "message": ...}`` carrying either the
        validation message or the command's stderr.
    """
    if not snapshot_name or "@" not in snapshot_name:
        return {
            "status": "error",
            "message": f"Not a snapshot name (expected dataset@snapshot): {snapshot_name}",
        }

    cmd = ["zfs", "destroy"] + (["-r"] if recursive else []) + [snapshot_name]
    _, stderr, rc = _run(cmd)
    if rc != 0:
        logger.error(f"zfs destroy snapshot failed: {stderr}")
        return {"status": "error", "message": stderr}
    return {"status": "success", "message": f"Snapshot {snapshot_name} destroyed"}
def rollback_snapshot(snapshot_name: str) -> Dict[str, str]:
    """Roll back to *snapshot_name* using ``zfs rollback -r``.

    Returns:
        ``{"status": "success", "message": ...}`` on exit code 0, otherwise
        ``{"status": "error", "message": <stderr>}``.
    """
    _, stderr, rc = _run(["zfs", "rollback", "-r", snapshot_name])
    if rc == 0:
        return {"status": "success", "message": f"Rolled back to {snapshot_name}"}
    logger.error(f"zfs rollback failed: {stderr}")
    return {"status": "error", "message": stderr}
# Backward-compat shim so existing routers keep working unchanged
class _ZFSRunnerShim:
    """Expose the module-level functions as static attributes of one object.

    Every attribute is a ``staticmethod`` wrapper around the function of the
    same name in this module; ``run_command`` wraps ``_run``.
    """

    # Low-level helpers
    run_command = staticmethod(_run)
    clear_cache = staticmethod(clear_cache)

    # Pools and disks
    list_pools = staticmethod(list_pools)
    get_pool_status = staticmethod(get_pool_status)
    scrub_pool = staticmethod(scrub_pool)
    get_disk_id_map = staticmethod(get_disk_id_map)
    get_smart_info = staticmethod(get_smart_info)

    # Datasets
    list_datasets = staticmethod(list_datasets)
    create_dataset = staticmethod(create_dataset)
    set_dataset_properties = staticmethod(set_dataset_properties)
    destroy_dataset = staticmethod(destroy_dataset)

    # Snapshots
    list_snapshots = staticmethod(list_snapshots)
    create_snapshot = staticmethod(create_snapshot)
    destroy_snapshot = staticmethod(destroy_snapshot)
    rollback_snapshot = staticmethod(rollback_snapshot)


# Shared instance of the shim.
zfs_runner = _ZFSRunnerShim()