""" ZMB Webui API FastAPI backend for ZFS pool management """ import asyncio import json import logging import os import sys from pathlib import Path from typing import Set from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect, status from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse, FileResponse from fastapi.staticfiles import StaticFiles from fastapi.templating import Jinja2Templates # Add backend to path for imports sys.path.insert(0, str(Path(__file__).parent)) from routers import auth, pools, datasets, snapshots, navigator, identities, shares, system, pages from services.zfs_runner import zfs_runner # Configure logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # Create FastAPI app app = FastAPI( title="ZMB Webui API", description="API for managing ZFS pools, datasets, and snapshots", version="1.0.0" ) # CORS middleware — configurable via environment variable. # Set ZMB_CORS_ORIGINS to a comma-separated list of allowed origins # (e.g. "https://:8090,http://:3000"). # Defaults to "*" for development if unset. 
# Resolve the allowed origins once, at import time, from ZMB_CORS_ORIGINS.
# A bare "*" means "any origin"; otherwise the value is split on commas and
# blank entries are dropped.
_cors_origins_env = os.getenv("ZMB_CORS_ORIGINS", "*").strip()
cors_origins = (
    ["*"]
    if _cors_origins_env == "*"
    else [origin.strip() for origin in _cors_origins_env.split(",") if origin.strip()]
)
# allow_credentials cannot be combined with the "*" wildcard per the CORS spec
allow_credentials = cors_origins != ["*"]
logger.info("CORS allowed origins: %s", cors_origins)

app.add_middleware(
    CORSMiddleware,
    allow_origins=cors_origins,
    allow_credentials=allow_credentials,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Connected WebSocket clients
ws_clients: Set[WebSocket] = set()

# Strong reference to the background broadcaster task. The event loop only
# keeps weak references to tasks, so a task whose result of create_task()
# is discarded may be garbage-collected before it finishes.
_broadcaster_task = None


async def ws_broadcast(message: dict):
    """Broadcast a JSON message to all connected WebSocket clients.

    Args:
        message: JSON-serialisable payload; it is encoded once and the same
            text is sent to every client.

    Clients whose send raises are removed from ``ws_clients``; the failure is
    not propagated to the caller.
    """
    if not ws_clients:
        return
    dead = set()
    data = json.dumps(message)
    # Iterate over a snapshot: each ``await`` below yields to the event loop,
    # where websocket_endpoint may add or discard clients. Looping over the
    # live set would then raise "Set changed size during iteration".
    for ws in tuple(ws_clients):
        try:
            await ws.send_text(data)
        except Exception:
            dead.add(ws)
    ws_clients.difference_update(dead)


async def pool_status_broadcaster():
    """Background task: broadcast pool status every 30s.

    Runs forever. Any error raised while listing pools or broadcasting is
    logged as a warning and the loop continues with the next cycle.
    """
    while True:
        await asyncio.sleep(30)
        try:
            # NOTE(review): list_pools() is called synchronously here; if it
            # shells out, it blocks the event loop for its duration — confirm.
            pools_data = zfs_runner.list_pools()
            if pools_data:
                await ws_broadcast({"type": "pool_status", "data": pools_data})
        except Exception as e:
            logger.warning("WS broadcaster error: %s", e)


@app.on_event("startup")
async def startup_event():
    """Start the periodic pool-status broadcaster when the app starts."""
    global _broadcaster_task
    # Keep the returned Task referenced so it cannot be collected mid-run.
    _broadcaster_task = asyncio.create_task(pool_status_broadcaster())


@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Accept a WebSocket client and keep it registered until it disconnects.

    The current pool status is pushed once right after the connection is
    accepted (best effort), then the handler only reads incoming frames to
    keep the connection open. The client is always removed from
    ``ws_clients`` on exit.
    """
    await websocket.accept()
    ws_clients.add(websocket)
    try:
        # Send initial pool status immediately on connect
        try:
            pools_data = zfs_runner.list_pools()
            await websocket.send_text(json.dumps({"type": "pool_status", "data": pools_data}))
        except Exception:
            # Best effort: the periodic broadcast will deliver status later,
            # but leave a trace instead of swallowing the error silently.
            logger.debug("Initial pool status push failed", exc_info=True)
        # Keep alive
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        pass
    finally:
        ws_clients.discard(websocket)


# Static assets (HTMX, PicoCSS — vendored)
_static_dir = os.path.join(os.path.dirname(__file__), "static")
app.mount("/static", StaticFiles(directory=_static_dir), name="static")

# Include API routers first
app.include_router(auth.router)
app.include_router(pools.router)
app.include_router(datasets.router)
app.include_router(snapshots.router)
app.include_router(navigator.router)
app.include_router(identities.router)
app.include_router(shares.router)
app.include_router(system.router)

# HTML pages router last (catches /, /login, /zfs, /fragments/*)
app.include_router(pages.router)


# Health check endpoint (no auth required)
@app.get("/health")
async def health_check():
    """Health check endpoint.

    Returns:
        A dict with a fixed ``"healthy"`` status and the application version.
    """
    # Read the version from the app object so it cannot drift from the
    # value passed to FastAPI(...).
    return {"status": "healthy", "version": app.version}


def _zpool_list_succeeds() -> bool:
    """Return True when ``zpool list`` runs and exits with status 0.

    Blocking helper (waits up to 5 seconds); meant to be run off the event
    loop. A missing binary, a permission problem or a timeout all count as
    "ZFS not available".
    """
    import subprocess

    try:
        result = subprocess.run(["zpool", "list"], capture_output=True, timeout=5)
    except (OSError, subprocess.SubprocessError):
        return False
    return result.returncode == 0


# Status endpoint - check if ZFS is available (no auth required)
@app.get("/api/status")
async def status_check():
    """Check system status (ZFS availability).

    Returns:
        A dict with ``status``, ``zfs_available`` (whether ``zpool list``
        succeeded) and ``version``.
    """
    # The probe can block for up to 5 seconds, so run it in the default
    # executor instead of stalling the event loop.
    loop = asyncio.get_running_loop()
    zfs_available = await loop.run_in_executor(None, _zpool_list_succeeds)
    return {
        "status": "healthy",
        "zfs_available": zfs_available,
        "version": app.version,
    }


# Error handler
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Global exception handler.

    Logs the unhandled exception with its traceback and answers with a
    generic HTTP 500 so internal details are not sent to the client.

    Args:
        request: The request being processed when the exception was raised.
        exc: The unhandled exception.
    """
    # Pass the exception object explicitly: exc_info=True relies on an
    # exception being "currently handled" at the point of the log call.
    logger.error("Unhandled exception: %s", exc, exc_info=exc)
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={"detail": "Internal server error"},
    )


if __name__ == "__main__":
    import uvicorn

    # One log line per advertised endpoint, emitted in this order.
    endpoint_summary = (
        " POST /api/auth/login - Login with username/password",
        " GET /api/pools - List all pools",
        " GET /api/pools/{name} - Get pool details",
        " POST /api/pools/{name}/scrub - Start scrub",
        " GET /api/datasets - List datasets",
        " POST /api/datasets - Create dataset",
        " DELETE /api/datasets/{name} - Delete dataset",
        " GET /api/snapshots - List snapshots",
        " POST /api/snapshots - Create snapshot",
        " DELETE /api/snapshots/{name} - Delete snapshot",
        " POST /api/snapshots/rollback - Rollback to snapshot",
        " GET /api/navigator/browse - Browse directory",
        " GET /api/navigator/read - Read file",
        " GET /api/navigator/download - Download file",
        " POST /api/navigator/upload - Upload file",
        " POST /api/navigator/create - Create file",
        " POST /api/navigator/mkdir - Create directory",
        " POST /api/navigator/rename - Rename file",
        " DELETE /api/navigator/delete - Delete file/directory",
        " GET /api/navigator/space - Get space usage",
    )

    logger.info("Starting ZMB Webui API")
    logger.info("Available endpoints:")
    for endpoint_line in endpoint_summary:
        logger.info("%s", endpoint_line)
    logger.info("")

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=False,
        workers=1,
    )