Files
zmb-webui/backend/services/shares.py
T
Claude Code 92bed208e0 ZMB Webui: Complete Project – Rebrand & Initial Clean Commit
ARCHITECTURE
============
Backend: FastAPI + uvicorn (port 8000)
  - JWT authentication with PAM system users
  - ZFS CLI wrapper with caching (30-60s TTL)
  - WebSocket pool status broadcaster (30s interval)
  - Services: auth, zfs_runner, file_manager, shares, identities, system_info
  - Routers: pools, datasets, snapshots, shares, identities, navigator, system

Frontend: Next.js 15 + TypeScript (static export)
  - Incremental Static Regeneration (ISR) for weak hardware
  - Type-safe API client (lib/api.ts)
  - Dark mode + custom Tailwind theme
  - Pages: Dashboard, Login, Snapshots, Datasets, Shares, etc.

DEPLOYMENT
==========
Test Target: 192.168.1.179:8090 (Debian LXC)
Production: 10.66.120.3:9090 (Raspberry Pi 4GB ARM64)
Updater: Automated Gitea-based deployment (update-test.sh, update-pi.sh)

FEATURES COMPLETED
==================
Phase 3a: Dashboard Quick Stats (System, CPU, Memory, Storage)
  - Real-time stats with color-coded progress bars
  - Responsive grid layout (mobile: 1, tablet: 2, desktop: 4 columns)
  - ISR-optimized for fast loads on weak hardware

REBRANDING
==========
Renamed throughout:
  - Project: 'ZFS Manager' → 'ZMB Webui'
  - Services: 'zfs-manager' → 'zmb-webui'
  - Systemd units: zfs-manager-backend → zmb-webui-backend
  - Configuration files and documentation

Co-Authored-By: Patrick <patrick@perlbach24.de>
2026-04-22 00:43:05 +02:00

300 lines
10 KiB
Python

"""
Samba and NFS Shares Management
Handles /etc/samba/smb.conf and /etc/exports
"""
import re
import subprocess
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional
# Locations of the configuration files this module reads and rewrites.
NFS_EXPORTS: Path = Path("/etc/exports")
SAMBA_CONFIG: Path = Path("/etc/samba/smb.conf")

# Module-scoped logger, named after the importing module path.
logger: logging.Logger = logging.getLogger(__name__)
class SharesManager:
    """Manage Samba and NFS shares.

    Samba shares are read from and written to ``SAMBA_CONFIG``; NFS exports
    are read from and written to ``NFS_EXPORTS``. After every successful
    write the corresponding daemon is asked to reload its configuration
    (``smbcontrol smbd reload-config`` / ``exportfs -r``).

    All public methods are best-effort: they log errors and return ``False``
    (or an empty result) instead of raising.
    """

    # ------------------------------------------------------------------
    # Pure helpers (no file or process access)
    # ------------------------------------------------------------------

    @staticmethod
    def _is_single_line(*values: Optional[str]) -> bool:
        """Return True when no given value contains CR, LF or NUL.

        Both config files are line-oriented, so an embedded line break in a
        caller-supplied value would inject extra directives or sections.
        ``None`` and empty values are ignored.
        """
        return not any(
            ch in value
            for value in values if value
            for ch in ('\n', '\r', '\0')
        )

    @staticmethod
    def _samba_section_names(content: str) -> List[str]:
        """Return the lower-cased names of all ``[section]`` headers in *content*."""
        names = []
        for raw_line in content.split('\n'):
            line = raw_line.strip()
            if line.startswith('[') and line.endswith(']'):
                names.append(line[1:-1].strip().lower())
        return names

    @staticmethod
    def _parse_samba_shares(content: str) -> List[Dict[str, Any]]:
        """Extract share definitions from smb.conf text.

        Returns one ``{'name', 'path', 'comment'}`` dict per section other
        than ``[global]``; sections without a ``path`` are omitted.
        """
        shares: List[Dict[str, Any]] = []
        in_share = False
        for raw_line in content.split('\n'):
            line = raw_line.strip()
            if not line or line.startswith(('#', ';')):
                continue
            if line.startswith('[') and line.endswith(']'):
                section = line[1:-1].strip()
                # Keys below [global] (or a nameless header) must not be
                # attributed to the previously seen share.
                in_share = bool(section) and section.lower() != 'global'
                if in_share:
                    shares.append({'name': section, 'path': None, 'comment': None})
                continue
            if in_share and '=' in line:
                key, value = line.split('=', 1)
                key = key.strip().lower()
                if key in ('path', 'comment'):
                    shares[-1][key] = value.strip()
        return [share for share in shares if share['path']]

    @staticmethod
    def _remove_samba_section(content: str, name: str) -> str:
        """Return *content* without the section called *name*.

        The match is case-insensitive and works wherever the section sits in
        the file (including the very first line). Everything from the header
        up to, but not including, the next section header is dropped. When
        no such section exists the text is returned unchanged.
        """
        target = name.strip().lower()
        kept = []
        skipping = False
        for line in content.splitlines(keepends=True):
            stripped = line.strip()
            if stripped.startswith('[') and stripped.endswith(']'):
                skipping = stripped[1:-1].strip().lower() == target
            if not skipping:
                kept.append(line)
        return ''.join(kept)

    @staticmethod
    def _replace_global_section(lines: List[str], config_text: str) -> List[str]:
        """Return *lines* with the ``[global]`` body replaced by *config_text*.

        Each non-blank line of *config_text* is stripped and re-indented by a
        single space, so repeated read/write round trips do not accumulate
        indentation. If *lines* has no ``[global]`` section, one is inserted
        at the top; additional ``[global]`` headers are collapsed into the
        first one.
        """
        new_global = ['[global]\n']
        new_global.extend(
            ' ' + entry.strip() + '\n'
            for entry in config_text.split('\n')
            if entry.strip()
        )
        new_global.append('\n')

        output: List[str] = []
        in_global = False
        replaced = False
        for line in lines:
            stripped = line.strip()
            if stripped.startswith('[global]'):
                in_global = True
                if not replaced:
                    output.extend(new_global)
                    replaced = True
                continue
            if in_global:
                if not stripped.startswith('['):
                    # Old global setting: superseded by config_text.
                    continue
                in_global = False
            output.append(line)

        if not replaced:
            output = new_global + output
        return output

    @staticmethod
    def _parse_nfs_exports(lines: List[str]) -> List[Dict[str, Any]]:
        """Parse exports lines into ``{'path', 'clients', 'options'}`` dicts.

        Blank lines, ``#`` comments and lines with fewer than two fields are
        skipped. Only the first ``client(options)`` group of a line is
        reported. A missing closing parenthesis is tolerated so that one
        malformed line does not hide every other export.
        """
        shares: List[Dict[str, Any]] = []
        for raw_line in lines:
            line = raw_line.strip()
            if not line or line.startswith('#'):
                continue
            parts = line.split()
            if len(parts) < 2:
                continue
            rest = ' '.join(parts[1:])
            clients, paren, tail = rest.partition('(')
            options = tail.partition(')')[0] if paren else None
            shares.append({'path': parts[0], 'clients': clients.strip(), 'options': options})
        return shares

    @staticmethod
    def _remove_nfs_export(lines: List[str], path: str) -> List[str]:
        """Return *lines* without entries whose first field equals *path*.

        An exact comparison is used so that removing ``/data`` leaves
        ``/data2`` and ``/database`` untouched.
        """
        def exports_path(line: str) -> bool:
            fields = line.split()
            return bool(fields) and fields[0] == path

        return [line for line in lines if not exports_path(line)]

    @staticmethod
    def _run_reload(command: List[str]) -> None:
        """Run a reload command and log a warning if it exits non-zero.

        Exceptions from ``subprocess.run`` (missing binary, timeout) are not
        caught here; they propagate to the calling method's handler.
        """
        result = subprocess.run(command, capture_output=True, timeout=10)
        if result.returncode != 0:
            logger.warning(
                f"Reload command {' '.join(command)} exited with {result.returncode}: "
                f"{result.stderr.decode(errors='replace').strip()}"
            )

    # ------------------------------------------------------------------
    # Samba shares
    # ------------------------------------------------------------------

    def list_samba_shares(self) -> List[Dict[str, Any]]:
        """Parse /etc/samba/smb.conf and return shares.

        Returns an empty list when the file is missing or cannot be read.
        """
        if not SAMBA_CONFIG.exists():
            return []
        try:
            with open(SAMBA_CONFIG, 'r') as f:
                return self._parse_samba_shares(f.read())
        except Exception as e:
            logger.error(f"Error parsing Samba config: {e}")
            return []

    def create_samba_share(self, name: str, path: str, comment: Optional[str] = None) -> bool:
        """Add Samba share to /etc/samba/smb.conf.

        Returns ``False`` when the config file is missing, when *name* or
        *path* is blank, when any value contains a line break, when *name*
        contains square brackets, or when a section with that name
        (including ``global``) already exists.
        """
        if not SAMBA_CONFIG.exists() or not name.strip() or not path.strip():
            return False
        try:
            name = name.strip()
            path = path.strip()
            if not self._is_single_line(name, path, comment) or '[' in name or ']' in name:
                logger.error(f"Error creating Samba share: invalid characters in share definition")
                return False

            with open(SAMBA_CONFIG, 'r') as f:
                existing = self._samba_section_names(f.read())
            if name.lower() in existing:
                logger.error(f"Error creating Samba share: section already exists: {name}")
                return False

            section = f"\n[{name}]\n path = {path}\n"
            if comment:
                section += f" comment = {comment}\n"
            section += f" browseable = yes\n read only = no\n"
            with open(SAMBA_CONFIG, 'a') as f:
                f.write(section)
            self._run_reload(['smbcontrol', 'smbd', 'reload-config'])
            logger.info(f"Samba share created: {name}")
            return True
        except Exception as e:
            logger.error(f"Error creating Samba share: {e}")
            return False

    def delete_samba_share(self, name: str) -> bool:
        """Remove Samba share from /etc/samba/smb.conf.

        Returns ``False`` when the file is missing, the share does not
        exist, or *name* is blank or ``global`` (the global section is never
        removed through this method).
        """
        if not SAMBA_CONFIG.exists():
            return False
        try:
            target = name.strip()
            if not target or target.lower() == 'global':
                return False
            with open(SAMBA_CONFIG, 'r') as f:
                content = f.read()
            new_content = self._remove_samba_section(content, target)
            if new_content == content:
                return False
            with open(SAMBA_CONFIG, 'w') as f:
                f.write(new_content)
            self._run_reload(['smbcontrol', 'smbd', 'reload-config'])
            logger.info(f"Samba share deleted: {name}")
            return True
        except Exception as e:
            logger.error(f"Error deleting Samba share: {e}")
            return False

    # ------------------------------------------------------------------
    # NFS exports
    # ------------------------------------------------------------------

    def list_nfs_shares(self) -> List[Dict[str, Any]]:
        """Parse /etc/exports and return NFS shares.

        Returns an empty list when the file is missing or cannot be read.
        """
        if not NFS_EXPORTS.exists():
            return []
        try:
            with open(NFS_EXPORTS, 'r') as f:
                return self._parse_nfs_exports(f.readlines())
        except Exception as e:
            logger.error(f"Error parsing NFS exports: {e}")
            return []

    def create_nfs_share(self, path: str, clients: str, options: Optional[str] = None) -> bool:
        """Add NFS share to /etc/exports.

        *options* defaults to ``rw,sync,no_subtree_check``. Returns ``False``
        when the exports file is missing, when *path* or *clients* is blank,
        or when any value contains a line break.
        """
        if not NFS_EXPORTS.exists() or not path.strip() or not clients.strip():
            return False
        try:
            path = path.strip()
            clients = clients.strip()
            if not options:
                options = "rw,sync,no_subtree_check"
            if not self._is_single_line(path, clients, options):
                logger.error(f"Error creating NFS share: invalid characters in export definition")
                return False

            export_line = f"{path} {clients}({options})\n"
            with open(NFS_EXPORTS, 'r') as f:
                existing = f.read()
            # Without this, a file lacking a trailing newline would have the
            # new entry glued onto its last line.
            if existing and not existing.endswith('\n'):
                export_line = '\n' + export_line
            with open(NFS_EXPORTS, 'a') as f:
                f.write(export_line)
            self._run_reload(['exportfs', '-r'])
            logger.info(f"NFS share created: {path}")
            return True
        except Exception as e:
            logger.error(f"Error creating NFS share: {e}")
            return False

    def delete_nfs_share(self, path: str) -> bool:
        """Remove NFS share from /etc/exports.

        Only entries whose exported path equals *path* exactly are removed.
        Returns ``False`` when the file is missing, *path* is blank, or no
        entry matched.
        """
        if not NFS_EXPORTS.exists():
            return False
        try:
            target = path.strip()
            if not target:
                return False
            with open(NFS_EXPORTS, 'r') as f:
                lines = f.readlines()
            new_lines = self._remove_nfs_export(lines, target)
            if len(new_lines) == len(lines):
                return False
            with open(NFS_EXPORTS, 'w') as f:
                f.writelines(new_lines)
            self._run_reload(['exportfs', '-r'])
            logger.info(f"NFS share deleted: {path}")
            return True
        except Exception as e:
            logger.error(f"Error deleting NFS share: {e}")
            return False

    # ------------------------------------------------------------------
    # Raw configuration access
    # ------------------------------------------------------------------

    def get_samba_global_config(self) -> Dict[str, Any]:
        """Read Samba global configuration section.

        Returns ``{"raw": <text>}`` holding the lines between ``[global]``
        and the next section header, or ``{"raw": ""}`` when the file is
        missing, unreadable, or has no global section.
        """
        if not SAMBA_CONFIG.exists():
            return {"raw": ""}
        try:
            with open(SAMBA_CONFIG, 'r') as f:
                content = f.read()
            collected = []
            in_global = False
            for line in content.split('\n'):
                stripped = line.strip()
                if stripped.startswith('[global]'):
                    in_global = True
                    continue
                if in_global:
                    if stripped.startswith('['):
                        break
                    collected.append(line + '\n')
            return {"raw": ''.join(collected).strip()}
        except Exception as e:
            logger.error(f"Error reading Samba global config: {e}")
            return {"raw": ""}

    def set_samba_global_config(self, config_text: str) -> bool:
        """Write Samba global configuration section.

        Replaces the body of ``[global]`` with the non-blank lines of
        *config_text*; a ``[global]`` section is created at the top of the
        file when none exists. Other sections are left untouched.

        NOTE(review): *config_text* is written as given, so a line starting
        with ``[`` would open a new section — confirm callers validate this.
        """
        if not SAMBA_CONFIG.exists():
            return False
        try:
            with open(SAMBA_CONFIG, 'r') as f:
                lines = f.readlines()
            output_lines = self._replace_global_section(lines, config_text)
            with open(SAMBA_CONFIG, 'w') as f:
                f.writelines(output_lines)
            self._run_reload(['smbcontrol', 'smbd', 'reload-config'])
            logger.info("Samba global config updated")
            return True
        except Exception as e:
            logger.error(f"Error writing Samba global config: {e}")
            return False

    def import_samba_config(self, config_file: str) -> bool:
        """Import Samba configuration using net conf import.

        Returns ``True`` only when ``net conf import <config_file>`` exits
        with status 0.
        """
        try:
            result = subprocess.run(
                ['net', 'conf', 'import', config_file],
                capture_output=True,
                timeout=10
            )
            if result.returncode == 0:
                logger.info(f"Samba config imported from {config_file}")
                return True
            # errors='replace' keeps undecodable stderr bytes from masking
            # the real failure with a UnicodeDecodeError.
            logger.error(f"Failed to import Samba config: {result.stderr.decode(errors='replace')}")
            return False
        except Exception as e:
            logger.error(f"Error importing Samba config: {e}")
            return False

    def get_nfs_config(self) -> Dict[str, Any]:
        """Read /etc/exports and return as config object.

        Returns ``{"exports", "path"}`` on success, ``{"exports": "",
        "note"}`` when the file does not exist, and ``{"error", "path"}``
        when reading fails.
        """
        if not NFS_EXPORTS.exists():
            return {"exports": "", "note": "NFS not configured"}
        try:
            with open(NFS_EXPORTS, 'r') as f:
                content = f.read()
            return {"exports": content, "path": str(NFS_EXPORTS)}
        except Exception as e:
            logger.error(f"Error reading NFS config: {e}")
            return {"error": str(e), "path": str(NFS_EXPORTS)}

    def set_nfs_config(self, content: str) -> bool:
        """Write to /etc/exports and reload NFS.

        The file is overwritten with *content* verbatim. Returns ``False``
        when the file does not already exist or the write/reload raises.
        """
        if not NFS_EXPORTS.exists():
            return False
        try:
            with open(NFS_EXPORTS, 'w') as f:
                f.write(content)
            self._run_reload(['exportfs', '-r'])
            logger.info("NFS config updated")
            return True
        except Exception as e:
            logger.error(f"Error writing NFS config: {e}")
            return False
# Module-level SharesManager instance, created once when this module is imported.
share_manager = SharesManager()