"""Persistent ffmpeg subprocess для low-latency /snapshot endpoint. Cold ffmpeg start + RTSP negotiate + keyframe wait занимает ~5 sec — это неприемлемо для UI editor где preview обновляется каждые ~700 ms. Заменяем "per-request ffmpeg" на single long-running ffmpeg который непрерывно читает RTSP и dump'ит latest frame в file (`fps=2 -update 1`). Endpoint просто serves этот file — latency ~50 ms (disk read). Auto-restart при exit (RTSP source перезапустился, network glitch и т.д.). """ from __future__ import annotations import asyncio from pathlib import Path import structlog from .config import Config log = structlog.get_logger() class SnapshotKeeper: """Persistent ffmpeg per instance dumping latest frame to PNG file.""" def __init__(self, cfg: Config, snapshot_dir: Path) -> None: self.cfg = cfg self.snapshot_dir = snapshot_dir self.snapshot_dir.mkdir(parents=True, exist_ok=True) self._procs: dict[str, asyncio.subprocess.Process] = {} self._tasks: dict[str, asyncio.Task] = {} self._stop = False def path(self, instance: str) -> Path | None: """Возвращает Path к PNG если файл существует (хотя бы один кадр был записан), иначе None — caller может fallback к cold ffmpeg.""" p = self.snapshot_dir / f"{instance}.png" return p if p.exists() else None async def start(self) -> None: for inst in self.cfg.instances: if not inst.output_rtsp_url: continue self._tasks[inst.name] = asyncio.create_task( self._supervisor(inst.name, inst.output_rtsp_url)) log.info("snapshot_keeper.started", instances=list(self._tasks.keys())) async def stop(self) -> None: self._stop = True for name, proc in self._procs.items(): try: proc.terminate() await asyncio.wait_for(proc.wait(), timeout=2) except (ProcessLookupError, asyncio.TimeoutError): try: proc.kill() except ProcessLookupError: pass for task in self._tasks.values(): task.cancel() for task in self._tasks.values(): try: await task except (asyncio.CancelledError, Exception): pass async def _supervisor(self, name: str, url: str) -> None: """Loop: launch ffmpeg, wait exit, retry с backoff.""" backoff = 3.0 while not self._stop: try: await self._launch_and_wait(name, url) except Exception as e: log.warning("snapshot_keeper.launch_fail", instance=name, error=str(e)) if self._stop: break await asyncio.sleep(backoff) backoff = min(backoff * 1.5, 30.0) async def _launch_and_wait(self, name: str, url: str) -> None: out_path = self.snapshot_dir / f"{name}.png" # fps=2 — 2 PNG/sec обновления (preview latency ~500 ms). # -update 1 — overwrite single file атомарно (rename trick внутри # image2 muxer). proc = await asyncio.create_subprocess_exec( "ffmpeg", "-hide_banner", "-loglevel", "error", "-rtsp_transport", "tcp", "-i", url, "-an", # без аудио — не нужно для preview "-vf", "fps=2", "-update", "1", "-f", "image2", "-y", str(out_path), stdin=asyncio.subprocess.DEVNULL, stdout=asyncio.subprocess.DEVNULL, stderr=asyncio.subprocess.PIPE, ) self._procs[name] = proc log.info("snapshot_keeper.launched", instance=name, pid=proc.pid) rc = await proc.wait() del self._procs[name] # Read stderr (might give hint о cause) try: err = (await proc.stderr.read(2048)).decode(errors="replace") if proc.stderr else "" except Exception: err = "" if not self._stop: log.warning("snapshot_keeper.exited", instance=name, returncode=rc, stderr_tail=err[-300:])