Files
zmb-webui/backend/services/zfs_runner.py
T
patrick e3b42caf01 Fix: list_snapshots mit -r statt -d1, Cache-Key pro Dataset
-d 1 begrenzte auf direkte Snapshots des Datasets, tank/share wurde
nicht eingeschlossen. -r (recursive) liefert alle Sub-Datasets.
Cache-Key jetzt dataset-spezifisch um Kollisionen zu vermeiden.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-04 22:59:40 +02:00

465 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
ZFS command runner: wrapper around the zpool/zfs CLI commands.
Handles subprocess execution, output parsing, caching, and error handling.
"""
import subprocess
import json
import logging
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import re
logger = logging.getLogger(__name__)
# Cache with TTL
@dataclass
class CacheEntry:
    """A cached value together with the moment it stops being valid."""

    # Cached payload; None means nothing has been stored yet.
    data: Any
    # Naive local timestamp (datetime.now()) after which the entry is stale.
    expires_at: datetime


class ZFSCache:
    """In-memory TTL cache for parsed ZFS query results.

    Three well-known slots (``pool_status``, ``snapshots``, ``datasets``) are
    exposed as attributes. Any other key (for example the per-dataset key
    ``"snapshots:tank/share"``) is kept in an internal dict, so callers may
    use arbitrary string keys.
    """

    # Keys that are backed by the public attributes of the same name.
    _FIXED_SLOTS = ("pool_status", "snapshots", "datasets")

    def __init__(self):
        now = datetime.now()
        # Created already expired and empty, so the first get() is a miss.
        self.pool_status = CacheEntry(None, now)
        self.snapshots = CacheEntry(None, now)
        self.datasets = CacheEntry(None, now)
        # Entries for every key that is not one of the fixed slots.
        self._extra: Dict[str, CacheEntry] = {}

    def _entry_for(self, key: str) -> Optional[CacheEntry]:
        """Return the entry stored under *key*, or None if there is none."""
        if key in self._FIXED_SLOTS:
            return getattr(self, key)
        return self._extra.get(key)

    def get(self, key: str) -> Optional[Any]:
        """Return the cached data for *key*, or None on a miss.

        A miss is reported when the key is unknown, when the stored data is
        empty/falsy (empty results are never served from the cache), or when
        the entry has expired.
        """
        entry = self._entry_for(key)
        if entry is None or not entry.data:
            return None
        if datetime.now() > entry.expires_at:
            # Drop stale dynamic entries so the dict does not grow forever;
            # for fixed slots this pop is a harmless no-op.
            self._extra.pop(key, None)
            return None
        return entry.data

    def set(self, key: str, data: Any, ttl_seconds: int = 60):
        """Store *data* under *key* for *ttl_seconds* seconds."""
        expires_at = datetime.now() + timedelta(seconds=ttl_seconds)
        entry = self._entry_for(key)
        if entry is None:
            # Previously unknown keys were silently ignored, which disabled
            # caching for every dynamically built key.
            self._extra[key] = CacheEntry(data, expires_at)
            return
        entry.data = data
        entry.expires_at = expires_at
class ZFSRunner:
    """Thin wrapper around the ``zpool`` and ``zfs`` command-line tools.

    Read operations parse the CLI output into plain dicts and cache the
    result for a short time. Write operations return a
    ``{"status": ..., "message": ...}`` dict and clear the caches on success
    so that the next read reflects the change.
    """

    def __init__(self, timeout: int = 5):
        # Default timeout (seconds) for every command run through run_command.
        self.timeout = timeout
        self.cache = ZFSCache()

    @staticmethod
    def _to_int(value: Any, default: int = 0) -> int:
        """Convert a CLI field to int, returning *default* if it is not numeric.

        The tools print ``-`` for values they cannot report; a bare ``int()``
        would raise on that and abort the whole listing.
        """
        try:
            return int(value)
        except (TypeError, ValueError):
            return default

    def run_command(self, cmd: List[str], timeout: Optional[int] = None) -> Tuple[str, str, int]:
        """Run *cmd* as a subprocess (no shell) with a timeout.

        Args:
            cmd: Program and arguments as a list.
            timeout: Seconds to wait; falls back to ``self.timeout`` when None.

        Returns:
            ``(stdout, stderr, returncode)``. Failures to run the command at
            all (timeout, missing binary, any other exception) are reported as
            ``("", <message>, -1)`` instead of being raised.
        """
        if timeout is None:
            timeout = self.timeout
        try:
            result = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=timeout,
                check=False,
            )
        except subprocess.TimeoutExpired:
            logger.error(f"Command timeout after {timeout}s: {' '.join(cmd)}")
            return "", f"Command timeout after {timeout}s", -1
        except FileNotFoundError:
            logger.error(f"Command not found: {' '.join(cmd)}")
            return "", f"Command not found: {cmd[0]}", -1
        except Exception as e:
            # Boundary of the wrapper: callers only ever inspect the tuple.
            logger.error(f"Command execution error: {e}")
            return "", str(e), -1
        return result.stdout, result.stderr, result.returncode

    # ============== POOL OPERATIONS ==============

    def list_pools(self) -> List[Dict[str, Any]]:
        """Return all pools as dicts parsed from ``zpool list -H -p``.

        Results are cached for 30 seconds. Returns an empty list when the
        command fails. Fields that are not numeric (``-``) are reported as 0
        so that a pool without usable size figures is still listed together
        with its health.
        """
        cached = self.cache.get("pool_status")
        if cached:
            return cached

        stdout, stderr, rc = self.run_command(["zpool", "list", "-H", "-p"])
        if rc != 0:
            logger.error(f"zpool list failed: {stderr}")
            return []

        pools = []
        for line in stdout.strip().split("\n"):
            parts = line.split()
            # Columns used below: 0 name, 1 size, 2 alloc, 3 free,
            # 6 fragmentation, 7 capacity, 9 health.
            if len(parts) < 10:
                continue
            pools.append({
                "name": parts[0],
                "size": self._to_int(parts[1]),
                "alloc": self._to_int(parts[2]),
                "free": self._to_int(parts[3]),
                "fragmentation": f"{parts[6]}%",
                "capacity": f"{parts[7]}%",
                "health": parts[9],
            })

        self.cache.set("pool_status", pools, ttl_seconds=30)
        return pools

    def _parse_vdev_tree(self, config_lines: List[str]) -> List[Dict[str, Any]]:
        """Rebuild the vdev hierarchy from the ``config:`` section lines.

        Nesting is derived from each line's leading whitespace width. Every
        node is a dict with ``name``, ``state``, the ``read``/``write``/
        ``cksum`` counters and a ``children`` list. Lines without a state
        column get the state ``"UNKNOWN"``.

        NOTE(review): counters that are not plain integers (e.g. abbreviated
        values such as ``1.2K``) fall back to 0 here, exactly as before.
        """
        roots: List[Dict[str, Any]] = []
        # Chain of (indent, node) from the current root down to the last node.
        stack: List[tuple] = []

        for line in config_lines:
            text = line.strip()
            # Skip blanks and the "NAME STATE READ WRITE CKSUM" header.
            if not text or text.startswith("NAME"):
                continue

            indent = len(line) - len(line.lstrip())
            parts = line.split()

            vdev: Dict[str, Any] = {
                "name": parts[0],
                "state": parts[1] if len(parts) > 1 else "UNKNOWN",
                "read": self._to_int(parts[2]) if len(parts) > 2 else 0,
                "write": self._to_int(parts[3]) if len(parts) > 3 else 0,
                "cksum": self._to_int(parts[4]) if len(parts) > 4 else 0,
                "children": [],
            }

            # Everything at the same or a deeper indent cannot be the parent.
            while stack and stack[-1][0] >= indent:
                stack.pop()

            if stack:
                stack[-1][1]["children"].append(vdev)
            else:
                roots.append(vdev)
            stack.append((indent, vdev))

        return roots

    def get_pool_status(self, pool_name: str) -> Dict[str, Any]:
        """Return state, scan line, error summary and vdev tree of a pool.

        Parsed from ``zpool status <pool_name>``. Returns an empty dict when
        the command fails. ``vdevs`` holds the children of the pool's own
        node followed by any further root-level nodes of the config section,
        so that sections listed at the pool's indent level are not lost.
        """
        stdout, stderr, rc = self.run_command(["zpool", "status", pool_name])
        if rc != 0:
            logger.error(f"zpool status failed for {pool_name}: {stderr}")
            return {}

        status: Dict[str, Any] = {
            "name": pool_name,
            "state": None,
            "scan": None,
            "errors": None,
            "vdevs": [],
        }

        in_config = False
        config_lines: List[str] = []
        for line in stdout.split("\n"):
            stripped = line.strip()
            if stripped.startswith("state:"):
                status["state"] = stripped.split(":", 1)[1].strip()
            elif stripped.startswith("scan:"):
                status["scan"] = stripped.split(":", 1)[1].strip()
            elif stripped.startswith("errors:"):
                status["errors"] = stripped.split(":", 1)[1].strip()
                # The errors line ends the config block.
                in_config = False
            elif stripped == "config:":
                in_config = True
            elif in_config and (stripped or config_lines):
                # Keep raw lines (indent matters); drop leading blank lines.
                config_lines.append(line)

        if config_lines:
            parsed = self._parse_vdev_tree(config_lines)
            if parsed and parsed[0]["name"] == pool_name:
                # parsed[0] is the pool itself; its children are the top-level
                # vdevs. Further roots were previously dropped.
                status["vdevs"] = parsed[0]["children"] + parsed[1:]
            else:
                status["vdevs"] = parsed

        return status

    def scrub_pool(self, pool_name: str) -> Dict[str, str]:
        """Run ``zpool scrub`` on the pool and report success or the error."""
        _, stderr, rc = self.run_command(["zpool", "scrub", pool_name])
        if rc != 0:
            logger.error(f"zpool scrub failed for {pool_name}: {stderr}")
            return {"status": "error", "message": stderr}
        return {"status": "success", "message": f"Scrub started for {pool_name}"}

    # ============== DATASET/FILESYSTEM OPERATIONS ==============

    def list_datasets(self, pool_name: str, max_depth: int = 2) -> List[Dict[str, Any]]:
        """Return the datasets below *pool_name* up to *max_depth* levels.

        Parsed from ``zfs list -d <max_depth> -H -p``. Results are cached for
        60 seconds per (pool, depth) pair, so a shallow listing is never
        served for a deeper request. Returns an empty list when the command
        fails.
        """
        slot = (pool_name, max_depth)
        cached = self.cache.get("datasets")
        if cached and cached.get(slot):
            return cached[slot]

        stdout, stderr, rc = self.run_command([
            "zfs", "list", "-d", str(max_depth), "-H", "-p",
            "-o", "name,used,avail,refer,mountpoint,type",
            pool_name,
        ])
        if rc != 0:
            logger.error(f"zfs list failed for {pool_name}: {stderr}")
            return []

        datasets = []
        for line in stdout.strip().split("\n"):
            parts = line.split("\t")
            if len(parts) < 6:
                continue
            datasets.append({
                "name": parts[0],
                "used": self._to_int(parts[1]),
                "avail": self._to_int(parts[2]),
                "refer": self._to_int(parts[3]),
                "mountpoint": parts[4],
                "type": parts[5],  # filesystem, volume, snapshot
            })

        # All listings share one cache entry, keyed inside by (pool, depth).
        if not cached:
            cached = {}
        cached[slot] = datasets
        self.cache.set("datasets", cached, ttl_seconds=60)
        return datasets

    def create_dataset(self, dataset_name: str, props: Optional[Dict[str, str]] = None) -> Dict[str, str]:
        """Create a dataset via ``zfs create``, passing *props* as ``-o k=v``."""
        cmd = ["zfs", "create"]
        for key, val in (props or {}).items():
            cmd.extend(["-o", f"{key}={val}"])
        cmd.append(dataset_name)

        _, stderr, rc = self.run_command(cmd)
        if rc != 0:
            logger.error(f"zfs create failed: {stderr}")
            return {"status": "error", "message": stderr}
        # Cached listings no longer match the pool.
        self.clear_cache()
        return {"status": "success", "message": f"Dataset {dataset_name} created"}

    def set_dataset_properties(self, dataset_name: str, props: Dict[str, str]) -> Dict[str, str]:
        """Set each property with ``zfs set``; None values are skipped.

        All properties are attempted; the error messages of the failed ones
        are joined with ``"; "``.
        """
        errors = []
        changed = False
        for key, value in props.items():
            if value is None:
                continue
            _, stderr, rc = self.run_command(["zfs", "set", f"{key}={value}", dataset_name])
            if rc != 0:
                errors.append(f"{key}: {stderr.strip()}")
            else:
                changed = True

        if changed:
            # Property changes can alter the figures in cached listings.
            self.clear_cache()
        if errors:
            return {"status": "error", "message": "; ".join(errors)}
        return {"status": "success", "message": f"Properties updated for {dataset_name}"}

    def destroy_dataset(self, dataset_name: str, recursive: bool = False) -> Dict[str, str]:
        """Destroy a dataset via ``zfs destroy`` (``-r`` when *recursive*)."""
        cmd = ["zfs", "destroy"]
        if recursive:
            cmd.append("-r")
        cmd.append(dataset_name)

        _, stderr, rc = self.run_command(cmd)
        if rc != 0:
            logger.error(f"zfs destroy failed: {stderr}")
            return {"status": "error", "message": stderr}
        self.clear_cache()
        return {"status": "success", "message": f"Dataset {dataset_name} destroyed"}

    # ============== SNAPSHOT OPERATIONS ==============

    def list_snapshots(self, dataset_name: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
        """Return the newest *limit* snapshots, newest first.

        With *dataset_name* the listing is recursive (``-r``) so snapshots of
        sub-datasets are included; without it all snapshots are listed. The
        complete sorted list is cached for 60 seconds per dataset and
        *limit* is applied on every call, so different limits never see each
        other's truncated result. Returns an empty list when the command
        fails.
        """
        cache_key = f"snapshots:{dataset_name or '*'}"
        cached = self.cache.get(cache_key)
        if cached:
            return cached[:limit]

        cmd = ["zfs", "list", "-t", "snapshot"]
        if dataset_name:
            # No -d limit so sub-datasets (e.g. tank/share) are included.
            cmd.append("-r")
        cmd.extend(["-H", "-p", "-o", "name,used,referenced,creation"])
        if dataset_name:
            cmd.append(dataset_name)

        stdout, stderr, rc = self.run_command(cmd)
        if rc != 0:
            logger.error(f"zfs list snapshots failed: {stderr}")
            return []

        snapshots = []
        for line in stdout.strip().split("\n"):
            parts = line.split("\t")
            if len(parts) < 4:
                continue
            snapshots.append({
                "name": parts[0],
                "used": self._to_int(parts[1]),
                "referenced": self._to_int(parts[2]),
                "creation": self._to_int(parts[3]),  # Unix timestamp
            })

        snapshots.sort(key=lambda snap: snap["creation"], reverse=True)
        self.cache.set(cache_key, snapshots, ttl_seconds=60)
        # The slice is a copy, so callers cannot modify the cached list.
        return snapshots[:limit]

    def create_snapshot(self, dataset_name: str, snapshot_name: Optional[str] = None) -> Dict[str, str]:
        """Create ``<dataset_name>@<snapshot_name>`` via ``zfs snapshot``.

        Without *snapshot_name* the current local time formatted as
        ``YYYYmmdd-HHMMSS`` is used.
        """
        if not snapshot_name:
            snapshot_name = datetime.now().strftime("%Y%m%d-%H%M%S")
        full_name = f"{dataset_name}@{snapshot_name}"

        _, stderr, rc = self.run_command(["zfs", "snapshot", full_name])
        if rc != 0:
            logger.error(f"zfs snapshot failed: {stderr}")
            return {"status": "error", "message": stderr}
        self.clear_cache()
        return {"status": "success", "message": f"Snapshot {full_name} created"}

    def destroy_snapshot(self, snapshot_name: str, recursive: bool = False) -> Dict[str, str]:
        """Destroy a snapshot via ``zfs destroy`` (``-r`` when *recursive*).

        *snapshot_name* must contain ``@``. A name without it is refused
        without running any command, because ``zfs destroy`` would otherwise
        act on the dataset itself.
        """
        if "@" not in snapshot_name:
            message = f"Not a snapshot name (missing '@'): {snapshot_name}"
            logger.error(f"zfs destroy snapshot failed: {message}")
            return {"status": "error", "message": message}

        cmd = ["zfs", "destroy"]
        if recursive:
            cmd.append("-r")
        cmd.append(snapshot_name)

        _, stderr, rc = self.run_command(cmd)
        if rc != 0:
            logger.error(f"zfs destroy snapshot failed: {stderr}")
            return {"status": "error", "message": stderr}
        self.clear_cache()
        return {"status": "success", "message": f"Snapshot {snapshot_name} destroyed"}

    def rollback_snapshot(self, snapshot_name: str) -> Dict[str, str]:
        """Roll the dataset back to *snapshot_name* via ``zfs rollback -r``.

        WARNING: Destroys data after snapshot!
        """
        _, stderr, rc = self.run_command(["zfs", "rollback", "-r", snapshot_name])
        if rc != 0:
            logger.error(f"zfs rollback failed: {stderr}")
            return {"status": "error", "message": stderr}
        self.clear_cache()
        return {"status": "success", "message": f"Rolled back to {snapshot_name}"}

    # ============== UTILITY ==============

    def clear_cache(self):
        """Clear all caches by replacing the cache object with a fresh one."""
        self.cache = ZFSCache()
        logger.info("ZFS cache cleared")
# Global instance
zfs_runner = ZFSRunner()