From 155038aabb01375c45c0fe2a3901bcfa4d7bf1e2 Mon Sep 17 00:00:00 2001 From: gx Date: Thu, 21 May 2026 10:07:12 +0100 Subject: [PATCH] controller: stream watchdog (Phase 1 resilience, issue #3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit StreamWatchdog (watchdog.py) — polls mediamtx /v3/paths/list каждые N sec. Если ожидаемый path missing > threshold → emit MQTT event stream_lost + показывает text overlay 'OFFLINE'. При восстановлении — stream_restored + remove overlay. Config: watchdog: enabled: true mediamtx_api_url: http://cuda-grid-mediamtx:9997 poll_interval_sec: 5.0 lost_threshold_sec: 15.0 paths: - mediamtx_path: live-audio instance: tv_grid label: Audio overlay_when_lost: true httpx добавлен в Dockerfile. Сегодняшний incident (audio sidecar потерял connection с mediamtx → pipeline restart loop) — watchdog обнаружит missing live-audio через 15 sec + покажет TV-side warning. Manual restart audio sidecar still needed (watchdog auto-restart — Phase 2). 
Co-Authored-By: Claude Opus 4.7 --- controller/Dockerfile | 3 +- controller/cuda_grid_controller/__main__.py | 17 +++ controller/cuda_grid_controller/config.py | 2 + controller/cuda_grid_controller/watchdog.py | 151 ++++++++++++++++++++ 4 files changed, 172 insertions(+), 1 deletion(-) create mode 100644 controller/cuda_grid_controller/watchdog.py diff --git a/controller/Dockerfile b/controller/Dockerfile index 378a60f..7f57df2 100644 --- a/controller/Dockerfile +++ b/controller/Dockerfile @@ -27,7 +27,8 @@ RUN pip install --no-cache-dir \ structlog \ typer \ sse-starlette \ - pillow + pillow \ + httpx # Source code COPY cuda_grid_controller ./cuda_grid_controller diff --git a/controller/cuda_grid_controller/__main__.py b/controller/cuda_grid_controller/__main__.py index 6045bc5..67d9fbe 100644 --- a/controller/cuda_grid_controller/__main__.py +++ b/controller/cuda_grid_controller/__main__.py @@ -19,6 +19,7 @@ from .http_api import create_app from .mqtt_loop import MqttLoop from .snapshot_history import SnapshotHistory from .state import ControllerState +from .watchdog import StreamWatchdog, WatchdogCfg cli = typer.Typer(add_completion=False) @@ -85,6 +86,17 @@ async def _run(cfg: Config) -> None: # Snapshot history (Phase 6+) — periodic capture per instance snapshot_hist = SnapshotHistory(cfg) + # Stream watchdog (Phase 1 resilience, issue #3) — monitor mediamtx paths + watchdog: StreamWatchdog | None = None + if cfg.watchdog: + try: + wcfg = WatchdogCfg.model_validate(cfg.watchdog) + if wcfg.enabled: + watchdog = StreamWatchdog(wcfg, dispatcher, + mqtt_publish_event=None) # set after mqtt + except Exception as e: + structlog.get_logger().warning("watchdog.config_invalid", error=str(e)) + # HTTP REST app = create_app(cfg, state, dispatcher, snapshot_history=snapshot_hist, @@ -110,6 +122,9 @@ async def _run(cfg: Config) -> None: if dynamic_renderer: await dynamic_renderer.start() await snapshot_hist.start() + if watchdog: + watchdog._publish_event = 
mqtt.publish_event + await watchdog.start() try: await asyncio.gather( @@ -122,6 +137,8 @@ async def _run(cfg: Config) -> None: if dynamic_renderer: await dynamic_renderer.stop() await snapshot_hist.stop() + if watchdog: + await watchdog.stop() await dispatcher.close() await mqtt.stop() diff --git a/controller/cuda_grid_controller/config.py b/controller/cuda_grid_controller/config.py index a39c339..c976909 100644 --- a/controller/cuda_grid_controller/config.py +++ b/controller/cuda_grid_controller/config.py @@ -171,6 +171,8 @@ class Config(BaseModel): frigate: dict | None = None # parsed в FrigateBridgeCfg при runtime # Dynamic overlays (charts/chats) — late import тоже dynamic_overlays: dict | None = None # parsed в DynamicRenderer cfg + # Stream watchdog — Phase 1 resilience (issue #3) + watchdog: dict | None = None # parsed в WatchdogCfg при runtime icon_dir: str = Field( default="/var/lib/cuda-grid/icons", description="Shared volume куда controller пишет dynamic PNG; filter (`icon_dir=` option) читает оттуда", diff --git a/controller/cuda_grid_controller/watchdog.py b/controller/cuda_grid_controller/watchdog.py new file mode 100644 index 0000000..535e8af --- /dev/null +++ b/controller/cuda_grid_controller/watchdog.py @@ -0,0 +1,151 @@ +"""Stream watchdog — monitor mediamtx paths + emit MQTT events + dispatch overlays. + +Phase 1 resilience (issue #3): controller poll'ит /v3/paths/list на mediamtx. +Если ожидаемый path исчез/появился — публикует MQTT events + опционально +показывает icon overlay (e.g. "🔇 audio offline"). + +Полагается ONLY на observable state mediamtx — не lazy на ffmpeg внутреннее +состояние pipeline. 
+"""
+
+from __future__ import annotations
+
+import asyncio
+import time
+from typing import TYPE_CHECKING
+
+import httpx
+import structlog
+from pydantic import BaseModel, Field
+
+from .overlays import TextOverlay
+
+if TYPE_CHECKING:
+    from .dispatch import CommandDispatcher
+
+log = structlog.get_logger()
+
+
+class WatchedPath(BaseModel):
+    """One stream path to monitor."""
+
+    mediamtx_path: str = Field(description="Имя path в mediamtx (например 'live-audio')")
+    instance: str = Field(description="cuda-grid instance — для overlay/MQTT context")
+    label: str = Field(description="Human-readable name для events/overlays")
+    overlay_when_lost: bool = Field(default=False,
+                                    description="Показывать text overlay 'OFFLINE' когда path lost")
+    # NOTE(review): every watched path gets the same default overlay id, so two
+    # paths on one instance that both enable the overlay would share a single
+    # overlay entry -- give them distinct ids in config if that matters.
+    overlay_id: str = Field(default="watchdog_offline",
+                            description="Overlay id если overlay_when_lost=true")
+
+
+class WatchdogCfg(BaseModel):
+    """Settings for StreamWatchdog, validated from the `watchdog` config mapping."""
+
+    enabled: bool = False
+    mediamtx_api_url: str = Field(default="http://cuda-grid-mediamtx:9997",
+                                  description="mediamtx control API base URL")
+    poll_interval_sec: float = Field(default=5.0, ge=1.0, le=60.0)
+    lost_threshold_sec: float = Field(default=10.0, ge=2.0, le=120.0,
+                                      description="Path missing > N sec → emit lost event")
+    paths: list[WatchedPath] = []
+
+
+class StreamWatchdog:
+    """Poll the mediamtx path list and react when a watched path goes away or returns.
+
+    A path counts as present when the list response contains an item with its
+    name and a truthy ``ready`` flag. Once a path has been absent for at least
+    ``cfg.lost_threshold_sec`` a ``stream_lost`` event is published and, if
+    ``overlay_when_lost`` is set, an "OFFLINE" text overlay is dispatched. When
+    the path shows up again a ``stream_restored`` event is published and the
+    overlay is removed.
+    """
+
+    def __init__(self, cfg: WatchdogCfg, dispatcher: "CommandDispatcher",
+                 mqtt_publish_event=None) -> None:
+        """Store collaborators; nothing runs until ``start()`` is awaited.
+
+        Args:
+            cfg: validated watchdog settings.
+            dispatcher: object whose ``handle(instance, command, payload)``
+                coroutine is used to add/remove the overlay.
+            mqtt_publish_event: optional async callback called as
+                ``(instance, kind, data)``; when falsy no events are published.
+        """
+        self.cfg = cfg
+        self.dispatcher = dispatcher
+        self._publish_event = mqtt_publish_event
+        self._task: asyncio.Task | None = None
+        # Per-path state keyed by mediamtx path name:
+        # {"last_seen": monotonic timestamp, "current_lost": bool}
+        self._path_state: dict[str, dict] = {}
+
+    async def start(self) -> None:
+        """Spawn the polling task unless disabled, without paths, or already running."""
+        if not self.cfg.enabled or not self.cfg.paths:
+            return
+        if self._task is not None:
+            # A second start() would otherwise orphan the first task.
+            return
+        self._task = asyncio.create_task(self._loop())
+        log.info("watchdog.started", paths=[p.mediamtx_path for p in self.cfg.paths])
+
+    async def stop(self) -> None:
+        """Cancel the polling task, if any, and wait for it to finish."""
+        if self._task:
+            self._task.cancel()
+            try:
+                await self._task
+            except asyncio.CancelledError:
+                pass
+            self._task = None
+
+    async def _loop(self) -> None:
+        """Poll forever, sleeping ``poll_interval_sec`` between polls.
+
+        Unexpected errors are logged and the loop keeps going; cancellation
+        propagates so ``stop()`` can end the task.
+        """
+        while True:
+            try:
+                await self._poll_once()
+            except asyncio.CancelledError:
+                raise
+            except Exception as e:
+                log.error("watchdog.poll_fail", error=str(e))
+            await asyncio.sleep(self.cfg.poll_interval_sec)
+
+    async def _poll_once(self) -> None:
+        """Fetch the path list once and update the state of every watched path.
+
+        If the API call fails the poll is skipped and no state changes, so an
+        unreachable API is not itself reported as a lost stream.
+        """
+        # Tolerate a base URL configured with a trailing slash.
+        url = f"{self.cfg.mediamtx_api_url.rstrip('/')}/v3/paths/list"
+        try:
+            async with httpx.AsyncClient(timeout=3.0) as client:
+                r = await client.get(url)
+                r.raise_for_status()
+                data = r.json()
+        except Exception as e:
+            log.warning("watchdog.mediamtx_api_fail", error=str(e))
+            return
+
+        # Expected response shape: { items: [{name, ready, ...}] }.
+        # NOTE(review): the list endpoint appears to be paginated; a watched
+        # path that lands on a later page would look missing here -- confirm
+        # against the mediamtx control API reference.
+        items = data.get("items") if isinstance(data, dict) else None
+        ready_names = {
+            item.get("name")
+            for item in items or []
+            if isinstance(item, dict) and item.get("ready", False)
+        }
+
+        # Monotonic clock: a wall-clock step must not fake or hide a loss.
+        now = time.monotonic()
+        for wp in self.cfg.paths:
+            try:
+                await self._update_path(wp, wp.mediamtx_path in ready_names, now)
+            except asyncio.CancelledError:
+                raise
+            except Exception as e:
+                # One failing handler must not stop the remaining paths
+                # from being evaluated in this poll.
+                log.error("watchdog.handler_fail", path=wp.mediamtx_path, error=str(e))
+
+    async def _update_path(self, wp: WatchedPath, is_ready: bool, now: float) -> None:
+        """Apply one observation to *wp* and fire the lost/restored handler on a transition."""
+        # Seed last_seen with the first observation time so a path that is
+        # absent at startup still gets the full lost_threshold_sec grace period.
+        state = self._path_state.setdefault(wp.mediamtx_path, {
+            "last_seen": now, "current_lost": False,
+        })
+
+        if is_ready:
+            state["last_seen"] = now
+            if state["current_lost"]:
+                state["current_lost"] = False
+                await self._on_restored(wp)
+            return
+
+        missing_for = now - state["last_seen"]
+        if not state["current_lost"] and missing_for >= self.cfg.lost_threshold_sec:
+            state["current_lost"] = True
+            await self._on_lost(wp)
+
+    async def _emit_event(self, wp: WatchedPath, kind: str) -> None:
+        """Publish *kind* for *wp* through the MQTT callback, if one is set.
+
+        Publish failures are logged instead of raised so that the overlay
+        update following the event still runs.
+        """
+        if not self._publish_event:
+            return
+        try:
+            await self._publish_event(
+                wp.instance, kind,
+                {"path": wp.mediamtx_path, "label": wp.label},
+            )
+        except asyncio.CancelledError:
+            raise
+        except Exception as e:
+            log.error("watchdog.event_publish_fail", kind=kind,
+                      path=wp.mediamtx_path, error=str(e))
+
+    async def _on_lost(self, wp: WatchedPath) -> None:
+        """Log and publish ``stream_lost``; add the OFFLINE overlay if configured."""
+        log.warning("watchdog.stream_lost", path=wp.mediamtx_path, label=wp.label)
+        await self._emit_event(wp, "stream_lost")
+        if wp.overlay_when_lost:
+            ov = TextOverlay(
+                id=wp.overlay_id,
+                cell=None,            # absolute on output frame
+                x=0.35, y=0.92,       # bottom centre
+                text=f"⚠ {wp.label} OFFLINE",
+                font_size=32,
+                color="#FF4040",
+                opacity=1.0,
+                z_order=30,
+            )
+            await self.dispatcher.handle(wp.instance, "overlay.add", ov.model_dump_json())
+
+    async def _on_restored(self, wp: WatchedPath) -> None:
+        """Log and publish ``stream_restored``; remove the OFFLINE overlay if configured."""
+        log.info("watchdog.stream_restored", path=wp.mediamtx_path, label=wp.label)
+        await self._emit_event(wp, "stream_restored")
+        if wp.overlay_when_lost:
+            await self.dispatcher.handle(wp.instance, "overlay.remove", wp.overlay_id)