"""
Samba and NFS Shares Management

Helpers for listing, creating, updating and deleting Samba shares stored in
``smb.conf`` and NFS exports stored in ``/etc/exports``, plus accessors for
the Samba registry ``[global]`` section via ``net conf``.

All public functions are best-effort: they log errors and return ``False``
(or an empty result) instead of raising.
"""
import logging
import re
import subprocess
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

SAMBA_CONFIG = Path("/etc/samba/smb.conf")
NFS_EXPORTS = Path("/etc/exports")

# Timeout, in seconds, applied to every external command.
_COMMAND_TIMEOUT = 10

_SAMBA_RELOAD_CMD = ['/usr/bin/smbcontrol', 'smbd', 'reload-config']
_NFS_RELOAD_CMD = ['/usr/sbin/exportfs', '-r']


# ---------------------------------------------------------------------------
# Internal helpers
# ---------------------------------------------------------------------------

def _is_single_line(*values: Optional[str]) -> bool:
    """Return True when no value contains a line break or NUL character.

    Both config formats are line oriented, so a value with an embedded
    newline would let a caller inject extra sections or export entries.
    ``None`` values are ignored.
    """
    return not any(
        ch in value
        for value in values
        if value is not None
        for ch in "\r\n\0"
    )


def _run_reload(command: List[str]) -> None:
    """Run a service reload command and log a warning if it reports failure.

    Exceptions (missing binary, timeout) are deliberately not caught here so
    that callers keep their existing "log the error and return False" path.
    """
    result = subprocess.run(command, capture_output=True, timeout=_COMMAND_TIMEOUT)
    if result.returncode != 0:
        logger.warning(
            "Command %s exited with status %s: %s",
            ' '.join(command),
            result.returncode,
            result.stderr.decode(errors='replace').strip(),
        )


def _samba_section_regex(name: str) -> "re.Pattern[str]":
    """Build a regex matching the whole ``[name]`` section of an smb.conf.

    The match starts at the newline preceding the header (or at the start of
    the file) and runs up to, but not including, the newline before the next
    section header or the end of the file. Headers may be indented, which is
    what ``list_samba_shares`` accepts as well.
    """
    return re.compile(
        rf"(?:\A|\n)[ \t]*\[{re.escape(name)}\].*?(?=\n[ \t]*\[|\Z)",
        flags=re.DOTALL,
    )


def _render_samba_section(name: str, path: str, comment: Optional[str]) -> str:
    """Render the smb.conf text for a browseable, writable share."""
    section = f"\n[{name}]\n path = {path}\n"
    if comment:
        section += f" comment = {comment}\n"
    section += " browseable = yes\n read only = no\n"
    return section


def _is_valid_share_name(name: str) -> bool:
    """Return True if *name* can be written safely as a section header."""
    return bool(name) and _is_single_line(name) and not any(c in name for c in "[]")


def _parse_samba_shares(content: str) -> List[Dict[str, Any]]:
    """Extract non-global sections that define a ``path`` from smb.conf text."""
    shares: List[Dict[str, Any]] = []
    current: Optional[Dict[str, Any]] = None
    for raw in content.split('\n'):
        line = raw.strip()
        if not line or line.startswith(('#', ';')):
            continue
        if line.startswith('[') and line.endswith(']'):
            section_name = line[1:-1]
            if section_name.lower() != 'global':
                current = {'name': section_name, 'path': None, 'comment': None}
                shares.append(current)
            else:
                # Parameters of [global] are not share attributes.
                current = None
            continue
        if current is not None and '=' in line:
            key, value = line.split('=', 1)
            key = key.strip().lower()
            if key in ('path', 'comment'):
                current[key] = value.strip()
    return [share for share in shares if share['path']]


def _parse_export_line(line: str) -> Optional[Dict[str, Any]]:
    """Parse one line of the exports file; return None for non-entries.

    Only the first ``client(options)`` group is reported, and lines holding
    just a path are skipped. A missing closing parenthesis is tolerated so
    that one malformed line does not hide all the other exports.
    """
    stripped = line.strip()
    if not stripped or stripped.startswith('#'):
        return None
    parts = stripped.split()
    if len(parts) < 2:
        return None
    rest = ' '.join(parts[1:])
    if '(' in rest:
        clients, _, tail = rest.partition('(')
        clients = clients.strip()
        options: Optional[str] = tail.partition(')')[0]
    else:
        clients, options = rest, None
    return {'path': parts[0], 'clients': clients, 'options': options}


def _parse_global_parameters(output: str) -> List[Dict[str, str]]:
    """Collect ``key = value`` pairs from the [global] section of *output*."""
    parameters: List[Dict[str, str]] = []
    in_global = False
    for raw in output.split('\n'):
        line = raw.strip()
        if line.startswith('[global]'):
            in_global = True
            continue
        if not in_global:
            continue
        if line.startswith('['):
            # The next section starts here; [global] is finished.
            break
        if not line or line.startswith((';', '#')):
            continue
        if '=' in line:
            key, value = line.split('=', 1)
            parameters.append({"key": key.strip(), "value": value.strip()})
    return parameters


# ---------------------------------------------------------------------------
# Samba shares
# ---------------------------------------------------------------------------

def list_samba_shares() -> List[Dict[str, Any]]:
    """Return the shares defined in ``SAMBA_CONFIG``.

    Returns:
        A list of ``{'name', 'path', 'comment'}`` dicts, one per non-global
        section that has a ``path``. Empty if the file is missing or cannot
        be read.
    """
    if not SAMBA_CONFIG.exists():
        return []
    try:
        with open(SAMBA_CONFIG) as f:
            return _parse_samba_shares(f.read())
    except Exception as e:
        logger.error(f"Error parsing Samba config: {e}")
        return []


def create_samba_share(name: str, path: str, comment: Optional[str] = None) -> bool:
    """Append a new share section to ``SAMBA_CONFIG`` and reload smbd.

    Args:
        name: Share name; surrounding whitespace is stripped.
        path: Directory to export; surrounding whitespace is stripped.
        comment: Optional free-text description.

    Returns:
        True on success. False if the config file is missing, a value is
        empty or contains line breaks (or brackets, for the name), or
        writing/reloading raised an error.
    """
    share_name, share_path = name.strip(), path.strip()
    if not SAMBA_CONFIG.exists() or not share_name or not share_path:
        return False
    if not _is_valid_share_name(share_name) or not _is_single_line(share_path, comment):
        logger.error("Refusing to create Samba share: invalid characters in input")
        return False
    try:
        with open(SAMBA_CONFIG, 'a') as f:
            f.write(_render_samba_section(share_name, share_path, comment))
        _run_reload(_SAMBA_RELOAD_CMD)
        logger.info(f"Samba share created: {name}")
        return True
    except Exception as e:
        logger.error(f"Error creating Samba share: {e}")
        return False


def update_samba_share(old_name: str, new_name: str, path: str, comment: Optional[str] = None) -> bool:
    """Replace the ``[old_name]`` section with a freshly rendered one.

    Args:
        old_name: Name of the existing section.
        new_name: Name for the rewritten section (may equal ``old_name``).
        path: Directory to export.
        comment: Optional free-text description.

    Returns:
        True on success. False if the config file is missing, the section
        does not exist, the new values are empty or contain line breaks (or
        brackets, for the name), or writing/reloading raised an error.
    """
    if not SAMBA_CONFIG.exists():
        return False
    share_name, share_path = new_name.strip(), path.strip()
    if not share_name or not share_path:
        return False
    if not _is_valid_share_name(share_name) or not _is_single_line(share_path, comment):
        logger.error("Refusing to update Samba share: invalid characters in input")
        return False
    try:
        with open(SAMBA_CONFIG) as f:
            content = f.read()
        section_re = _samba_section_regex(old_name.strip())
        if not section_re.search(content):
            return False
        section = _render_samba_section(share_name, share_path, comment)
        # A callable replacement keeps backslashes in the path/comment
        # literal; a plain string would be parsed as a regex template.
        updated = section_re.sub(lambda _match: section, content)
        with open(SAMBA_CONFIG, 'w') as f:
            f.write(updated)
        _run_reload(_SAMBA_RELOAD_CMD)
        logger.info(f"Samba share updated: {old_name} → {new_name}")
        return True
    except Exception as e:
        logger.error(f"Error updating Samba share: {e}")
        return False


def delete_samba_share(name: str) -> bool:
    """Remove the ``[name]`` section from ``SAMBA_CONFIG`` and reload smbd.

    Returns:
        True on success. False if the config file is missing, no such
        section exists, or writing/reloading raised an error.
    """
    if not SAMBA_CONFIG.exists():
        return False
    try:
        with open(SAMBA_CONFIG) as f:
            content = f.read()
        new_content = _samba_section_regex(name.strip()).sub('', content)
        if new_content == content:
            return False
        with open(SAMBA_CONFIG, 'w') as f:
            f.write(new_content)
        _run_reload(_SAMBA_RELOAD_CMD)
        logger.info(f"Samba share deleted: {name}")
        return True
    except Exception as e:
        logger.error(f"Error deleting Samba share: {e}")
        return False


# ---------------------------------------------------------------------------
# NFS exports
# ---------------------------------------------------------------------------

def list_nfs_shares() -> List[Dict[str, Any]]:
    """Return the entries of ``NFS_EXPORTS``.

    Returns:
        A list of ``{'path', 'clients', 'options'}`` dicts; ``options`` is
        None when the entry has no parenthesised option list. Empty if the
        file is missing or cannot be read.
    """
    if not NFS_EXPORTS.exists():
        return []
    try:
        with open(NFS_EXPORTS) as f:
            entries = (_parse_export_line(line) for line in f)
            return [entry for entry in entries if entry is not None]
    except Exception as e:
        logger.error(f"Error parsing NFS exports: {e}")
        return []


def create_nfs_share(path: str, clients: str, options: Optional[str] = None) -> bool:
    """Append an export line to ``NFS_EXPORTS`` and re-export.

    Args:
        path: Directory to export.
        clients: Client specification (host, network, wildcard, ...).
        options: Export options; defaults to ``rw,sync,no_subtree_check``.

    Returns:
        True on success. False if the exports file is missing, a value is
        empty or contains line breaks, or writing/re-exporting raised.
    """
    if not NFS_EXPORTS.exists() or not path.strip() or not clients.strip():
        return False
    if not _is_single_line(path, clients, options):
        logger.error("Refusing to create NFS share: line break in input")
        return False
    try:
        opts = options or "rw,sync,no_subtree_check"
        with open(NFS_EXPORTS, 'a') as f:
            f.write(f"{path.strip()} {clients.strip()}({opts})\n")
        _run_reload(_NFS_RELOAD_CMD)
        logger.info(f"NFS share created: {path}")
        return True
    except Exception as e:
        logger.error(f"Error creating NFS share: {e}")
        return False


def delete_nfs_share(path: str) -> bool:
    """Remove every export line whose exported path equals *path*.

    The comparison is an exact match on the first field of the line, so
    deleting ``/data`` leaves ``/data2`` and ``/data/sub`` untouched.

    Returns:
        True on success. False if the exports file is missing, *path* is
        empty, no line matched, or writing/re-exporting raised.
    """
    if not NFS_EXPORTS.exists():
        return False
    target = path.strip()
    if not target:
        # An empty prefix would otherwise match (and delete) every line.
        return False
    try:
        with open(NFS_EXPORTS) as f:
            lines = f.readlines()
        kept = []
        for line in lines:
            fields = line.split()
            if fields and fields[0] == target:
                continue
            kept.append(line)
        if len(kept) == len(lines):
            return False
        with open(NFS_EXPORTS, 'w') as f:
            f.writelines(kept)
        _run_reload(_NFS_RELOAD_CMD)
        logger.info(f"NFS share deleted: {path}")
        return True
    except Exception as e:
        logger.error(f"Error deleting NFS share: {e}")
        return False


# ---------------------------------------------------------------------------
# Samba registry configuration
# ---------------------------------------------------------------------------

def get_samba_global_config() -> Dict[str, Any]:
    """Read the ``[global]`` parameters reported by ``net conf list``.

    Returns:
        ``{"parameters": [{"key": ..., "value": ...}, ...]}``; the list is
        empty when the command fails or raises.
    """
    try:
        result = subprocess.run(
            ['/usr/bin/net', 'conf', 'list'],
            capture_output=True, text=True, timeout=_COMMAND_TIMEOUT
        )
        if result.returncode != 0:
            return {"parameters": []}
        return {"parameters": _parse_global_parameters(result.stdout)}
    except Exception as e:
        logger.error(f"Error reading Samba registry config: {e}")
        return {"parameters": []}


def set_samba_global_config(parameters: Dict[str, str]) -> bool:
    """Make the registry ``[global]`` section equal to *parameters*.

    Keys currently present but absent from *parameters* are deleted, then
    every given key is set. Stops at the first ``setparm`` failure.

    Returns:
        True if every parameter was set, False otherwise.
    """
    try:
        current_keys = {p["key"] for p in get_samba_global_config().get("parameters", [])}
        for key in sorted(current_keys - set(parameters)):
            removed = subprocess.run(
                ['/usr/bin/net', 'conf', 'delparm', 'global', key],
                capture_output=True, text=True, timeout=_COMMAND_TIMEOUT
            )
            if removed.returncode != 0:
                # Best-effort cleanup: report it, but still apply the new values.
                logger.warning("Failed to delete %s: %s", key, removed.stderr)
        for key, value in parameters.items():
            # '--' stops option parsing so values starting with '-' are safe.
            result = subprocess.run(
                ['/usr/bin/net', 'conf', 'setparm', 'global', key, '--', value],
                capture_output=True, text=True, timeout=_COMMAND_TIMEOUT
            )
            if result.returncode != 0:
                logger.error(f"Failed to set {key}={value}: {result.stderr}")
                return False
        logger.info(f"Samba global config updated: {len(parameters)} parameters")
        return True
    except Exception as e:
        logger.error(f"Error writing Samba global config: {e}")
        return False


def import_samba_config(config_file: str) -> bool:
    """Import *config_file* into the Samba registry via ``net conf import``.

    Returns:
        True if the command exited with status 0, False otherwise.
    """
    try:
        result = subprocess.run(
            ['/usr/bin/net', 'conf', 'import', config_file],
            capture_output=True, timeout=_COMMAND_TIMEOUT
        )
        if result.returncode == 0:
            logger.info(f"Samba config imported from {config_file}")
            return True
        # stderr is raw bytes; never let undecodable output mask the failure.
        logger.error(f"Failed to import Samba config: {result.stderr.decode(errors='replace')}")
        return False
    except Exception as e:
        logger.error(f"Error importing Samba config: {e}")
        return False


# ---------------------------------------------------------------------------
# Raw NFS configuration
# ---------------------------------------------------------------------------

def get_nfs_config() -> Dict[str, Any]:
    """Return the raw text of ``NFS_EXPORTS``.

    Returns:
        ``{"exports": text, "path": ...}`` on success,
        ``{"exports": "", "note": ...}`` when the file does not exist, or
        ``{"error": ..., "path": ...}`` when reading fails.
    """
    if not NFS_EXPORTS.exists():
        return {"exports": "", "note": "NFS not configured"}
    try:
        with open(NFS_EXPORTS) as f:
            return {"exports": f.read(), "path": str(NFS_EXPORTS)}
    except Exception as e:
        logger.error(f"Error reading NFS config: {e}")
        return {"error": str(e), "path": str(NFS_EXPORTS)}


def set_nfs_config(content: str) -> bool:
    """Overwrite ``NFS_EXPORTS`` with *content* and re-export.

    Returns:
        True on success, False if the file is missing or an error occurred.
    """
    if not NFS_EXPORTS.exists():
        return False
    try:
        with open(NFS_EXPORTS, 'w') as f:
            f.write(content)
        _run_reload(_NFS_RELOAD_CMD)
        logger.info("NFS config updated")
        return True
    except Exception as e:
        logger.error(f"Error writing NFS config: {e}")
        return False


# Backward-compat shim — routers can use either style
class _ShareManagerShim:
    """Namespace exposing the module-level functions as static methods."""

    list_samba_shares = staticmethod(list_samba_shares)
    create_samba_share = staticmethod(create_samba_share)
    update_samba_share = staticmethod(update_samba_share)
    delete_samba_share = staticmethod(delete_samba_share)
    list_nfs_shares = staticmethod(list_nfs_shares)
    create_nfs_share = staticmethod(create_nfs_share)
    delete_nfs_share = staticmethod(delete_nfs_share)
    get_samba_global_config = staticmethod(get_samba_global_config)
    set_samba_global_config = staticmethod(set_samba_global_config)
    import_samba_config = staticmethod(import_samba_config)
    get_nfs_config = staticmethod(get_nfs_config)
    set_nfs_config = staticmethod(set_nfs_config)


share_manager = _ShareManagerShim()