"""HTTP REST API (FastAPI).""" from __future__ import annotations import asyncio from pathlib import Path from typing import Any import structlog from fastapi import Body, FastAPI, HTTPException from fastapi.responses import FileResponse, Response from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, TypeAdapter from .config import Config from .dispatch import CommandDispatcher from .layouts import PREDEFINED_LAYOUTS from .overlays import Overlay from .snapshot_history import SnapshotHistory from .state import ControllerState log = structlog.get_logger() class LayoutSetReq(BaseModel): layout: str class AudioSetReq(BaseModel): source: str def create_app( cfg: Config, state: ControllerState, dispatcher: CommandDispatcher, snapshot_history: SnapshotHistory | None = None, ) -> FastAPI: app = FastAPI( title="cuda-grid-controller", version="0.1.0", description="Control plane для vf_cuda_grid FFmpeg filter", ) # Static UI — http://controller:8080/ui/ static_dir = Path(__file__).parent / "static" if static_dir.exists(): app.mount("/ui", StaticFiles(directory=str(static_dir), html=True), name="ui") @app.get("/") async def root_redirect(): return FileResponse(static_dir / "index.html") def _check_instance(name: str): inst = next((i for i in cfg.instances if i.name == name), None) if inst is None: raise HTTPException(404, f"unknown instance '{name}'") return inst @app.get("/health") async def health() -> dict[str, Any]: return {"status": "ok"} @app.get("/layouts") async def layouts() -> dict[str, Any]: return {"predefined": PREDEFINED_LAYOUTS} @app.get("/state") async def get_state() -> dict[str, Any]: out = {} for inst in cfg.instances: overlays = await state.get_overlays(inst.name) out[inst.name] = { "active_layout": await state.get_layout(inst.name), "zmq_endpoint": inst.zmq_endpoint, "overlays_count": len(overlays), } return {"instances": out} @app.post("/layout/{instance}/set") async def set_layout(instance: str, req: LayoutSetReq) -> dict[str, Any]: inst = _check_instance(instance) if req.layout not in inst.layout_map: raise HTTPException( 400, f"unknown layout '{req.layout}'. Доступны: {list(inst.layout_map.keys())}" ) await dispatcher.handle(instance, "layout.set", req.layout) return {"ok": True, "instance": instance, "layout": req.layout} # ─── Overlays ────────────────────────────────────────────────── @app.post("/overlay/{instance}/add") async def overlay_add( instance: str, overlay: Overlay = Body(...) ) -> dict[str, Any]: _check_instance(instance) await dispatcher.handle( instance, "overlay.add", overlay.model_dump_json() ) return {"ok": True, "id": overlay.id, "type": overlay.type} @app.get("/overlay/{instance}") async def overlay_list(instance: str) -> dict[str, Any]: _check_instance(instance) overlays = await state.get_overlays(instance) return { "instance": instance, "count": len(overlays), "overlays": [o.model_dump() for o in overlays], } @app.delete("/overlay/{instance}/{overlay_id}") async def overlay_remove(instance: str, overlay_id: str) -> dict[str, Any]: _check_instance(instance) await dispatcher.handle(instance, "overlay.remove", overlay_id) return {"ok": True} @app.delete("/overlay/{instance}") async def overlay_clear(instance: str) -> dict[str, Any]: _check_instance(instance) await dispatcher.handle(instance, "overlay.clear", "") return {"ok": True} # ─── Audio ───────────────────────────────────────────────────── @app.get("/audio/{instance}") async def audio_list(instance: str) -> dict[str, Any]: inst = _check_instance(instance) return { "instance": instance, "sources": [ {"name": s.name, "index": s.index, "label": s.label or s.name} for s in inst.audio_sources ], } @app.post("/audio/{instance}/set") async def audio_set(instance: str, req: AudioSetReq) -> dict[str, Any]: inst = _check_instance(instance) names = {s.name for s in inst.audio_sources} if req.source not in names: raise HTTPException( 400, f"unknown audio source '{req.source}'. Доступны: {sorted(names)}" ) await dispatcher.handle(instance, "audio.set", req.source) return {"ok": True, "instance": instance, "source": req.source} @app.post("/intercom/{instance}/start") async def intercom_start(instance: str) -> dict[str, Any]: _check_instance(instance) await dispatcher.handle(instance, "intercom.start", "") return {"ok": True, "instance": instance, "intercom": "active"} @app.post("/intercom/{instance}/end") async def intercom_end(instance: str) -> dict[str, Any]: _check_instance(instance) await dispatcher.handle(instance, "intercom.end", "") return {"ok": True, "instance": instance, "intercom": "idle"} # ─── Snapshot ────────────────────────────────────────────────── @app.post( "/snapshot/{instance}", responses={ 200: {"content": {"image/png": {}}, "description": "PNG snapshot of live grid"}, 504: {"description": "ffmpeg timeout"}, }, ) async def snapshot(instance: str) -> Response: """Capture single frame от output_rtsp_url. Returns PNG bytes (image/png).""" inst = _check_instance(instance) if not inst.output_rtsp_url: raise HTTPException(400, f"instance '{instance}' has no output_rtsp_url configured") proc = await asyncio.create_subprocess_exec( "ffmpeg", "-hide_banner", "-loglevel", "error", "-rtsp_transport", "tcp", "-i", inst.output_rtsp_url, "-frames:v", "1", "-f", "image2pipe", "-c:v", "png", "-", stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: png_data, err = await asyncio.wait_for(proc.communicate(), timeout=10) except asyncio.TimeoutError: proc.kill() raise HTTPException(504, "snapshot timeout (ffmpeg >10s)") if proc.returncode != 0 or not png_data: err_msg = err.decode(errors="replace")[:300] if err else "no output" log.warning("snapshot.failed", instance=instance, err=err_msg) raise HTTPException(500, f"ffmpeg failed: {err_msg}") log.info("snapshot.ok", instance=instance, bytes=len(png_data)) return Response(content=png_data, media_type="image/png") # ─── Snapshot history (Phase 6+) ─────────────────────────────── @app.get("/snapshots/{instance}") async def snapshots_list(instance: str, limit: int = 50) -> dict[str, Any]: _check_instance(instance) if snapshot_history is None: raise HTTPException(404, "snapshot_history disabled") return {"instance": instance, "items": snapshot_history.list_snapshots(instance, limit)} @app.get("/snapshots/{instance}/{filename}") async def snapshots_get(instance: str, filename: str) -> Response: _check_instance(instance) if snapshot_history is None: raise HTTPException(404, "snapshot_history disabled") p = snapshot_history.path(instance, filename) if p is None: raise HTTPException(404, "snapshot not found") return FileResponse(p, media_type="image/png") @app.patch("/overlay/{instance}/{overlay_id}") async def overlay_update( instance: str, overlay_id: str, overlay: Overlay = Body(...) ) -> dict[str, Any]: _check_instance(instance) # Фиксируем id из URL — игнорируем body's id если отличается overlay.id = overlay_id await state.update_overlay(instance, overlay) await dispatcher.handle( instance, "overlay.add", overlay.model_dump_json() ) return {"ok": True, "id": overlay_id} return app