"""HTTP REST API (FastAPI).""" from __future__ import annotations from typing import Any import structlog from fastapi import FastAPI, HTTPException from pydantic import BaseModel from .config import Config from .dispatch import CommandDispatcher from .layouts import PREDEFINED_LAYOUTS from .state import ControllerState log = structlog.get_logger() class LayoutSetReq(BaseModel): layout: str def create_app( cfg: Config, state: ControllerState, dispatcher: CommandDispatcher ) -> FastAPI: app = FastAPI( title="cuda-grid-controller", version="0.1.0", description="Control plane для vf_cuda_grid FFmpeg filter", ) @app.get("/health") async def health() -> dict[str, Any]: return {"status": "ok"} @app.get("/layouts") async def layouts() -> dict[str, Any]: return {"predefined": PREDEFINED_LAYOUTS} @app.get("/state") async def get_state() -> dict[str, Any]: out = {} for inst in cfg.instances: out[inst.name] = { "active_layout": await state.get_layout(inst.name), "zmq_endpoint": inst.zmq_endpoint, } return {"instances": out} @app.post("/layout/{instance}/set") async def set_layout(instance: str, req: LayoutSetReq) -> dict[str, Any]: inst = next((i for i in cfg.instances if i.name == instance), None) if inst is None: raise HTTPException(404, f"unknown instance '{instance}'") if req.layout not in PREDEFINED_LAYOUTS: raise HTTPException( 400, f"unknown layout '{req.layout}'. Доступны: {PREDEFINED_LAYOUTS}" ) await dispatcher.handle(instance, "layout.set", req.layout) return {"ok": True, "instance": instance, "layout": req.layout} return app