f49793e6f2
- AuthService, SystemInfo, IdentitiesManager Klassen → Modul-Funktionen
- grp.getall() → grp.getgrall() (Bug: Methode existierte nie)
- open('/proc/loadavg') ohne context manager gefixt (File-Handle-Leak)
- rx_packets/tx_packets null-check im Frontend (toLocaleString auf undefined)
- PoolCard onClick: /pools/{name} → /zfs (Route existierte nicht, löste Seitenreload aus)
- Alle Router-Imports auf Modul-Aliase umgestellt
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
432 lines
14 KiB
Python
432 lines
14 KiB
Python
"""
|
|
User and Group Management Service
|
|
Handle system users, groups, and PAM operations
|
|
"""
|
|
|
|
import subprocess
|
|
import logging
|
|
import pwd
|
|
import grp
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
try:
|
|
import spwd
|
|
except ImportError:
|
|
spwd = None
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _is_user_locked(username: str) -> bool:
|
|
if not spwd:
|
|
return False
|
|
try:
|
|
entry = spwd.getspnam(username)
|
|
return entry.sp_lstchg == 0 or entry.sp_max == 0
|
|
except (KeyError, PermissionError):
|
|
return False
|
|
except Exception as e:
|
|
logger.warning(f"Error checking lock status: {e}")
|
|
return False
|
|
|
|
|
|
def _get_full_username(truncated: str) -> Optional[str]:
|
|
"""Find full username from /etc/passwd when wtmp truncated it to 8 chars."""
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/bin/getent", "passwd"],
|
|
capture_output=True, text=True, timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
parts = line.split(':')
|
|
if parts and parts[0].startswith(truncated) and len(parts[0]) > len(truncated):
|
|
return parts[0]
|
|
return None
|
|
except Exception as e:
|
|
logger.debug(f"Error finding full username for '{truncated}': {e}")
|
|
return None
|
|
|
|
|
|
def list_users() -> List[Dict[str, Any]]:
|
|
users = []
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/bin/getent", "passwd"],
|
|
capture_output=True, text=True, timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
parts = line.split(':')
|
|
if len(parts) < 7:
|
|
continue
|
|
try:
|
|
users.append({
|
|
'username': parts[0],
|
|
'uid': int(parts[2]),
|
|
'gid': int(parts[3]),
|
|
'gecos': parts[4],
|
|
'home': parts[5],
|
|
'shell': parts[6],
|
|
'locked': _is_user_locked(parts[0])
|
|
})
|
|
except Exception as e:
|
|
logger.warning(f"Error parsing user line: {e}")
|
|
return sorted(users, key=lambda x: x['uid'])
|
|
except Exception as e:
|
|
logger.error(f"Error listing users: {e}")
|
|
return []
|
|
|
|
|
|
def list_groups() -> List[Dict[str, Any]]:
|
|
groups = []
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/bin/getent", "group"],
|
|
capture_output=True, text=True, timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line:
|
|
continue
|
|
parts = line.split(':')
|
|
if len(parts) < 4:
|
|
continue
|
|
try:
|
|
groups.append({
|
|
'groupname': parts[0],
|
|
'gid': int(parts[2]),
|
|
'members': [m.strip() for m in parts[3].split(',') if m.strip()]
|
|
})
|
|
except Exception as e:
|
|
logger.warning(f"Error parsing group line: {e}")
|
|
return sorted(groups, key=lambda x: x['gid'])
|
|
except Exception as e:
|
|
logger.error(f"Error listing groups: {e}")
|
|
return []
|
|
|
|
|
|
def get_user_groups(username: str) -> List[str]:
|
|
try:
|
|
groups = [
|
|
entry.gr_name
|
|
for entry in grp.getgrall()
|
|
if username in entry.gr_mem
|
|
]
|
|
try:
|
|
user_entry = pwd.getpwnam(username)
|
|
primary = grp.getgrgid(user_entry.pw_gid)
|
|
if primary.gr_name not in groups:
|
|
groups.append(primary.gr_name)
|
|
except KeyError:
|
|
pass
|
|
return sorted(groups)
|
|
except Exception as e:
|
|
logger.error(f"Error getting groups for {username}: {e}")
|
|
return []
|
|
|
|
|
|
def create_user(username: str, home_dir: Optional[str] = None,
|
|
shell: str = "/bin/bash", gecos: str = "") -> bool:
|
|
try:
|
|
cmd = ["/usr/sbin/useradd", "-d", home_dir or f"/home/{username}", "-s", shell]
|
|
if gecos:
|
|
cmd.extend(["-c", gecos])
|
|
cmd.extend(["-m", username])
|
|
result = subprocess.run(cmd, capture_output=True, timeout=10)
|
|
if result.returncode == 0:
|
|
logger.info(f"User created: {username}")
|
|
return True
|
|
logger.error(f"Failed to create user {username}: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error creating user: {e}")
|
|
return False
|
|
|
|
|
|
def delete_user(username: str, remove_home: bool = True) -> bool:
|
|
try:
|
|
try:
|
|
subprocess.run(
|
|
["/usr/bin/pdbedit", "-x", "-u", username],
|
|
capture_output=True, timeout=10
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
cmd = ["/usr/sbin/userdel"]
|
|
if remove_home:
|
|
cmd.append("-r")
|
|
cmd.append(username)
|
|
result = subprocess.run(cmd, capture_output=True, timeout=10)
|
|
if result.returncode == 0:
|
|
logger.info(f"User deleted: {username}")
|
|
return True
|
|
logger.error(f"Failed to delete user {username}: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error deleting user: {e}")
|
|
return False
|
|
|
|
|
|
def create_group(groupname: str) -> bool:
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/groupadd", groupname],
|
|
capture_output=True, timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
logger.info(f"Group created: {groupname}")
|
|
return True
|
|
logger.error(f"Failed to create group {groupname}: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error creating group: {e}")
|
|
return False
|
|
|
|
|
|
def delete_group(groupname: str) -> bool:
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/groupdel", groupname],
|
|
capture_output=True, timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
logger.info(f"Group deleted: {groupname}")
|
|
return True
|
|
logger.error(f"Failed to delete group {groupname}: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error deleting group: {e}")
|
|
return False
|
|
|
|
|
|
def add_user_to_group(username: str, groupname: str) -> bool:
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/usermod", "-aG", groupname, username],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
logger.info(f"User {username} added to group {groupname}")
|
|
return True
|
|
error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error"
|
|
logger.error(f"Failed to add user to group: {error_msg}")
|
|
raise Exception(f"Failed to add user to group: {error_msg}")
|
|
except Exception as e:
|
|
logger.error(f"Error adding user to group: {e}")
|
|
raise
|
|
|
|
|
|
def remove_user_from_group(username: str, groupname: str) -> bool:
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/gpasswd", "-d", username, groupname],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
logger.info(f"User {username} removed from group {groupname}")
|
|
return True
|
|
error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error"
|
|
logger.error(f"Failed to remove user from group: {error_msg}")
|
|
raise Exception(f"Failed to remove user from group: {error_msg}")
|
|
except Exception as e:
|
|
logger.error(f"Error removing user from group: {e}")
|
|
raise
|
|
|
|
|
|
def change_password(username: str, password: str) -> bool:
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/chpasswd"],
|
|
input=f"{username}:{password}\n",
|
|
text=True, capture_output=True, timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
logger.info(f"Password changed for {username}")
|
|
return True
|
|
logger.error(f"Failed to change password: {result.stderr}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error changing password: {e}")
|
|
return False
|
|
|
|
|
|
def change_shell(username: str, shell: str) -> bool:
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/usermod", "-s", shell, username],
|
|
capture_output=True, timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
logger.info(f"Shell changed for {username} to {shell}")
|
|
return True
|
|
logger.error(f"Failed to change shell: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error changing shell: {e}")
|
|
return False
|
|
|
|
|
|
def lock_user(username: str) -> bool:
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/usermod", "-L", username],
|
|
capture_output=True, timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
logger.info(f"User locked: {username}")
|
|
return True
|
|
logger.error(f"Failed to lock user: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error locking user: {e}")
|
|
return False
|
|
|
|
|
|
def unlock_user(username: str) -> bool:
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/sbin/usermod", "-U", username],
|
|
capture_output=True, timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
logger.info(f"User unlocked: {username}")
|
|
return True
|
|
logger.error(f"Failed to unlock user: {result.stderr.decode()}")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error unlocking user: {e}")
|
|
return False
|
|
|
|
|
|
def set_samba_password(username: str, password: str) -> bool:
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/bin/smbpasswd", "-a", "-s", username],
|
|
input=f"{password}\n{password}\n",
|
|
text=True, capture_output=True, timeout=10
|
|
)
|
|
if result.returncode == 0:
|
|
logger.info(f"Samba password set for {username}")
|
|
return True
|
|
logger.error(f"Failed to set Samba password: {result.stderr}")
|
|
return False
|
|
except FileNotFoundError:
|
|
logger.error("smbpasswd command not found - Samba not installed?")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error setting Samba password: {e}")
|
|
return False
|
|
|
|
|
|
def get_login_history(limit: int = 50) -> List[Dict[str, Any]]:
|
|
import re
|
|
from datetime import datetime as dt
|
|
|
|
logins = []
|
|
days_of_week = {'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'}
|
|
months = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
|
|
'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}
|
|
current_year = dt.now().year
|
|
|
|
try:
|
|
result = subprocess.run(
|
|
["last", "-n", str(limit)],
|
|
capture_output=True, text=True, timeout=10
|
|
)
|
|
if result.returncode != 0:
|
|
return []
|
|
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line.strip() or 'wtmp' in line or 'begins' in line:
|
|
continue
|
|
try:
|
|
tokens = line.split()
|
|
day_idx = next((i for i, t in enumerate(tokens) if t in days_of_week), -1)
|
|
if day_idx < 1:
|
|
continue
|
|
|
|
username = tokens[0]
|
|
if username in ['wtmp', 'reboot', 'kernel']:
|
|
continue
|
|
|
|
full_username = _get_full_username(username)
|
|
if full_username:
|
|
username = full_username
|
|
|
|
device_host_tokens = tokens[1:day_idx]
|
|
tty, host = '-', '-'
|
|
if device_host_tokens:
|
|
first = device_host_tokens[0]
|
|
if '/' in first or first.startswith('pts') or first.startswith('tty'):
|
|
tty = first
|
|
host = ' '.join(device_host_tokens[1:]) if len(device_host_tokens) > 1 else '-'
|
|
else:
|
|
host = ' '.join(device_host_tokens)
|
|
|
|
if day_idx + 3 >= len(tokens):
|
|
continue
|
|
day = tokens[day_idx]
|
|
month = tokens[day_idx + 1]
|
|
date = tokens[day_idx + 2]
|
|
time_str = tokens[day_idx + 3]
|
|
if month not in months:
|
|
continue
|
|
|
|
duration_match = re.search(r'\(([^)]+)\)\s*$', line)
|
|
duration = duration_match.group(1) if duration_match else 'still logged in'
|
|
|
|
logins.append({
|
|
'username': username,
|
|
'tty': tty,
|
|
'host': host,
|
|
'date': f"{current_year}-{months[month]:02d}-{date.zfill(2)}",
|
|
'time': time_str,
|
|
'duration': duration,
|
|
'login_str': f"{day} {month} {date} {time_str} {current_year}"
|
|
})
|
|
except Exception as e:
|
|
logger.debug(f"Error parsing login line '{line}': {e}")
|
|
continue
|
|
|
|
return logins
|
|
except Exception as e:
|
|
logger.error(f"Error getting login history: {e}")
|
|
return []
|
|
|
|
|
|
def list_samba_users() -> List[Dict[str, Any]]:
|
|
users = []
|
|
try:
|
|
result = subprocess.run(
|
|
["/usr/bin/pdbedit", "-L"],
|
|
capture_output=True, text=True, timeout=5
|
|
)
|
|
if result.returncode == 0:
|
|
for line in result.stdout.strip().split('\n'):
|
|
if not line.strip():
|
|
continue
|
|
parts = line.split(':')
|
|
if len(parts) >= 2:
|
|
try:
|
|
users.append({
|
|
'username': parts[0],
|
|
'uid': int(parts[1]),
|
|
'comment': parts[2] if len(parts) > 2 else "",
|
|
'type': 'samba'
|
|
})
|
|
except (ValueError, IndexError) as e:
|
|
logger.warning(f"Error parsing Samba user line: {e}")
|
|
return sorted(users, key=lambda x: x['username'])
|
|
except FileNotFoundError:
|
|
logger.warning("pdbedit not found - Samba may not be installed")
|
|
return []
|
|
except Exception as e:
|
|
logger.error(f"Error listing Samba users: {e}")
|
|
return []
|