a187b625bc
- add_user_to_group: Exception werfen mit stderr Nachricht - remove_user_from_group: Exception werfen mit stderr Nachricht - text=True für subprocess für besseres Error Handling - Router aktualisiert um Fehlermeldungen an Frontend weiterzugeben - Benutzer sehen jetzt detaillierte Fehlermeldungen beim Gruppe-Entfernen Behebt: 'Failed to remove user from group' verschluckt die echte Fehlermeldung Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
577 lines
21 KiB
Python
577 lines
21 KiB
Python
"""
|
|
User and Group Management Service
|
|
Handle system users, groups, and PAM operations
|
|
"""
|
|
|
|
import subprocess
|
|
import logging
|
|
import pwd
|
|
import grp
|
|
from typing import List, Dict, Any, Optional
|
|
from pathlib import Path
|
|
|
|
try:
|
|
import spwd
|
|
except ImportError:
|
|
spwd = None
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class IdentitiesManager:
|
|
"""Manage system users and groups"""
|
|
|
|
def list_users(self) -> List[Dict[str, Any]]:
|
|
"""List all system users"""
|
|
users = []
|
|
try:
|
|
# Use getpwall() which returns an iterator
|
|
import getpass
|
|
# Fallback: read /etc/passwd directly
|
|
result = subprocess.run(
|
|
["/usr/bin/getent", "passwd"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
parts = line.split(':')
|
|
if len(parts) < 7:
|
|
continue
|
|
|
|
try:
|
|
username = parts[0]
|
|
uid = int(parts[2])
|
|
gid = int(parts[3])
|
|
gecos = parts[4]
|
|
home = parts[5]
|
|
shell = parts[6]
|
|
|
|
users.append({
|
|
'username': username,
|
|
'uid': uid,
|
|
'gid': gid,
|
|
'gecos': gecos,
|
|
'home': home,
|
|
'shell': shell,
|
|
'locked': self._is_user_locked(username)
|
|
})
|
|
except Exception as e:
|
|
logger.warning(f"Error parsing user line: {e}")
|
|
|
|
return sorted(users, key=lambda x: x['uid'])
|
|
except Exception as e:
|
|
logger.error(f"Error listing users: {e}")
|
|
return []
|
|
|
|
def list_groups(self) -> List[Dict[str, Any]]:
|
|
"""List all system groups"""
|
|
groups = []
|
|
try:
|
|
# Use getent to read groups
|
|
result = subprocess.run(
|
|
["/usr/bin/getent", "group"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
parts = line.split(':')
|
|
if len(parts) < 4:
|
|
continue
|
|
|
|
try:
|
|
groupname = parts[0]
|
|
gid = int(parts[2])
|
|
members = [m.strip() for m in parts[3].split(',') if m.strip()]
|
|
|
|
groups.append({
|
|
'groupname': groupname,
|
|
'gid': gid,
|
|
'members': members
|
|
})
|
|
except Exception as e:
|
|
logger.warning(f"Error parsing group line: {e}")
|
|
|
|
return sorted(groups, key=lambda x: x['gid'])
|
|
except Exception as e:
|
|
logger.error(f"Error listing groups: {e}")
|
|
return []
|
|
|
|
def get_user_groups(self, username: str) -> List[str]:
|
|
"""Get all groups a user belongs to"""
|
|
try:
|
|
groups = []
|
|
for entry in grp.getall():
|
|
if username in entry.gr_mem:
|
|
groups.append(entry.gr_name)
|
|
|
|
# Also add primary group
|
|
try:
|
|
user_entry = pwd.getpwnam(username)
|
|
primary_group = grp.getgrgid(user_entry.pw_gid)
|
|
if primary_group.gr_name not in groups:
|
|
groups.append(primary_group.gr_name)
|
|
except KeyError:
|
|
pass
|
|
|
|
return sorted(groups)
|
|
except Exception as e:
|
|
logger.error(f"Error getting groups for {username}: {e}")
|
|
return []
|
|
|
|
def create_user(self, username: str, home_dir: Optional[str] = None,
|
|
shell: str = "/bin/bash", gecos: str = "") -> bool:
|
|
"""Create new system user"""
|
|
try:
|
|
# Build useradd command
|
|
cmd = ["/usr/sbin/useradd"]
|
|
|
|
if home_dir:
|
|
cmd.extend(["-d", home_dir])
|
|
else:
|
|
cmd.extend(["-d", f"/home/{username}"])
|
|
|
|
cmd.extend(["-s", shell])
|
|
|
|
if gecos:
|
|
cmd.extend(["-c", gecos])
|
|
|
|
cmd.extend(["-m", username]) # -m to create home directory
|
|
|
|
result = subprocess.run(cmd, capture_output=True, timeout=10)
|
|
|
|
if result.returncode == 0:
|
|
logger.info(f"User created: {username}")
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to create user {username}: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error creating user: {e}")
|
|
return False
|
|
|
|
def delete_user(self, username: str, remove_home: bool = True) -> bool:
|
|
"""Delete system user and Samba user"""
|
|
try:
|
|
# First delete Samba user if exists (using pdbedit for better handling)
|
|
try:
|
|
subprocess.run(
|
|
["/usr/bin/pdbedit", "-x", "-u", username],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
except Exception:
|
|
pass # Samba not installed or user doesn't exist
|
|
|
|
# Delete system user
|
|
cmd = ["/usr/sbin/userdel"]
|
|
if remove_home:
|
|
cmd.append("-r")
|
|
cmd.append(username)
|
|
|
|
result = subprocess.run(cmd, capture_output=True, timeout=10)
|
|
|
|
if result.returncode == 0:
|
|
logger.info(f"User deleted: {username}")
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to delete user {username}: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error deleting user: {e}")
|
|
return False
|
|
|
|
def create_group(self, groupname: str) -> bool:
|
|
"""Create new system group"""
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/groupadd", groupname],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
logger.info(f"Group created: {groupname}")
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to create group {groupname}: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error creating group: {e}")
|
|
return False
|
|
|
|
def delete_group(self, groupname: str) -> bool:
|
|
"""Delete system group"""
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/groupdel", groupname],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
logger.info(f"Group deleted: {groupname}")
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to delete group {groupname}: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error deleting group: {e}")
|
|
return False
|
|
|
|
def add_user_to_group(self, username: str, groupname: str) -> bool:
|
|
"""Add user to group"""
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/usermod", "-aG", groupname, username],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
logger.info(f"User {username} added to group {groupname}")
|
|
return True
|
|
else:
|
|
error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error"
|
|
logger.error(f"Failed to add user to group: {error_msg}")
|
|
raise Exception(f"Failed to add user to group: {error_msg}")
|
|
except Exception as e:
|
|
logger.error(f"Error adding user to group: {e}")
|
|
raise
|
|
|
|
def remove_user_from_group(self, username: str, groupname: str) -> bool:
|
|
"""Remove user from group"""
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/gpasswd", "-d", username, groupname],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
logger.info(f"User {username} removed from group {groupname}")
|
|
return True
|
|
else:
|
|
error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error"
|
|
logger.error(f"Failed to remove user from group: {error_msg}")
|
|
raise Exception(f"Failed to remove user from group: {error_msg}")
|
|
except Exception as e:
|
|
logger.error(f"Error removing user from group: {e}")
|
|
raise
|
|
|
|
def change_password(self, username: str, password: str) -> bool:
|
|
"""Change user password via chpasswd"""
|
|
try:
|
|
# Use chpasswd for password changes
|
|
result = subprocess.run(
|
|
["/usr/sbin/chpasswd"],
|
|
input=f"{username}:{password}\n",
|
|
text=True,
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
logger.info(f"Password changed for {username}")
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to change password: {result.stderr}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error changing password: {e}")
|
|
return False
|
|
|
|
def change_shell(self, username: str, shell: str) -> bool:
|
|
"""Change user shell"""
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/usermod", "-s", shell, username],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
logger.info(f"Shell changed for {username} to {shell}")
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to change shell: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error changing shell: {e}")
|
|
return False
|
|
|
|
def _is_user_locked(self, username: str) -> bool:
|
|
"""Check if user account is locked"""
|
|
if not spwd:
|
|
return False
|
|
try:
|
|
entry = spwd.getspnam(username)
|
|
return entry.sp_lstchg == 0 or entry.sp_max == 0
|
|
except (KeyError, PermissionError):
|
|
return False
|
|
except Exception as e:
|
|
logger.warning(f"Error checking lock status: {e}")
|
|
return False
|
|
|
|
def lock_user(self, username: str) -> bool:
|
|
"""Lock user account"""
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/usermod", "-L", username],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
logger.info(f"User locked: {username}")
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to lock user: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error locking user: {e}")
|
|
return False
|
|
|
|
def unlock_user(self, username: str) -> bool:
|
|
"""Unlock user account"""
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/usermod", "-U", username],
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
logger.info(f"User unlocked: {username}")
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to unlock user: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error unlocking user: {e}")
|
|
return False
|
|
|
|
def set_samba_password(self, username: str, password: str) -> bool:
|
|
"""Set Samba password for user"""
|
|
try:
|
|
# Use smbpasswd to set Samba password
|
|
# -a flag: add/update user
|
|
# -s flag: read password from stdin
|
|
result = subprocess.run(
|
|
["/usr/bin/smbpasswd", "-a", "-s", username],
|
|
input=f"{password}\n{password}\n",
|
|
text=True,
|
|
capture_output=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
logger.info(f"Samba password set for {username}")
|
|
return True
|
|
else:
|
|
logger.error(f"Failed to set Samba password: {result.stderr}")
|
|
return False
|
|
except FileNotFoundError:
|
|
logger.error("smbpasswd command not found - Samba not installed?")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error setting Samba password: {e}")
|
|
return False
|
|
|
|
def get_login_history(self, limit: int = 50) -> List[Dict[str, Any]]:
|
|
"""Get recent login history using last command"""
|
|
import re
|
|
from datetime import datetime
|
|
|
|
logins = []
|
|
days_of_week = {'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'}
|
|
months = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
|
|
'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}
|
|
current_year = datetime.now().year
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
["last", "-n", str(limit)],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line.strip():
|
|
continue
|
|
|
|
# Skip header, footer, and system entries
|
|
if 'wtmp' in line or 'begins' in line:
|
|
continue
|
|
|
|
try:
|
|
# Find the day-of-week anchor (reliable marker for date section)
|
|
tokens = line.split()
|
|
day_idx = -1
|
|
|
|
for i, token in enumerate(tokens):
|
|
if token in days_of_week:
|
|
day_idx = i
|
|
break
|
|
|
|
if day_idx < 1: # Need at least username before day
|
|
continue
|
|
|
|
# Extract components from token positions
|
|
username = tokens[0]
|
|
|
|
# Skip system entries
|
|
if username in ['wtmp', 'reboot', 'kernel']:
|
|
continue
|
|
|
|
# Try to find full username from /etc/passwd (wtmp truncates to 8 chars)
|
|
full_username = self._get_full_username(username)
|
|
if full_username:
|
|
username = full_username
|
|
|
|
# Everything between username and day-of-week is TTY/host
|
|
device_host_tokens = tokens[1:day_idx]
|
|
|
|
# TTY is typically first token if it contains '/' or starts with 'pts'/'tty'
|
|
tty = '-'
|
|
host = '-'
|
|
|
|
if device_host_tokens:
|
|
first = device_host_tokens[0]
|
|
if '/' in first or first.startswith('pts') or first.startswith('tty'):
|
|
tty = first
|
|
# Remaining tokens are host
|
|
if len(device_host_tokens) > 1:
|
|
host = ' '.join(device_host_tokens[1:])
|
|
else:
|
|
# No TTY, all tokens are host
|
|
host = ' '.join(device_host_tokens)
|
|
|
|
# Extract date components (should be: day month date time)
|
|
# Note: year is NOT in standard last output, we need to infer it
|
|
if day_idx + 3 < len(tokens):
|
|
day = tokens[day_idx]
|
|
month = tokens[day_idx + 1]
|
|
date = tokens[day_idx + 2]
|
|
time_str = tokens[day_idx + 3]
|
|
|
|
# Validate month
|
|
if month not in months:
|
|
logger.debug(f"Invalid month '{month}' in line: {line}")
|
|
continue
|
|
|
|
# Use current year (last entries are usually recent)
|
|
year = current_year
|
|
else:
|
|
logger.debug(f"Not enough date tokens in line: {line}")
|
|
continue
|
|
|
|
# Extract duration from end of line (in parentheses)
|
|
duration_match = re.search(r'\(([^)]+)\)\s*$', line)
|
|
duration = duration_match.group(1) if duration_match else 'still logged in'
|
|
|
|
logins.append({
|
|
'username': username,
|
|
'tty': tty,
|
|
'host': host,
|
|
'date': f"{year}-{months[month]:02d}-{date.zfill(2)}",
|
|
'time': time_str,
|
|
'duration': duration,
|
|
'login_str': f"{day} {month} {date} {time_str} {year}"
|
|
})
|
|
except Exception as e:
|
|
logger.debug(f"Error parsing login line '{line}': {e}")
|
|
continue
|
|
|
|
return logins
|
|
except Exception as e:
|
|
logger.error(f"Error getting login history: {e}")
|
|
return []
|
|
|
|
def _get_full_username(self, truncated: str) -> Optional[str]:
|
|
"""Find full username from /etc/passwd when wtmp has truncated it (8 char limit)
|
|
|
|
wtmp truncates usernames to 8 characters, so we need to look up the full name
|
|
in /etc/passwd by matching usernames that start with the truncated name.
|
|
"""
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/bin/getent", "passwd"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
parts = line.split(':')
|
|
if parts and parts[0].startswith(truncated):
|
|
# Found a username that starts with the truncated name
|
|
if len(parts[0]) > len(truncated):
|
|
# It's longer than the truncated version
|
|
return parts[0]
|
|
|
|
return None
|
|
except Exception as e:
|
|
logger.debug(f"Error finding full username for '{truncated}': {e}")
|
|
return None
|
|
|
|
def list_samba_users(self) -> List[Dict[str, Any]]:
|
|
"""List all Samba users using pdbedit"""
|
|
users = []
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/bin/pdbedit", "-L"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=5
|
|
)
|
|
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line.strip():
|
|
continue
|
|
|
|
# pdbedit output format: username:uid:comment
|
|
parts = line.split(':')
|
|
if len(parts) >= 2:
|
|
try:
|
|
username = parts[0]
|
|
uid = int(parts[1])
|
|
comment = parts[2] if len(parts) > 2 else ""
|
|
|
|
users.append({
|
|
'username': username,
|
|
'uid': uid,
|
|
'comment': comment,
|
|
'type': 'samba'
|
|
})
|
|
except (ValueError, IndexError) as e:
|
|
logger.warning(f"Error parsing Samba user line: {e}")
|
|
|
|
return sorted(users, key=lambda x: x['username'])
|
|
except FileNotFoundError:
|
|
logger.warning("pdbedit not found - Samba may not be installed")
|
|
return []
|
|
except Exception as e:
|
|
logger.error(f"Error listing Samba users: {e}")
|
|
return []
|
|
|
|
|
|
identities_manager = IdentitiesManager()
|