diff --git a/controller/cuda_grid_controller/__main__.py b/controller/cuda_grid_controller/__main__.py index a80867e..5e1ca72 100644 --- a/controller/cuda_grid_controller/__main__.py +++ b/controller/cuda_grid_controller/__main__.py @@ -21,6 +21,7 @@ from .frigate_bridge import FrigateBridge, FrigateBridgeCfg from .http_api import create_app from .mqtt_loop import MqttLoop from .snapshot_history import SnapshotHistory +from .snapshot_keeper import SnapshotKeeper from .state import ControllerState from .watchdog import StreamWatchdog, WatchdogCfg @@ -111,6 +112,10 @@ async def _run(cfg: Config) -> None: # Snapshot history (Phase 6+) — periodic capture per instance snapshot_hist = SnapshotHistory(cfg) + # Snapshot keeper — persistent ffmpeg per instance dumping latest frame + # к PNG (для low-latency /snapshot UI polling). + snapshot_keeper = SnapshotKeeper(cfg, Path("/tmp/snapshot-keeper")) + # Stream watchdog (Phase 1 resilience, issue #3) — monitor mediamtx paths watchdog: StreamWatchdog | None = None if cfg.watchdog: @@ -125,6 +130,7 @@ async def _run(cfg: Config) -> None: # HTTP REST app = create_app(cfg, state, dispatcher, snapshot_history=snapshot_hist, + snapshot_keeper=snapshot_keeper, frigate_bridge=frigate_bridge) server = uvicorn.Server( uvicorn.Config( @@ -149,6 +155,7 @@ async def _run(cfg: Config) -> None: if browser_renderer: await browser_renderer.start() await snapshot_hist.start() + await snapshot_keeper.start() if watchdog: watchdog._publish_event = mqtt.publish_event await watchdog.start() @@ -178,6 +185,7 @@ async def _run(cfg: Config) -> None: if browser_renderer: await browser_renderer.stop() await snapshot_hist.stop() + await snapshot_keeper.stop() if watchdog: await watchdog.stop() await dispatcher.close() diff --git a/controller/cuda_grid_controller/http_api.py b/controller/cuda_grid_controller/http_api.py index 5b04c84..0f3413a 100644 --- a/controller/cuda_grid_controller/http_api.py +++ b/controller/cuda_grid_controller/http_api.py @@ 
-41,6 +41,7 @@ class AudioOutputReq(BaseModel): def create_app( cfg: Config, state: ControllerState, dispatcher: CommandDispatcher, snapshot_history: SnapshotHistory | None = None, + snapshot_keeper=None, frigate_bridge=None, ) -> FastAPI: app = FastAPI( @@ -221,11 +222,23 @@ def create_app( }, ) async def snapshot(instance: str) -> Response: - """Capture single frame от output_rtsp_url. Returns PNG bytes (image/png).""" + """Capture single frame от output_rtsp_url. Returns PNG bytes (image/png). + + Fast path: persistent ffmpeg-keeper subprocess дампит latest frame в + файл каждые 500 ms — endpoint просто читает (latency ~10 ms). Cold path: + fork ffmpeg на каждый request (~5 sec) — fallback если keeper ещё не + создал файл (первые 1-2 sec после controller start) или у инстанса + нет output_rtsp_url. + """ inst = _check_instance(instance) if not inst.output_rtsp_url: raise HTTPException(400, f"instance '{instance}' has no output_rtsp_url configured") + if snapshot_keeper: + path = snapshot_keeper.path(instance) + if path: + return FileResponse(path, media_type="image/png") + proc = await asyncio.create_subprocess_exec( "ffmpeg", "-hide_banner", "-loglevel", "error", "-rtsp_transport", "tcp", diff --git a/controller/cuda_grid_controller/snapshot_keeper.py b/controller/cuda_grid_controller/snapshot_keeper.py new file mode 100644 index 0000000..951002d --- /dev/null +++ b/controller/cuda_grid_controller/snapshot_keeper.py @@ -0,0 +1,109 @@ +"""Persistent ffmpeg subprocess для low-latency /snapshot endpoint. + +Cold ffmpeg start + RTSP negotiate + keyframe wait занимает ~5 sec — это +неприемлемо для UI editor где preview обновляется каждые ~700 ms. Заменяем +"per-request ffmpeg" на single long-running ffmpeg который непрерывно читает +RTSP и dump'ит latest frame в file (`fps=2 -update 1`). Endpoint просто +serves этот file — latency ~50 ms (disk read). + +Auto-restart при exit (RTSP source перезапустился, network glitch и т.д.). 
class SnapshotKeeper:
    """Keep one long-running ffmpeg per instance that dumps the latest frame to a PNG.

    A cold ffmpeg start (process spawn, RTSP negotiation, keyframe wait) takes
    seconds, which is too slow for a preview that is polled several times per
    second. Instead, one ffmpeg per instance reads the RTSP output continuously
    and overwrites ``<snapshot_dir>/<instance>.png`` twice a second; the HTTP
    layer only has to serve that file.

    Each ffmpeg is owned by a supervisor task that restarts it with a growing
    delay whenever it exits (source restarted, network glitch, ...).
    """

    # Seconds to wait after SIGTERM before escalating to SIGKILL on stop().
    _STOP_TIMEOUT = 2.0
    # Restart delay: starts at _BACKOFF_INITIAL, grows x1.5 up to _BACKOFF_MAX.
    _BACKOFF_INITIAL = 3.0
    _BACKOFF_MAX = 30.0
    # An ffmpeg that survived this long is considered healthy; the next
    # restart starts again from _BACKOFF_INITIAL.
    _HEALTHY_RUNTIME = 30.0
    # How many trailing bytes of ffmpeg's stderr are kept for diagnostics.
    _STDERR_TAIL = 2048

    def __init__(self, cfg: Config, snapshot_dir: Path) -> None:
        """Remember the config and make sure *snapshot_dir* exists.

        Args:
            cfg: Controller config; ``cfg.instances`` is iterated in ``start()``.
            snapshot_dir: Directory that receives one ``<instance>.png`` each.
        """
        self.cfg = cfg
        self.snapshot_dir = snapshot_dir
        self.snapshot_dir.mkdir(parents=True, exist_ok=True)
        # Live ffmpeg processes and their supervisor tasks, keyed by instance name.
        self._procs: dict[str, asyncio.subprocess.Process] = {}
        self._tasks: dict[str, asyncio.Task] = {}
        self._stop = False

    def path(self, instance: str) -> Path | None:
        """Return the PNG path for *instance* if a frame is available, else ``None``.

        ``None`` lets the caller fall back to a cold, per-request ffmpeg. A
        zero-byte file does not count as a frame, and names that are not a
        plain file name (contain a path separator) are rejected so the lookup
        can never leave ``snapshot_dir``.
        """
        if not instance or Path(instance).name != instance:
            return None
        candidate = self.snapshot_dir / f"{instance}.png"
        try:
            has_frame = candidate.is_file() and candidate.stat().st_size > 0
        except OSError:
            # The file vanished between the two calls or is unreadable.
            return None
        return candidate if has_frame else None

    async def start(self) -> None:
        """Spawn a supervisor task for every instance that has an output RTSP URL."""
        self._stop = False
        for inst in self.cfg.instances:
            # Skip instances without an output and ones already supervised, so
            # that a repeated start() cannot orphan a running supervisor.
            if not inst.output_rtsp_url or inst.name in self._tasks:
                continue
            self._tasks[inst.name] = asyncio.create_task(
                self._supervisor(inst.name, inst.output_rtsp_url))
        log.info("snapshot_keeper.started",
                 instances=list(self._tasks.keys()))

    async def stop(self) -> None:
        """Terminate every ffmpeg and wait for all supervisor tasks to finish."""
        self._stop = True
        # Work on a snapshot: a supervisor removes its entry from ``_procs``
        # the moment its child exits, which would otherwise break iteration
        # ("dictionary changed size during iteration") while we await below.
        procs = list(self._procs.values())
        for proc in procs:
            try:
                proc.terminate()
            except ProcessLookupError:
                pass  # already gone
        if procs:
            # Reap concurrently so the worst case is one timeout, not one per instance.
            await asyncio.gather(*(self._reap(proc) for proc in procs))

        tasks = list(self._tasks.values())
        self._tasks.clear()
        for task in tasks:
            task.cancel()
        if tasks:
            # return_exceptions=True: a cancelled or failed supervisor must not
            # abort the shutdown of the others.
            await asyncio.gather(*tasks, return_exceptions=True)

    async def _reap(self, proc: asyncio.subprocess.Process) -> None:
        """Wait for *proc* to exit; kill it if it ignores the earlier terminate()."""
        try:
            await asyncio.wait_for(proc.wait(), timeout=self._STOP_TIMEOUT)
            return
        except asyncio.TimeoutError:
            pass
        try:
            proc.kill()
        except ProcessLookupError:
            return
        # Collect the exit status so no zombie is left behind.
        await proc.wait()

    async def _supervisor(self, name: str, url: str) -> None:
        """Run ffmpeg for *name* in a loop, sleeping with backoff between runs."""
        loop = asyncio.get_running_loop()
        backoff = self._BACKOFF_INITIAL
        while not self._stop:
            started = loop.time()
            try:
                await self._launch_and_wait(name, url)
            except Exception as e:
                # Top-level boundary of the supervisor: log and retry.
                log.warning("snapshot_keeper.launch_fail",
                            instance=name, error=str(e))
            if self._stop:
                break
            if loop.time() - started >= self._HEALTHY_RUNTIME:
                # The previous run was healthy; do not punish it with the
                # delay accumulated by earlier failures.
                backoff = self._BACKOFF_INITIAL
            await asyncio.sleep(backoff)
            backoff = min(backoff * 1.5, self._BACKOFF_MAX)

    async def _launch_and_wait(self, name: str, url: str) -> None:
        """Start ffmpeg for *name*, block until it exits, then log why it exited."""
        out_path = self.snapshot_dir / f"{name}.png"
        # A frame left over from an earlier run must not be served as current.
        self._discard(out_path)
        # fps=2           -> two PNG updates per second (~500 ms preview latency)
        # -update 1       -> keep overwriting one file instead of a numbered sequence
        # -atomic_writing -> write to a temp file and rename it over the target,
        #                    so a reader never observes a half-written PNG
        proc = await asyncio.create_subprocess_exec(
            "ffmpeg",
            "-hide_banner", "-loglevel", "error",
            "-rtsp_transport", "tcp",
            "-i", url,
            "-an",  # no audio needed for a preview
            "-vf", "fps=2",
            "-update", "1",
            "-atomic_writing", "1",
            "-f", "image2",
            "-y", str(out_path),
            stdin=asyncio.subprocess.DEVNULL,
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        self._procs[name] = proc
        log.info("snapshot_keeper.launched", instance=name, pid=proc.pid)
        try:
            # Drain stderr for the whole lifetime of the process: an unread
            # pipe eventually fills up and blocks ffmpeg. Only the tail is kept.
            err = await self._drain_tail(proc.stderr, self._STDERR_TAIL)
            rc = await proc.wait()
        finally:
            self._procs.pop(name, None)
            if proc.returncode is None:
                # We are being cancelled while ffmpeg is still running.
                try:
                    proc.kill()
                except ProcessLookupError:
                    pass
            # ffmpeg is gone, so the file would only get staler from here on.
            self._discard(out_path)
        if not self._stop:
            log.warning("snapshot_keeper.exited",
                        instance=name, returncode=rc, stderr_tail=err[-300:])

    @staticmethod
    async def _drain_tail(stream: asyncio.StreamReader | None,
                          limit: int = 2048) -> str:
        """Read *stream* to EOF and return its last *limit* bytes, decoded leniently."""
        if stream is None:
            return ""
        tail = b""
        while True:
            try:
                chunk = await stream.read(4096)
            except OSError:
                break  # best effort: diagnostics only
            if not chunk:
                break
            tail = (tail + chunk)[-limit:]
        return tail.decode(errors="replace")

    @staticmethod
    def _discard(path: Path) -> None:
        """Remove *path* if present; cleanup failures must never break the keeper."""
        try:
            path.unlink()
        except FileNotFoundError:
            pass
        except OSError as e:
            log.warning("snapshot_keeper.cleanup_fail", path=str(path), error=str(e))