Files
vf-cuda-grid/controller/cuda_grid_controller/frigate_bridge.py
T
gx 26e9f30990 controller: FrigateBridge cell borders с idle/motion state machine
Permanent 1-2px рамка вокруг каждой cell для visual разделения.
При motion ON → рамка светится красным (config'и BorderTheme).

Logic:
  _ensure_borders()  Lazy init 4 borders (id="cell_N_border") при первом event
                     с idle стилем (#808080 width=2 opacity=0.4)
  _set_border_state(motion=bool) Upsert тот же overlay с motion (#FF0000 width=4 opacity=1.0)
                                  или idle styling
  _cell_states       set'у активных cams per cell ("inst:cell" → set(cam_names))
                     — border ON если хоть один cam имеет motion, OFF только когда
                     все cams cleared

BorderTheme:
  idle_color/width/opacity     subtle разделитель
  motion_color/width/opacity   alarm подсветка
  configurable через cfg.frigate.border_theme

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 20:43:34 +01:00

262 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Frigate MQTT bridge — auto-overlay generation для motion + object events.
Phase 4b auto-rendering: subscribed на frigate/<cam>/motion + frigate/events,
конвертирует в add/remove overlay commands к configured target instance + cell.
Logic:
- motion ON → add RectOverlay (orange border) на весь cell
- motion OFF → remove тот overlay
- event new → add RectOverlay (bbox) + TextOverlay (label + score)
- event end → remove оба overlays
Bbox coords из Frigate event = absolute pixel в camera resolution.
Если в mapping указано camera_width/height — используем для normalize; иначе assume 1920×1080.
"""
from __future__ import annotations
import json
from typing import TYPE_CHECKING
import structlog
from pydantic import BaseModel, Field
from .overlays import RectOverlay, TextOverlay
if TYPE_CHECKING:
from .dispatch import CommandDispatcher
log = structlog.get_logger()
class FrigateCameraMapping(BaseModel):
"""Mapping Frigate camera_name → vf-cuda-grid cell index в instance."""
frigate_camera: str
target_instance: str
cell: int = Field(default=0, description="Cell index в layout куда рисовать bbox")
enabled: bool = True
camera_width: int = Field(default=1920, gt=0, description="Native camera resolution для нормализации bbox")
camera_height: int = Field(default=1080, gt=0)
motion_indicator: bool = Field(default=True, description="Подсвечивать рамкой при motion ON")
bbox_overlay: bool = Field(default=True, description="Рисовать bbox для object detection events")
class BorderTheme(BaseModel):
"""Стиль cell border в трёх состояниях. Color = HEX RGB."""
idle_color: str = Field(default="#808080", description="Нейтральная разделительная рамка")
idle_width: int = Field(default=2, ge=1, le=8)
idle_opacity: float = Field(default=0.4, ge=0.0, le=1.0)
motion_color: str = Field(default="#FF0000", description="Цвет при frigate motion ON")
motion_width: int = Field(default=4, ge=1, le=16)
motion_opacity: float = Field(default=1.0, ge=0.0, le=1.0)
class FrigateBridgeCfg(BaseModel):
enabled: bool = False
base_topic: str = "frigate"
mappings: list[FrigateCameraMapping] = []
border_theme: BorderTheme = Field(default_factory=BorderTheme,
description="Стиль cell borders — idle/motion цвета")
class FrigateBridge:
"""Frigate MQTT subscriber → auto-overlay commands.
Состояние:
_motion_overlays: cam → overlay_id (motion indicator rect)
_event_overlays: event_id → (rect_id, text_id) для bbox + label
"""
def __init__(self, cfg: FrigateBridgeCfg, dispatcher: "CommandDispatcher | None" = None) -> None:
self.cfg = cfg
self.dispatcher = dispatcher
self._by_camera = {m.frigate_camera: m for m in cfg.mappings if m.enabled}
self._motion_overlays: dict[str, str] = {} # legacy — unused if border theme active
self._event_overlays: dict[str, tuple[str, str]] = {}
self._borders_initialized: dict[str, bool] = {} # target_instance → bool
self._cell_states: dict[str, set[str]] = {} # "<inst>:<cell>" → set of active cameras с motion
def topics_to_subscribe(self) -> list[str]:
if not self.cfg.enabled:
return []
base = self.cfg.base_topic.rstrip("/")
return [f"{base}/+/motion", f"{base}/events"]
def _cell_border_id(self, cell: int) -> str:
return f"cell_{cell}_border"
async def _ensure_borders(self, instance: str) -> None:
"""Lazy init: 4 idle borders для каждого cell в instance. Idempotent."""
if self._borders_initialized.get(instance) or not self.dispatcher:
return
cells = sorted({m.cell for m in self.cfg.mappings if m.target_instance == instance})
theme = self.cfg.border_theme
for cell in cells:
ov = RectOverlay(
id=self._cell_border_id(cell),
cell=cell,
x=0.0, y=0.0, w=1.0, h=1.0,
color=theme.idle_color,
border_only=True,
border_width=theme.idle_width,
opacity=theme.idle_opacity,
z_order=5,
)
await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json())
self._borders_initialized[instance] = True
log.info("frigate_bridge.borders_initialized", instance=instance, cells=cells)
async def _set_border_state(self, instance: str, cell: int, motion: bool) -> None:
"""Upsert cell border overlay в idle или motion стиль."""
if not self.dispatcher:
return
theme = self.cfg.border_theme
ov = RectOverlay(
id=self._cell_border_id(cell),
cell=cell,
x=0.0, y=0.0, w=1.0, h=1.0,
color=theme.motion_color if motion else theme.idle_color,
border_only=True,
border_width=theme.motion_width if motion else theme.idle_width,
opacity=theme.motion_opacity if motion else theme.idle_opacity,
z_order=5,
)
await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json())
async def handle_message(self, topic: str, payload: str) -> None:
if not self.cfg.enabled:
return
# Lazy init borders для всех target instances при первом event
for inst in {m.target_instance for m in self.cfg.mappings}:
await self._ensure_borders(inst)
base = self.cfg.base_topic.rstrip("/")
# frigate/<cam>/motion
if topic.startswith(f"{base}/") and topic.endswith("/motion"):
cam = topic[len(base) + 1 : -len("/motion")]
await self._handle_motion(cam, payload.strip().upper())
return
# frigate/events — JSON
if topic == f"{base}/events":
try:
event = json.loads(payload)
except json.JSONDecodeError as e:
log.warning("frigate.event_parse_fail", error=str(e))
return
await self._handle_event(event)
async def _handle_motion(self, cam: str, state: str) -> None:
mapping = self._by_camera.get(cam)
if mapping is None or not mapping.motion_indicator:
return
log.info("frigate.motion", camera=cam, state=state, target=mapping.target_instance, cell=mapping.cell)
if not self.dispatcher:
return
# Cell может содержать multiple cameras (теоретически) — border ON если
# хотя бы один cam имеет motion. Tracking через set'у per cell.
cell_key = f"{mapping.target_instance}:{mapping.cell}"
active = self._cell_states.setdefault(cell_key, set())
was_active = bool(active)
if state == "ON":
active.add(cam)
elif state == "OFF":
active.discard(cam)
is_active = bool(active)
if is_active != was_active:
await self._set_border_state(mapping.target_instance, mapping.cell, motion=is_active)
async def _handle_event(self, event: dict) -> None:
event_type = event.get("type", "?")
after = event.get("after") or {}
before = event.get("before") or {}
cam = after.get("camera") or before.get("camera")
if not cam:
return
mapping = self._by_camera.get(cam)
if mapping is None or not mapping.bbox_overlay:
return
if not self.dispatcher:
return
event_id = after.get("id") or before.get("id")
if not event_id:
return
label = after.get("label", "?")
score = after.get("score") or after.get("top_score") or 0.0
log.info(
"frigate.event",
camera=cam, type=event_type, label=label, score=round(score, 2),
target=mapping.target_instance, cell=mapping.cell, event_id=event_id,
)
if event_type in ("new", "update"):
await self._upsert_event_overlay(mapping, event_id, after, label, score)
elif event_type == "end":
await self._remove_event_overlay(mapping, event_id)
async def _upsert_event_overlay(self, mapping, event_id: str, after: dict, label: str, score: float) -> None:
box = after.get("box") # [x1, y1, x2, y2] в абс пикселях камеры
if not box or len(box) != 4:
return
x1, y1, x2, y2 = box
nx = x1 / mapping.camera_width
ny = y1 / mapping.camera_height
nw = (x2 - x1) / mapping.camera_width
nh = (y2 - y1) / mapping.camera_height
# Clamp на [0, 1]
nx = max(0.0, min(0.99, nx))
ny = max(0.0, min(0.99, ny))
nw = max(0.01, min(1.0 - nx, nw))
nh = max(0.01, min(1.0 - ny, nh))
# Short id'ы — кутаем event_id чтобы влезть в 32 chars
eid_short = event_id.replace("-", "")[:8]
rect_id = f"e{eid_short}r"
text_id = f"e{eid_short}t"
rect = RectOverlay(
id=rect_id,
cell=mapping.cell,
x=nx, y=ny, w=nw, h=nh,
color="#00FF00",
border_only=True,
border_width=3,
z_order=20,
opacity=1.0,
)
text = TextOverlay(
id=text_id,
cell=mapping.cell,
x=nx, y=max(0.0, ny - 0.04), # над bbox
text=f"{label} {int(score * 100)}%",
font_size=20,
color="#00FF00",
z_order=21,
opacity=1.0,
)
self._event_overlays[event_id] = (rect_id, text_id)
await self.dispatcher.handle(mapping.target_instance, "overlay.add", rect.model_dump_json())
await self.dispatcher.handle(mapping.target_instance, "overlay.add", text.model_dump_json())
async def _remove_event_overlay(self, mapping, event_id: str) -> None:
stored = self._event_overlays.pop(event_id, None)
if not stored:
return
rect_id, text_id = stored
await self.dispatcher.handle(mapping.target_instance, "overlay.remove", rect_id)
await self.dispatcher.handle(mapping.target_instance, "overlay.remove", text_id)