"""Frigate MQTT bridge — auto-overlay generation для motion + object events. Phase 4b auto-rendering: subscribed на frigate//motion + frigate/events, конвертирует в add/remove overlay commands к configured target instance + cell. Logic: - motion ON → add RectOverlay (orange border) на весь cell - motion OFF → remove тот overlay - event new → add RectOverlay (bbox) + TextOverlay (label + score) - event end → remove оба overlays Bbox coords из Frigate event = absolute pixel в camera resolution. Если в mapping указано camera_width/height — используем для normalize; иначе assume 1920×1080. """ from __future__ import annotations import asyncio import json import time from typing import TYPE_CHECKING import structlog from pydantic import BaseModel, Field from .overlays import DimOverlay, RectOverlay, TextOverlay if TYPE_CHECKING: from .dispatch import CommandDispatcher log = structlog.get_logger() class FrigateCameraMapping(BaseModel): """Mapping Frigate camera_name → vf-cuda-grid cell index в instance.""" frigate_camera: str target_instance: str cell: int = Field(default=0, description="Cell index в layout куда рисовать bbox") placeholder_label: str = Field(default="", description="Текст label на placeholder при stale ('Парковка', 'Ворота'). " "Empty → используется frigate_camera как label. Controller " "рендерит PNG offline_.png при startup.") enabled: bool = True camera_width: int = Field(default=1920, gt=0, description="Native camera resolution для нормализации bbox") camera_height: int = Field(default=1080, gt=0) motion_indicator: bool = Field(default=True, description="Подсвечивать рамкой при motion ON") bbox_overlay: bool = Field(default=True, description="Рисовать bbox для object detection events") priority: int = Field(default=0, description="Higher priority → выводится в main cell при auto-layout. Equal priority → first-active wins") main_cam_index: int = Field(default=0, ge=0, le=15, description="Index в pipeline streamselect@main_cam (соответствует порядку -i в filter_complex)") class BorderTheme(BaseModel): """Стиль cell border в двух состояниях. Color = HEX RGB.""" idle_color: str = Field(default="#808080", description="Нейтральная разделительная рамка") idle_width: int = Field(default=1, ge=1, le=16) idle_opacity: float = Field(default=0.4, ge=0.0, le=1.0) motion_color: str = Field(default="#FF0000", description="Цвет при frigate motion ON") motion_width: int = Field(default=1, ge=1, le=16) motion_opacity: float = Field(default=1.0, ge=0.0, le=1.0) class FocusTheme(BaseModel): """Auto-focus: при motion на одной cell — затемнение остальных. Если motion на 2+ cells одновременно — focus не применяется (no obvious focus).""" enabled: bool = True dim_color: str = "#000000" dim_factor: float = Field(default=0.6, ge=0.0, le=1.0, description="0.0=без затемнения, 1.0=полное черное") class FrigateBridgeCfg(BaseModel): enabled: bool = False base_topic: str = "frigate" mappings: list[FrigateCameraMapping] = [] border_theme: BorderTheme = Field(default_factory=BorderTheme, description="Стиль cell borders — idle/motion цвета") focus_theme: FocusTheme = Field(default_factory=FocusTheme, description="Auto-focus: dim non-active cells когда motion на одной") auto_layout: bool = Field(default=False, description="Auto-switch layout based on active motion + priorities. " "0 active → quad, 1 active → single, 2+ → mpp с highest-priority main") auto_hysteresis_sec: float = Field(default=3.0, ge=0.0, le=60.0, description="Debounce — apply layout change только если state стабилен N sec. " "0 = немедленно. Защищает от short motion blips дёргающих layout") bbox_min_interval_sec: float = Field(default=0.25, ge=0.0, le=5.0, description="Min interval между bbox update ZMQ commands per event_id. " "Frigate publishes 5-10 events/sec при motion → throttle reduces " "render-thread pause (см. cuda-grid-filter-stutter memory). " "0 = no throttle.") bbox_delta_threshold: float = Field(default=0.02, ge=0.0, le=1.0, description="Min relative bbox shift (fraction of camera dim) для new update. " "Skip update если все coords changed < threshold от last. " "0.02 = 2% → 38px на 1920 cam. 0 = no delta filter.") class FrigateBridge: """Frigate MQTT subscriber → auto-overlay commands. Состояние: _motion_overlays: cam → overlay_id (motion indicator rect) _event_overlays: event_id → (rect_id, text_id) для bbox + label """ def __init__(self, cfg: FrigateBridgeCfg, dispatcher: "CommandDispatcher | None" = None) -> None: self.cfg = cfg self.dispatcher = dispatcher self._by_camera = {m.frigate_camera: m for m in cfg.mappings if m.enabled} self._motion_overlays: dict[str, str] = {} # legacy — unused if border theme active self._event_overlays: dict[str, tuple[str, str]] = {} # Throttling state per event_id: (last_apply_ts, last_nx, last_ny, last_nw, last_nh) self._event_bbox_last: dict[str, tuple[float, float, float, float, float]] = {} self._borders_initialized: dict[str, bool] = {} # target_instance → bool self._cell_states: dict[str, set[str]] = {} # ":" → set of active cameras с motion self._focus_dims: dict[str, set[int]] = {} # instance → set of cells which сейчас затемнены self._auto_state: dict[str, tuple[str, int]] = {} # instance → (current_layout, current_main_cam_index) self._auto_pending: dict[str, asyncio.Task] = {} # instance → debounce task # Active per camera (для auto-layout decision) self._cam_active: dict[str, bool] = {m.frigate_camera: False for m in cfg.mappings} def mark_all_unregistered(self) -> None: """Pipeline restart hook — clear borders init + event overlays state. На следующий MQTT event borders re-init'нутся, bbox events re-add'нутся.""" self._borders_initialized.clear() self._event_overlays.clear() self._event_bbox_last.clear() self._focus_dims.clear() def topics_to_subscribe(self) -> list[str]: if not self.cfg.enabled: return [] base = self.cfg.base_topic.rstrip("/") # Frigate publishes state на `frigate//motion/state` ("ON"/"OFF"), # а `frigate//motion` — это SET-topic (control). Subscribe строго к /state. return [f"{base}/+/motion/state", f"{base}/events"] def _cell_border_id(self, cell: int) -> str: return f"cell_{cell}_border" async def _ensure_borders(self, instance: str) -> None: """Lazy init: 4 idle borders для каждого cell в instance. Idempotent.""" if self._borders_initialized.get(instance) or not self.dispatcher: return cells = sorted({m.cell for m in self.cfg.mappings if m.target_instance == instance}) theme = self.cfg.border_theme for cell in cells: ov = RectOverlay( id=self._cell_border_id(cell), cell=cell, x=0.0, y=0.0, w=1.0, h=1.0, color=theme.idle_color, border_only=True, border_width=theme.idle_width, opacity=theme.idle_opacity, z_order=5, ) await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json()) self._borders_initialized[instance] = True log.info("frigate_bridge.borders_initialized", instance=instance, cells=cells) async def _set_border_state(self, instance: str, cell: int, motion: bool) -> None: """Upsert cell border overlay в idle или motion стиль.""" if not self.dispatcher: return theme = self.cfg.border_theme ov = RectOverlay( id=self._cell_border_id(cell), cell=cell, x=0.0, y=0.0, w=1.0, h=1.0, color=theme.motion_color if motion else theme.idle_color, border_only=True, border_width=theme.motion_width if motion else theme.idle_width, opacity=theme.motion_opacity if motion else theme.idle_opacity, z_order=5, ) await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json()) async def handle_message(self, topic: str, payload: str) -> None: if not self.cfg.enabled: return # Lazy init borders для всех target instances при первом event for inst in {m.target_instance for m in self.cfg.mappings}: await self._ensure_borders(inst) base = self.cfg.base_topic.rstrip("/") # frigate//motion/state if topic.startswith(f"{base}/") and topic.endswith("/motion/state"): cam = topic[len(base) + 1 : -len("/motion/state")] await self._handle_motion(cam, payload.strip().upper()) return # frigate/events — JSON if topic == f"{base}/events": try: event = json.loads(payload) except json.JSONDecodeError as e: log.warning("frigate.event_parse_fail", error=str(e)) return await self._handle_event(event) async def _handle_motion(self, cam: str, state: str) -> None: mapping = self._by_camera.get(cam) if mapping is None or not mapping.motion_indicator: return log.info("frigate.motion", camera=cam, state=state, target=mapping.target_instance, cell=mapping.cell) if not self.dispatcher: return # Cell может содержать multiple cameras (теоретически) — border ON если # хотя бы один cam имеет motion. Tracking через set'у per cell. cell_key = f"{mapping.target_instance}:{mapping.cell}" active = self._cell_states.setdefault(cell_key, set()) was_active = bool(active) if state == "ON": active.add(cam) elif state == "OFF": active.discard(cam) is_active = bool(active) if is_active != was_active: await self._set_border_state(mapping.target_instance, mapping.cell, motion=is_active) await self._update_focus(mapping.target_instance) self._cam_active[cam] = is_active await self._update_auto_layout(mapping.target_instance) def _compute_target(self, instance: str) -> tuple[str, int, list[str]]: """Returns (layout, main_cam_index, [active cam names sorted by priority]).""" active = [ m for m in self.cfg.mappings if m.target_instance == instance and self._cam_active.get(m.frigate_camera, False) ] active.sort(key=lambda m: -m.priority) if not active: return ("quad", 0, []) if len(active) == 1: return ("single", active[0].main_cam_index, [m.frigate_camera for m in active]) return ("main_plus_preview", active[0].main_cam_index, [m.frigate_camera for m in active]) async def _update_auto_layout(self, instance: str) -> None: """Schedule debounced auto-layout update. Cancel any pending.""" if not self.cfg.auto_layout or not self.dispatcher: return hyst = self.cfg.auto_hysteresis_sec # Cancel pending — мы получили новый state event, timer reset'ится pending = self._auto_pending.pop(instance, None) if pending and not pending.done(): pending.cancel() if hyst <= 0: await self._apply_auto_layout(instance) return # Schedule debounced apply self._auto_pending[instance] = asyncio.create_task( self._debounced_apply(instance, hyst) ) async def _debounced_apply(self, instance: str, delay: float) -> None: try: await asyncio.sleep(delay) await self._apply_auto_layout(instance) except asyncio.CancelledError: log.debug("auto_layout.debounce_cancel", instance=instance) raise async def _apply_auto_layout(self, instance: str) -> None: target_layout, target_main_cam, active_names = self._compute_target(instance) prev = self._auto_state.get(instance, (None, None)) if prev == (target_layout, target_main_cam): return log.info("auto_layout.change", instance=instance, from_state=prev, to_layout=target_layout, to_main=target_main_cam, active=active_names) await self.dispatcher.set_main_cam(instance, target_main_cam) await self.dispatcher.handle(instance, "layout.set", target_layout) self._auto_state[instance] = (target_layout, target_main_cam) def _cell_dim_id(self, cell: int) -> str: return f"cell_{cell}_focus_dim" async def _update_focus(self, instance: str) -> None: """Auto-focus logic: 0 active cells → no dim (remove all focus dims) 1 active cell → dim non-focus cells (focus mode ON) 2+ active cells → no dim (too many — no obvious single focus) """ if not self.dispatcher or not self.cfg.focus_theme.enabled: return # Active cells для этого instance active_cells = { int(key.split(":", 1)[1]) for key, cams in self._cell_states.items() if cams and key.startswith(f"{instance}:") } all_cells = sorted({m.cell for m in self.cfg.mappings if m.target_instance == instance}) currently_dimmed = self._focus_dims.setdefault(instance, set()) target_dimmed: set[int] if len(active_cells) == 1: focus_cell = next(iter(active_cells)) target_dimmed = {c for c in all_cells if c != focus_cell} else: target_dimmed = set() to_add = target_dimmed - currently_dimmed to_remove = currently_dimmed - target_dimmed theme = self.cfg.focus_theme for cell in to_add: ov = DimOverlay( id=self._cell_dim_id(cell), cell=cell, x=0.0, y=0.0, w=1.0, h=1.0, color=theme.dim_color, dim_factor=theme.dim_factor, z_order=1, # под bbox (20) и border (5) но над cell content ) await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json()) currently_dimmed.add(cell) for cell in to_remove: await self.dispatcher.handle(instance, "overlay.remove", self._cell_dim_id(cell)) currently_dimmed.discard(cell) if to_add or to_remove: log.info("focus.updated", instance=instance, active=sorted(active_cells), dimmed=sorted(target_dimmed)) async def _handle_event(self, event: dict) -> None: event_type = event.get("type", "?") after = event.get("after") or {} before = event.get("before") or {} cam = after.get("camera") or before.get("camera") if not cam: return mapping = self._by_camera.get(cam) if mapping is None or not mapping.bbox_overlay: return if not self.dispatcher: return event_id = after.get("id") or before.get("id") if not event_id: return label = after.get("label", "?") score = after.get("score") or after.get("top_score") or 0.0 log.info( "frigate.event", camera=cam, type=event_type, label=label, score=round(score, 2), target=mapping.target_instance, cell=mapping.cell, event_id=event_id, ) if event_type in ("new", "update"): await self._upsert_event_overlay(mapping, event_id, after, label, score) elif event_type == "end": await self._remove_event_overlay(mapping, event_id) async def _upsert_event_overlay(self, mapping, event_id: str, after: dict, label: str, score: float) -> None: box = after.get("box") # [x1, y1, x2, y2] в абс пикселях камеры if not box or len(box) != 4: return x1, y1, x2, y2 = box nx = x1 / mapping.camera_width ny = y1 / mapping.camera_height nw = (x2 - x1) / mapping.camera_width nh = (y2 - y1) / mapping.camera_height # Clamp на [0, 1] nx = max(0.0, min(0.99, nx)) ny = max(0.0, min(0.99, ny)) nw = max(0.01, min(1.0 - nx, nw)) nh = max(0.01, min(1.0 - ny, nh)) # Throttle: skip update if too frequent OR bbox barely moved. # First emission всегда applies (regardless of throttle). now = time.monotonic() prev = self._event_bbox_last.get(event_id) if prev is not None: prev_ts, px, py, pw, ph = prev if self.cfg.bbox_min_interval_sec > 0 and \ (now - prev_ts) < self.cfg.bbox_min_interval_sec: return # rate-limit if self.cfg.bbox_delta_threshold > 0: dt = self.cfg.bbox_delta_threshold if abs(nx - px) < dt and abs(ny - py) < dt and \ abs(nw - pw) < dt and abs(nh - ph) < dt: return # delta filter — barely changed self._event_bbox_last[event_id] = (now, nx, ny, nw, nh) # Short id'ы — кутаем event_id чтобы влезть в 32 chars eid_short = event_id.replace("-", "")[:8] rect_id = f"e{eid_short}r" text_id = f"e{eid_short}t" rect = RectOverlay( id=rect_id, cell=mapping.cell, x=nx, y=ny, w=nw, h=nh, color="#00FF00", border_only=True, border_width=3, z_order=20, opacity=1.0, ) text = TextOverlay( id=text_id, cell=mapping.cell, x=nx, y=max(0.0, ny - 0.04), # над bbox text=f"{label} {int(score * 100)}%", font_size=20, color="#00FF00", z_order=21, opacity=1.0, ) self._event_overlays[event_id] = (rect_id, text_id) await self.dispatcher.handle(mapping.target_instance, "overlay.add", rect.model_dump_json()) await self.dispatcher.handle(mapping.target_instance, "overlay.add", text.model_dump_json()) async def _remove_event_overlay(self, mapping, event_id: str) -> None: self._event_bbox_last.pop(event_id, None) # cleanup throttle state stored = self._event_overlays.pop(event_id, None) if not stored: return rect_id, text_id = stored await self.dispatcher.handle(mapping.target_instance, "overlay.remove", rect_id) await self.dispatcher.handle(mapping.target_instance, "overlay.remove", text_id)