Files
patrick f49793e6f2 Refactor: Java-Klassen aus Services entfernt + kritische Bugs gefixt
- AuthService, SystemInfo, IdentitiesManager Klassen → Modul-Funktionen
- grp.getall() → grp.getgrall() (Bug: Methode existierte nie)
- open('/proc/loadavg') ohne context manager gefixt (File-Handle-Leak)
- rx_packets/tx_packets null-check im Frontend (toLocaleString auf undefined)
- PoolCard onClick: /pools/{name} → /zfs (Route existierte nicht, löste Seitenreload aus)
- Alle Router-Imports auf Modul-Aliase umgestellt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 14:11:32 +02:00

432 lines
14 KiB
Python

"""
User and Group Management Service
Handle system users, groups, and PAM operations
"""
import subprocess
import logging
import pwd
import grp
from typing import List, Dict, Any, Optional
try:
import spwd
except ImportError:
spwd = None
logger = logging.getLogger(__name__)
def _is_user_locked(username: str) -> bool:
if not spwd:
return False
try:
entry = spwd.getspnam(username)
return entry.sp_lstchg == 0 or entry.sp_max == 0
except (KeyError, PermissionError):
return False
except Exception as e:
logger.warning(f"Error checking lock status: {e}")
return False
def _get_full_username(truncated: str) -> Optional[str]:
"""Find full username from /etc/passwd when wtmp truncated it to 8 chars."""
try:
result = subprocess.run(
["/usr/bin/getent", "passwd"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = line.split(':')
if parts and parts[0].startswith(truncated) and len(parts[0]) > len(truncated):
return parts[0]
return None
except Exception as e:
logger.debug(f"Error finding full username for '{truncated}': {e}")
return None
def list_users() -> List[Dict[str, Any]]:
    """List the accounts printed by ``getent passwd``, sorted by UID.

    Every account is a dict with the keys ``username``, ``uid``, ``gid``,
    ``gecos``, ``home``, ``shell`` and ``locked``. Lines with fewer than seven
    colon-separated fields are ignored; a line that fails to convert is
    logged as a warning and skipped. A non-zero ``getent`` exit status gives
    an empty list, and any error while running it is logged and also gives
    ``[]``.
    """
    try:
        proc = subprocess.run(
            ["/usr/bin/getent", "passwd"],
            capture_output=True, text=True, timeout=5
        )
        accounts = []
        passwd_lines = proc.stdout.strip().split('\n') if proc.returncode == 0 else []
        for raw in passwd_lines:
            fields = raw.split(':')
            # Also filters out blank lines, which split into a single field.
            if len(fields) < 7:
                continue
            try:
                name, _, uid, gid, gecos, home, shell = fields[:7]
                accounts.append({
                    'username': name,
                    'uid': int(uid),
                    'gid': int(gid),
                    'gecos': gecos,
                    'home': home,
                    'shell': shell,
                    'locked': _is_user_locked(name)
                })
            except Exception as e:
                logger.warning(f"Error parsing user line: {e}")
        accounts.sort(key=lambda account: account['uid'])
        return accounts
    except Exception as e:
        logger.error(f"Error listing users: {e}")
        return []
def list_groups() -> List[Dict[str, Any]]:
    """List the groups printed by ``getent group``, sorted by GID.

    Every group is a dict with the keys ``groupname``, ``gid`` and
    ``members`` (a list of non-empty, whitespace-stripped member names).
    Lines with fewer than four colon-separated fields are ignored; a line
    that fails to convert is logged as a warning and skipped. A non-zero
    ``getent`` exit status gives an empty list, and any error while running
    it is logged and also gives ``[]``.
    """
    try:
        proc = subprocess.run(
            ["/usr/bin/getent", "group"],
            capture_output=True, text=True, timeout=5
        )
        found = []
        group_lines = proc.stdout.strip().split('\n') if proc.returncode == 0 else []
        for raw in group_lines:
            fields = raw.split(':')
            # Also filters out blank lines, which split into a single field.
            if len(fields) < 4:
                continue
            try:
                stripped = (member.strip() for member in fields[3].split(','))
                found.append({
                    'groupname': fields[0],
                    'gid': int(fields[2]),
                    'members': [member for member in stripped if member]
                })
            except Exception as e:
                logger.warning(f"Error parsing group line: {e}")
        found.sort(key=lambda group: group['gid'])
        return found
    except Exception as e:
        logger.error(f"Error listing groups: {e}")
        return []
def get_user_groups(username: str) -> List[str]:
    """Return the sorted names of all groups *username* belongs to.

    Membership is collected from the member lists of every group entry, and
    the user's primary group (looked up through the passwd entry) is added
    if it is not already present. An unknown user or an unresolvable primary
    GID simply contributes no primary group. Any other error is logged and
    yields ``[]``.
    """
    try:
        names = []
        for record in grp.getgrall():
            if username in record.gr_mem:
                names.append(record.gr_name)

        try:
            primary_name = grp.getgrgid(pwd.getpwnam(username).pw_gid).gr_name
        except KeyError:
            # No passwd entry for the user, or no group entry for its GID.
            primary_name = None

        if primary_name is not None and primary_name not in names:
            names.append(primary_name)

        names.sort()
        return names
    except Exception as e:
        logger.error(f"Error getting groups for {username}: {e}")
        return []
def create_user(username: str, home_dir: Optional[str] = None,
                shell: str = "/bin/bash", gecos: str = "") -> bool:
    """Create a system account with ``useradd -m``.

    Args:
        username: Login name of the new account.
        home_dir: Home directory; ``/home/<username>`` when omitted or empty.
        shell: Login shell passed with ``-s``.
        gecos: Comment field passed with ``-c``; left out when empty.

    Returns:
        True when ``useradd`` exits with status 0. False when it exits
        non-zero or cannot be run; the reason is logged in both cases.
    """
    try:
        home = home_dir or f"/home/{username}"
        argv = ["/usr/sbin/useradd", "-d", home, "-s", shell]
        if gecos:
            argv += ["-c", gecos]
        argv += ["-m", username]

        proc = subprocess.run(argv, capture_output=True, timeout=10)
        if proc.returncode != 0:
            logger.error(f"Failed to create user {username}: {proc.stderr.decode()}")
            return False
        logger.info(f"User created: {username}")
        return True
    except Exception as e:
        logger.error(f"Error creating user: {e}")
        return False
def delete_user(username: str, remove_home: bool = True) -> bool:
    """Delete a system account with ``userdel``, removing its Samba entry first.

    The Samba removal (``pdbedit -x -u``) is best effort: its exit status is
    not checked and a failure to run it never prevents the system account
    from being deleted, but the failure is logged rather than discarded.

    Args:
        username: Login name of the account to delete.
        remove_home: When True, pass ``-r`` to ``userdel``.

    Returns:
        True when ``userdel`` exits with status 0. False when it exits
        non-zero or cannot be run; the reason is logged in both cases.
    """
    try:
        try:
            subprocess.run(
                ["/usr/bin/pdbedit", "-x", "-u", username],
                capture_output=True, timeout=10
            )
        except FileNotFoundError:
            # pdbedit is missing, so there is no Samba entry to clean up.
            logger.debug("pdbedit not found - skipping Samba account removal")
        except Exception as e:
            logger.warning(f"Could not remove Samba account for {username}: {e}")

        cmd = ["/usr/sbin/userdel"]
        if remove_home:
            cmd.append("-r")
        cmd.append(username)
        result = subprocess.run(cmd, capture_output=True, timeout=10)
        if result.returncode == 0:
            logger.info(f"User deleted: {username}")
            return True
        logger.error(f"Failed to delete user {username}: {result.stderr.decode()}")
        return False
    except Exception as e:
        logger.error(f"Error deleting user: {e}")
        return False
def create_group(groupname: str) -> bool:
    """Create a system group with ``groupadd``.

    Returns:
        True when the command exits with status 0. False when it exits
        non-zero or cannot be run; the reason is logged in both cases.
    """
    argv = ["/usr/sbin/groupadd", groupname]
    try:
        proc = subprocess.run(argv, capture_output=True, timeout=10)
        if proc.returncode != 0:
            logger.error(f"Failed to create group {groupname}: {proc.stderr.decode()}")
            return False
        logger.info(f"Group created: {groupname}")
        return True
    except Exception as e:
        logger.error(f"Error creating group: {e}")
        return False
def delete_group(groupname: str) -> bool:
    """Delete a system group with ``groupdel``.

    Returns:
        True when the command exits with status 0. False when it exits
        non-zero or cannot be run; the reason is logged in both cases.
    """
    argv = ["/usr/sbin/groupdel", groupname]
    try:
        proc = subprocess.run(argv, capture_output=True, timeout=10)
        if proc.returncode != 0:
            logger.error(f"Failed to delete group {groupname}: {proc.stderr.decode()}")
            return False
        logger.info(f"Group deleted: {groupname}")
        return True
    except Exception as e:
        logger.error(f"Error deleting group: {e}")
        return False
def add_user_to_group(username: str, groupname: str) -> bool:
    """Add *username* to the supplementary group *groupname* (``usermod -aG``).

    Returns:
        True when the command exits with status 0.

    Raises:
        RuntimeError: The command exited non-zero; the message carries its
            stderr (or stdout, or "Unknown error"). RuntimeError is a
            subclass of Exception, so handlers catching Exception still work.
        Exception: Whatever ``subprocess.run`` raised (missing binary,
            timeout, ...), re-raised unchanged after being logged.
    """
    # Only the process launch is guarded, so a command failure raised below
    # is logged exactly once instead of being caught and logged a second time.
    try:
        result = subprocess.run(
            ["/usr/sbin/usermod", "-aG", groupname, username],
            capture_output=True, text=True, timeout=10
        )
    except Exception as e:
        logger.error(f"Error adding user to group: {e}")
        raise

    if result.returncode != 0:
        error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error"
        logger.error(f"Failed to add user to group: {error_msg}")
        raise RuntimeError(f"Failed to add user to group: {error_msg}")

    logger.info(f"User {username} added to group {groupname}")
    return True
def remove_user_from_group(username: str, groupname: str) -> bool:
    """Remove *username* from the group *groupname* (``gpasswd -d``).

    Returns:
        True when the command exits with status 0.

    Raises:
        RuntimeError: The command exited non-zero; the message carries its
            stderr (or stdout, or "Unknown error"). RuntimeError is a
            subclass of Exception, so handlers catching Exception still work.
        Exception: Whatever ``subprocess.run`` raised (missing binary,
            timeout, ...), re-raised unchanged after being logged.
    """
    # Only the process launch is guarded, so a command failure raised below
    # is logged exactly once instead of being caught and logged a second time.
    try:
        result = subprocess.run(
            ["/usr/sbin/gpasswd", "-d", username, groupname],
            capture_output=True, text=True, timeout=10
        )
    except Exception as e:
        logger.error(f"Error removing user from group: {e}")
        raise

    if result.returncode != 0:
        error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error"
        logger.error(f"Failed to remove user from group: {error_msg}")
        raise RuntimeError(f"Failed to remove user from group: {error_msg}")

    logger.info(f"User {username} removed from group {groupname}")
    return True
def change_password(username: str, password: str) -> bool:
    """Set the system password of *username* by feeding ``chpasswd``.

    ``chpasswd`` reads one ``user:password`` record per line from stdin. A
    line break inside either value would start an additional record, and a
    colon inside the user name would shift the field boundary, so input
    containing those characters is rejected before anything is executed;
    otherwise a crafted value could change another account's password.

    Returns:
        True when ``chpasswd`` exits with status 0. False when the input is
        rejected, the command exits non-zero, or it cannot be run; the
        reason is logged in every case.
    """
    if ':' in username or '\n' in username:
        logger.error(f"Refusing to change password: invalid username {username!r}")
        return False
    if '\n' in password:
        logger.error(f"Refusing to change password for {username}: password contains a line break")
        return False

    try:
        result = subprocess.run(
            ["/usr/sbin/chpasswd"],
            input=f"{username}:{password}\n",
            text=True, capture_output=True, timeout=10
        )
        if result.returncode == 0:
            logger.info(f"Password changed for {username}")
            return True
        logger.error(f"Failed to change password: {result.stderr}")
        return False
    except Exception as e:
        logger.error(f"Error changing password: {e}")
        return False
def change_shell(username: str, shell: str) -> bool:
    """Set the login shell of *username* with ``usermod -s``.

    Returns:
        True when the command exits with status 0. False when it exits
        non-zero or cannot be run; the reason is logged in both cases.
    """
    argv = ["/usr/sbin/usermod", "-s", shell, username]
    try:
        proc = subprocess.run(argv, capture_output=True, timeout=10)
        if proc.returncode != 0:
            logger.error(f"Failed to change shell: {proc.stderr.decode()}")
            return False
        logger.info(f"Shell changed for {username} to {shell}")
        return True
    except Exception as e:
        logger.error(f"Error changing shell: {e}")
        return False
def lock_user(username: str) -> bool:
    """Lock the password of *username* with ``usermod -L``.

    Returns:
        True when the command exits with status 0. False when it exits
        non-zero or cannot be run; the reason is logged in both cases.
    """
    argv = ["/usr/sbin/usermod", "-L", username]
    try:
        proc = subprocess.run(argv, capture_output=True, timeout=10)
        if proc.returncode != 0:
            logger.error(f"Failed to lock user: {proc.stderr.decode()}")
            return False
        logger.info(f"User locked: {username}")
        return True
    except Exception as e:
        logger.error(f"Error locking user: {e}")
        return False
def unlock_user(username: str) -> bool:
    """Unlock the password of *username* with ``usermod -U``.

    Returns:
        True when the command exits with status 0. False when it exits
        non-zero or cannot be run; the reason is logged in both cases.
    """
    argv = ["/usr/sbin/usermod", "-U", username]
    try:
        proc = subprocess.run(argv, capture_output=True, timeout=10)
        if proc.returncode != 0:
            logger.error(f"Failed to unlock user: {proc.stderr.decode()}")
            return False
        logger.info(f"User unlocked: {username}")
        return True
    except Exception as e:
        logger.error(f"Error unlocking user: {e}")
        return False
def set_samba_password(username: str, password: str) -> bool:
    """Add *username* to Samba, or update its password, with ``smbpasswd -a -s``.

    The password is written twice to the command's stdin (entry plus
    confirmation), each followed by a newline.

    Returns:
        True when ``smbpasswd`` exits with status 0. False when it exits
        non-zero, the binary is missing, or running it fails; the reason is
        logged in every case.
    """
    argv = ["/usr/bin/smbpasswd", "-a", "-s", username]
    try:
        proc = subprocess.run(
            argv,
            input=f"{password}\n{password}\n",
            text=True, capture_output=True, timeout=10
        )
        if proc.returncode != 0:
            logger.error(f"Failed to set Samba password: {proc.stderr}")
            return False
        logger.info(f"Samba password set for {username}")
        return True
    except FileNotFoundError:
        logger.error("smbpasswd command not found - Samba not installed?")
        return False
    except Exception as e:
        logger.error(f"Error setting Samba password: {e}")
        return False
_LAST_WEEKDAYS = frozenset({'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'})
_LAST_MONTHS = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
                'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}
_LAST_PSEUDO_USERS = frozenset({'wtmp', 'reboot', 'kernel'})


def _parse_last_line(line: str, now) -> Optional[Dict[str, Any]]:
    """Turn one line of ``last`` output into a login record, or None to skip it.

    *now* is the current ``datetime``; it supplies the year, which the parsed
    line does not contain. The returned ``username`` is the name exactly as
    it appears in the line.
    """
    import re

    tokens = line.split()
    # The weekday token anchors the line: everything between the user name
    # and it is terminal/host, everything after it is the timestamp.
    day_idx = next((i for i, tok in enumerate(tokens) if tok in _LAST_WEEKDAYS), -1)
    if day_idx < 1:
        return None
    username = tokens[0]
    if username in _LAST_PSEUDO_USERS:
        return None
    if day_idx + 3 >= len(tokens):
        return None

    day, month, date, time_str = tokens[day_idx:day_idx + 4]
    if month not in _LAST_MONTHS or not date.isdigit():
        return None

    tty, host = '-', '-'
    between = tokens[1:day_idx]
    if between:
        first = between[0]
        if '/' in first or first.startswith(('pts', 'tty')):
            tty = first
            host = ' '.join(between[1:]) or '-'
        else:
            host = ' '.join(between)

    # No year is available in the line. A month/day that lies after today
    # cannot belong to the current year, so it is attributed to the previous
    # one (best effort; older history cannot be told apart).
    year = now.year
    if (_LAST_MONTHS[month], int(date)) > (now.month, now.day):
        year -= 1

    duration_match = re.search(r'\(([^)]+)\)\s*$', line)
    if duration_match:
        duration = duration_match.group(1)
    else:
        # Without a "(duration)" suffix the text after the login time is the
        # session status, e.g. "still logged in" or "gone - no logout".
        duration = ' '.join(tokens[day_idx + 4:]) or 'still logged in'

    return {
        'username': username,
        'tty': tty,
        'host': host,
        'date': f"{year}-{_LAST_MONTHS[month]:02d}-{date.zfill(2)}",
        'time': time_str,
        'duration': duration,
        'login_str': f"{day} {month} {date} {time_str} {year}"
    }


def get_login_history(limit: int = 50) -> List[Dict[str, Any]]:
    """Return recent logins parsed from the output of ``last -n <limit>``.

    Each record is a dict with the keys ``username``, ``tty``, ``host``,
    ``date`` (``YYYY-MM-DD``), ``time``, ``duration`` and ``login_str``.
    Truncated user names are expanded through ``_get_full_username``; each
    distinct name is resolved only once per call. Lines that cannot be
    parsed are skipped (logged at debug level).

    Args:
        limit: Value passed to ``last -n``.

    Returns:
        The records in the order ``last`` printed them, or ``[]`` when the
        command exits non-zero or cannot be run.
    """
    from datetime import datetime as dt

    try:
        result = subprocess.run(
            ["last", "-n", str(limit)],
            capture_output=True, text=True, timeout=10
        )
        if result.returncode != 0:
            return []

        now = dt.now()
        resolved: Dict[str, str] = {}  # displayed name -> full name
        logins = []
        for line in result.stdout.strip().split('\n'):
            if not line.strip() or 'wtmp' in line or 'begins' in line:
                continue
            try:
                record = _parse_last_line(line, now)
                if record is None:
                    continue
                shown = record['username']
                if shown not in resolved:
                    resolved[shown] = _get_full_username(shown) or shown
                record['username'] = resolved[shown]
                logins.append(record)
            except Exception as e:
                logger.debug(f"Error parsing login line '{line}': {e}")
                continue
        return logins
    except Exception as e:
        logger.error(f"Error getting login history: {e}")
        return []
def list_samba_users() -> List[Dict[str, Any]]:
    """List the Samba accounts printed by ``pdbedit -L``, sorted by user name.

    Each output line is read as ``username:uid[:comment]``. Only the first
    two colons are treated as separators, so a comment that itself contains
    a colon is kept whole. Every account is a dict with the keys
    ``username``, ``uid``, ``comment`` and ``type`` (always ``'samba'``).
    Lines whose UID is not an integer are logged as a warning and skipped.

    Returns:
        The parsed accounts; ``[]`` when ``pdbedit`` exits non-zero, is not
        installed, or running it fails.
    """
    users = []
    try:
        result = subprocess.run(
            ["/usr/bin/pdbedit", "-L"],
            capture_output=True, text=True, timeout=5
        )
        if result.returncode == 0:
            for line in result.stdout.strip().split('\n'):
                if not line.strip():
                    continue
                parts = line.split(':', 2)
                if len(parts) < 2:
                    continue
                try:
                    users.append({
                        'username': parts[0],
                        'uid': int(parts[1]),
                        'comment': parts[2] if len(parts) > 2 else "",
                        'type': 'samba'
                    })
                except (ValueError, IndexError) as e:
                    logger.warning(f"Error parsing Samba user line: {e}")
        return sorted(users, key=lambda x: x['username'])
    except FileNotFoundError:
        logger.warning("pdbedit not found - Samba may not be installed")
        return []
    except Exception as e:
        logger.error(f"Error listing Samba users: {e}")
        return []