Files
zmb-webui/backend/routers/snapshots.py
T
patrick d079d76151 Feature: Snapshot-Tab in Datasets mit Kontextmenü (Clone/Rename/Rollback/Destroy)
- Snapshots direkt im Datasets-Tab ladbar (lazy, per Pool)
- Tabelle: Name, Created, Used, Referenced, Clones
- Kontextmenü (⋮) mit Clone, Rename, Roll Back, Destroy Snapshot
- Backend: /api/snapshots/clone + /api/snapshots/rename Endpoints

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-04 22:51:08 +02:00

160 lines
4.8 KiB
Python

"""
Snapshot management endpoints
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from typing import List, Optional
from pydantic import BaseModel
from datetime import datetime
from services.zfs_runner import zfs_runner
from services.auth import auth_service
from models import Snapshot
# Router collecting every snapshot endpoint in this module; all routes
# registered on it are served under the /api/snapshots prefix.
router = APIRouter(prefix="/api/snapshots", tags=["snapshots"])
# Bearer-token security scheme, injected into get_current_user via Depends.
security = HTTPBearer()
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
    """
    Resolve the bearer token of the current request to a username.

    The raw token is handed to ``auth_service.verify_token``; a truthy
    result is returned unchanged.

    Raises:
        HTTPException: 401 with detail "Invalid token" when verification
            yields a falsy value.
    """
    verified_user = auth_service.verify_token(credentials.credentials)
    if verified_user:
        return verified_user
    raise HTTPException(
        detail="Invalid token",
        status_code=status.HTTP_401_UNAUTHORIZED,
    )
class CreateSnapshotRequest(BaseModel):
    # Request body for the snapshot-creation endpoint.
    # Documented with comments rather than a docstring so the generated
    # schema description stays exactly as before.

    # Dataset the snapshot is taken of.
    dataset: str
    # Snapshot name; auto-generated when not provided.
    name: Optional[str] = None
class RollbackSnapshotRequest(BaseModel):
    # Request body for the rollback endpoint.

    # Snapshot identifier handed to zfs_runner.rollback_snapshot.
    snapshot: str
class CloneSnapshotRequest(BaseModel):
    # Request body for the clone endpoint.

    # Snapshot to clone from (first argument to `zfs clone`).
    snapshot: str
    # Full name of the target dataset to create.
    target: str
class RenameSnapshotRequest(BaseModel):
    # Request body for the rename endpoint.

    # Current snapshot name; the part before "@" is reused as the dataset.
    snapshot: str
    # New snapshot tag only, i.e. the part that goes after "@".
    new_name: str
@router.get("/", response_model=List[Snapshot])
async def list_snapshots(
    dataset: Optional[str] = None,
    limit: int = 50,
    current_user: str = Depends(get_current_user)
):
    """
    List snapshots (optionally filtered by dataset).

    Args:
        dataset: Forwarded to ``zfs_runner.list_snapshots``; ``None`` means
            no dataset filter is passed.
        limit: Forwarded to ``zfs_runner.list_snapshots`` as ``limit``.
        current_user: Username resolved by ``get_current_user``; present
            only so the route requires a valid token.

    Returns:
        One ``Snapshot`` per entry returned by the runner. ``dataset`` is
        the part of the snapshot name before "@", and ``creation_datetime``
        is the creation timestamp rendered as a UTC ISO-8601 string with a
        trailing "Z".

    Raises:
        HTTPException: 500 carrying the error text if listing or
            conversion fails.
    """
    # Function-scope import so this block does not rely on a change to the
    # module's import list.
    from datetime import timezone

    def _utc_iso(epoch_seconds):
        # The "Z" suffix denotes UTC, so the conversion must be done in UTC.
        # A bare fromtimestamp() would return server-local time and make
        # the suffix wrong on any host not running in UTC.
        moment = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
        return moment.replace(tzinfo=None).isoformat() + "Z"

    try:
        snapshots = zfs_runner.list_snapshots(dataset, limit=limit)
        return [
            Snapshot(
                name=s["name"],
                dataset=s["name"].split("@")[0],  # Extract dataset part
                created=s["creation"],
                used=s["used"],
                referenced=s["referenced"],
                creation_datetime=_utc_iso(s["creation"]),
            )
            for s in snapshots
        ]
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/", response_model=dict)
async def create_snapshot(
    request: CreateSnapshotRequest,
    current_user: str = Depends(get_current_user)
):
    """
    Create new snapshot.

    Args:
        request: Dataset to snapshot and an optional snapshot name.
        current_user: Username resolved by ``get_current_user``; present
            only so the route requires a valid token.

    Returns:
        The result dict produced by ``zfs_runner.create_snapshot``.

    Raises:
        HTTPException: 400 with the runner's message when the result has
            ``status == "error"``; 500 with the error text on any other
            failure.
    """
    try:
        result = zfs_runner.create_snapshot(request.dataset, request.name)
        if result.get("status") == "error":
            raise HTTPException(status_code=400, detail=result.get("message"))
        return result
    except HTTPException:
        # Let the deliberate 400 through; without this clause the generic
        # handler below would re-wrap it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.delete("/{snapshot_name:path}")
async def delete_snapshot(
    snapshot_name: str,
    current_user: str = Depends(get_current_user)
):
    """
    Delete snapshot.

    Args:
        snapshot_name: Snapshot name taken from the URL; the ``:path``
            converter lets it contain "/" characters.
        current_user: Username resolved by ``get_current_user``; present
            only so the route requires a valid token.

    Returns:
        The result dict produced by ``zfs_runner.destroy_snapshot``.

    Raises:
        HTTPException: 400 with the runner's message when the result has
            ``status == "error"``; 500 with the error text on any other
            failure.
    """
    try:
        result = zfs_runner.destroy_snapshot(snapshot_name)
        if result.get("status") == "error":
            raise HTTPException(status_code=400, detail=result.get("message"))
        return result
    except HTTPException:
        # Let the deliberate 400 through; without this clause the generic
        # handler below would re-wrap it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/clone")
async def clone_snapshot(
    request: CloneSnapshotRequest,
    current_user: str = Depends(get_current_user)
):
    """
    Clone a snapshot into a new dataset via ``zfs clone``.

    A non-zero value at index 2 of the command result is reported as a
    400 whose detail is the element at index 1. On success the runner
    cache is cleared and a success payload is returned. Any other failure
    becomes a 500 carrying the error text.
    """
    clone_cmd = ["zfs", "clone", request.snapshot, request.target]
    try:
        outcome = zfs_runner.run_command(clone_cmd)
        if outcome[2] == 0:
            zfs_runner.clear_cache()
            return {"status": "success", "message": f"Cloned to {request.target}"}
        raise HTTPException(status_code=400, detail=outcome[1])
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
@router.post("/rename")
async def rename_snapshot(
    request: RenameSnapshotRequest,
    current_user: str = Depends(get_current_user)
):
    """
    Rename snapshot.

    The new full name is built from the dataset part of
    ``request.snapshot`` (everything before the first "@") plus
    ``request.new_name``, and passed to ``zfs rename``.

    Args:
        request: Current snapshot name and the new tag.
        current_user: Username resolved by ``get_current_user``; present
            only so the route requires a valid token.

    Returns:
        A success payload naming the new full snapshot name.

    Raises:
        HTTPException: 400 when the input is malformed or the command
            result carries a non-zero value at index 2 (detail is then the
            element at index 1); 500 with the error text on any other
            failure.
    """
    try:
        dataset, separator, old_tag = request.snapshot.partition("@")
        # Reject malformed input up front instead of building a nonsensical
        # target name and handing it to the zfs binary.
        if not (dataset and separator and old_tag):
            raise HTTPException(
                status_code=400,
                detail="snapshot must have the form 'dataset@name'",
            )
        if not request.new_name or "@" in request.new_name or "/" in request.new_name:
            raise HTTPException(
                status_code=400,
                detail="new_name must be a non-empty snapshot tag without '@' or '/'",
            )
        new_full = f"{dataset}@{request.new_name}"
        result = zfs_runner.run_command(["zfs", "rename", request.snapshot, new_full])
        if result[2] != 0:
            raise HTTPException(status_code=400, detail=result[1])
        zfs_runner.clear_cache()
        return {"status": "success", "message": f"Renamed to {new_full}"}
    except HTTPException:
        # Deliberate HTTP errors pass through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/rollback")
async def rollback_snapshot(
    request: RollbackSnapshotRequest,
    current_user: str = Depends(get_current_user)
):
    """
    Rollback dataset to snapshot (WARNING: Destroys data after snapshot!)

    Args:
        request: Snapshot to roll back to.
        current_user: Username resolved by ``get_current_user``; present
            only so the route requires a valid token.

    Returns:
        The result dict produced by ``zfs_runner.rollback_snapshot``.

    Raises:
        HTTPException: 400 with the runner's message when the result has
            ``status == "error"``; 500 with the error text on any other
            failure.
    """
    try:
        result = zfs_runner.rollback_snapshot(request.snapshot)
        if result.get("status") == "error":
            raise HTTPException(status_code=400, detail=result.get("message"))
        return result
    except HTTPException:
        # Let the deliberate 400 through; without this clause the generic
        # handler below would re-wrap it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))