Files
vf-cuda-grid/controller/cuda_grid_controller/http_api.py
T
gx d0e34c9d31 controller: persistent ffmpeg snapshot keeper — /snapshot latency 5s → 4ms
SnapshotKeeper class — single long-running ffmpeg subprocess на каждый
instance, читает output_rtsp_url непрерывно и dumps latest frame в файл
(/tmp/snapshot-keeper/<instance>.png) каждые 500 ms через ffmpeg fps=2
-update 1. /snapshot endpoint просто serves файл — disk read ~4 ms (было
~5 sec на cold ffmpeg start + RTSP negotiate + keyframe wait).

Auto-restart с exponential backoff при exit (RTSP source перезапустился,
network glitch). Cold ffmpeg fallback остаётся в endpoint — если keeper
ещё не успел dump первый PNG (первые ~1-2 sec после controller start).

UI: snapshot poll interval 700 → 250 ms (4 fps preview, было ~1.4 fps).
Keeper dump rate сейчас 2 fps — practical limit. При желании поднять
до 4 fps — fps=4 в snapshot_keeper.py (~5% доп CPU на ffmpeg PNG encode).

Применение: основной user-facing path = visual overlay editor http://controller:8083/.
Раньше polling 700 ms показывал тот же frame несколько раз пока ffmpeg
запускался. Сейчас preview почти real-time, drag-and-drop reference точный.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-26 22:16:45 +01:00

298 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""HTTP REST API (FastAPI)."""
from __future__ import annotations
import asyncio
from pathlib import Path
from typing import Any
import structlog
from fastapi import Body, FastAPI, HTTPException
from fastapi.responses import FileResponse, Response
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, TypeAdapter
from .config import Config
from .dispatch import CommandDispatcher
from .layouts import PREDEFINED_LAYOUTS
from .overlays import Overlay
from .snapshot_history import SnapshotHistory
from .state import ControllerState
log = structlog.get_logger()
class LayoutSetReq(BaseModel):
layout: str
class AudioSetReq(BaseModel):
source: str
class AutoLayoutReq(BaseModel):
enabled: bool
class AudioOutputReq(BaseModel):
enabled: bool
def create_app(
cfg: Config, state: ControllerState, dispatcher: CommandDispatcher,
snapshot_history: SnapshotHistory | None = None,
snapshot_keeper=None,
frigate_bridge=None,
) -> FastAPI:
app = FastAPI(
title="cuda-grid-controller",
version="0.1.0",
description="Control plane для vf_cuda_grid FFmpeg filter",
)
# Static UI — http://controller:8080/ui/
static_dir = Path(__file__).parent / "static"
if static_dir.exists():
app.mount("/ui", StaticFiles(directory=str(static_dir), html=True), name="ui")
@app.get("/")
async def root_redirect():
return FileResponse(static_dir / "index.html")
def _check_instance(name: str):
inst = next((i for i in cfg.instances if i.name == name), None)
if inst is None:
raise HTTPException(404, f"unknown instance '{name}'")
return inst
@app.get("/health")
async def health() -> dict[str, Any]:
return {"status": "ok"}
@app.get("/layouts")
async def layouts() -> dict[str, Any]:
return {"predefined": PREDEFINED_LAYOUTS}
@app.get("/state")
async def get_state() -> dict[str, Any]:
out = {}
for inst in cfg.instances:
overlays = await state.get_overlays(inst.name)
out[inst.name] = {
"active_layout": await state.get_layout(inst.name),
"zmq_endpoint": inst.zmq_endpoint,
"overlays_count": len(overlays),
}
return {"instances": out}
@app.get("/audio-output/{instance}")
async def audio_output_get(instance: str) -> dict[str, Any]:
_check_instance(instance)
return {
"instance": instance,
"enabled": await state.get_audio_output_enabled(instance),
}
@app.post("/audio-output/{instance}")
async def audio_output_set(instance: str, req: AudioOutputReq) -> dict[str, Any]:
_check_instance(instance)
ok = await dispatcher.set_audio_output_enabled(instance, req.enabled)
if not ok:
raise HTTPException(500, "audio output ZMQ command failed (см. logs)")
return {"ok": True, "instance": instance, "enabled": req.enabled}
@app.post("/layout/{instance}/set")
async def set_layout(instance: str, req: LayoutSetReq) -> dict[str, Any]:
_check_instance(instance)
# Phase 7: validation против built-in PREDEFINED_LAYOUTS (filter ground truth).
if req.layout not in PREDEFINED_LAYOUTS:
raise HTTPException(
400, f"unknown layout '{req.layout}'. Доступны: {PREDEFINED_LAYOUTS}"
)
await dispatcher.handle(instance, "layout.set", req.layout)
return {"ok": True, "instance": instance, "layout": req.layout}
@app.get("/layouts/{instance}")
async def layouts_for_instance(instance: str) -> dict[str, Any]:
inst = _check_instance(instance)
# UI shows только то что объявлено в config layout_map (subset). Если пуст —
# все доступные filter layouts.
ui_layouts = list(inst.layout_map.keys()) if inst.layout_map else PREDEFINED_LAYOUTS
return {
"instance": instance,
"layouts": ui_layouts,
"current": await state.get_layout(instance),
}
@app.get("/auto-layout/{instance}")
async def auto_layout_get(instance: str) -> dict[str, Any]:
_check_instance(instance)
enabled = bool(frigate_bridge and frigate_bridge.cfg.auto_layout)
return {"instance": instance, "enabled": enabled}
@app.post("/auto-layout/{instance}")
async def auto_layout_set(instance: str, req: AutoLayoutReq) -> dict[str, Any]:
_check_instance(instance)
if frigate_bridge is None:
raise HTTPException(404, "frigate_bridge не configured")
frigate_bridge.cfg.auto_layout = req.enabled
log.info("auto_layout.toggled", instance=instance, enabled=req.enabled)
# При включении — сразу применить (могут уже быть active cameras)
if req.enabled:
await frigate_bridge._update_auto_layout(instance)
return {"ok": True, "instance": instance, "enabled": req.enabled}
# ─── Overlays ──────────────────────────────────────────────────
@app.post("/overlay/{instance}/add")
async def overlay_add(
instance: str, overlay: Overlay = Body(...)
) -> dict[str, Any]:
_check_instance(instance)
await dispatcher.handle(
instance, "overlay.add", overlay.model_dump_json()
)
return {"ok": True, "id": overlay.id, "type": overlay.type}
@app.get("/overlay/{instance}")
async def overlay_list(instance: str) -> dict[str, Any]:
_check_instance(instance)
overlays = await state.get_overlays(instance)
return {
"instance": instance,
"count": len(overlays),
"overlays": [o.model_dump() for o in overlays],
}
@app.delete("/overlay/{instance}/{overlay_id}")
async def overlay_remove(instance: str, overlay_id: str) -> dict[str, Any]:
_check_instance(instance)
await dispatcher.handle(instance, "overlay.remove", overlay_id)
return {"ok": True}
@app.delete("/overlay/{instance}")
async def overlay_clear(instance: str) -> dict[str, Any]:
_check_instance(instance)
await dispatcher.handle(instance, "overlay.clear", "")
return {"ok": True}
# ─── Audio ─────────────────────────────────────────────────────
@app.get("/audio/{instance}")
async def audio_list(instance: str) -> dict[str, Any]:
inst = _check_instance(instance)
return {
"instance": instance,
"sources": [
{"name": s.name, "index": s.index, "label": s.label or s.name}
for s in inst.audio_sources
],
}
@app.post("/audio/{instance}/set")
async def audio_set(instance: str, req: AudioSetReq) -> dict[str, Any]:
inst = _check_instance(instance)
names = {s.name for s in inst.audio_sources}
if req.source not in names:
raise HTTPException(
400, f"unknown audio source '{req.source}'. Доступны: {sorted(names)}"
)
await dispatcher.handle(instance, "audio.set", req.source)
return {"ok": True, "instance": instance, "source": req.source}
@app.post("/intercom/{instance}/start")
async def intercom_start(instance: str) -> dict[str, Any]:
_check_instance(instance)
await dispatcher.handle(instance, "intercom.start", "")
return {"ok": True, "instance": instance, "intercom": "active"}
@app.post("/intercom/{instance}/end")
async def intercom_end(instance: str) -> dict[str, Any]:
_check_instance(instance)
await dispatcher.handle(instance, "intercom.end", "")
return {"ok": True, "instance": instance, "intercom": "idle"}
# ─── Snapshot ──────────────────────────────────────────────────
@app.post(
"/snapshot/{instance}",
responses={
200: {"content": {"image/png": {}}, "description": "PNG snapshot of live grid"},
504: {"description": "ffmpeg timeout"},
},
)
async def snapshot(instance: str) -> Response:
"""Capture single frame от output_rtsp_url. Returns PNG bytes (image/png).
Fast path: persistent ffmpeg-keeper subprocess дампит latest frame в
файл каждые 500 ms — endpoint просто читает (latency ~10 ms). Cold path:
fork ffmpeg на каждый request (~5 sec) — fallback если keeper ещё не
создал файл (первые 1-2 sec после controller start) или у инстанса
нет output_rtsp_url.
"""
inst = _check_instance(instance)
if not inst.output_rtsp_url:
raise HTTPException(400, f"instance '{instance}' has no output_rtsp_url configured")
if snapshot_keeper:
path = snapshot_keeper.path(instance)
if path:
return FileResponse(path, media_type="image/png")
proc = await asyncio.create_subprocess_exec(
"ffmpeg", "-hide_banner", "-loglevel", "error",
"-rtsp_transport", "tcp",
"-i", inst.output_rtsp_url,
"-frames:v", "1",
"-f", "image2pipe", "-c:v", "png", "-",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
try:
png_data, err = await asyncio.wait_for(proc.communicate(), timeout=10)
except asyncio.TimeoutError:
proc.kill()
raise HTTPException(504, "snapshot timeout (ffmpeg >10s)")
if proc.returncode != 0 or not png_data:
err_msg = err.decode(errors="replace")[:300] if err else "no output"
log.warning("snapshot.failed", instance=instance, err=err_msg)
raise HTTPException(500, f"ffmpeg failed: {err_msg}")
log.info("snapshot.ok", instance=instance, bytes=len(png_data))
return Response(content=png_data, media_type="image/png")
# ─── Snapshot history (Phase 6+) ───────────────────────────────
@app.get("/snapshots/{instance}")
async def snapshots_list(instance: str, limit: int = 50) -> dict[str, Any]:
_check_instance(instance)
if snapshot_history is None:
raise HTTPException(404, "snapshot_history disabled")
return {"instance": instance, "items": snapshot_history.list_snapshots(instance, limit)}
@app.get("/snapshots/{instance}/{filename}")
async def snapshots_get(instance: str, filename: str) -> Response:
_check_instance(instance)
if snapshot_history is None:
raise HTTPException(404, "snapshot_history disabled")
p = snapshot_history.path(instance, filename)
if p is None:
raise HTTPException(404, "snapshot not found")
return FileResponse(p, media_type="image/png")
@app.patch("/overlay/{instance}/{overlay_id}")
async def overlay_update(
instance: str, overlay_id: str, overlay: Overlay = Body(...)
) -> dict[str, Any]:
_check_instance(instance)
# Фиксируем id из URL — игнорируем body's id если отличается
overlay.id = overlay_id
await state.update_overlay(instance, overlay)
await dispatcher.handle(
instance, "overlay.add", overlay.model_dump_json()
)
return {"ok": True, "id": overlay_id}
return app