Files
zmb-webui/backend/services/file_manager.py
T
Claude Code 92bed208e0 ZMB Webui: Complete Project – Rebrand & Initial Clean Commit
ARCHITECTURE
============
Backend: FastAPI + uvicorn (port 8000)
  - JWT authentication with PAM system users
  - ZFS CLI wrapper with caching (30-60s TTL)
  - WebSocket pool status broadcaster (30s interval)
  - Services: auth, zfs_runner, file_manager, shares, identities, system_info
  - Routers: pools, datasets, snapshots, shares, identities, navigator, system

Frontend: Next.js 15 + TypeScript (static export)
  - Incremental Static Regeneration (ISR) for weak hardware
  - Type-safe API client (lib/api.ts)
  - Dark mode + custom Tailwind theme
  - Pages: Dashboard, Login, Snapshots, Datasets, Shares, etc.

DEPLOYMENT
==========
Test Target: 192.168.1.179:8090 (Debian LXC)
Production: 10.66.120.3:9090 (Raspberry Pi 4GB ARM64)
Updater: Automated Gitea-based deployment (update-test.sh, update-pi.sh)

FEATURES COMPLETED
==================
Phase 3a: Dashboard Quick Stats (System, CPU, Memory, Storage)
  - Real-time stats with color-coded progress bars
  - Responsive grid layout (mobile: 1, tablet: 2, desktop: 4 columns)
  - ISR-optimized for fast loads on weak hardware

REBRANDING
==========
Renamed throughout:
  - Project: 'ZFS Manager' → 'ZMB Webui'
  - Services: 'zfs-manager' → 'zmb-webui'
  - Systemd units: zfs-manager-backend → zmb-webui-backend
  - Configuration files and documentation

Co-Authored-By: Patrick <patrick@perlbach24.de>
2026-04-22 00:43:05 +02:00

645 lines
21 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
File Manager Service - Browse, upload, download files in /tank/share
Similar to cockpit-files but optimized for ZFS shares
"""
import os
import logging
from pathlib import Path
from typing import List, Dict, Any, Optional
from stat import filemode
from datetime import datetime
import stat
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Every file operation is confined to this directory (the ZFS share).
BASE_PATH = Path("/tank/share")

# Upload size ceiling in bytes: 2 GiB.
MAX_FILE_SIZE = 2 * 1024 ** 3
class FileInfo:
    """Metadata snapshot of one file, directory or symlink.

    The constructor never raises for filesystem errors: when the entry
    cannot be inspected, neutral placeholder values are stored and the
    failure text is kept in ``error``.

    Attributes:
        path: The inspected path.
        base_path: Directory that ``to_dict()["path"]`` is made relative to.
        name: Final path component.
        is_link: Whether the entry itself is a symlink.
        link_target: Raw symlink target, or ``None`` when not a symlink.
        is_dir, size, modified, mode, uid, gid: Values taken from the stat
            result (``lstat`` for symlinks, ``stat`` otherwise).
        error: ``None`` on success, otherwise the error message.
    """

    def __init__(self, path: Path, base_path: Optional[Path] = None):
        self.path = path
        self.base_path = base_path or BASE_PATH
        self.name = path.name
        self.is_link = path.is_symlink()
        self.link_target = None

        try:
            if self.is_link:
                # Describe the link itself rather than whatever it points at.
                self.link_target = os.readlink(path)
                self.stat = path.lstat()
            else:
                self.stat = path.stat()

            self.is_dir = path.is_dir()
            self.size = self.stat.st_size
            self.modified = self.stat.st_mtime
            self.mode = filemode(self.stat.st_mode)
            self.uid = self.stat.st_uid
            self.gid = self.stat.st_gid
            self.error = None
        except (OSError, ValueError) as e:
            # OSError: missing entry, permission problems, broken link.
            # ValueError: path contains characters the OS rejects
            # (e.g. an embedded NUL byte).
            self.is_dir = False
            self.size = 0
            self.modified = 0
            self.mode = "---------"
            self.uid = 0
            self.gid = 0
            self.error = str(e)

    def _relative_path(self) -> str:
        """Return ``path`` relative to ``base_path`` as a string.

        If ``base_path`` contains a symlink while ``path`` was already
        resolved, a plain ``relative_to`` fails; the resolved base is tried
        next. As a last resort only the entry name is returned so that a
        single odd entry cannot break a whole listing.
        """
        try:
            return str(self.path.relative_to(self.base_path))
        except ValueError:
            pass
        try:
            return str(self.path.relative_to(self.base_path.resolve()))
        except (ValueError, OSError):
            return self.name

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a JSON-serialisable dict.

        ``link_target`` is only included for symlinks with a readable target.
        """
        result = {
            "name": self.name,
            "path": self._relative_path(),
            "is_dir": self.is_dir,
            "is_link": self.is_link,
            "size": self.size,
            "modified": self.modified,
            "modified_iso": datetime.fromtimestamp(self.modified).isoformat(),
            "permissions": self.mode,
            "uid": self.uid,
            "gid": self.gid,
            "error": self.error,
        }
        if self.link_target:
            result["link_target"] = self.link_target
        return result
class FileManager:
    """File operations confined to one base directory (the ZFS share).

    All public methods take paths relative to ``base_path``. Failures are
    reported as ``{"error": "..."}`` dictionaries (``search_files`` returns
    an empty list) instead of raising.
    """

    # Suffixes for which get_file_info() attaches a short text preview.
    _PREVIEW_SUFFIXES = (".txt", ".log", ".md", ".json", ".yaml", ".yml")

    def __init__(self, base_path: Optional[Path] = None):
        """Bind the manager to *base_path* (default ``BASE_PATH``).

        The directory is created, including parents, when it does not exist.
        """
        self.base_path = base_path or BASE_PATH

        # Ensure base path exists
        if not self.base_path.exists():
            logger.warning(f"Base path does not exist: {self.base_path}")
            self.base_path.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _root(self) -> Path:
        """Return the base path with symlinks resolved."""
        return self.base_path.resolve()

    def _rel(self, path: Path) -> str:
        """Return *path* relative to the resolved base path, as a string.

        Paths handed out by ``_resolve_path`` are resolved, so they must be
        compared against the resolved base, not the configured one.
        """
        return str(path.relative_to(self._root()))

    def _resolve_path(self, rel_path: str) -> Optional[Path]:
        """Resolve *rel_path* below the base path.

        Returns:
            The resolved absolute path when it lies inside the base
            directory, otherwise ``None`` (directory traversal attempt or
            resolution failure).
        """
        # Broad catch on purpose: this is the security boundary, so any
        # failure to resolve must be treated as "rejected".
        try:
            root = self._root()
            if not rel_path:
                return root

            # A leading slash must not turn the join into an absolute path.
            target = (self.base_path / rel_path.lstrip("/")).resolve()

            # Compare path components, not string prefixes: "/tank/share2"
            # starts with "/tank/share" but is a different directory.
            if target != root and root not in target.parents:
                logger.warning(f"Path traversal attempt: {rel_path}")
                return None
            return target
        except Exception as e:
            logger.error(f"Path resolution error: {e}")
            return None

    def _transfer_conflict(self, src: Path, dst: Path) -> Optional[str]:
        """Return an error message if copying/moving src to dst is unsafe."""
        if src == dst:
            return "Source and destination are the same"
        if dst == self._root():
            return "Cannot overwrite base directory"
        if src.is_dir() and src in dst.parents:
            return "Cannot copy or move a directory into itself"
        return None

    # ------------------------------------------------------------------
    # Browsing
    # ------------------------------------------------------------------
    def list_directory(self, rel_path: str = "") -> Dict[str, Any]:
        """List the entries of a directory.

        Returns:
            ``{"path", "entries", "total"}`` where each entry is a
            ``FileInfo.to_dict()`` result, or ``{"error": ...}``.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}
        if not target.exists():
            return {"error": "Path not found"}
        if not target.is_dir():
            return {"error": "Not a directory"}

        try:
            root = self._root()
            entries = [
                FileInfo(item, root).to_dict()
                for item in sorted(target.iterdir())
            ]
        except PermissionError:
            return {"error": "Permission denied"}
        except Exception as e:
            logger.error(f"Error listing directory: {e}")
            return {"error": str(e)}

        return {
            "path": rel_path or "/",
            "entries": entries,
            "total": len(entries),
        }

    def list_subdirectories(self, rel_path: str = "") -> Dict[str, Any]:
        """List only the subdirectories of a path (tree sidebar navigation).

        Returns:
            ``{"path", "dirs": [{"name", "path", "has_children"}]}`` or
            ``{"error": ...}``. ``has_children`` is ``False`` when the
            subdirectory cannot be read.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}
        if not target.exists():
            return {"error": "Path not found"}
        if not target.is_dir():
            return {"error": "Not a directory"}

        try:
            dirs = []
            for item in sorted(target.iterdir()):
                try:
                    if not item.is_dir():
                        continue
                    # Only subdirectories count as expandable children.
                    has_children = any(c.is_dir() for c in item.iterdir())
                except OSError:
                    # An unreadable subdirectory must not abort the listing.
                    has_children = False
                dirs.append({
                    "name": item.name,
                    "path": self._rel(item),
                    "has_children": has_children,
                })
            return {"path": rel_path or "/", "dirs": dirs}
        except PermissionError:
            return {"error": "Permission denied"}
        except Exception as e:
            logger.error(f"Error listing subdirectories: {e}")
            return {"error": str(e)}

    def get_file_info(self, rel_path: str) -> Dict[str, Any]:
        """Return detailed info about one file or directory.

        Directories get ``children_count`` (``-1`` if unreadable). Files
        under 100 KB with a known text suffix get a ``preview`` of up to
        1000 characters.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}
        if not target.exists():
            return {"error": "Path not found"}

        try:
            file_info = FileInfo(target, self._root()).to_dict()

            if target.is_dir():
                try:
                    file_info["children_count"] = sum(
                        1 for _ in target.iterdir()
                    )
                except PermissionError:
                    file_info["children_count"] = -1

            if target.is_file() and target.stat().st_size < 1024 * 100:  # < 100KB
                try:
                    if target.suffix in self._PREVIEW_SUFFIXES:
                        with open(target, "r", encoding="utf-8", errors="replace") as f:
                            file_info["preview"] = f.read(1000)
                except OSError as e:
                    # The preview is optional; the metadata is still useful.
                    logger.debug(f"Could not read preview: {e}")

            return file_info
        except Exception as e:
            logger.error(f"Error getting file info: {e}")
            return {"error": str(e)}

    def read_file(self, rel_path: str, limit: Optional[int] = None) -> Dict[str, Any]:
        """Read a text file, optionally only the first *limit* characters.

        Files larger than 10 MB are refused. Undecodable bytes are replaced.
        ``size`` in the result is the length of the returned content.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}
        if not target.exists():
            return {"error": "File not found"}
        if not target.is_file():
            return {"error": "Not a file"}

        # Prevent reading huge files at once
        file_size = target.stat().st_size
        if file_size > 10 * 1024 * 1024:  # > 10MB
            return {
                "error": "File too large",
                "size": file_size,
                "message": "Use download endpoint for large files",
            }

        try:
            with open(target, "r", encoding="utf-8", errors="replace") as f:
                content = f.read(limit) if limit else f.read()
            return {
                "path": rel_path,
                "content": content,
                "size": len(content),
                "encoding": "utf-8",
            }
        except Exception as e:
            logger.error(f"Error reading file: {e}")
            return {"error": str(e)}

    # ------------------------------------------------------------------
    # Create / delete / rename
    # ------------------------------------------------------------------
    def create_file(self, rel_path: str, content: str = "") -> Dict[str, str]:
        """Create a new file with *content*; never overwrites.

        Missing parent directories are created. The file is written as
        UTF-8, matching what ``read_file`` expects.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        try:
            target.parent.mkdir(parents=True, exist_ok=True)
            try:
                # "x" fails atomically if the path exists, closing the gap
                # between an existence check and the write.
                with open(target, "x", encoding="utf-8") as f:
                    f.write(content)
            except FileExistsError:
                return {"error": "File already exists"}

            logger.info(f"Created file: {target}")
            return {"status": "success", "path": rel_path}
        except Exception as e:
            logger.error(f"Error creating file: {e}")
            return {"error": str(e)}

    def delete_file_or_dir(self, rel_path: str, recursive: bool = False) -> Dict[str, str]:
        """Delete a file or directory.

        Non-empty directories are only removed when *recursive* is true.
        The base directory itself is never deleted.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}
        if target == self._root():
            # An empty or "/" path resolves to the share root.
            return {"error": "Cannot delete base directory"}
        if not target.exists():
            return {"error": "Path not found"}

        try:
            if target.is_file():
                target.unlink()
                logger.info(f"Deleted file: {target}")
                return {"status": "success", "message": f"File deleted: {rel_path}"}

            if target.is_dir():
                if not recursive and any(target.iterdir()):
                    return {"error": "Directory not empty"}

                import shutil
                shutil.rmtree(target)
                logger.info(f"Deleted directory: {target}")
                return {"status": "success", "message": f"Directory deleted: {rel_path}"}

            # Neither regular file nor directory (e.g. socket, FIFO).
            return {"error": "Unknown error"}
        except PermissionError:
            return {"error": "Permission denied"}
        except Exception as e:
            logger.error(f"Error deleting: {e}")
            return {"error": str(e)}

    def rename_file(self, rel_path: str, new_name: str) -> Dict[str, str]:
        """Rename a file or directory within its current parent directory.

        *new_name* must be a single path component. The base directory
        itself cannot be renamed.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}
        if not target.exists():
            return {"error": "Path not found"}
        if target == self._root():
            # Renaming the root would move it outside the share.
            return {"error": "Cannot rename base directory"}

        # Validate new name: one plain component, no traversal.
        if (
            new_name in ("", ".", "..")
            or "/" in new_name
            or "\\" in new_name
            or "\x00" in new_name
        ):
            return {"error": "Invalid filename"}

        try:
            new_path = target.parent / new_name
            if new_path.exists():
                return {"error": "Target already exists"}

            target.rename(new_path)
            logger.info(f"Renamed {target} to {new_path}")
            return {
                "status": "success",
                "old_path": rel_path,
                "new_path": self._rel(new_path),
            }
        except Exception as e:
            logger.error(f"Error renaming: {e}")
            return {"error": str(e)}

    def mkdir(self, rel_path: str) -> Dict[str, str]:
        """Create a directory, including missing parents."""
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}
        if target.exists():
            return {"error": "Directory already exists"}

        try:
            target.mkdir(parents=True, exist_ok=False)
            logger.info(f"Created directory: {target}")
            return {"status": "success", "path": rel_path}
        except Exception as e:
            logger.error(f"Error creating directory: {e}")
            return {"error": str(e)}

    # ------------------------------------------------------------------
    # Space usage
    # ------------------------------------------------------------------
    def get_space_info(self) -> Dict[str, Any]:
        """Report space usage in bytes.

        Sources are tried in order: ``zfs list`` for ``tank/share``, then
        ``df -B1`` on the base path, then ``shutil.disk_usage``. Each source
        is isolated so that a missing binary or unparsable output falls
        through to the next one.

        Returns:
            ``{"used", "available", "total"}`` (plus ``"actual"`` from ZFS),
            or ``{"error": ...}`` when every source fails.
        """
        import shutil
        import subprocess

        # 1) ZFS: -H drops headers, -p prints exact byte values.
        try:
            result = subprocess.run(
                ["zfs", "list", "-H", "-p", "-o", "used,avail,refer", "tank/share"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode == 0:
                parts = result.stdout.strip().split()
                if len(parts) >= 3:
                    used = int(parts[0])
                    available = int(parts[1])
                    return {
                        "used": used,
                        "available": available,
                        "actual": int(parts[2]),
                        "total": used + available,
                    }
        except (OSError, subprocess.SubprocessError, ValueError) as e:
            # OSError covers a missing zfs binary.
            logger.debug(f"ZFS space query failed: {e}")

        # 2) df: Filesystem 1B-blocks Used Available Use% Mounted on
        try:
            result = subprocess.run(
                ["df", "-B1", str(self.base_path)],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode == 0:
                lines = result.stdout.strip().split("\n")
                if len(lines) >= 2:
                    parts = lines[1].split()
                    if len(parts) >= 4:
                        return {
                            "used": int(parts[2]),
                            "available": int(parts[3]),
                            "total": int(parts[1]),
                        }
        except (OSError, subprocess.SubprocessError, ValueError) as e:
            logger.debug(f"df space query failed: {e}")

        # 3) Pure-Python fallback, needs no external binary.
        try:
            usage = shutil.disk_usage(self.base_path)
            return {
                "used": usage.used,
                "available": usage.free,
                "total": usage.total,
            }
        except OSError as e:
            logger.error(f"Error getting space info: {e}")
            return {"error": "Could not get space info"}

    # ------------------------------------------------------------------
    # Permissions / ownership
    # ------------------------------------------------------------------
    def change_permissions(self, rel_path: str, mode: str) -> Dict[str, Any]:
        """Change file/directory permissions (chmod).

        Args:
            mode: Octal string like "755" or "644".
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}
        if not target.exists():
            return {"error": "Path not found"}

        try:
            import subprocess

            # Validation only: raises ValueError for non-octal input.
            int(mode, 8)

            # "--" ends option parsing so the mode can never be read as a flag.
            result = subprocess.run(
                ["chmod", "--", mode, str(target)],
                capture_output=True,
                timeout=10,
            )
            if result.returncode == 0:
                logger.info(f"Changed permissions for {target} to {mode}")
                return {
                    "status": "success",
                    "path": rel_path,
                    "permissions": mode,
                }

            stderr = result.stderr.decode(errors="replace")
            logger.error(f"Failed to change permissions: {stderr}")
            return {"error": f"Failed to change permissions: {stderr}"}
        except Exception as e:
            logger.error(f"Error changing permissions: {e}")
            return {"error": str(e)}

    def change_owner(self, rel_path: str, owner: str, group: Optional[str] = None) -> Dict[str, Any]:
        """Change file/directory owner (chown).

        Args:
            owner: Username or uid.
            group: Group name or gid (optional).
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}
        if not target.exists():
            return {"error": "Path not found"}

        try:
            import subprocess

            chown_spec = f"{owner}:{group}" if group else owner

            # "--" keeps an owner value such as "-R" from acting as an option.
            result = subprocess.run(
                ["chown", "--", chown_spec, str(target)],
                capture_output=True,
                timeout=10,
            )
            if result.returncode == 0:
                logger.info(f"Changed owner for {target} to {chown_spec}")
                return {
                    "status": "success",
                    "path": rel_path,
                    "owner": owner,
                    "group": group,
                }

            stderr = result.stderr.decode(errors="replace")
            logger.error(f"Failed to change owner: {stderr}")
            return {"error": f"Failed to change owner: {stderr}"}
        except Exception as e:
            logger.error(f"Error changing owner: {e}")
            return {"error": str(e)}

    def change_permissions_recursive(self, rel_path: str, mode: str) -> Dict[str, Any]:
        """Change permissions of a directory and its contents (chmod -R).

        The mode is handed to ``chmod`` unchanged and is not validated here.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}
        if not target.exists():
            return {"error": "Path not found"}
        if not target.is_dir():
            return {"error": "Path is not a directory"}

        try:
            import subprocess

            # "--" ends option parsing; the mode is user supplied.
            result = subprocess.run(
                ["chmod", "-R", "--", mode, str(target)],
                capture_output=True,
                timeout=30,
            )
            if result.returncode == 0:
                logger.info(f"Changed permissions recursively for {target} to {mode}")
                return {
                    "status": "success",
                    "path": rel_path,
                    "permissions": mode,
                    "recursive": True,
                }

            stderr = result.stderr.decode(errors="replace")
            logger.error(f"Failed to change permissions: {stderr}")
            return {"error": f"Failed to change permissions: {stderr}"}
        except Exception as e:
            logger.error(f"Error changing permissions: {e}")
            return {"error": str(e)}

    # ------------------------------------------------------------------
    # Copy / move
    # ------------------------------------------------------------------
    def copy_file(self, src_rel: str, dst_rel: str, overwrite: bool = False) -> Dict[str, Any]:
        """Copy a file or directory.

        With *overwrite*, an existing destination directory is removed
        before a directory copy. Copying onto itself, onto the base
        directory, or into the source's own subtree is refused.
        """
        src = self._resolve_path(src_rel)
        dst = self._resolve_path(dst_rel)
        if not src or not dst:
            return {"error": "Invalid path"}
        if not src.exists():
            return {"error": "Source path not found"}
        if dst.exists() and not overwrite:
            return {"error": "Destination already exists"}

        # Must run before anything is deleted: with src == dst the overwrite
        # step below would destroy the source.
        conflict = self._transfer_conflict(src, dst)
        if conflict:
            return {"error": conflict}

        try:
            import shutil

            if src.is_file():
                shutil.copy2(src, dst)
            else:
                if dst.exists():
                    shutil.rmtree(dst)
                shutil.copytree(src, dst)

            logger.info(f"Copied {src} to {dst}")
            return {
                "status": "success",
                "src": src_rel,
                "dst": dst_rel,
            }
        except Exception as e:
            logger.error(f"Error copying: {e}")
            return {"error": str(e)}

    def move_file(self, src_rel: str, dst_rel: str, overwrite: bool = False) -> Dict[str, Any]:
        """Move (rename) a file or directory.

        With *overwrite*, an existing destination is removed first. Moving
        onto itself, onto the base directory, or into the source's own
        subtree is refused.
        """
        src = self._resolve_path(src_rel)
        dst = self._resolve_path(dst_rel)
        if not src or not dst:
            return {"error": "Invalid path"}
        if not src.exists():
            return {"error": "Source path not found"}
        if dst.exists() and not overwrite:
            return {"error": "Destination already exists"}

        # Checked before the destination is removed, otherwise src == dst
        # would delete the only copy of the data.
        conflict = self._transfer_conflict(src, dst)
        if conflict:
            return {"error": conflict}

        try:
            import shutil

            if dst.exists() and overwrite:
                if dst.is_dir():
                    shutil.rmtree(dst)
                else:
                    dst.unlink()

            shutil.move(str(src), str(dst))
            logger.info(f"Moved {src} to {dst}")
            return {
                "status": "success",
                "src": src_rel,
                "dst": dst_rel,
            }
        except Exception as e:
            logger.error(f"Error moving: {e}")
            return {"error": str(e)}

    # ------------------------------------------------------------------
    # Search
    # ------------------------------------------------------------------
    def search_files(self, query: str, search_path: str = "", max_results: int = 50) -> List[Dict[str, Any]]:
        """Search for files and directories by name (case-insensitive).

        Walks the tree below *search_path* top-down and returns at most
        *max_results* ``FileInfo.to_dict()`` entries whose name contains
        *query*. Returns an empty list on any error.
        """
        target = self._resolve_path(search_path)
        if not target or not target.exists():
            return []

        results: List[Dict[str, Any]] = []
        query_lower = query.lower()

        try:
            root = self._root()
            # os.walk instead of Path.walk: the latter needs Python 3.12+.
            for current, dirs, files in os.walk(target):
                # In-place sort also makes the descent order deterministic.
                dirs.sort()
                # Directories first, then files, each group sorted by name.
                for name in [*dirs, *sorted(files)]:
                    if query_lower in name.lower():
                        results.append(
                            FileInfo(Path(current) / name, root).to_dict()
                        )
                        if len(results) >= max_results:
                            return results
            return results
        except Exception as e:
            logger.error(f"Error searching files: {e}")
            return []
# Global instance bound to the default BASE_PATH. It is created at import
# time, so importing this module creates the base directory if it is missing.
file_manager = FileManager()