543b7c9508
Bug #1: _check_stall возвращался early если alive=False (pipeline ZMQ dead). Но pipeline может hung без exit'а (ffmpeg process жив но encoder deadlock): ZMQ не отвечает (alive=False), при этом encoder не emit'ит frames в mediamtx. Wrapper script не retry'ит (process не exited), никто не поднимает алерт. Fix: stall check работает ВНЕ зависимости от alive. Bug #2: _check_stall возвращался early если /live publish session отсутствует в mediamtx /v3/rtspsessions/list. Pipeline мог никогда не подключиться (или TCP push session дропнулся). Treat as "frozen at 0 bytes" — stall alert fire'ится через N polls. Bug #3 (bonus): _check_instance логировал "lost" но не emit'ил MQTT event. HA не видела алертов. Fix: добавлен on_event call для pipeline_lost и pipeline_restored (paired с уже существующими pipeline_stalled / pipeline_unstalled). Verified на сегодняшнем incident: pipeline encoder hung 4 hours, никто не реагировал. После fix monitor emit'ит lost+stalled события через 12 секунд → MQTT-listener (HA automation) может сделать docker restart cuda-grid-pipeline. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
241 lines
11 KiB
Python
241 lines
11 KiB
Python
"""Pipeline monitor — detects ffmpeg pipeline restarts and auto-restores overlay state.

Also a stall watchdog: detects when the pipeline encoder stops emitting
frames (e.g. audio input gone, NVENC hung) and emits an alert through the
``on_event`` callback (typically wired to an MQTT publish).

Pipeline filter state (overlays, layout, cell_map) lives in the RAM of the
ffmpeg process. When the container is recreated (compose up, OOM, etc.) that
state is lost, and the controller has to re-push:

* the active layout (state.active_layout)
* all overlays (state.overlays)
* the audio output state (state.audio_output_enabled)
* browser/dynamic renderer and Frigate bridge flags (registered → reset; the
  next loop iteration / event re-adds them)

Detection mechanism:

* Restart: a ZMQ ping (set_layout with the current name = no-op) every N sec.
  Tracks alive state — the first success after failures → restart event.
* Stall: polls the mediamtx HTTP API for inbound bytes on the publish session.
  If the counter does not grow for N consecutive polls → emit a
  pipeline_stalled MQTT event. The user / an HA automation decides what to do
  (restart, notify).
"""

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Any

import httpx
import structlog

from .layouts import PREDEFINED_LAYOUTS
from .zmq_client import FFmpegZmqClient

if TYPE_CHECKING:
    from collections.abc import Awaitable, Callable

    from .browser_overlays import BrowserRenderer
    from .config import Config
    from .dispatch import CommandDispatcher
    from .dynamic_overlays import DynamicRenderer
    from .frigate_bridge import FrigateBridge
    from .state import ControllerState

# Module-level structlog logger; PipelineMonitor emits its
# "pipeline_monitor.*" events through it.
log = structlog.get_logger()

class PipelineMonitor:
    """Watch pipeline instances: restore state after restarts, alert on stalls.

    Every ``poll_interval_sec`` seconds, for each configured instance:

    * Liveness: sends a no-op ``set_layout`` (current layout) over ZMQ.
      The first success after a failure (or the very first success) emits
      ``pipeline_restored`` and re-pushes controller state via
      ``_restore_state``. The first failure after a success (or the very
      first failure) emits ``pipeline_lost``.
    * Stall: reads the inbound byte counter of the mediamtx publish session
      on ``publish_path``. If it stays unchanged (or the session is missing)
      for ``stall_polls_threshold`` consecutive polls, emits
      ``pipeline_stalled`` once; the next change emits ``pipeline_unstalled``.

    Events are delivered through the optional async callback
    ``on_event(instance, kind, payload)``, wired by the caller (e.g. to MQTT).
    """

    def __init__(
        self,
        cfg: "Config",
        state: "ControllerState",
        dispatcher: "CommandDispatcher",
        browser_renderer: "BrowserRenderer | None" = None,
        dynamic_renderer: "DynamicRenderer | None" = None,
        frigate_bridge: "FrigateBridge | None" = None,
        poll_interval_sec: float = 3.0,
        *,
        stall_polls_threshold: int = 3,
        mediamtx_api: str = "http://cuda-grid-mediamtx:9997",
        publish_path: str = "live",
    ) -> None:
        """Store collaborators and initialise per-instance tracking state.

        Args:
            cfg: controller config; ``cfg.instances`` is the list polled.
            state: controller state store (layout, overlays, audio flag).
            dispatcher: provides ZMQ clients and overlay targets per instance.
            browser_renderer, dynamic_renderer, frigate_bridge: optional
                components whose ``mark_all_unregistered()`` is called after
                a restore so they re-add their overlays.
            poll_interval_sec: seconds between polling rounds.
            stall_polls_threshold: consecutive unchanged polls before a
                ``pipeline_stalled`` alert (default 3 polls x 3 s ~= 9 s).
            mediamtx_api: base URL of the mediamtx HTTP API.
            publish_path: mediamtx path the pipeline publishes to. It is
                currently shared by all instances and could be made
                per-instance later.
        """
        self.cfg = cfg
        self.state = state
        self.dispatcher = dispatcher
        self.browser_renderer = browser_renderer
        self.dynamic_renderer = dynamic_renderer
        self.frigate_bridge = frigate_bridge
        self.poll_interval_sec = poll_interval_sec
        self._task: asyncio.Task | None = None
        # instance -> last-known up/down from the ZMQ ping.
        self._alive: dict[str, bool] = {}
        # Instances whose state re-push has not completed successfully yet;
        # retried on every successful ping until it does.
        self._restore_pending: set[str] = set()
        # Stall detection: last seen inbound byte counter, consecutive
        # unchanged-poll count, and whether an alert is already out
        # (suppresses duplicate alerts).
        self._last_bytes: dict[str, int] = {}
        self._stall_count: dict[str, int] = {}
        self._stall_alerted: dict[str, bool] = {}
        self.stall_polls_threshold: int = stall_polls_threshold
        self.mediamtx_api: str = mediamtx_api
        self.publish_path: str = publish_path
        # Async callback (instance, kind, payload); wired by the caller.
        self.on_event: Callable[[str, str, dict[str, Any]], Awaitable[None]] | None = None

    async def start(self) -> None:
        """Start the background poll loop (no-op if it is already running)."""
        if self._task is not None and not self._task.done():
            # A second loop would double-poll and double-restore state.
            return
        self._task = asyncio.create_task(self._loop())
        log.info("pipeline_monitor.started", poll_sec=self.poll_interval_sec,
                 instances=[i.name for i in self.cfg.instances])

    async def stop(self) -> None:
        """Cancel the poll loop and wait for it to finish."""
        task, self._task = self._task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _loop(self) -> None:
        """Poll every instance forever; one failing check never skips others."""
        # Boot delay: give the pipeline time to start initially.
        await asyncio.sleep(self.poll_interval_sec)
        while True:
            for inst in self.cfg.instances:
                # Each check is isolated so that a failure in one (or in one
                # instance) cannot suppress the stall watchdog for the rest.
                # CancelledError is a BaseException and still propagates.
                for check in (self._check_instance, self._check_stall):
                    try:
                        await check(inst.name)
                    except Exception as e:
                        log.error("pipeline_monitor.loop_fail", instance=inst.name,
                                  check=check.__name__, error=str(e))
            await asyncio.sleep(self.poll_interval_sec)

    def _find_instance(self, instance: str) -> Any:
        """Return the ``cfg.instances`` entry named *instance*, or None."""
        return next((i for i in self.cfg.instances if i.name == instance), None)

    async def _emit(self, instance: str, kind: str, payload: dict[str, Any]) -> None:
        """Invoke ``on_event`` if wired; log (never raise) on callback failure."""
        if not self.on_event:
            return
        try:
            await self.on_event(instance, kind, payload)
        except Exception as e:
            log.warning("pipeline_monitor.on_event_fail", instance=instance,
                        kind=kind, error=str(e))

    async def _check_stall(self, instance: str) -> None:
        """Poll mediamtx and update the stall detector for *instance*.

        Runs regardless of the liveness state. A pipeline can hang without
        exiting (ffmpeg alive but the encoder deadlocked): the ZMQ filter
        stops answering, yet the process never dies, so its wrapper never
        restarts it. An external alert is needed so that e.g. an HA
        automation can restart the container.

        If mediamtx itself cannot be queried, the poll is skipped; it counts
        neither as stalled nor as progress.
        """
        current_bytes = await self._fetch_publish_bytes()
        if current_bytes is None:
            return
        await self._record_bytes(instance, current_bytes)

    async def _fetch_publish_bytes(self) -> int | None:
        """Return the inbound byte counter of the publish session, or None.

        Returns 0 when mediamtx answers but there is no publish session on
        ``publish_path`` (pipeline never connected, or its push session
        dropped), so that case is treated as "frozen at 0 bytes". Returns None
        when the API cannot be queried or answers with an error status.
        """
        try:
            async with httpx.AsyncClient(timeout=2.0) as client:
                r = await client.get(f"{self.mediamtx_api}/v3/rtspsessions/list")
                # A non-2xx reply (auth/config error) must not be read as
                # "no publish session"; that would raise a false stall alert.
                r.raise_for_status()
                data = r.json()
        except Exception as e:
            # mediamtx unavailable: not a pipeline problem, skip this poll.
            log.debug("pipeline_monitor.mediamtx_unavailable", error=str(e))
            return None

        items = (data.get("items") or []) if isinstance(data, dict) else []
        for s in items:
            if not isinstance(s, dict):
                continue
            if s.get("state") == "publish" and s.get("path") == self.publish_path:
                # Fall back to the older "bytesReceived" field name when
                # "inboundBytes" is absent.
                value = s.get("inboundBytes", s.get("bytesReceived"))
                return int(value) if value is not None else 0
        return 0

    async def _record_bytes(self, instance: str, current_bytes: int) -> None:
        """Feed one byte-counter sample into the per-instance stall detector.

        An unchanged counter increments the stall count. Once the count
        reaches ``stall_polls_threshold``, ``pipeline_stalled`` is emitted
        exactly once. A changed counter resets the count and, if an alert was
        raised, emits ``pipeline_unstalled``. The very first sample never
        counts as stalled because the baseline is -1.
        """
        if current_bytes == self._last_bytes.get(instance, -1):
            count = self._stall_count.get(instance, 0) + 1
            self._stall_count[instance] = count
            if (count >= self.stall_polls_threshold
                    and not self._stall_alerted.get(instance, False)):
                stalled_sec = count * self.poll_interval_sec
                log.error("pipeline_monitor.stalled",
                          instance=instance, bytes=current_bytes,
                          stalled_for_sec=stalled_sec)
                await self._emit(instance, "pipeline_stalled",
                                 {"bytes": current_bytes, "stalled_sec": stalled_sec})
                self._stall_alerted[instance] = True
            return

        if self._stall_alerted.get(instance, False):
            log.info("pipeline_monitor.unstalled", instance=instance)
            await self._emit(instance, "pipeline_unstalled", {"bytes": current_bytes})
        self._last_bytes[instance] = current_bytes
        self._stall_count[instance] = 0
        self._stall_alerted[instance] = False

    async def _check_instance(self, instance: str) -> None:
        """Ping *instance* over ZMQ, emit lost/restored, and re-push state.

        Only the ping decides liveness. A failing state restore is logged and
        retried on the next successful ping; it is not reported as a
        pipeline loss.
        """
        inst = self._find_instance(instance)
        if inst is None:
            return
        client = self.dispatcher._client(inst)
        try:
            # Cheap no-op: set_layout to the current layout; the filter
            # always accepts it.
            current_layout = await self.state.get_layout(instance) or inst.default_layout
            await client.send_command(inst.filter_target, "set_layout", current_layout)
        except Exception as e:
            was_alive = self._alive.get(instance, True)
            self._alive[instance] = False
            if was_alive:
                log.warning("pipeline_monitor.lost", instance=instance, error=str(e))
                await self._emit(instance, "pipeline_lost", {"error": str(e)})
            return

        was_alive = self._alive.get(instance, False)
        self._alive[instance] = True
        if not was_alive:
            log.info("pipeline_monitor.restored", instance=instance)
            await self._emit(instance, "pipeline_restored", {})
            self._restore_pending.add(instance)

        if instance in self._restore_pending:
            try:
                await self._restore_state(instance)
            except Exception as e:
                log.error("pipeline_monitor.restore_fail", instance=instance, error=str(e))
            else:
                self._restore_pending.discard(instance)

    async def _restore_state(self, instance: str) -> None:
        """Re-push all controller state to the pipeline after a restart.

        Re-sends the layout, the audio output flag and every overlay, then
        resets the "registered" flags of the optional renderers and the
        Frigate bridge. Individual layout, audio and overlay failures are
        logged and skipped. Exceptions from reading the state store propagate,
        so ``_check_instance`` retries the restore on its next poll.
        """
        inst = self._find_instance(instance)
        if inst is None:
            return
        client = self.dispatcher._client(inst)

        # The filter may still be starting up: ZMQ can answer before the
        # filter graph is fully initialised. Wait 2 s so overlays are
        # accepted reliably.
        await asyncio.sleep(2.0)

        # 1. Layout: already sent by the ping in _check_instance; applied
        #    again here to stay consistent.
        layout = await self.state.get_layout(instance) or inst.default_layout
        try:
            await client.send_command(inst.filter_target, "set_layout", layout)
        except Exception as e:
            log.warning("pipeline_monitor.layout_fail", instance=instance, error=str(e))

        # 2. Audio output state (mute/unmute): re-apply.
        try:
            enabled = await self.state.get_audio_output_enabled(instance)
            await self.dispatcher.set_audio_output_enabled(instance, enabled)
        except Exception as e:
            log.warning("pipeline_monitor.audio_fail", instance=instance, error=str(e))

        # 3. Overlays: re-push every overlay from the state store.
        overlays = await self.state.get_overlays(instance)
        from .dispatch import _serialize_overlay_to_zmq  # local import
        for ov in overlays:
            try:
                zmq_arg = _serialize_overlay_to_zmq(ov)
                for target in self.dispatcher._overlay_targets(inst):
                    await client.send_command(target, "add_overlay", zmq_arg)
            except Exception as e:
                log.warning("pipeline_monitor.overlay_fail",
                            instance=instance, overlay_id=ov.id, error=str(e))

        # 4. Reset registered flags on the browser/dynamic renderers and the
        #    Frigate bridge so their next loop iteration / MQTT event re-adds
        #    their overlays (add_overlay + reload_icon).
        if self.browser_renderer:
            self.browser_renderer.mark_all_unregistered()
        if self.dynamic_renderer:
            self.dynamic_renderer.mark_all_unregistered()
        if self.frigate_bridge:
            self.frigate_bridge.mark_all_unregistered()

        log.info("pipeline_monitor.restore_done", instance=instance,
                 layout=layout, overlays=len(overlays))