Files
patrick f49793e6f2 Refactor: Java-Klassen aus Services entfernt + kritische Bugs gefixt
- AuthService, SystemInfo, IdentitiesManager Klassen → Modul-Funktionen
- grp.getall() → grp.getgrall() (Bug: Methode existierte nie)
- open('/proc/loadavg') ohne context manager gefixt (File-Handle-Leak)
- rx_packets/tx_packets null-check im Frontend (toLocaleString auf undefined)
- PoolCard onClick: /pools/{name} → /zfs (Route existierte nicht, löste Seitenreload aus)
- Alle Router-Imports auf Modul-Aliase umgestellt

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-05 14:11:32 +02:00

409 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
File Manager endpoints Browse, upload, download /tank/share
Similar to cockpit-files but minimalist
"""
from fastapi import APIRouter, Depends, HTTPException, status, File, UploadFile, Query
from fastapi.responses import FileResponse, StreamingResponse
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
from typing import Optional
import io
import os
import jwt
from datetime import datetime, timedelta
from services.file_manager import file_manager
from services import auth as auth_service
router = APIRouter(prefix="/api/navigator", tags=["navigator"])
security = HTTPBearer()
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Verify JWT token and return username"""
username = auth_service.verify_token(credentials.credentials)
if not username:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token"
)
return username
class CreateFileRequest(BaseModel):
path: str
content: Optional[str] = ""
class RenameRequest(BaseModel):
old_path: str
new_name: str
class MkdirRequest(BaseModel):
path: str
class ChangePermissionsRequest(BaseModel):
path: str
mode: str # e.g. "755", "644"
recursive: bool = False
class ChangeOwnerRequest(BaseModel):
path: str
owner: str
group: Optional[str] = None
@router.get("/browse")
async def browse_directory(
path: str = Query(""),
admin: bool = Query(False),
current_user: str = Depends(get_current_user)
):
"""
List directory contents
Query: ?path=/subdir&admin=false
"""
from pathlib import Path as PyPath
from services.file_manager import FileManager
if admin:
fm = FileManager(base_path=PyPath("/"))
else:
fm = file_manager
result = fm.list_directory(path)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
@router.get("/dirs")
async def list_subdirectories(
path: str = Query(""),
admin: bool = Query(False),
current_user: str = Depends(get_current_user)
):
"""
List only subdirectories of a path (for tree sidebar navigation).
Query: ?path=/subdir&admin=false
Returns: { dirs: [{name, path, has_children}] }
"""
from pathlib import Path as PyPath
from services.file_manager import FileManager
if admin:
fm = FileManager(base_path=PyPath("/"))
else:
fm = file_manager
result = fm.list_subdirectories(path)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
@router.get("/info")
async def get_info(
path: str = Query(""),
current_user: str = Depends(get_current_user)
):
"""
Get file/directory info
Query: ?path=/filename.txt
"""
result = file_manager.get_file_info(path)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
@router.get("/read")
async def read_file(
path: str = Query(""),
limit: Optional[int] = Query(None),
current_user: str = Depends(get_current_user)
):
"""
Read file content (text files only, max 10MB)
Query: ?path=/file.txt&limit=1000
"""
result = file_manager.read_file(path, limit)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
@router.get("/download")
async def download_file(
path: str = Query(""),
current_user: str = Depends(get_current_user)
):
"""
Download file
Returns binary file stream
"""
from pathlib import Path
target = file_manager._resolve_path(path)
if not target or not target.exists() or not target.is_file():
raise HTTPException(status_code=404, detail="File not found")
try:
return FileResponse(
path=target,
filename=target.name,
media_type="application/octet-stream"
)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/upload")
async def upload_file(
file: UploadFile = File(...),
path: str = Query(""),
current_user: str = Depends(get_current_user)
):
"""
Upload file to directory
Query: ?path=/subdir
"""
from pathlib import Path
target = file_manager._resolve_path(path)
if not target or not target.is_dir():
raise HTTPException(status_code=400, detail="Invalid upload directory")
try:
# Create target file path
file_path = target / file.filename
if file_path.exists():
raise HTTPException(status_code=400, detail="File already exists")
# Write uploaded file
contents = await file.read()
if len(contents) > file_manager.MAX_FILE_SIZE:
raise HTTPException(status_code=413, detail="File too large")
with open(file_path, "wb") as f:
f.write(contents)
return {
"status": "success",
"filename": file.filename,
"size": len(contents),
"path": str(file_path.relative_to(file_manager.base_path))
}
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
@router.post("/create")
async def create_file(
request: CreateFileRequest,
current_user: str = Depends(get_current_user)
):
"""
Create new file with optional content
"""
result = file_manager.create_file(request.path, request.content)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
@router.post("/mkdir")
async def make_directory(
request: MkdirRequest,
current_user: str = Depends(get_current_user)
):
"""
Create directory
"""
result = file_manager.mkdir(request.path)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
@router.post("/rename")
async def rename_file(
request: RenameRequest,
current_user: str = Depends(get_current_user)
):
"""
Rename file or directory
"""
result = file_manager.rename_file(request.old_path, request.new_name)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
@router.delete("/delete")
async def delete_file(
path: str = Query(""),
recursive: bool = Query(False),
current_user: str = Depends(get_current_user)
):
"""
Delete file or directory
Query: ?path=/file.txt or ?path=/dir&recursive=true
"""
result = file_manager.delete_file_or_dir(path, recursive)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
@router.get("/space")
async def get_space_info(current_user: str = Depends(get_current_user)):
"""
Get space usage of /tank/share
"""
result = file_manager.get_space_info()
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
@router.post("/permissions")
async def change_permissions(
request: ChangePermissionsRequest,
current_user: str = Depends(get_current_user)
):
"""
Change file/directory permissions (chmod)
Request: {"path": "/file.txt", "mode": "755", "recursive": false}
"""
if request.recursive:
result = file_manager.change_permissions_recursive(request.path, request.mode)
else:
result = file_manager.change_permissions(request.path, request.mode)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
@router.post("/owner")
async def change_owner(
request: ChangeOwnerRequest,
current_user: str = Depends(get_current_user)
):
"""
Change file/directory owner (chown)
Request: {"path": "/file.txt", "owner": "root", "group": "wheel"}
"""
result = file_manager.change_owner(request.path, request.owner, request.group)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
class UnlockRequest(BaseModel):
password: str
@router.post("/unlock")
async def unlock_admin_mode(
request: UnlockRequest,
current_user: str = Depends(get_current_user)
):
"""
Unlock admin mode with password
Returns a token that enables full filesystem access
"""
admin_password = os.environ.get("ZFS_ADMIN_PASSWORD", "admin")
if request.password != admin_password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid admin password"
)
# Generate a short-lived admin token (valid for 1 hour)
secret_key = os.environ.get("ZFS_SECRET_KEY", "change-me-in-production")
payload = {
"sub": current_user,
"admin": True,
"exp": datetime.utcnow() + timedelta(hours=1)
}
token = jwt.encode(payload, secret_key, algorithm="HS256")
return {
"status": "success",
"admin_token": token,
"message": "Admin mode unlocked"
}
class CopyRequest(BaseModel):
src: str
dst: str
overwrite: bool = False
@router.post("/copy")
async def copy_file(
request: CopyRequest,
current_user: str = Depends(get_current_user)
):
"""
Copy file or directory
"""
result = file_manager.copy_file(request.src, request.dst, request.overwrite)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
class MoveRequest(BaseModel):
src: str
dst: str
overwrite: bool = False
@router.post("/move")
async def move_file(
request: MoveRequest,
current_user: str = Depends(get_current_user)
):
"""
Move (rename) file or directory
"""
result = file_manager.move_file(request.src, request.dst, request.overwrite)
if "error" in result:
raise HTTPException(status_code=400, detail=result["error"])
return result
@router.get("/search")
async def search_files(
q: str = Query(""),
path: str = Query(""),
limit: int = Query(50),
current_user: str = Depends(get_current_user)
):
"""
Search for files by name (case-insensitive)
Query: ?q=term&path=/subdir&limit=50
"""
if not q:
raise HTTPException(status_code=400, detail="Query parameter 'q' is required")
results = file_manager.search_files(q, path, limit)
return {
"query": q,
"path": path or "/",
"results": results,
"count": len(results)
}