""" File Manager Service – Browse, upload, download files in /tank/share Similar to cockpit-files but optimized for ZFS shares """ import os import logging from pathlib import Path from typing import List, Dict, Any, Optional from stat import filemode from datetime import datetime import stat logger = logging.getLogger(__name__) # Root directory for file operations (ZFS share) BASE_PATH = Path("/tank/share") MAX_FILE_SIZE = 2 * 1024 * 1024 * 1024 # 2GB max upload class FileInfo: """File/Directory information""" def __init__(self, path: Path, base_path: Optional[Path] = None): self.path = path self.base_path = base_path or BASE_PATH self.name = path.name self.is_link = path.is_symlink() self.link_target = None try: # For symlinks, read the target if self.is_link: self.link_target = os.readlink(path) # Use lstat for symlink itself, not the target self.stat = path.lstat() else: self.stat = path.stat() self.is_dir = path.is_dir() self.size = self.stat.st_size self.modified = self.stat.st_mtime self.mode = filemode(self.stat.st_mode) self.uid = self.stat.st_uid self.gid = self.stat.st_gid self.error = None except Exception as e: self.is_dir = False self.size = 0 self.modified = 0 self.mode = "---------" self.uid = 0 self.gid = 0 self.error = str(e) def to_dict(self) -> Dict[str, Any]: """Convert to dict for JSON response""" result = { "name": self.name, "path": str(self.path.relative_to(self.base_path)), "is_dir": self.is_dir, "is_link": self.is_link, "size": self.size, "modified": self.modified, "modified_iso": datetime.fromtimestamp(self.modified).isoformat(), "permissions": self.mode, "uid": self.uid, "gid": self.gid, "error": self.error } if self.link_target: result["link_target"] = self.link_target return result class FileManager: def __init__(self, base_path: Optional[Path] = None): self.base_path = base_path or BASE_PATH # Ensure base path exists if not self.base_path.exists(): logger.warning(f"Base path does not exist: {self.base_path}") self.base_path.mkdir(parents=True, exist_ok=True) def _resolve_path(self, rel_path: str) -> Optional[Path]: """ Resolve and validate path (prevent directory traversal attacks) Returns absolute path if safe, None otherwise """ try: if not rel_path: return self.base_path # Remove leading slash and resolve rel_path = rel_path.lstrip("/") target = (self.base_path / rel_path).resolve() # Ensure target is within base_path if not str(target).startswith(str(self.base_path.resolve())): logger.warning(f"Path traversal attempt: {rel_path}") return None return target except Exception as e: logger.error(f"Path resolution error: {e}") return None def list_directory(self, rel_path: str = "") -> Dict[str, Any]: """ List directory contents """ target = self._resolve_path(rel_path) if not target: return {"error": "Invalid path"} if not target.exists(): return {"error": "Path not found"} if not target.is_dir(): return {"error": "Not a directory"} try: entries = [] for item in sorted(target.iterdir()): entries.append(FileInfo(item, self.base_path).to_dict()) return { "path": rel_path or "/", "entries": entries, "total": len(entries) } except PermissionError: return {"error": "Permission denied"} except Exception as e: logger.error(f"Error listing directory: {e}") return {"error": str(e)} def list_subdirectories(self, rel_path: str = "") -> Dict[str, Any]: """ List only subdirectories of a path (for tree sidebar navigation). Returns: { dirs: [{name, path, has_children}] } """ target = self._resolve_path(rel_path) if not target: return {"error": "Invalid path"} if not target.exists(): return {"error": "Path not found"} if not target.is_dir(): return {"error": "Not a directory"} try: dirs = [] for item in sorted(target.iterdir()): try: if not item.is_dir(): continue # Check if has any subdirectory children has_children = any( c.is_dir() for c in item.iterdir() ) except PermissionError: has_children = False dirs.append({ "name": item.name, "path": str(item.relative_to(self.base_path)), "has_children": has_children }) return {"path": rel_path or "/", "dirs": dirs} except PermissionError: return {"error": "Permission denied"} except Exception as e: logger.error(f"Error listing subdirectories: {e}") return {"error": str(e)} def get_file_info(self, rel_path: str) -> Dict[str, Any]: """ Get detailed info about file/directory """ target = self._resolve_path(rel_path) if not target: return {"error": "Invalid path"} if not target.exists(): return {"error": "Path not found"} try: file_info = FileInfo(target, self.base_path).to_dict() # Add directory listing if it's a directory if target.is_dir(): try: children = sorted(target.iterdir()) file_info["children_count"] = len(children) except PermissionError: file_info["children_count"] = -1 # Add file preview for text files if target.is_file() and target.stat().st_size < 1024 * 100: # < 100KB try: if target.suffix in [".txt", ".log", ".md", ".json", ".yaml", ".yml"]: with open(target, "r", encoding="utf-8", errors="replace") as f: preview = f.read(1000) file_info["preview"] = preview except Exception as e: logger.debug(f"Could not read preview: {e}") return file_info except Exception as e: logger.error(f"Error getting file info: {e}") return {"error": str(e)} def read_file(self, rel_path: str, limit: Optional[int] = None) -> Dict[str, Any]: """ Read file content (with optional limit) """ target = self._resolve_path(rel_path) if not target: return {"error": "Invalid path"} if not target.exists(): return {"error": "File not found"} if not target.is_file(): return {"error": "Not a file"} # Prevent reading huge files at once if target.stat().st_size > 10 * 1024 * 1024: # > 10MB return { "error": "File too large", "size": target.stat().st_size, "message": "Use download endpoint for large files" } try: with open(target, "r", encoding="utf-8", errors="replace") as f: if limit: content = f.read(limit) else: content = f.read() return { "path": rel_path, "content": content, "size": len(content), "encoding": "utf-8" } except Exception as e: logger.error(f"Error reading file: {e}") return {"error": str(e)} def create_file(self, rel_path: str, content: str = "") -> Dict[str, str]: """ Create new file or empty file """ target = self._resolve_path(rel_path) if not target: return {"error": "Invalid path"} try: target.parent.mkdir(parents=True, exist_ok=True) if target.exists(): return {"error": "File already exists"} with open(target, "w") as f: f.write(content) logger.info(f"Created file: {target}") return {"status": "success", "path": rel_path} except Exception as e: logger.error(f"Error creating file: {e}") return {"error": str(e)} def delete_file_or_dir(self, rel_path: str, recursive: bool = False) -> Dict[str, str]: """ Delete file or directory """ target = self._resolve_path(rel_path) if not target: return {"error": "Invalid path"} if not target.exists(): return {"error": "Path not found"} try: if target.is_file(): target.unlink() logger.info(f"Deleted file: {target}") return {"status": "success", "message": f"File deleted: {rel_path}"} elif target.is_dir(): if not recursive and list(target.iterdir()): return {"error": "Directory not empty"} import shutil shutil.rmtree(target) logger.info(f"Deleted directory: {target}") return {"status": "success", "message": f"Directory deleted: {rel_path}"} return {"error": "Unknown error"} except PermissionError: return {"error": "Permission denied"} except Exception as e: logger.error(f"Error deleting: {e}") return {"error": str(e)} def rename_file(self, rel_path: str, new_name: str) -> Dict[str, str]: """ Rename file or directory """ target = self._resolve_path(rel_path) if not target: return {"error": "Invalid path"} if not target.exists(): return {"error": "Path not found"} # Validate new name (no path separators) if "/" in new_name or "\\" in new_name: return {"error": "Invalid filename"} try: new_path = target.parent / new_name if new_path.exists(): return {"error": "Target already exists"} target.rename(new_path) logger.info(f"Renamed {target} to {new_path}") return { "status": "success", "old_path": rel_path, "new_path": str(new_path.relative_to(self.base_path)) } except Exception as e: logger.error(f"Error renaming: {e}") return {"error": str(e)} def mkdir(self, rel_path: str) -> Dict[str, str]: """ Create directory """ target = self._resolve_path(rel_path) if not target: return {"error": "Invalid path"} if target.exists(): return {"error": "Directory already exists"} try: target.mkdir(parents=True, exist_ok=False) logger.info(f"Created directory: {target}") return {"status": "success", "path": rel_path} except Exception as e: logger.error(f"Error creating directory: {e}") return {"error": str(e)} def get_space_info(self) -> Dict[str, Any]: """ Get space usage of base path via ZFS or fallback to df """ try: import subprocess # Try to get space from ZFS first result = subprocess.run( ["zfs", "list", "-H", "-p", "-o", "used,avail,refer", "tank/share"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: parts = result.stdout.strip().split() if len(parts) >= 3: used = int(parts[0]) available = int(parts[1]) return { "used": used, "available": available, "actual": int(parts[2]), "total": used + available } # Fallback to df if ZFS is not available result = subprocess.run( ["df", "-B1", str(self.base_path)], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: lines = result.stdout.strip().split('\n') if len(lines) >= 2: # Parse df output: Filesystem 1B-blocks Used Available Use% Mounted on parts = lines[1].split() if len(parts) >= 4: total = int(parts[1]) used = int(parts[2]) available = int(parts[3]) return { "used": used, "available": available, "total": total } return {"error": "Could not get space info"} except Exception as e: logger.error(f"Error getting space info: {e}") return {"error": str(e)} def change_permissions(self, rel_path: str, mode: str) -> Dict[str, Any]: """ Change file/directory permissions (chmod) mode: octal string like "755" or "644" """ target = self._resolve_path(rel_path) if not target: return {"error": "Invalid path"} if not target.exists(): return {"error": "Path not found"} try: import subprocess # Convert octal mode string to int mode_int = int(mode, 8) result = subprocess.run( ["chmod", mode, str(target)], capture_output=True, timeout=10 ) if result.returncode == 0: logger.info(f"Changed permissions for {target} to {mode}") return { "status": "success", "path": rel_path, "permissions": mode } else: logger.error(f"Failed to change permissions: {result.stderr.decode()}") return {"error": f"Failed to change permissions: {result.stderr.decode()}"} except Exception as e: logger.error(f"Error changing permissions: {e}") return {"error": str(e)} def change_owner(self, rel_path: str, owner: str, group: Optional[str] = None) -> Dict[str, Any]: """ Change file/directory owner (chown) owner: username or uid group: groupname or gid (optional) """ target = self._resolve_path(rel_path) if not target: return {"error": "Invalid path"} if not target.exists(): return {"error": "Path not found"} try: import subprocess if group: chown_spec = f"{owner}:{group}" else: chown_spec = owner result = subprocess.run( ["chown", chown_spec, str(target)], capture_output=True, timeout=10 ) if result.returncode == 0: logger.info(f"Changed owner for {target} to {chown_spec}") return { "status": "success", "path": rel_path, "owner": owner, "group": group } else: logger.error(f"Failed to change owner: {result.stderr.decode()}") return {"error": f"Failed to change owner: {result.stderr.decode()}"} except Exception as e: logger.error(f"Error changing owner: {e}") return {"error": str(e)} def change_permissions_recursive(self, rel_path: str, mode: str) -> Dict[str, Any]: """ Change permissions recursively for directory and contents (chmod -R) """ target = self._resolve_path(rel_path) if not target: return {"error": "Invalid path"} if not target.exists(): return {"error": "Path not found"} if not target.is_dir(): return {"error": "Path is not a directory"} try: import subprocess result = subprocess.run( ["chmod", "-R", mode, str(target)], capture_output=True, timeout=30 ) if result.returncode == 0: logger.info(f"Changed permissions recursively for {target} to {mode}") return { "status": "success", "path": rel_path, "permissions": mode, "recursive": True } else: logger.error(f"Failed to change permissions: {result.stderr.decode()}") return {"error": f"Failed to change permissions: {result.stderr.decode()}"} except Exception as e: logger.error(f"Error changing permissions: {e}") return {"error": str(e)} def copy_file(self, src_rel: str, dst_rel: str, overwrite: bool = False) -> Dict[str, Any]: """ Copy file or directory """ src = self._resolve_path(src_rel) dst = self._resolve_path(dst_rel) if not src or not dst: return {"error": "Invalid path"} if not src.exists(): return {"error": "Source path not found"} if dst.exists() and not overwrite: return {"error": "Destination already exists"} try: import shutil if src.is_file(): shutil.copy2(src, dst) else: if dst.exists(): shutil.rmtree(dst) shutil.copytree(src, dst) logger.info(f"Copied {src} to {dst}") return { "status": "success", "src": src_rel, "dst": dst_rel } except Exception as e: logger.error(f"Error copying: {e}") return {"error": str(e)} def move_file(self, src_rel: str, dst_rel: str, overwrite: bool = False) -> Dict[str, Any]: """ Move (rename) file or directory """ src = self._resolve_path(src_rel) dst = self._resolve_path(dst_rel) if not src or not dst: return {"error": "Invalid path"} if not src.exists(): return {"error": "Source path not found"} if dst.exists() and not overwrite: return {"error": "Destination already exists"} try: import shutil if dst.exists() and overwrite: if dst.is_dir(): shutil.rmtree(dst) else: dst.unlink() shutil.move(str(src), str(dst)) logger.info(f"Moved {src} to {dst}") return { "status": "success", "src": src_rel, "dst": dst_rel } except Exception as e: logger.error(f"Error moving: {e}") return {"error": str(e)} def search_files(self, query: str, search_path: str = "", max_results: int = 50) -> List[Dict[str, Any]]: """ Search for files by name (case-insensitive) """ target = self._resolve_path(search_path) if search_path else self.base_path if not target or not target.exists(): return [] results = [] query_lower = query.lower() try: for root, dirs, files in target.walk(): # Search in directories for d in sorted(dirs): if query_lower in d.lower(): dir_path = Path(root) / d results.append(FileInfo(dir_path, self.base_path).to_dict()) if len(results) >= max_results: return results # Search in files for f in sorted(files): if query_lower in f.lower(): file_path = Path(root) / f results.append(FileInfo(file_path, self.base_path).to_dict()) if len(results) >= max_results: return results return results except Exception as e: logger.error(f"Error searching files: {e}") return [] # Global instance file_manager = FileManager()