diff --git a/controller/cuda_grid_controller/pipeline_monitor.py b/controller/cuda_grid_controller/pipeline_monitor.py index 829064e..338be42 100644 --- a/controller/cuda_grid_controller/pipeline_monitor.py +++ b/controller/cuda_grid_controller/pipeline_monitor.py @@ -105,9 +105,12 @@ class PipelineMonitor: async def _check_stall(self, instance: str) -> None: """Poll mediamtx publish session bytesReceived. Не растёт N consecutive polls → encoder stalled (NVENC hung, output blocked, etc.). Emit MQTT - alert — HA automation или user manually решает restart.""" - if not self._alive.get(instance, False): - return # pipeline already dead, restart logic handles + alert — HA automation или user manually решает restart. + + Важно: stall check работает ВНЕ зависимости от alive state. Pipeline + может быть hung без exit'а (ffmpeg process жив но encoder deadlock): + ZMQ filter не отвечает (alive=False), но процесс не падает → wrapper + не retry'ит. Нужен внешний alert чтобы HA сделал docker restart.""" try: async with httpx.AsyncClient(timeout=2.0) as client: r = await client.get(f"{self.mediamtx_api}/v3/rtspsessions/list") @@ -117,15 +120,16 @@ class PipelineMonitor: # Find publish session для instance's output path. # Pipeline publishes к /live (можно extend per-instance в будущем). + # Если publish session отсутствует — pipeline вообще не push'ит + # (process висит / never connected). Treated як "frozen at 0 bytes" + # → fire stall alert также через N polls. publish_session = None for s in data.get("items", []): if s.get("state") == "publish" and s.get("path") == "live": publish_session = s break - if not publish_session: - return - current_bytes = publish_session.get("inboundBytes", 0) + current_bytes = publish_session.get("inboundBytes", 0) if publish_session else 0 last_bytes = self._last_bytes.get(instance, -1) if current_bytes == last_bytes: self._stall_count[instance] = self._stall_count.get(instance, 0) + 1 @@ -167,11 +171,22 @@ class PipelineMonitor: self._alive[instance] = True if not was_alive: log.info("pipeline_monitor.restored", instance=instance) + if self.on_event: + try: + await self.on_event(instance, "pipeline_restored", {}) + except Exception as e: + log.warning("pipeline_monitor.on_event_fail", error=str(e)) await self._restore_state(instance) except Exception as e: - if self._alive.get(instance, True): - log.warning("pipeline_monitor.lost", instance=instance, error=str(e)) + was_alive = self._alive.get(instance, True) self._alive[instance] = False + if was_alive: + log.warning("pipeline_monitor.lost", instance=instance, error=str(e)) + if self.on_event: + try: + await self.on_event(instance, "pipeline_lost", {"error": str(e)}) + except Exception as ee: + log.warning("pipeline_monitor.on_event_fail", error=str(ee)) async def _restore_state(self, instance: str) -> None: """Re-push всё state к pipeline после detected restart."""