976aed52e9
Frigate publishes 5-10 detection events/sec при motion → каждое emit ZMQ overlay update к pipeline (mutex acquire, atlas rebuild). Это блокирует render thread и вызывает TV stutter (см. cuda-grid-filter-stutter memory 2026-05-21 диагностика). Throttle config: bbox_min_interval_sec: 0.25 # max 4 updates/sec на event_id bbox_delta_threshold: 0.02 # skip если все coords changed < 2% camera dim State в self._event_bbox_last: event_id → (timestamp, nx, ny, nw, nh). Cleanup в _remove_event_overlay (event end). С default 0.25s + 0.02 threshold: 5-10 ev/sec → ~2-4 ev/sec applied (rate-limit), плюс stationary objects не апдейтятся вообще (delta filter). Render-thread load на bbox flow снижается 60-80%. Эффект — можно вернуть bbox_overlay=true в controller.yaml без risk TV stutter. Diagnostic-disable из 2026-05-21 теперь не нужен. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
428 lines
19 KiB
Python
428 lines
19 KiB
Python
"""Frigate MQTT bridge — auto-overlay generation для motion + object events.
|
||
|
||
Phase 4b auto-rendering: subscribed на frigate/<cam>/motion + frigate/events,
|
||
конвертирует в add/remove overlay commands к configured target instance + cell.
|
||
|
||
Logic:
|
||
- motion ON → add RectOverlay (orange border) на весь cell
|
||
- motion OFF → remove тот overlay
|
||
- event new → add RectOverlay (bbox) + TextOverlay (label + score)
|
||
- event end → remove оба overlays
|
||
|
||
Bbox coords из Frigate event = absolute pixel в camera resolution.
|
||
Если в mapping указано camera_width/height — используем для normalize; иначе assume 1920×1080.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import json
|
||
import time
|
||
from typing import TYPE_CHECKING
|
||
|
||
import structlog
|
||
from pydantic import BaseModel, Field
|
||
|
||
from .overlays import DimOverlay, RectOverlay, TextOverlay
|
||
|
||
if TYPE_CHECKING:
|
||
from .dispatch import CommandDispatcher
|
||
|
||
log = structlog.get_logger()
|
||
|
||
|
||
class FrigateCameraMapping(BaseModel):
|
||
"""Mapping Frigate camera_name → vf-cuda-grid cell index в instance."""
|
||
|
||
frigate_camera: str
|
||
target_instance: str
|
||
cell: int = Field(default=0, description="Cell index в layout куда рисовать bbox")
|
||
enabled: bool = True
|
||
camera_width: int = Field(default=1920, gt=0, description="Native camera resolution для нормализации bbox")
|
||
camera_height: int = Field(default=1080, gt=0)
|
||
motion_indicator: bool = Field(default=True, description="Подсвечивать рамкой при motion ON")
|
||
bbox_overlay: bool = Field(default=True, description="Рисовать bbox для object detection events")
|
||
priority: int = Field(default=0, description="Higher priority → выводится в main cell при auto-layout. Equal priority → first-active wins")
|
||
main_cam_index: int = Field(default=0, ge=0, le=15,
|
||
description="Index в pipeline streamselect@main_cam (соответствует порядку -i в filter_complex)")
|
||
|
||
|
||
class BorderTheme(BaseModel):
|
||
"""Стиль cell border в двух состояниях. Color = HEX RGB."""
|
||
|
||
idle_color: str = Field(default="#808080", description="Нейтральная разделительная рамка")
|
||
idle_width: int = Field(default=1, ge=1, le=16)
|
||
idle_opacity: float = Field(default=0.4, ge=0.0, le=1.0)
|
||
|
||
motion_color: str = Field(default="#FF0000", description="Цвет при frigate motion ON")
|
||
motion_width: int = Field(default=1, ge=1, le=16)
|
||
motion_opacity: float = Field(default=1.0, ge=0.0, le=1.0)
|
||
|
||
|
||
class FocusTheme(BaseModel):
|
||
"""Auto-focus: при motion на одной cell — затемнение остальных.
|
||
Если motion на 2+ cells одновременно — focus не применяется (no obvious focus)."""
|
||
|
||
enabled: bool = True
|
||
dim_color: str = "#000000"
|
||
dim_factor: float = Field(default=0.6, ge=0.0, le=1.0,
|
||
description="0.0=без затемнения, 1.0=полное черное")
|
||
|
||
|
||
class FrigateBridgeCfg(BaseModel):
|
||
enabled: bool = False
|
||
base_topic: str = "frigate"
|
||
mappings: list[FrigateCameraMapping] = []
|
||
border_theme: BorderTheme = Field(default_factory=BorderTheme,
|
||
description="Стиль cell borders — idle/motion цвета")
|
||
focus_theme: FocusTheme = Field(default_factory=FocusTheme,
|
||
description="Auto-focus: dim non-active cells когда motion на одной")
|
||
auto_layout: bool = Field(default=False,
|
||
description="Auto-switch layout based on active motion + priorities. "
|
||
"0 active → quad, 1 active → single, 2+ → mpp с highest-priority main")
|
||
auto_hysteresis_sec: float = Field(default=3.0, ge=0.0, le=60.0,
|
||
description="Debounce — apply layout change только если state стабилен N sec. "
|
||
"0 = немедленно. Защищает от short motion blips дёргающих layout")
|
||
bbox_min_interval_sec: float = Field(default=0.25, ge=0.0, le=5.0,
|
||
description="Min interval между bbox update ZMQ commands per event_id. "
|
||
"Frigate publishes 5-10 events/sec при motion → throttle reduces "
|
||
"render-thread pause (см. cuda-grid-filter-stutter memory). "
|
||
"0 = no throttle.")
|
||
bbox_delta_threshold: float = Field(default=0.02, ge=0.0, le=1.0,
|
||
description="Min relative bbox shift (fraction of camera dim) для new update. "
|
||
"Skip update если все coords changed < threshold от last. "
|
||
"0.02 = 2% → 38px на 1920 cam. 0 = no delta filter.")
|
||
|
||
|
||
class FrigateBridge:
|
||
"""Frigate MQTT subscriber → auto-overlay commands.
|
||
|
||
Состояние:
|
||
_motion_overlays: cam → overlay_id (motion indicator rect)
|
||
_event_overlays: event_id → (rect_id, text_id) для bbox + label
|
||
"""
|
||
|
||
def __init__(self, cfg: FrigateBridgeCfg, dispatcher: "CommandDispatcher | None" = None) -> None:
|
||
self.cfg = cfg
|
||
self.dispatcher = dispatcher
|
||
self._by_camera = {m.frigate_camera: m for m in cfg.mappings if m.enabled}
|
||
self._motion_overlays: dict[str, str] = {} # legacy — unused if border theme active
|
||
self._event_overlays: dict[str, tuple[str, str]] = {}
|
||
# Throttling state per event_id: (last_apply_ts, last_nx, last_ny, last_nw, last_nh)
|
||
self._event_bbox_last: dict[str, tuple[float, float, float, float, float]] = {}
|
||
self._borders_initialized: dict[str, bool] = {} # target_instance → bool
|
||
self._cell_states: dict[str, set[str]] = {} # "<inst>:<cell>" → set of active cameras с motion
|
||
self._focus_dims: dict[str, set[int]] = {} # instance → set of cells which сейчас затемнены
|
||
self._auto_state: dict[str, tuple[str, int]] = {} # instance → (current_layout, current_main_cam_index)
|
||
self._auto_pending: dict[str, asyncio.Task] = {} # instance → debounce task
|
||
# Active per camera (для auto-layout decision)
|
||
self._cam_active: dict[str, bool] = {m.frigate_camera: False for m in cfg.mappings}
|
||
|
||
def topics_to_subscribe(self) -> list[str]:
|
||
if not self.cfg.enabled:
|
||
return []
|
||
base = self.cfg.base_topic.rstrip("/")
|
||
# Frigate publishes state на `frigate/<cam>/motion/state` ("ON"/"OFF"),
|
||
# а `frigate/<cam>/motion` — это SET-topic (control). Subscribe строго к /state.
|
||
return [f"{base}/+/motion/state", f"{base}/events"]
|
||
|
||
def _cell_border_id(self, cell: int) -> str:
|
||
return f"cell_{cell}_border"
|
||
|
||
async def _ensure_borders(self, instance: str) -> None:
|
||
"""Lazy init: 4 idle borders для каждого cell в instance. Idempotent."""
|
||
if self._borders_initialized.get(instance) or not self.dispatcher:
|
||
return
|
||
cells = sorted({m.cell for m in self.cfg.mappings if m.target_instance == instance})
|
||
theme = self.cfg.border_theme
|
||
for cell in cells:
|
||
ov = RectOverlay(
|
||
id=self._cell_border_id(cell),
|
||
cell=cell,
|
||
x=0.0, y=0.0, w=1.0, h=1.0,
|
||
color=theme.idle_color,
|
||
border_only=True,
|
||
border_width=theme.idle_width,
|
||
opacity=theme.idle_opacity,
|
||
z_order=5,
|
||
)
|
||
await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json())
|
||
self._borders_initialized[instance] = True
|
||
log.info("frigate_bridge.borders_initialized", instance=instance, cells=cells)
|
||
|
||
async def _set_border_state(self, instance: str, cell: int, motion: bool) -> None:
|
||
"""Upsert cell border overlay в idle или motion стиль."""
|
||
if not self.dispatcher:
|
||
return
|
||
theme = self.cfg.border_theme
|
||
ov = RectOverlay(
|
||
id=self._cell_border_id(cell),
|
||
cell=cell,
|
||
x=0.0, y=0.0, w=1.0, h=1.0,
|
||
color=theme.motion_color if motion else theme.idle_color,
|
||
border_only=True,
|
||
border_width=theme.motion_width if motion else theme.idle_width,
|
||
opacity=theme.motion_opacity if motion else theme.idle_opacity,
|
||
z_order=5,
|
||
)
|
||
await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json())
|
||
|
||
async def handle_message(self, topic: str, payload: str) -> None:
|
||
if not self.cfg.enabled:
|
||
return
|
||
|
||
# Lazy init borders для всех target instances при первом event
|
||
for inst in {m.target_instance for m in self.cfg.mappings}:
|
||
await self._ensure_borders(inst)
|
||
|
||
base = self.cfg.base_topic.rstrip("/")
|
||
|
||
# frigate/<cam>/motion/state
|
||
if topic.startswith(f"{base}/") and topic.endswith("/motion/state"):
|
||
cam = topic[len(base) + 1 : -len("/motion/state")]
|
||
await self._handle_motion(cam, payload.strip().upper())
|
||
return
|
||
|
||
# frigate/events — JSON
|
||
if topic == f"{base}/events":
|
||
try:
|
||
event = json.loads(payload)
|
||
except json.JSONDecodeError as e:
|
||
log.warning("frigate.event_parse_fail", error=str(e))
|
||
return
|
||
await self._handle_event(event)
|
||
|
||
async def _handle_motion(self, cam: str, state: str) -> None:
|
||
mapping = self._by_camera.get(cam)
|
||
if mapping is None or not mapping.motion_indicator:
|
||
return
|
||
|
||
log.info("frigate.motion", camera=cam, state=state, target=mapping.target_instance, cell=mapping.cell)
|
||
|
||
if not self.dispatcher:
|
||
return
|
||
|
||
# Cell может содержать multiple cameras (теоретически) — border ON если
|
||
# хотя бы один cam имеет motion. Tracking через set'у per cell.
|
||
cell_key = f"{mapping.target_instance}:{mapping.cell}"
|
||
active = self._cell_states.setdefault(cell_key, set())
|
||
was_active = bool(active)
|
||
if state == "ON":
|
||
active.add(cam)
|
||
elif state == "OFF":
|
||
active.discard(cam)
|
||
is_active = bool(active)
|
||
|
||
if is_active != was_active:
|
||
await self._set_border_state(mapping.target_instance, mapping.cell, motion=is_active)
|
||
await self._update_focus(mapping.target_instance)
|
||
self._cam_active[cam] = is_active
|
||
await self._update_auto_layout(mapping.target_instance)
|
||
|
||
def _compute_target(self, instance: str) -> tuple[str, int, list[str]]:
|
||
"""Returns (layout, main_cam_index, [active cam names sorted by priority])."""
|
||
active = [
|
||
m for m in self.cfg.mappings
|
||
if m.target_instance == instance and self._cam_active.get(m.frigate_camera, False)
|
||
]
|
||
active.sort(key=lambda m: -m.priority)
|
||
|
||
if not active:
|
||
return ("quad", 0, [])
|
||
if len(active) == 1:
|
||
return ("single", active[0].main_cam_index, [m.frigate_camera for m in active])
|
||
return ("main_plus_preview", active[0].main_cam_index, [m.frigate_camera for m in active])
|
||
|
||
async def _update_auto_layout(self, instance: str) -> None:
|
||
"""Schedule debounced auto-layout update. Cancel any pending."""
|
||
if not self.cfg.auto_layout or not self.dispatcher:
|
||
return
|
||
|
||
hyst = self.cfg.auto_hysteresis_sec
|
||
|
||
# Cancel pending — мы получили новый state event, timer reset'ится
|
||
pending = self._auto_pending.pop(instance, None)
|
||
if pending and not pending.done():
|
||
pending.cancel()
|
||
|
||
if hyst <= 0:
|
||
await self._apply_auto_layout(instance)
|
||
return
|
||
|
||
# Schedule debounced apply
|
||
self._auto_pending[instance] = asyncio.create_task(
|
||
self._debounced_apply(instance, hyst)
|
||
)
|
||
|
||
async def _debounced_apply(self, instance: str, delay: float) -> None:
|
||
try:
|
||
await asyncio.sleep(delay)
|
||
await self._apply_auto_layout(instance)
|
||
except asyncio.CancelledError:
|
||
log.debug("auto_layout.debounce_cancel", instance=instance)
|
||
raise
|
||
|
||
async def _apply_auto_layout(self, instance: str) -> None:
|
||
target_layout, target_main_cam, active_names = self._compute_target(instance)
|
||
prev = self._auto_state.get(instance, (None, None))
|
||
if prev == (target_layout, target_main_cam):
|
||
return
|
||
log.info("auto_layout.change", instance=instance,
|
||
from_state=prev, to_layout=target_layout, to_main=target_main_cam,
|
||
active=active_names)
|
||
await self.dispatcher.set_main_cam(instance, target_main_cam)
|
||
await self.dispatcher.handle(instance, "layout.set", target_layout)
|
||
self._auto_state[instance] = (target_layout, target_main_cam)
|
||
|
||
def _cell_dim_id(self, cell: int) -> str:
|
||
return f"cell_{cell}_focus_dim"
|
||
|
||
async def _update_focus(self, instance: str) -> None:
|
||
"""Auto-focus logic:
|
||
0 active cells → no dim (remove all focus dims)
|
||
1 active cell → dim non-focus cells (focus mode ON)
|
||
2+ active cells → no dim (too many — no obvious single focus)
|
||
"""
|
||
if not self.dispatcher or not self.cfg.focus_theme.enabled:
|
||
return
|
||
|
||
# Active cells для этого instance
|
||
active_cells = {
|
||
int(key.split(":", 1)[1])
|
||
for key, cams in self._cell_states.items()
|
||
if cams and key.startswith(f"{instance}:")
|
||
}
|
||
all_cells = sorted({m.cell for m in self.cfg.mappings if m.target_instance == instance})
|
||
currently_dimmed = self._focus_dims.setdefault(instance, set())
|
||
|
||
target_dimmed: set[int]
|
||
if len(active_cells) == 1:
|
||
focus_cell = next(iter(active_cells))
|
||
target_dimmed = {c for c in all_cells if c != focus_cell}
|
||
else:
|
||
target_dimmed = set()
|
||
|
||
to_add = target_dimmed - currently_dimmed
|
||
to_remove = currently_dimmed - target_dimmed
|
||
|
||
theme = self.cfg.focus_theme
|
||
for cell in to_add:
|
||
ov = DimOverlay(
|
||
id=self._cell_dim_id(cell),
|
||
cell=cell,
|
||
x=0.0, y=0.0, w=1.0, h=1.0,
|
||
color=theme.dim_color,
|
||
dim_factor=theme.dim_factor,
|
||
z_order=1, # под bbox (20) и border (5) но над cell content
|
||
)
|
||
await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json())
|
||
currently_dimmed.add(cell)
|
||
for cell in to_remove:
|
||
await self.dispatcher.handle(instance, "overlay.remove", self._cell_dim_id(cell))
|
||
currently_dimmed.discard(cell)
|
||
|
||
if to_add or to_remove:
|
||
log.info("focus.updated", instance=instance, active=sorted(active_cells), dimmed=sorted(target_dimmed))
|
||
|
||
async def _handle_event(self, event: dict) -> None:
|
||
event_type = event.get("type", "?")
|
||
after = event.get("after") or {}
|
||
before = event.get("before") or {}
|
||
cam = after.get("camera") or before.get("camera")
|
||
if not cam:
|
||
return
|
||
|
||
mapping = self._by_camera.get(cam)
|
||
if mapping is None or not mapping.bbox_overlay:
|
||
return
|
||
if not self.dispatcher:
|
||
return
|
||
|
||
event_id = after.get("id") or before.get("id")
|
||
if not event_id:
|
||
return
|
||
label = after.get("label", "?")
|
||
score = after.get("score") or after.get("top_score") or 0.0
|
||
|
||
log.info(
|
||
"frigate.event",
|
||
camera=cam, type=event_type, label=label, score=round(score, 2),
|
||
target=mapping.target_instance, cell=mapping.cell, event_id=event_id,
|
||
)
|
||
|
||
if event_type in ("new", "update"):
|
||
await self._upsert_event_overlay(mapping, event_id, after, label, score)
|
||
elif event_type == "end":
|
||
await self._remove_event_overlay(mapping, event_id)
|
||
|
||
async def _upsert_event_overlay(self, mapping, event_id: str, after: dict, label: str, score: float) -> None:
|
||
box = after.get("box") # [x1, y1, x2, y2] в абс пикселях камеры
|
||
if not box or len(box) != 4:
|
||
return
|
||
|
||
x1, y1, x2, y2 = box
|
||
nx = x1 / mapping.camera_width
|
||
ny = y1 / mapping.camera_height
|
||
nw = (x2 - x1) / mapping.camera_width
|
||
nh = (y2 - y1) / mapping.camera_height
|
||
# Clamp на [0, 1]
|
||
nx = max(0.0, min(0.99, nx))
|
||
ny = max(0.0, min(0.99, ny))
|
||
nw = max(0.01, min(1.0 - nx, nw))
|
||
nh = max(0.01, min(1.0 - ny, nh))
|
||
|
||
# Throttle: skip update if too frequent OR bbox barely moved.
|
||
# First emission всегда applies (regardless of throttle).
|
||
now = time.monotonic()
|
||
prev = self._event_bbox_last.get(event_id)
|
||
if prev is not None:
|
||
prev_ts, px, py, pw, ph = prev
|
||
if self.cfg.bbox_min_interval_sec > 0 and \
|
||
(now - prev_ts) < self.cfg.bbox_min_interval_sec:
|
||
return # rate-limit
|
||
if self.cfg.bbox_delta_threshold > 0:
|
||
dt = self.cfg.bbox_delta_threshold
|
||
if abs(nx - px) < dt and abs(ny - py) < dt and \
|
||
abs(nw - pw) < dt and abs(nh - ph) < dt:
|
||
return # delta filter — barely changed
|
||
self._event_bbox_last[event_id] = (now, nx, ny, nw, nh)
|
||
|
||
# Short id'ы — кутаем event_id чтобы влезть в 32 chars
|
||
eid_short = event_id.replace("-", "")[:8]
|
||
rect_id = f"e{eid_short}r"
|
||
text_id = f"e{eid_short}t"
|
||
|
||
rect = RectOverlay(
|
||
id=rect_id,
|
||
cell=mapping.cell,
|
||
x=nx, y=ny, w=nw, h=nh,
|
||
color="#00FF00",
|
||
border_only=True,
|
||
border_width=3,
|
||
z_order=20,
|
||
opacity=1.0,
|
||
)
|
||
text = TextOverlay(
|
||
id=text_id,
|
||
cell=mapping.cell,
|
||
x=nx, y=max(0.0, ny - 0.04), # над bbox
|
||
text=f"{label} {int(score * 100)}%",
|
||
font_size=20,
|
||
color="#00FF00",
|
||
z_order=21,
|
||
opacity=1.0,
|
||
)
|
||
|
||
self._event_overlays[event_id] = (rect_id, text_id)
|
||
await self.dispatcher.handle(mapping.target_instance, "overlay.add", rect.model_dump_json())
|
||
await self.dispatcher.handle(mapping.target_instance, "overlay.add", text.model_dump_json())
|
||
|
||
async def _remove_event_overlay(self, mapping, event_id: str) -> None:
|
||
self._event_bbox_last.pop(event_id, None) # cleanup throttle state
|
||
stored = self._event_overlays.pop(event_id, None)
|
||
if not stored:
|
||
return
|
||
rect_id, text_id = stored
|
||
await self.dispatcher.handle(mapping.target_instance, "overlay.remove", rect_id)
|
||
await self.dispatcher.handle(mapping.target_instance, "overlay.remove", text_id)
|