55ae3b79ae
Backend: use 'net conf setparm' to update individual parameters; accept a dictionary of key-value pairs in the API. Frontend: add Edit/Cancel/Save buttons to the Samba Config tab; support inline editing of configuration values; save changes back to the registry. Changes are applied immediately via 'net conf setparm'. Co-Authored-By: Patrick <patrick@perlbach24.de>
289 lines
10 KiB
Python
289 lines
10 KiB
Python
"""
Samba and NFS Shares Management

Handles /etc/samba/smb.conf and /etc/exports
"""

import re
import subprocess
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional

# Module-wide logger; every SharesManager method reports failures through it.
logger = logging.getLogger(__name__)

# Samba configuration file whose [share] sections are listed, appended and removed.
SAMBA_CONFIG = Path("/etc/samba/smb.conf")
# NFS export table; SharesManager treats each non-comment line as one export.
NFS_EXPORTS = Path("/etc/exports")


class SharesManager:
    """Manage Samba and NFS shares.

    Samba shares are sections of ``SAMBA_CONFIG`` and NFS exports are lines
    of ``NFS_EXPORTS``; Samba global parameters go through the ``net conf``
    registry commands.  Public methods do not raise for operational errors:
    failures are logged and reported through the return value (``False`` or
    an empty result).

    After a file has been modified, the service reload (``smbcontrol`` /
    ``exportfs``) is best effort: a reload problem is logged as a warning and
    does not turn an already persisted change into a reported failure.
    """

    # smb.conf sections that are configuration rather than shares; the share
    # API refuses to create or delete them.
    _RESERVED_SECTIONS = frozenset({'global'})

    # One client spec of an exports line: ``client(options)`` (the closing
    # parenthesis is optional so a truncated line still parses) or a bare
    # ``client`` without options.
    _NFS_CLIENT_RE = re.compile(r'([^\s()]*)\(([^)]*)\)?|([^\s()]+)')

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _section_name(line: str) -> Optional[str]:
        """Return the name inside a ``[section]`` header line, else ``None``."""
        text = line.strip()
        if len(text) >= 2 and text[0] == '[' and text[-1] == ']':
            return text[1:-1].strip()
        return None

    @staticmethod
    def _is_single_line(value: Any) -> bool:
        """Return True if *value* is a str free of control characters.

        Rejecting control characters (newline, carriage return, NUL, ...)
        keeps a caller-supplied value from spilling onto extra config lines.
        """
        return isinstance(value, str) and not any(
            ord(ch) < 32 or ord(ch) == 127 for ch in value
        )

    @classmethod
    def _is_valid_export_field(cls, value: Any, *, allow_spaces: bool = False) -> bool:
        """Return True if *value* can be written into an exports line as-is."""
        if not cls._is_single_line(value) or any(ch in value for ch in '#()'):
            return False
        return allow_spaces or not any(ch.isspace() for ch in value)

    @staticmethod
    def _normalize_export_path(path: str) -> str:
        """Drop trailing slashes so ``/srv/`` and ``/srv`` compare equal."""
        return path.rstrip('/') or '/'

    @staticmethod
    def _run_reload(command: List[str]) -> None:
        """Run a service reload command; log, but never raise, on failure."""
        try:
            result = subprocess.run(
                command,
                capture_output=True,
                text=True,
                errors='replace',
                timeout=10,
            )
        except (OSError, subprocess.SubprocessError) as e:
            logger.warning(f"Reload command '{command[0]}' could not be run: {e}")
            return
        if result.returncode != 0:
            logger.warning(
                f"Reload command '{command[0]}' exited with {result.returncode}: "
                f"{result.stderr.strip()}"
            )

    # ------------------------------------------------------------------
    # Samba shares
    # ------------------------------------------------------------------

    def list_samba_shares(self) -> List[Dict[str, Any]]:
        """Parse SAMBA_CONFIG and return its shares.

        Returns:
            One ``{'name', 'path', 'comment'}`` dict per section that is not
            ``[global]`` and has a ``path`` parameter.  An empty list when
            the file is missing or cannot be read.
        """
        if not SAMBA_CONFIG.exists():
            return []

        try:
            # errors='replace' keeps one stray byte from hiding every share.
            content = SAMBA_CONFIG.read_text(encoding='utf-8', errors='replace')

            shares: List[Dict[str, Any]] = []
            current: Optional[Dict[str, Any]] = None
            for raw_line in content.split('\n'):
                line = raw_line.strip()
                if not line or line.startswith(('#', ';')):
                    continue

                section = self._section_name(line)
                if section is not None:
                    if section and section.lower() not in self._RESERVED_SECTIONS:
                        current = {'name': section, 'path': None, 'comment': None}
                        shares.append(current)
                    else:
                        current = None
                    continue

                if current is None or '=' not in line:
                    continue
                key, _, value = line.partition('=')
                key = key.strip().lower()
                if key in ('path', 'comment'):
                    current[key] = value.strip()

            return [share for share in shares if share['path']]

        except Exception as e:
            logger.error(f"Error parsing Samba config: {e}")
            return []

    def create_samba_share(self, name: str, path: str, comment: Optional[str] = None) -> bool:
        """Append a new share section to SAMBA_CONFIG.

        Args:
            name: Share (section) name.  Must be a single line without
                brackets, must not be ``global`` and must not already exist
                (compared case-insensitively).
            path: Directory to share; must be a single line.
            comment: Optional description; whitespace runs, including line
                breaks, are collapsed to single spaces.

        Returns:
            True once the section has been written, False on invalid input,
            a missing config file, a duplicate name or an I/O error.
        """
        if not isinstance(name, str) or not isinstance(path, str):
            return False
        name = name.strip()
        path = path.strip()
        if not SAMBA_CONFIG.exists() or not name or not path:
            return False

        # A trailing backslash would continue the value onto the next line.
        if (
            not self._is_single_line(name)
            or '[' in name
            or ']' in name
            or name.lower() in self._RESERVED_SECTIONS
        ):
            logger.error(f"Error creating Samba share: invalid share name {name!r}")
            return False
        if not self._is_single_line(path) or path.endswith('\\'):
            logger.error(f"Error creating Samba share: invalid path {path!r}")
            return False

        comment_text = ' '.join(str(comment).split()) if comment else ''
        if not self._is_single_line(comment_text) or comment_text.endswith('\\'):
            logger.error("Error creating Samba share: invalid comment")
            return False

        try:
            existing = SAMBA_CONFIG.read_text(encoding='utf-8', errors='replace')
            taken = {
                section.lower()
                for section in map(self._section_name, existing.split('\n'))
                if section
            }
            if name.lower() in taken:
                logger.error(f"Error creating Samba share: '{name}' already exists")
                return False

            # The leading empty entry separates the section from whatever
            # precedes it, even if the file lacks a final newline.
            section_lines = ['', f"[{name}]", f"    path = {path}"]
            if comment_text:
                section_lines.append(f"    comment = {comment_text}")
            section_lines += ["    browseable = yes", "    read only = no", '']

            with open(SAMBA_CONFIG, 'a', encoding='utf-8') as f:
                f.write('\n'.join(section_lines))

            self._run_reload(['smbcontrol', 'smbd', 'reload-config'])
            logger.info(f"Samba share created: {name}")
            return True
        except Exception as e:
            logger.error(f"Error creating Samba share: {e}")
            return False

    def delete_samba_share(self, name: str) -> bool:
        """Remove every section called *name* from SAMBA_CONFIG.

        The name is compared case-insensitively and ``global`` is never
        removed.

        Returns:
            True if at least one section was removed and the file rewritten,
            False if nothing matched, the file is missing or an error
            occurred.
        """
        if not SAMBA_CONFIG.exists():
            return False
        target = name.strip().lower() if isinstance(name, str) else ''
        if not target or target in self._RESERVED_SECTIONS:
            return False

        try:
            # surrogateescape round-trips bytes that are not valid UTF-8, so
            # unrelated parts of the file are rewritten unchanged.
            content = SAMBA_CONFIG.read_text(encoding='utf-8', errors='surrogateescape')

            kept: List[str] = []
            skipping = False
            removed = False
            for line in content.split('\n'):
                section = self._section_name(line)
                if section is not None:
                    skipping = section.lower() == target
                    removed = removed or skipping
                if not skipping:
                    kept.append(line)

            if not removed:
                return False

            # Trim blank lines left at the end so repeated create/delete
            # cycles do not grow the file.
            while kept and not kept[-1].strip():
                kept.pop()
            new_content = ('\n'.join(kept) + '\n') if kept else ''

            SAMBA_CONFIG.write_text(new_content, encoding='utf-8', errors='surrogateescape')

            self._run_reload(['smbcontrol', 'smbd', 'reload-config'])
            logger.info(f"Samba share deleted: {name}")
            return True
        except Exception as e:
            logger.error(f"Error deleting Samba share: {e}")
            return False

    # ------------------------------------------------------------------
    # NFS shares
    # ------------------------------------------------------------------

    def list_nfs_shares(self) -> List[Dict[str, Any]]:
        """Parse NFS_EXPORTS and return its exports.

        Returns:
            One ``{'path', 'clients', 'options'}`` dict per client spec, so a
            line such as ``/srv a(rw) b(ro)`` yields two entries.
            ``options`` is ``None`` for a client without parentheses.  Lines
            consisting of a path only are skipped.  An empty list when the
            file is missing or cannot be read.
        """
        if not NFS_EXPORTS.exists():
            return []

        try:
            content = NFS_EXPORTS.read_text(encoding='utf-8', errors='replace')

            shares: List[Dict[str, Any]] = []
            for raw_line in content.split('\n'):
                # '#' starts a comment that runs to the end of the line.
                line = raw_line.split('#', 1)[0].strip()
                if not line:
                    continue

                fields = line.split(None, 1)
                if len(fields) < 2:
                    continue
                export_path, rest = fields

                for match in self._NFS_CLIENT_RE.finditer(rest):
                    client, options, bare_client = match.groups()
                    if bare_client is not None:
                        shares.append(
                            {'path': export_path, 'clients': bare_client, 'options': None}
                        )
                    else:
                        shares.append(
                            {'path': export_path, 'clients': client, 'options': options.strip()}
                        )

            return shares
        except Exception as e:
            logger.error(f"Error parsing NFS exports: {e}")
            return []

    def create_nfs_share(self, path: str, clients: str, options: Optional[str] = None) -> bool:
        """Append an export line ``path clients(options)`` to NFS_EXPORTS.

        Args:
            path: Exported directory; no whitespace, ``#`` or parentheses.
            clients: Client spec; single line without ``#`` or parentheses.
            options: Export options; defaults to
                ``rw,sync,no_subtree_check`` when empty.

        Returns:
            True once the line has been written, False on invalid input, a
            missing exports file or an I/O error.
        """
        if not isinstance(path, str) or not isinstance(clients, str):
            return False
        path = path.strip()
        clients = clients.strip()
        if not NFS_EXPORTS.exists() or not path or not clients:
            return False

        if not options:
            options = "rw,sync,no_subtree_check"

        if not (
            self._is_valid_export_field(path)
            and self._is_valid_export_field(clients, allow_spaces=True)
            and self._is_valid_export_field(options)
        ):
            logger.error("Error creating NFS share: invalid path, clients or options")
            return False

        try:
            # Start on a fresh line if the file does not end with a newline,
            # otherwise the new export would be glued to the last entry.
            existing = NFS_EXPORTS.read_bytes()
            prefix = '' if not existing or existing.endswith(b'\n') else '\n'
            export_line = f"{prefix}{path} {clients}({options})\n"

            with open(NFS_EXPORTS, 'a', encoding='utf-8') as f:
                f.write(export_line)

            self._run_reload(['exportfs', '-r'])
            logger.info(f"NFS share created: {path}")
            return True
        except Exception as e:
            logger.error(f"Error creating NFS share: {e}")
            return False

    def delete_nfs_share(self, path: str) -> bool:
        """Remove every export line whose path field equals *path*.

        Only the first whitespace-separated field of a non-comment line is
        compared (ignoring trailing slashes), so ``/srv/data`` does not match
        ``/srv/data2``; comments and blank lines are always kept.

        Returns:
            True if at least one line was removed and the file rewritten,
            False if nothing matched, *path* is empty, the file is missing or
            an error occurred.
        """
        if not NFS_EXPORTS.exists():
            return False
        if not isinstance(path, str) or not path.strip():
            return False
        target = self._normalize_export_path(path.strip())

        def is_target(line: str) -> bool:
            text = line.strip()
            if not text or text.startswith('#'):
                return False
            return self._normalize_export_path(text.split(None, 1)[0]) == target

        try:
            content = NFS_EXPORTS.read_text(encoding='utf-8', errors='surrogateescape')
            lines = content.split('\n')
            kept = [line for line in lines if not is_target(line)]

            if len(kept) == len(lines):
                return False

            NFS_EXPORTS.write_text('\n'.join(kept), encoding='utf-8', errors='surrogateescape')

            self._run_reload(['exportfs', '-r'])
            logger.info(f"NFS share deleted: {path}")
            return True
        except Exception as e:
            logger.error(f"Error deleting NFS share: {e}")
            return False

    # ------------------------------------------------------------------
    # Samba global configuration (registry)
    # ------------------------------------------------------------------

    def get_samba_global_config(self) -> Dict[str, Any]:
        """Read the ``[global]`` parameters printed by ``net conf list``.

        Returns:
            ``{"parameters": [{"key": ..., "value": ...}, ...]}``; the list
            is empty when the command fails or cannot be run.
        """
        try:
            result = subprocess.run(
                ['/usr/bin/net', 'conf', 'list'],
                capture_output=True,
                text=True,
                errors='replace',
                timeout=10,
            )

            if result.returncode != 0:
                return {"parameters": []}

            parameters = []
            in_global = False
            for raw_line in result.stdout.split('\n'):
                line = raw_line.strip()

                section = self._section_name(line)
                if section is not None:
                    if in_global:
                        # The next section header ends [global].
                        break
                    in_global = section.lower() == 'global'
                    continue

                if not in_global or not line or line.startswith((';', '#')):
                    continue

                key, separator, value = line.partition('=')
                if separator:
                    parameters.append({"key": key.strip(), "value": value.strip()})

            return {"parameters": parameters}
        except Exception as e:
            logger.error(f"Error reading Samba registry config: {e}")
            return {"parameters": []}

    def set_samba_global_config(self, parameters: Dict[str, str]) -> bool:
        """Update global parameters with ``net conf setparm global``.

        Every key and value is validated before the first command runs, so
        malformed input cannot leave a partially applied update behind.

        Args:
            parameters: Mapping of parameter name to value; both must be
                single-line strings and names must not be blank or start
                with ``-``.

        Returns:
            True if every parameter was set (also for an empty mapping),
            False on invalid input or as soon as one command fails.
        """
        try:
            items = list(parameters.items())

            for key, value in items:
                if (
                    not self._is_single_line(key)
                    or not key.strip()
                    or key.strip().startswith('-')
                ):
                    logger.error(f"Invalid Samba parameter name: {key!r}")
                    return False
                if not self._is_single_line(value):
                    logger.error(f"Invalid value for Samba parameter {key}")
                    return False

            for key, value in items:
                result = subprocess.run(
                    ['/usr/bin/net', 'conf', 'setparm', 'global', key, value],
                    capture_output=True,
                    text=True,
                    errors='replace',
                    timeout=10,
                )
                if result.returncode != 0:
                    logger.error(f"Failed to set {key}={value}: {result.stderr}")
                    return False

            logger.info(f"Samba global config updated: {len(parameters)} parameters")
            return True
        except Exception as e:
            logger.error(f"Error writing Samba global config: {e}")
            return False

    def import_samba_config(self, config_file: str) -> bool:
        """Import a Samba configuration file with ``net conf import``.

        Args:
            config_file: Path of the file to import; must be an existing
                regular file.

        Returns:
            True if the command exited with status 0, False otherwise.
        """
        try:
            if not config_file or not Path(config_file).is_file():
                logger.error(f"Failed to import Samba config: {config_file} is not a file")
                return False

            result = subprocess.run(
                ['net', 'conf', 'import', str(config_file)],
                capture_output=True,
                text=True,
                errors='replace',
                timeout=10,
            )
            if result.returncode == 0:
                logger.info(f"Samba config imported from {config_file}")
                return True

            logger.error(f"Failed to import Samba config: {result.stderr}")
            return False
        except Exception as e:
            logger.error(f"Error importing Samba config: {e}")
            return False

    # ------------------------------------------------------------------
    # Raw NFS configuration
    # ------------------------------------------------------------------

    def get_nfs_config(self) -> Dict[str, Any]:
        """Return the raw text of NFS_EXPORTS.

        Returns:
            ``{"exports": text, "path": ...}`` on success,
            ``{"exports": "", "note": ...}`` when the file does not exist,
            ``{"error": ..., "path": ...}`` when reading fails.
        """
        if not NFS_EXPORTS.exists():
            return {"exports": "", "note": "NFS not configured"}

        try:
            content = NFS_EXPORTS.read_text(encoding='utf-8', errors='replace')
            return {"exports": content, "path": str(NFS_EXPORTS)}
        except Exception as e:
            logger.error(f"Error reading NFS config: {e}")
            return {"error": str(e), "path": str(NFS_EXPORTS)}

    def set_nfs_config(self, content: str) -> bool:
        """Replace the content of NFS_EXPORTS and reload the exports.

        Args:
            content: Complete new text of the exports file, written verbatim.

        Returns:
            True once the file has been written, False if the file does not
            exist, *content* is not a string or writing fails.
        """
        if not NFS_EXPORTS.exists():
            return False
        if not isinstance(content, str):
            logger.error("Error writing NFS config: content must be a string")
            return False

        try:
            NFS_EXPORTS.write_text(content, encoding='utf-8')

            self._run_reload(['exportfs', '-r'])
            logger.info("NFS config updated")
            return True
        except Exception as e:
            logger.error(f"Error writing NFS config: {e}")
            return False


# Module-level SharesManager instance, created when the module is imported.
share_manager = SharesManager()