# Source snapshot: commit 2fc68a7cab (645 lines, 21 KiB, Python)
# Commit message:
#   - /bin/chmod instead of chmod (line 435)
#   - /bin/chown instead of chown (line 476)
#   - /bin/chmod -R instead of chmod -R for recursive (line 514)
#   Fixes: 'Failed to save permissions' error when changing file permissions
#   Co-Authored-By: Claude Haiku 4.5 <noreply@anthropic.com>
"""
File Manager Service – Browse, upload, download files in /tank/share.

Similar to cockpit-files but optimized for ZFS shares.
"""

import logging
import os
import shutil
import stat
import subprocess
from datetime import datetime
from pathlib import Path
from stat import filemode
from typing import List, Dict, Any, Optional

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Root directory for file operations (ZFS share)
BASE_PATH = Path("/tank/share")

# Maximum upload size in bytes (2 GiB)
MAX_FILE_SIZE = 2 * 1024 * 1024 * 1024


class FileInfo:
    """Metadata snapshot of a single file, directory or symlink.

    The constructor never raises for stat/readlink failures: on any error
    the numeric fields fall back to zero, ``mode`` to ``"---------"`` and the
    message is stored in ``error``.
    """

    def __init__(self, path: Path, base_path: Optional[Path] = None):
        """Collect metadata for *path*.

        Args:
            path: Entry to inspect.
            base_path: Directory that ``to_dict`` reports paths relative to;
                defaults to the module-level ``BASE_PATH``.
        """
        self.path = path
        self.base_path = base_path or BASE_PATH
        self.name = path.name
        self.is_link = path.is_symlink()
        self.link_target = None
        # Always defined, so readers never hit AttributeError after a failure.
        self.stat = None

        try:
            if self.is_link:
                # For symlinks, read the target ...
                self.link_target = os.readlink(path)
                # ... and use lstat for the symlink itself, not the target.
                self.stat = path.lstat()
            else:
                self.stat = path.stat()

            self.is_dir = path.is_dir()
            self.size = self.stat.st_size
            self.modified = self.stat.st_mtime
            self.mode = filemode(self.stat.st_mode)
            self.uid = self.stat.st_uid
            self.gid = self.stat.st_gid
            self.error = None
        except Exception as e:
            # Best effort: an unreadable entry is still listed, with
            # placeholder values and the failure recorded in ``error``.
            self.is_dir = False
            self.size = 0
            self.modified = 0
            self.mode = "---------"
            self.uid = 0
            self.gid = 0
            self.error = str(e)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dict for JSON response.

        ``path`` is expressed relative to ``base_path``; ``link_target`` is
        only present when a symlink target was read.

        Raises:
            ValueError: if ``path`` is not located under ``base_path``.
        """
        result = {
            "name": self.name,
            "path": str(self.path.relative_to(self.base_path)),
            "is_dir": self.is_dir,
            "is_link": self.is_link,
            "size": self.size,
            "modified": self.modified,
            "modified_iso": datetime.fromtimestamp(self.modified).isoformat(),
            "permissions": self.mode,
            "uid": self.uid,
            "gid": self.gid,
            "error": self.error,
        }
        if self.link_target:
            result["link_target"] = self.link_target
        return result


class FileManager:
    """File operations confined to one base directory.

    Public methods take paths relative to ``base_path`` and report failures
    as ``{"error": ...}`` dicts (``search_files`` returns an empty list)
    instead of raising.
    """

    def __init__(self, base_path: Optional[Path] = None):
        """Bind to *base_path* (default ``BASE_PATH``), creating it if missing."""
        self.base_path = base_path or BASE_PATH

        # Ensure base path exists
        if not self.base_path.exists():
            logger.warning(f"Base path does not exist: {self.base_path}")
            self.base_path.mkdir(parents=True, exist_ok=True)

    def _is_base(self, target: Path) -> bool:
        """Return True when *target* is the base directory itself."""
        return target.resolve() == self.base_path.resolve()

    def _resolve_path(self, rel_path: str) -> Optional[Path]:
        """
        Resolve and validate path (prevent directory traversal attacks).

        Returns the absolute, symlink-resolved path if it is the base
        directory or lies below it, None otherwise. An empty *rel_path*
        yields ``base_path`` unchanged.
        """
        try:
            if not rel_path:
                return self.base_path

            # Remove leading slash and resolve
            rel_path = rel_path.lstrip("/")
            base = self.base_path.resolve()
            target = (self.base_path / rel_path).resolve()

            # Compare path components, not string prefixes: a sibling such as
            # "<base>_evil" shares the string prefix but is outside the base.
            if target != base and base not in target.parents:
                logger.warning(f"Path traversal attempt: {rel_path}")
                return None

            return target
        except Exception as e:
            logger.error(f"Path resolution error: {e}")
            return None

    def list_directory(self, rel_path: str = "") -> Dict[str, Any]:
        """
        List directory contents.

        Returns ``{"path", "entries", "total"}`` where each entry is a
        ``FileInfo.to_dict()`` result, sorted by path.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        if not target.exists():
            return {"error": "Path not found"}

        if not target.is_dir():
            return {"error": "Not a directory"}

        try:
            entries = [
                FileInfo(item, self.base_path).to_dict()
                for item in sorted(target.iterdir())
            ]
            return {
                "path": rel_path or "/",
                "entries": entries,
                "total": len(entries),
            }
        except PermissionError:
            return {"error": "Permission denied"}
        except Exception as e:
            logger.error(f"Error listing directory: {e}")
            return {"error": str(e)}

    def list_subdirectories(self, rel_path: str = "") -> Dict[str, Any]:
        """
        List only subdirectories of a path (for tree sidebar navigation).

        Returns: { path, dirs: [{name, path, has_children}] }
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        if not target.exists():
            return {"error": "Path not found"}

        if not target.is_dir():
            return {"error": "Not a directory"}

        try:
            dirs = []
            for item in sorted(target.iterdir()):
                try:
                    if not item.is_dir():
                        continue

                    # Check if has any subdirectory children
                    has_children = any(c.is_dir() for c in item.iterdir())
                except PermissionError:
                    # Unreadable directories are still listed, as leaves.
                    has_children = False

                dirs.append({
                    "name": item.name,
                    "path": str(item.relative_to(self.base_path)),
                    "has_children": has_children,
                })

            return {"path": rel_path or "/", "dirs": dirs}
        except PermissionError:
            return {"error": "Permission denied"}
        except Exception as e:
            logger.error(f"Error listing subdirectories: {e}")
            return {"error": str(e)}

    def get_file_info(self, rel_path: str) -> Dict[str, Any]:
        """
        Get detailed info about file/directory.

        Adds ``children_count`` for directories (-1 when unreadable) and a
        ``preview`` of up to 1000 characters for small text-like files.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        if not target.exists():
            return {"error": "Path not found"}

        try:
            file_info = FileInfo(target, self.base_path).to_dict()

            # Add directory listing if it's a directory
            if target.is_dir():
                try:
                    file_info["children_count"] = sum(1 for _ in target.iterdir())
                except PermissionError:
                    file_info["children_count"] = -1

            # Add file preview for text files
            if target.is_file() and target.stat().st_size < 1024 * 100:  # < 100KB
                try:
                    if target.suffix in (".txt", ".log", ".md", ".json", ".yaml", ".yml"):
                        with open(target, "r", encoding="utf-8", errors="replace") as f:
                            file_info["preview"] = f.read(1000)
                except Exception as e:
                    logger.debug(f"Could not read preview: {e}")

            return file_info
        except Exception as e:
            logger.error(f"Error getting file info: {e}")
            return {"error": str(e)}

    def read_file(self, rel_path: str, limit: Optional[int] = None) -> Dict[str, Any]:
        """
        Read file content as UTF-8 text (with optional character limit).

        Files larger than 10 MiB are refused with a "File too large" error.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        if not target.exists():
            return {"error": "File not found"}

        if not target.is_file():
            return {"error": "Not a file"}

        # Prevent reading huge files at once
        size = target.stat().st_size
        if size > 10 * 1024 * 1024:  # > 10MB
            return {
                "error": "File too large",
                "size": size,
                "message": "Use download endpoint for large files",
            }

        try:
            with open(target, "r", encoding="utf-8", errors="replace") as f:
                content = f.read(limit) if limit else f.read()

            return {
                "path": rel_path,
                "content": content,
                "size": len(content),
                "encoding": "utf-8",
            }
        except Exception as e:
            logger.error(f"Error reading file: {e}")
            return {"error": str(e)}

    def create_file(self, rel_path: str, content: str = "") -> Dict[str, str]:
        """
        Create a new file (empty by default); missing parents are created.

        Never overwrites: an existing path yields "File already exists".
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        try:
            target.parent.mkdir(parents=True, exist_ok=True)

            try:
                # "x" makes the existence check and the creation one atomic
                # step; UTF-8 matches what read_file decodes.
                with open(target, "x", encoding="utf-8") as f:
                    f.write(content)
            except FileExistsError:
                return {"error": "File already exists"}

            logger.info(f"Created file: {target}")
            return {"status": "success", "path": rel_path}
        except Exception as e:
            logger.error(f"Error creating file: {e}")
            return {"error": str(e)}

    def delete_file_or_dir(self, rel_path: str, recursive: bool = False) -> Dict[str, str]:
        """
        Delete file or directory.

        A non-empty directory is only removed when *recursive* is true. The
        base directory itself is never deleted.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        if not target.exists():
            return {"error": "Path not found"}

        # An empty rel_path resolves to the share root; refuse to wipe it.
        if self._is_base(target):
            return {"error": "Cannot delete the base directory"}

        try:
            if target.is_file():
                target.unlink()
                logger.info(f"Deleted file: {target}")
                return {"status": "success", "message": f"File deleted: {rel_path}"}

            if target.is_dir():
                if not recursive and any(target.iterdir()):
                    return {"error": "Directory not empty"}

                shutil.rmtree(target)
                logger.info(f"Deleted directory: {target}")
                return {"status": "success", "message": f"Directory deleted: {rel_path}"}

            return {"error": "Unknown error"}
        except PermissionError:
            return {"error": "Permission denied"}
        except Exception as e:
            logger.error(f"Error deleting: {e}")
            return {"error": str(e)}

    def rename_file(self, rel_path: str, new_name: str) -> Dict[str, str]:
        """
        Rename file or directory within its current parent directory.

        *new_name* must be a plain name: no path separators, not empty,
        not "." or "..". The base directory itself cannot be renamed.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        if not target.exists():
            return {"error": "Path not found"}

        # Renaming the root would move it to a sibling outside the base.
        if self._is_base(target):
            return {"error": "Cannot rename the base directory"}

        # Validate new name (no path separators, no dot entries)
        if "/" in new_name or "\\" in new_name or new_name in ("", ".", ".."):
            return {"error": "Invalid filename"}

        try:
            new_path = target.parent / new_name

            if new_path.exists():
                return {"error": "Target already exists"}

            target.rename(new_path)
            logger.info(f"Renamed {target} to {new_path}")
            return {
                "status": "success",
                "old_path": rel_path,
                "new_path": str(new_path.relative_to(self.base_path)),
            }
        except Exception as e:
            logger.error(f"Error renaming: {e}")
            return {"error": str(e)}

    def mkdir(self, rel_path: str) -> Dict[str, str]:
        """
        Create directory (including missing parents).
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        if target.exists():
            return {"error": "Directory already exists"}

        try:
            target.mkdir(parents=True, exist_ok=False)
            logger.info(f"Created directory: {target}")
            return {"status": "success", "path": rel_path}
        except Exception as e:
            logger.error(f"Error creating directory: {e}")
            return {"error": str(e)}

    def get_space_info(self) -> Dict[str, Any]:
        """
        Get space usage of base path via ZFS or fallback to df.

        Returns ``used``/``available``/``total`` in bytes; the ZFS branch
        additionally reports the dataset's ``refer`` value as ``actual``.
        """
        try:
            # Try to get space from ZFS first. A missing binary, a timeout or
            # unparsable output must not abort: fall through to df instead.
            try:
                result = subprocess.run(
                    ["zfs", "list", "-H", "-p", "-o", "used,avail,refer", "tank/share"],
                    capture_output=True,
                    text=True,
                    timeout=5,
                )
                if result.returncode == 0:
                    parts = result.stdout.strip().split()
                    if len(parts) >= 3:
                        used = int(parts[0])
                        available = int(parts[1])
                        return {
                            "used": used,
                            "available": available,
                            "actual": int(parts[2]),
                            "total": used + available,
                        }
            except (OSError, subprocess.SubprocessError, ValueError) as e:
                logger.debug(f"ZFS space query failed, falling back to df: {e}")

            # Fallback to df if ZFS is not available
            result = subprocess.run(
                ["df", "-B1", str(self.base_path)],
                capture_output=True,
                text=True,
                timeout=5,
            )

            if result.returncode == 0:
                lines = result.stdout.strip().split('\n')
                if len(lines) >= 2:
                    # Parse df output: Filesystem 1B-blocks Used Available Use% Mounted on
                    parts = lines[1].split()
                    if len(parts) >= 4:
                        return {
                            "used": int(parts[2]),
                            "available": int(parts[3]),
                            "total": int(parts[1]),
                        }

            return {"error": "Could not get space info"}
        except Exception as e:
            logger.error(f"Error getting space info: {e}")
            return {"error": str(e)}

    def change_permissions(self, rel_path: str, mode: str) -> Dict[str, Any]:
        """
        Change file/directory permissions (chmod).

        mode: octal string like "755" or "644"; anything that does not parse
        as octal is rejected before chmod is invoked.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        if not target.exists():
            return {"error": "Path not found"}

        try:
            # Validation only: raises ValueError for non-octal input, which
            # the handler below turns into an error response.
            int(mode, 8)

            # "--" ends option parsing so mode can never be read as a flag.
            result = subprocess.run(
                ["/bin/chmod", "--", mode, str(target)],
                capture_output=True,
                timeout=10,
            )

            if result.returncode == 0:
                logger.info(f"Changed permissions for {target} to {mode}")
                return {
                    "status": "success",
                    "path": rel_path,
                    "permissions": mode,
                }

            stderr = result.stderr.decode(errors="replace")
            logger.error(f"Failed to change permissions: {stderr}")
            return {"error": f"Failed to change permissions: {stderr}"}
        except Exception as e:
            logger.error(f"Error changing permissions: {e}")
            return {"error": str(e)}

    def change_owner(self, rel_path: str, owner: str, group: Optional[str] = None) -> Dict[str, Any]:
        """
        Change file/directory owner (chown).

        owner: username or uid
        group: groupname or gid (optional)
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        if not target.exists():
            return {"error": "Path not found"}

        try:
            chown_spec = f"{owner}:{group}" if group else owner

            # "--" ends option parsing so an owner such as "--reference=..."
            # is treated as a (bad) owner spec, never as a chown option.
            result = subprocess.run(
                ["/bin/chown", "--", chown_spec, str(target)],
                capture_output=True,
                timeout=10,
            )

            if result.returncode == 0:
                logger.info(f"Changed owner for {target} to {chown_spec}")
                return {
                    "status": "success",
                    "path": rel_path,
                    "owner": owner,
                    "group": group,
                }

            stderr = result.stderr.decode(errors="replace")
            logger.error(f"Failed to change owner: {stderr}")
            return {"error": f"Failed to change owner: {stderr}"}
        except Exception as e:
            logger.error(f"Error changing owner: {e}")
            return {"error": str(e)}

    def change_permissions_recursive(self, rel_path: str, mode: str) -> Dict[str, Any]:
        """
        Change permissions recursively for directory and contents (chmod -R).

        *mode* is handed to chmod as given; only directories are accepted.
        """
        target = self._resolve_path(rel_path)
        if not target:
            return {"error": "Invalid path"}

        if not target.exists():
            return {"error": "Path not found"}

        if not target.is_dir():
            return {"error": "Path is not a directory"}

        try:
            # "--" ends option parsing so mode can never be read as a flag.
            result = subprocess.run(
                ["/bin/chmod", "-R", "--", mode, str(target)],
                capture_output=True,
                timeout=30,
            )

            if result.returncode == 0:
                logger.info(f"Changed permissions recursively for {target} to {mode}")
                return {
                    "status": "success",
                    "path": rel_path,
                    "permissions": mode,
                    "recursive": True,
                }

            stderr = result.stderr.decode(errors="replace")
            logger.error(f"Failed to change permissions: {stderr}")
            return {"error": f"Failed to change permissions: {stderr}"}
        except Exception as e:
            logger.error(f"Error changing permissions: {e}")
            return {"error": str(e)}

    def copy_file(self, src_rel: str, dst_rel: str, overwrite: bool = False) -> Dict[str, Any]:
        """
        Copy file or directory.

        An existing destination is only replaced when *overwrite* is true.
        Copies where source and destination are the same path, or where one
        directory contains the other, are refused before anything is removed.
        """
        src = self._resolve_path(src_rel)
        dst = self._resolve_path(dst_rel)

        if not src or not dst:
            return {"error": "Invalid path"}

        if not src.exists():
            return {"error": "Source path not found"}

        if dst.exists() and not overwrite:
            return {"error": "Destination already exists"}

        # Checked up front: the overwrite branch below deletes dst first, so
        # these cases would otherwise destroy the source.
        if src == dst:
            return {"error": "Source and destination are the same"}
        if src.is_dir():
            if src in dst.parents:
                return {"error": "Cannot copy a directory into itself"}
            if dst in src.parents:
                return {"error": "Destination contains the source"}

        try:
            if src.is_file():
                shutil.copy2(src, dst)
            else:
                if dst.exists():
                    # rmtree only works on directories; a file is unlinked.
                    if dst.is_dir():
                        shutil.rmtree(dst)
                    else:
                        dst.unlink()
                shutil.copytree(src, dst)

            logger.info(f"Copied {src} to {dst}")
            return {
                "status": "success",
                "src": src_rel,
                "dst": dst_rel,
            }
        except Exception as e:
            logger.error(f"Error copying: {e}")
            return {"error": str(e)}

    def move_file(self, src_rel: str, dst_rel: str, overwrite: bool = False) -> Dict[str, Any]:
        """
        Move (rename) file or directory.

        An existing destination is removed first when *overwrite* is true.
        Moves where source and destination are the same path, or where one
        contains the other, are refused before anything is removed.
        """
        src = self._resolve_path(src_rel)
        dst = self._resolve_path(dst_rel)

        if not src or not dst:
            return {"error": "Invalid path"}

        if not src.exists():
            return {"error": "Source path not found"}

        if dst.exists() and not overwrite:
            return {"error": "Destination already exists"}

        # Checked up front: the overwrite branch below deletes dst first, so
        # these cases would otherwise destroy the source.
        if src == dst:
            return {"error": "Source and destination are the same"}
        if src in dst.parents:
            return {"error": "Cannot move a directory into itself"}
        if dst in src.parents:
            return {"error": "Destination contains the source"}

        try:
            if dst.exists() and overwrite:
                if dst.is_dir():
                    shutil.rmtree(dst)
                else:
                    dst.unlink()

            shutil.move(str(src), str(dst))

            logger.info(f"Moved {src} to {dst}")
            return {
                "status": "success",
                "src": src_rel,
                "dst": dst_rel,
            }
        except Exception as e:
            logger.error(f"Error moving: {e}")
            return {"error": str(e)}

    def search_files(self, query: str, search_path: str = "", max_results: int = 50) -> List[Dict[str, Any]]:
        """
        Search for files and directories by name (case-insensitive substring).

        Walks the tree below *search_path* (default: the base directory) and
        stops as soon as *max_results* matches were collected. Returns an
        empty list on any error.
        """
        target = self._resolve_path(search_path) if search_path else self.base_path

        if not target or not target.exists():
            return []

        results = []
        query_lower = query.lower()

        try:
            for root, dirs, files in target.walk():
                # Per directory: matching subdirectories first, then files.
                for name in [*sorted(dirs), *sorted(files)]:
                    if query_lower in name.lower():
                        results.append(FileInfo(Path(root) / name, self.base_path).to_dict())
                        if len(results) >= max_results:
                            return results

            return results
        except Exception as e:
            logger.error(f"Error searching files: {e}")
            return []


# Global instance, bound to the default BASE_PATH
file_manager = FileManager()