a576b4ec51
User feedback: 4px motion border слишком жирная, к тому же много false motion от Frigate (зоны/чувствительность будут tune'иться позже). Уменьшаем default до 1px чтобы borders не мешали visually. Width конфигурируется (1..16). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
264 lines
10 KiB
Python
264 lines
10 KiB
Python
"""Frigate MQTT bridge — auto-overlay generation для motion + object events.
|
||
|
||
Phase 4b auto-rendering: subscribes to frigate/<cam>/motion/state and frigate/events
and converts the messages into add/remove overlay commands for the configured
target instance and cell.

Logic:
- motion ON  → the cell border RectOverlay (whole cell) switches to the motion style
- motion OFF → the same cell border is switched back to the idle style
- event new/update → add RectOverlay (bbox) + TextOverlay (label + score)
- event end → remove both overlays

Bbox coordinates in a Frigate event are absolute pixels in the camera resolution.
They are normalized with the mapping's camera_width/camera_height, which default
to 1920×1080 when not set.
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
from typing import TYPE_CHECKING
|
||
|
||
import structlog
|
||
from pydantic import BaseModel, Field
|
||
|
||
from .overlays import RectOverlay, TextOverlay
|
||
|
||
if TYPE_CHECKING:
|
||
from .dispatch import CommandDispatcher
|
||
|
||
# Module-level structlog logger used by every log call in this module.
log = structlog.get_logger()
|
||
|
||
|
||
class FrigateCameraMapping(BaseModel):
    """Map one Frigate camera name to a vf-cuda-grid cell index inside an instance."""

    # Routing: which Frigate camera drives which instance/cell.
    frigate_camera: str
    target_instance: str
    cell: int = Field(
        default=0,
        description="Cell index в layout куда рисовать bbox",
    )
    # Disabled mappings are left out of FrigateBridge's per-camera lookup.
    enabled: bool = True

    # Native camera resolution; bbox pixel coordinates are divided by these.
    camera_width: int = Field(
        default=1920,
        gt=0,
        description="Native camera resolution для нормализации bbox",
    )
    camera_height: int = Field(default=1080, gt=0)

    # Per-camera feature switches.
    motion_indicator: bool = Field(
        default=True,
        description="Подсвечивать рамкой при motion ON",
    )
    bbox_overlay: bool = Field(
        default=True,
        description="Рисовать bbox для object detection events",
    )
|
||
|
||
|
||
class BorderTheme(BaseModel):
    """Cell border style for the two border states (idle / motion). Colors are HEX RGB."""

    # Idle state: neutral separator frame.
    idle_color: str = Field(
        default="#808080",
        description="Нейтральная разделительная рамка",
    )
    idle_width: int = Field(default=1, ge=1, le=16)
    idle_opacity: float = Field(default=0.4, ge=0.0, le=1.0)

    # Motion state: applied while Frigate reports motion for the cell.
    motion_color: str = Field(
        default="#FF0000",
        description="Цвет при frigate motion ON",
    )
    motion_width: int = Field(default=1, ge=1, le=16)
    motion_opacity: float = Field(default=1.0, ge=0.0, le=1.0)
|
||
|
||
|
||
class FrigateBridgeCfg(BaseModel):
    """Configuration of the Frigate bridge: on/off switch, topic root, mappings, border style."""

    # When False the bridge subscribes to nothing and ignores every message.
    enabled: bool = False
    # Root of the Frigate MQTT topic tree; a trailing "/" is stripped by the bridge.
    base_topic: str = "frigate"
    mappings: list[FrigateCameraMapping] = []
    border_theme: BorderTheme = Field(
        default_factory=BorderTheme,
        description="Стиль cell borders — idle/motion цвета",
    )
|
||
|
||
|
||
class FrigateBridge:
    """Frigate MQTT subscriber that turns motion/object events into overlay commands.

    State:
        _by_camera: Frigate camera name → enabled mapping.
        _motion_overlays: camera → overlay id (legacy; the border-theme flow
            never writes to it).
        _event_overlays: event id → (rect id, text id) of the bbox + label overlays.
        _borders_initialized: target instance → True once idle borders were sent.
        _cell_states: "<instance>:<cell>" → set of cameras currently reporting motion.
    """

    def __init__(self, cfg: "FrigateBridgeCfg", dispatcher: "CommandDispatcher | None" = None) -> None:
        """Store the config/dispatcher and build the per-camera lookup of enabled mappings."""
        self.cfg = cfg
        self.dispatcher = dispatcher
        self._by_camera = {m.frigate_camera: m for m in cfg.mappings if m.enabled}
        self._motion_overlays: dict[str, str] = {}  # legacy — unused if border theme active
        self._event_overlays: dict[str, tuple[str, str]] = {}
        self._borders_initialized: dict[str, bool] = {}  # target_instance → bool
        self._cell_states: dict[str, set[str]] = {}  # "<inst>:<cell>" → cameras with active motion

    def topics_to_subscribe(self) -> list[str]:
        """Return the MQTT topic filters to subscribe to (empty when the bridge is disabled)."""
        if not self.cfg.enabled:
            return []
        base = self.cfg.base_topic.rstrip("/")
        # Frigate publishes state on `frigate/<cam>/motion/state` ("ON"/"OFF"),
        # while `frigate/<cam>/motion` is the SET (control) topic. Subscribe to /state only.
        return [f"{base}/+/motion/state", f"{base}/events"]

    def _cell_border_id(self, cell: int) -> str:
        """Return the overlay id of the border rect that belongs to ``cell``."""
        return f"cell_{cell}_border"

    @staticmethod
    def _event_overlay_ids(event_id: str) -> tuple[str, str]:
        """Derive short, collision-resistant (rect_id, text_id) overlay ids for an event.

        Frigate event ids look like ``<epoch>.<fraction>-<random>``, so a plain
        prefix of the id is just the leading timestamp digits and is shared by
        every event created within the same time window. Hashing the whole id
        keeps the ids short (18 chars, below the 32-char budget) while making
        them unique per event.
        """
        # Local import keeps this block self-contained; hashlib is stdlib.
        import hashlib

        digest = hashlib.blake2s(str(event_id).encode("utf-8"), digest_size=8).hexdigest()
        return f"e{digest}r", f"e{digest}t"

    def _border_overlay(self, cell: int, motion: bool) -> "RectOverlay":
        """Build the full-cell border rect for ``cell`` in idle or motion style."""
        theme = self.cfg.border_theme
        if motion:
            color, width, opacity = theme.motion_color, theme.motion_width, theme.motion_opacity
        else:
            color, width, opacity = theme.idle_color, theme.idle_width, theme.idle_opacity
        return RectOverlay(
            id=self._cell_border_id(cell),
            cell=cell,
            x=0.0, y=0.0, w=1.0, h=1.0,
            color=color,
            border_only=True,
            border_width=width,
            opacity=opacity,
            z_order=5,
        )

    async def _ensure_borders(self, instance: str) -> None:
        """Lazily send one idle border rect per mapped cell of ``instance``.

        Does nothing when the instance was already initialized or when no
        dispatcher is attached. The instance is marked initialized only after
        every border was dispatched.
        """
        if self._borders_initialized.get(instance) or not self.dispatcher:
            return
        cells = sorted({m.cell for m in self.cfg.mappings if m.target_instance == instance})
        for cell in cells:
            ov = self._border_overlay(cell, motion=False)
            await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json())
        self._borders_initialized[instance] = True
        log.info("frigate_bridge.borders_initialized", instance=instance, cells=cells)

    async def _set_border_state(self, instance: str, cell: int, motion: bool) -> None:
        """Upsert the cell border overlay in idle or motion style."""
        if not self.dispatcher:
            return
        ov = self._border_overlay(cell, motion)
        await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json())

    async def handle_message(self, topic: str, payload: str) -> None:
        """Route one MQTT message to the motion or event handler.

        Args:
            topic: Full MQTT topic of the message.
            payload: Message payload ("ON"/"OFF" for motion state, JSON for events).
        """
        if not self.cfg.enabled:
            return

        # Lazy init of borders for all target instances on the first message.
        for inst in {m.target_instance for m in self.cfg.mappings}:
            await self._ensure_borders(inst)

        base = self.cfg.base_topic.rstrip("/")
        motion_suffix = "/motion/state"

        # frigate/<cam>/motion/state
        if topic.startswith(f"{base}/") and topic.endswith(motion_suffix):
            cam = topic[len(base) + 1 : -len(motion_suffix)]
            await self._handle_motion(cam, payload.strip().upper())
            return

        # frigate/events — JSON
        if topic == f"{base}/events":
            try:
                event = json.loads(payload)
            except json.JSONDecodeError as e:
                log.warning("frigate.event_parse_fail", error=str(e))
                return
            # Valid JSON that is not an object (null, list, number) cannot be an event.
            if not isinstance(event, dict):
                log.warning(
                    "frigate.event_parse_fail",
                    error=f"expected JSON object, got {type(event).__name__}",
                )
                return
            await self._handle_event(event)

    async def _handle_motion(self, cam: str, state: str) -> None:
        """Track motion state of ``cam`` and restyle its cell border on transitions.

        ``state`` is expected to be "ON" or "OFF"; any other value leaves the
        tracked state unchanged.
        """
        mapping = self._by_camera.get(cam)
        if mapping is None or not mapping.motion_indicator:
            return

        log.info("frigate.motion", camera=cam, state=state, target=mapping.target_instance, cell=mapping.cell)

        if not self.dispatcher:
            return

        # A cell may (in theory) host several cameras — the border is ON while
        # at least one of them reports motion. Tracked with a set per cell.
        cell_key = f"{mapping.target_instance}:{mapping.cell}"
        active = self._cell_states.setdefault(cell_key, set())
        was_active = bool(active)
        if state == "ON":
            active.add(cam)
        elif state == "OFF":
            active.discard(cam)
        is_active = bool(active)

        # Only dispatch when the aggregated cell state actually flips.
        if is_active != was_active:
            await self._set_border_state(mapping.target_instance, mapping.cell, motion=is_active)

    async def _handle_event(self, event: dict) -> None:
        """Process one decoded ``frigate/events`` message.

        "new"/"update" events upsert the bbox + label overlays, "end" removes
        them. Events for unmapped cameras, mappings with ``bbox_overlay``
        disabled, or events without an id are ignored.
        """
        event_type = event.get("type", "?")
        after = event.get("after")
        if not isinstance(after, dict):
            after = {}
        before = event.get("before")
        if not isinstance(before, dict):
            before = {}

        cam = after.get("camera") or before.get("camera")
        if not cam:
            return

        mapping = self._by_camera.get(cam)
        if mapping is None or not mapping.bbox_overlay:
            return
        if not self.dispatcher:
            return

        event_id = after.get("id") or before.get("id")
        if not event_id:
            return

        label = after.get("label") or "?"
        raw_score = after.get("score") or after.get("top_score") or 0.0
        try:
            score = float(raw_score)
        except (TypeError, ValueError):
            # A malformed score must not abort overlay handling.
            score = 0.0

        log.info(
            "frigate.event",
            camera=cam, type=event_type, label=label, score=round(score, 2),
            target=mapping.target_instance, cell=mapping.cell, event_id=event_id,
        )

        if event_type in ("new", "update"):
            await self._upsert_event_overlay(mapping, event_id, after, label, score)
        elif event_type == "end":
            await self._remove_event_overlay(mapping, event_id)

    async def _upsert_event_overlay(
        self,
        mapping: "FrigateCameraMapping",
        event_id: str,
        after: dict,
        label: str,
        score: float,
    ) -> None:
        """Add or refresh the bbox rect and label text overlays of an event.

        The event's ``box`` ([x1, y1, x2, y2] in absolute camera pixels) is
        normalized with the mapping's camera resolution. Events without a
        usable four-number box are skipped.
        """
        box = after.get("box")  # [x1, y1, x2, y2] in absolute camera pixels
        if not box:
            return
        try:
            x1, y1, x2, y2 = (float(v) for v in box)
        except (TypeError, ValueError):
            # Wrong length or non-numeric entries: nothing sensible to draw.
            return

        nx = x1 / mapping.camera_width
        ny = y1 / mapping.camera_height
        nw = (x2 - x1) / mapping.camera_width
        nh = (y2 - y1) / mapping.camera_height
        # Clamp into the cell: origin within [0, 0.99], size at least 0.01 and
        # never extending past the right/bottom edge.
        nx = max(0.0, min(0.99, nx))
        ny = max(0.0, min(0.99, ny))
        nw = max(0.01, min(1.0 - nx, nw))
        nh = max(0.01, min(1.0 - ny, nh))

        rect_id, text_id = self._event_overlay_ids(event_id)

        rect = RectOverlay(
            id=rect_id,
            cell=mapping.cell,
            x=nx, y=ny, w=nw, h=nh,
            color="#00FF00",
            border_only=True,
            border_width=3,
            z_order=20,
            opacity=1.0,
        )
        text = TextOverlay(
            id=text_id,
            cell=mapping.cell,
            x=nx, y=max(0.0, ny - 0.04),  # just above the bbox
            text=f"{label} {int(score * 100)}%",
            font_size=20,
            color="#00FF00",
            z_order=21,
            opacity=1.0,
        )

        # Remember the ids before dispatching so an "end" event can always clean up.
        self._event_overlays[event_id] = (rect_id, text_id)
        await self.dispatcher.handle(mapping.target_instance, "overlay.add", rect.model_dump_json())
        await self.dispatcher.handle(mapping.target_instance, "overlay.add", text.model_dump_json())

    async def _remove_event_overlay(self, mapping: "FrigateCameraMapping", event_id: str) -> None:
        """Remove the rect and text overlays recorded for ``event_id``, if any."""
        stored = self._event_overlays.pop(event_id, None)
        if not stored:
            return
        rect_id, text_id = stored
        await self.dispatcher.handle(mapping.target_instance, "overlay.remove", rect_id)
        await self.dispatcher.handle(mapping.target_instance, "overlay.remove", text_id)
|