"""
User and Group Management Service
Handle system users, groups, and PAM operations
"""

import subprocess
import logging
import pwd
import grp
import re
from datetime import date
from typing import List, Dict, Any, Optional, Sequence
from pathlib import Path

try:
    import spwd
except ImportError:
    # spwd is Unix-only and absent from newer Python releases; lock status
    # is then reported as "not locked" (see _is_user_locked).
    spwd = None

logger = logging.getLogger(__name__)

# Width to which `last` truncates user names when not asked for full names.
_LAST_NAME_WIDTH = 8

_DAYS_OF_WEEK = frozenset({'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'})
_MONTHS = {
    'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
    'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12,
}
# Pseudo-users that `last` prints but that are not real logins.
_SKIPPED_LAST_USERS = frozenset({'wtmp', 'reboot', 'kernel'})
# Session duration is printed in parentheses at the end of a `last` line.
_DURATION_RE = re.compile(r'\(([^)]+)\)\s*$')
# Characters that would break passwd/group records or stdin line protocols.
_FORBIDDEN_NAME_CHARS = frozenset(':,\n\r\0')


class IdentitiesManager:
    """Manage system users and groups"""

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _is_safe_name(name: Any) -> bool:
        """Return True if *name* is safe to pass as a user/group argument.

        Rejects non-strings, empty names, names starting with '-' (they would
        be parsed as command-line options) and names containing record or
        line separators.
        """
        if not isinstance(name, str) or not name or name.startswith('-'):
            return False
        return not any(ch in _FORBIDDEN_NAME_CHARS for ch in name)

    @staticmethod
    def _is_safe_password(password: Any) -> bool:
        """Return True if *password* can be sent on a single stdin line."""
        return isinstance(password, str) and '\n' not in password and '\r' not in password

    @staticmethod
    def _run(cmd: Sequence[str], *, stdin_text: Optional[str] = None,
             timeout: int = 10) -> "subprocess.CompletedProcess":
        """Run *cmd* without a shell and capture its output as text.

        Undecodable bytes are replaced so a tool emitting non-UTF-8 output
        cannot turn an ordinary failure into a UnicodeDecodeError.
        """
        return subprocess.run(
            list(cmd),
            input=stdin_text,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=timeout,
        )

    def _run_action(self, cmd: Sequence[str], success_msg: str,
                    failure_msg: str, error_msg: str) -> bool:
        """Run *cmd* and translate the outcome into a logged boolean.

        Returns True when the command exits with status 0. A non-zero exit
        logs ``failure_msg`` plus stderr; an exception while running the
        command logs ``error_msg`` plus the exception. Both return False.
        """
        try:
            result = self._run(cmd)
        except Exception as e:
            logger.error(f"{error_msg}: {e}")
            return False
        if result.returncode == 0:
            logger.info(success_msg)
            return True
        logger.error(f"{failure_msg}: {result.stderr}")
        return False

    @staticmethod
    def _parse_passwd_line(line: str) -> Optional[Dict[str, Any]]:
        """Parse one passwd(5) record.

        Returns None for lines with fewer than seven fields and raises
        ValueError when the uid or gid field is not an integer.
        """
        parts = line.split(':')
        if len(parts) < 7:
            return None
        return {
            'username': parts[0],
            'uid': int(parts[2]),
            'gid': int(parts[3]),
            'gecos': parts[4],
            'home': parts[5],
            'shell': parts[6],
        }

    @staticmethod
    def _parse_group_line(line: str) -> Optional[Dict[str, Any]]:
        """Parse one group(5) record.

        Returns None for lines with fewer than four fields and raises
        ValueError when the gid field is not an integer.
        """
        parts = line.split(':')
        if len(parts) < 4:
            return None
        return {
            'groupname': parts[0],
            'gid': int(parts[2]),
            'members': [m.strip() for m in parts[3].split(',') if m.strip()],
        }

    def _passwd_usernames(self) -> List[str]:
        """Return all account names from ``getent passwd`` (empty on failure)."""
        try:
            result = self._run(["/usr/bin/getent", "passwd"], timeout=5)
        except Exception as e:
            logger.debug(f"Error reading passwd database: {e}")
            return []
        if result.returncode != 0:
            return []
        names = []
        for line in result.stdout.strip().split('\n'):
            name = line.split(':', 1)[0]
            if name:
                names.append(name)
        return names

    @staticmethod
    def _expand_truncated_name(truncated: str, names: Sequence[str]) -> Optional[str]:
        """Return the full account name for a name truncated by `last`.

        A name shorter than the truncation width is already complete, and a
        name that exactly matches an account needs no expansion; both return
        None. Otherwise the first account starting with *truncated* wins.
        """
        if len(truncated) < _LAST_NAME_WIDTH or truncated in names:
            return None
        for name in names:
            if len(name) > len(truncated) and name.startswith(truncated):
                return name
        return None

    @staticmethod
    def _parse_last_line(line: str, today: date) -> Optional[Dict[str, Any]]:
        """Parse one line of `last` output into a login record.

        Returns None for headers, footers, pseudo-users and lines without a
        recognisable date. `last` prints no year, so the year is inferred
        from *today*: a month/day later than today belongs to last year.
        """
        if not line.strip():
            return None
        # Skip header, footer, and system entries
        if 'wtmp' in line or 'begins' in line:
            return None

        tokens = line.split()
        # The day-of-week token anchors the start of the date section.
        day_idx = next(
            (i for i, token in enumerate(tokens) if token in _DAYS_OF_WEEK), -1
        )
        if day_idx < 1:  # Need at least a username before the day
            return None

        username = tokens[0]
        if username in _SKIPPED_LAST_USERS:
            return None

        # Everything between username and day-of-week is TTY and/or host.
        device_host_tokens = tokens[1:day_idx]
        tty = '-'
        host = '-'
        if device_host_tokens:
            first = device_host_tokens[0]
            if '/' in first or first.startswith(('pts', 'tty')):
                tty = first
                if len(device_host_tokens) > 1:
                    host = ' '.join(device_host_tokens[1:])
            else:
                # No TTY, all tokens are host
                host = ' '.join(device_host_tokens)

        # Date section is: day-of-week, month, day-of-month, time.
        if day_idx + 3 >= len(tokens):
            logger.debug(f"Not enough date tokens in line: {line}")
            return None
        day, month, day_of_month, time_str = tokens[day_idx:day_idx + 4]
        if month not in _MONTHS:
            logger.debug(f"Invalid month '{month}' in line: {line}")
            return None
        if not day_of_month.isdecimal():
            logger.debug(f"Invalid day of month '{day_of_month}' in line: {line}")
            return None

        month_num = _MONTHS[month]
        year = today.year
        if (month_num, int(day_of_month)) > (today.month, today.day):
            # A date in the "future" can only be from the previous year.
            year -= 1

        duration_match = _DURATION_RE.search(line)
        duration = duration_match.group(1) if duration_match else 'still logged in'

        return {
            'username': username,
            'tty': tty,
            'host': host,
            'date': f"{year}-{month_num:02d}-{day_of_month.zfill(2)}",
            'time': time_str,
            'duration': duration,
            'login_str': f"{day} {month} {day_of_month} {time_str} {year}",
        }

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def list_users(self) -> List[Dict[str, Any]]:
        """List all system users, sorted by uid.

        Reads ``getent passwd``. Each entry has the keys username, uid, gid,
        gecos, home, shell and locked. Returns an empty list on any error.
        """
        users = []
        try:
            result = self._run(["/usr/bin/getent", "passwd"], timeout=5)
            if result.returncode == 0:
                for line in result.stdout.strip().split('\n'):
                    try:
                        user = self._parse_passwd_line(line)
                        if user is None:
                            continue
                        user['locked'] = self._is_user_locked(user['username'])
                        users.append(user)
                    except Exception as e:
                        logger.warning(f"Error parsing user line: {e}")
            return sorted(users, key=lambda x: x['uid'])
        except Exception as e:
            logger.error(f"Error listing users: {e}")
            return []

    def list_groups(self) -> List[Dict[str, Any]]:
        """List all system groups, sorted by gid.

        Reads ``getent group``. Each entry has the keys groupname, gid and
        members. Returns an empty list on any error.
        """
        groups = []
        try:
            result = self._run(["/usr/bin/getent", "group"], timeout=5)
            if result.returncode == 0:
                for line in result.stdout.strip().split('\n'):
                    try:
                        group = self._parse_group_line(line)
                        if group is not None:
                            groups.append(group)
                    except Exception as e:
                        logger.warning(f"Error parsing group line: {e}")
            return sorted(groups, key=lambda x: x['gid'])
        except Exception as e:
            logger.error(f"Error listing groups: {e}")
            return []

    def get_user_groups(self, username: str) -> List[str]:
        """Get all groups a user belongs to (supplementary and primary).

        Returns a sorted list of unique group names, or an empty list when
        the lookup fails.
        """
        try:
            # grp.getgrall() is the group-database enumerator.
            groups = {
                entry.gr_name
                for entry in grp.getgrall()
                if username in entry.gr_mem
            }
            # The primary group is stored in passwd, not in the member list.
            try:
                user_entry = pwd.getpwnam(username)
                groups.add(grp.getgrgid(user_entry.pw_gid).gr_name)
            except KeyError:
                pass  # Unknown user or dangling gid: no primary group to add
            return sorted(groups)
        except Exception as e:
            logger.error(f"Error getting groups for {username}: {e}")
            return []

    def _is_user_locked(self, username: str) -> bool:
        """Check if the user's password is locked.

        ``usermod -L`` (used by lock_user) locks an account by prefixing the
        shadow password field with '!', so that prefix is what is tested.
        Returns False when spwd is unavailable, the user is unknown, or the
        shadow database cannot be read.
        """
        if not spwd:
            return False
        try:
            entry = spwd.getspnam(username)
            return entry.sp_pwdp.startswith('!')
        except (KeyError, PermissionError):
            return False
        except Exception as e:
            logger.warning(f"Error checking lock status: {e}")
            return False

    # ------------------------------------------------------------------
    # User lifecycle
    # ------------------------------------------------------------------

    def create_user(self, username: str, home_dir: Optional[str] = None,
                    shell: str = "/bin/bash", gecos: str = "") -> bool:
        """Create new system user with a home directory.

        Args:
            username: Login name for the new account.
            home_dir: Home directory; defaults to ``/home/<username>``.
            shell: Login shell.
            gecos: Optional comment (GECOS) field.

        Returns:
            True when useradd succeeds, False otherwise.
        """
        if not self._is_safe_name(username):
            logger.error(f"Refusing to create user with invalid name: {username!r}")
            return False
        cmd = ["/usr/sbin/useradd", "-d", home_dir or f"/home/{username}", "-s", shell]
        if gecos:
            cmd.extend(["-c", gecos])
        cmd.extend(["-m", username])  # -m to create home directory
        return self._run_action(
            cmd,
            f"User created: {username}",
            f"Failed to create user {username}",
            "Error creating user",
        )

    def delete_user(self, username: str, remove_home: bool = True) -> bool:
        """Delete system user and Samba user.

        The Samba account is removed on a best-effort basis first; a failure
        there (Samba not installed, no such Samba user) does not stop the
        system account from being deleted.

        Returns:
            True when userdel succeeds, False otherwise.
        """
        if not self._is_safe_name(username):
            logger.error(f"Refusing to delete user with invalid name: {username!r}")
            return False
        try:
            self._run(["/usr/bin/pdbedit", "-x", "-u", username])
        except Exception as e:
            # Samba not installed or user doesn't exist
            logger.debug(f"Samba user removal skipped for {username}: {e}")

        cmd = ["/usr/sbin/userdel"]
        if remove_home:
            cmd.append("-r")
        cmd.append(username)
        return self._run_action(
            cmd,
            f"User deleted: {username}",
            f"Failed to delete user {username}",
            "Error deleting user",
        )

    def change_password(self, username: str, password: str) -> bool:
        """Change user password via chpasswd.

        chpasswd reads ``name:password`` records, one per line, so a ':' in
        the name or a newline in either value would inject extra records;
        such input is rejected.
        """
        if not self._is_safe_name(username) or not self._is_safe_password(password):
            logger.error(f"Refusing to change password: invalid user name or password for {username!r}")
            return False
        try:
            result = self._run(
                ["/usr/sbin/chpasswd"], stdin_text=f"{username}:{password}\n"
            )
        except Exception as e:
            logger.error(f"Error changing password: {e}")
            return False
        if result.returncode == 0:
            logger.info(f"Password changed for {username}")
            return True
        logger.error(f"Failed to change password: {result.stderr}")
        return False

    def change_shell(self, username: str, shell: str) -> bool:
        """Change user shell"""
        if not self._is_safe_name(username):
            logger.error(f"Refusing to change shell for invalid user name: {username!r}")
            return False
        return self._run_action(
            ["/usr/sbin/usermod", "-s", shell, username],
            f"Shell changed for {username} to {shell}",
            "Failed to change shell",
            "Error changing shell",
        )

    def lock_user(self, username: str) -> bool:
        """Lock user account"""
        if not self._is_safe_name(username):
            logger.error(f"Refusing to lock invalid user name: {username!r}")
            return False
        return self._run_action(
            ["/usr/sbin/usermod", "-L", username],
            f"User locked: {username}",
            "Failed to lock user",
            "Error locking user",
        )

    def unlock_user(self, username: str) -> bool:
        """Unlock user account"""
        if not self._is_safe_name(username):
            logger.error(f"Refusing to unlock invalid user name: {username!r}")
            return False
        return self._run_action(
            ["/usr/sbin/usermod", "-U", username],
            f"User unlocked: {username}",
            "Failed to unlock user",
            "Error unlocking user",
        )

    # ------------------------------------------------------------------
    # Group management
    # ------------------------------------------------------------------

    def create_group(self, groupname: str) -> bool:
        """Create new system group"""
        if not self._is_safe_name(groupname):
            logger.error(f"Refusing to create group with invalid name: {groupname!r}")
            return False
        return self._run_action(
            ["/usr/sbin/groupadd", groupname],
            f"Group created: {groupname}",
            f"Failed to create group {groupname}",
            "Error creating group",
        )

    def delete_group(self, groupname: str) -> bool:
        """Delete system group"""
        if not self._is_safe_name(groupname):
            logger.error(f"Refusing to delete group with invalid name: {groupname!r}")
            return False
        return self._run_action(
            ["/usr/sbin/groupdel", groupname],
            f"Group deleted: {groupname}",
            f"Failed to delete group {groupname}",
            "Error deleting group",
        )

    def add_user_to_group(self, username: str, groupname: str) -> bool:
        """Add user to group.

        Returns:
            True on success.

        Raises:
            ValueError: If the user or group name is unsafe to pass on.
            Exception: If usermod fails or cannot be run; the message carries
                the tool's error output.
        """
        if not (self._is_safe_name(username) and self._is_safe_name(groupname)):
            raise ValueError(f"Invalid user or group name: {username!r}, {groupname!r}")
        try:
            result = self._run(["/usr/sbin/usermod", "-aG", groupname, username])
        except Exception as e:
            logger.error(f"Error adding user to group: {e}")
            raise
        if result.returncode == 0:
            logger.info(f"User {username} added to group {groupname}")
            return True
        error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error"
        logger.error(f"Failed to add user to group: {error_msg}")
        raise RuntimeError(f"Failed to add user to group: {error_msg}")

    def remove_user_from_group(self, username: str, groupname: str) -> bool:
        """Remove user from group.

        Returns:
            True on success.

        Raises:
            ValueError: If the user or group name is unsafe to pass on.
            Exception: If gpasswd fails or cannot be run; the message carries
                the tool's error output.
        """
        if not (self._is_safe_name(username) and self._is_safe_name(groupname)):
            raise ValueError(f"Invalid user or group name: {username!r}, {groupname!r}")
        try:
            result = self._run(["/usr/sbin/gpasswd", "-d", username, groupname])
        except Exception as e:
            logger.error(f"Error removing user from group: {e}")
            raise
        if result.returncode == 0:
            logger.info(f"User {username} removed from group {groupname}")
            return True
        error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error"
        logger.error(f"Failed to remove user from group: {error_msg}")
        raise RuntimeError(f"Failed to remove user from group: {error_msg}")

    # ------------------------------------------------------------------
    # Samba
    # ------------------------------------------------------------------

    def set_samba_password(self, username: str, password: str) -> bool:
        """Set Samba password for user.

        Uses ``smbpasswd -a -s``: -a adds/updates the user and -s reads the
        password (twice, one per line) from stdin, so a password containing
        a newline is rejected.
        """
        if not self._is_safe_name(username) or not self._is_safe_password(password):
            logger.error(f"Refusing to set Samba password: invalid user name or password for {username!r}")
            return False
        try:
            result = self._run(
                ["/usr/bin/smbpasswd", "-a", "-s", username],
                stdin_text=f"{password}\n{password}\n",
            )
        except FileNotFoundError:
            logger.error("smbpasswd command not found - Samba not installed?")
            return False
        except Exception as e:
            logger.error(f"Error setting Samba password: {e}")
            return False
        if result.returncode == 0:
            logger.info(f"Samba password set for {username}")
            return True
        logger.error(f"Failed to set Samba password: {result.stderr}")
        return False

    def list_samba_users(self) -> List[Dict[str, Any]]:
        """List all Samba users using pdbedit, sorted by username.

        Each entry has the keys username, uid, comment and type ('samba').
        Returns an empty list when pdbedit is missing or fails.
        """
        users = []
        try:
            result = self._run(["/usr/bin/pdbedit", "-L"], timeout=5)
            if result.returncode == 0:
                for line in result.stdout.strip().split('\n'):
                    if not line.strip():
                        continue
                    # pdbedit output format: username:uid:comment
                    parts = line.split(':')
                    if len(parts) < 2:
                        continue
                    try:
                        users.append({
                            'username': parts[0],
                            'uid': int(parts[1]),
                            'comment': parts[2] if len(parts) > 2 else "",
                            'type': 'samba',
                        })
                    except (ValueError, IndexError) as e:
                        logger.warning(f"Error parsing Samba user line: {e}")
            return sorted(users, key=lambda x: x['username'])
        except FileNotFoundError:
            logger.warning("pdbedit not found - Samba may not be installed")
            return []
        except Exception as e:
            logger.error(f"Error listing Samba users: {e}")
            return []

    # ------------------------------------------------------------------
    # Login history
    # ------------------------------------------------------------------

    def get_login_history(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Get recent login history using the last command.

        Args:
            limit: Maximum number of entries requested from `last`.

        Returns:
            A list of dicts with the keys username, tty, host, date, time,
            duration and login_str, in the order `last` printed them; an
            empty list on error.
        """
        logins = []
        today = date.today()
        known_names: Optional[List[str]] = None  # passwd names, fetched lazily once
        try:
            result = self._run(["last", "-n", str(limit)])
            if result.returncode == 0:
                for line in result.stdout.strip().split('\n'):
                    try:
                        entry = self._parse_last_line(line, today)
                        if entry is None:
                            continue
                        # Only names at the truncation width can be cut off.
                        if len(entry['username']) >= _LAST_NAME_WIDTH:
                            if known_names is None:
                                known_names = self._passwd_usernames()
                            full_username = self._expand_truncated_name(
                                entry['username'], known_names
                            )
                            if full_username:
                                entry['username'] = full_username
                        logins.append(entry)
                    except Exception as e:
                        logger.debug(f"Error parsing login line '{line}': {e}")
                        continue
            return logins
        except Exception as e:
            logger.error(f"Error getting login history: {e}")
            return []

    def _get_full_username(self, truncated: str) -> Optional[str]:
        """Find full username from the passwd database when `last` truncated it.

        `last` cuts user names to 8 characters, so only names of at least
        that length are looked up, and only when no account matches the
        name exactly. Returns the full name, or None when there is nothing
        to expand.
        """
        try:
            if len(truncated) < _LAST_NAME_WIDTH:
                return None
            return self._expand_truncated_name(truncated, self._passwd_usernames())
        except Exception as e:
            logger.debug(f"Error finding full username for '{truncated}': {e}")
            return None


identities_manager = IdentitiesManager()