Files
zmb-webui/backend/routers/identities.py
T
Claude Code 6d74d874b6 ZMB Webui: Complete Project – Rebrand & Initial Clean Commit
ARCHITECTURE
============
Backend: FastAPI + uvicorn (port 8000)
  - JWT authentication with PAM system users
  - ZFS CLI wrapper with caching (30-60s TTL)
  - WebSocket pool status broadcaster (30s interval)
  - Services: auth, zfs_runner, file_manager, shares, identities, system_info
  - Routers: pools, datasets, snapshots, shares, identities, navigator, system

Frontend: Next.js 15 + TypeScript (static export)
  - Incremental Static Regeneration (ISR) for weak hardware
  - Type-safe API client (lib/api.ts)
  - Dark mode + custom Tailwind theme
  - Pages: Dashboard, Login, Snapshots, Datasets, Shares, etc.

DEPLOYMENT
==========
Test Target: 192.168.1.179:8090 (Debian LXC)
Production: 10.66.120.3:9090 (Raspberry Pi 4GB ARM64)
Updater: Automated Gitea-based deployment (update-test.sh, update-pi.sh)

FEATURES COMPLETED
==================
Phase 3a: Dashboard Quick Stats (System, CPU, Memory, Storage)
  - Real-time stats with color-coded progress bars
  - Responsive grid layout (mobile: 1, tablet: 2, desktop: 4 columns)
  - ISR-optimized for fast loads on weak hardware

REBRANDING
==========
Renamed throughout:
  - Project: 'ZFS Manager' → 'ZMB Webui'
  - Services: 'zfs-manager' → 'zmb-webui'
  - Systemd units: zfs-manager-backend → zmb-webui-backend
  - Configuration files and documentation

Co-Authored-By: Patrick <patrick@perlbach24.de>
2026-04-22 00:43:05 +02:00

281 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
User and Group Management endpoints cockpit-identities
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from typing import Optional
from services.identities import identities_manager
from services.auth import auth_service
router = APIRouter(prefix="/api/identities", tags=["identities"])
security = HTTPBearer()
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify JWT token and return username"""
username = auth_service.verify_token(credentials.credentials)
if not username:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
return username
class CreateUserRequest(BaseModel):
username: str
home_dir: Optional[str] = None
shell: str = "/bin/bash"
gecos: Optional[str] = None
class CreateGroupRequest(BaseModel):
groupname: str
class ChangePasswordRequest(BaseModel):
password: str
class ChangeShellRequest(BaseModel):
shell: str
class AddUserToGroupRequest(BaseModel):
groupname: str
# ============== USERS ==============
@router.get("/users")
async def list_users(current_user: str = Depends(get_current_user)):
"""List all system users"""
try:
users = identities_manager.list_users()
# Add group memberships for each user
for user in users:
user['groups'] = identities_manager.get_user_groups(user['username'])
return {"users": users}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/users")
async def create_user(
request: CreateUserRequest,
current_user: str = Depends(get_current_user)
):
"""Create new system user"""
try:
success = identities_manager.create_user(
request.username,
request.home_dir,
request.shell,
request.gecos or ""
)
if not success:
raise HTTPException(status_code=400, detail="Failed to create user")
return {"status": "created", "username": request.username}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/users/{username}")
async def delete_user(
username: str,
remove_home: bool = True,
current_user: str = Depends(get_current_user)
):
"""Delete system user"""
try:
success = identities_manager.delete_user(username, remove_home)
if not success:
raise HTTPException(status_code=400, detail="Failed to delete user")
return {"status": "deleted", "username": username}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/users/{username}/password")
async def change_password(
username: str,
request: ChangePasswordRequest,
current_user: str = Depends(get_current_user)
):
"""Change user password"""
try:
success = identities_manager.change_password(username, request.password)
if not success:
raise HTTPException(status_code=400, detail="Failed to change password")
return {"status": "updated", "username": username}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/users/{username}/shell")
async def change_shell(
username: str,
request: ChangeShellRequest,
current_user: str = Depends(get_current_user)
):
"""Change user shell"""
try:
success = identities_manager.change_shell(username, request.shell)
if not success:
raise HTTPException(status_code=400, detail="Failed to change shell")
return {"status": "updated", "username": username}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/users/{username}/lock")
async def lock_user(
username: str,
current_user: str = Depends(get_current_user)
):
"""Lock user account"""
try:
success = identities_manager.lock_user(username)
if not success:
raise HTTPException(status_code=400, detail="Failed to lock user")
return {"status": "locked", "username": username}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/users/{username}/unlock")
async def unlock_user(
username: str,
current_user: str = Depends(get_current_user)
):
"""Unlock user account"""
try:
success = identities_manager.unlock_user(username)
if not success:
raise HTTPException(status_code=400, detail="Failed to unlock user")
return {"status": "unlocked", "username": username}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/users/{username}/samba-password")
async def set_samba_password(
username: str,
request: ChangePasswordRequest,
current_user: str = Depends(get_current_user)
):
"""Set Samba password for user"""
try:
success = identities_manager.set_samba_password(username, request.password)
if not success:
raise HTTPException(status_code=400, detail="Failed to set Samba password")
return {"status": "updated", "username": username, "type": "samba"}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============== GROUPS ==============
@router.get("/groups")
async def list_groups(current_user: str = Depends(get_current_user)):
"""List all system groups"""
try:
groups = identities_manager.list_groups()
return {"groups": groups}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.post("/groups")
async def create_group(
request: CreateGroupRequest,
current_user: str = Depends(get_current_user)
):
"""Create new system group"""
try:
success = identities_manager.create_group(request.groupname)
if not success:
raise HTTPException(status_code=400, detail="Failed to create group")
return {"status": "created", "groupname": request.groupname}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/groups/{groupname}")
async def delete_group(
groupname: str,
current_user: str = Depends(get_current_user)
):
"""Delete system group"""
try:
success = identities_manager.delete_group(groupname)
if not success:
raise HTTPException(status_code=400, detail="Failed to delete group")
return {"status": "deleted", "groupname": groupname}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============== USER-GROUP MEMBERSHIP ==============
@router.post("/users/{username}/groups")
async def add_user_to_group(
username: str,
request: AddUserToGroupRequest,
current_user: str = Depends(get_current_user)
):
"""Add user to group"""
try:
success = identities_manager.add_user_to_group(username, request.groupname)
if not success:
raise HTTPException(status_code=400, detail="Failed to add user to group")
return {"status": "added", "username": username, "groupname": request.groupname}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.delete("/users/{username}/groups/{groupname}")
async def remove_user_from_group(
username: str,
groupname: str,
current_user: str = Depends(get_current_user)
):
"""Remove user from group"""
try:
success = identities_manager.remove_user_from_group(username, groupname)
if not success:
raise HTTPException(status_code=400, detail="Failed to remove user from group")
return {"status": "removed", "username": username, "groupname": groupname}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============== SAMBA USERS ==============
@router.get("/samba-users")
async def list_samba_users(current_user: str = Depends(get_current_user)):
"""List all Samba users"""
try:
users = identities_manager.list_samba_users()
return {"users": users}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
# ============== LOGIN HISTORY ==============
@router.get("/login-history")
async def get_login_history(
limit: int = 50,
current_user: str = Depends(get_current_user)
):
"""Get recent login history"""
try:
logins = identities_manager.get_login_history(limit)
return {"logins": logins, "total": len(logins)}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))