55d99c829b
-d 1 begrenzte auf direkte Snapshots des Datasets, tank/share wurde nicht eingeschlossen. -r (recursive) liefert alle Sub-Datasets. Cache-Key jetzt dataset-spezifisch um Kollisionen zu vermeiden. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
465 lines
15 KiB
Python
465 lines
15 KiB
Python
"""
|
||
ZFS Command Runner – Wrapper für zpool/zfs CLI Commands
|
||
Handles subprocess execution, parsing, caching, error handling
|
||
"""
|
||
|
||
import subprocess
|
||
import json
|
||
import logging
|
||
from typing import Dict, List, Any, Optional, Tuple
|
||
from dataclasses import dataclass
|
||
from datetime import datetime, timedelta
|
||
import re
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# Cache with TTL
|
||
@dataclass
|
||
class CacheEntry:
|
||
data: Any
|
||
expires_at: datetime
|
||
|
||
class ZFSCache:
|
||
def __init__(self):
|
||
self.pool_status = CacheEntry(None, datetime.now())
|
||
self.snapshots = CacheEntry(None, datetime.now())
|
||
self.datasets = CacheEntry(None, datetime.now())
|
||
|
||
def get(self, key: str) -> Optional[Any]:
|
||
cache_dict = {
|
||
"pool_status": self.pool_status,
|
||
"snapshots": self.snapshots,
|
||
"datasets": self.datasets,
|
||
}
|
||
|
||
if key not in cache_dict:
|
||
return None
|
||
|
||
entry = cache_dict[key]
|
||
if not entry or not entry.data:
|
||
return None
|
||
|
||
if datetime.now() > entry.expires_at:
|
||
return None
|
||
|
||
return entry.data
|
||
|
||
def set(self, key: str, data: Any, ttl_seconds: int = 60):
|
||
cache_dict = {
|
||
"pool_status": self.pool_status,
|
||
"snapshots": self.snapshots,
|
||
"datasets": self.datasets,
|
||
}
|
||
|
||
if key in cache_dict:
|
||
cache_dict[key].data = data
|
||
cache_dict[key].expires_at = datetime.now() + timedelta(seconds=ttl_seconds)
|
||
|
||
class ZFSRunner:
|
||
def __init__(self, timeout: int = 5):
|
||
self.timeout = timeout
|
||
self.cache = ZFSCache()
|
||
|
||
def run_command(self, cmd: List[str], timeout: Optional[int] = None) -> Tuple[str, str, int]:
|
||
"""
|
||
Run subprocess command with timeout
|
||
Returns: (stdout, stderr, returncode)
|
||
"""
|
||
if timeout is None:
|
||
timeout = self.timeout
|
||
|
||
try:
|
||
result = subprocess.run(
|
||
cmd,
|
||
capture_output=True,
|
||
text=True,
|
||
timeout=timeout,
|
||
check=False
|
||
)
|
||
return result.stdout, result.stderr, result.returncode
|
||
except subprocess.TimeoutExpired:
|
||
logger.error(f"Command timeout after {timeout}s: {' '.join(cmd)}")
|
||
return "", f"Command timeout after {timeout}s", -1
|
||
except FileNotFoundError:
|
||
logger.error(f"Command not found: {' '.join(cmd)}")
|
||
return "", f"Command not found: {cmd[0]}", -1
|
||
except Exception as e:
|
||
logger.error(f"Command execution error: {e}")
|
||
return "", str(e), -1
|
||
|
||
# ============== POOL OPERATIONS ==============
|
||
|
||
def list_pools(self) -> List[Dict[str, Any]]:
|
||
"""
|
||
Get list of ZFS pools with status
|
||
Uses cache (TTL 30s)
|
||
"""
|
||
cached = self.cache.get("pool_status")
|
||
if cached:
|
||
return cached
|
||
|
||
stdout, stderr, rc = self.run_command(["zpool", "list", "-H", "-p"])
|
||
|
||
if rc != 0:
|
||
logger.error(f"zpool list failed: {stderr}")
|
||
return []
|
||
|
||
pools = []
|
||
for line in stdout.strip().split("\n"):
|
||
if not line:
|
||
continue
|
||
|
||
parts = line.split()
|
||
if len(parts) < 10:
|
||
continue
|
||
|
||
pool = {
|
||
"name": parts[0],
|
||
"size": int(parts[1]),
|
||
"alloc": int(parts[2]),
|
||
"free": int(parts[3]),
|
||
"fragmentation": f"{parts[6]}%",
|
||
"capacity": f"{parts[7]}%",
|
||
"health": parts[9]
|
||
}
|
||
pools.append(pool)
|
||
|
||
self.cache.set("pool_status", pools, ttl_seconds=30)
|
||
return pools
|
||
|
||
def _parse_vdev_tree(self, config_lines: List[str]) -> List[Dict[str, Any]]:
|
||
"""
|
||
Parse VDEV tree from zpool status config section.
|
||
Uses indentation levels to reconstruct hierarchy.
|
||
Returns list of vdev dicts with name, state, and error counters (read/write/cksum).
|
||
"""
|
||
roots: List[Dict] = []
|
||
stack: List[tuple] = [] # (indent, vdev_dict)
|
||
|
||
for line in config_lines:
|
||
if not line.strip():
|
||
continue
|
||
# Skip header line (NAME STATE READ WRITE CKSUM)
|
||
if line.strip().startswith("NAME"):
|
||
continue
|
||
|
||
indent = len(line) - len(line.lstrip())
|
||
parts = line.split()
|
||
if not parts:
|
||
continue
|
||
|
||
name = parts[0]
|
||
state = parts[1] if len(parts) > 1 else "UNKNOWN"
|
||
|
||
# Parse error counters and convert to integers
|
||
read = 0
|
||
write = 0
|
||
cksum = 0
|
||
|
||
if len(parts) > 2:
|
||
try:
|
||
read = int(parts[2])
|
||
except (ValueError, IndexError):
|
||
read = 0
|
||
|
||
if len(parts) > 3:
|
||
try:
|
||
write = int(parts[3])
|
||
except (ValueError, IndexError):
|
||
write = 0
|
||
|
||
if len(parts) > 4:
|
||
try:
|
||
cksum = int(parts[4])
|
||
except (ValueError, IndexError):
|
||
cksum = 0
|
||
|
||
vdev: Dict[str, Any] = {
|
||
"name": name,
|
||
"state": state,
|
||
"read": read,
|
||
"write": write,
|
||
"cksum": cksum,
|
||
"children": []
|
||
}
|
||
|
||
# Pop stack entries that are at same or deeper indent
|
||
while stack and stack[-1][0] >= indent:
|
||
stack.pop()
|
||
|
||
if stack:
|
||
stack[-1][1]["children"].append(vdev)
|
||
else:
|
||
roots.append(vdev)
|
||
|
||
stack.append((indent, vdev))
|
||
|
||
return roots
|
||
|
||
def get_pool_status(self, pool_name: str) -> Dict[str, Any]:
|
||
"""
|
||
Get detailed pool status including VDEV tree and error counters
|
||
"""
|
||
stdout, stderr, rc = self.run_command(["zpool", "status", pool_name])
|
||
|
||
if rc != 0:
|
||
logger.error(f"zpool status failed for {pool_name}: {stderr}")
|
||
return {}
|
||
|
||
status: Dict[str, Any] = {
|
||
"name": pool_name,
|
||
"state": None,
|
||
"scan": None,
|
||
"errors": None,
|
||
"vdevs": [],
|
||
}
|
||
|
||
lines = stdout.split("\n")
|
||
in_config = False
|
||
config_lines: List[str] = []
|
||
|
||
for line in lines:
|
||
stripped = line.strip()
|
||
if stripped.startswith("state:"):
|
||
status["state"] = stripped.split(":", 1)[1].strip()
|
||
elif stripped.startswith("scan:"):
|
||
status["scan"] = stripped.split(":", 1)[1].strip()
|
||
elif stripped.startswith("errors:"):
|
||
status["errors"] = stripped.split(":", 1)[1].strip()
|
||
in_config = False
|
||
elif stripped == "config:":
|
||
in_config = True
|
||
elif in_config:
|
||
# Collect config block lines (skip blank lines at start)
|
||
if stripped or config_lines:
|
||
config_lines.append(line)
|
||
|
||
if config_lines:
|
||
# Remove the pool name itself (first non-empty line after "NAME" header)
|
||
# and parse only child vdevs
|
||
parsed = self._parse_vdev_tree(config_lines)
|
||
# parsed[0] is the pool root node; its children are the top-level vdevs
|
||
if parsed and parsed[0]["name"] == pool_name:
|
||
status["vdevs"] = parsed[0]["children"]
|
||
else:
|
||
status["vdevs"] = parsed
|
||
|
||
return status
|
||
|
||
def scrub_pool(self, pool_name: str) -> Dict[str, str]:
|
||
"""
|
||
Start or resume scrub on pool
|
||
"""
|
||
stdout, stderr, rc = self.run_command(["zpool", "scrub", pool_name])
|
||
|
||
if rc != 0:
|
||
logger.error(f"zpool scrub failed for {pool_name}: {stderr}")
|
||
return {"status": "error", "message": stderr}
|
||
|
||
return {"status": "success", "message": f"Scrub started for {pool_name}"}
|
||
|
||
# ============== DATASET/FILESYSTEM OPERATIONS ==============
|
||
|
||
def list_datasets(self, pool_name: str, max_depth: int = 2) -> List[Dict[str, Any]]:
|
||
"""
|
||
List datasets in pool (with depth limit for performance)
|
||
"""
|
||
cached = self.cache.get("datasets")
|
||
if cached and cached.get(pool_name):
|
||
return cached[pool_name]
|
||
|
||
stdout, stderr, rc = self.run_command([
|
||
"zfs", "list", "-d", str(max_depth), "-H", "-p",
|
||
"-o", "name,used,avail,refer,mountpoint,type",
|
||
pool_name
|
||
])
|
||
|
||
if rc != 0:
|
||
logger.error(f"zfs list failed for {pool_name}: {stderr}")
|
||
return []
|
||
|
||
datasets = []
|
||
for line in stdout.strip().split("\n"):
|
||
if not line:
|
||
continue
|
||
|
||
parts = line.split("\t")
|
||
if len(parts) < 6:
|
||
continue
|
||
|
||
dataset = {
|
||
"name": parts[0],
|
||
"used": int(parts[1]),
|
||
"avail": int(parts[2]),
|
||
"refer": int(parts[3]),
|
||
"mountpoint": parts[4],
|
||
"type": parts[5] # filesystem, volume, snapshot
|
||
}
|
||
datasets.append(dataset)
|
||
|
||
# Cache per pool
|
||
if not cached:
|
||
cached = {}
|
||
cached[pool_name] = datasets
|
||
self.cache.set("datasets", cached, ttl_seconds=60)
|
||
|
||
return datasets
|
||
|
||
def create_dataset(self, dataset_name: str, props: Optional[Dict[str, str]] = None) -> Dict[str, str]:
|
||
"""
|
||
Create new ZFS dataset/filesystem
|
||
"""
|
||
cmd = ["zfs", "create"]
|
||
|
||
if props:
|
||
for key, val in props.items():
|
||
cmd.extend(["-o", f"{key}={val}"])
|
||
|
||
cmd.append(dataset_name)
|
||
|
||
stdout, stderr, rc = self.run_command(cmd)
|
||
|
||
if rc != 0:
|
||
logger.error(f"zfs create failed: {stderr}")
|
||
return {"status": "error", "message": stderr}
|
||
|
||
return {"status": "success", "message": f"Dataset {dataset_name} created"}
|
||
|
||
def set_dataset_properties(self, dataset_name: str, props: Dict[str, str]) -> Dict[str, str]:
|
||
"""Set ZFS dataset properties (compression, quota, reservation, etc.)"""
|
||
errors = []
|
||
for key, value in props.items():
|
||
if value is None:
|
||
continue
|
||
stdout, stderr, rc = self.run_command(["zfs", "set", f"{key}={value}", dataset_name])
|
||
if rc != 0:
|
||
errors.append(f"{key}: {stderr.strip()}")
|
||
|
||
if errors:
|
||
return {"status": "error", "message": "; ".join(errors)}
|
||
return {"status": "success", "message": f"Properties updated for {dataset_name}"}
|
||
|
||
def destroy_dataset(self, dataset_name: str, recursive: bool = False) -> Dict[str, str]:
|
||
"""
|
||
Destroy ZFS dataset
|
||
"""
|
||
cmd = ["zfs", "destroy"]
|
||
if recursive:
|
||
cmd.append("-r")
|
||
cmd.append(dataset_name)
|
||
|
||
stdout, stderr, rc = self.run_command(cmd)
|
||
|
||
if rc != 0:
|
||
logger.error(f"zfs destroy failed: {stderr}")
|
||
return {"status": "error", "message": stderr}
|
||
|
||
return {"status": "success", "message": f"Dataset {dataset_name} destroyed"}
|
||
|
||
# ============== SNAPSHOT OPERATIONS ==============
|
||
|
||
def list_snapshots(self, dataset_name: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
|
||
"""
|
||
List snapshots (with limit for performance on many snapshots)
|
||
"""
|
||
cache_key = f"snapshots:{dataset_name or '*'}"
|
||
cached = self.cache.get(cache_key)
|
||
if cached:
|
||
return cached
|
||
|
||
if dataset_name:
|
||
# No -d limit so sub-datasets (e.g. tank/share) are included
|
||
cmd = ["zfs", "list", "-t", "snapshot", "-r", "-H", "-p",
|
||
"-o", "name,used,referenced,creation", dataset_name]
|
||
else:
|
||
cmd = ["zfs", "list", "-t", "snapshot", "-H", "-p",
|
||
"-o", "name,used,referenced,creation"]
|
||
|
||
stdout, stderr, rc = self.run_command(cmd)
|
||
|
||
if rc != 0:
|
||
logger.error(f"zfs list snapshots failed: {stderr}")
|
||
return []
|
||
|
||
snapshots = []
|
||
for line in stdout.strip().split("\n"):
|
||
if not line:
|
||
continue
|
||
|
||
parts = line.split("\t")
|
||
if len(parts) < 4:
|
||
continue
|
||
|
||
snapshot = {
|
||
"name": parts[0],
|
||
"used": int(parts[1]),
|
||
"referenced": int(parts[2]),
|
||
"creation": int(parts[3]), # Unix timestamp
|
||
}
|
||
snapshots.append(snapshot)
|
||
|
||
# Sort by creation time (newest first) and limit
|
||
snapshots.sort(key=lambda x: x["creation"], reverse=True)
|
||
snapshots = snapshots[:limit]
|
||
|
||
self.cache.set(cache_key, snapshots, ttl_seconds=60)
|
||
return snapshots
|
||
|
||
def create_snapshot(self, dataset_name: str, snapshot_name: Optional[str] = None) -> Dict[str, str]:
|
||
"""
|
||
Create snapshot with auto-generated name if not provided
|
||
"""
|
||
if not snapshot_name:
|
||
from datetime import datetime as dt
|
||
snapshot_name = dt.now().strftime("%Y%m%d-%H%M%S")
|
||
|
||
full_name = f"{dataset_name}@{snapshot_name}"
|
||
|
||
stdout, stderr, rc = self.run_command(["zfs", "snapshot", full_name])
|
||
|
||
if rc != 0:
|
||
logger.error(f"zfs snapshot failed: {stderr}")
|
||
return {"status": "error", "message": stderr}
|
||
|
||
return {"status": "success", "message": f"Snapshot {full_name} created"}
|
||
|
||
def destroy_snapshot(self, snapshot_name: str, recursive: bool = False) -> Dict[str, str]:
|
||
"""
|
||
Destroy snapshot
|
||
"""
|
||
cmd = ["zfs", "destroy"]
|
||
if recursive:
|
||
cmd.append("-r")
|
||
cmd.append(snapshot_name)
|
||
|
||
stdout, stderr, rc = self.run_command(cmd)
|
||
|
||
if rc != 0:
|
||
logger.error(f"zfs destroy snapshot failed: {stderr}")
|
||
return {"status": "error", "message": stderr}
|
||
|
||
return {"status": "success", "message": f"Snapshot {snapshot_name} destroyed"}
|
||
|
||
def rollback_snapshot(self, snapshot_name: str) -> Dict[str, str]:
|
||
"""
|
||
Rollback dataset to snapshot
|
||
WARNING: Destroys data after snapshot!
|
||
"""
|
||
stdout, stderr, rc = self.run_command(["zfs", "rollback", "-r", snapshot_name])
|
||
|
||
if rc != 0:
|
||
logger.error(f"zfs rollback failed: {stderr}")
|
||
return {"status": "error", "message": stderr}
|
||
|
||
return {"status": "success", "message": f"Rolled back to {snapshot_name}"}
|
||
|
||
# ============== UTILITY ==============
|
||
|
||
def clear_cache(self):
|
||
"""Clear all caches"""
|
||
self.cache = ZFSCache()
|
||
logger.info("ZFS cache cleared")
|
||
|
||
# Global instance
|
||
zfs_runner = ZFSRunner()
|