"""Pipeline monitor — детект restart ffmpeg pipeline + auto-restore overlay state. Также stall watchdog — детектит когда pipeline encoder перестал emit'ить frames (e.g. audio input gone, NVENC hung) и emit'ит alert через on_event callback (typically wired к MQTT publish). Pipeline filter state (overlays, layout, cell_map) живёт в RAM ffmpeg process'а. При recreate container (compose up, OOM, etc.) — state теряется. Controller'у нужно re-push: * active layout (state.active_layout) * все overlays (state.overlays) * audio output state (state.audio_output_enabled) * browser/dynamic renderer flags (registered → reset, next loop iter re-add) Detect mechanism: * Restart: ZMQ ping (set_layout с current name = no-op) every N sec. Tracks alive state — first success после consecutive failures → restart event. * Stall: polls mediamtx HTTP API для inbound bytes на publish session. Если bytes не растут N consecutive polls → emit pipeline.stalled MQTT event. User / HA automation решает что делать (restart, notify). """ from __future__ import annotations import asyncio from typing import TYPE_CHECKING import httpx import structlog from .layouts import PREDEFINED_LAYOUTS from .zmq_client import FFmpegZmqClient if TYPE_CHECKING: from .browser_overlays import BrowserRenderer from .config import Config from .dispatch import CommandDispatcher from .dynamic_overlays import DynamicRenderer from .frigate_bridge import FrigateBridge from .state import ControllerState log = structlog.get_logger() class PipelineMonitor: """Polls pipeline ZMQ, restores state on restart-detect.""" def __init__( self, cfg: "Config", state: "ControllerState", dispatcher: "CommandDispatcher", browser_renderer: "BrowserRenderer | None" = None, dynamic_renderer: "DynamicRenderer | None" = None, frigate_bridge: "FrigateBridge | None" = None, poll_interval_sec: float = 3.0, ) -> None: self.cfg = cfg self.state = state self.dispatcher = dispatcher self.browser_renderer = browser_renderer self.dynamic_renderer = dynamic_renderer self.frigate_bridge = frigate_bridge self.poll_interval_sec = poll_interval_sec self._task: asyncio.Task | None = None self._alive: dict[str, bool] = {} # instance → last-known up/down # Stall detection: per-instance last seen bytesReceived + consecutive # stall poll count. 3 polls × 3 sec = ~9 sec stalled → emit alert. self._last_bytes: dict[str, int] = {} self._stall_count: dict[str, int] = {} self._stall_alerted: dict[str, bool] = {} # already alerted (suppress dup) self.stall_polls_threshold: int = 3 # mediamtx host derived from instance audio/zmq endpoints (assume same host) self.mediamtx_api: str = "http://cuda-grid-mediamtx:9997" # on_event callback signature (instance, kind, payload) — wired в __main__ self.on_event = None async def start(self) -> None: self._task = asyncio.create_task(self._loop()) log.info("pipeline_monitor.started", poll_sec=self.poll_interval_sec, instances=[i.name for i in self.cfg.instances]) async def stop(self) -> None: if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass async def _loop(self) -> None: # Boot delay — give pipeline time to start initially await asyncio.sleep(self.poll_interval_sec) while True: try: for inst in self.cfg.instances: await self._check_instance(inst.name) await self._check_stall(inst.name) await asyncio.sleep(self.poll_interval_sec) except asyncio.CancelledError: raise except Exception as e: log.error("pipeline_monitor.loop_fail", error=str(e)) await asyncio.sleep(self.poll_interval_sec) async def _check_stall(self, instance: str) -> None: """Poll mediamtx publish session bytesReceived. Не растёт N consecutive polls → encoder stalled (NVENC hung, output blocked, etc.). Emit MQTT alert — HA automation или user manually решает restart. Важно: stall check работает ВНЕ зависимости от alive state. Pipeline может быть hung без exit'а (ffmpeg process жив но encoder deadlock): ZMQ filter не отвечает (alive=False), но процесс не падает → wrapper не retry'ит. Нужен внешний alert чтобы HA сделал docker restart.""" try: async with httpx.AsyncClient(timeout=2.0) as client: r = await client.get(f"{self.mediamtx_api}/v3/rtspsessions/list") data = r.json() except Exception: return # mediamtx undeavailable — silent fail, не наша проблема # Find publish session для instance's output path. # Pipeline publishes к /live (можно extend per-instance в будущем). # Если publish session отсутствует — pipeline вообще не push'ит # (process висит / never connected). Treated як "frozen at 0 bytes" # → fire stall alert также через N polls. publish_session = None for s in data.get("items", []): if s.get("state") == "publish" and s.get("path") == "live": publish_session = s break current_bytes = publish_session.get("inboundBytes", 0) if publish_session else 0 last_bytes = self._last_bytes.get(instance, -1) if current_bytes == last_bytes: self._stall_count[instance] = self._stall_count.get(instance, 0) + 1 if self._stall_count[instance] >= self.stall_polls_threshold: if not self._stall_alerted.get(instance, False): log.error("pipeline_monitor.stalled", instance=instance, bytes=current_bytes, stalled_for_sec=self._stall_count[instance] * self.poll_interval_sec) if self.on_event: try: await self.on_event(instance, "pipeline_stalled", {"bytes": current_bytes, "stalled_sec": self._stall_count[instance] * self.poll_interval_sec}) except Exception as e: log.warning("pipeline_monitor.on_event_fail", error=str(e)) self._stall_alerted[instance] = True else: if self._stall_alerted.get(instance, False): log.info("pipeline_monitor.unstalled", instance=instance) if self.on_event: try: await self.on_event(instance, "pipeline_unstalled", {"bytes": current_bytes}) except Exception: pass self._last_bytes[instance] = current_bytes self._stall_count[instance] = 0 self._stall_alerted[instance] = False async def _check_instance(self, instance: str) -> None: inst = next((i for i in self.cfg.instances if i.name == instance), None) if inst is None: return client = self.dispatcher._client(inst) try: # Cheap noop — set_layout к текущему. Filter accepts всегда. current_layout = await self.state.get_layout(instance) or inst.default_layout await client.send_command(inst.filter_target, "set_layout", current_layout) was_alive = self._alive.get(instance, False) self._alive[instance] = True if not was_alive: log.info("pipeline_monitor.restored", instance=instance) if self.on_event: try: await self.on_event(instance, "pipeline_restored", {}) except Exception as e: log.warning("pipeline_monitor.on_event_fail", error=str(e)) await self._restore_state(instance) except Exception as e: was_alive = self._alive.get(instance, True) self._alive[instance] = False if was_alive: log.warning("pipeline_monitor.lost", instance=instance, error=str(e)) if self.on_event: try: await self.on_event(instance, "pipeline_lost", {"error": str(e)}) except Exception as ee: log.warning("pipeline_monitor.on_event_fail", error=str(ee)) async def _restore_state(self, instance: str) -> None: """Re-push всё state к pipeline после detected restart.""" inst = next((i for i in self.cfg.instances if i.name == instance), None) if inst is None: return client = self.dispatcher._client(inst) # Pipeline filter может быть ещё в startup — ZMQ может отвечать но # filter graph не полностью инициализирован. Wait 2 sec чтобы overlays # accepted reliably. await asyncio.sleep(2.0) # 1. Layout (set_layout уже sent в _check_instance — pipeline restored к # нему. Просто apply ещё раз чтобы быть consistent.) layout = await self.state.get_layout(instance) or inst.default_layout try: await client.send_command(inst.filter_target, "set_layout", layout) except Exception as e: log.warning("pipeline_monitor.layout_fail", instance=instance, error=str(e)) # 2. Audio output state (mute/unmute) — re-apply. try: enabled = await self.state.get_audio_output_enabled(instance) await self.dispatcher.set_audio_output_enabled(instance, enabled) except Exception as e: log.warning("pipeline_monitor.audio_fail", instance=instance, error=str(e)) # 3. Overlays — re-push все из state. overlays = await self.state.get_overlays(instance) from .dispatch import _serialize_overlay_to_zmq # local import for ov in overlays: try: zmq_arg = _serialize_overlay_to_zmq(ov) for target in self.dispatcher._overlay_targets(inst): await client.send_command(target, "add_overlay", zmq_arg) except Exception as e: log.warning("pipeline_monitor.overlay_fail", instance=instance, overlay_id=ov.id, error=str(e)) # 4. Reset registered flags на browser/dynamic renderers + frigate bridge — # next loop iter / MQTT event auto re-add (add_overlay + reload_icon). if self.browser_renderer: self.browser_renderer.mark_all_unregistered() if self.dynamic_renderer: self.dynamic_renderer.mark_all_unregistered() if self.frigate_bridge: self.frigate_bridge.mark_all_unregistered() log.info("pipeline_monitor.restore_done", instance=instance, layout=layout, overlays=len(overlays))