"""Pipeline monitor — детект restart ffmpeg pipeline + auto-restore overlay state. Pipeline filter state (overlays, layout, cell_map) живёт в RAM ffmpeg process'а. При recreate container (compose up, OOM, etc.) — state теряется. Controller'у нужно re-push: * active layout (state.active_layout) * все overlays (state.overlays) * audio output state (state.audio_output_enabled) * browser/dynamic renderer flags (registered → reset, next loop iter re-add) Detect mechanism: ZMQ ping (set_layout с current name = no-op) every N sec. Tracks alive state — first success после consecutive failures → restart event. """ from __future__ import annotations import asyncio from typing import TYPE_CHECKING import structlog from .layouts import PREDEFINED_LAYOUTS from .zmq_client import FFmpegZmqClient if TYPE_CHECKING: from .browser_overlays import BrowserRenderer from .config import Config from .dispatch import CommandDispatcher from .dynamic_overlays import DynamicRenderer from .frigate_bridge import FrigateBridge from .state import ControllerState log = structlog.get_logger() class PipelineMonitor: """Polls pipeline ZMQ, restores state on restart-detect.""" def __init__( self, cfg: "Config", state: "ControllerState", dispatcher: "CommandDispatcher", browser_renderer: "BrowserRenderer | None" = None, dynamic_renderer: "DynamicRenderer | None" = None, frigate_bridge: "FrigateBridge | None" = None, poll_interval_sec: float = 3.0, ) -> None: self.cfg = cfg self.state = state self.dispatcher = dispatcher self.browser_renderer = browser_renderer self.dynamic_renderer = dynamic_renderer self.frigate_bridge = frigate_bridge self.poll_interval_sec = poll_interval_sec self._task: asyncio.Task | None = None self._alive: dict[str, bool] = {} # instance → last-known up/down async def start(self) -> None: self._task = asyncio.create_task(self._loop()) log.info("pipeline_monitor.started", poll_sec=self.poll_interval_sec, instances=[i.name for i in self.cfg.instances]) async def stop(self) -> None: if self._task: self._task.cancel() try: await self._task except asyncio.CancelledError: pass async def _loop(self) -> None: # Boot delay — give pipeline time to start initially await asyncio.sleep(self.poll_interval_sec) while True: try: for inst in self.cfg.instances: await self._check_instance(inst.name) await asyncio.sleep(self.poll_interval_sec) except asyncio.CancelledError: raise except Exception as e: log.error("pipeline_monitor.loop_fail", error=str(e)) await asyncio.sleep(self.poll_interval_sec) async def _check_instance(self, instance: str) -> None: inst = next((i for i in self.cfg.instances if i.name == instance), None) if inst is None: return client = self.dispatcher._client(inst) try: # Cheap noop — set_layout к текущему. Filter accepts всегда. current_layout = await self.state.get_layout(instance) or inst.default_layout await client.send_command(inst.filter_target, "set_layout", current_layout) was_alive = self._alive.get(instance, False) self._alive[instance] = True if not was_alive: log.info("pipeline_monitor.restored", instance=instance) await self._restore_state(instance) except Exception as e: if self._alive.get(instance, True): log.warning("pipeline_monitor.lost", instance=instance, error=str(e)) self._alive[instance] = False async def _restore_state(self, instance: str) -> None: """Re-push всё state к pipeline после detected restart.""" inst = next((i for i in self.cfg.instances if i.name == instance), None) if inst is None: return client = self.dispatcher._client(inst) # Pipeline filter может быть ещё в startup — ZMQ может отвечать но # filter graph не полностью инициализирован. Wait 2 sec чтобы overlays # accepted reliably. await asyncio.sleep(2.0) # 1. Layout (set_layout уже sent в _check_instance — pipeline restored к # нему. Просто apply ещё раз чтобы быть consistent.) layout = await self.state.get_layout(instance) or inst.default_layout try: await client.send_command(inst.filter_target, "set_layout", layout) except Exception as e: log.warning("pipeline_monitor.layout_fail", instance=instance, error=str(e)) # 2. Audio output state (mute/unmute) — re-apply. try: enabled = await self.state.get_audio_output_enabled(instance) await self.dispatcher.set_audio_output_enabled(instance, enabled) except Exception as e: log.warning("pipeline_monitor.audio_fail", instance=instance, error=str(e)) # 3. Overlays — re-push все из state. overlays = await self.state.get_overlays(instance) from .dispatch import _serialize_overlay_to_zmq # local import for ov in overlays: try: zmq_arg = _serialize_overlay_to_zmq(ov) for target in self.dispatcher._overlay_targets(inst): await client.send_command(target, "add_overlay", zmq_arg) except Exception as e: log.warning("pipeline_monitor.overlay_fail", instance=instance, overlay_id=ov.id, error=str(e)) # 4. Reset registered flags на browser/dynamic renderers + frigate bridge — # next loop iter / MQTT event auto re-add (add_overlay + reload_icon). if self.browser_renderer: self.browser_renderer.mark_all_unregistered() if self.dynamic_renderer: self.dynamic_renderer.mark_all_unregistered() if self.frigate_bridge: self.frigate_bridge.mark_all_unregistered() log.info("pipeline_monitor.restore_done", instance=instance, layout=layout, overlays=len(overlays))