controller: PipelineMonitor — auto-restore overlay state после pipeline restart
Pipeline filter state (overlays, layout, cell_map, audio) живёт в RAM
ffmpeg process. При recreate container (compose up, OOM, NVENC crash,
config change) state lost — controller'у нужно re-push.
Раньше user'у приходилось вручную:
curl POST /layout/.../set
docker restart cuda-grid-controller # для browser/dynamic re-register
Теперь автоматизировано:
PipelineMonitor polls ZMQ каждые 3 sec (no-op set_layout).
On timeout/error → mark instance lost.
First success after lost (а также самый первый success после старта controller'а — instance изначально считается не-alive) → trigger restore:
1. set_layout к state.active_layout
2. set_audio_output_enabled к state.audio_output_enabled
3. re-push все overlays из state.overlays
4. browser/dynamic/frigate hooks: mark_all_unregistered() —
их loops автоматически re-add на next iteration
Verified test: docker restart cuda-grid-pipeline → 10 sec downtime →
monitor logs lost+restored+restore_done с count=6 overlays.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -15,6 +15,7 @@ from .config import Config
|
||||
from .dispatch import CommandDispatcher
|
||||
from .browser_overlays import BrowserRenderer, DashboardCfg
|
||||
from .dynamic_overlays import ChartCfg, ChatCfg, DynamicRenderer
|
||||
from .pipeline_monitor import PipelineMonitor
|
||||
from .frigate_bridge import FrigateBridge, FrigateBridgeCfg
|
||||
from .http_api import create_app
|
||||
from .mqtt_loop import MqttLoop
|
||||
@@ -138,6 +139,15 @@ async def _run(cfg: Config) -> None:
|
||||
watchdog._publish_event = mqtt.publish_event
|
||||
await watchdog.start()
|
||||
|
||||
# Pipeline monitor — detect ffmpeg restart + auto-restore overlay state.
|
||||
pipeline_monitor = PipelineMonitor(
|
||||
cfg=cfg, state=state, dispatcher=dispatcher,
|
||||
browser_renderer=browser_renderer,
|
||||
dynamic_renderer=dynamic_renderer,
|
||||
frigate_bridge=frigate_bridge,
|
||||
)
|
||||
await pipeline_monitor.start()
|
||||
|
||||
try:
|
||||
await asyncio.gather(
|
||||
mqtt.run(),
|
||||
@@ -146,6 +156,7 @@ async def _run(cfg: Config) -> None:
|
||||
except asyncio.CancelledError:
|
||||
log.info("controller.shutdown")
|
||||
finally:
|
||||
await pipeline_monitor.stop()
|
||||
if dynamic_renderer:
|
||||
await dynamic_renderer.stop()
|
||||
if browser_renderer:
|
||||
|
||||
@@ -99,6 +99,12 @@ class BrowserRenderer:
|
||||
self._playwright = None
|
||||
self._browser = None
|
||||
self._pages: dict[str, object] = {} # dashboard_id → playwright Page
|
||||
self._registered: dict[str, bool] = {} # dashboard_id → registered в pipeline
|
||||
|
||||
def mark_all_unregistered(self) -> None:
    """Pipeline-restart hook: forget which dashboards are registered.

    Empties ``self._registered`` so the render loop's
    ``self._registered.get(cfg.id, False)`` check is false again and the
    overlay is registered anew on its next iteration.
    """
    self._registered.clear()
|
||||
|
||||
async def start(self) -> None:
|
||||
if not self.dashboards:
|
||||
@@ -193,7 +199,8 @@ class BrowserRenderer:
|
||||
if page is None:
|
||||
return
|
||||
|
||||
registered = False
|
||||
# Registered state shared в self._registered — pipeline restart hook
|
||||
# сбрасывает flag → re-add overlay на next iter.
|
||||
# Первый snapshot — сразу. Дальше — каждые refresh_sec.
|
||||
while True:
|
||||
try:
|
||||
@@ -224,9 +231,9 @@ class BrowserRenderer:
|
||||
|
||||
await self.dispatcher._reload_icon(cfg.target_instance, cfg.id)
|
||||
|
||||
if not registered:
|
||||
if not self._registered.get(cfg.id, False):
|
||||
await self._register_overlay(cfg)
|
||||
registered = True
|
||||
self._registered[cfg.id] = True
|
||||
|
||||
# Reload page для свежих данных (Grafana auto-refresh не triggers
|
||||
# без user interaction в headless mode)
|
||||
|
||||
@@ -170,6 +170,12 @@ class DynamicRenderer:
|
||||
}
|
||||
self._tasks: list[asyncio.Task] = []
|
||||
self._start_time = time.time()
|
||||
self._registered: dict[str, bool] = {} # overlay_id → registered
|
||||
|
||||
def mark_all_unregistered(self) -> None:
    """Pipeline-restart hook: forget which overlays are registered.

    Empties ``self._registered`` so the chart/chat loops see
    ``self._registered.get(cfg.id, False)`` as false and call
    ``_register_overlay`` again on their next iteration.
    """
    self._registered.clear()
|
||||
|
||||
def topics_to_subscribe(self) -> list[str]:
|
||||
topics = []
|
||||
@@ -215,7 +221,6 @@ class DynamicRenderer:
|
||||
self._tasks.clear()
|
||||
|
||||
async def _chart_loop(self, cfg: ChartCfg) -> None:
|
||||
registered = False
|
||||
while True:
|
||||
try:
|
||||
buf = self._chart_data[cfg.id]
|
||||
@@ -224,10 +229,10 @@ class DynamicRenderer:
|
||||
t = time.time() - self._start_time
|
||||
buf.append(20.0 + 10.0 * math.sin(t / 5))
|
||||
await self._render_and_publish(cfg, lambda: render_chart(cfg, list(buf)))
|
||||
if not registered:
|
||||
if not self._registered.get(cfg.id, False):
|
||||
await self._register_overlay(cfg.id, cfg.target_instance, cfg.cell,
|
||||
cfg.x, cfg.y, cfg.opacity, cfg.z_order)
|
||||
registered = True
|
||||
self._registered[cfg.id] = True
|
||||
await asyncio.sleep(cfg.refresh_sec)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
@@ -236,19 +241,19 @@ class DynamicRenderer:
|
||||
await asyncio.sleep(5)
|
||||
|
||||
async def _chat_loop(self, cfg: ChatCfg) -> None:
|
||||
registered = False
|
||||
last_signature: tuple[str, ...] = ()
|
||||
while True:
|
||||
try:
|
||||
msgs = list(self._chat_messages[cfg.id])
|
||||
sig = tuple(msgs)
|
||||
registered = self._registered.get(cfg.id, False)
|
||||
if sig != last_signature or not registered:
|
||||
await self._render_and_publish(cfg, lambda: render_chat(cfg, msgs))
|
||||
last_signature = sig
|
||||
if not registered:
|
||||
await self._register_overlay(cfg.id, cfg.target_instance, cfg.cell,
|
||||
cfg.x, cfg.y, cfg.opacity, cfg.z_order)
|
||||
registered = True
|
||||
self._registered[cfg.id] = True
|
||||
await asyncio.sleep(0.1) # tight check для chat reactivity (10 Hz)
|
||||
except asyncio.CancelledError:
|
||||
raise
|
||||
|
||||
@@ -118,6 +118,14 @@ class FrigateBridge:
|
||||
# Active per camera (для auto-layout decision)
|
||||
self._cam_active: dict[str, bool] = {m.frigate_camera: False for m in cfg.mappings}
|
||||
|
||||
def mark_all_unregistered(self) -> None:
    """Pipeline-restart hook: drop border-init and event-overlay bookkeeping.

    Clears the four tracking collections below. Per the original author's
    note, the next MQTT event is then expected to re-initialise borders and
    re-add bbox event overlays (that handling lives outside this method).
    """
    self._borders_initialized.clear()
    self._event_overlays.clear()
    self._event_bbox_last.clear()
    self._focus_dims.clear()
|
||||
|
||||
def topics_to_subscribe(self) -> list[str]:
|
||||
if not self.cfg.enabled:
|
||||
return []
|
||||
|
||||
@@ -0,0 +1,149 @@
|
||||
"""Pipeline monitor — детект restart ffmpeg pipeline + auto-restore overlay state.
|
||||
|
||||
Pipeline filter state (overlays, layout, cell_map) живёт в RAM ffmpeg process'а.
|
||||
При recreate container (compose up, OOM, etc.) — state теряется. Controller'у
|
||||
нужно re-push:
|
||||
* active layout (state.active_layout)
|
||||
* все overlays (state.overlays)
|
||||
* audio output state (state.audio_output_enabled)
|
||||
* browser/dynamic renderer flags (registered → reset, next loop iter re-add)
|
||||
|
||||
Detect mechanism: ZMQ ping (set_layout с current name = no-op) every N sec.
|
||||
Tracks alive state — first success после consecutive failures → restart event.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import structlog
|
||||
|
||||
from .layouts import PREDEFINED_LAYOUTS
|
||||
from .zmq_client import FFmpegZmqClient
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .browser_overlays import BrowserRenderer
|
||||
from .config import Config
|
||||
from .dispatch import CommandDispatcher
|
||||
from .dynamic_overlays import DynamicRenderer
|
||||
from .frigate_bridge import FrigateBridge
|
||||
from .state import ControllerState
|
||||
|
||||
log = structlog.get_logger()
|
||||
|
||||
|
||||
class PipelineMonitor:
    """Poll every pipeline instance over ZMQ and re-push state after a restart.

    Liveness is probed by sending ``set_layout`` with the layout the
    controller already considers active. Per instance the monitor remembers
    the last probe result in ``_alive``; a successful probe while the
    instance is not marked alive triggers ``_restore_state``. Because unknown
    instances default to "not alive", the very first successful probe after
    controller start also triggers a restore.
    """

    def __init__(
        self,
        cfg: "Config",
        state: "ControllerState",
        dispatcher: "CommandDispatcher",
        browser_renderer: "BrowserRenderer | None" = None,
        dynamic_renderer: "DynamicRenderer | None" = None,
        frigate_bridge: "FrigateBridge | None" = None,
        poll_interval_sec: float = 3.0,
    ) -> None:
        """Store collaborators; no task is started until ``start()``.

        Args:
            cfg: Controller config; ``cfg.instances`` is iterated on each poll.
            state: Source of the layout, audio flag and overlays to re-push.
            dispatcher: Provides the per-instance ZMQ client and overlay targets.
            browser_renderer: Optional; gets ``mark_all_unregistered()`` on restore.
            dynamic_renderer: Optional; gets ``mark_all_unregistered()`` on restore.
            frigate_bridge: Optional; gets ``mark_all_unregistered()`` on restore.
            poll_interval_sec: Delay before the first poll and between polls.
        """
        self.cfg = cfg
        self.state = state
        self.dispatcher = dispatcher
        self.browser_renderer = browser_renderer
        self.dynamic_renderer = dynamic_renderer
        self.frigate_bridge = frigate_bridge
        self.poll_interval_sec = poll_interval_sec
        self._task: "asyncio.Task | None" = None
        self._alive: "dict[str, bool]" = {}  # instance name -> last probe result

    async def start(self) -> None:
        """Spawn the background polling task (no-op if it is already running)."""
        if self._task is not None and not self._task.done():
            # A second start() must not orphan the task that is already polling.
            return
        self._task = asyncio.create_task(self._loop())
        log.info("pipeline_monitor.started", poll_sec=self.poll_interval_sec,
                 instances=[i.name for i in self.cfg.instances])

    async def stop(self) -> None:
        """Cancel the polling task and wait for it to finish; safe to call twice."""
        task, self._task = self._task, None
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    def _find_instance(self, instance: str):
        """Return the config entry whose ``name`` equals ``instance``, or None."""
        return next((i for i in self.cfg.instances if i.name == instance), None)

    async def _loop(self) -> None:
        """Run forever: wait one interval, then poll all instances each interval."""
        # Boot delay — give the pipeline time to start initially.
        await asyncio.sleep(self.poll_interval_sec)
        while True:
            try:
                await self._poll_once()
            except asyncio.CancelledError:
                raise
            except Exception as e:
                log.error("pipeline_monitor.loop_fail", error=str(e))
            await asyncio.sleep(self.poll_interval_sec)

    async def _poll_once(self) -> None:
        """Check every configured instance, isolating failures per instance."""
        for inst in self.cfg.instances:
            try:
                await self._check_instance(inst.name)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                # One misbehaving instance must not skip the checks of the rest.
                log.error("pipeline_monitor.loop_fail",
                          instance=inst.name, error=str(e))

    async def _check_instance(self, instance: str) -> None:
        """Probe one instance and restore its state on a down -> up transition.

        Only the ZMQ probe decides liveness. If the restore itself fails, the
        instance is marked not-alive again so the next successful probe
        retries the restore.
        """
        inst = self._find_instance(instance)
        if inst is None:
            return
        client = self.dispatcher._client(inst)
        # Cheap no-op: re-send the layout the controller already has active.
        current_layout = await self.state.get_layout(instance) or inst.default_layout
        try:
            await client.send_command(inst.filter_target, "set_layout", current_layout)
        except Exception as e:
            # Warn only on the up -> down edge (or the first failed probe).
            if self._alive.get(instance, True):
                log.warning("pipeline_monitor.lost", instance=instance, error=str(e))
            self._alive[instance] = False
            return

        was_alive = self._alive.get(instance, False)
        self._alive[instance] = True
        if was_alive:
            return

        log.info("pipeline_monitor.restored", instance=instance)
        try:
            await self._restore_state(instance)
        except Exception as e:
            # Controller-side failure, not a pipeline outage: report it as such
            # and re-arm the restore for the next successful probe.
            self._alive[instance] = False
            log.error("pipeline_monitor.restore_fail", instance=instance, error=str(e))

    async def _restore_state(self, instance: str) -> None:
        """Re-push layout, audio flag and overlays, then reset renderer hooks.

        Layout, audio and each overlay are guarded individually: a failure is
        logged as a warning and the remaining steps still run.
        """
        inst = self._find_instance(instance)
        if inst is None:
            return
        client = self.dispatcher._client(inst)

        # 1. Layout — already sent by the probe; applied again for consistency.
        layout = await self.state.get_layout(instance) or inst.default_layout
        try:
            await client.send_command(inst.filter_target, "set_layout", layout)
        except Exception as e:
            log.warning("pipeline_monitor.layout_fail", instance=instance, error=str(e))

        # 2. Audio output state (mute/unmute) — re-apply.
        try:
            enabled = await self.state.get_audio_output_enabled(instance)
            await self.dispatcher.set_audio_output_enabled(instance, enabled)
        except Exception as e:
            log.warning("pipeline_monitor.audio_fail", instance=instance, error=str(e))

        # 3. Overlays — re-push everything held in state.
        overlays = await self.state.get_overlays(instance)
        from .dispatch import _serialize_overlay_to_zmq  # local import

        for ov in overlays:
            try:
                zmq_arg = _serialize_overlay_to_zmq(ov)
                for target in self.dispatcher._overlay_targets(inst):
                    await client.send_command(target, "add_overlay", zmq_arg)
            except Exception as e:
                log.warning("pipeline_monitor.overlay_fail",
                            instance=instance, overlay_id=ov.id, error=str(e))

        # 4. Reset the registered flags on the optional collaborators so their
        #    own loops / event handlers re-add their overlays.
        for hook in (self.browser_renderer, self.dynamic_renderer, self.frigate_bridge):
            if hook:
                hook.mark_all_unregistered()

        log.info("pipeline_monitor.restore_done", instance=instance,
                 layout=layout, overlays=len(overlays))
|
||||
Reference in New Issue
Block a user