Files
vf-cuda-grid/controller/cuda_grid_controller/pipeline_monitor.py
T
gx d29f3f96e5 pipeline_monitor: + stall watchdog (mediamtx bytes-based detect)
Resilience improvement — раньше pipeline мог hang без exit (NVENC stuck,
output broken pipe), Docker restart policy не triggered. Никакой alert.

Now: poll mediamtx /v3/rtspsessions/list каждые N sec, track publish session
inboundBytes. Не растёт 3 polls (~9 sec) → emit MQTT 'pipeline_stalled' event
(через dispatcher.on_event = mqtt.publish_event). User / Home Assistant
automation решает что делать (restart container, notify).

Wired:
  pipeline_monitor.on_event = mqtt.publish_event  # __main__.py

Bytes started growing again → emit 'pipeline_unstalled'.

Alert single-shot: пока stalled flag set, no dup alerts. Reset когда
bytes counter растёт.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 10:03:41 +01:00

226 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Pipeline monitor — детект restart ffmpeg pipeline + auto-restore overlay state.

Also acts as a stall watchdog: it detects when the pipeline encoder has stopped
emitting frames (e.g. audio input gone, NVENC hung) and raises an alert through
the ``on_event`` callback (typically wired to MQTT publish).

Pipeline filter state (overlays, layout, cell_map) lives in the RAM of the
ffmpeg process. When the container is recreated (compose up, OOM, etc.) that
state is lost, so the controller has to re-push:

* the active layout (state.active_layout)
* all overlays (state.overlays)
* the audio output state (state.audio_output_enabled)
* browser/dynamic renderer flags (registered -> reset, re-added on the next
  loop iteration)

Detection mechanisms:

* Restart: a ZMQ ping (set_layout with the current name = no-op) every N
  seconds. The alive state is tracked; the first success after consecutive
  failures is treated as a restart event.
* Stall: polls the mediamtx HTTP API for the inbound byte counter of the
  publish session. If the counter does not grow for N consecutive polls, a
  ``pipeline_stalled`` event is emitted through ``on_event``. The user / a
  Home Assistant automation decides what to do (restart, notify).
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
import httpx
import structlog
from .layouts import PREDEFINED_LAYOUTS
from .zmq_client import FFmpegZmqClient
if TYPE_CHECKING:
from .browser_overlays import BrowserRenderer
from .config import Config
from .dispatch import CommandDispatcher
from .dynamic_overlays import DynamicRenderer
from .frigate_bridge import FrigateBridge
from .state import ControllerState
# Module-wide structlog logger; events logged in this file use the
# "pipeline_monitor." prefix.
log = structlog.get_logger()
class PipelineMonitor:
    """Watch the ffmpeg pipeline: restore state after a restart, alert on stalls.

    Two checks run for every configured instance on each poll:

    * Restart detection: a no-op ``set_layout`` command is sent over ZMQ. The
      first successful ping after a failure (or after the monitor starts)
      triggers a re-push of layout, audio output state and overlays.
    * Stall detection: the mediamtx HTTP API is polled for the inbound byte
      counter of the publish session. When the counter is unchanged for
      ``stall_polls_threshold`` consecutive polls, ``on_event`` is awaited
      with ``"pipeline_stalled"``; once it changes again, with
      ``"pipeline_unstalled"``.
    """

    def __init__(
        self,
        cfg: "Config",
        state: "ControllerState",
        dispatcher: "CommandDispatcher",
        browser_renderer: "BrowserRenderer | None" = None,
        dynamic_renderer: "DynamicRenderer | None" = None,
        frigate_bridge: "FrigateBridge | None" = None,
        poll_interval_sec: float = 3.0,
    ) -> None:
        """Store collaborators and initialise per-instance bookkeeping.

        Args:
            cfg: Controller config; ``cfg.instances`` is iterated on every poll.
            state: Source of the layout / audio / overlay state to re-push.
            dispatcher: Provides the ZMQ client and overlay targets.
            browser_renderer: Optional; told to re-register after a restart.
            dynamic_renderer: Optional; told to re-register after a restart.
            frigate_bridge: Optional; told to re-register after a restart.
            poll_interval_sec: Seconds between polls (also the boot delay).
        """
        self.cfg = cfg
        self.state = state
        self.dispatcher = dispatcher
        self.browser_renderer = browser_renderer
        self.dynamic_renderer = dynamic_renderer
        self.frigate_bridge = frigate_bridge
        self.poll_interval_sec = poll_interval_sec
        self._task: asyncio.Task | None = None
        # instance name -> last known up/down result of the ZMQ ping
        self._alive: dict[str, bool] = {}
        # Stall detection, all keyed by instance name: last seen byte counter,
        # number of consecutive polls without change, and whether the alert
        # was already sent. With the defaults, 3 polls x 3 sec = ~9 sec
        # without change before the alert fires.
        self._last_bytes: dict[str, int] = {}
        self._stall_count: dict[str, int] = {}
        self._stall_alerted: dict[str, bool] = {}
        self.stall_polls_threshold: int = 3
        # Base URL of the mediamtx control API. Hard-coded here; assign a
        # different value to the attribute to point at another host.
        self.mediamtx_api: str = "http://cuda-grid-mediamtx:9997"
        # Optional callback awaited as on_event(instance, kind, payload);
        # expected to be assigned by the code that wires the monitor up.
        self.on_event = None

    async def start(self) -> None:
        """Spawn the background polling task; no-op while one is running."""
        if self._task is not None and not self._task.done():
            # A second start() must not orphan the poller that already runs.
            return
        self._task = asyncio.create_task(self._loop())
        log.info("pipeline_monitor.started", poll_sec=self.poll_interval_sec,
                 instances=[i.name for i in self.cfg.instances])

    async def stop(self) -> None:
        """Cancel the polling task, wait for it to finish, and forget it."""
        task, self._task = self._task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _loop(self) -> None:
        """Poll every configured instance forever, one pass per interval."""
        # Boot delay: give the pipeline time to start initially.
        await asyncio.sleep(self.poll_interval_sec)
        while True:
            try:
                for inst in self.cfg.instances:
                    await self._poll_instance(inst.name)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                log.error("pipeline_monitor.loop_fail", error=str(e))
            await asyncio.sleep(self.poll_interval_sec)

    async def _poll_instance(self, instance: str) -> None:
        """Run both checks for one instance without letting a failure spread.

        An unexpected error is logged and swallowed so the remaining instances
        are still checked in the same pass.
        """
        try:
            await self._check_instance(instance)
            await self._check_stall(instance)
        except asyncio.CancelledError:
            raise
        except Exception as e:
            log.error("pipeline_monitor.loop_fail", instance=instance, error=str(e))

    def _find_instance(self, instance: str):
        """Return the config entry whose ``name`` equals *instance*, or None."""
        return next((i for i in self.cfg.instances if i.name == instance), None)

    @staticmethod
    def _publish_bytes(data: object, path: str = "live") -> "int | None":
        """Extract the inbound byte counter of the publish session on *path*.

        Args:
            data: Decoded JSON body of the mediamtx session list response.
            path: Stream path the pipeline publishes to.

        Returns:
            The counter of the first session with ``state == "publish"`` and a
            matching ``path``, or None when the payload has an unexpected
            shape, no such session exists, or the session carries no integer
            counter.
        """
        if not isinstance(data, dict):
            return None
        items = data.get("items")
        if not isinstance(items, list):
            return None
        for session in items:
            if not isinstance(session, dict):
                continue
            if session.get("state") != "publish" or session.get("path") != path:
                continue
            # "inboundBytes" is the field this monitor reads; "bytesReceived"
            # is the name used in this module's older comments and is accepted
            # as a fallback. A missing counter must NOT be read as 0:
            # a constant 0 would look like a stall after a few polls.
            for key in ("inboundBytes", "bytesReceived"):
                value = session.get(key)
                if isinstance(value, int):
                    return value
            return None
        return None

    async def _emit(self, instance: str, kind: str, payload: dict) -> None:
        """Await ``on_event`` if one is set; log (never raise) on failure."""
        if not self.on_event:
            return
        try:
            await self.on_event(instance, kind, payload)
        except Exception as e:
            log.warning("pipeline_monitor.on_event_fail",
                        instance=instance, kind=kind, error=str(e))

    async def _check_stall(self, instance: str) -> None:
        """Poll mediamtx and feed the publish byte counter to the stall tracker.

        Does nothing when the instance is not known to be alive (the restart
        logic covers that case), when mediamtx cannot be queried, or when no
        usable publish session is reported.
        """
        if not self._alive.get(instance, False):
            return
        try:
            async with httpx.AsyncClient(timeout=2.0) as client:
                r = await client.get(f"{self.mediamtx_api}/v3/rtspsessions/list")
                data = r.json()
        except Exception:
            # mediamtx unavailable or returned non-JSON: deliberate best-effort,
            # the stall check is simply skipped for this poll.
            return
        # The pipeline publishes to /live (could be made per-instance later).
        current_bytes = self._publish_bytes(data)
        if current_bytes is None:
            return
        await self._track_bytes(instance, current_bytes)

    async def _track_bytes(self, instance: str, current_bytes: int) -> None:
        """Update stall bookkeeping for one poll and emit alerts on transitions.

        The alert is single-shot: once ``pipeline_stalled`` was sent, nothing
        more is emitted until the counter changes, at which point
        ``pipeline_unstalled`` is sent and the bookkeeping is reset.
        """
        if current_bytes != self._last_bytes.get(instance, -1):
            # Counter changed (this includes the very first observation).
            if self._stall_alerted.get(instance, False):
                log.info("pipeline_monitor.unstalled", instance=instance)
                await self._emit(instance, "pipeline_unstalled", {"bytes": current_bytes})
            self._last_bytes[instance] = current_bytes
            self._stall_count[instance] = 0
            self._stall_alerted[instance] = False
            return

        polls = self._stall_count.get(instance, 0) + 1
        self._stall_count[instance] = polls
        if polls < self.stall_polls_threshold or self._stall_alerted.get(instance, False):
            return

        stalled_sec = polls * self.poll_interval_sec
        log.error("pipeline_monitor.stalled",
                  instance=instance, bytes=current_bytes,
                  stalled_for_sec=stalled_sec)
        await self._emit(instance, "pipeline_stalled",
                         {"bytes": current_bytes, "stalled_sec": stalled_sec})
        # Marked as alerted even if the callback failed, to avoid re-alerting
        # on every subsequent poll.
        self._stall_alerted[instance] = True

    async def _check_instance(self, instance: str) -> None:
        """Ping the pipeline over ZMQ and restore state on a down -> up edge.

        An instance with no recorded state counts as "down", so the first
        successful ping after the monitor starts also triggers a restore.
        """
        inst = self._find_instance(instance)
        if inst is None:
            return
        client = self.dispatcher._client(inst)
        try:
            # Cheap no-op: set_layout to the current layout.
            current_layout = await self.state.get_layout(instance) or inst.default_layout
            await client.send_command(inst.filter_target, "set_layout", current_layout)
            was_alive = self._alive.get(instance, False)
            self._alive[instance] = True
            if not was_alive:
                log.info("pipeline_monitor.restored", instance=instance)
                # An exception escaping _restore_state is caught below too:
                # the instance is marked down again, so the restore is retried
                # after the next successful ping.
                await self._restore_state(instance)
        except Exception as e:
            # Warn only on the up -> down edge (unknown state counts as up
            # here), not on every failed poll.
            if self._alive.get(instance, True):
                log.warning("pipeline_monitor.lost", instance=instance, error=str(e))
            self._alive[instance] = False

    async def _restore_state(self, instance: str) -> None:
        """Re-push the whole controller state to the pipeline after a restart."""
        inst = self._find_instance(instance)
        if inst is None:
            return
        client = self.dispatcher._client(inst)

        # The pipeline filter may still be starting up: ZMQ may already answer
        # while the filter graph is not fully initialised. Wait 2 sec so the
        # overlays are accepted reliably.
        await asyncio.sleep(2.0)

        # 1. Layout. _check_instance already sent set_layout as its ping;
        #    apply it once more to stay consistent.
        layout = await self.state.get_layout(instance) or inst.default_layout
        try:
            await client.send_command(inst.filter_target, "set_layout", layout)
        except Exception as e:
            log.warning("pipeline_monitor.layout_fail", instance=instance, error=str(e))

        # 2. Audio output state (mute/unmute): re-apply.
        try:
            enabled = await self.state.get_audio_output_enabled(instance)
            await self.dispatcher.set_audio_output_enabled(instance, enabled)
        except Exception as e:
            log.warning("pipeline_monitor.audio_fail", instance=instance, error=str(e))

        # 3. Overlays: re-push everything held in state. A failure on one
        #    overlay is logged and does not stop the others.
        overlays = await self.state.get_overlays(instance)
        from .dispatch import _serialize_overlay_to_zmq  # local import
        for ov in overlays:
            try:
                zmq_arg = _serialize_overlay_to_zmq(ov)
                for target in self.dispatcher._overlay_targets(inst):
                    await client.send_command(target, "add_overlay", zmq_arg)
            except Exception as e:
                log.warning("pipeline_monitor.overlay_fail",
                            instance=instance, overlay_id=ov.id, error=str(e))

        # 4. Reset the "registered" flags on the browser/dynamic renderers and
        #    the frigate bridge so that their next loop iteration / MQTT event
        #    re-adds their overlays (add_overlay + reload_icon).
        if self.browser_renderer:
            self.browser_renderer.mark_all_unregistered()
        if self.dynamic_renderer:
            self.dynamic_renderer.mark_all_unregistered()
        if self.frigate_bridge:
            self.frigate_bridge.mark_all_unregistered()

        log.info("pipeline_monitor.restore_done", instance=instance,
                 layout=layout, overlays=len(overlays))