Files
vf-cuda-grid/controller/cuda_grid_controller/watchdog.py
T
gx 155038aabb controller: stream watchdog (Phase 1 resilience, issue #3)
StreamWatchdog (watchdog.py) — polls mediamtx /v3/paths/list каждые N sec.
Если ожидаемый path missing > threshold → emit MQTT event stream_lost +
показывает text overlay 'OFFLINE'. При восстановлении — stream_restored +
remove overlay.

Config:
  watchdog:
    enabled: true
    mediamtx_api_url: http://cuda-grid-mediamtx:9997
    poll_interval_sec: 5.0
    lost_threshold_sec: 15.0
    paths:
      - mediamtx_path: live-audio
        instance: tv_grid
        label: Audio
        overlay_when_lost: true

httpx добавлен в Dockerfile.

Сегодняшний incident (audio sidecar потерял connection с mediamtx →
pipeline restart loop) — watchdog обнаружит missing live-audio через
15 sec + покажет TV-side warning. Manual restart audio sidecar still
needed (watchdog auto-restart — Phase 2).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 10:07:12 +01:00

152 lines
5.8 KiB
Python

"""Stream watchdog — monitor mediamtx paths + emit MQTT events + dispatch overlays.
Phase 1 resilience (issue #3): the controller polls /v3/paths/list on mediamtx.
When an expected path disappears or reappears, it publishes an MQTT event and
optionally shows or removes a text overlay (e.g. "Audio OFFLINE").
It relies ONLY on the observable state of mediamtx — not on the internal
state of the ffmpeg pipeline.
"""
from __future__ import annotations
import asyncio
import time
from typing import TYPE_CHECKING
import httpx
import structlog
from pydantic import BaseModel, Field
from .overlays import TextOverlay
if TYPE_CHECKING:
from .dispatch import CommandDispatcher
# Module-level structlog logger used for every watchdog log event in this file.
log = structlog.get_logger()
class WatchedPath(BaseModel):
    """A single mediamtx stream path that the watchdog monitors."""

    # Path name as reported by mediamtx; matched against the polled path list.
    mediamtx_path: str = Field(
        description="Имя path в mediamtx (например 'live-audio')",
    )
    # Instance passed along with MQTT events and overlay commands for this path.
    instance: str = Field(
        description="cuda-grid instance — для overlay/MQTT context",
    )
    # Display name included in event payloads and in the overlay text.
    label: str = Field(
        description="Human-readable name для events/overlays",
    )
    # When true, an overlay is added on loss and removed on restore.
    overlay_when_lost: bool = Field(
        description="Показывать text overlay 'OFFLINE' когда path lost",
        default=False,
    )
    # Overlay id used for both the add and the remove command.
    # NOTE(review): the default is the same for every path, so two lost paths
    # on one instance would share a single overlay id — confirm this is intended.
    overlay_id: str = Field(
        description="Overlay id если overlay_when_lost=true",
        default="watchdog_offline",
    )
class WatchdogCfg(BaseModel):
    """Configuration of the stream watchdog (the ``watchdog:`` config section)."""

    # Master switch; the watchdog does not start a polling task when false.
    enabled: bool = False
    # Base URL of the mediamtx control API; "/v3/paths/list" is appended to it.
    mediamtx_api_url: str = Field(
        description="mediamtx control API base URL",
        default="http://cuda-grid-mediamtx:9997",
    )
    # Seconds between two polls; validated to the range 1..60.
    poll_interval_sec: float = Field(
        default=5.0,
        ge=1.0,
        le=60.0,
    )
    # Seconds a path must stay missing before it is reported lost; range 2..120.
    lost_threshold_sec: float = Field(
        description="Path missing > N sec → emit lost event",
        default=10.0,
        ge=2.0,
        le=120.0,
    )
    # Paths to monitor; with an empty list the watchdog does not start.
    paths: list[WatchedPath] = []
class StreamWatchdog:
    """Poll the mediamtx control API and report watched paths as lost/restored.

    A background task fetches ``/v3/paths/list`` every
    ``cfg.poll_interval_sec`` seconds. A watched path that has not been
    reported ready for at least ``cfg.lost_threshold_sec`` seconds triggers a
    ``stream_lost`` event (and, if configured, an "OFFLINE" text overlay);
    when it is ready again a ``stream_restored`` event is sent and the
    overlay is removed.

    Args:
        cfg: Watchdog configuration (API URL, intervals, watched paths).
        dispatcher: Command dispatcher; its ``handle(instance, command,
            payload)`` coroutine is used to add and remove overlays.
        mqtt_publish_event: Optional async callback awaited as
            ``(instance, kind, data)`` for every lost/restored transition.
    """

    def __init__(self, cfg: "WatchdogCfg", dispatcher: "CommandDispatcher",
                 mqtt_publish_event=None) -> None:
        self.cfg = cfg
        self.dispatcher = dispatcher
        self._publish_event = mqtt_publish_event  # async callback (instance, kind, data)
        self._task: asyncio.Task | None = None
        # State per path: { path_name: { "last_seen": <monotonic ts>, "current_lost": bool } }
        self._path_state: dict[str, dict] = {}

    async def start(self) -> None:
        """Start the background polling task.

        Does nothing when the watchdog is disabled, has no paths configured,
        or is already running (a second task would orphan the first one).
        """
        if not self.cfg.enabled or not self.cfg.paths:
            return
        if self._task is not None and not self._task.done():
            return
        self._task = asyncio.create_task(self._loop())
        log.info("watchdog.started", paths=[p.mediamtx_path for p in self.cfg.paths])

    async def stop(self) -> None:
        """Cancel the polling task, if any, and wait for it to finish."""
        task, self._task = self._task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    async def _loop(self) -> None:
        """Poll forever; any non-cancellation error is logged and the loop continues."""
        while True:
            try:
                await self._poll_once()
            except asyncio.CancelledError:
                raise
            except Exception as e:
                log.error("watchdog.poll_fail", error=str(e))
            await asyncio.sleep(self.cfg.poll_interval_sec)

    @staticmethod
    def _ready_path_names(data) -> set:
        """Return the names of all paths flagged ready in a paths-list payload.

        Expected shape: ``{"items": [{"name": ..., "ready": ..., ...}]}``.
        Entries that are not objects or carry no ``name`` are skipped, and a
        missing or null ``items`` counts as an empty list.

        Raises:
            ValueError: If the payload itself is not a JSON object, so the
                caller can treat it as an API failure instead of "no paths".
        """
        if not isinstance(data, dict):
            raise ValueError(
                f"unexpected paths list payload type: {type(data).__name__}"
            )
        return {
            item["name"]
            for item in data.get("items") or []
            if isinstance(item, dict) and "name" in item and item.get("ready", False)
        }

    def _update_state(self, path_name: str, is_ready: bool, now: float) -> "str | None":
        """Advance the lost/restored state of one path.

        Args:
            path_name: Key of the path in the state table.
            is_ready: Whether the path was reported ready in this poll.
            now: Current monotonic timestamp in seconds.

        Returns:
            ``"lost"`` or ``"restored"`` when the path just changed state,
            otherwise ``None``.
        """
        state = self._path_state.get(path_name)
        if state is None:
            # Seed with the first observation time so that a path which is
            # absent at startup still gets the full threshold before it is
            # reported lost (a zero seed made it "lost" on the very first poll).
            state = {"last_seen": now, "current_lost": False}
            self._path_state[path_name] = state

        if is_ready:
            state["last_seen"] = now
            if state["current_lost"]:
                state["current_lost"] = False
                return "restored"
            return None

        if state["current_lost"]:
            # Already reported; do not emit the event again.
            return None
        if now - state["last_seen"] >= self.cfg.lost_threshold_sec:
            state["current_lost"] = True
            return "lost"
        return None

    async def _poll_once(self) -> None:
        """Fetch the path list once and fire lost/restored handlers as needed.

        If the API call fails or returns an unusable payload, a warning is
        logged and the path states are left untouched.
        """
        # Tolerate a trailing slash in the configured base URL.
        url = f"{self.cfg.mediamtx_api_url.rstrip('/')}/v3/paths/list"
        try:
            async with httpx.AsyncClient(timeout=3.0) as client:
                r = await client.get(url)
                r.raise_for_status()
                data = r.json()
            # NOTE(review): the list endpoint may be paginated; if the server
            # has more paths than fit on one page a watched path could be
            # missed — confirm against the mediamtx API documentation.
            active_paths = self._ready_path_names(data)
        except Exception as e:
            log.warning("watchdog.mediamtx_api_fail", error=str(e))
            return

        # Monotonic clock: elapsed-time checks must not be affected by
        # wall-clock adjustments.
        now = time.monotonic()
        for wp in self.cfg.paths:
            transition = self._update_state(
                wp.mediamtx_path, wp.mediamtx_path in active_paths, now
            )
            if transition == "lost":
                await self._on_lost(wp)
            elif transition == "restored":
                await self._on_restored(wp)

    async def _on_lost(self, wp: "WatchedPath") -> None:
        """Log the loss, publish ``stream_lost`` and optionally add the overlay."""
        log.warning("watchdog.stream_lost", path=wp.mediamtx_path, label=wp.label)
        if self._publish_event:
            await self._publish_event(
                wp.instance, "stream_lost",
                {"path": wp.mediamtx_path, "label": wp.label},
            )
        if wp.overlay_when_lost:
            ov = TextOverlay(
                id=wp.overlay_id,
                cell=None,  # absolute on output frame
                x=0.35, y=0.92,  # bottom centre
                text=f"{wp.label} OFFLINE",
                font_size=32,
                color="#FF4040",
                opacity=1.0,
                z_order=30,
            )
            await self.dispatcher.handle(wp.instance, "overlay.add", ov.model_dump_json())

    async def _on_restored(self, wp: "WatchedPath") -> None:
        """Log the recovery, publish ``stream_restored`` and optionally remove the overlay."""
        log.info("watchdog.stream_restored", path=wp.mediamtx_path, label=wp.label)
        if self._publish_event:
            await self._publish_event(
                wp.instance, "stream_restored",
                {"path": wp.mediamtx_path, "label": wp.label},
            )
        if wp.overlay_when_lost:
            await self.dispatcher.handle(wp.instance, "overlay.remove", wp.overlay_id)