"""
System User and Group Management

Wrapper around /etc/passwd, /etc/group, useradd, groupadd, etc.
"""

import subprocess
import logging
import pwd
import grp
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)


class SystemUserManager:
    """Manage system users and groups.

    Read operations use the ``pwd`` and ``grp`` modules. Write operations
    shell out to ``useradd``, ``userdel``, ``groupadd``, ``groupdel``,
    ``usermod`` and ``chpasswd``.

    Mutating methods never raise: they return ``{"status": "success", ...}``
    on success and ``{"error": <message>}`` on failure.
    """

    @staticmethod
    def _user_to_dict(entry: Any) -> Dict[str, Any]:
        """Convert a ``pwd`` database entry into a plain dict."""
        return {
            "username": entry.pw_name,
            "uid": entry.pw_uid,
            "gid": entry.pw_gid,
            "gecos": entry.pw_gecos,
            "home": entry.pw_dir,
            "shell": entry.pw_shell,
        }

    @staticmethod
    def _group_to_dict(entry: Any) -> Dict[str, Any]:
        """Convert a ``grp`` database entry into a plain dict."""
        return {
            "groupname": entry.gr_name,
            "gid": entry.gr_gid,
            "members": entry.gr_mem,
        }

    @staticmethod
    def _run(cmd: List[str]) -> Optional[str]:
        """Run *cmd* and report failure.

        Returns:
            ``None`` when the command exits with status 0, otherwise the
            captured stderr text (which may be an empty string). A failure
            is logged as ``"<tool> failed: <stderr>"``.

        Exceptions from ``subprocess.run`` (e.g. the tool is not installed)
        propagate to the caller.
        """
        result = subprocess.run(cmd, capture_output=True, text=True, check=False)
        if result.returncode != 0:
            logger.error("%s failed: %s", cmd[0], result.stderr)
            return result.stderr
        return None

    def list_users(self) -> List[Dict[str, Any]]:
        """List all system users, sorted by uid.

        Returns:
            One dict per passwd entry with the keys ``username``, ``uid``,
            ``gid``, ``gecos``, ``home`` and ``shell``. If the lookup fails
            the error is logged and an empty list is returned.
        """
        users: List[Dict[str, Any]] = []
        try:
            # pwd.getpwall() is the real API name; the previous spelling
            # ("getwall") raised AttributeError, which the handler below
            # swallowed, so this method always returned an empty list.
            users = [self._user_to_dict(entry) for entry in pwd.getpwall()]
        except Exception as e:
            logger.error("Error listing users: %s", e)
        return sorted(users, key=lambda u: u["uid"])

    def list_groups(self) -> List[Dict[str, Any]]:
        """List all system groups, sorted by gid.

        Returns:
            One dict per group entry with the keys ``groupname``, ``gid``
            and ``members``. If the lookup fails the error is logged and an
            empty list is returned.
        """
        groups: List[Dict[str, Any]] = []
        try:
            groups = [self._group_to_dict(entry) for entry in grp.getgrall()]
        except Exception as e:
            logger.error("Error listing groups: %s", e)
        return sorted(groups, key=lambda g: g["gid"])

    def get_user(self, username: str) -> Optional[Dict[str, Any]]:
        """Get user details.

        Returns:
            The user's dict (same keys as :meth:`list_users`), or ``None``
            if the user does not exist or the lookup fails.
        """
        try:
            return self._user_to_dict(pwd.getpwnam(username))
        except KeyError:
            # Unknown user is an expected outcome, not an error.
            return None
        except Exception as e:
            logger.error("Error getting user %s: %s", username, e)
            return None

    def get_group(self, groupname: str) -> Optional[Dict[str, Any]]:
        """Get group details.

        Returns:
            The group's dict (same keys as :meth:`list_groups`), or ``None``
            if the group does not exist or the lookup fails.
        """
        try:
            return self._group_to_dict(grp.getgrnam(groupname))
        except KeyError:
            # Unknown group is an expected outcome, not an error.
            return None
        except Exception as e:
            logger.error("Error getting group %s: %s", groupname, e)
            return None

    def create_user(
        self,
        username: str,
        password: str,
        home_dir: Optional[str] = None,
        shell: str = "/bin/bash",
        groups: Optional[List[str]] = None
    ) -> Dict[str, str]:
        """Create new system user.

        Args:
            username: Login name passed to ``useradd``.
            password: Password set via ``chpasswd``; skipped when empty.
            home_dir: Optional home directory (``useradd -d``).
            shell: Login shell (``useradd -s``).
            groups: Optional supplementary groups (``useradd -G``).

        Returns:
            ``{"status": "success", "username": ...}`` on success, otherwise
            ``{"error": ...}``. If ``useradd`` succeeds but ``chpasswd``
            fails, the account is left in place and the error message says
            so.
        """
        try:
            # chpasswd reads one "user:password" record per line, so a
            # newline inside the password would inject additional records
            # (e.g. "x\nroot:pwned"). Reject it before touching the system.
            if password and "\n" in password:
                logger.error(
                    "Refusing to create user %s: password contains a newline",
                    username,
                )
                return {"error": "Password must not contain newline characters"}

            cmd = ["useradd"]
            if home_dir:
                cmd.extend(["-d", home_dir])
            cmd.extend(["-s", shell])
            if groups:
                cmd.extend(["-G", ",".join(groups)])
            cmd.append(username)

            stderr = self._run(cmd)
            if stderr is not None:
                return {"error": stderr}

            # Set password
            if password:
                # The password is passed on stdin so it never appears in
                # the process argument list.
                result = subprocess.run(
                    ["chpasswd"],
                    input=f"{username}:{password}\n",
                    capture_output=True,
                    text=True,
                    check=False,
                )
                if result.returncode != 0:
                    logger.error("chpasswd failed: %s", result.stderr)
                    return {
                        "error": f"User created but password failed: {result.stderr}"
                    }

            logger.info("Created user: %s", username)
            return {"status": "success", "username": username}
        except Exception as e:
            logger.error("Error creating user: %s", e)
            return {"error": str(e)}

    def delete_user(self, username: str, remove_home: bool = False) -> Dict[str, str]:
        """Delete system user.

        Args:
            username: Login name passed to ``userdel``.
            remove_home: When true, pass ``-r`` to ``userdel``.

        Returns:
            ``{"status": "success", "message": ...}`` or ``{"error": ...}``.
        """
        try:
            cmd = ["userdel"]
            if remove_home:
                cmd.append("-r")
            cmd.append(username)

            stderr = self._run(cmd)
            if stderr is not None:
                return {"error": stderr}

            logger.info("Deleted user: %s", username)
            return {"status": "success", "message": f"User {username} deleted"}
        except Exception as e:
            logger.error("Error deleting user: %s", e)
            return {"error": str(e)}

    def create_group(self, groupname: str) -> Dict[str, str]:
        """Create new system group.

        Returns:
            ``{"status": "success", "groupname": ...}`` or ``{"error": ...}``.
        """
        try:
            stderr = self._run(["groupadd", groupname])
            if stderr is not None:
                return {"error": stderr}

            logger.info("Created group: %s", groupname)
            return {"status": "success", "groupname": groupname}
        except Exception as e:
            logger.error("Error creating group: %s", e)
            return {"error": str(e)}

    def delete_group(self, groupname: str) -> Dict[str, str]:
        """Delete system group.

        Returns:
            ``{"status": "success", "message": ...}`` or ``{"error": ...}``.
        """
        try:
            stderr = self._run(["groupdel", groupname])
            if stderr is not None:
                return {"error": stderr}

            logger.info("Deleted group: %s", groupname)
            return {"status": "success", "message": f"Group {groupname} deleted"}
        except Exception as e:
            logger.error("Error deleting group: %s", e)
            return {"error": str(e)}

    def add_user_to_group(self, username: str, groupname: str) -> Dict[str, str]:
        """Add user to group.

        Runs ``usermod -aG`` so the group is appended to the user's
        supplementary groups rather than replacing them.

        Returns:
            ``{"status": "success", "message": ...}`` or ``{"error": ...}``.
        """
        try:
            stderr = self._run(["usermod", "-aG", groupname, username])
            if stderr is not None:
                return {"error": stderr}

            logger.info("Added %s to %s", username, groupname)
            return {"status": "success", "message": f"{username} added to {groupname}"}
        except Exception as e:
            logger.error("Error adding user to group: %s", e)
            return {"error": str(e)}


# Global instance
system_user_manager = SystemUserManager()