Files
zmb-webui/backend/services/shares.py
T
patrick a187b625bc Fix: Identities Group Management - bessere Fehlermeldungen
- add_user_to_group: Exception werfen mit stderr Nachricht
- remove_user_from_group: Exception werfen mit stderr Nachricht
- text=True für subprocess für besseres Error Handling
- Router aktualisiert um Fehlermeldungen an Frontend weiterzugeben
- Benutzer sehen jetzt detaillierte Fehlermeldungen beim Gruppe-Entfernen

Behebt: 'Failed to remove user from group' verschluckt die echte Fehlermeldung

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-04 14:58:50 +02:00

323 lines
11 KiB
Python

"""
Samba and NFS Shares Management
Handles /etc/samba/smb.conf, the Samba registry configuration (via 'net conf') and /etc/exports
"""
import re
import subprocess
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Samba configuration file whose share sections are read and rewritten below.
SAMBA_CONFIG = Path("/etc/samba/smb.conf")
# NFS exports table that is read and rewritten below.
NFS_EXPORTS = Path("/etc/exports")
class SharesManager:
    """Manage Samba shares and NFS exports.

    Samba shares are read from and written to ``SAMBA_CONFIG``; the Samba
    global parameters are handled through the ``net conf`` commands.  NFS
    exports are read from and written to ``NFS_EXPORTS``.

    Errors raised while reading, writing or reloading are logged and
    reported to the caller as ``False`` (or an empty result) rather than
    propagated.
    """

    _SAMBA_RELOAD_CMD = ['/usr/bin/smbcontrol', 'smbd', 'reload-config']
    _NFS_RELOAD_CMD = ['/usr/sbin/exportfs', '-r']
    # Export options used when the caller does not supply any.
    _NFS_DEFAULT_OPTIONS = "rw,sync,no_subtree_check"
    # C0 control characters (this includes CR and LF) and DEL.  Values that
    # contain them could smuggle extra lines into the configuration files.
    _CONTROL_CHARS = re.compile(r"[\x00-\x1f\x7f]")
    # One client entry of an exports line: either "client(options)" (the
    # client part may be empty, the closing parenthesis may be missing) or
    # a bare "client" without options.
    _EXPORT_SPEC = re.compile(r"([^\s()]*)\(([^)]*)(?:\)|$)|([^\s()]+)")

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _reload(command: List[str], service: str) -> None:
        """Run a reload command and log a warning when it exits non-zero.

        Exceptions from ``subprocess.run`` (missing binary, timeout) are
        not handled here; they propagate to the calling method.
        """
        result = subprocess.run(command, capture_output=True, timeout=10)
        if result.returncode != 0:
            stderr = result.stderr.decode(errors='replace').strip()
            logger.warning(
                f"{service} reload failed (exit code {result.returncode}): {stderr}"
            )

    @classmethod
    def _has_control_chars(cls, *values: Optional[str]) -> bool:
        """Return True if any given value contains a control character."""
        return any(
            value is not None and cls._CONTROL_CHARS.search(value)
            for value in values
        )

    @classmethod
    def _valid_samba_share(cls, name: str, path: str, comment: Optional[str]) -> bool:
        """Check that the values can be written as one share section.

        Rejects empty names/paths, the reserved name ``global``, brackets
        in the name and control characters (line breaks) in any value.
        """
        if not name or not path:
            return False
        if name.lower() == 'global' or '[' in name or ']' in name:
            return False
        return not cls._has_control_chars(name, path, comment)

    @staticmethod
    def _build_samba_section(name: str, path: str, comment: Optional[str]) -> str:
        """Return the smb.conf text for one share section."""
        section = f"\n[{name}]\n path = {path}\n"
        if comment:
            section += f" comment = {comment}\n"
        section += " browseable = yes\n read only = no\n"
        return section

    @staticmethod
    def _samba_section_regex(name: str) -> 're.Pattern[str]':
        """Return a pattern matching the whole section called *name*.

        The match starts at the line break before the ``[name]`` header
        (or at the start of the text) and ends right before the line
        break that precedes the next section header, or at the end of
        the text.  The name is matched case-insensitively and whitespace
        around the header is tolerated.
        """
        return re.compile(
            r"(?:\A|\n)[ \t]*\[[ \t]*" + re.escape(name) + r"[ \t]*\][ \t]*(?=\n|\Z)"
            r".*?(?=\n[ \t]*\[|\Z)",
            flags=re.DOTALL | re.IGNORECASE,
        )

    @staticmethod
    def _samba_section_names(content: str) -> set:
        """Return the lower-cased names of all section headers in *content*."""
        names = set()
        for raw in content.split('\n'):
            line = raw.strip()
            if line.startswith('[') and line.endswith(']'):
                names.add(line[1:-1].strip().lower())
        return names

    @staticmethod
    def _parse_samba_shares(content: str) -> List[Dict[str, Any]]:
        """Extract the shares (name, path, comment) from smb.conf text.

        The ``[global]`` section is skipped and sections without a
        ``path`` parameter are left out of the result.
        """
        shares: List[Dict[str, Any]] = []
        current: Optional[Dict[str, Any]] = None
        for raw in content.split('\n'):
            line = raw.strip()
            if not line or line.startswith(('#', ';')):
                continue
            if line.startswith('[') and line.endswith(']'):
                section = line[1:-1].strip()
                if section and section.lower() != 'global':
                    current = {'name': section, 'path': None, 'comment': None}
                    shares.append(current)
                else:
                    # Parameters of [global] (or a nameless section) are ignored.
                    current = None
                continue
            if current is None or '=' not in line:
                continue
            key, value = line.split('=', 1)
            key = key.strip().lower()
            if key in ('path', 'comment'):
                current[key] = value.strip()
        return [share for share in shares if share['path']]

    @classmethod
    def _parse_exports(cls, lines) -> List[Dict[str, Any]]:
        """Parse exports lines into one entry per (path, client) pair.

        Text after ``#`` is treated as a comment.  Lines with fewer than
        two fields are skipped.  A client without a parenthesised option
        list gets ``options=None``.
        """
        shares: List[Dict[str, Any]] = []
        for raw in lines:
            line = raw.split('#', 1)[0].strip()
            parts = line.split()
            if len(parts) < 2:
                continue
            rest = ' '.join(parts[1:])
            for spec in cls._EXPORT_SPEC.finditer(rest):
                with_opts_client, options, bare_client = spec.groups()
                if bare_client is not None:
                    shares.append({'path': parts[0], 'clients': bare_client, 'options': None})
                else:
                    shares.append({'path': parts[0], 'clients': with_opts_client, 'options': options})
        return shares

    @staticmethod
    def _parse_global_parameters(output: str) -> List[Dict[str, str]]:
        """Collect the key/value pairs of the ``[global]`` section of *output*."""
        parameters: List[Dict[str, str]] = []
        in_global = False
        for raw in output.split('\n'):
            line = raw.strip()
            if not in_global:
                in_global = line.startswith('[global]')
                continue
            if line.startswith('['):
                # The next section header ends the global section.
                break
            if not line or line.startswith((';', '#')) or '=' not in line:
                continue
            key, value = line.split('=', 1)
            parameters.append({"key": key.strip(), "value": value.strip()})
        return parameters

    # ------------------------------------------------------------------
    # Samba shares
    # ------------------------------------------------------------------

    def list_samba_shares(self) -> List[Dict[str, Any]]:
        """Parse ``SAMBA_CONFIG`` and return its shares.

        Returns a list of dicts with the keys ``name``, ``path`` and
        ``comment``; an empty list if the file is missing or unreadable.
        """
        if not SAMBA_CONFIG.exists():
            return []
        try:
            with open(SAMBA_CONFIG, 'r') as f:
                content = f.read()
            return self._parse_samba_shares(content)
        except Exception as e:
            logger.error(f"Error parsing Samba config: {e}")
            return []

    def create_samba_share(self, name: str, path: str, comment: Optional[str] = None) -> bool:
        """Append a new share section to ``SAMBA_CONFIG`` and reload Samba.

        Returns ``False`` without writing when the file is missing, the
        values are empty or unsafe (line breaks, brackets in the name,
        the name ``global``), or a section of that name already exists.
        """
        if not SAMBA_CONFIG.exists():
            return False
        try:
            name = name.strip()
            path = path.strip()
            if not self._valid_samba_share(name, path, comment):
                return False
            with open(SAMBA_CONFIG, 'r') as f:
                content = f.read()
            if name.lower() in self._samba_section_names(content):
                logger.error(f"Samba share already exists: {name}")
                return False
            with open(SAMBA_CONFIG, 'a') as f:
                f.write(self._build_samba_section(name, path, comment))
            self._reload(self._SAMBA_RELOAD_CMD, "Samba")
            logger.info(f"Samba share created: {name}")
            return True
        except Exception as e:
            logger.error(f"Error creating Samba share: {e}")
            return False

    def update_samba_share(self, old_name: str, new_name: str, path: str, comment: Optional[str] = None) -> bool:
        """Replace the section of share *old_name* in ``SAMBA_CONFIG``.

        The whole section is rebuilt from *new_name*, *path* and
        *comment*; other parameters the old section contained are not
        carried over.  Returns ``False`` without writing when the share
        does not exist, the values are unsafe, ``global`` is addressed,
        or *new_name* would collide with another existing section.
        """
        if not SAMBA_CONFIG.exists():
            return False
        try:
            old_name = old_name.strip()
            new_name = new_name.strip()
            path = path.strip()
            if not old_name or old_name.lower() == 'global':
                return False
            if not self._valid_samba_share(new_name, path, comment):
                return False
            with open(SAMBA_CONFIG, 'r') as f:
                content = f.read()
            pattern = self._samba_section_regex(old_name)
            if not pattern.search(content):
                return False
            renamed = new_name.lower() != old_name.lower()
            if renamed and new_name.lower() in self._samba_section_names(content):
                logger.error(f"Samba share already exists: {new_name}")
                return False
            section = self._build_samba_section(new_name, path, comment)
            # A callable replacement inserts the text literally; a plain
            # string would have its backslashes interpreted as escapes.
            new_content = pattern.sub(lambda _match: section, content)
            with open(SAMBA_CONFIG, 'w') as f:
                f.write(new_content)
            self._reload(self._SAMBA_RELOAD_CMD, "Samba")
            logger.info(f"Samba share updated: {old_name} -> {new_name}")
            return True
        except Exception as e:
            logger.error(f"Error updating Samba share: {e}")
            return False

    def delete_samba_share(self, name: str) -> bool:
        """Remove the section of share *name* from ``SAMBA_CONFIG``.

        Returns ``False`` when the file is missing, the share is not
        found, or *name* is empty or ``global``.
        """
        if not SAMBA_CONFIG.exists():
            return False
        try:
            name = name.strip()
            if not name or name.lower() == 'global':
                return False
            with open(SAMBA_CONFIG, 'r') as f:
                content = f.read()
            new_content, removed = self._samba_section_regex(name).subn('', content)
            if not removed:
                return False
            with open(SAMBA_CONFIG, 'w') as f:
                f.write(new_content)
            self._reload(self._SAMBA_RELOAD_CMD, "Samba")
            logger.info(f"Samba share deleted: {name}")
            return True
        except Exception as e:
            logger.error(f"Error deleting Samba share: {e}")
            return False

    # ------------------------------------------------------------------
    # NFS exports
    # ------------------------------------------------------------------

    def list_nfs_shares(self) -> List[Dict[str, Any]]:
        """Parse ``NFS_EXPORTS`` and return its exports.

        Returns a list of dicts with the keys ``path``, ``clients`` and
        ``options``, one per client entry; an empty list if the file is
        missing or unreadable.
        """
        if not NFS_EXPORTS.exists():
            return []
        try:
            with open(NFS_EXPORTS, 'r') as f:
                return self._parse_exports(f)
        except Exception as e:
            logger.error(f"Error parsing NFS exports: {e}")
            return []

    def create_nfs_share(self, path: str, clients: str, options: Optional[str] = None) -> bool:
        """Append an export line to ``NFS_EXPORTS`` and re-export.

        When *options* is empty the default option set is used.  Returns
        ``False`` without writing when the file is missing or the values
        are empty or unsafe (control characters, parentheses in
        *options*).
        """
        if not NFS_EXPORTS.exists():
            return False
        try:
            path = path.strip()
            clients = clients.strip()
            if not path or not clients:
                return False
            if not options:
                options = self._NFS_DEFAULT_OPTIONS
            if self._has_control_chars(path, clients, options) or '(' in options or ')' in options:
                return False
            with open(NFS_EXPORTS, 'r') as f:
                existing = f.read()
            # Start on a fresh line if the file does not end with a newline,
            # otherwise the new export would be glued onto the last entry.
            prefix = '' if not existing or existing.endswith('\n') else '\n'
            with open(NFS_EXPORTS, 'a') as f:
                f.write(f"{prefix}{path} {clients}({options})\n")
            self._reload(self._NFS_RELOAD_CMD, "NFS")
            logger.info(f"NFS share created: {path}")
            return True
        except Exception as e:
            logger.error(f"Error creating NFS share: {e}")
            return False

    def delete_nfs_share(self, path: str) -> bool:
        """Remove every export line whose export point equals *path*.

        Only lines whose first field is exactly *path* are removed, so
        paths that merely share a prefix are kept.  Returns ``False``
        when the file is missing, *path* is empty, or no line matches.
        """
        if not NFS_EXPORTS.exists():
            return False
        try:
            path = path.strip()
            if not path:
                return False
            with open(NFS_EXPORTS, 'r') as f:
                lines = f.readlines()
            kept = [line for line in lines if line.split()[:1] != [path]]
            if len(kept) == len(lines):
                return False
            with open(NFS_EXPORTS, 'w') as f:
                f.writelines(kept)
            self._reload(self._NFS_RELOAD_CMD, "NFS")
            logger.info(f"NFS share deleted: {path}")
            return True
        except Exception as e:
            logger.error(f"Error deleting NFS share: {e}")
            return False

    # ------------------------------------------------------------------
    # Samba global configuration (registry)
    # ------------------------------------------------------------------

    def get_samba_global_config(self) -> Dict[str, Any]:
        """Read the Samba global parameters using ``net conf list``.

        Returns ``{"parameters": [{"key": ..., "value": ...}, ...]}``;
        the list is empty when the command fails.
        """
        try:
            result = subprocess.run(
                ['/usr/bin/net', 'conf', 'list'],
                capture_output=True,
                text=True,
                timeout=10
            )
            if result.returncode != 0:
                return {"parameters": []}
            return {"parameters": self._parse_global_parameters(result.stdout)}
        except Exception as e:
            logger.error(f"Error reading Samba registry config: {e}")
            return {"parameters": []}

    def set_samba_global_config(self, parameters: Dict[str, str]) -> bool:
        """Set Samba global parameters using ``net conf setparm``.

        Parameters are applied one by one; the first failing command
        stops the loop and returns ``False``, leaving earlier parameters
        applied.
        """
        try:
            for key, value in parameters.items():
                result = subprocess.run(
                    ['/usr/bin/net', 'conf', 'setparm', 'global', key, value],
                    capture_output=True,
                    text=True,
                    timeout=10
                )
                if result.returncode != 0:
                    logger.error(f"Failed to set {key}={value}: {result.stderr}")
                    return False
            logger.info(f"Samba global config updated: {len(parameters)} parameters")
            return True
        except Exception as e:
            logger.error(f"Error writing Samba global config: {e}")
            return False

    def import_samba_config(self, config_file: str) -> bool:
        """Import a Samba configuration file using ``net conf import``.

        Returns ``True`` when the command exits with status 0.
        """
        try:
            # Same absolute binary path and text mode as the other
            # 'net conf' calls in this class.
            result = subprocess.run(
                ['/usr/bin/net', 'conf', 'import', config_file],
                capture_output=True,
                text=True,
                timeout=10
            )
            if result.returncode == 0:
                logger.info(f"Samba config imported from {config_file}")
                return True
            logger.error(f"Failed to import Samba config: {result.stderr}")
            return False
        except Exception as e:
            logger.error(f"Error importing Samba config: {e}")
            return False

    # ------------------------------------------------------------------
    # Raw NFS configuration
    # ------------------------------------------------------------------

    def get_nfs_config(self) -> Dict[str, Any]:
        """Return the raw content of ``NFS_EXPORTS``.

        The result holds ``exports`` and ``path`` on success, ``exports``
        and ``note`` when the file does not exist, and ``error`` and
        ``path`` when reading fails.
        """
        if not NFS_EXPORTS.exists():
            return {"exports": "", "note": "NFS not configured"}
        try:
            with open(NFS_EXPORTS, 'r') as f:
                content = f.read()
            return {"exports": content, "path": str(NFS_EXPORTS)}
        except Exception as e:
            logger.error(f"Error reading NFS config: {e}")
            return {"error": str(e), "path": str(NFS_EXPORTS)}

    def set_nfs_config(self, content: str) -> bool:
        """Overwrite ``NFS_EXPORTS`` with *content* and re-export.

        Returns ``False`` when the file does not exist yet or when
        writing or running the reload command raises.
        """
        if not NFS_EXPORTS.exists():
            return False
        try:
            with open(NFS_EXPORTS, 'w') as f:
                f.write(content)
            self._reload(self._NFS_RELOAD_CMD, "NFS")
            logger.info("NFS config updated")
            return True
        except Exception as e:
            logger.error(f"Error writing NFS config: {e}")
            return False
# Module-level SharesManager instance, created when this module is imported.
share_manager = SharesManager()