"""Stream watchdog — monitor mediamtx paths, emit MQTT events, dispatch overlays.

Phase 1 resilience (issue #3): the controller polls mediamtx's ``/v3/paths/list``
endpoint. When an expected path disappears or reappears, the watchdog publishes
MQTT events and can optionally show an overlay (e.g. "🔇 audio offline").

It relies ONLY on state observable through mediamtx and does not depend on
ffmpeg's internal pipeline state.
"""

from __future__ import annotations

import asyncio
import time
from typing import TYPE_CHECKING

import httpx
import structlog
from pydantic import BaseModel, Field

from .overlays import TextOverlay

if TYPE_CHECKING:
    from .dispatch import CommandDispatcher

log = structlog.get_logger()


class WatchedPath(BaseModel):
    """A single mediamtx stream path to monitor."""

    mediamtx_path: str = Field(description="Имя path в mediamtx (например 'live-audio')")
    instance: str = Field(description="cuda-grid instance — для overlay/MQTT context")
    label: str = Field(description="Human-readable name для events/overlays")
    overlay_when_lost: bool = Field(
        default=False,
        description="Показывать text overlay 'OFFLINE' когда path lost",
    )
    overlay_id: str = Field(
        default="watchdog_offline",
        description="Overlay id если overlay_when_lost=true",
    )


class WatchdogCfg(BaseModel):
    """Watchdog configuration: mediamtx endpoint, timing, and the paths to watch."""

    enabled: bool = False
    mediamtx_api_url: str = Field(
        default="http://cuda-grid-mediamtx:9997",
        description="mediamtx control API base URL",
    )
    poll_interval_sec: float = Field(default=5.0, ge=1.0, le=60.0)
    lost_threshold_sec: float = Field(
        default=10.0,
        ge=2.0,
        le=120.0,
        description="Path missing > N sec → emit lost event",
    )
    # pydantic copies field defaults per instance, so the shared [] literal is safe.
    paths: list[WatchedPath] = []


class StreamWatchdog:
    """Poll mediamtx and fire lost/restored transitions for configured paths.

    For each ``WatchedPath`` the watchdog records when the path was last reported
    ready. Once it has not been ready for ``lost_threshold_sec``, a single
    "stream_lost" transition fires. The next time it is ready again, a single
    "stream_restored" transition fires.
    """

    def __init__(self, cfg: WatchdogCfg, dispatcher: "CommandDispatcher", mqtt_publish_event=None) -> None:
        """
        Args:
            cfg: watchdog configuration.
            dispatcher: command dispatcher used to add/remove overlays.
            mqtt_publish_event: optional async callback ``(instance, kind, data)``,
                awaited on every lost/restored transition.
        """
        self.cfg = cfg
        self.dispatcher = dispatcher
        self._publish_event = mqtt_publish_event  # async callback (instance, kind, data)
        self._task: asyncio.Task | None = None
        # Per-path state: {path_name: {"last_seen": monotonic seconds, "current_lost": bool}}
        self._path_state: dict[str, dict] = {}

    async def start(self) -> None:
        """Start the background polling task.

        This is a no-op if the watchdog is disabled, has no paths, or is
        already running.
        """
        if not self.cfg.enabled or not self.cfg.paths:
            return
        if self._task is not None and not self._task.done():
            # A second task would be orphaned: stop() only cancels self._task.
            return
        self._task = asyncio.create_task(self._loop())
        log.info("watchdog.started", paths=[p.mediamtx_path for p in self.cfg.paths])

    async def stop(self) -> None:
        """Cancel the polling task, if any, and wait for it to finish."""
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None

    async def _loop(self) -> None:
        """Poll forever; a failed poll is logged and retried after the normal interval."""
        while True:
            try:
                await self._poll_once()
            except asyncio.CancelledError:
                raise
            except Exception as e:
                log.error("watchdog.poll_fail", error=str(e))
            await asyncio.sleep(self.cfg.poll_interval_sec)

    async def _fetch_ready_paths(self) -> set[str] | None:
        """Return the names of paths mediamtx reports as ready, or None if the API call failed."""
        url = f"{self.cfg.mediamtx_api_url.rstrip('/')}/v3/paths/list"
        try:
            async with httpx.AsyncClient(timeout=3.0) as client:
                r = await client.get(url)
                r.raise_for_status()
                data = r.json()
        except Exception as e:
            log.warning("watchdog.mediamtx_api_fail", error=str(e))
            return None
        # mediamtx /v3/paths/list — {"items": [{"name": ..., "ready": ..., ...}, ...]}
        # `or []` also covers an explicit "items": null.
        return {
            item["name"]
            for item in data.get("items") or []
            if item.get("ready", False)
        }

    async def _check_path(self, wp: WatchedPath, is_ready: bool, now: float) -> None:
        """Update one path's state and fire at most one lost/restored transition."""
        # Seed last_seen with the first observation time. A path that is not up
        # yet at startup then gets the full lost_threshold_sec grace period,
        # instead of being reported lost immediately.
        state = self._path_state.setdefault(
            wp.mediamtx_path,
            {"last_seen": now, "current_lost": False},
        )
        if is_ready:
            state["last_seen"] = now
            if state["current_lost"]:
                state["current_lost"] = False
                await self._on_restored(wp)
        elif not state["current_lost"] and now - state["last_seen"] >= self.cfg.lost_threshold_sec:
            state["current_lost"] = True
            await self._on_lost(wp)

    async def _poll_once(self) -> None:
        """Run one poll: fetch ready paths from mediamtx and evaluate every watched path."""
        ready = await self._fetch_ready_paths()
        if ready is None:
            # API unreachable: we cannot tell whether paths are up, so leave state untouched.
            return
        # Monotonic clock: elapsed-time checks must not be affected by wall-clock jumps.
        now = time.monotonic()
        for wp in self.cfg.paths:
            try:
                await self._check_path(wp, wp.mediamtx_path in ready, now)
            except Exception as e:
                # Isolate callback/dispatcher failures so one path cannot block
                # the others from being evaluated this cycle.
                log.error("watchdog.transition_fail", path=wp.mediamtx_path, error=str(e))

    async def _on_lost(self, wp: WatchedPath) -> None:
        """Log, publish "stream_lost", and optionally add the OFFLINE text overlay."""
        log.warning("watchdog.stream_lost", path=wp.mediamtx_path, label=wp.label)
        if self._publish_event:
            await self._publish_event(
                wp.instance,
                "stream_lost",
                {"path": wp.mediamtx_path, "label": wp.label},
            )
        if wp.overlay_when_lost:
            ov = TextOverlay(
                id=wp.overlay_id,
                cell=None,  # absolute on output frame
                x=0.35,
                y=0.92,  # bottom, roughly centered
                text=f"⚠ {wp.label} OFFLINE",
                font_size=32,
                color="#FF4040",
                opacity=1.0,
                z_order=30,
            )
            await self.dispatcher.handle(wp.instance, "overlay.add", ov.model_dump_json())

    async def _on_restored(self, wp: WatchedPath) -> None:
        """Log, publish "stream_restored", and optionally remove the OFFLINE overlay."""
        log.info("watchdog.stream_restored", path=wp.mediamtx_path, label=wp.label)
        if self._publish_event:
            await self._publish_event(
                wp.instance,
                "stream_restored",
                {"path": wp.mediamtx_path, "label": wp.label},
            )
        if wp.overlay_when_lost:
            await self.dispatcher.handle(wp.instance, "overlay.remove", wp.overlay_id)