Refactor: Java-Klassen aus Services entfernt + .gitignore aus Repo

shares.py, zfs_runner.py: SharesManager/ZFSRunner → Modul-Funktionen
Backward-compat Shims erhalten (zfs_runner/share_manager bleiben nutzbar)
system_users.py, auth.py.bak: ungenutzte Dateien gelöscht
.gitignore: aus Repo entfernt (enthält interne Pfade/Infos)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-05 19:10:06 +02:00
parent 10306cbd5b
commit 673c7d2f96
5 changed files with 541 additions and 1118 deletions
+1
View File
@@ -32,3 +32,4 @@ DEPLOYMENT_PI.md
PROXMOX_*.md PROXMOX_*.md
memory/ memory/
CLAUDE.md CLAUDE.md
.gitignore
-89
View File
@@ -1,89 +0,0 @@
"""
JWT Authentication Service
Handles user login via PAM (Linux system users), token generation, and verification
"""
import logging
import os
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
logger = logging.getLogger(__name__)
# JWT Configuration
SECRET_KEY = os.environ.get("ZFS_SECRET_KEY", "your-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 8
# Try to import PAM for system authentication
try:
import pam
PAM_AVAILABLE = True
except ImportError:
PAM_AVAILABLE = False
logger.warning("python-pam not installed, PAM authentication unavailable")
class AuthService:
def __init__(self):
"""Initialize auth service with PAM (Linux system users)"""
if PAM_AVAILABLE:
logger.info("Using PAM authentication (Linux system users)")
else:
logger.error("PAM not available - install python-pam for authentication")
def authenticate_user(self, username: str, password: str) -> Optional[dict]:
"""
Authenticate user via PAM (Linux system users like 'pi', 'root')
Returns user data if valid, None otherwise
"""
if not PAM_AVAILABLE:
logger.error("PAM not available")
return None
try:
p = pam.pam()
if p.authenticate(username, password):
logger.info(f"User {username} authenticated via PAM")
return {
"username": username,
"source": "pam"
}
else:
logger.warning(f"PAM authentication failed for user {username}: {p.reason}")
return None
except Exception as e:
logger.error(f"PAM authentication error: {e}")
return None
def create_access_token(self, username: str, expires_delta: Optional[timedelta] = None) -> str:
"""Create JWT access token"""
if expires_delta is None:
expires_delta = timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
expire = datetime.utcnow() + expires_delta
to_encode = {"sub": username, "exp": expire}
try:
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
except Exception as e:
logger.error(f"Failed to create token: {e}")
raise
def verify_token(self, token: str) -> Optional[str]:
"""Verify JWT token and return username"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
return None
return username
except JWTError:
return None
# Global instance
auth_service = AuthService()
+235 -295
View File
@@ -1,6 +1,5 @@
""" """
Samba and NFS Shares Management Samba and NFS Shares Management
Handles /etc/samba/smb.conf and /etc/exports
""" """
import re import re
@@ -15,320 +14,261 @@ SAMBA_CONFIG = Path("/etc/samba/smb.conf")
NFS_EXPORTS = Path("/etc/exports") NFS_EXPORTS = Path("/etc/exports")
class SharesManager: def list_samba_shares() -> List[Dict[str, Any]]:
"""Manage Samba and NFS shares""" if not SAMBA_CONFIG.exists():
return []
shares = []
try:
with open(SAMBA_CONFIG) as f:
content = f.read()
current_share = None
for line in content.split('\n'):
line = line.strip()
if not line or line.startswith('#') or line.startswith(';'):
continue
if line.startswith('[') and line.endswith(']'):
current_share = line[1:-1]
if current_share.lower() != 'global':
shares.append({'name': current_share, 'path': None, 'comment': None})
else:
current_share = None
continue
if '=' in line and current_share:
key, value = line.split('=', 1)
key = key.strip().lower()
value = value.strip()
if key == 'path':
shares[-1]['path'] = value
elif key == 'comment':
shares[-1]['comment'] = value
return [s for s in shares if s['path']]
except Exception as e:
logger.error(f"Error parsing Samba config: {e}")
return []
def list_samba_shares(self) -> List[Dict[str, Any]]:
"""Parse /etc/samba/smb.conf and return shares"""
if not SAMBA_CONFIG.exists():
return []
shares = [] def create_samba_share(name: str, path: str, comment: Optional[str] = None) -> bool:
try: if not SAMBA_CONFIG.exists() or not name.strip() or not path.strip():
with open(SAMBA_CONFIG, 'r') as f: return False
content = f.read() try:
section = f"\n[{name.strip()}]\n path = {path.strip()}\n"
if comment:
section += f" comment = {comment}\n"
section += " browseable = yes\n read only = no\n"
with open(SAMBA_CONFIG, 'a') as f:
f.write(section)
subprocess.run(['/usr/bin/smbcontrol', 'smbd', 'reload-config'], capture_output=True, timeout=10)
logger.info(f"Samba share created: {name}")
return True
except Exception as e:
logger.error(f"Error creating Samba share: {e}")
return False
current_share = None
for line in content.split('\n'): def update_samba_share(old_name: str, new_name: str, path: str, comment: Optional[str] = None) -> bool:
if not SAMBA_CONFIG.exists():
return False
try:
with open(SAMBA_CONFIG) as f:
content = f.read()
pattern = rf"\n\[{re.escape(old_name)}\].*?(?=\n\[|\Z)"
if not re.search(pattern, content, flags=re.DOTALL):
return False
section = f"\n[{new_name}]\n path = {path}\n"
if comment:
section += f" comment = {comment}\n"
section += " browseable = yes\n read only = no\n"
with open(SAMBA_CONFIG, 'w') as f:
f.write(re.sub(pattern, section, content, flags=re.DOTALL))
subprocess.run(['/usr/bin/smbcontrol', 'smbd', 'reload-config'], capture_output=True, timeout=10)
logger.info(f"Samba share updated: {old_name}{new_name}")
return True
except Exception as e:
logger.error(f"Error updating Samba share: {e}")
return False
def delete_samba_share(name: str) -> bool:
if not SAMBA_CONFIG.exists():
return False
try:
with open(SAMBA_CONFIG) as f:
content = f.read()
pattern = rf"\n\[{re.escape(name)}\].*?(?=\n\[|\Z)"
new_content = re.sub(pattern, '', content, flags=re.DOTALL)
if new_content == content:
return False
with open(SAMBA_CONFIG, 'w') as f:
f.write(new_content)
subprocess.run(['/usr/bin/smbcontrol', 'smbd', 'reload-config'], capture_output=True, timeout=10)
logger.info(f"Samba share deleted: {name}")
return True
except Exception as e:
logger.error(f"Error deleting Samba share: {e}")
return False
def list_nfs_shares() -> List[Dict[str, Any]]:
if not NFS_EXPORTS.exists():
return []
shares = []
try:
with open(NFS_EXPORTS) as f:
for line in f:
line = line.strip() line = line.strip()
if not line or line.startswith('#') or line.startswith(';'): if not line or line.startswith('#'):
continue continue
parts = line.split()
if len(parts) >= 2:
path = parts[0]
rest = ' '.join(parts[1:])
clients = rest[:rest.index('(')].strip() if '(' in rest else rest
options = rest[rest.index('(') + 1:rest.index(')')] if '(' in rest else None
shares.append({'path': path, 'clients': clients, 'options': options})
return shares
except Exception as e:
logger.error(f"Error parsing NFS exports: {e}")
return []
if line.startswith('[') and line.endswith(']'):
current_share = line[1:-1]
if current_share.lower() != 'global':
shares.append({'name': current_share, 'path': None, 'comment': None})
else:
current_share = None
continue
if '=' in line and current_share: def create_nfs_share(path: str, clients: str, options: Optional[str] = None) -> bool:
key, value = line.split('=', 1) if not NFS_EXPORTS.exists() or not path.strip() or not clients.strip():
key = key.strip().lower() return False
value = value.strip() try:
if key == 'path': opts = options or "rw,sync,no_subtree_check"
shares[-1]['path'] = value with open(NFS_EXPORTS, 'a') as f:
elif key == 'comment': f.write(f"{path.strip()} {clients.strip()}({opts})\n")
shares[-1]['comment'] = value subprocess.run(['/usr/sbin/exportfs', '-r'], capture_output=True, timeout=10)
logger.info(f"NFS share created: {path}")
return True
except Exception as e:
logger.error(f"Error creating NFS share: {e}")
return False
return [s for s in shares if s['path']]
except Exception as e: def delete_nfs_share(path: str) -> bool:
logger.error(f"Error parsing Samba config: {e}") if not NFS_EXPORTS.exists():
return [] return False
try:
def create_samba_share(self, name: str, path: str, comment: Optional[str] = None) -> bool: with open(NFS_EXPORTS) as f:
"""Add Samba share to /etc/samba/smb.conf""" lines = f.readlines()
if not SAMBA_CONFIG.exists() or not name.strip() or not path.strip(): new_lines = [l for l in lines if not l.strip().startswith(path)]
if len(new_lines) == len(lines):
return False return False
with open(NFS_EXPORTS, 'w') as f:
f.writelines(new_lines)
subprocess.run(['/usr/sbin/exportfs', '-r'], capture_output=True, timeout=10)
logger.info(f"NFS share deleted: {path}")
return True
except Exception as e:
logger.error(f"Error deleting NFS share: {e}")
return False
try:
name = name.strip()
path = path.strip()
section = f"\n[{name}]\n path = {path}\n"
if comment:
section += f" comment = {comment}\n"
section += f" browseable = yes\n read only = no\n"
with open(SAMBA_CONFIG, 'a') as f: def get_samba_global_config() -> Dict[str, Any]:
f.write(section) try:
result = subprocess.run(
subprocess.run(['/usr/bin/smbcontrol', 'smbd', 'reload-config'], capture_output=True, timeout=10) ['/usr/bin/net', 'conf', 'list'],
logger.info(f"Samba share created: {name}") capture_output=True, text=True, timeout=10
return True )
except Exception as e: if result.returncode != 0:
logger.error(f"Error creating Samba share: {e}")
return False
def update_samba_share(self, old_name: str, new_name: str, path: str, comment: Optional[str] = None) -> bool:
"""Update Samba share in /etc/samba/smb.conf"""
if not SAMBA_CONFIG.exists():
return False
try:
with open(SAMBA_CONFIG, 'r') as f:
content = f.read()
# Find and replace the share section
pattern = rf"\n\[{re.escape(old_name)}\].*?(?=\n\[|\Z)"
match = re.search(pattern, content, flags=re.DOTALL)
if not match:
return False
# Build new section
section = f"\n[{new_name}]\n path = {path}\n"
if comment:
section += f" comment = {comment}\n"
section += f" browseable = yes\n read only = no\n"
new_content = re.sub(pattern, section, content, flags=re.DOTALL)
with open(SAMBA_CONFIG, 'w') as f:
f.write(new_content)
subprocess.run(['/usr/bin/smbcontrol', 'smbd', 'reload-config'], capture_output=True, timeout=10)
logger.info(f"Samba share updated: {old_name}{new_name}")
return True
except Exception as e:
logger.error(f"Error updating Samba share: {e}")
return False
def delete_samba_share(self, name: str) -> bool:
"""Remove Samba share from /etc/samba/smb.conf"""
if not SAMBA_CONFIG.exists():
return False
try:
with open(SAMBA_CONFIG, 'r') as f:
content = f.read()
pattern = rf"\n\[{re.escape(name)}\].*?(?=\n\[|\Z)"
new_content = re.sub(pattern, '', content, flags=re.DOTALL)
if new_content == content:
return False
with open(SAMBA_CONFIG, 'w') as f:
f.write(new_content)
subprocess.run(['/usr/bin/smbcontrol', 'smbd', 'reload-config'], capture_output=True, timeout=10)
logger.info(f"Samba share deleted: {name}")
return True
except Exception as e:
logger.error(f"Error deleting Samba share: {e}")
return False
def list_nfs_shares(self) -> List[Dict[str, Any]]:
"""Parse /etc/exports and return NFS shares"""
if not NFS_EXPORTS.exists():
return []
shares = []
try:
with open(NFS_EXPORTS, 'r') as f:
for line in f:
line = line.strip()
if not line or line.startswith('#'):
continue
parts = line.split()
if len(parts) >= 2:
path = parts[0]
rest = ' '.join(parts[1:])
clients = rest[:rest.index('(')].strip() if '(' in rest else rest
options = rest[rest.index('(') + 1:rest.index(')')] if '(' in rest else None
shares.append({'path': path, 'clients': clients, 'options': options})
return shares
except Exception as e:
logger.error(f"Error parsing NFS exports: {e}")
return []
def create_nfs_share(self, path: str, clients: str, options: Optional[str] = None) -> bool:
"""Add NFS share to /etc/exports"""
if not NFS_EXPORTS.exists() or not path.strip() or not clients.strip():
return False
try:
path = path.strip()
clients = clients.strip()
if not options:
options = "rw,sync,no_subtree_check"
export_line = f"{path} {clients}({options})\n"
with open(NFS_EXPORTS, 'a') as f:
f.write(export_line)
subprocess.run(['/usr/sbin/exportfs', '-r'], capture_output=True, timeout=10)
logger.info(f"NFS share created: {path}")
return True
except Exception as e:
logger.error(f"Error creating NFS share: {e}")
return False
def delete_nfs_share(self, path: str) -> bool:
"""Remove NFS share from /etc/exports"""
if not NFS_EXPORTS.exists():
return False
try:
with open(NFS_EXPORTS, 'r') as f:
lines = f.readlines()
new_lines = [l for l in lines if not l.strip().startswith(path)]
if len(new_lines) == len(lines):
return False
with open(NFS_EXPORTS, 'w') as f:
f.writelines(new_lines)
subprocess.run(['/usr/sbin/exportfs', '-r'], capture_output=True, timeout=10)
logger.info(f"NFS share deleted: {path}")
return True
except Exception as e:
logger.error(f"Error deleting NFS share: {e}")
return False
def get_samba_global_config(self) -> Dict[str, Any]:
"""Read Samba global configuration from registry using 'net conf list'"""
try:
result = subprocess.run(
['/usr/bin/net', 'conf', 'list'],
capture_output=True,
text=True,
timeout=10
)
if result.returncode != 0:
return {"parameters": []}
parameters = []
in_global = False
for line in result.stdout.split('\n'):
if line.strip().startswith('[global]'):
in_global = True
continue
if in_global:
if line.strip().startswith('['):
break
line = line.strip()
if not line or line.startswith(';') or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
parameters.append({
"key": key.strip(),
"value": value.strip()
})
return {"parameters": parameters}
except Exception as e:
logger.error(f"Error reading Samba registry config: {e}")
return {"parameters": []} return {"parameters": []}
parameters = []
in_global = False
for line in result.stdout.split('\n'):
if line.strip().startswith('[global]'):
in_global = True
continue
if in_global:
if line.strip().startswith('['):
break
line = line.strip()
if not line or line.startswith(';') or line.startswith('#'):
continue
if '=' in line:
key, value = line.split('=', 1)
parameters.append({"key": key.strip(), "value": value.strip()})
return {"parameters": parameters}
except Exception as e:
logger.error(f"Error reading Samba registry config: {e}")
return {"parameters": []}
def set_samba_global_config(self, parameters: Dict[str, str]) -> bool:
"""Update Samba global configuration parameters using 'net conf setparm/delparm'"""
try:
current = self.get_samba_global_config()
current_keys = {p["key"] for p in current.get("parameters", [])}
new_keys = set(parameters.keys())
# Delete keys that were removed def set_samba_global_config(parameters: Dict[str, str]) -> bool:
for key in current_keys - new_keys: try:
subprocess.run( current_keys = {p["key"] for p in get_samba_global_config().get("parameters", [])}
['/usr/bin/net', 'conf', 'delparm', 'global', key], for key in current_keys - set(parameters.keys()):
capture_output=True, text=True, timeout=10 subprocess.run(['/usr/bin/net', 'conf', 'delparm', 'global', key],
) capture_output=True, text=True, timeout=10)
for key, value in parameters.items():
# Set/update remaining keys
for key, value in parameters.items():
result = subprocess.run(
['/usr/bin/net', 'conf', 'setparm', 'global', key, '--', value],
capture_output=True,
text=True,
timeout=10
)
if result.returncode != 0:
logger.error(f"Failed to set {key}={value}: {result.stderr}")
return False
logger.info(f"Samba global config updated: {len(parameters)} parameters")
return True
except Exception as e:
logger.error(f"Error writing Samba global config: {e}")
return False
def import_samba_config(self, config_file: str) -> bool:
"""Import Samba configuration using net conf import"""
try:
# Use net conf import to load configuration from file
result = subprocess.run( result = subprocess.run(
['net', 'conf', 'import', config_file], ['/usr/bin/net', 'conf', 'setparm', 'global', key, '--', value],
capture_output=True, capture_output=True, text=True, timeout=10
timeout=10
) )
if result.returncode == 0: if result.returncode != 0:
logger.info(f"Samba config imported from {config_file}") logger.error(f"Failed to set {key}={value}: {result.stderr}")
return True
else:
logger.error(f"Failed to import Samba config: {result.stderr.decode()}")
return False return False
except Exception as e: logger.info(f"Samba global config updated: {len(parameters)} parameters")
logger.error(f"Error importing Samba config: {e}") return True
return False except Exception as e:
logger.error(f"Error writing Samba global config: {e}")
return False
def get_nfs_config(self) -> Dict[str, Any]:
"""Read /etc/exports and return as config object"""
if not NFS_EXPORTS.exists():
return {"exports": "", "note": "NFS not configured"}
try: def import_samba_config(config_file: str) -> bool:
with open(NFS_EXPORTS, 'r') as f: try:
content = f.read() result = subprocess.run(
return {"exports": content, "path": str(NFS_EXPORTS)} ['/usr/bin/net', 'conf', 'import', config_file],
except Exception as e: capture_output=True, timeout=10
logger.error(f"Error reading NFS config: {e}") )
return {"error": str(e), "path": str(NFS_EXPORTS)} if result.returncode == 0:
logger.info(f"Samba config imported from {config_file}")
def set_nfs_config(self, content: str) -> bool:
"""Write to /etc/exports and reload NFS"""
if not NFS_EXPORTS.exists():
return False
try:
with open(NFS_EXPORTS, 'w') as f:
f.write(content)
subprocess.run(['/usr/sbin/exportfs', '-r'], capture_output=True, timeout=10)
logger.info("NFS config updated")
return True return True
except Exception as e: logger.error(f"Failed to import Samba config: {result.stderr.decode()}")
logger.error(f"Error writing NFS config: {e}") return False
return False except Exception as e:
logger.error(f"Error importing Samba config: {e}")
return False
share_manager = SharesManager() def get_nfs_config() -> Dict[str, Any]:
if not NFS_EXPORTS.exists():
return {"exports": "", "note": "NFS not configured"}
try:
with open(NFS_EXPORTS) as f:
return {"exports": f.read(), "path": str(NFS_EXPORTS)}
except Exception as e:
logger.error(f"Error reading NFS config: {e}")
return {"error": str(e), "path": str(NFS_EXPORTS)}
def set_nfs_config(content: str) -> bool:
if not NFS_EXPORTS.exists():
return False
try:
with open(NFS_EXPORTS, 'w') as f:
f.write(content)
subprocess.run(['/usr/sbin/exportfs', '-r'], capture_output=True, timeout=10)
logger.info("NFS config updated")
return True
except Exception as e:
logger.error(f"Error writing NFS config: {e}")
return False
# Backward-compat shim — routers can use either style
class _ShareManagerShim:
list_samba_shares = staticmethod(list_samba_shares)
create_samba_share = staticmethod(create_samba_share)
update_samba_share = staticmethod(update_samba_share)
delete_samba_share = staticmethod(delete_samba_share)
list_nfs_shares = staticmethod(list_nfs_shares)
create_nfs_share = staticmethod(create_nfs_share)
delete_nfs_share = staticmethod(delete_nfs_share)
get_samba_global_config = staticmethod(get_samba_global_config)
set_samba_global_config = staticmethod(set_samba_global_config)
import_samba_config = staticmethod(import_samba_config)
get_nfs_config = staticmethod(get_nfs_config)
set_nfs_config = staticmethod(set_nfs_config)
share_manager = _ShareManagerShim()
-221
View File
@@ -1,221 +0,0 @@
"""
System User and Group Management
Wrapper around /etc/passwd, /etc/group, useradd, groupadd, etc.
"""
import subprocess
import logging
import pwd
import grp
from typing import List, Dict, Any, Optional
logger = logging.getLogger(__name__)
class SystemUserManager:
"""Manage system users and groups"""
def list_users(self) -> List[Dict[str, Any]]:
"""List all system users"""
users = []
try:
for entry in pwd.getwall():
users.append({
"username": entry.pw_name,
"uid": entry.pw_uid,
"gid": entry.pw_gid,
"gecos": entry.pw_gecos,
"home": entry.pw_dir,
"shell": entry.pw_shell
})
except Exception as e:
logger.error(f"Error listing users: {e}")
return sorted(users, key=lambda u: u["uid"])
def list_groups(self) -> List[Dict[str, Any]]:
"""List all system groups"""
groups = []
try:
for entry in grp.getgrall():
groups.append({
"groupname": entry.gr_name,
"gid": entry.gr_gid,
"members": entry.gr_mem
})
except Exception as e:
logger.error(f"Error listing groups: {e}")
return sorted(groups, key=lambda g: g["gid"])
def get_user(self, username: str) -> Optional[Dict[str, Any]]:
"""Get user details"""
try:
entry = pwd.getpwnam(username)
return {
"username": entry.pw_name,
"uid": entry.pw_uid,
"gid": entry.pw_gid,
"gecos": entry.pw_gecos,
"home": entry.pw_dir,
"shell": entry.pw_shell
}
except KeyError:
return None
except Exception as e:
logger.error(f"Error getting user {username}: {e}")
return None
def get_group(self, groupname: str) -> Optional[Dict[str, Any]]:
"""Get group details"""
try:
entry = grp.getgrnam(groupname)
return {
"groupname": entry.gr_name,
"gid": entry.gr_gid,
"members": entry.gr_mem
}
except KeyError:
return None
except Exception as e:
logger.error(f"Error getting group {groupname}: {e}")
return None
def create_user(
self,
username: str,
password: str,
home_dir: Optional[str] = None,
shell: str = "/bin/bash",
groups: Optional[List[str]] = None
) -> Dict[str, str]:
"""Create new system user"""
try:
cmd = ["useradd"]
if home_dir:
cmd.extend(["-d", home_dir])
cmd.extend(["-s", shell])
if groups:
cmd.extend(["-G", ",".join(groups)])
cmd.append(username)
result = subprocess.run(cmd, capture_output=True, text=True, check=False)
if result.returncode != 0:
logger.error(f"useradd failed: {result.stderr}")
return {"error": result.stderr}
# Set password
if password:
# Use chpasswd for password setting
passwd_cmd = subprocess.Popen(
["chpasswd"],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
_, err = passwd_cmd.communicate(input=f"{username}:{password}\n")
if passwd_cmd.returncode != 0:
logger.error(f"chpasswd failed: {err}")
return {"error": f"User created but password failed: {err}"}
logger.info(f"Created user: {username}")
return {"status": "success", "username": username}
except Exception as e:
logger.error(f"Error creating user: {e}")
return {"error": str(e)}
def delete_user(self, username: str, remove_home: bool = False) -> Dict[str, str]:
"""Delete system user"""
try:
cmd = ["userdel"]
if remove_home:
cmd.append("-r")
cmd.append(username)
result = subprocess.run(cmd, capture_output=True, text=True, check=False)
if result.returncode != 0:
logger.error(f"userdel failed: {result.stderr}")
return {"error": result.stderr}
logger.info(f"Deleted user: {username}")
return {"status": "success", "message": f"User {username} deleted"}
except Exception as e:
logger.error(f"Error deleting user: {e}")
return {"error": str(e)}
def create_group(self, groupname: str) -> Dict[str, str]:
"""Create new system group"""
try:
result = subprocess.run(
["groupadd", groupname],
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
logger.error(f"groupadd failed: {result.stderr}")
return {"error": result.stderr}
logger.info(f"Created group: {groupname}")
return {"status": "success", "groupname": groupname}
except Exception as e:
logger.error(f"Error creating group: {e}")
return {"error": str(e)}
def delete_group(self, groupname: str) -> Dict[str, str]:
"""Delete system group"""
try:
result = subprocess.run(
["groupdel", groupname],
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
logger.error(f"groupdel failed: {result.stderr}")
return {"error": result.stderr}
logger.info(f"Deleted group: {groupname}")
return {"status": "success", "message": f"Group {groupname} deleted"}
except Exception as e:
logger.error(f"Error deleting group: {e}")
return {"error": str(e)}
def add_user_to_group(self, username: str, groupname: str) -> Dict[str, str]:
"""Add user to group"""
try:
result = subprocess.run(
["usermod", "-aG", groupname, username],
capture_output=True,
text=True,
check=False
)
if result.returncode != 0:
logger.error(f"usermod failed: {result.stderr}")
return {"error": result.stderr}
logger.info(f"Added {username} to {groupname}")
return {"status": "success", "message": f"{username} added to {groupname}"}
except Exception as e:
logger.error(f"Error adding user to group: {e}")
return {"error": str(e)}
# Global instance
system_user_manager = SystemUserManager()
+305 -513
View File
@@ -1,6 +1,5 @@
""" """
ZFS Command Runner Wrapper für zpool/zfs CLI Commands ZFS Command Runner Wrapper für zpool/zfs CLI Commands
Handles subprocess execution, parsing, caching, error handling
""" """
import subprocess import subprocess
@@ -8,15 +7,16 @@ import json
import logging import logging
import glob import glob
import os import os
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass
from datetime import datetime, timedelta
import re import re
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime, timedelta
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Detect ZFS availability once at import time — avoids repeated ERROR logs on LXC/non-ZFS systems _TIMEOUT = 5
# shutil.which is not enough: on privileged LXC containers zpool exists but /dev/zfs is missing
# ── ZFS availability ──────────────────────────────────────────────────────────
def _probe_zfs() -> bool: def _probe_zfs() -> bool:
try: try:
r = subprocess.run(["zpool", "list"], capture_output=True, timeout=3) r = subprocess.run(["zpool", "list"], capture_output=True, timeout=3)
@@ -28,541 +28,333 @@ ZFS_AVAILABLE = _probe_zfs()
if not ZFS_AVAILABLE: if not ZFS_AVAILABLE:
logger.info("ZFS not available on this system (pools/snapshots disabled)") logger.info("ZFS not available on this system (pools/snapshots disabled)")
# Cache with TTL # ── TTL cache ─────────────────────────────────────────────────────────────────
@dataclass
class CacheEntry:
data: Any
expires_at: datetime
class ZFSCache: _cache: Dict[str, Tuple[Any, datetime]] = {}
def __init__(self):
self.pool_status = CacheEntry(None, datetime.now())
self.snapshots = CacheEntry(None, datetime.now())
self.datasets = CacheEntry(None, datetime.now())
def get(self, key: str) -> Optional[Any]: def _cache_get(key: str) -> Optional[Any]:
cache_dict = { if key not in _cache:
"pool_status": self.pool_status, return None
"snapshots": self.snapshots, data, expires_at = _cache[key]
"datasets": self.datasets, if datetime.now() > expires_at:
} return None
return data
if key not in cache_dict: def _cache_set(key: str, data: Any, ttl: int = 60) -> None:
return None _cache[key] = (data, datetime.now() + timedelta(seconds=ttl))
entry = cache_dict[key] def clear_cache() -> None:
if not entry or not entry.data: _cache.clear()
return None logger.info("ZFS cache cleared")
if datetime.now() > entry.expires_at: # ── Subprocess helper ─────────────────────────────────────────────────────────
return None
return entry.data def _run(cmd: List[str], timeout: int = _TIMEOUT) -> Tuple[str, str, int]:
try:
r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout, check=False)
return r.stdout, r.stderr, r.returncode
except subprocess.TimeoutExpired:
logger.error(f"Command timeout after {timeout}s: {' '.join(cmd)}")
return "", f"Command timeout after {timeout}s", -1
except FileNotFoundError:
logger.debug(f"Command not found: {' '.join(cmd)}")
return "", f"Command not found: {cmd[0]}", -1
except Exception as e:
logger.error(f"Command execution error: {e}")
return "", str(e), -1
def set(self, key: str, data: Any, ttl_seconds: int = 60): # ── Pool operations ───────────────────────────────────────────────────────────
cache_dict = {
"pool_status": self.pool_status,
"snapshots": self.snapshots,
"datasets": self.datasets,
}
if key in cache_dict: def list_pools() -> List[Dict[str, Any]]:
cache_dict[key].data = data cached = _cache_get("pool_status")
cache_dict[key].expires_at = datetime.now() + timedelta(seconds=ttl_seconds) if cached is not None:
return cached
if not ZFS_AVAILABLE:
return []
stdout, stderr, rc = _run(["zpool", "list", "-H", "-p"])
if rc != 0:
logger.error(f"zpool list failed: {stderr}")
return []
pools = []
for line in stdout.strip().split("\n"):
if not line:
continue
parts = line.split()
if len(parts) < 10:
continue
pools.append({
"name": parts[0],
"size": int(parts[1]),
"alloc": int(parts[2]),
"free": int(parts[3]),
"fragmentation": f"{parts[6]}%",
"capacity": f"{parts[7]}%",
"health": parts[9],
})
_cache_set("pool_status", pools, ttl=30)
return pools
class ZFSRunner:
def __init__(self, timeout: int = 5):
self.timeout = timeout
self.cache = ZFSCache()
def run_command(self, cmd: List[str], timeout: Optional[int] = None) -> Tuple[str, str, int]: def get_disk_id_map() -> Dict[str, str]:
""" mapping: Dict[str, str] = {}
Run subprocess command with timeout for pattern in ["ata-", "nvme-", "scsi-", "wwn-"]:
Returns: (stdout, stderr, returncode) for link in glob.glob(f"/dev/disk/by-id/{pattern}*"):
""" if "-part" in os.path.basename(link):
if timeout is None:
timeout = self.timeout
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=timeout,
check=False
)
return result.stdout, result.stderr, result.returncode
except subprocess.TimeoutExpired:
logger.error(f"Command timeout after {timeout}s: {' '.join(cmd)}")
return "", f"Command timeout after {timeout}s", -1
except FileNotFoundError:
logger.debug(f"Command not found: {' '.join(cmd)}")
return "", f"Command not found: {cmd[0]}", -1
except Exception as e:
logger.error(f"Command execution error: {e}")
return "", str(e), -1
# ============== POOL OPERATIONS ==============
def list_pools(self) -> List[Dict[str, Any]]:
"""
Get list of ZFS pools with status
Uses cache (TTL 30s)
"""
cached = self.cache.get("pool_status")
if cached:
return cached
if not ZFS_AVAILABLE:
return []
stdout, stderr, rc = self.run_command(["zpool", "list", "-H", "-p"])
if rc != 0:
logger.error(f"zpool list failed: {stderr}")
return []
pools = []
for line in stdout.strip().split("\n"):
if not line:
continue continue
try:
parts = line.split() target = os.path.realpath(link)
if len(parts) < 10: disk = os.path.basename(target)
if disk not in mapping:
mapping[disk] = os.path.basename(link)
except OSError:
continue continue
return mapping
pool = {
"name": parts[0],
"size": int(parts[1]),
"alloc": int(parts[2]),
"free": int(parts[3]),
"fragmentation": f"{parts[6]}%",
"capacity": f"{parts[7]}%",
"health": parts[9]
}
pools.append(pool)
self.cache.set("pool_status", pools, ttl_seconds=30) def _annotate_disk_ids(vdevs: List[Dict], disk_id_map: Dict[str, str]) -> None:
return pools for vdev in vdevs:
children = vdev.get("children", [])
def _parse_vdev_tree(self, config_lines: List[str]) -> List[Dict[str, Any]]: if not children:
""" name = vdev.get("name", "")
Parse VDEV tree from zpool status config section. base = re.sub(r'\d+$', '', name) if name[-1:].isdigit() else name
Uses indentation levels to reconstruct hierarchy. vdev["disk_id"] = disk_id_map.get(name) or disk_id_map.get(base)
Returns list of vdev dicts with name, state, and error counters (read/write/cksum).
"""
roots: List[Dict] = []
stack: List[tuple] = [] # (indent, vdev_dict)
for line in config_lines:
if not line.strip():
continue
# Skip header line (NAME STATE READ WRITE CKSUM)
if line.strip().startswith("NAME"):
continue
indent = len(line) - len(line.lstrip())
parts = line.split()
if not parts:
continue
name = parts[0]
state = parts[1] if len(parts) > 1 else "UNKNOWN"
# Parse error counters and convert to integers
read = 0
write = 0
cksum = 0
if len(parts) > 2:
try:
read = int(parts[2])
except (ValueError, IndexError):
read = 0
if len(parts) > 3:
try:
write = int(parts[3])
except (ValueError, IndexError):
write = 0
if len(parts) > 4:
try:
cksum = int(parts[4])
except (ValueError, IndexError):
cksum = 0
vdev: Dict[str, Any] = {
"name": name,
"state": state,
"read": read,
"write": write,
"cksum": cksum,
"children": []
}
# Pop stack entries that are at same or deeper indent
while stack and stack[-1][0] >= indent:
stack.pop()
if stack:
stack[-1][1]["children"].append(vdev)
else:
roots.append(vdev)
stack.append((indent, vdev))
return roots
def get_disk_id_map(self) -> Dict[str, str]:
"""
Build mapping {sda: ata-WDC_WD20EZRZ-..., nvme0n1: nvme-Samsung_...} from /dev/disk/by-id/.
Prefers ata- prefix, then nvme-, then scsi-, then wwn-.
"""
mapping: Dict[str, str] = {}
priority = ["ata-", "nvme-", "scsi-", "wwn-"]
for pattern in priority:
for link in glob.glob(f"/dev/disk/by-id/{pattern}*"):
if "-part" in os.path.basename(link):
continue
try:
target = os.path.realpath(link)
disk = os.path.basename(target)
if disk not in mapping:
mapping[disk] = os.path.basename(link)
except OSError:
continue
return mapping
def get_smart_info(self, disk: str) -> Dict[str, Any]:
"""
Run smartctl -A -i --json on /dev/disk and return parsed health data.
Returns empty dict if smartctl unavailable or disk not SMART-capable.
"""
stdout, stderr, rc = self.run_command(["smartctl", "-A", "-i", "--json", f"/dev/{disk}"])
if not stdout:
return {}
try:
data = json.loads(stdout)
except json.JSONDecodeError:
return {}
result: Dict[str, Any] = {}
# Device info
device = data.get("device", {})
model_info = data.get("model_name") or data.get("model_family", "")
serial = data.get("serial_number", "")
result["model"] = model_info
result["serial"] = serial
result["protocol"] = device.get("protocol", "")
# Power-on hours
result["power_on_hours"] = data.get("power_on_time", {}).get("hours")
# Temperature
temp = data.get("temperature", {})
result["temperature"] = temp.get("current")
# Overall health (from -H flag data embedded in -i)
smart_status = data.get("smart_status", {})
result["passed"] = smart_status.get("passed")
# Key attributes from ata_smart_attributes
attrs = {a["name"]: a for a in data.get("ata_smart_attributes", {}).get("table", [])}
result["reallocated_sectors"] = attrs.get("Reallocated_Sector_Ct", {}).get("raw", {}).get("value", 0)
result["pending_sectors"] = attrs.get("Current_Pending_Sector", {}).get("raw", {}).get("value", 0)
result["uncorrectable"] = attrs.get("Offline_Uncorrectable", {}).get("raw", {}).get("value", 0)
return result
def get_pool_status(self, pool_name: str) -> Dict[str, Any]:
"""
Get detailed pool status including VDEV tree and error counters
"""
if not ZFS_AVAILABLE:
return {}
stdout, stderr, rc = self.run_command(["zpool", "status", pool_name])
if rc != 0:
logger.error(f"zpool status failed for {pool_name}: {stderr}")
return {}
status: Dict[str, Any] = {
"name": pool_name,
"state": None,
"scan": None,
"errors": None,
"vdevs": [],
}
lines = stdout.split("\n")
in_config = False
config_lines: List[str] = []
for line in lines:
stripped = line.strip()
if stripped.startswith("state:"):
status["state"] = stripped.split(":", 1)[1].strip()
elif stripped.startswith("scan:"):
status["scan"] = stripped.split(":", 1)[1].strip()
elif stripped.startswith("errors:"):
status["errors"] = stripped.split(":", 1)[1].strip()
in_config = False
elif stripped == "config:":
in_config = True
elif in_config:
# Collect config block lines (skip blank lines at start)
if stripped or config_lines:
config_lines.append(line)
if config_lines:
parsed = self._parse_vdev_tree(config_lines)
if parsed and parsed[0]["name"] == pool_name:
status["vdevs"] = parsed[0]["children"]
else:
status["vdevs"] = parsed
# Annotate leaf vdevs with disk_id from /dev/disk/by-id/
disk_id_map = self.get_disk_id_map()
self._annotate_disk_ids(status["vdevs"], disk_id_map)
return status
def _annotate_disk_ids(self, vdevs: List[Dict], disk_id_map: Dict[str, str]) -> None:
"""Recursively annotate leaf vdev nodes with disk_id from by-id map."""
for vdev in vdevs:
children = vdev.get("children", [])
if not children:
name = vdev.get("name", "")
# Strip partition suffix (sda1 → sda)
base = re.sub(r'\d+$', '', name) if name[-1:].isdigit() else name
vdev["disk_id"] = disk_id_map.get(name) or disk_id_map.get(base)
else:
self._annotate_disk_ids(children, disk_id_map)
def scrub_pool(self, pool_name: str) -> Dict[str, str]:
"""
Start or resume scrub on pool
"""
stdout, stderr, rc = self.run_command(["zpool", "scrub", pool_name])
if rc != 0:
logger.error(f"zpool scrub failed for {pool_name}: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Scrub started for {pool_name}"}
# ============== DATASET/FILESYSTEM OPERATIONS ==============
def list_datasets(self, pool_name: str, max_depth: int = 2) -> List[Dict[str, Any]]:
"""
List datasets in pool (with depth limit for performance)
"""
cached = self.cache.get("datasets")
if cached and cached.get(pool_name):
return cached[pool_name]
if not ZFS_AVAILABLE:
return []
stdout, stderr, rc = self.run_command([
"zfs", "list", "-d", str(max_depth), "-H", "-p",
"-o", "name,used,avail,refer,mountpoint,type",
pool_name
])
if rc != 0:
logger.error(f"zfs list failed for {pool_name}: {stderr}")
return []
datasets = []
for line in stdout.strip().split("\n"):
if not line:
continue
parts = line.split("\t")
if len(parts) < 6:
continue
dataset = {
"name": parts[0],
"used": int(parts[1]),
"avail": int(parts[2]),
"refer": int(parts[3]),
"mountpoint": parts[4],
"type": parts[5] # filesystem, volume, snapshot
}
datasets.append(dataset)
# Cache per pool
if not cached:
cached = {}
cached[pool_name] = datasets
self.cache.set("datasets", cached, ttl_seconds=60)
return datasets
def create_dataset(self, dataset_name: str, props: Optional[Dict[str, str]] = None) -> Dict[str, str]:
"""
Create new ZFS dataset/filesystem
"""
cmd = ["zfs", "create"]
if props:
for key, val in props.items():
cmd.extend(["-o", f"{key}={val}"])
cmd.append(dataset_name)
stdout, stderr, rc = self.run_command(cmd)
if rc != 0:
logger.error(f"zfs create failed: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Dataset {dataset_name} created"}
def set_dataset_properties(self, dataset_name: str, props: Dict[str, str]) -> Dict[str, str]:
"""Set ZFS dataset properties (compression, quota, reservation, etc.)"""
errors = []
for key, value in props.items():
if value is None:
continue
stdout, stderr, rc = self.run_command(["zfs", "set", f"{key}={value}", dataset_name])
if rc != 0:
errors.append(f"{key}: {stderr.strip()}")
if errors:
return {"status": "error", "message": "; ".join(errors)}
return {"status": "success", "message": f"Properties updated for {dataset_name}"}
def destroy_dataset(self, dataset_name: str, recursive: bool = False) -> Dict[str, str]:
"""
Destroy ZFS dataset
"""
cmd = ["zfs", "destroy"]
if recursive:
cmd.append("-r")
cmd.append(dataset_name)
stdout, stderr, rc = self.run_command(cmd)
if rc != 0:
logger.error(f"zfs destroy failed: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Dataset {dataset_name} destroyed"}
# ============== SNAPSHOT OPERATIONS ==============
def list_snapshots(self, dataset_name: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
"""
List snapshots (with limit for performance on many snapshots)
"""
cache_key = f"snapshots:{dataset_name or '*'}"
cached = self.cache.get(cache_key)
if cached:
return cached
if not ZFS_AVAILABLE:
return []
if dataset_name:
# No -d limit so sub-datasets (e.g. tank/share) are included
cmd = ["zfs", "list", "-t", "snapshot", "-r", "-H", "-p",
"-o", "name,used,referenced,creation", dataset_name]
else: else:
cmd = ["zfs", "list", "-t", "snapshot", "-H", "-p", _annotate_disk_ids(children, disk_id_map)
"-o", "name,used,referenced,creation"]
stdout, stderr, rc = self.run_command(cmd)
def _parse_vdev_tree(config_lines: List[str]) -> List[Dict[str, Any]]:
roots: List[Dict] = []
stack: List[tuple] = []
for line in config_lines:
if not line.strip() or line.strip().startswith("NAME"):
continue
indent = len(line) - len(line.lstrip())
parts = line.split()
if not parts:
continue
vdev: Dict[str, Any] = {
"name": parts[0],
"state": parts[1] if len(parts) > 1 else "UNKNOWN",
"read": int(parts[2]) if len(parts) > 2 else 0,
"write": int(parts[3]) if len(parts) > 3 else 0,
"cksum": int(parts[4]) if len(parts) > 4 else 0,
"children": [],
}
while stack and stack[-1][0] >= indent:
stack.pop()
if stack:
stack[-1][1]["children"].append(vdev)
else:
roots.append(vdev)
stack.append((indent, vdev))
return roots
def get_pool_status(pool_name: str) -> Dict[str, Any]:
if not ZFS_AVAILABLE:
return {}
stdout, stderr, rc = _run(["zpool", "status", pool_name])
if rc != 0:
logger.error(f"zpool status failed for {pool_name}: {stderr}")
return {}
status: Dict[str, Any] = {"name": pool_name, "state": None, "scan": None, "errors": None, "vdevs": []}
in_config = False
config_lines: List[str] = []
for line in stdout.split("\n"):
stripped = line.strip()
if stripped.startswith("state:"):
status["state"] = stripped.split(":", 1)[1].strip()
elif stripped.startswith("scan:"):
status["scan"] = stripped.split(":", 1)[1].strip()
elif stripped.startswith("errors:"):
status["errors"] = stripped.split(":", 1)[1].strip()
in_config = False
elif stripped == "config:":
in_config = True
elif in_config and (stripped or config_lines):
config_lines.append(line)
if config_lines:
parsed = _parse_vdev_tree(config_lines)
status["vdevs"] = parsed[0]["children"] if parsed and parsed[0]["name"] == pool_name else parsed
_annotate_disk_ids(status["vdevs"], get_disk_id_map())
return status
def scrub_pool(pool_name: str) -> Dict[str, str]:
stdout, stderr, rc = _run(["zpool", "scrub", pool_name])
if rc != 0:
logger.error(f"zpool scrub failed for {pool_name}: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Scrub started for {pool_name}"}
def get_smart_info(disk: str) -> Dict[str, Any]:
stdout, _, rc = _run(["smartctl", "-A", "-i", "--json", f"/dev/{disk}"])
if not stdout:
return {}
try:
data = json.loads(stdout)
except json.JSONDecodeError:
return {}
attrs = {a["name"]: a for a in data.get("ata_smart_attributes", {}).get("table", [])}
return {
"model": data.get("model_name") or data.get("model_family", ""),
"serial": data.get("serial_number", ""),
"protocol": data.get("device", {}).get("protocol", ""),
"power_on_hours": data.get("power_on_time", {}).get("hours"),
"temperature": data.get("temperature", {}).get("current"),
"passed": data.get("smart_status", {}).get("passed"),
"reallocated_sectors": attrs.get("Reallocated_Sector_Ct", {}).get("raw", {}).get("value", 0),
"pending_sectors": attrs.get("Current_Pending_Sector", {}).get("raw", {}).get("value", 0),
"uncorrectable": attrs.get("Offline_Uncorrectable", {}).get("raw", {}).get("value", 0),
}
# ── Dataset operations ────────────────────────────────────────────────────────
def list_datasets(pool_name: str, max_depth: int = 2) -> List[Dict[str, Any]]:
cached = _cache_get("datasets")
if cached and cached.get(pool_name):
return cached[pool_name]
if not ZFS_AVAILABLE:
return []
stdout, stderr, rc = _run([
"zfs", "list", "-d", str(max_depth), "-H", "-p",
"-o", "name,used,avail,refer,mountpoint,type", pool_name
])
if rc != 0:
logger.error(f"zfs list failed for {pool_name}: {stderr}")
return []
datasets = []
for line in stdout.strip().split("\n"):
if not line:
continue
parts = line.split("\t")
if len(parts) < 6:
continue
datasets.append({
"name": parts[0], "used": int(parts[1]), "avail": int(parts[2]),
"refer": int(parts[3]), "mountpoint": parts[4], "type": parts[5],
})
pool_cache = cached or {}
pool_cache[pool_name] = datasets
_cache_set("datasets", pool_cache, ttl=60)
return datasets
def create_dataset(dataset_name: str, props: Optional[Dict[str, str]] = None) -> Dict[str, str]:
cmd = ["zfs", "create"]
if props:
for key, val in props.items():
cmd.extend(["-o", f"{key}={val}"])
cmd.append(dataset_name)
stdout, stderr, rc = _run(cmd)
if rc != 0:
logger.error(f"zfs create failed: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Dataset {dataset_name} created"}
def set_dataset_properties(dataset_name: str, props: Dict[str, str]) -> Dict[str, str]:
errors = []
for key, value in props.items():
if value is None:
continue
_, stderr, rc = _run(["zfs", "set", f"{key}={value}", dataset_name])
if rc != 0: if rc != 0:
logger.error(f"zfs list snapshots failed: {stderr}") errors.append(f"{key}: {stderr.strip()}")
return [] if errors:
return {"status": "error", "message": "; ".join(errors)}
return {"status": "success", "message": f"Properties updated for {dataset_name}"}
snapshots = []
for line in stdout.strip().split("\n"):
if not line:
continue
parts = line.split("\t") def destroy_dataset(dataset_name: str, recursive: bool = False) -> Dict[str, str]:
if len(parts) < 4: cmd = ["zfs", "destroy"] + (["-r"] if recursive else []) + [dataset_name]
continue stdout, stderr, rc = _run(cmd)
if rc != 0:
logger.error(f"zfs destroy failed: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Dataset {dataset_name} destroyed"}
snapshot = { # ── Snapshot operations ───────────────────────────────────────────────────────
"name": parts[0],
"used": int(parts[1]),
"referenced": int(parts[2]),
"creation": int(parts[3]), # Unix timestamp
}
snapshots.append(snapshot)
# Sort by creation time (newest first) and limit def list_snapshots(dataset_name: Optional[str] = None, limit: int = 50) -> List[Dict[str, Any]]:
snapshots.sort(key=lambda x: x["creation"], reverse=True) cache_key = f"snapshots:{dataset_name or '*'}"
snapshots = snapshots[:limit] cached = _cache_get(cache_key)
if cached is not None:
return cached
if not ZFS_AVAILABLE:
return []
if dataset_name:
cmd = ["zfs", "list", "-t", "snapshot", "-r", "-H", "-p",
"-o", "name,used,referenced,creation", dataset_name]
else:
cmd = ["zfs", "list", "-t", "snapshot", "-H", "-p",
"-o", "name,used,referenced,creation"]
stdout, stderr, rc = _run(cmd)
if rc != 0:
logger.error(f"zfs list snapshots failed: {stderr}")
return []
snapshots = []
for line in stdout.strip().split("\n"):
if not line:
continue
parts = line.split("\t")
if len(parts) < 4:
continue
snapshots.append({
"name": parts[0], "used": int(parts[1]),
"referenced": int(parts[2]), "creation": int(parts[3]),
})
snapshots.sort(key=lambda x: x["creation"], reverse=True)
snapshots = snapshots[:limit]
_cache_set(cache_key, snapshots, ttl=60)
return snapshots
self.cache.set(cache_key, snapshots, ttl_seconds=60)
return snapshots
def create_snapshot(self, dataset_name: str, snapshot_name: Optional[str] = None) -> Dict[str, str]: def create_snapshot(dataset_name: str, snapshot_name: Optional[str] = None) -> Dict[str, str]:
""" if not snapshot_name:
Create snapshot with auto-generated name if not provided snapshot_name = datetime.now().strftime("%Y%m%d-%H%M%S")
""" full_name = f"{dataset_name}@{snapshot_name}"
if not snapshot_name: stdout, stderr, rc = _run(["zfs", "snapshot", full_name])
from datetime import datetime as dt if rc != 0:
snapshot_name = dt.now().strftime("%Y%m%d-%H%M%S") logger.error(f"zfs snapshot failed: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Snapshot {full_name} created"}
full_name = f"{dataset_name}@{snapshot_name}"
stdout, stderr, rc = self.run_command(["zfs", "snapshot", full_name]) def destroy_snapshot(snapshot_name: str, recursive: bool = False) -> Dict[str, str]:
cmd = ["zfs", "destroy"] + (["-r"] if recursive else []) + [snapshot_name]
stdout, stderr, rc = _run(cmd)
if rc != 0:
logger.error(f"zfs destroy snapshot failed: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Snapshot {snapshot_name} destroyed"}
if rc != 0:
logger.error(f"zfs snapshot failed: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Snapshot {full_name} created"} def rollback_snapshot(snapshot_name: str) -> Dict[str, str]:
stdout, stderr, rc = _run(["zfs", "rollback", "-r", snapshot_name])
if rc != 0:
logger.error(f"zfs rollback failed: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Rolled back to {snapshot_name}"}
def destroy_snapshot(self, snapshot_name: str, recursive: bool = False) -> Dict[str, str]:
"""
Destroy snapshot
"""
cmd = ["zfs", "destroy"]
if recursive:
cmd.append("-r")
cmd.append(snapshot_name)
stdout, stderr, rc = self.run_command(cmd) # Backward-compat shim so existing routers keep working unchanged
class _ZFSRunnerShim:
run_command = staticmethod(_run)
list_pools = staticmethod(list_pools)
get_pool_status = staticmethod(get_pool_status)
scrub_pool = staticmethod(scrub_pool)
get_disk_id_map = staticmethod(get_disk_id_map)
get_smart_info = staticmethod(get_smart_info)
list_datasets = staticmethod(list_datasets)
create_dataset = staticmethod(create_dataset)
set_dataset_properties = staticmethod(set_dataset_properties)
destroy_dataset = staticmethod(destroy_dataset)
list_snapshots = staticmethod(list_snapshots)
create_snapshot = staticmethod(create_snapshot)
destroy_snapshot = staticmethod(destroy_snapshot)
rollback_snapshot = staticmethod(rollback_snapshot)
clear_cache = staticmethod(clear_cache)
if rc != 0: zfs_runner = _ZFSRunnerShim()
logger.error(f"zfs destroy snapshot failed: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Snapshot {snapshot_name} destroyed"}
def rollback_snapshot(self, snapshot_name: str) -> Dict[str, str]:
"""
Rollback dataset to snapshot
WARNING: Destroys data after snapshot!
"""
stdout, stderr, rc = self.run_command(["zfs", "rollback", "-r", snapshot_name])
if rc != 0:
logger.error(f"zfs rollback failed: {stderr}")
return {"status": "error", "message": stderr}
return {"status": "success", "message": f"Rolled back to {snapshot_name}"}
# ============== UTILITY ==============
def clear_cache(self):
"""Clear all caches"""
self.cache = ZFSCache()
logger.info("ZFS cache cleared")
# Global instance
zfs_runner = ZFSRunner()