From 543b7c950869902df5abbbefc0db38ba5acd542b Mon Sep 17 00:00:00 2001 From: gx Date: Mon, 25 May 2026 15:57:21 +0100 Subject: [PATCH] =?UTF-8?q?pipeline=5Fmonitor:=202=20bug=20fixes=20?= =?UTF-8?q?=E2=80=94=20stall=20detection=20+=20lost=20MQTT=20event?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug #1: _check_stall возвращался early если alive=False (pipeline ZMQ dead). Но pipeline может hung без exit'а (ffmpeg process жив но encoder deadlock): ZMQ не отвечает (alive=False), при этом encoder не emit'ит frames в mediamtx. Wrapper script не retry'ит (process не exited), никто не поднимает алерт. Fix: stall check работает ВНЕ зависимости от alive. Bug #2: _check_stall возвращался early если /live publish session отсутствует в mediamtx /v3/rtspsessions/list. Pipeline мог никогда не подключиться (или TCP push session дропнулся). Treat as "frozen at 0 bytes" — stall alert fire'ится через N polls. Bug #3 (bonus): _check_instance логировал "lost" но не emit'ил MQTT event. HA не видела алертов. Fix: добавлен on_event call для pipeline_lost и pipeline_restored (paired с уже существующими pipeline_stalled / pipeline_unstalled). Verified на сегодняшнем incident: pipeline encoder hung 4 hours, никто не реагировал. После fix monitor emit'ит lost+stalled события через 12 секунд → MQTT-listener (HA automation) может сделать docker restart cuda-grid-pipeline. Co-Authored-By: Claude Opus 4.7 --- .../cuda_grid_controller/pipeline_monitor.py | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/controller/cuda_grid_controller/pipeline_monitor.py b/controller/cuda_grid_controller/pipeline_monitor.py index 829064e..338be42 100644 --- a/controller/cuda_grid_controller/pipeline_monitor.py +++ b/controller/cuda_grid_controller/pipeline_monitor.py @@ -105,9 +105,12 @@ class PipelineMonitor: async def _check_stall(self, instance: str) -> None: """Poll mediamtx publish session bytesReceived. Не растёт N consecutive polls → encoder stalled (NVENC hung, output blocked, etc.). Emit MQTT - alert — HA automation или user manually решает restart.""" - if not self._alive.get(instance, False): - return # pipeline already dead, restart logic handles + alert — HA automation или user manually решает restart. + + Важно: stall check работает ВНЕ зависимости от alive state. Pipeline + может быть hung без exit'а (ffmpeg process жив но encoder deadlock): + ZMQ filter не отвечает (alive=False), но процесс не падает → wrapper + не retry'ит. Нужен внешний alert чтобы HA сделал docker restart.""" try: async with httpx.AsyncClient(timeout=2.0) as client: r = await client.get(f"{self.mediamtx_api}/v3/rtspsessions/list") @@ -117,15 +120,16 @@ class PipelineMonitor: # Find publish session для instance's output path. # Pipeline publishes к /live (можно extend per-instance в будущем). + # Если publish session отсутствует — pipeline вообще не push'ит + # (process висит / never connected). Treated як "frozen at 0 bytes" + # → fire stall alert также через N polls. publish_session = None for s in data.get("items", []): if s.get("state") == "publish" and s.get("path") == "live": publish_session = s break - if not publish_session: - return - current_bytes = publish_session.get("inboundBytes", 0) + current_bytes = publish_session.get("inboundBytes", 0) if publish_session else 0 last_bytes = self._last_bytes.get(instance, -1) if current_bytes == last_bytes: self._stall_count[instance] = self._stall_count.get(instance, 0) + 1 @@ -167,11 +171,22 @@ class PipelineMonitor: self._alive[instance] = True if not was_alive: log.info("pipeline_monitor.restored", instance=instance) + if self.on_event: + try: + await self.on_event(instance, "pipeline_restored", {}) + except Exception as e: + log.warning("pipeline_monitor.on_event_fail", error=str(e)) await self._restore_state(instance) except Exception as e: - if self._alive.get(instance, True): - log.warning("pipeline_monitor.lost", instance=instance, error=str(e)) + was_alive = self._alive.get(instance, True) self._alive[instance] = False + if was_alive: + log.warning("pipeline_monitor.lost", instance=instance, error=str(e)) + if self.on_event: + try: + await self.on_event(instance, "pipeline_lost", {"error": str(e)}) + except Exception as ee: + log.warning("pipeline_monitor.on_event_fail", error=str(ee)) async def _restore_state(self, instance: str) -> None: """Re-push всё state к pipeline после detected restart."""