f49793e6f2
- AuthService, SystemInfo, IdentitiesManager Klassen → Modul-Funktionen
- grp.getall() → grp.getgrall() (Bug: Methode existierte nie)
- open('/proc/loadavg') ohne context manager gefixt (File-Handle-Leak)
- rx_packets/tx_packets null-check im Frontend (toLocaleString auf undefined)
- PoolCard onClick: /pools/{name} → /zfs (Route existierte nicht, löste Seitenreload aus)
- Alle Router-Imports auf Modul-Aliase umgestellt
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
63 lines
1.9 KiB
Python
63 lines
1.9 KiB
Python
"""
|
|
JWT Authentication Service
|
|
Handles user login via PAM (Linux system users), token generation, and verification
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
|
|
from jose import JWTError, jwt
|
|
|
|
logger = logging.getLogger(__name__)

# Fallback signing key used only when ZFS_SECRET_KEY is not set in the
# environment. It is a publicly known placeholder, so tokens signed with it
# can be forged by anyone; it is kept so existing setups keep starting.
_INSECURE_DEFAULT_SECRET_KEY = "your-secret-key-change-in-production"

# Key used to sign and verify JWTs; read once from the environment at import.
SECRET_KEY = os.environ.get("ZFS_SECRET_KEY", _INSECURE_DEFAULT_SECRET_KEY)
if not SECRET_KEY or SECRET_KEY == _INSECURE_DEFAULT_SECRET_KEY:
    # Make the misconfiguration visible instead of silently signing tokens
    # with an empty or well-known key.
    logger.warning(
        "ZFS_SECRET_KEY is unset, empty, or still the built-in placeholder; "
        "JWTs are signed with an insecure key. Set ZFS_SECRET_KEY in production."
    )

# JWT signing algorithm passed to the encode/decode calls.
ALGORITHM = "HS256"

# Default token lifetime, in hours, when the caller gives no explicit delta.
ACCESS_TOKEN_EXPIRE_HOURS = 8
|
|
|
|
# python-pam is an optional dependency: record whether it could be imported
# so the rest of the module can check PAM_AVAILABLE before touching `pam`.
try:
    import pam
except ImportError:
    PAM_AVAILABLE = False
    logger.warning("python-pam not installed, PAM authentication unavailable")
else:
    PAM_AVAILABLE = True
    logger.info("Using PAM authentication (Linux system users)")
|
|
|
|
|
|
def authenticate_user(username: str, password: str) -> Optional[dict]:
    """Check *username* / *password* against PAM.

    Returns:
        ``{"username": username, "source": "pam"}`` when PAM accepts the
        credentials; ``None`` when PAM is unavailable, rejects the
        credentials, or raises any exception (the cause is logged).
    """
    if not PAM_AVAILABLE:
        logger.error("PAM not available")
        return None

    try:
        p = pam.pam()
        accepted = p.authenticate(username, password)
        if not accepted:
            # Rejected credentials: log the reason reported by PAM.
            logger.warning(f"PAM authentication failed for user {username}: {p.reason}")
            return None
        logger.info(f"User {username} authenticated via PAM")
        return {"username": username, "source": "pam"}
    except Exception as e:
        # Best-effort boundary: any PAM failure is reported as "not authenticated".
        logger.error(f"PAM authentication error: {e}")
        return None
|
|
|
|
|
|
def create_access_token(username: str, expires_delta: Optional[timedelta] = None) -> str:
    """Create a signed JWT whose ``sub`` claim is *username*.

    Args:
        username: Value stored in the token's ``sub`` claim.
        expires_delta: Lifetime of the token. Defaults to
            ``ACCESS_TOKEN_EXPIRE_HOURS`` hours when ``None``.

    Returns:
        The encoded token produced by ``jwt.encode``.

    Raises:
        Exception: Whatever ``jwt.encode`` raises; it is logged and re-raised.
    """
    # Function-scope import: the module's top-level import only brings in
    # ``datetime`` and ``timedelta``.
    from datetime import timezone

    if expires_delta is None:
        expires_delta = timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)

    # Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated
    # since Python 3.12 and returns a naive value that is easy to misread as
    # local time. NOTE(review): python-jose is expected to convert datetime
    # ``exp`` claims to a UTC epoch value, so the encoded claim is unchanged.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {"sub": username, "exp": expire}
    try:
        return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    except Exception as e:
        logger.error(f"Failed to create token: {e}")
        raise
|
|
|
|
|
|
def verify_token(token: str) -> Optional[str]:
    """Decode *token* and return the username stored in its ``sub`` claim.

    Returns:
        The ``sub`` claim when the token decodes successfully and the claim
        is present and non-empty; ``None`` when decoding raises ``JWTError``
        or the claim is missing or empty.
    """
    try:
        claims = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        return None

    subject: Optional[str] = claims.get("sub")
    # Normalise a missing or empty subject to None.
    return subject or None
|