Files
zmb-webui/backend/services/identities.py
T
Claude Code 92bed208e0 ZMB Webui: Complete Project – Rebrand & Initial Clean Commit
ARCHITECTURE
============
Backend: FastAPI + uvicorn (port 8000)
  - JWT authentication with PAM system users
  - ZFS CLI wrapper with caching (30-60s TTL)
  - WebSocket pool status broadcaster (30s interval)
  - Services: auth, zfs_runner, file_manager, shares, identities, system_info
  - Routers: pools, datasets, snapshots, shares, identities, navigator, system

Frontend: Next.js 15 + TypeScript (static export)
  - Incremental Static Regeneration (ISR) for weak hardware
  - Type-safe API client (lib/api.ts)
  - Dark mode + custom Tailwind theme
  - Pages: Dashboard, Login, Snapshots, Datasets, Shares, etc.

DEPLOYMENT
==========
Test Target: 192.168.1.179:8090 (Debian LXC)
Production: 10.66.120.3:9090 (Raspberry Pi 4GB ARM64)
Updater: Automated Gitea-based deployment (update-test.sh, update-pi.sh)

FEATURES COMPLETED
==================
Phase 3a: Dashboard Quick Stats (System, CPU, Memory, Storage)
  - Real-time stats with color-coded progress bars
  - Responsive grid layout (mobile: 1, tablet: 2, desktop: 4 columns)
  - ISR-optimized for fast loads on weak hardware

REBRANDING
==========
Renamed throughout:
  - Project: 'ZFS Manager' → 'ZMB Webui'
  - Services: 'zfs-manager' → 'zmb-webui'
  - Systemd units: zfs-manager-backend → zmb-webui-backend
  - Configuration files and documentation

Co-Authored-By: Patrick <patrick@perlbach24.de>
2026-04-22 00:43:05 +02:00

573 lines
20 KiB
Python

"""
User and Group Management Service
Handle system users, groups, and PAM operations
"""
import subprocess
import logging
import pwd
import grp
from typing import List, Dict, Any, Optional
from pathlib import Path
try:
import spwd
except ImportError:
spwd = None
logger = logging.getLogger(__name__)
class IdentitiesManager:
"""Manage system users and groups"""
def list_users(self) -> List[Dict[str, Any]]:
"""List all system users"""
users = []
try:
# Use getpwall() which returns an iterator
import getpass
# Fallback: read /etc/passwd directly
result = subprocess.run(
["/usr/bin/getent", "passwd"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = line.split(':')
if len(parts) < 7:
continue
try:
username = parts[0]
uid = int(parts[2])
gid = int(parts[3])
gecos = parts[4]
home = parts[5]
shell = parts[6]
users.append({
'username': username,
'uid': uid,
'gid': gid,
'gecos': gecos,
'home': home,
'shell': shell,
'locked': self._is_user_locked(username)
})
except Exception as e:
logger.warning(f"Error parsing user line: {e}")
return sorted(users, key=lambda x: x['uid'])
except Exception as e:
logger.error(f"Error listing users: {e}")
return []
def list_groups(self) -> List[Dict[str, Any]]:
"""List all system groups"""
groups = []
try:
# Use getent to read groups
result = subprocess.run(
["/usr/bin/getent", "group"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = line.split(':')
if len(parts) < 4:
continue
try:
groupname = parts[0]
gid = int(parts[2])
members = [m.strip() for m in parts[3].split(',') if m.strip()]
groups.append({
'groupname': groupname,
'gid': gid,
'members': members
})
except Exception as e:
logger.warning(f"Error parsing group line: {e}")
return sorted(groups, key=lambda x: x['gid'])
except Exception as e:
logger.error(f"Error listing groups: {e}")
return []
def get_user_groups(self, username: str) -> List[str]:
"""Get all groups a user belongs to"""
try:
groups = []
for entry in grp.getall():
if username in entry.gr_mem:
groups.append(entry.gr_name)
# Also add primary group
try:
user_entry = pwd.getpwnam(username)
primary_group = grp.getgrgid(user_entry.pw_gid)
if primary_group.gr_name not in groups:
groups.append(primary_group.gr_name)
except KeyError:
pass
return sorted(groups)
except Exception as e:
logger.error(f"Error getting groups for {username}: {e}")
return []
def create_user(self, username: str, home_dir: Optional[str] = None,
shell: str = "/bin/bash", gecos: str = "") -> bool:
"""Create new system user"""
try:
# Build useradd command
cmd = ["/usr/sbin/useradd"]
if home_dir:
cmd.extend(["-d", home_dir])
else:
cmd.extend(["-d", f"/home/{username}"])
cmd.extend(["-s", shell])
if gecos:
cmd.extend(["-c", gecos])
cmd.extend(["-m", username]) # -m to create home directory
result = subprocess.run(cmd, capture_output=True, timeout=10)
if result.returncode == 0:
logger.info(f"User created: {username}")
return True
else:
logger.error(f"Failed to create user {username}: {result.stderr.decode()}")
return False
except Exception as e:
logger.error(f"Error creating user: {e}")
return False
def delete_user(self, username: str, remove_home: bool = True) -> bool:
"""Delete system user and Samba user"""
try:
# First delete Samba user if exists (using pdbedit for better handling)
try:
subprocess.run(
["/usr/bin/pdbedit", "-x", "-u", username],
capture_output=True,
timeout=10
)
except Exception:
pass # Samba not installed or user doesn't exist
# Delete system user
cmd = ["/usr/sbin/userdel"]
if remove_home:
cmd.append("-r")
cmd.append(username)
result = subprocess.run(cmd, capture_output=True, timeout=10)
if result.returncode == 0:
logger.info(f"User deleted: {username}")
return True
else:
logger.error(f"Failed to delete user {username}: {result.stderr.decode()}")
return False
except Exception as e:
logger.error(f"Error deleting user: {e}")
return False
def create_group(self, groupname: str) -> bool:
"""Create new system group"""
try:
result = subprocess.run(
["/usr/sbin/groupadd", groupname],
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"Group created: {groupname}")
return True
else:
logger.error(f"Failed to create group {groupname}: {result.stderr.decode()}")
return False
except Exception as e:
logger.error(f"Error creating group: {e}")
return False
def delete_group(self, groupname: str) -> bool:
"""Delete system group"""
try:
result = subprocess.run(
["/usr/sbin/groupdel", groupname],
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"Group deleted: {groupname}")
return True
else:
logger.error(f"Failed to delete group {groupname}: {result.stderr.decode()}")
return False
except Exception as e:
logger.error(f"Error deleting group: {e}")
return False
def add_user_to_group(self, username: str, groupname: str) -> bool:
"""Add user to group"""
try:
result = subprocess.run(
["/usr/sbin/usermod", "-aG", groupname, username],
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"User {username} added to group {groupname}")
return True
else:
logger.error(f"Failed to add user to group: {result.stderr.decode()}")
return False
except Exception as e:
logger.error(f"Error adding user to group: {e}")
return False
def remove_user_from_group(self, username: str, groupname: str) -> bool:
"""Remove user from group"""
try:
result = subprocess.run(
["/usr/sbin/gpasswd", "-d", username, groupname],
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"User {username} removed from group {groupname}")
return True
else:
logger.error(f"Failed to remove user from group: {result.stderr.decode()}")
return False
except Exception as e:
logger.error(f"Error removing user from group: {e}")
return False
def change_password(self, username: str, password: str) -> bool:
"""Change user password via chpasswd"""
try:
# Use chpasswd for password changes
result = subprocess.run(
["/usr/sbin/chpasswd"],
input=f"{username}:{password}\n",
text=True,
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"Password changed for {username}")
return True
else:
logger.error(f"Failed to change password: {result.stderr}")
return False
except Exception as e:
logger.error(f"Error changing password: {e}")
return False
def change_shell(self, username: str, shell: str) -> bool:
"""Change user shell"""
try:
result = subprocess.run(
["/usr/sbin/usermod", "-s", shell, username],
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"Shell changed for {username} to {shell}")
return True
else:
logger.error(f"Failed to change shell: {result.stderr.decode()}")
return False
except Exception as e:
logger.error(f"Error changing shell: {e}")
return False
def _is_user_locked(self, username: str) -> bool:
"""Check if user account is locked"""
if not spwd:
return False
try:
entry = spwd.getspnam(username)
return entry.sp_lstchg == 0 or entry.sp_max == 0
except (KeyError, PermissionError):
return False
except Exception as e:
logger.warning(f"Error checking lock status: {e}")
return False
def lock_user(self, username: str) -> bool:
"""Lock user account"""
try:
result = subprocess.run(
["/usr/sbin/usermod", "-L", username],
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"User locked: {username}")
return True
else:
logger.error(f"Failed to lock user: {result.stderr.decode()}")
return False
except Exception as e:
logger.error(f"Error locking user: {e}")
return False
def unlock_user(self, username: str) -> bool:
"""Unlock user account"""
try:
result = subprocess.run(
["/usr/sbin/usermod", "-U", username],
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"User unlocked: {username}")
return True
else:
logger.error(f"Failed to unlock user: {result.stderr.decode()}")
return False
except Exception as e:
logger.error(f"Error unlocking user: {e}")
return False
def set_samba_password(self, username: str, password: str) -> bool:
"""Set Samba password for user"""
try:
# Use smbpasswd to set Samba password
# -a flag: add/update user
# -s flag: read password from stdin
result = subprocess.run(
["/usr/bin/smbpasswd", "-a", "-s", username],
input=f"{password}\n{password}\n",
text=True,
capture_output=True,
timeout=10
)
if result.returncode == 0:
logger.info(f"Samba password set for {username}")
return True
else:
logger.error(f"Failed to set Samba password: {result.stderr}")
return False
except FileNotFoundError:
logger.error("smbpasswd command not found - Samba not installed?")
return False
except Exception as e:
logger.error(f"Error setting Samba password: {e}")
return False
def get_login_history(self, limit: int = 50) -> List[Dict[str, Any]]:
"""Get recent login history using last command"""
import re
from datetime import datetime
logins = []
days_of_week = {'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'}
months = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6,
'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12}
current_year = datetime.now().year
try:
result = subprocess.run(
["last", "-n", str(limit)],
capture_output=True,
text=True,
timeout=10
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
# Skip header, footer, and system entries
if 'wtmp' in line or 'begins' in line:
continue
try:
# Find the day-of-week anchor (reliable marker for date section)
tokens = line.split()
day_idx = -1
for i, token in enumerate(tokens):
if token in days_of_week:
day_idx = i
break
if day_idx < 1: # Need at least username before day
continue
# Extract components from token positions
username = tokens[0]
# Skip system entries
if username in ['wtmp', 'reboot', 'kernel']:
continue
# Try to find full username from /etc/passwd (wtmp truncates to 8 chars)
full_username = self._get_full_username(username)
if full_username:
username = full_username
# Everything between username and day-of-week is TTY/host
device_host_tokens = tokens[1:day_idx]
# TTY is typically first token if it contains '/' or starts with 'pts'/'tty'
tty = '-'
host = '-'
if device_host_tokens:
first = device_host_tokens[0]
if '/' in first or first.startswith('pts') or first.startswith('tty'):
tty = first
# Remaining tokens are host
if len(device_host_tokens) > 1:
host = ' '.join(device_host_tokens[1:])
else:
# No TTY, all tokens are host
host = ' '.join(device_host_tokens)
# Extract date components (should be: day month date time)
# Note: year is NOT in standard last output, we need to infer it
if day_idx + 3 < len(tokens):
day = tokens[day_idx]
month = tokens[day_idx + 1]
date = tokens[day_idx + 2]
time_str = tokens[day_idx + 3]
# Validate month
if month not in months:
logger.debug(f"Invalid month '{month}' in line: {line}")
continue
# Use current year (last entries are usually recent)
year = current_year
else:
logger.debug(f"Not enough date tokens in line: {line}")
continue
# Extract duration from end of line (in parentheses)
duration_match = re.search(r'\(([^)]+)\)\s*$', line)
duration = duration_match.group(1) if duration_match else 'still logged in'
logins.append({
'username': username,
'tty': tty,
'host': host,
'date': f"{year}-{months[month]:02d}-{date.zfill(2)}",
'time': time_str,
'duration': duration,
'login_str': f"{day} {month} {date} {time_str} {year}"
})
except Exception as e:
logger.debug(f"Error parsing login line '{line}': {e}")
continue
return logins
except Exception as e:
logger.error(f"Error getting login history: {e}")
return []
def _get_full_username(self, truncated: str) -> Optional[str]:
"""Find full username from /etc/passwd when wtmp has truncated it (8 char limit)
wtmp truncates usernames to 8 characters, so we need to look up the full name
in /etc/passwd by matching usernames that start with the truncated name.
"""
try:
result = subprocess.run(
["/usr/bin/getent", "passwd"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if not line:
continue
parts = line.split(':')
if parts and parts[0].startswith(truncated):
# Found a username that starts with the truncated name
if len(parts[0]) > len(truncated):
# It's longer than the truncated version
return parts[0]
return None
except Exception as e:
logger.debug(f"Error finding full username for '{truncated}': {e}")
return None
def list_samba_users(self) -> List[Dict[str, Any]]:
"""List all Samba users using pdbedit"""
users = []
try:
result = subprocess.run(
["/usr/bin/pdbedit", "-L"],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
for line in result.stdout.strip().split('\n'):
if not line.strip():
continue
# pdbedit output format: username:uid:comment
parts = line.split(':')
if len(parts) >= 2:
try:
username = parts[0]
uid = int(parts[1])
comment = parts[2] if len(parts) > 2 else ""
users.append({
'username': username,
'uid': uid,
'comment': comment,
'type': 'samba'
})
except (ValueError, IndexError) as e:
logger.warning(f"Error parsing Samba user line: {e}")
return sorted(users, key=lambda x: x['username'])
except FileNotFoundError:
logger.warning("pdbedit not found - Samba may not be installed")
return []
except Exception as e:
logger.error(f"Error listing Samba users: {e}")
return []
identities_manager = IdentitiesManager()