Files
vf-cuda-grid/controller/cuda_grid_controller/frigate_bridge.py
T
gx a7b1d9b1d9 controller: auto-layout selector (motion + priority)
FrigateCameraMapping +priority +main_cam_index — для auto-layout decision.
FrigateBridgeCfg.auto_layout flag — toggle через REST.

Logic (FrigateBridge._update_auto_layout):
  0 active cameras → quad (default overview)
  1+ active → single, main_cam = highest priority active
  Equal priority → first active wins (deterministic)

Dispatcher.set_main_cam — ZMQ streamselect@main_cam map <index>
Config.main_cam_filter_target = "streamselect@main_cam"

REST:
  GET  /auto-layout/{instance}     — current toggle state
  POST /auto-layout/{instance}     — { enabled: bool }
                                      при включении сразу применяет

UI:
  + checkbox "auto" в Layout card — toggleAuto() hits POST /auto-layout

Live verified: enable → immediately picks layout=single, main=gate_lpr
(priority=10, highest active). Visual confirms gate_lpr full screen.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 06:31:27 +01:00

369 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Frigate MQTT bridge — auto-overlay generation для motion + object events.
Phase 4b auto-rendering: subscribed на frigate/<cam>/motion + frigate/events,
конвертирует в add/remove overlay commands к configured target instance + cell.
Logic:
- motion ON → add RectOverlay (orange border) на весь cell
- motion OFF → remove тот overlay
- event new → add RectOverlay (bbox) + TextOverlay (label + score)
- event end → remove оба overlays
Bbox coords из Frigate event = absolute pixel в camera resolution.
Если в mapping указано camera_width/height — используем для normalize; иначе assume 1920×1080.
"""
from __future__ import annotations
import json
from typing import TYPE_CHECKING
import structlog
from pydantic import BaseModel, Field
from .overlays import DimOverlay, RectOverlay, TextOverlay
if TYPE_CHECKING:
from .dispatch import CommandDispatcher
log = structlog.get_logger()
class FrigateCameraMapping(BaseModel):
"""Mapping Frigate camera_name → vf-cuda-grid cell index в instance."""
frigate_camera: str
target_instance: str
cell: int = Field(default=0, description="Cell index в layout куда рисовать bbox")
enabled: bool = True
camera_width: int = Field(default=1920, gt=0, description="Native camera resolution для нормализации bbox")
camera_height: int = Field(default=1080, gt=0)
motion_indicator: bool = Field(default=True, description="Подсвечивать рамкой при motion ON")
bbox_overlay: bool = Field(default=True, description="Рисовать bbox для object detection events")
priority: int = Field(default=0, description="Higher priority → выводится в main cell при auto-layout. Equal priority → first-active wins")
main_cam_index: int = Field(default=0, ge=0, le=15,
description="Index в pipeline streamselect@main_cam (соответствует порядку -i в filter_complex)")
class BorderTheme(BaseModel):
"""Стиль cell border в двух состояниях. Color = HEX RGB."""
idle_color: str = Field(default="#808080", description="Нейтральная разделительная рамка")
idle_width: int = Field(default=1, ge=1, le=16)
idle_opacity: float = Field(default=0.4, ge=0.0, le=1.0)
motion_color: str = Field(default="#FF0000", description="Цвет при frigate motion ON")
motion_width: int = Field(default=1, ge=1, le=16)
motion_opacity: float = Field(default=1.0, ge=0.0, le=1.0)
class FocusTheme(BaseModel):
"""Auto-focus: при motion на одной cell — затемнение остальных.
Если motion на 2+ cells одновременно — focus не применяется (no obvious focus)."""
enabled: bool = True
dim_color: str = "#000000"
dim_factor: float = Field(default=0.6, ge=0.0, le=1.0,
description="0.0=без затемнения, 1.0=полное черное")
class FrigateBridgeCfg(BaseModel):
enabled: bool = False
base_topic: str = "frigate"
mappings: list[FrigateCameraMapping] = []
border_theme: BorderTheme = Field(default_factory=BorderTheme,
description="Стиль cell borders — idle/motion цвета")
focus_theme: FocusTheme = Field(default_factory=FocusTheme,
description="Auto-focus: dim non-active cells когда motion на одной")
auto_layout: bool = Field(default=False,
description="Auto-switch layout based on active motion + priorities. "
"0 active → quad, 1+ active → single c main=highest priority active")
class FrigateBridge:
"""Frigate MQTT subscriber → auto-overlay commands.
Состояние:
_motion_overlays: cam → overlay_id (motion indicator rect)
_event_overlays: event_id → (rect_id, text_id) для bbox + label
"""
def __init__(self, cfg: FrigateBridgeCfg, dispatcher: "CommandDispatcher | None" = None) -> None:
self.cfg = cfg
self.dispatcher = dispatcher
self._by_camera = {m.frigate_camera: m for m in cfg.mappings if m.enabled}
self._motion_overlays: dict[str, str] = {} # legacy — unused if border theme active
self._event_overlays: dict[str, tuple[str, str]] = {}
self._borders_initialized: dict[str, bool] = {} # target_instance → bool
self._cell_states: dict[str, set[str]] = {} # "<inst>:<cell>" → set of active cameras с motion
self._focus_dims: dict[str, set[int]] = {} # instance → set of cells which сейчас затемнены
self._auto_state: dict[str, tuple[str, int]] = {} # instance → (current_layout, current_main_cam_index)
# Active per camera (для auto-layout decision)
self._cam_active: dict[str, bool] = {m.frigate_camera: False for m in cfg.mappings}
def topics_to_subscribe(self) -> list[str]:
if not self.cfg.enabled:
return []
base = self.cfg.base_topic.rstrip("/")
# Frigate publishes state на `frigate/<cam>/motion/state` ("ON"/"OFF"),
# а `frigate/<cam>/motion` — это SET-topic (control). Subscribe строго к /state.
return [f"{base}/+/motion/state", f"{base}/events"]
def _cell_border_id(self, cell: int) -> str:
return f"cell_{cell}_border"
async def _ensure_borders(self, instance: str) -> None:
"""Lazy init: 4 idle borders для каждого cell в instance. Idempotent."""
if self._borders_initialized.get(instance) or not self.dispatcher:
return
cells = sorted({m.cell for m in self.cfg.mappings if m.target_instance == instance})
theme = self.cfg.border_theme
for cell in cells:
ov = RectOverlay(
id=self._cell_border_id(cell),
cell=cell,
x=0.0, y=0.0, w=1.0, h=1.0,
color=theme.idle_color,
border_only=True,
border_width=theme.idle_width,
opacity=theme.idle_opacity,
z_order=5,
)
await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json())
self._borders_initialized[instance] = True
log.info("frigate_bridge.borders_initialized", instance=instance, cells=cells)
async def _set_border_state(self, instance: str, cell: int, motion: bool) -> None:
"""Upsert cell border overlay в idle или motion стиль."""
if not self.dispatcher:
return
theme = self.cfg.border_theme
ov = RectOverlay(
id=self._cell_border_id(cell),
cell=cell,
x=0.0, y=0.0, w=1.0, h=1.0,
color=theme.motion_color if motion else theme.idle_color,
border_only=True,
border_width=theme.motion_width if motion else theme.idle_width,
opacity=theme.motion_opacity if motion else theme.idle_opacity,
z_order=5,
)
await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json())
async def handle_message(self, topic: str, payload: str) -> None:
if not self.cfg.enabled:
return
# Lazy init borders для всех target instances при первом event
for inst in {m.target_instance for m in self.cfg.mappings}:
await self._ensure_borders(inst)
base = self.cfg.base_topic.rstrip("/")
# frigate/<cam>/motion/state
if topic.startswith(f"{base}/") and topic.endswith("/motion/state"):
cam = topic[len(base) + 1 : -len("/motion/state")]
await self._handle_motion(cam, payload.strip().upper())
return
# frigate/events — JSON
if topic == f"{base}/events":
try:
event = json.loads(payload)
except json.JSONDecodeError as e:
log.warning("frigate.event_parse_fail", error=str(e))
return
await self._handle_event(event)
async def _handle_motion(self, cam: str, state: str) -> None:
mapping = self._by_camera.get(cam)
if mapping is None or not mapping.motion_indicator:
return
log.info("frigate.motion", camera=cam, state=state, target=mapping.target_instance, cell=mapping.cell)
if not self.dispatcher:
return
# Cell может содержать multiple cameras (теоретически) — border ON если
# хотя бы один cam имеет motion. Tracking через set'у per cell.
cell_key = f"{mapping.target_instance}:{mapping.cell}"
active = self._cell_states.setdefault(cell_key, set())
was_active = bool(active)
if state == "ON":
active.add(cam)
elif state == "OFF":
active.discard(cam)
is_active = bool(active)
if is_active != was_active:
await self._set_border_state(mapping.target_instance, mapping.cell, motion=is_active)
await self._update_focus(mapping.target_instance)
self._cam_active[cam] = is_active
await self._update_auto_layout(mapping.target_instance)
async def _update_auto_layout(self, instance: str) -> None:
"""Auto-select layout + main_cam based на active cameras + priority."""
if not self.cfg.auto_layout or not self.dispatcher:
return
# Active cameras для этого instance, отсортированы по priority desc
active = [
m for m in self.cfg.mappings
if m.target_instance == instance and self._cam_active.get(m.frigate_camera, False)
]
active.sort(key=lambda m: -m.priority)
if not active:
# Idle — quad
target_layout = "quad"
target_main_cam = 0
else:
target_layout = "single"
target_main_cam = active[0].main_cam_index
prev = self._auto_state.get(instance, (None, None))
if prev == (target_layout, target_main_cam):
return
log.info("auto_layout.change", instance=instance,
from_state=prev, to_layout=target_layout, to_main=target_main_cam,
active=[m.frigate_camera for m in active])
# Set main_cam first (so single layout shows correct cam at switch moment)
await self.dispatcher.set_main_cam(instance, target_main_cam)
await self.dispatcher.handle(instance, "layout.set", target_layout)
self._auto_state[instance] = (target_layout, target_main_cam)
def _cell_dim_id(self, cell: int) -> str:
return f"cell_{cell}_focus_dim"
async def _update_focus(self, instance: str) -> None:
"""Auto-focus logic:
0 active cells → no dim (remove all focus dims)
1 active cell → dim non-focus cells (focus mode ON)
2+ active cells → no dim (too many — no obvious single focus)
"""
if not self.dispatcher or not self.cfg.focus_theme.enabled:
return
# Active cells для этого instance
active_cells = {
int(key.split(":", 1)[1])
for key, cams in self._cell_states.items()
if cams and key.startswith(f"{instance}:")
}
all_cells = sorted({m.cell for m in self.cfg.mappings if m.target_instance == instance})
currently_dimmed = self._focus_dims.setdefault(instance, set())
target_dimmed: set[int]
if len(active_cells) == 1:
focus_cell = next(iter(active_cells))
target_dimmed = {c for c in all_cells if c != focus_cell}
else:
target_dimmed = set()
to_add = target_dimmed - currently_dimmed
to_remove = currently_dimmed - target_dimmed
theme = self.cfg.focus_theme
for cell in to_add:
ov = DimOverlay(
id=self._cell_dim_id(cell),
cell=cell,
x=0.0, y=0.0, w=1.0, h=1.0,
color=theme.dim_color,
dim_factor=theme.dim_factor,
z_order=1, # под bbox (20) и border (5) но над cell content
)
await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json())
currently_dimmed.add(cell)
for cell in to_remove:
await self.dispatcher.handle(instance, "overlay.remove", self._cell_dim_id(cell))
currently_dimmed.discard(cell)
if to_add or to_remove:
log.info("focus.updated", instance=instance, active=sorted(active_cells), dimmed=sorted(target_dimmed))
async def _handle_event(self, event: dict) -> None:
event_type = event.get("type", "?")
after = event.get("after") or {}
before = event.get("before") or {}
cam = after.get("camera") or before.get("camera")
if not cam:
return
mapping = self._by_camera.get(cam)
if mapping is None or not mapping.bbox_overlay:
return
if not self.dispatcher:
return
event_id = after.get("id") or before.get("id")
if not event_id:
return
label = after.get("label", "?")
score = after.get("score") or after.get("top_score") or 0.0
log.info(
"frigate.event",
camera=cam, type=event_type, label=label, score=round(score, 2),
target=mapping.target_instance, cell=mapping.cell, event_id=event_id,
)
if event_type in ("new", "update"):
await self._upsert_event_overlay(mapping, event_id, after, label, score)
elif event_type == "end":
await self._remove_event_overlay(mapping, event_id)
async def _upsert_event_overlay(self, mapping, event_id: str, after: dict, label: str, score: float) -> None:
box = after.get("box") # [x1, y1, x2, y2] в абс пикселях камеры
if not box or len(box) != 4:
return
x1, y1, x2, y2 = box
nx = x1 / mapping.camera_width
ny = y1 / mapping.camera_height
nw = (x2 - x1) / mapping.camera_width
nh = (y2 - y1) / mapping.camera_height
# Clamp на [0, 1]
nx = max(0.0, min(0.99, nx))
ny = max(0.0, min(0.99, ny))
nw = max(0.01, min(1.0 - nx, nw))
nh = max(0.01, min(1.0 - ny, nh))
# Short id'ы — кутаем event_id чтобы влезть в 32 chars
eid_short = event_id.replace("-", "")[:8]
rect_id = f"e{eid_short}r"
text_id = f"e{eid_short}t"
rect = RectOverlay(
id=rect_id,
cell=mapping.cell,
x=nx, y=ny, w=nw, h=nh,
color="#00FF00",
border_only=True,
border_width=3,
z_order=20,
opacity=1.0,
)
text = TextOverlay(
id=text_id,
cell=mapping.cell,
x=nx, y=max(0.0, ny - 0.04), # над bbox
text=f"{label} {int(score * 100)}%",
font_size=20,
color="#00FF00",
z_order=21,
opacity=1.0,
)
self._event_overlays[event_id] = (rect_id, text_id)
await self.dispatcher.handle(mapping.target_instance, "overlay.add", rect.model_dump_json())
await self.dispatcher.handle(mapping.target_instance, "overlay.add", text.model_dump_json())
async def _remove_event_overlay(self, mapping, event_id: str) -> None:
stored = self._event_overlays.pop(event_id, None)
if not stored:
return
rect_id, text_id = stored
await self.dispatcher.handle(mapping.target_instance, "overlay.remove", rect_id)
await self.dispatcher.handle(mapping.target_instance, "overlay.remove", text_id)