diff --git a/controller/cuda_grid_controller/__main__.py b/controller/cuda_grid_controller/__main__.py index 2456443..6c2f319 100644 --- a/controller/cuda_grid_controller/__main__.py +++ b/controller/cuda_grid_controller/__main__.py @@ -139,13 +139,15 @@ async def _run(cfg: Config) -> None: watchdog._publish_event = mqtt.publish_event await watchdog.start() - # Pipeline monitor — detect ffmpeg restart + auto-restore overlay state. + # Pipeline monitor — detect ffmpeg restart + auto-restore overlay state + + # encoder stall alerts (через MQTT pipeline.stalled event). pipeline_monitor = PipelineMonitor( cfg=cfg, state=state, dispatcher=dispatcher, browser_renderer=browser_renderer, dynamic_renderer=dynamic_renderer, frigate_bridge=frigate_bridge, ) + pipeline_monitor.on_event = mqtt.publish_event await pipeline_monitor.start() try: diff --git a/controller/cuda_grid_controller/pipeline_monitor.py b/controller/cuda_grid_controller/pipeline_monitor.py index 51c7b27..829064e 100644 --- a/controller/cuda_grid_controller/pipeline_monitor.py +++ b/controller/cuda_grid_controller/pipeline_monitor.py @@ -1,4 +1,7 @@ """Pipeline monitor — детект restart ffmpeg pipeline + auto-restore overlay state. +Также stall watchdog — детектит когда pipeline encoder перестал emit'ить +frames (e.g. audio input gone, NVENC hung) и emit'ит alert через on_event +callback (typically wired к MQTT publish). Pipeline filter state (overlays, layout, cell_map) живёт в RAM ffmpeg process'а. При recreate container (compose up, OOM, etc.) — state теряется. Controller'у @@ -8,8 +11,12 @@ Pipeline filter state (overlays, layout, cell_map) живёт в RAM ffmpeg proc * audio output state (state.audio_output_enabled) * browser/dynamic renderer flags (registered → reset, next loop iter re-add) -Detect mechanism: ZMQ ping (set_layout с current name = no-op) every N sec. -Tracks alive state — first success после consecutive failures → restart event. +Detect mechanism: + * Restart: ZMQ ping (set_layout с current name = no-op) every N sec. + Tracks alive state — first success после consecutive failures → restart event. + * Stall: polls mediamtx HTTP API для inbound bytes на publish session. + Если bytes не растут N consecutive polls → emit pipeline.stalled MQTT event. + User / HA automation решает что делать (restart, notify). """ from __future__ import annotations @@ -17,6 +24,7 @@ from __future__ import annotations import asyncio from typing import TYPE_CHECKING +import httpx import structlog from .layouts import PREDEFINED_LAYOUTS @@ -55,6 +63,16 @@ class PipelineMonitor: self.poll_interval_sec = poll_interval_sec self._task: asyncio.Task | None = None self._alive: dict[str, bool] = {} # instance → last-known up/down + # Stall detection: per-instance last seen bytesReceived + consecutive + # stall poll count. 3 polls × 3 sec = ~9 sec stalled → emit alert. + self._last_bytes: dict[str, int] = {} + self._stall_count: dict[str, int] = {} + self._stall_alerted: dict[str, bool] = {} # already alerted (suppress dup) + self.stall_polls_threshold: int = 3 + # mediamtx host derived from instance audio/zmq endpoints (assume same host) + self.mediamtx_api: str = "http://cuda-grid-mediamtx:9997" + # on_event callback signature (instance, kind, payload) — wired в __main__ + self.on_event = None async def start(self) -> None: self._task = asyncio.create_task(self._loop()) @@ -76,6 +94,7 @@ class PipelineMonitor: try: for inst in self.cfg.instances: await self._check_instance(inst.name) + await self._check_stall(inst.name) await asyncio.sleep(self.poll_interval_sec) except asyncio.CancelledError: raise @@ -83,6 +102,58 @@ class PipelineMonitor: log.error("pipeline_monitor.loop_fail", error=str(e)) await asyncio.sleep(self.poll_interval_sec) + async def _check_stall(self, instance: str) -> None: + """Poll mediamtx publish session bytesReceived. Не растёт N consecutive + polls → encoder stalled (NVENC hung, output blocked, etc.). Emit MQTT + alert — HA automation или user manually решает restart.""" + if not self._alive.get(instance, False): + return # pipeline already dead, restart logic handles + try: + async with httpx.AsyncClient(timeout=2.0) as client: + r = await client.get(f"{self.mediamtx_api}/v3/rtspsessions/list") + data = r.json() + except Exception: + return # mediamtx undeavailable — silent fail, не наша проблема + + # Find publish session для instance's output path. + # Pipeline publishes к /live (можно extend per-instance в будущем). + publish_session = None + for s in data.get("items", []): + if s.get("state") == "publish" and s.get("path") == "live": + publish_session = s + break + if not publish_session: + return + + current_bytes = publish_session.get("inboundBytes", 0) + last_bytes = self._last_bytes.get(instance, -1) + if current_bytes == last_bytes: + self._stall_count[instance] = self._stall_count.get(instance, 0) + 1 + if self._stall_count[instance] >= self.stall_polls_threshold: + if not self._stall_alerted.get(instance, False): + log.error("pipeline_monitor.stalled", + instance=instance, bytes=current_bytes, + stalled_for_sec=self._stall_count[instance] * self.poll_interval_sec) + if self.on_event: + try: + await self.on_event(instance, "pipeline_stalled", + {"bytes": current_bytes, + "stalled_sec": self._stall_count[instance] * self.poll_interval_sec}) + except Exception as e: + log.warning("pipeline_monitor.on_event_fail", error=str(e)) + self._stall_alerted[instance] = True + else: + if self._stall_alerted.get(instance, False): + log.info("pipeline_monitor.unstalled", instance=instance) + if self.on_event: + try: + await self.on_event(instance, "pipeline_unstalled", {"bytes": current_bytes}) + except Exception: + pass + self._last_bytes[instance] = current_bytes + self._stall_count[instance] = 0 + self._stall_alerted[instance] = False + async def _check_instance(self, instance: str) -> None: inst = next((i for i in self.cfg.instances if i.name == instance), None) if inst is None: