Files
zmb-webui/backend/routers/identities.py
patrick a187b625bc Fix: Identities Group Management - bessere Fehlermeldungen
- add_user_to_group: Exception werfen mit stderr Nachricht
- remove_user_from_group: Exception werfen mit stderr Nachricht
- text=True für subprocess für besseres Error Handling
- Router aktualisiert um Fehlermeldungen an Frontend weiterzugeben
- Benutzer sehen jetzt detaillierte Fehlermeldungen beim Gruppe-Entfernen

Behebt: 'Failed to remove user from group' verschluckt die echte Fehlermeldung

Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
2026-06-04 14:58:50 +02:00

277 lines
8.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
User and Group Management endpoints (cockpit-identities)
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from typing import Optional
from services.identities import identities_manager
from services.auth import auth_service
# Router for every endpoint in this module; all paths are prefixed with
# /api/identities and tagged "identities".
router = APIRouter(prefix="/api/identities", tags=["identities"])
# HTTP Bearer scheme; get_current_user depends on it to obtain the credentials.
security = HTTPBearer()
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """Resolve the bearer token to a username.

    The raw token from the bearer credentials is handed to
    ``auth_service.verify_token``; its result is returned when truthy.

    Raises:
        HTTPException: 401 with detail "Invalid token" when verification
            yields a falsy value.
    """
    token_owner = auth_service.verify_token(credentials.credentials)
    if token_owner:
        return token_owner
    raise HTTPException(
        detail="Invalid token",
        status_code=status.HTTP_401_UNAUTHORIZED,
    )
class CreateUserRequest(BaseModel):
    # Request body for POST /api/identities/users.
    # Comments are used instead of a docstring so the generated schema is
    # left untouched.
    username: str
    # Optional; forwarded unchanged to identities_manager.create_user.
    home_dir: Optional[str] = None
    shell: str = "/bin/bash"
    # Optional; the create_user endpoint substitutes "" when this is unset.
    gecos: Optional[str] = None
class CreateGroupRequest(BaseModel):
    # Request body for POST /api/identities/groups.
    groupname: str
class ChangePasswordRequest(BaseModel):
    # Request body shared by the system-password and Samba-password endpoints.
    password: str
class ChangeShellRequest(BaseModel):
    # Request body for POST /api/identities/users/{username}/shell.
    shell: str
class AddUserToGroupRequest(BaseModel):
    # Request body for POST /api/identities/users/{username}/groups.
    groupname: str
# ============== USERS ==============
@router.get("/users")
async def list_users(current_user: str = Depends(get_current_user)):
    """List all system users together with their group memberships.

    Every entry returned by ``identities_manager.list_users()`` gets a
    ``groups`` key filled from ``identities_manager.get_user_groups`` for
    that entry's ``username``.

    Args:
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"users": [...]}``.

    Raises:
        HTTPException: 500 carrying the stringified error when a manager
            call raises.
    """
    try:
        accounts = identities_manager.list_users()
        for account in accounts:
            memberships = identities_manager.get_user_groups(account['username'])
            account['groups'] = memberships
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"users": accounts}
@router.post("/users")
async def create_user(
    request: CreateUserRequest,
    current_user: str = Depends(get_current_user)
):
    """Create new system user.

    Args:
        request: Username plus optional home directory, shell and GECOS
            value; an unset GECOS is passed on as an empty string.
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"status": "created", "username": <username>}``.

    Raises:
        HTTPException: 400 when the manager returns a falsy result, 500
            with the stringified error when the manager raises.
    """
    try:
        success = identities_manager.create_user(
            request.username,
            request.home_dir,
            request.shell,
            request.gecos or ""
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # Checked outside the try block: raised inside it, this 400 would be
    # caught by the broad handler and reach the client as a 500.
    if not success:
        raise HTTPException(status_code=400, detail="Failed to create user")
    return {"status": "created", "username": request.username}
@router.delete("/users/{username}")
async def delete_user(
    username: str,
    remove_home: bool = True,
    current_user: str = Depends(get_current_user)
):
    """Delete system user.

    Args:
        username: Account to delete (path parameter).
        remove_home: Passed through to ``identities_manager.delete_user``;
            defaults to ``True``.
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"status": "deleted", "username": <username>}``.

    Raises:
        HTTPException: 400 when the manager returns a falsy result, 500
            with the stringified error when the manager raises.
    """
    try:
        success = identities_manager.delete_user(username, remove_home)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # Checked outside the try block: raised inside it, this 400 would be
    # caught by the broad handler and reach the client as a 500.
    if not success:
        raise HTTPException(status_code=400, detail="Failed to delete user")
    return {"status": "deleted", "username": username}
@router.post("/users/{username}/password")
async def change_password(
    username: str,
    request: ChangePasswordRequest,
    current_user: str = Depends(get_current_user)
):
    """Change user password.

    Args:
        username: Account whose password is changed (path parameter).
        request: Body carrying the new password.
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"status": "updated", "username": <username>}``.

    Raises:
        HTTPException: 400 when the manager returns a falsy result, 500
            with the stringified error when the manager raises.
    """
    try:
        success = identities_manager.change_password(username, request.password)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # Checked outside the try block: raised inside it, this 400 would be
    # caught by the broad handler and reach the client as a 500.
    if not success:
        raise HTTPException(status_code=400, detail="Failed to change password")
    return {"status": "updated", "username": username}
@router.post("/users/{username}/shell")
async def change_shell(
    username: str,
    request: ChangeShellRequest,
    current_user: str = Depends(get_current_user)
):
    """Change user shell.

    Args:
        username: Account whose shell is changed (path parameter).
        request: Body carrying the new shell.
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"status": "updated", "username": <username>}``.

    Raises:
        HTTPException: 400 when the manager returns a falsy result, 500
            with the stringified error when the manager raises.
    """
    try:
        success = identities_manager.change_shell(username, request.shell)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # Checked outside the try block: raised inside it, this 400 would be
    # caught by the broad handler and reach the client as a 500.
    if not success:
        raise HTTPException(status_code=400, detail="Failed to change shell")
    return {"status": "updated", "username": username}
@router.post("/users/{username}/lock")
async def lock_user(
    username: str,
    current_user: str = Depends(get_current_user)
):
    """Lock user account.

    Args:
        username: Account to lock (path parameter).
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"status": "locked", "username": <username>}``.

    Raises:
        HTTPException: 400 when the manager returns a falsy result, 500
            with the stringified error when the manager raises.
    """
    try:
        success = identities_manager.lock_user(username)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # Checked outside the try block: raised inside it, this 400 would be
    # caught by the broad handler and reach the client as a 500.
    if not success:
        raise HTTPException(status_code=400, detail="Failed to lock user")
    return {"status": "locked", "username": username}
@router.post("/users/{username}/unlock")
async def unlock_user(
    username: str,
    current_user: str = Depends(get_current_user)
):
    """Unlock user account.

    Args:
        username: Account to unlock (path parameter).
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"status": "unlocked", "username": <username>}``.

    Raises:
        HTTPException: 400 when the manager returns a falsy result, 500
            with the stringified error when the manager raises.
    """
    try:
        success = identities_manager.unlock_user(username)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # Checked outside the try block: raised inside it, this 400 would be
    # caught by the broad handler and reach the client as a 500.
    if not success:
        raise HTTPException(status_code=400, detail="Failed to unlock user")
    return {"status": "unlocked", "username": username}
@router.post("/users/{username}/samba-password")
async def set_samba_password(
    username: str,
    request: ChangePasswordRequest,
    current_user: str = Depends(get_current_user)
):
    """Set Samba password for user.

    Args:
        username: Account whose Samba password is set (path parameter).
        request: Body carrying the new password.
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"status": "updated", "username": <username>, "type": "samba"}``.

    Raises:
        HTTPException: 400 when the manager returns a falsy result, 500
            with the stringified error when the manager raises.
    """
    try:
        success = identities_manager.set_samba_password(username, request.password)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # Checked outside the try block: raised inside it, this 400 would be
    # caught by the broad handler and reach the client as a 500.
    if not success:
        raise HTTPException(status_code=400, detail="Failed to set Samba password")
    return {"status": "updated", "username": username, "type": "samba"}
# ============== GROUPS ==============
@router.get("/groups")
async def list_groups(current_user: str = Depends(get_current_user)):
    """List all system groups.

    Args:
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"groups": ...}`` wrapping the result of
        ``identities_manager.list_groups()``.

    Raises:
        HTTPException: 500 carrying the stringified error when the manager
            call raises.
    """
    try:
        found = identities_manager.list_groups()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"groups": found}
@router.post("/groups")
async def create_group(
    request: CreateGroupRequest,
    current_user: str = Depends(get_current_user)
):
    """Create new system group.

    Args:
        request: Body carrying the name of the group to create.
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"status": "created", "groupname": <groupname>}``.

    Raises:
        HTTPException: 400 when the manager returns a falsy result, 500
            with the stringified error when the manager raises.
    """
    try:
        success = identities_manager.create_group(request.groupname)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # Checked outside the try block: raised inside it, this 400 would be
    # caught by the broad handler and reach the client as a 500.
    if not success:
        raise HTTPException(status_code=400, detail="Failed to create group")
    return {"status": "created", "groupname": request.groupname}
@router.delete("/groups/{groupname}")
async def delete_group(
    groupname: str,
    current_user: str = Depends(get_current_user)
):
    """Delete system group.

    Args:
        groupname: Group to delete (path parameter).
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"status": "deleted", "groupname": <groupname>}``.

    Raises:
        HTTPException: 400 when the manager returns a falsy result, 500
            with the stringified error when the manager raises.
    """
    try:
        success = identities_manager.delete_group(groupname)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # Checked outside the try block: raised inside it, this 400 would be
    # caught by the broad handler and reach the client as a 500.
    if not success:
        raise HTTPException(status_code=400, detail="Failed to delete group")
    return {"status": "deleted", "groupname": groupname}
# ============== USER-GROUP MEMBERSHIP ==============
@router.post("/users/{username}/groups")
async def add_user_to_group(
    username: str,
    request: AddUserToGroupRequest,
    current_user: str = Depends(get_current_user)
):
    """Add user to group.

    Failure is signalled only by an exception from the manager; its message
    is passed to the client as the error detail.

    Args:
        username: Account to add (path parameter).
        request: Body carrying the target group name.
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"status": "added", "username": <username>, "groupname": <groupname>}``.

    Raises:
        HTTPException: 400 carrying the stringified error when the manager
            call raises.
    """
    target_group = request.groupname
    try:
        identities_manager.add_user_to_group(username, target_group)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return {"status": "added", "username": username, "groupname": target_group}
@router.delete("/users/{username}/groups/{groupname}")
async def remove_user_from_group(
    username: str,
    groupname: str,
    current_user: str = Depends(get_current_user)
):
    """Remove user from group.

    Failure is signalled only by an exception from the manager; its message
    is passed to the client as the error detail.

    Args:
        username: Account to remove (path parameter).
        groupname: Group to remove the account from (path parameter).
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"status": "removed", "username": <username>, "groupname": <groupname>}``.

    Raises:
        HTTPException: 400 carrying the stringified error when the manager
            call raises.
    """
    try:
        identities_manager.remove_user_from_group(username, groupname)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    return {"status": "removed", "username": username, "groupname": groupname}
# ============== SAMBA USERS ==============
@router.get("/samba-users")
async def list_samba_users(current_user: str = Depends(get_current_user)):
    """List all Samba users.

    Args:
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"users": ...}`` wrapping the result of
        ``identities_manager.list_samba_users()``.

    Raises:
        HTTPException: 500 carrying the stringified error when the manager
            call raises.
    """
    try:
        samba_accounts = identities_manager.list_samba_users()
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"users": samba_accounts}
# ============== LOGIN HISTORY ==============
@router.get("/login-history")
async def get_login_history(
    limit: int = 50,
    current_user: str = Depends(get_current_user)
):
    """Get recent login history.

    Args:
        limit: Passed through to ``identities_manager.get_login_history``;
            defaults to 50.
        current_user: Injected by ``get_current_user``; not used beyond
            enforcing authentication.

    Returns:
        ``{"logins": <entries>, "total": <number of entries returned>}``.

    Raises:
        HTTPException: 500 carrying the stringified error when fetching or
            counting the entries raises.
    """
    try:
        entries = identities_manager.get_login_history(limit)
        # Counted inside the try so a result without a length still maps to 500.
        entry_count = len(entries)
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
    return {"logins": entries, "total": entry_count}