Files
zmb-webui/backend/services/identities.py
patrick a187b625bc Fix: Identities Group Management - bessere Fehlermeldungen
- add_user_to_group: Exception werfen mit stderr Nachricht
- remove_user_from_group: Exception werfen mit stderr Nachricht
- text=True für subprocess für besseres Error Handling
- Router aktualisiert um Fehlermeldungen an Frontend weiterzugeben
- Benutzer sehen jetzt detaillierte Fehlermeldungen beim Gruppe-Entfernen

Behebt: 'Failed to remove user from group' verschluckt die echte Fehlermeldung

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-04 14:58:50 +02:00

577 lines
21 KiB
Python

"""
User and Group Management Service
Handle system users, groups, and PAM operations
"""
import subprocess
import logging
import pwd
import grp
import re
from datetime import date, datetime
from typing import List, Dict, Any, Optional
from pathlib import Path

# spwd is deprecated since Python 3.11 and removed in 3.13; callers must
# handle ``spwd is None``.
try:
    import spwd
except ImportError:
    spwd = None
logger = logging.getLogger(__name__)
class IdentitiesManager:
    """Manage system users, groups and Samba accounts.

    Every operation shells out to an administration tool (``getent``,
    ``useradd``, ``userdel``, ``groupadd``, ``groupdel``, ``usermod``,
    ``gpasswd``, ``chpasswd``, ``pdbedit``, ``smbpasswd``, ``last``) using an
    argument list, never a shell string.

    Unless documented otherwise, a method reports failure by logging and
    returning ``False`` (mutations) or an empty list (queries) instead of
    raising.
    """

    # Width at which login names are cut off in the `last` listing.
    _LAST_NAME_WIDTH = 8
    # Ordered so that the index equals ``date.weekday()`` (Monday == 0).
    _WEEKDAYS = ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')
    _MONTHS = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
               'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}
    # Pseudo-users that `last` prints but which are not real logins.
    _SYSTEM_LOGIN_NAMES = frozenset({'wtmp', 'reboot', 'kernel'})
    # Session duration is the trailing "(hh:mm)" group of a `last` line.
    _DURATION_RE = re.compile(r'\(([^)]+)\)\s*$')

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _run(self, cmd: List[str], stdin_text: Optional[str] = None,
             timeout: int = 10) -> subprocess.CompletedProcess:
        """Run ``cmd`` without a shell and capture stdout/stderr as text.

        Undecodable bytes are replaced rather than raising, so a tool that
        emits non-UTF-8 output cannot turn a successful call into a failure.
        """
        return subprocess.run(
            cmd,
            input=stdin_text,
            capture_output=True,
            text=True,
            errors="replace",
            timeout=timeout,
        )

    def _run_logged(self, cmd: List[str], ok_msg: str, fail_msg: str,
                    error_msg: str, stdin_text: Optional[str] = None) -> bool:
        """Run ``cmd``; log the outcome and return True only on exit code 0."""
        try:
            result = self._run(cmd, stdin_text=stdin_text)
        except Exception as e:
            logger.error(f"{error_msg}: {e}")
            return False
        if result.returncode == 0:
            logger.info(ok_msg)
            return True
        logger.error(f"{fail_msg}: {result.stderr.strip()}")
        return False

    def _run_or_raise(self, cmd: List[str], ok_msg: str, fail_msg: str,
                      error_msg: str) -> bool:
        """Run ``cmd``; return True on success, raise with the tool's message otherwise.

        Each failure is logged exactly once before the exception propagates.
        """
        try:
            result = self._run(cmd)
        except Exception as e:
            logger.error(f"{error_msg}: {e}")
            raise
        if result.returncode == 0:
            logger.info(ok_msg)
            return True
        detail = result.stderr.strip() or result.stdout.strip() or "Unknown error"
        message = f"{fail_msg}: {detail}"
        logger.error(message)
        raise RuntimeError(message)

    def _getent_lines(self, database: str) -> List[str]:
        """Return the non-empty lines of ``getent <database>`` ([] on non-zero exit)."""
        result = self._run(["/usr/bin/getent", database], timeout=5)
        if result.returncode != 0:
            return []
        return [line for line in result.stdout.strip().split('\n') if line]

    # ------------------------------------------------------------------
    # Queries
    # ------------------------------------------------------------------

    def list_users(self) -> List[Dict[str, Any]]:
        """List all users known to ``getent passwd``, sorted by UID.

        Returns:
            One dict per user with the keys ``username``, ``uid``, ``gid``,
            ``gecos``, ``home``, ``shell`` and ``locked``. Malformed lines
            are skipped; any unexpected error yields an empty list.
        """
        users = []
        try:
            for line in self._getent_lines("passwd"):
                parts = line.split(':')
                if len(parts) < 7:
                    continue
                try:
                    username = parts[0]
                    users.append({
                        'username': username,
                        'uid': int(parts[2]),
                        'gid': int(parts[3]),
                        'gecos': parts[4],
                        'home': parts[5],
                        'shell': parts[6],
                        'locked': self._is_user_locked(username),
                    })
                except ValueError as e:
                    logger.warning(f"Error parsing user line: {e}")
            return sorted(users, key=lambda x: x['uid'])
        except Exception as e:
            logger.error(f"Error listing users: {e}")
            return []

    def list_groups(self) -> List[Dict[str, Any]]:
        """List all groups known to ``getent group``, sorted by GID.

        Returns:
            One dict per group with the keys ``groupname``, ``gid`` and
            ``members`` (list of member names, possibly empty).
        """
        groups = []
        try:
            for line in self._getent_lines("group"):
                parts = line.split(':')
                if len(parts) < 4:
                    continue
                try:
                    groups.append({
                        'groupname': parts[0],
                        'gid': int(parts[2]),
                        'members': [m.strip() for m in parts[3].split(',') if m.strip()],
                    })
                except ValueError as e:
                    logger.warning(f"Error parsing group line: {e}")
            return sorted(groups, key=lambda x: x['gid'])
        except Exception as e:
            logger.error(f"Error listing groups: {e}")
            return []

    def get_user_groups(self, username: str) -> List[str]:
        """Return the sorted names of all groups ``username`` belongs to.

        Includes supplementary memberships and the user's primary group.
        An unknown user yields an empty list.
        """
        try:
            groups = {
                entry.gr_name
                for entry in grp.getgrall()
                if username in entry.gr_mem
            }
            try:
                primary_gid = pwd.getpwnam(username).pw_gid
                groups.add(grp.getgrgid(primary_gid).gr_name)
            except KeyError:
                # Unknown user, or the primary GID has no group entry.
                pass
            return sorted(groups)
        except Exception as e:
            logger.error(f"Error getting groups for {username}: {e}")
            return []

    # ------------------------------------------------------------------
    # User / group mutations
    # ------------------------------------------------------------------

    def create_user(self, username: str, home_dir: Optional[str] = None,
                    shell: str = "/bin/bash", gecos: str = "") -> bool:
        """Create a system user with ``useradd -m``.

        Args:
            username: Login name of the new account.
            home_dir: Home directory; defaults to ``/home/<username>``.
            shell: Login shell.
            gecos: Optional comment (GECOS) field.

        Returns:
            True if ``useradd`` exited with status 0.
        """
        cmd = ["/usr/sbin/useradd", "-d", home_dir or f"/home/{username}", "-s", shell]
        if gecos:
            cmd.extend(["-c", gecos])
        cmd.extend(["-m", username])  # -m creates the home directory
        return self._run_logged(
            cmd,
            f"User created: {username}",
            f"Failed to create user {username}",
            "Error creating user",
        )

    def delete_user(self, username: str, remove_home: bool = True) -> bool:
        """Delete a system user and, best-effort, the matching Samba account.

        Args:
            username: Account to delete.
            remove_home: Pass ``-r`` to ``userdel`` when True.

        Returns:
            True if ``userdel`` exited with status 0. The outcome of the
            Samba removal does not influence the result.
        """
        # Samba may not be installed, or the user may have no Samba account;
        # neither is a reason to abort the system-account removal.
        try:
            self._run(["/usr/bin/pdbedit", "-x", "-u", username])
        except Exception as e:
            logger.debug(f"Skipping Samba account removal for {username}: {e}")

        cmd = ["/usr/sbin/userdel"]
        if remove_home:
            cmd.append("-r")
        cmd.append(username)
        return self._run_logged(
            cmd,
            f"User deleted: {username}",
            f"Failed to delete user {username}",
            "Error deleting user",
        )

    def create_group(self, groupname: str) -> bool:
        """Create a system group with ``groupadd``; True on exit status 0."""
        return self._run_logged(
            ["/usr/sbin/groupadd", groupname],
            f"Group created: {groupname}",
            f"Failed to create group {groupname}",
            "Error creating group",
        )

    def delete_group(self, groupname: str) -> bool:
        """Delete a system group with ``groupdel``; True on exit status 0."""
        return self._run_logged(
            ["/usr/sbin/groupdel", groupname],
            f"Group deleted: {groupname}",
            f"Failed to delete group {groupname}",
            "Error deleting group",
        )

    def add_user_to_group(self, username: str, groupname: str) -> bool:
        """Add ``username`` to ``groupname`` via ``usermod -aG``.

        Returns:
            True on success.

        Raises:
            Exception: if the command cannot be run or exits non-zero. The
                message carries the tool's stderr/stdout so it can be shown
                to the user.
        """
        return self._run_or_raise(
            ["/usr/sbin/usermod", "-aG", groupname, username],
            f"User {username} added to group {groupname}",
            "Failed to add user to group",
            "Error adding user to group",
        )

    def remove_user_from_group(self, username: str, groupname: str) -> bool:
        """Remove ``username`` from ``groupname`` via ``gpasswd -d``.

        Returns:
            True on success.

        Raises:
            Exception: if the command cannot be run or exits non-zero. The
                message carries the tool's stderr/stdout so it can be shown
                to the user.
        """
        return self._run_or_raise(
            ["/usr/sbin/gpasswd", "-d", username, groupname],
            f"User {username} removed from group {groupname}",
            "Failed to remove user from group",
            "Error removing user from group",
        )

    def change_password(self, username: str, password: str) -> bool:
        """Change a user's password via ``chpasswd``.

        ``chpasswd`` reads one ``user:password`` record per line from stdin,
        so a line break in either value (or a colon in the user name) would
        alter or inject records. Such input is rejected before the tool is
        invoked.

        Returns:
            True if the password was changed.
        """
        if ':' in username or any(ch in value
                                  for value in (username, password)
                                  for ch in '\r\n'):
            logger.error(f"Failed to change password: invalid characters in input for {username!r}")
            return False
        return self._run_logged(
            ["/usr/sbin/chpasswd"],
            f"Password changed for {username}",
            "Failed to change password",
            "Error changing password",
            stdin_text=f"{username}:{password}\n",
        )

    def change_shell(self, username: str, shell: str) -> bool:
        """Set the login shell with ``usermod -s``; True on exit status 0."""
        return self._run_logged(
            ["/usr/sbin/usermod", "-s", shell, username],
            f"Shell changed for {username} to {shell}",
            "Failed to change shell",
            "Error changing shell",
        )

    def _is_user_locked(self, username: str) -> bool:
        """Return True if the account's password is locked.

        ``usermod -L`` (see ``lock_user``) locks a password by prefixing the
        stored hash with ``!``, so that prefix is what is checked here.
        Returns False when the shadow database is unavailable (no ``spwd``
        module, insufficient privileges) or the user is unknown.
        """
        if not spwd:
            return False
        try:
            entry = spwd.getspnam(username)
        except (KeyError, PermissionError):
            return False
        except Exception as e:
            logger.warning(f"Error checking lock status: {e}")
            return False
        return entry.sp_pwdp.startswith('!')

    def lock_user(self, username: str) -> bool:
        """Lock the account's password with ``usermod -L``; True on exit status 0."""
        return self._run_logged(
            ["/usr/sbin/usermod", "-L", username],
            f"User locked: {username}",
            "Failed to lock user",
            "Error locking user",
        )

    def unlock_user(self, username: str) -> bool:
        """Unlock the account's password with ``usermod -U``; True on exit status 0."""
        return self._run_logged(
            ["/usr/sbin/usermod", "-U", username],
            f"User unlocked: {username}",
            "Failed to unlock user",
            "Error unlocking user",
        )

    # ------------------------------------------------------------------
    # Samba
    # ------------------------------------------------------------------

    def set_samba_password(self, username: str, password: str) -> bool:
        """Add/update the Samba account of ``username`` and set its password.

        Uses ``smbpasswd -a -s``, which reads the password twice from stdin
        (one per line). A password containing a line break would corrupt that
        framing and is rejected.

        Returns:
            True if ``smbpasswd`` exited with status 0.
        """
        if '\n' in password or '\r' in password:
            logger.error("Failed to set Samba password: password contains a line break")
            return False
        try:
            result = self._run(
                ["/usr/bin/smbpasswd", "-a", "-s", username],
                stdin_text=f"{password}\n{password}\n",
            )
        except FileNotFoundError:
            logger.error("smbpasswd command not found - Samba not installed?")
            return False
        except Exception as e:
            logger.error(f"Error setting Samba password: {e}")
            return False
        if result.returncode == 0:
            logger.info(f"Samba password set for {username}")
            return True
        logger.error(f"Failed to set Samba password: {result.stderr}")
        return False

    def list_samba_users(self) -> List[Dict[str, Any]]:
        """List Samba accounts reported by ``pdbedit -L``, sorted by name.

        Returns:
            One dict per account with the keys ``username``, ``uid``,
            ``comment`` and ``type`` (always ``'samba'``). Empty when
            ``pdbedit`` is missing or fails.
        """
        users = []
        try:
            result = self._run(["/usr/bin/pdbedit", "-L"], timeout=5)
            if result.returncode == 0:
                for line in result.stdout.strip().split('\n'):
                    if not line.strip():
                        continue
                    # pdbedit output format: username:uid:comment
                    parts = line.split(':')
                    if len(parts) < 2:
                        continue
                    try:
                        users.append({
                            'username': parts[0],
                            'uid': int(parts[1]),
                            'comment': parts[2] if len(parts) > 2 else "",
                            'type': 'samba',
                        })
                    except (ValueError, IndexError) as e:
                        logger.warning(f"Error parsing Samba user line: {e}")
            return sorted(users, key=lambda x: x['username'])
        except FileNotFoundError:
            logger.warning("pdbedit not found - Samba may not be installed")
            return []
        except Exception as e:
            logger.error(f"Error listing Samba users: {e}")
            return []

    # ------------------------------------------------------------------
    # Login history
    # ------------------------------------------------------------------

    def get_login_history(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Return recent logins parsed from ``last -n <limit>``.

        Returns:
            One dict per login with the keys ``username``, ``tty``, ``host``,
            ``date`` (``YYYY-MM-DD``), ``time``, ``duration`` and
            ``login_str``. Header/footer lines, pseudo-users (``reboot`` etc.)
            and unparsable lines are skipped, so fewer than ``limit`` entries
            may be returned.
        """
        logins: List[Dict[str, Any]] = []
        try:
            result = self._run(["last", "-n", str(limit)])
            if result.returncode != 0:
                return logins
            today = datetime.now().date()
            # One passwd lookup per distinct name instead of one per line.
            name_cache: Dict[str, str] = {}
            for line in result.stdout.strip().split('\n'):
                try:
                    entry = self._parse_last_line(line, today, name_cache)
                except Exception as e:
                    logger.debug(f"Error parsing login line '{line}': {e}")
                    continue
                if entry is not None:
                    logins.append(entry)
            return logins
        except Exception as e:
            logger.error(f"Error getting login history: {e}")
            return []

    def _parse_last_line(self, line: str, today: date,
                         name_cache: Dict[str, str]) -> Optional[Dict[str, Any]]:
        """Parse one line of `last` output; return None for lines to skip."""
        if not line.strip():
            return None
        # Skip header, footer, and system entries
        if 'wtmp' in line or 'begins' in line:
            return None

        tokens = line.split()
        # The day-of-week token anchors the date section of the line.
        day_idx = next(
            (i for i, token in enumerate(tokens) if token in self._WEEKDAYS),
            -1,
        )
        if day_idx < 1:  # need at least the user name before the day
            return None

        username = tokens[0]
        if username in self._SYSTEM_LOGIN_NAMES:
            return None
        if username not in name_cache:
            name_cache[username] = self._get_full_username(username) or username
        username = name_cache[username]

        # Everything between the user name and the day-of-week is TTY/host.
        device_host_tokens = tokens[1:day_idx]
        tty = '-'
        host = '-'
        if device_host_tokens:
            first = device_host_tokens[0]
            if '/' in first or first.startswith(('pts', 'tty')):
                tty = first
                if len(device_host_tokens) > 1:
                    host = ' '.join(device_host_tokens[1:])
            else:
                # No TTY column: all tokens belong to the host.
                host = ' '.join(device_host_tokens)

        # Expected date section: <weekday> <month> <day> <time>
        if day_idx + 3 >= len(tokens):
            logger.debug(f"Not enough date tokens in line: {line}")
            return None
        day, month, day_of_month, time_str = tokens[day_idx:day_idx + 4]
        if month not in self._MONTHS:
            logger.debug(f"Invalid month '{month}' in line: {line}")
            return None
        if not day_of_month.isdigit():
            logger.debug(f"Invalid day '{day_of_month}' in line: {line}")
            return None

        month_num = self._MONTHS[month]
        # The default `last` format omits the year; reconstruct it.
        year = self._infer_year(day, month_num, int(day_of_month), today)

        duration_match = self._DURATION_RE.search(line)
        duration = duration_match.group(1) if duration_match else 'still logged in'

        return {
            'username': username,
            'tty': tty,
            'host': host,
            'date': f"{year}-{month_num:02d}-{day_of_month.zfill(2)}",
            'time': time_str,
            'duration': duration,
            'login_str': f"{day} {month} {day_of_month} {time_str} {year}",
        }

    @classmethod
    def _infer_year(cls, weekday: str, month: int, day: int, today: date) -> int:
        """Infer the year of a login from its weekday, month and day.

        Picks the most recent year in which ``month``/``day`` is not in the
        future relative to ``today`` and falls on ``weekday``. Falls back to
        the current year if no candidate matches.
        """
        wanted = cls._WEEKDAYS.index(weekday)
        # A given month/day lands on the same weekday again within 11 years.
        for year in range(today.year, today.year - 12, -1):
            try:
                candidate = date(year, month, day)
            except ValueError:
                # e.g. Feb 29 in a non-leap year
                continue
            if candidate <= today and candidate.weekday() == wanted:
                return year
        return today.year

    def _get_full_username(self, truncated: str) -> Optional[str]:
        """Expand a login name that the `last` listing has cut off.

        Names shorter than the truncation width cannot have been cut off and
        are never expanded. If an account with exactly the given name exists,
        nothing is expanded either.

        Returns:
            The first account name from ``getent passwd`` that starts with
            ``truncated``, or None if the name should be used as-is.
        """
        if len(truncated) < self._LAST_NAME_WIDTH:
            return None
        try:
            names = [line.split(':', 1)[0] for line in self._getent_lines("passwd")]
        except Exception as e:
            logger.debug(f"Error finding full username for '{truncated}': {e}")
            return None
        if truncated in names:
            return None
        for name in names:
            if name.startswith(truncated):
                return name
        return None
# Module-level IdentitiesManager instance, created once at import time.
identities_manager = IdentitiesManager()