diff --git a/backend/routers/auth.py b/backend/routers/auth.py index cd6625c..b2a5fcc 100644 --- a/backend/routers/auth.py +++ b/backend/routers/auth.py @@ -6,7 +6,7 @@ from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel -from services.auth import auth_service +from services import auth as auth_service from models import Token router = APIRouter(prefix="/api/auth", tags=["auth"]) diff --git a/backend/routers/datasets.py b/backend/routers/datasets.py index 6a0373c..828041b 100644 --- a/backend/routers/datasets.py +++ b/backend/routers/datasets.py @@ -8,7 +8,7 @@ from typing import List, Optional from pydantic import BaseModel from services.zfs_runner import zfs_runner -from services.auth import auth_service +from services import auth as auth_service from models import Dataset, DatasetType router = APIRouter(prefix="/api/datasets", tags=["datasets"]) diff --git a/backend/routers/identities.py b/backend/routers/identities.py index c747c49..db06004 100644 --- a/backend/routers/identities.py +++ b/backend/routers/identities.py @@ -7,8 +7,8 @@ from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel from typing import Optional -from services.identities import identities_manager -from services.auth import auth_service +from services import identities as identities_manager +from services import auth as auth_service router = APIRouter(prefix="/api/identities", tags=["identities"]) security = HTTPBearer() diff --git a/backend/routers/navigator.py b/backend/routers/navigator.py index de19f39..5a17c0a 100644 --- a/backend/routers/navigator.py +++ b/backend/routers/navigator.py @@ -14,7 +14,7 @@ import jwt from datetime import datetime, timedelta from services.file_manager import file_manager -from services.auth import auth_service +from services import auth as auth_service router = APIRouter(prefix="/api/navigator", tags=["navigator"]) security = HTTPBearer() diff --git a/backend/routers/pools.py b/backend/routers/pools.py index 1feeb42..3013b08 100644 --- a/backend/routers/pools.py +++ b/backend/routers/pools.py @@ -8,7 +8,7 @@ from typing import List import re from services.zfs_runner import zfs_runner -from services.auth import auth_service +from services import auth as auth_service from models import Pool, PoolStatus, PoolHealth router = APIRouter(prefix="/api/pools", tags=["pools"]) diff --git a/backend/routers/shares.py b/backend/routers/shares.py index 36ab071..57b2f33 100644 --- a/backend/routers/shares.py +++ b/backend/routers/shares.py @@ -8,7 +8,7 @@ from pydantic import BaseModel from typing import Optional from services.shares import share_manager -from services.auth import auth_service +from services import auth as auth_service router = APIRouter(prefix="/api/shares", tags=["shares"]) security = HTTPBearer() diff --git a/backend/routers/snapshots.py b/backend/routers/snapshots.py index fa86141..25cdde7 100644 --- a/backend/routers/snapshots.py +++ b/backend/routers/snapshots.py @@ -9,7 +9,7 @@ from pydantic import BaseModel from datetime import datetime from services.zfs_runner import zfs_runner -from services.auth import auth_service +from services import auth as auth_service from models import Snapshot router = APIRouter(prefix="/api/snapshots", tags=["snapshots"]) diff --git a/backend/routers/system.py b/backend/routers/system.py index dc83edf..7ffdb11 100644 --- a/backend/routers/system.py +++ b/backend/routers/system.py @@ -6,8 +6,8 @@ from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from pydantic import BaseModel -from services.system_info import system_info -from services.auth import auth_service +from services import system_info +from services import auth as auth_service router = APIRouter(prefix="/api/system", tags=["system"]) security = HTTPBearer() diff --git a/backend/services/auth.py b/backend/services/auth.py index b2a63db..b4ca6f0 100644 --- a/backend/services/auth.py +++ b/backend/services/auth.py @@ -12,78 +12,51 @@ from jose import JWTError, jwt logger = logging.getLogger(__name__) -# JWT Configuration SECRET_KEY = os.environ.get("ZFS_SECRET_KEY", "your-secret-key-change-in-production") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_HOURS = 8 -# Try to import PAM for system authentication try: import pam PAM_AVAILABLE = True + logger.info("Using PAM authentication (Linux system users)") except ImportError: PAM_AVAILABLE = False logger.warning("python-pam not installed, PAM authentication unavailable") -class AuthService: - def __init__(self): - """Initialize auth service with PAM (Linux system users)""" - if PAM_AVAILABLE: - logger.info("Using PAM authentication (Linux system users)") - else: - logger.error("PAM not available - install python-pam for authentication") - - def authenticate_user(self, username: str, password: str) -> Optional[dict]: - """ - Authenticate user via PAM (Linux system users like 'pi', 'root') - Returns user data if valid, None otherwise - """ - if not PAM_AVAILABLE: - logger.error("PAM not available") - return None - - try: - p = pam.pam() - if p.authenticate(username, password): - logger.info(f"User {username} authenticated via PAM") - return { - "username": username, - "source": "pam" - } - else: - logger.warning(f"PAM authentication failed for user {username}: {p.reason}") - return None - except Exception as e: - logger.error(f"PAM authentication error: {e}") - return None - - def create_access_token(self, username: str, expires_delta: Optional[timedelta] = None) -> str: - """Create JWT access token""" - if expires_delta is None: - expires_delta = timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS) - - expire = datetime.utcnow() + expires_delta - to_encode = {"sub": username, "exp": expire} - - try: - encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) - return encoded_jwt - except Exception as e: - logger.error(f"Failed to create token: {e}") - raise - - def verify_token(self, token: str) -> Optional[str]: - """Verify JWT token and return username""" - try: - payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) - username: str = payload.get("sub") - if username is None: - return None - return username - except JWTError: - return None +def authenticate_user(username: str, password: str) -> Optional[dict]: + if not PAM_AVAILABLE: + logger.error("PAM not available") + return None + try: + p = pam.pam() + if p.authenticate(username, password): + logger.info(f"User {username} authenticated via PAM") + return {"username": username, "source": "pam"} + logger.warning(f"PAM authentication failed for user {username}: {p.reason}") + return None + except Exception as e: + logger.error(f"PAM authentication error: {e}") + return None -# Global instance -auth_service = AuthService() +def create_access_token(username: str, expires_delta: Optional[timedelta] = None) -> str: + if expires_delta is None: + expires_delta = timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS) + expire = datetime.utcnow() + expires_delta + to_encode = {"sub": username, "exp": expire} + try: + return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) + except Exception as e: + logger.error(f"Failed to create token: {e}") + raise + + +def verify_token(token: str) -> Optional[str]: + try: + payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) + username: str = payload.get("sub") + return username if username else None + except JWTError: + return None diff --git a/backend/services/identities.py b/backend/services/identities.py index db6aac1..d309270 100644 --- a/backend/services/identities.py +++ b/backend/services/identities.py @@ -8,7 +8,6 @@ import logging import pwd import grp from typing import List, Dict, Any, Optional -from pathlib import Path try: import spwd @@ -18,559 +17,415 @@ except ImportError: logger = logging.getLogger(__name__) -class IdentitiesManager: - """Manage system users and groups""" +def _is_user_locked(username: str) -> bool: + if not spwd: + return False + try: + entry = spwd.getspnam(username) + return entry.sp_lstchg == 0 or entry.sp_max == 0 + except (KeyError, PermissionError): + return False + except Exception as e: + logger.warning(f"Error checking lock status: {e}") + return False - def list_users(self) -> List[Dict[str, Any]]: - """List all system users""" - users = [] + +def _get_full_username(truncated: str) -> Optional[str]: + """Find full username from /etc/passwd when wtmp truncated it to 8 chars.""" + try: + result = subprocess.run( + ["/usr/bin/getent", "passwd"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + for line in result.stdout.strip().split('\n'): + if not line: + continue + parts = line.split(':') + if parts and parts[0].startswith(truncated) and len(parts[0]) > len(truncated): + return parts[0] + return None + except Exception as e: + logger.debug(f"Error finding full username for '{truncated}': {e}") + return None + + +def list_users() -> List[Dict[str, Any]]: + users = [] + try: + result = subprocess.run( + ["/usr/bin/getent", "passwd"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + for line in result.stdout.strip().split('\n'): + if not line: + continue + parts = line.split(':') + if len(parts) < 7: + continue + try: + users.append({ + 'username': parts[0], + 'uid': int(parts[2]), + 'gid': int(parts[3]), + 'gecos': parts[4], + 'home': parts[5], + 'shell': parts[6], + 'locked': _is_user_locked(parts[0]) + }) + except Exception as e: + logger.warning(f"Error parsing user line: {e}") + return sorted(users, key=lambda x: x['uid']) + except Exception as e: + logger.error(f"Error listing users: {e}") + return [] + + +def list_groups() -> List[Dict[str, Any]]: + groups = [] + try: + result = subprocess.run( + ["/usr/bin/getent", "group"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + for line in result.stdout.strip().split('\n'): + if not line: + continue + parts = line.split(':') + if len(parts) < 4: + continue + try: + groups.append({ + 'groupname': parts[0], + 'gid': int(parts[2]), + 'members': [m.strip() for m in parts[3].split(',') if m.strip()] + }) + except Exception as e: + logger.warning(f"Error parsing group line: {e}") + return sorted(groups, key=lambda x: x['gid']) + except Exception as e: + logger.error(f"Error listing groups: {e}") + return [] + + +def get_user_groups(username: str) -> List[str]: + try: + groups = [ + entry.gr_name + for entry in grp.getgrall() + if username in entry.gr_mem + ] try: - # Use getpwall() which returns an iterator - import getpass - # Fallback: read /etc/passwd directly - result = subprocess.run( - ["/usr/bin/getent", "passwd"], - capture_output=True, - text=True, - timeout=5 + user_entry = pwd.getpwnam(username) + primary = grp.getgrgid(user_entry.pw_gid) + if primary.gr_name not in groups: + groups.append(primary.gr_name) + except KeyError: + pass + return sorted(groups) + except Exception as e: + logger.error(f"Error getting groups for {username}: {e}") + return [] + + +def create_user(username: str, home_dir: Optional[str] = None, + shell: str = "/bin/bash", gecos: str = "") -> bool: + try: + cmd = ["/usr/sbin/useradd", "-d", home_dir or f"/home/{username}", "-s", shell] + if gecos: + cmd.extend(["-c", gecos]) + cmd.extend(["-m", username]) + result = subprocess.run(cmd, capture_output=True, timeout=10) + if result.returncode == 0: + logger.info(f"User created: {username}") + return True + logger.error(f"Failed to create user {username}: {result.stderr.decode()}") + return False + except Exception as e: + logger.error(f"Error creating user: {e}") + return False + + +def delete_user(username: str, remove_home: bool = True) -> bool: + try: + try: + subprocess.run( + ["/usr/bin/pdbedit", "-x", "-u", username], + capture_output=True, timeout=10 ) + except Exception: + pass - if result.returncode == 0: - for line in result.stdout.strip().split('\n'): - if not line: - continue - parts = line.split(':') - if len(parts) < 7: - continue + cmd = ["/usr/sbin/userdel"] + if remove_home: + cmd.append("-r") + cmd.append(username) + result = subprocess.run(cmd, capture_output=True, timeout=10) + if result.returncode == 0: + logger.info(f"User deleted: {username}") + return True + logger.error(f"Failed to delete user {username}: {result.stderr.decode()}") + return False + except Exception as e: + logger.error(f"Error deleting user: {e}") + return False + +def create_group(groupname: str) -> bool: + try: + result = subprocess.run( + ["/usr/sbin/groupadd", groupname], + capture_output=True, timeout=10 + ) + if result.returncode == 0: + logger.info(f"Group created: {groupname}") + return True + logger.error(f"Failed to create group {groupname}: {result.stderr.decode()}") + return False + except Exception as e: + logger.error(f"Error creating group: {e}") + return False + + +def delete_group(groupname: str) -> bool: + try: + result = subprocess.run( + ["/usr/sbin/groupdel", groupname], + capture_output=True, timeout=10 + ) + if result.returncode == 0: + logger.info(f"Group deleted: {groupname}") + return True + logger.error(f"Failed to delete group {groupname}: {result.stderr.decode()}") + return False + except Exception as e: + logger.error(f"Error deleting group: {e}") + return False + + +def add_user_to_group(username: str, groupname: str) -> bool: + try: + result = subprocess.run( + ["/usr/sbin/usermod", "-aG", groupname, username], + capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + logger.info(f"User {username} added to group {groupname}") + return True + error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error" + logger.error(f"Failed to add user to group: {error_msg}") + raise Exception(f"Failed to add user to group: {error_msg}") + except Exception as e: + logger.error(f"Error adding user to group: {e}") + raise + + +def remove_user_from_group(username: str, groupname: str) -> bool: + try: + result = subprocess.run( + ["/usr/sbin/gpasswd", "-d", username, groupname], + capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + logger.info(f"User {username} removed from group {groupname}") + return True + error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error" + logger.error(f"Failed to remove user from group: {error_msg}") + raise Exception(f"Failed to remove user from group: {error_msg}") + except Exception as e: + logger.error(f"Error removing user from group: {e}") + raise + + +def change_password(username: str, password: str) -> bool: + try: + result = subprocess.run( + ["/usr/sbin/chpasswd"], + input=f"{username}:{password}\n", + text=True, capture_output=True, timeout=10 + ) + if result.returncode == 0: + logger.info(f"Password changed for {username}") + return True + logger.error(f"Failed to change password: {result.stderr}") + return False + except Exception as e: + logger.error(f"Error changing password: {e}") + return False + + +def change_shell(username: str, shell: str) -> bool: + try: + result = subprocess.run( + ["/usr/sbin/usermod", "-s", shell, username], + capture_output=True, timeout=10 + ) + if result.returncode == 0: + logger.info(f"Shell changed for {username} to {shell}") + return True + logger.error(f"Failed to change shell: {result.stderr.decode()}") + return False + except Exception as e: + logger.error(f"Error changing shell: {e}") + return False + + +def lock_user(username: str) -> bool: + try: + result = subprocess.run( + ["/usr/sbin/usermod", "-L", username], + capture_output=True, timeout=10 + ) + if result.returncode == 0: + logger.info(f"User locked: {username}") + return True + logger.error(f"Failed to lock user: {result.stderr.decode()}") + return False + except Exception as e: + logger.error(f"Error locking user: {e}") + return False + + +def unlock_user(username: str) -> bool: + try: + result = subprocess.run( + ["/usr/sbin/usermod", "-U", username], + capture_output=True, timeout=10 + ) + if result.returncode == 0: + logger.info(f"User unlocked: {username}") + return True + logger.error(f"Failed to unlock user: {result.stderr.decode()}") + return False + except Exception as e: + logger.error(f"Error unlocking user: {e}") + return False + + +def set_samba_password(username: str, password: str) -> bool: + try: + result = subprocess.run( + ["/usr/bin/smbpasswd", "-a", "-s", username], + input=f"{password}\n{password}\n", + text=True, capture_output=True, timeout=10 + ) + if result.returncode == 0: + logger.info(f"Samba password set for {username}") + return True + logger.error(f"Failed to set Samba password: {result.stderr}") + return False + except FileNotFoundError: + logger.error("smbpasswd command not found - Samba not installed?") + return False + except Exception as e: + logger.error(f"Error setting Samba password: {e}") + return False + + +def get_login_history(limit: int = 50) -> List[Dict[str, Any]]: + import re + from datetime import datetime as dt + + logins = [] + days_of_week = {'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'} + months = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, + 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12} + current_year = dt.now().year + + try: + result = subprocess.run( + ["last", "-n", str(limit)], + capture_output=True, text=True, timeout=10 + ) + if result.returncode != 0: + return [] + + for line in result.stdout.strip().split('\n'): + if not line.strip() or 'wtmp' in line or 'begins' in line: + continue + try: + tokens = line.split() + day_idx = next((i for i, t in enumerate(tokens) if t in days_of_week), -1) + if day_idx < 1: + continue + + username = tokens[0] + if username in ['wtmp', 'reboot', 'kernel']: + continue + + full_username = _get_full_username(username) + if full_username: + username = full_username + + device_host_tokens = tokens[1:day_idx] + tty, host = '-', '-' + if device_host_tokens: + first = device_host_tokens[0] + if '/' in first or first.startswith('pts') or first.startswith('tty'): + tty = first + host = ' '.join(device_host_tokens[1:]) if len(device_host_tokens) > 1 else '-' + else: + host = ' '.join(device_host_tokens) + + if day_idx + 3 >= len(tokens): + continue + day = tokens[day_idx] + month = tokens[day_idx + 1] + date = tokens[day_idx + 2] + time_str = tokens[day_idx + 3] + if month not in months: + continue + + duration_match = re.search(r'\(([^)]+)\)\s*$', line) + duration = duration_match.group(1) if duration_match else 'still logged in' + + logins.append({ + 'username': username, + 'tty': tty, + 'host': host, + 'date': f"{current_year}-{months[month]:02d}-{date.zfill(2)}", + 'time': time_str, + 'duration': duration, + 'login_str': f"{day} {month} {date} {time_str} {current_year}" + }) + except Exception as e: + logger.debug(f"Error parsing login line '{line}': {e}") + continue + + return logins + except Exception as e: + logger.error(f"Error getting login history: {e}") + return [] + + +def list_samba_users() -> List[Dict[str, Any]]: + users = [] + try: + result = subprocess.run( + ["/usr/bin/pdbedit", "-L"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + for line in result.stdout.strip().split('\n'): + if not line.strip(): + continue + parts = line.split(':') + if len(parts) >= 2: try: - username = parts[0] - uid = int(parts[2]) - gid = int(parts[3]) - gecos = parts[4] - home = parts[5] - shell = parts[6] - users.append({ - 'username': username, - 'uid': uid, - 'gid': gid, - 'gecos': gecos, - 'home': home, - 'shell': shell, - 'locked': self._is_user_locked(username) + 'username': parts[0], + 'uid': int(parts[1]), + 'comment': parts[2] if len(parts) > 2 else "", + 'type': 'samba' }) - except Exception as e: - logger.warning(f"Error parsing user line: {e}") - - return sorted(users, key=lambda x: x['uid']) - except Exception as e: - logger.error(f"Error listing users: {e}") - return [] - - def list_groups(self) -> List[Dict[str, Any]]: - """List all system groups""" - groups = [] - try: - # Use getent to read groups - result = subprocess.run( - ["/usr/bin/getent", "group"], - capture_output=True, - text=True, - timeout=5 - ) - - if result.returncode == 0: - for line in result.stdout.strip().split('\n'): - if not line: - continue - parts = line.split(':') - if len(parts) < 4: - continue - - try: - groupname = parts[0] - gid = int(parts[2]) - members = [m.strip() for m in parts[3].split(',') if m.strip()] - - groups.append({ - 'groupname': groupname, - 'gid': gid, - 'members': members - }) - except Exception as e: - logger.warning(f"Error parsing group line: {e}") - - return sorted(groups, key=lambda x: x['gid']) - except Exception as e: - logger.error(f"Error listing groups: {e}") - return [] - - def get_user_groups(self, username: str) -> List[str]: - """Get all groups a user belongs to""" - try: - groups = [] - for entry in grp.getall(): - if username in entry.gr_mem: - groups.append(entry.gr_name) - - # Also add primary group - try: - user_entry = pwd.getpwnam(username) - primary_group = grp.getgrgid(user_entry.pw_gid) - if primary_group.gr_name not in groups: - groups.append(primary_group.gr_name) - except KeyError: - pass - - return sorted(groups) - except Exception as e: - logger.error(f"Error getting groups for {username}: {e}") - return [] - - def create_user(self, username: str, home_dir: Optional[str] = None, - shell: str = "/bin/bash", gecos: str = "") -> bool: - """Create new system user""" - try: - # Build useradd command - cmd = ["/usr/sbin/useradd"] - - if home_dir: - cmd.extend(["-d", home_dir]) - else: - cmd.extend(["-d", f"/home/{username}"]) - - cmd.extend(["-s", shell]) - - if gecos: - cmd.extend(["-c", gecos]) - - cmd.extend(["-m", username]) # -m to create home directory - - result = subprocess.run(cmd, capture_output=True, timeout=10) - - if result.returncode == 0: - logger.info(f"User created: {username}") - return True - else: - logger.error(f"Failed to create user {username}: {result.stderr.decode()}") - return False - except Exception as e: - logger.error(f"Error creating user: {e}") - return False - - def delete_user(self, username: str, remove_home: bool = True) -> bool: - """Delete system user and Samba user""" - try: - # First delete Samba user if exists (using pdbedit for better handling) - try: - subprocess.run( - ["/usr/bin/pdbedit", "-x", "-u", username], - capture_output=True, - timeout=10 - ) - except Exception: - pass # Samba not installed or user doesn't exist - - # Delete system user - cmd = ["/usr/sbin/userdel"] - if remove_home: - cmd.append("-r") - cmd.append(username) - - result = subprocess.run(cmd, capture_output=True, timeout=10) - - if result.returncode == 0: - logger.info(f"User deleted: {username}") - return True - else: - logger.error(f"Failed to delete user {username}: {result.stderr.decode()}") - return False - except Exception as e: - logger.error(f"Error deleting user: {e}") - return False - - def create_group(self, groupname: str) -> bool: - """Create new system group""" - try: - result = subprocess.run( - ["/usr/sbin/groupadd", groupname], - capture_output=True, - timeout=10 - ) - - if result.returncode == 0: - logger.info(f"Group created: {groupname}") - return True - else: - logger.error(f"Failed to create group {groupname}: {result.stderr.decode()}") - return False - except Exception as e: - logger.error(f"Error creating group: {e}") - return False - - def delete_group(self, groupname: str) -> bool: - """Delete system group""" - try: - result = subprocess.run( - ["/usr/sbin/groupdel", groupname], - capture_output=True, - timeout=10 - ) - - if result.returncode == 0: - logger.info(f"Group deleted: {groupname}") - return True - else: - logger.error(f"Failed to delete group {groupname}: {result.stderr.decode()}") - return False - except Exception as e: - logger.error(f"Error deleting group: {e}") - return False - - def add_user_to_group(self, username: str, groupname: str) -> bool: - """Add user to group""" - try: - result = subprocess.run( - ["/usr/sbin/usermod", "-aG", groupname, username], - capture_output=True, - text=True, - timeout=10 - ) - - if result.returncode == 0: - logger.info(f"User {username} added to group {groupname}") - return True - else: - error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error" - logger.error(f"Failed to add user to group: {error_msg}") - raise Exception(f"Failed to add user to group: {error_msg}") - except Exception as e: - logger.error(f"Error adding user to group: {e}") - raise - - def remove_user_from_group(self, username: str, groupname: str) -> bool: - """Remove user from group""" - try: - result = subprocess.run( - ["/usr/sbin/gpasswd", "-d", username, groupname], - capture_output=True, - text=True, - timeout=10 - ) - - if result.returncode == 0: - logger.info(f"User {username} removed from group {groupname}") - return True - else: - error_msg = result.stderr.strip() or result.stdout.strip() or "Unknown error" - logger.error(f"Failed to remove user from group: {error_msg}") - raise Exception(f"Failed to remove user from group: {error_msg}") - except Exception as e: - logger.error(f"Error removing user from group: {e}") - raise - - def change_password(self, username: str, password: str) -> bool: - """Change user password via chpasswd""" - try: - # Use chpasswd for password changes - result = subprocess.run( - ["/usr/sbin/chpasswd"], - input=f"{username}:{password}\n", - text=True, - capture_output=True, - timeout=10 - ) - - if result.returncode == 0: - logger.info(f"Password changed for {username}") - return True - else: - logger.error(f"Failed to change password: {result.stderr}") - return False - except Exception as e: - logger.error(f"Error changing password: {e}") - return False - - def change_shell(self, username: str, shell: str) -> bool: - """Change user shell""" - try: - result = subprocess.run( - ["/usr/sbin/usermod", "-s", shell, username], - capture_output=True, - timeout=10 - ) - - if result.returncode == 0: - logger.info(f"Shell changed for {username} to {shell}") - return True - else: - logger.error(f"Failed to change shell: {result.stderr.decode()}") - return False - except Exception as e: - logger.error(f"Error changing shell: {e}") - return False - - def _is_user_locked(self, username: str) -> bool: - """Check if user account is locked""" - if not spwd: - return False - try: - entry = spwd.getspnam(username) - return entry.sp_lstchg == 0 or entry.sp_max == 0 - except (KeyError, PermissionError): - return False - except Exception as e: - logger.warning(f"Error checking lock status: {e}") - return False - - def lock_user(self, username: str) -> bool: - """Lock user account""" - try: - result = subprocess.run( - ["/usr/sbin/usermod", "-L", username], - capture_output=True, - timeout=10 - ) - - if result.returncode == 0: - logger.info(f"User locked: {username}") - return True - else: - logger.error(f"Failed to lock user: {result.stderr.decode()}") - return False - except Exception as e: - logger.error(f"Error locking user: {e}") - return False - - def unlock_user(self, username: str) -> bool: - """Unlock user account""" - try: - result = subprocess.run( - ["/usr/sbin/usermod", "-U", username], - capture_output=True, - timeout=10 - ) - - if result.returncode == 0: - logger.info(f"User unlocked: {username}") - return True - else: - logger.error(f"Failed to unlock user: {result.stderr.decode()}") - return False - except Exception as e: - logger.error(f"Error unlocking user: {e}") - return False - - def set_samba_password(self, username: str, password: str) -> bool: - """Set Samba password for user""" - try: - # Use smbpasswd to set Samba password - # -a flag: add/update user - # -s flag: read password from stdin - result = subprocess.run( - ["/usr/bin/smbpasswd", "-a", "-s", username], - input=f"{password}\n{password}\n", - text=True, - capture_output=True, - timeout=10 - ) - - if result.returncode == 0: - logger.info(f"Samba password set for {username}") - return True - else: - logger.error(f"Failed to set Samba password: {result.stderr}") - return False - except FileNotFoundError: - logger.error("smbpasswd command not found - Samba not installed?") - return False - except Exception as e: - logger.error(f"Error setting Samba password: {e}") - return False - - def get_login_history(self, limit: int = 50) -> List[Dict[str, Any]]: - """Get recent login history using last command""" - import re - from datetime import datetime - - logins = [] - days_of_week = {'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun'} - months = {'Jan': 1, 'Feb': 2, 'Mar': 3, 'Apr': 4, 'May': 5, 'Jun': 6, - 'Jul': 7, 'Aug': 8, 'Sep': 9, 'Oct': 10, 'Nov': 11, 'Dec': 12} - current_year = datetime.now().year - - try: - result = subprocess.run( - ["last", "-n", str(limit)], - capture_output=True, - text=True, - timeout=10 - ) - - if result.returncode == 0: - for line in result.stdout.strip().split('\n'): - if not line.strip(): - continue - - # Skip header, footer, and system entries - if 'wtmp' in line or 'begins' in line: - continue - - try: - # Find the day-of-week anchor (reliable marker for date section) - tokens = line.split() - day_idx = -1 - - for i, token in enumerate(tokens): - if token in days_of_week: - day_idx = i - break - - if day_idx < 1: # Need at least username before day - continue - - # Extract components from token positions - username = tokens[0] - - # Skip system entries - if username in ['wtmp', 'reboot', 'kernel']: - continue - - # Try to find full username from /etc/passwd (wtmp truncates to 8 chars) - full_username = self._get_full_username(username) - if full_username: - username = full_username - - # Everything between username and day-of-week is TTY/host - device_host_tokens = tokens[1:day_idx] - - # TTY is typically first token if it contains '/' or starts with 'pts'/'tty' - tty = '-' - host = '-' - - if device_host_tokens: - first = device_host_tokens[0] - if '/' in first or first.startswith('pts') or first.startswith('tty'): - tty = first - # Remaining tokens are host - if len(device_host_tokens) > 1: - host = ' '.join(device_host_tokens[1:]) - else: - # No TTY, all tokens are host - host = ' '.join(device_host_tokens) - - # Extract date components (should be: day month date time) - # Note: year is NOT in standard last output, we need to infer it - if day_idx + 3 < len(tokens): - day = tokens[day_idx] - month = tokens[day_idx + 1] - date = tokens[day_idx + 2] - time_str = tokens[day_idx + 3] - - # Validate month - if month not in months: - logger.debug(f"Invalid month '{month}' in line: {line}") - continue - - # Use current year (last entries are usually recent) - year = current_year - else: - logger.debug(f"Not enough date tokens in line: {line}") - continue - - # Extract duration from end of line (in parentheses) - duration_match = re.search(r'\(([^)]+)\)\s*$', line) - duration = duration_match.group(1) if duration_match else 'still logged in' - - logins.append({ - 'username': username, - 'tty': tty, - 'host': host, - 'date': f"{year}-{months[month]:02d}-{date.zfill(2)}", - 'time': time_str, - 'duration': duration, - 'login_str': f"{day} {month} {date} {time_str} {year}" - }) - except Exception as e: - logger.debug(f"Error parsing login line '{line}': {e}") - continue - - return logins - except Exception as e: - logger.error(f"Error getting login history: {e}") - return [] - - def _get_full_username(self, truncated: str) -> Optional[str]: - """Find full username from /etc/passwd when wtmp has truncated it (8 char limit) - - wtmp truncates usernames to 8 characters, so we need to look up the full name - in /etc/passwd by matching usernames that start with the truncated name. - """ - try: - result = subprocess.run( - ["/usr/bin/getent", "passwd"], - capture_output=True, - text=True, - timeout=5 - ) - - if result.returncode == 0: - for line in result.stdout.strip().split('\n'): - if not line: - continue - parts = line.split(':') - if parts and parts[0].startswith(truncated): - # Found a username that starts with the truncated name - if len(parts[0]) > len(truncated): - # It's longer than the truncated version - return parts[0] - - return None - except Exception as e: - logger.debug(f"Error finding full username for '{truncated}': {e}") - return None - - def list_samba_users(self) -> List[Dict[str, Any]]: - """List all Samba users using pdbedit""" - users = [] - try: - result = subprocess.run( - ["/usr/bin/pdbedit", "-L"], - capture_output=True, - text=True, - timeout=5 - ) - - if result.returncode == 0: - for line in result.stdout.strip().split('\n'): - if not line.strip(): - continue - - # pdbedit output format: username:uid:comment - parts = line.split(':') - if len(parts) >= 2: - try: - username = parts[0] - uid = int(parts[1]) - comment = parts[2] if len(parts) > 2 else "" - - users.append({ - 'username': username, - 'uid': uid, - 'comment': comment, - 'type': 'samba' - }) - except (ValueError, IndexError) as e: - logger.warning(f"Error parsing Samba user line: {e}") - - return sorted(users, key=lambda x: x['username']) - except FileNotFoundError: - logger.warning("pdbedit not found - Samba may not be installed") - return [] - except Exception as e: - logger.error(f"Error listing Samba users: {e}") - return [] - - -identities_manager = IdentitiesManager() + except (ValueError, IndexError) as e: + logger.warning(f"Error parsing Samba user line: {e}") + return sorted(users, key=lambda x: x['username']) + except FileNotFoundError: + logger.warning("pdbedit not found - Samba may not be installed") + return [] + except Exception as e: + logger.error(f"Error listing Samba users: {e}") + return [] diff --git a/backend/services/system_info.py b/backend/services/system_info.py index 8652530..e87271b 100644 --- a/backend/services/system_info.py +++ b/backend/services/system_info.py @@ -13,576 +13,432 @@ from datetime import datetime logger = logging.getLogger(__name__) -class SystemInfo: - """Get system information""" +def get_hostname() -> Dict[str, str]: + try: + with open("/etc/hostname", "r") as f: + hostname = f.read().strip() + return {"hostname": hostname} + except Exception as e: + logger.error(f"Error getting hostname: {e}") + return {"error": str(e)} - @staticmethod - def get_hostname() -> Dict[str, str]: - """Get system hostname""" - try: - with open("/etc/hostname", "r") as f: - hostname = f.read().strip() - return {"hostname": hostname} - except Exception as e: - logger.error(f"Error getting hostname: {e}") - return {"error": str(e)} - @staticmethod - def set_hostname(hostname: str) -> Dict[str, str]: - """Set system hostname""" - try: - with open("/etc/hostname", "w") as f: - f.write(hostname) +def set_hostname(hostname: str) -> Dict[str, str]: + try: + with open("/etc/hostname", "w") as f: + f.write(hostname) + subprocess.run( + ["hostnamectl", "set-hostname", hostname], + capture_output=True, + check=False + ) + logger.info(f"Set hostname to {hostname}") + return {"status": "success", "hostname": hostname} + except Exception as e: + logger.error(f"Error setting hostname: {e}") + return {"error": str(e)} - # Also update hostnamectl if available - subprocess.run( - ["hostnamectl", "set-hostname", hostname], - capture_output=True, - check=False - ) - logger.info(f"Set hostname to {hostname}") - return {"status": "success", "hostname": hostname} - except Exception as e: - logger.error(f"Error setting hostname: {e}") - return {"error": str(e)} - - @staticmethod - def get_system_info() -> Dict[str, Any]: - """Get general system information""" - try: - uname = platform.uname() - info = { - "hostname": socket.gethostname(), - "system": uname.system, - "kernel": uname.release, - "machine": uname.machine, - "processor": platform.processor(), - "python": platform.python_version() - } - - # Get machine ID - try: - with open("/etc/machine-id", "r") as f: - info["machine_id"] = f.read().strip() - except: - pass - - # Get hardware model - try: - result = subprocess.run( - ["dmidecode", "-s", "system-product-name"], - capture_output=True, - text=True, - timeout=5 - ) - if result.returncode == 0: - info["model"] = result.stdout.strip() - except: - pass - - # Get domain name - try: - result = subprocess.run( - ["domainname"], - capture_output=True, - text=True, - timeout=5 - ) - if result.returncode == 0: - domain = result.stdout.strip() - if domain and domain != "(none)": - info["domain"] = domain - except: - pass - - return info - except Exception as e: - logger.error(f"Error getting system info: {e}") - return {"error": str(e)} - - @staticmethod - def get_uptime() -> Dict[str, Any]: - """Get system uptime and boot time""" - try: - import time - with open("/proc/uptime", "r") as f: - uptime_seconds = int(float(f.read().split()[0])) - - days = uptime_seconds // 86400 - hours = (uptime_seconds % 86400) // 3600 - minutes = (uptime_seconds % 3600) // 60 - - # Calculate boot timestamp - current_time = time.time() - boot_timestamp = current_time - uptime_seconds - - return { - "uptime_seconds": uptime_seconds, - "uptime_string": f"{days}d {hours}h {minutes}m", - "uptime_formatted": { - "days": days, - "hours": hours, - "minutes": minutes - }, - "boot_time": int(boot_timestamp) - } - except Exception as e: - logger.error(f"Error getting uptime: {e}") - return {"error": str(e)} - - @staticmethod - def get_memory() -> Dict[str, Any]: - """Get memory usage""" - try: - with open("/proc/meminfo", "r") as f: - lines = f.readlines() - - meminfo = {} - for line in lines: - key, value = line.split(":") - meminfo[key.strip()] = int(value.split()[0]) * 1024 # Convert to bytes - - return { - "total": meminfo.get("MemTotal", 0), - "available": meminfo.get("MemAvailable", 0), - "used": meminfo.get("MemTotal", 0) - meminfo.get("MemAvailable", 0), - "free": meminfo.get("MemFree", 0), - "swap_total": meminfo.get("SwapTotal", 0), - "swap_free": meminfo.get("SwapFree", 0), - "swap_used": meminfo.get("SwapTotal", 0) - meminfo.get("SwapFree", 0) - } - except Exception as e: - logger.error(f"Error getting memory info: {e}") - return {"error": str(e)} - - @staticmethod - def get_cpu_info() -> Dict[str, Any]: - """Get CPU information""" - try: - import psutil - except ImportError: - logger.debug("psutil not installed, using fallback") - try: - with open("/proc/cpuinfo", "r") as f: - cpuinfo_text = f.read() - cpu_count = cpuinfo_text.count("processor") - - return { - "count": cpu_count, - "load_average": open("/proc/loadavg").read().split()[:3] - } - except Exception as e: - logger.error(f"Error getting CPU info: {e}") - return {"error": str(e)} +def get_system_info() -> Dict[str, Any]: + try: + uname = platform.uname() + info = { + "hostname": socket.gethostname(), + "system": uname.system, + "kernel": uname.release, + "machine": uname.machine, + "processor": platform.processor(), + "python": platform.python_version() + } try: - return { - "count": psutil.cpu_count(), - "percent": psutil.cpu_percent(interval=1), - "load_average": [round(x, 2) for x in __import__("os").getloadavg()] - } - except Exception as e: - logger.error(f"Error getting CPU info with psutil: {e}") - return {"error": str(e)} + with open("/etc/machine-id", "r") as f: + info["machine_id"] = f.read().strip() + except Exception: + pass - @staticmethod - def get_time() -> Dict[str, str]: - """Get system time""" - try: - now = datetime.now() - return { - "iso": now.isoformat(), - "timestamp": int(now.timestamp()), - "timezone": datetime.now().astimezone().tzinfo.__str__() - } - except Exception as e: - logger.error(f"Error getting time: {e}") - return {"error": str(e)} - - @staticmethod - def set_time(iso_string: str) -> Dict[str, str]: - """Set system time (requires root)""" - try: - dt = datetime.fromisoformat(iso_string) - - result = subprocess.run( - ["date", "-s", dt.strftime("%Y-%m-%d %H:%M:%S")], - capture_output=True, - text=True, - check=False - ) - - if result.returncode != 0: - return {"error": result.stderr} - - # Sync hardware clock - subprocess.run(["hwclock", "--systohc"], check=False) - - logger.info(f"Set time to {iso_string}") - return {"status": "success", "time": iso_string} - - except Exception as e: - logger.error(f"Error setting time: {e}") - return {"error": str(e)} - - @staticmethod - def get_updates() -> Dict[str, Any]: - """Check available updates""" try: result = subprocess.run( - ["apt", "list", "--upgradable"], - capture_output=True, - text=True, - timeout=10, - check=False + ["dmidecode", "-s", "system-product-name"], + capture_output=True, text=True, timeout=5 ) - - if result.returncode != 0: - return {"error": result.stderr} - - packages = [] - for line in result.stdout.split("\n")[1:]: - if line.strip(): - parts = line.split("/") - if len(parts) >= 2: - packages.append({ - "package": parts[0].strip(), - "current": parts[1].split("[")[0].strip() if "[" in line else "" - }) - - return { - "available": len(packages), - "packages": packages - } - except Exception as e: - logger.error(f"Error checking updates: {e}") - return {"error": str(e)} - - @staticmethod - def reboot() -> Dict[str, str]: - """Reboot system (requires root)""" - try: - subprocess.Popen(["shutdown", "-r", "now"]) - return {"status": "success", "message": "System rebooting..."} - except Exception as e: - logger.error(f"Error rebooting: {e}") - return {"error": str(e)} - - @staticmethod - def shutdown() -> Dict[str, str]: - """Shutdown system (requires root)""" - try: - subprocess.Popen(["shutdown", "-h", "now"]) - return {"status": "success", "message": "System shutting down..."} - except Exception as e: - logger.error(f"Error shutting down: {e}") - return {"error": str(e)} - - @staticmethod - def get_network_info() -> Dict[str, Any]: - """Get network interface information""" - try: - # Try ip -j addr (JSON output) first - result = subprocess.run( - ["/usr/sbin/ip", "-j", "addr"], - capture_output=True, - text=True, - timeout=5 - ) - if result.returncode == 0: - import json + info["model"] = result.stdout.strip() + except Exception: + pass + + try: + result = subprocess.run( + ["domainname"], capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + domain = result.stdout.strip() + if domain and domain != "(none)": + info["domain"] = domain + except Exception: + pass + + return info + except Exception as e: + logger.error(f"Error getting system info: {e}") + return {"error": str(e)} + + +def get_uptime() -> Dict[str, Any]: + try: + import time + with open("/proc/uptime", "r") as f: + uptime_seconds = int(float(f.read().split()[0])) + + days = uptime_seconds // 86400 + hours = (uptime_seconds % 86400) // 3600 + minutes = (uptime_seconds % 3600) // 60 + boot_timestamp = time.time() - uptime_seconds + + return { + "uptime_seconds": uptime_seconds, + "uptime_string": f"{days}d {hours}h {minutes}m", + "uptime_formatted": {"days": days, "hours": hours, "minutes": minutes}, + "boot_time": int(boot_timestamp) + } + except Exception as e: + logger.error(f"Error getting uptime: {e}") + return {"error": str(e)} + + +def get_memory() -> Dict[str, Any]: + try: + with open("/proc/meminfo", "r") as f: + lines = f.readlines() + + meminfo = {} + for line in lines: + key, value = line.split(":") + meminfo[key.strip()] = int(value.split()[0]) * 1024 + + return { + "total": meminfo.get("MemTotal", 0), + "available": meminfo.get("MemAvailable", 0), + "used": meminfo.get("MemTotal", 0) - meminfo.get("MemAvailable", 0), + "free": meminfo.get("MemFree", 0), + "swap_total": meminfo.get("SwapTotal", 0), + "swap_free": meminfo.get("SwapFree", 0), + "swap_used": meminfo.get("SwapTotal", 0) - meminfo.get("SwapFree", 0) + } + except Exception as e: + logger.error(f"Error getting memory info: {e}") + return {"error": str(e)} + + +def get_cpu_info() -> Dict[str, Any]: + try: + import psutil + except ImportError: + logger.debug("psutil not installed, using fallback") + try: + with open("/proc/cpuinfo", "r") as f: + cpuinfo_text = f.read() + cpu_count = cpuinfo_text.count("processor") + with open("/proc/loadavg") as f: + load = f.read() + return {"count": cpu_count, "load_average": load.split()[:3]} + except Exception as e: + logger.error(f"Error getting CPU info: {e}") + return {"error": str(e)} + + try: + return { + "count": psutil.cpu_count(), + "percent": psutil.cpu_percent(interval=1), + "load_average": [round(x, 2) for x in __import__("os").getloadavg()] + } + except Exception as e: + logger.error(f"Error getting CPU info with psutil: {e}") + return {"error": str(e)} + + +def get_time() -> Dict[str, str]: + try: + now = datetime.now() + return { + "iso": now.isoformat(), + "timestamp": int(now.timestamp()), + "timezone": datetime.now().astimezone().tzinfo.__str__() + } + except Exception as e: + logger.error(f"Error getting time: {e}") + return {"error": str(e)} + + +def set_time(iso_string: str) -> Dict[str, str]: + try: + dt = datetime.fromisoformat(iso_string) + result = subprocess.run( + ["date", "-s", dt.strftime("%Y-%m-%d %H:%M:%S")], + capture_output=True, text=True, check=False + ) + if result.returncode != 0: + return {"error": result.stderr} + subprocess.run(["hwclock", "--systohc"], check=False) + logger.info(f"Set time to {iso_string}") + return {"status": "success", "time": iso_string} + except Exception as e: + logger.error(f"Error setting time: {e}") + return {"error": str(e)} + + +def get_updates() -> Dict[str, Any]: + try: + result = subprocess.run( + ["apt", "list", "--upgradable"], + capture_output=True, text=True, timeout=10, check=False + ) + if result.returncode != 0: + return {"error": result.stderr} + + packages = [] + for line in result.stdout.split("\n")[1:]: + if line.strip(): + parts = line.split("/") + if len(parts) >= 2: + packages.append({ + "package": parts[0].strip(), + "current": parts[1].split("[")[0].strip() if "[" in line else "" + }) + return {"available": len(packages), "packages": packages} + except Exception as e: + logger.error(f"Error checking updates: {e}") + return {"error": str(e)} + + +def reboot() -> Dict[str, str]: + try: + subprocess.Popen(["shutdown", "-r", "now"]) + return {"status": "success", "message": "System rebooting..."} + except Exception as e: + logger.error(f"Error rebooting: {e}") + return {"error": str(e)} + + +def shutdown() -> Dict[str, str]: + try: + subprocess.Popen(["shutdown", "-h", "now"]) + return {"status": "success", "message": "System shutting down..."} + except Exception as e: + logger.error(f"Error shutting down: {e}") + return {"error": str(e)} + + +def get_network_info() -> Dict[str, Any]: + try: + result = subprocess.run( + ["/usr/sbin/ip", "-j", "addr"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + import json + try: + data = json.loads(result.stdout) interfaces = [] - try: - data = json.loads(result.stdout) - for iface in data: - addr_info = [] - for addr in iface.get("addr_info", []): - addr_info.append({ - "family": addr.get("family"), - "local": addr.get("local") - }) - interfaces.append({ - "name": iface.get("ifname"), - "state": iface.get("operstate", "UNKNOWN"), - "addresses": addr_info - }) - return {"interfaces": interfaces} - except json.JSONDecodeError: - pass + for iface in data: + addr_info = [ + {"family": addr.get("family"), "local": addr.get("local")} + for addr in iface.get("addr_info", []) + ] + interfaces.append({ + "name": iface.get("ifname"), + "state": iface.get("operstate", "UNKNOWN"), + "addresses": addr_info + }) + return {"interfaces": interfaces} + except json.JSONDecodeError: + pass - # Fallback: read from /proc/net/dev - with open("/proc/net/dev", "r") as f: - lines = f.readlines() + with open("/proc/net/dev", "r") as f: + lines = f.readlines() - interfaces = [] - for line in lines[2:]: # Skip header lines - if ":" in line: - name = line.split(":")[0].strip() + interfaces = [] + for line in lines[2:]: + if ":" in line: + name = line.split(":")[0].strip() + interfaces.append({"name": name, "state": "UP", "addresses": []}) + return {"interfaces": interfaces} + except Exception as e: + logger.error(f"Error getting network info: {e}") + return {"error": str(e)} + + +def get_network_traffic() -> Dict[str, Any]: + try: + with open("/proc/net/dev", "r") as f: + lines = f.readlines() + + interfaces = [] + for line in lines[2:]: + if ":" in line: + name, stats_str = line.split(":") + name = name.strip() + stats = stats_str.split() + if len(stats) >= 16: interfaces.append({ "name": name, - "state": "UP", - "addresses": [] + "rx_bytes": int(stats[0]), + "rx_packets": int(stats[1]), + "rx_errors": int(stats[2]), + "rx_drops": int(stats[3]), + "tx_bytes": int(stats[8]), + "tx_packets": int(stats[9]), + "tx_errors": int(stats[10]), + "tx_drops": int(stats[11]) }) + return {"interfaces": interfaces} + except Exception as e: + logger.error(f"Error getting network traffic: {e}") + return {"error": str(e)} - return {"interfaces": interfaces} - except Exception as e: - logger.error(f"Error getting network info: {e}") - return {"error": str(e)} - @staticmethod - def get_network_traffic() -> Dict[str, Any]: - """Get network interface traffic (RX/TX bytes)""" - try: - with open("/proc/net/dev", "r") as f: - lines = f.readlines() +def get_disk_io() -> Dict[str, Any]: + try: + with open("/proc/diskstats", "r") as f: + lines = f.readlines() - interfaces = [] - # /proc/net/dev format (after colon): - # RX bytes, RX packets, RX errors, RX drops, RX fifo, RX frame, RX compressed, RX multicast, - # TX bytes, TX packets, TX errors, TX drops, TX fifo, TX collisions, TX carrier, TX compressed - for line in lines[2:]: # Skip header lines - if ":" in line: - name, stats_str = line.split(":") - name = name.strip() - stats = stats_str.split() - - if len(stats) >= 16: - interfaces.append({ - "name": name, - "rx_bytes": int(stats[0]), - "rx_packets": int(stats[1]), - "rx_errors": int(stats[2]), - "rx_drops": int(stats[3]), - "tx_bytes": int(stats[8]), - "tx_packets": int(stats[9]), - "tx_errors": int(stats[10]), - "tx_drops": int(stats[11]) - }) - - return {"interfaces": interfaces} - except Exception as e: - logger.error(f"Error getting network traffic: {e}") - return {"error": str(e)} - - @staticmethod - def get_disk_io() -> Dict[str, Any]: - """Get disk I/O statistics (read/write operations and bytes)""" - try: - with open("/proc/diskstats", "r") as f: - lines = f.readlines() - - disks = [] - # /proc/diskstats format: - # major minor name reads_completed reads_merged reads_sectors reads_time_ms - # writes_completed writes_merged writes_sectors writes_time_ms in_progress io_time_ms weighted_io_time_ms - for line in lines: - fields = line.split() - if len(fields) >= 14: - major = int(fields[0]) - minor = int(fields[1]) - name = fields[2] - - # Skip loop devices, ram disks, and other virtual disks - if name.startswith(('dm-', 'loop', 'ram', 'sr', 'zram')): - continue - # Only include actual storage devices (sda, sdb, nvme0n1, etc.) - if not any(name.startswith(prefix) for prefix in ['sd', 'nvme', 'hd', 'vd']): - continue - - reads_completed = int(fields[3]) - reads_sectors = int(fields[5]) - writes_completed = int(fields[7]) - writes_sectors = int(fields[9]) - - # Sectors are typically 512 bytes - reads_bytes = reads_sectors * 512 - writes_bytes = writes_sectors * 512 - - disks.append({ - "name": name, - "reads_completed": reads_completed, - "reads_bytes": reads_bytes, - "writes_completed": writes_completed, - "writes_bytes": writes_bytes - }) - - return {"disks": disks} - except Exception as e: - logger.error(f"Error getting disk I/O: {e}") - return {"error": str(e)} - - @staticmethod - def get_services() -> Dict[str, Any]: - """Get running systemd services""" - try: - result = subprocess.run( - ["/usr/bin/systemctl", "list-units", "--type=service", "--state=running", "--no-pager", "--output=json"], - capture_output=True, - text=True, - timeout=10 - ) - - if result.returncode == 0: - import json - try: - services = json.loads(result.stdout) - return { - "services": [ - { - "name": svc.get("unit"), - "state": svc.get("active"), - "description": svc.get("description") - } - for svc in services if svc.get("unit", "").endswith(".service") - ] - } - except json.JSONDecodeError: - pass - - # Fallback: parse text output - result = subprocess.run( - ["/usr/bin/systemctl", "list-units", "--type=service", "--state=running", "--no-pager"], - capture_output=True, - text=True, - timeout=10 - ) - - services = [] - for line in result.stdout.split("\n")[1:]: - if line.strip() and ".service" in line: - parts = line.split() - if len(parts) >= 2: - services.append({ - "name": parts[0], - "state": "running", - "description": " ".join(parts[2:]) if len(parts) > 2 else "" - }) - - return {"services": services} - except Exception as e: - logger.error(f"Error getting services: {e}") - return {"error": str(e)} - - @staticmethod - def get_all_units() -> Dict[str, Any]: - """Get all systemd units (services, targets, sockets, timers, paths)""" - try: - # Get all units without filtering by state - result = subprocess.run( - ["/usr/bin/systemctl", "list-units", "--all", "--no-pager", "--output=json"], - capture_output=True, - text=True, - timeout=10 - ) - - if result.returncode == 0: - import json - try: - units_data = json.loads(result.stdout) - units = { - "services": [], - "targets": [], - "sockets": [], - "timers": [], - "paths": [] - } - - for unit in units_data: - name = unit.get("unit", "") - item = { - "name": name, - "active": unit.get("active"), # active/inactive - "sub": unit.get("sub"), # sub-state like "running", "exited", "enabled", etc. - "description": unit.get("description", "") - } - - if name.endswith(".service"): - units["services"].append(item) - elif name.endswith(".target"): - units["targets"].append(item) - elif name.endswith(".socket"): - units["sockets"].append(item) - elif name.endswith(".timer"): - units["timers"].append(item) - elif name.endswith(".path"): - units["paths"].append(item) - - return units - except json.JSONDecodeError: - pass - - # Fallback: parse text output - result = subprocess.run( - ["/usr/bin/systemctl", "list-units", "--all", "--no-pager"], - capture_output=True, - text=True, - timeout=10 - ) - - units = { - "services": [], - "targets": [], - "sockets": [], - "timers": [], - "paths": [] - } - - for line in result.stdout.split("\n")[1:]: - if not line.strip(): + disks = [] + for line in lines: + fields = line.split() + if len(fields) >= 14: + name = fields[2] + if name.startswith(('dm-', 'loop', 'ram', 'sr', 'zram')): continue - parts = line.split() - if len(parts) < 2: + if not any(name.startswith(p) for p in ['sd', 'nvme', 'hd', 'vd']): continue - - name = parts[0] - active = parts[1] if len(parts) > 1 else "unknown" - description = " ".join(parts[3:]) if len(parts) > 3 else "" - - item = { + disks.append({ "name": name, - "active": active, - "sub": parts[2] if len(parts) > 2 else "", - "description": description + "reads_completed": int(fields[3]), + "reads_bytes": int(fields[5]) * 512, + "writes_completed": int(fields[7]), + "writes_bytes": int(fields[9]) * 512 + }) + return {"disks": disks} + except Exception as e: + logger.error(f"Error getting disk I/O: {e}") + return {"error": str(e)} + + +def get_services() -> Dict[str, Any]: + try: + result = subprocess.run( + ["/usr/bin/systemctl", "list-units", "--type=service", "--state=running", + "--no-pager", "--output=json"], + capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + import json + try: + services = json.loads(result.stdout) + return { + "services": [ + {"name": svc.get("unit"), "state": svc.get("active"), "description": svc.get("description")} + for svc in services if svc.get("unit", "").endswith(".service") + ] } + except json.JSONDecodeError: + pass - if name.endswith(".service"): - units["services"].append(item) - elif name.endswith(".target"): - units["targets"].append(item) - elif name.endswith(".socket"): - units["sockets"].append(item) - elif name.endswith(".timer"): - units["timers"].append(item) - elif name.endswith(".path"): - units["paths"].append(item) - - return units - except Exception as e: - logger.error(f"Error getting units: {e}") - return {"error": str(e)} - - @staticmethod - def get_journal_logs(limit: int = 20) -> Dict[str, Any]: - """Get recent journal logs""" - try: - result = subprocess.run( - ["/usr/bin/journalctl", "-n", str(limit), "--no-pager", "--output=short"], - capture_output=True, - text=True, - timeout=5 - ) - - if result.returncode == 0: - logs = [line.strip() for line in result.stdout.split("\n") if line.strip()] - return {"logs": logs} - - return {"logs": []} - except Exception as e: - logger.error(f"Error getting journal logs: {e}") - return {"error": str(e)} + result = subprocess.run( + ["/usr/bin/systemctl", "list-units", "--type=service", "--state=running", "--no-pager"], + capture_output=True, text=True, timeout=10 + ) + services = [] + for line in result.stdout.split("\n")[1:]: + if line.strip() and ".service" in line: + parts = line.split() + if len(parts) >= 2: + services.append({ + "name": parts[0], + "state": "running", + "description": " ".join(parts[2:]) if len(parts) > 2 else "" + }) + return {"services": services} + except Exception as e: + logger.error(f"Error getting services: {e}") + return {"error": str(e)} -# Global instance -system_info = SystemInfo() +def get_all_units() -> Dict[str, Any]: + units = {"services": [], "targets": [], "sockets": [], "timers": [], "paths": []} + try: + result = subprocess.run( + ["/usr/bin/systemctl", "list-units", "--all", "--no-pager", "--output=json"], + capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + import json + try: + for unit in json.loads(result.stdout): + name = unit.get("unit", "") + item = { + "name": name, + "active": unit.get("active"), + "sub": unit.get("sub"), + "description": unit.get("description", "") + } + if name.endswith(".service"): + units["services"].append(item) + elif name.endswith(".target"): + units["targets"].append(item) + elif name.endswith(".socket"): + units["sockets"].append(item) + elif name.endswith(".timer"): + units["timers"].append(item) + elif name.endswith(".path"): + units["paths"].append(item) + return units + except json.JSONDecodeError: + pass + + result = subprocess.run( + ["/usr/bin/systemctl", "list-units", "--all", "--no-pager"], + capture_output=True, text=True, timeout=10 + ) + for line in result.stdout.split("\n")[1:]: + if not line.strip(): + continue + parts = line.split() + if len(parts) < 2: + continue + name = parts[0] + item = { + "name": name, + "active": parts[1] if len(parts) > 1 else "unknown", + "sub": parts[2] if len(parts) > 2 else "", + "description": " ".join(parts[3:]) if len(parts) > 3 else "" + } + if name.endswith(".service"): + units["services"].append(item) + elif name.endswith(".target"): + units["targets"].append(item) + elif name.endswith(".socket"): + units["sockets"].append(item) + elif name.endswith(".timer"): + units["timers"].append(item) + elif name.endswith(".path"): + units["paths"].append(item) + return units + except Exception as e: + logger.error(f"Error getting units: {e}") + return {"error": str(e)} + + +def get_journal_logs(limit: int = 20) -> Dict[str, Any]: + try: + result = subprocess.run( + ["/usr/bin/journalctl", "-n", str(limit), "--no-pager", "--output=short"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + logs = [line.strip() for line in result.stdout.split("\n") if line.strip()] + return {"logs": logs} + return {"logs": []} + except Exception as e: + logger.error(f"Error getting journal logs: {e}") + return {"error": str(e)} diff --git a/frontend/app/page.tsx b/frontend/app/page.tsx index bc9e941..9146f9f 100644 --- a/frontend/app/page.tsx +++ b/frontend/app/page.tsx @@ -502,12 +502,12 @@ export default function Dashboard() {
RX
{formatBytes(iface.rx_bytes)}
-{iface.rx_packets.toLocaleString()} packets
+{(iface.rx_packets ?? 0).toLocaleString()} packets
TX
{formatBytes(iface.tx_bytes)}
-{iface.tx_packets.toLocaleString()} packets
+{(iface.tx_packets ?? 0).toLocaleString()} packets