""" User and Group Management Service Handle system users, groups, and PAM operations """ import subprocess import logging import pwd import grp from typing import List, Dict, Any, Optional try: import spwd except ImportError: spwd = None logger = logging.getLogger(__name__) def _is_user_locked(username: str) -> bool: if not spwd: return False try: entry = spwd.getspnam(username) return entry.sp_lstchg == 0 or entry.sp_max == 0 except (KeyError, PermissionError): return False except Exception as e: logger.warning(f"Error checking lock status: {e}") return False def _get_full_username(truncated: str) -> Optional[str]: """Find full username from /etc/passwd when wtmp truncated it to 8 chars.""" try: result = subprocess.run( ["/usr/bin/getent", "passwd"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: for line in result.stdout.strip().split('\n'): if not line: continue parts = line.split(':') if parts and parts[0].startswith(truncated) and len(parts[0]) > len(truncated): return parts[0] return None except Exception as e: logger.debug(f"Error finding full username for '{truncated}': {e}") return None def list_users() -> List[Dict[str, Any]]: users = [] try: result = subprocess.run( ["/usr/bin/getent", "passwd"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: for line in result.stdout.strip().split('\n'): if not line: continue parts = line.split(':') if len(parts) < 7: continue try: users.append({ 'username': parts[0], 'uid': int(parts[2]), 'gid': int(parts[3]), 'gecos': parts[4], 'home': parts[5], 'shell': parts[6], 'locked': _is_user_locked(parts[0]) }) except Exception as e: logger.warning(f"Error parsing user line: {e}") return sorted(users, key=lambda x: x['uid']) except Exception as e: logger.error(f"Error listing users: {e}") return [] def list_groups() -> List[Dict[str, Any]]: groups = [] try: result = subprocess.run( ["/usr/bin/getent", "group"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: for line in result.stdout.strip().split('\n'): if not line: continue parts = line.split(':') if len(parts) < 4: continue try: groups.append({ 'groupname': parts[0], 'gid': int(parts[2]), 'members': [m.strip() for m in parts[3].split(',') if m.strip()] }) except Exception as e: logger.warning(f"Error parsing group line: {e}") return sorted(groups, key=lambda x: x['gid']) except Exception as e: logger.error(f"Error listing groups: {e}") return [] def get_user_groups(username: str) -> List[str]: try: groups = [ entry.gr_name for entry in grp.getgrall() if username in entry.gr_mem ] try: user_entry = pwd.getpwnam(username) primary = grp.getgrgid(user_entry.pw_gid) if primary.gr_name not in groups: groups.append(primary.gr_name) except KeyError: pass return sorted(groups) except Exception as e: logger.error(f"Error getting groups for {username}: {e}") return [] def create_user(username: str, home_dir: Optional[str] = None, shell: str = "/bin/bash", gecos: str = "") -> bool: try: cmd = ["/usr/sbin/useradd", "-d", home_dir or f"/home/{username}", "-s", shell] if gecos: cmd.extend(["-c", gecos]) cmd.extend(["-m", username]) result = subprocess.run(cmd, capture_output=True, timeout=10) if result.returncode == 0: logger.info(f"User created: {username}") return True logger.error(f"Failed to create user {username}: {result.stderr.decode()}") return False except Exception as e: logger.error(f"Error creating user: {e}") return False def delete_user(username: str, remove_home: bool = True) -> bool: try: try: subprocess.run( ["/usr/bin/pdbedit", "-x", "-u", username], capture_output=True, timeout=10 ) except Exception: pass cmd = ["/usr/sbin/userdel"] if remove_home: cmd.append("-r") cmd.append(username) result = subprocess.run(cmd, capture_output=True, timeout=10) if result.returncode == 0: logger.info(f"User deleted: {username}") return True logger.error(f"Failed to delete user {username}: {result.stderr.decode()}") return False except Exception as e: logger.error(f"Error deleting user: {e}") return False def create_group(groupname: str) -> bool: try: result = subprocess.run( ["/usr/sbin/groupadd", groupname], capture_output=True, timeout=10 ) if result.returncode == 0: logger.info(f"Group created: {groupname}") return True logger.error(f"Failed to create group {groupname}: {result.stderr.decode()}") return False except Exception as e: logger.error(f"Error creating group: {e}") return False def delete_group(groupname: str) -> bool: try: result = subprocess.run( ["/usr/sbin/groupdel", groupname], capture_output=True, timeout=10 ) if result.returncode == 0: logger.info(f"Group deleted: {groupname}") return True logger.error(f"Failed to delete group {groupname}: {result.stderr.decode()}") return False except Exception as e: logger.error(f"Error deleting group: {e}") return False def add_user_to_group(username: str, groupname: str) -> bool: try: result = subprocess.run( ["/usr/sbin/usermod", "-aG", groupname, username], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: logger.info(f"User {username} added to group {groupname}") return True error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error" logger.error(f"Failed to add user to group: {error_msg}") raise Exception(f"Failed to add user to group: {error_msg}") except Exception as e: logger.error(f"Error adding user to group: {e}") raise def remove_user_from_group(username: str, groupname: str) -> bool: try: result = subprocess.run( ["/usr/sbin/gpasswd", "-d", username, groupname], capture_output=True, text=True, timeout=10 ) if result.returncode == 0: logger.info(f"User {username} removed from group {groupname}") return True error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error" logger.error(f"Failed to remove user from group: {error_msg}") raise Exception(f"Failed to remove user from group: {error_msg}") except Exception as e: logger.error(f"Error removing user from group: {e}") raise def change_password(username: str, password: str) -> bool: try: result = subprocess.run( ["/usr/sbin/chpasswd"], input=f"{username}:{password}\n", text=True, capture_output=True, timeout=10 ) if result.returncode == 0: logger.info(f"Password changed for {username}") return True logger.error(f"Failed to change password: {result.stderr}") return False except Exception as e: logger.error(f"Error changing password: {e}") return False def change_shell(username: str, shell: str) -> bool: try: result = subprocess.run( ["/usr/sbin/usermod", "-s", shell, username], capture_output=True, timeout=10 ) if result.returncode == 0: logger.info(f"Shell changed for {username} to {shell}") return True logger.error(f"Failed to change shell: {result.stderr.decode()}") return False except Exception as e: logger.error(f"Error changing shell: {e}") return False def lock_user(username: str) -> bool: try: result = subprocess.run( ["/usr/sbin/usermod", "-L", username], capture_output=True, timeout=10 ) if result.returncode == 0: logger.info(f"User locked: {username}") return True logger.error(f"Failed to lock user: {result.stderr.decode()}") return False except Exception as e: logger.error(f"Error locking user: {e}") return False def unlock_user(username: str) -> bool: try: result = subprocess.run( ["/usr/sbin/usermod", "-U", username], capture_output=True, timeout=10 ) if result.returncode == 0: logger.info(f"User unlocked: {username}") return True logger.error(f"Failed to unlock user: {result.stderr.decode()}") return False except Exception as e: logger.error(f"Error unlocking user: {e}") return False def set_samba_password(username: str, password: str) -> bool: try: result = subprocess.run( ["/usr/bin/smbpasswd", "-a", "-s", username], input=f"{password}\n{password}\n", text=True, capture_output=True, timeout=10 ) if result.returncode == 0: logger.info(f"Samba password set for {username}") return True logger.error(f"Failed to set Samba password: {result.stderr}") return False except FileNotFoundError: logger.error("smbpasswd command not found - Samba not installed?") return False except Exception as e: logger.error(f"Error setting Samba password: {e}") return False def get_login_history(limit: int = 50) -> List[Dict[str, Any]]: import re from datetime import datetime as dt logins = [] days_of_week = {'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'} months = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12} current_year = dt.now().year try: result = subprocess.run( ["last", "-n", str(limit)], capture_output=True, text=True, timeout=10 ) if result.returncode != 0: return [] for line in result.stdout.strip().split('\n'): if not line.strip() or 'wtmp' in line or 'begins' in line: continue try: tokens = line.split() day_idx = next((i for i, t in enumerate(tokens) if t in days_of_week), -1) if day_idx < 1: continue username = tokens[0] if username in ['wtmp', 'reboot', 'kernel']: continue full_username = _get_full_username(username) if full_username: username = full_username device_host_tokens = tokens[1:day_idx] tty, host = '-', '-' if device_host_tokens: first = device_host_tokens[0] if '/' in first or first.startswith('pts') or first.startswith('tty'): tty = first host = ' '.join(device_host_tokens[1:]) if len(device_host_tokens) > 1 else '-' else: host = ' '.join(device_host_tokens) if day_idx + 3 >= len(tokens): continue day = tokens[day_idx] month = tokens[day_idx + 1] date = tokens[day_idx + 2] time_str = tokens[day_idx + 3] if month not in months: continue duration_match = re.search(r'\(([^)]+)\)\s*$', line) duration = duration_match.group(1) if duration_match else 'still logged in' logins.append({ 'username': username, 'tty': tty, 'host': host, 'date': f"{current_year}-{months[month]:02d}-{date.zfill(2)}", 'time': time_str, 'duration': duration, 'login_str': f"{day} {month} {date} {time_str} {current_year}" }) except Exception as e: logger.debug(f"Error parsing login line '{line}': {e}") continue return logins except Exception as e: logger.error(f"Error getting login history: {e}") return [] def list_samba_users() -> List[Dict[str, Any]]: users = [] try: result = subprocess.run( ["/usr/bin/pdbedit", "-L"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: for line in result.stdout.strip().split('\n'): if not line.strip(): continue parts = line.split(':') if len(parts) >= 2: try: users.append({ 'username': parts[0], 'uid': int(parts[1]), 'comment': parts[2] if len(parts) > 2 else "", 'type': 'samba' }) except (ValueError, IndexError) as e: logger.warning(f"Error parsing Samba user line: {e}") return sorted(users, key=lambda x: x['username']) except FileNotFoundError: logger.warning("pdbedit not found - Samba may not be installed") return [] except Exception as e: logger.error(f"Error listing Samba users: {e}") return []