diff --git a/controller/cuda_grid_controller/__main__.py b/controller/cuda_grid_controller/__main__.py index 051a8a4..2456443 100644 --- a/controller/cuda_grid_controller/__main__.py +++ b/controller/cuda_grid_controller/__main__.py @@ -15,6 +15,7 @@ from .config import Config from .dispatch import CommandDispatcher from .browser_overlays import BrowserRenderer, DashboardCfg from .dynamic_overlays import ChartCfg, ChatCfg, DynamicRenderer +from .pipeline_monitor import PipelineMonitor from .frigate_bridge import FrigateBridge, FrigateBridgeCfg from .http_api import create_app from .mqtt_loop import MqttLoop @@ -138,6 +139,15 @@ async def _run(cfg: Config) -> None: watchdog._publish_event = mqtt.publish_event await watchdog.start() + # Pipeline monitor — detect ffmpeg restart + auto-restore overlay state. + pipeline_monitor = PipelineMonitor( + cfg=cfg, state=state, dispatcher=dispatcher, + browser_renderer=browser_renderer, + dynamic_renderer=dynamic_renderer, + frigate_bridge=frigate_bridge, + ) + await pipeline_monitor.start() + try: await asyncio.gather( mqtt.run(), @@ -146,6 +156,7 @@ async def _run(cfg: Config) -> None: except asyncio.CancelledError: log.info("controller.shutdown") finally: + await pipeline_monitor.stop() if dynamic_renderer: await dynamic_renderer.stop() if browser_renderer: diff --git a/controller/cuda_grid_controller/browser_overlays.py b/controller/cuda_grid_controller/browser_overlays.py index eb73e43..21cc7cf 100644 --- a/controller/cuda_grid_controller/browser_overlays.py +++ b/controller/cuda_grid_controller/browser_overlays.py @@ -99,6 +99,12 @@ class BrowserRenderer: self._playwright = None self._browser = None self._pages: dict[str, object] = {} # dashboard_id → playwright Page + self._registered: dict[str, bool] = {} # dashboard_id → registered в pipeline + + def mark_all_unregistered(self) -> None: + """Pipeline restart hook — clear registered flags чтобы _render_loop + re-add overlays на next iteration.""" + self._registered.clear() async def start(self) -> None: if not self.dashboards: @@ -193,7 +199,8 @@ class BrowserRenderer: if page is None: return - registered = False + # Registered state shared в self._registered — pipeline restart hook + # сбрасывает flag → re-add overlay на next iter. # Первый snapshot — сразу. Дальше — каждые refresh_sec. while True: try: @@ -224,9 +231,9 @@ class BrowserRenderer: await self.dispatcher._reload_icon(cfg.target_instance, cfg.id) - if not registered: + if not self._registered.get(cfg.id, False): await self._register_overlay(cfg) - registered = True + self._registered[cfg.id] = True # Reload page для свежих данных (Grafana auto-refresh не triggers # без user interaction в headless mode) diff --git a/controller/cuda_grid_controller/dynamic_overlays.py b/controller/cuda_grid_controller/dynamic_overlays.py index dbc182a..4e825fb 100644 --- a/controller/cuda_grid_controller/dynamic_overlays.py +++ b/controller/cuda_grid_controller/dynamic_overlays.py @@ -170,6 +170,12 @@ class DynamicRenderer: } self._tasks: list[asyncio.Task] = [] self._start_time = time.time() + self._registered: dict[str, bool] = {} # overlay_id → registered + + def mark_all_unregistered(self) -> None: + """Pipeline restart hook — clear registered flags чтобы loops + re-add overlays на next iteration.""" + self._registered.clear() def topics_to_subscribe(self) -> list[str]: topics = [] @@ -215,7 +221,6 @@ class DynamicRenderer: self._tasks.clear() async def _chart_loop(self, cfg: ChartCfg) -> None: - registered = False while True: try: buf = self._chart_data[cfg.id] @@ -224,10 +229,10 @@ class DynamicRenderer: t = time.time() - self._start_time buf.append(20.0 + 10.0 * math.sin(t / 5)) await self._render_and_publish(cfg, lambda: render_chart(cfg, list(buf))) - if not registered: + if not self._registered.get(cfg.id, False): await self._register_overlay(cfg.id, cfg.target_instance, cfg.cell, cfg.x, cfg.y, cfg.opacity, cfg.z_order) - registered = True + self._registered[cfg.id] = True await asyncio.sleep(cfg.refresh_sec) except asyncio.CancelledError: raise @@ -236,19 +241,19 @@ class DynamicRenderer: await asyncio.sleep(5) async def _chat_loop(self, cfg: ChatCfg) -> None: - registered = False last_signature: tuple[str, ...] = () while True: try: msgs = list(self._chat_messages[cfg.id]) sig = tuple(msgs) + registered = self._registered.get(cfg.id, False) if sig != last_signature or not registered: await self._render_and_publish(cfg, lambda: render_chat(cfg, msgs)) last_signature = sig if not registered: await self._register_overlay(cfg.id, cfg.target_instance, cfg.cell, cfg.x, cfg.y, cfg.opacity, cfg.z_order) - registered = True + self._registered[cfg.id] = True await asyncio.sleep(0.1) # tight check для chat reactivity (10 Hz) except asyncio.CancelledError: raise diff --git a/controller/cuda_grid_controller/frigate_bridge.py b/controller/cuda_grid_controller/frigate_bridge.py index e3c60ad..3dec61a 100644 --- a/controller/cuda_grid_controller/frigate_bridge.py +++ b/controller/cuda_grid_controller/frigate_bridge.py @@ -118,6 +118,14 @@ class FrigateBridge: # Active per camera (для auto-layout decision) self._cam_active: dict[str, bool] = {m.frigate_camera: False for m in cfg.mappings} + def mark_all_unregistered(self) -> None: + """Pipeline restart hook — clear borders init + event overlays state. + На следующий MQTT event borders re-init'нутся, bbox events re-add'нутся.""" + self._borders_initialized.clear() + self._event_overlays.clear() + self._event_bbox_last.clear() + self._focus_dims.clear() + def topics_to_subscribe(self) -> list[str]: if not self.cfg.enabled: return [] diff --git a/controller/cuda_grid_controller/pipeline_monitor.py b/controller/cuda_grid_controller/pipeline_monitor.py new file mode 100644 index 0000000..3c840d8 --- /dev/null +++ b/controller/cuda_grid_controller/pipeline_monitor.py @@ -0,0 +1,149 @@ +"""Pipeline monitor — детект restart ffmpeg pipeline + auto-restore overlay state. + +Pipeline filter state (overlays, layout, cell_map) живёт в RAM ffmpeg process'а. +При recreate container (compose up, OOM, etc.) — state теряется. Controller'у +нужно re-push: + * active layout (state.active_layout) + * все overlays (state.overlays) + * audio output state (state.audio_output_enabled) + * browser/dynamic renderer flags (registered → reset, next loop iter re-add) + +Detect mechanism: ZMQ ping (set_layout с current name = no-op) every N sec. +Tracks alive state — first success после consecutive failures → restart event. +""" + +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING + +import structlog + +from .layouts import PREDEFINED_LAYOUTS +from .zmq_client import FFmpegZmqClient + +if TYPE_CHECKING: + from .browser_overlays import BrowserRenderer + from .config import Config + from .dispatch import CommandDispatcher + from .dynamic_overlays import DynamicRenderer + from .frigate_bridge import FrigateBridge + from .state import ControllerState + +log = structlog.get_logger() + + +class PipelineMonitor: + """Polls pipeline ZMQ, restores state on restart-detect.""" + + def __init__( + self, + cfg: "Config", + state: "ControllerState", + dispatcher: "CommandDispatcher", + browser_renderer: "BrowserRenderer | None" = None, + dynamic_renderer: "DynamicRenderer | None" = None, + frigate_bridge: "FrigateBridge | None" = None, + poll_interval_sec: float = 3.0, + ) -> None: + self.cfg = cfg + self.state = state + self.dispatcher = dispatcher + self.browser_renderer = browser_renderer + self.dynamic_renderer = dynamic_renderer + self.frigate_bridge = frigate_bridge + self.poll_interval_sec = poll_interval_sec + self._task: asyncio.Task | None = None + self._alive: dict[str, bool] = {} # instance → last-known up/down + + async def start(self) -> None: + self._task = asyncio.create_task(self._loop()) + log.info("pipeline_monitor.started", poll_sec=self.poll_interval_sec, + instances=[i.name for i in self.cfg.instances]) + + async def stop(self) -> None: + if self._task: + self._task.cancel() + try: + await self._task + except asyncio.CancelledError: + pass + + async def _loop(self) -> None: + # Boot delay — give pipeline time to start initially + await asyncio.sleep(self.poll_interval_sec) + while True: + try: + for inst in self.cfg.instances: + await self._check_instance(inst.name) + await asyncio.sleep(self.poll_interval_sec) + except asyncio.CancelledError: + raise + except Exception as e: + log.error("pipeline_monitor.loop_fail", error=str(e)) + await asyncio.sleep(self.poll_interval_sec) + + async def _check_instance(self, instance: str) -> None: + inst = next((i for i in self.cfg.instances if i.name == instance), None) + if inst is None: + return + client = self.dispatcher._client(inst) + try: + # Cheap noop — set_layout к текущему. Filter accepts всегда. + current_layout = await self.state.get_layout(instance) or inst.default_layout + await client.send_command(inst.filter_target, "set_layout", current_layout) + was_alive = self._alive.get(instance, False) + self._alive[instance] = True + if not was_alive: + log.info("pipeline_monitor.restored", instance=instance) + await self._restore_state(instance) + except Exception as e: + if self._alive.get(instance, True): + log.warning("pipeline_monitor.lost", instance=instance, error=str(e)) + self._alive[instance] = False + + async def _restore_state(self, instance: str) -> None: + """Re-push всё state к pipeline после detected restart.""" + inst = next((i for i in self.cfg.instances if i.name == instance), None) + if inst is None: + return + client = self.dispatcher._client(inst) + + # 1. Layout (set_layout уже sent в _check_instance — pipeline restored к + # нему. Просто apply ещё раз чтобы быть consistent.) + layout = await self.state.get_layout(instance) or inst.default_layout + try: + await client.send_command(inst.filter_target, "set_layout", layout) + except Exception as e: + log.warning("pipeline_monitor.layout_fail", instance=instance, error=str(e)) + + # 2. Audio output state (mute/unmute) — re-apply. + try: + enabled = await self.state.get_audio_output_enabled(instance) + await self.dispatcher.set_audio_output_enabled(instance, enabled) + except Exception as e: + log.warning("pipeline_monitor.audio_fail", instance=instance, error=str(e)) + + # 3. Overlays — re-push все из state. + overlays = await self.state.get_overlays(instance) + from .dispatch import _serialize_overlay_to_zmq # local import + for ov in overlays: + try: + zmq_arg = _serialize_overlay_to_zmq(ov) + for target in self.dispatcher._overlay_targets(inst): + await client.send_command(target, "add_overlay", zmq_arg) + except Exception as e: + log.warning("pipeline_monitor.overlay_fail", + instance=instance, overlay_id=ov.id, error=str(e)) + + # 4. Reset registered flags на browser/dynamic renderers + frigate bridge — + # next loop iter / MQTT event auto re-add (add_overlay + reload_icon). + if self.browser_renderer: + self.browser_renderer.mark_all_unregistered() + if self.dynamic_renderer: + self.dynamic_renderer.mark_all_unregistered() + if self.frigate_bridge: + self.frigate_bridge.mark_all_unregistered() + + log.info("pipeline_monitor.restore_done", instance=instance, + layout=layout, overlays=len(overlays))