diff --git a/controller/cuda_grid_controller/__main__.py b/controller/cuda_grid_controller/__main__.py index 1bfa919..5f813d6 100644 --- a/controller/cuda_grid_controller/__main__.py +++ b/controller/cuda_grid_controller/__main__.py @@ -13,6 +13,7 @@ import uvicorn from .config import Config from .dispatch import CommandDispatcher +from .frigate_bridge import FrigateBridge, FrigateBridgeCfg from .http_api import create_app from .mqtt_loop import MqttLoop from .state import ControllerState @@ -41,7 +42,20 @@ async def _run(cfg: Config) -> None: await state.set_layout(inst.name, inst.default_layout) dispatcher = CommandDispatcher(cfg, state) - mqtt = MqttLoop(cfg, state, dispatcher.handle) + + # Frigate bridge (опционально) + frigate_bridge: FrigateBridge | None = None + if cfg.frigate: + try: + fcfg = FrigateBridgeCfg.model_validate(cfg.frigate) + if fcfg.enabled: + frigate_bridge = FrigateBridge(fcfg) + except Exception as e: + structlog.get_logger().warning( + "frigate_bridge.config_invalid", error=str(e) + ) + + mqtt = MqttLoop(cfg, state, dispatcher.handle, frigate_bridge=frigate_bridge) # Wire dispatcher events → MQTT publishes dispatcher.on_state_change = mqtt.publish_state diff --git a/controller/cuda_grid_controller/config.py b/controller/cuda_grid_controller/config.py index 48781ad..3e61249 100644 --- a/controller/cuda_grid_controller/config.py +++ b/controller/cuda_grid_controller/config.py @@ -95,6 +95,8 @@ class Config(BaseModel): ha_discovery: HaDiscoveryCfg = HaDiscoveryCfg() http: HttpCfg = HttpCfg() log: LogCfg = LogCfg() + # Frigate bridge — late import чтобы избежать circular dep + frigate: dict | None = None # parsed в FrigateBridgeCfg при runtime @classmethod def from_yaml(cls, path: Path | str) -> Self: diff --git a/controller/cuda_grid_controller/dispatch.py b/controller/cuda_grid_controller/dispatch.py index 893d8f0..9409fa2 100644 --- a/controller/cuda_grid_controller/dispatch.py +++ b/controller/cuda_grid_controller/dispatch.py @@ -2,27 +2,44 @@ Action kinds: layout.set — set_layout - (future Phase 4+: auto_mode.set, focus_camera.set, overlay.add, ...) + overlay.add — add overlay (JSON payload) + overlay.remove — remove overlay by id + overlay.clear — remove all overlays """ from __future__ import annotations +import json + import structlog from .config import Config, InstanceCfg from .layouts import PREDEFINED_LAYOUTS +from .overlays import Overlay from .state import ControllerState from .zmq_client import FFmpegZmqClient log = structlog.get_logger() +def _serialize_overlay_to_zmq(overlay: Overlay) -> str: + """Сериализовать overlay в одну строку для FFmpeg process_command. + + Формат: `add_overlay ` + Filter-side (Phase 4b) парсит JSON и применяет. + + JSON используем потому что overlay имеет вложенные поля (style для graph + и т.п.); проще чем positional args. + """ + payload = overlay.model_dump_json() + return f"{overlay.id} {overlay.type} {payload}" + + class CommandDispatcher: def __init__(self, cfg: Config, state: ControllerState) -> None: self.cfg = cfg self.state = state self._zmq_clients: dict[str, FFmpegZmqClient] = {} - # publish callback устанавливается из вне (MqttLoop) self.on_state_change = None # type: ignore[var-annotated] self.on_event = None # type: ignore[var-annotated] @@ -42,10 +59,21 @@ class CommandDispatcher: log.warning("dispatch.unknown_instance", instance=instance_name) return - if kind == "layout.set": - await self._set_layout(inst, payload.strip()) - else: - log.warning("dispatch.unknown_kind", instance=instance_name, kind=kind) + match kind: + case "layout.set": + await self._set_layout(inst, payload.strip()) + case "overlay.add": + await self._overlay_add(inst, payload) + case "overlay.remove": + await self._overlay_remove(inst, payload.strip()) + case "overlay.clear": + await self._overlay_clear(inst) + case _: + log.warning( + "dispatch.unknown_kind", instance=instance_name, kind=kind + ) + + # ─── Layout ──────────────────────────────────────────────────── async def _set_layout(self, inst: InstanceCfg, layout: str) -> None: if layout not in PREDEFINED_LAYOUTS: @@ -84,6 +112,99 @@ class CommandDispatcher: {"from": old, "to": layout, "reason": "mqtt"}, ) + # ─── Overlays ────────────────────────────────────────────────── + + async def _overlay_add(self, inst: InstanceCfg, payload: str) -> None: + """Payload = JSON совместимый с Overlay discriminated union.""" + from pydantic import TypeAdapter + + try: + overlay: Overlay = TypeAdapter(Overlay).validate_json(payload) + except Exception as e: + log.warning("dispatch.overlay_parse_fail", error=str(e), payload=payload[:200]) + return + + # ZMQ send → filter (Phase 4b will actually render) + client = self._client(inst) + try: + zmq_arg = _serialize_overlay_to_zmq(overlay) + reply = await client.send_command( + inst.filter_target, "add_overlay", zmq_arg + ) + log.info( + "dispatch.overlay_add", + instance=inst.name, + id=overlay.id, + type=overlay.type, + ffmpeg_reply=reply, + ) + except (TimeoutError, Exception) as e: + # Filter side might not support yet — log warn but persist + # в state (controller behaves correctly даже если filter ignore'ит). + log.warning( + "dispatch.overlay_zmq_fail", + instance=inst.name, + id=overlay.id, + error=str(e), + ) + + await self.state.add_overlay(inst.name, overlay) + + if self.on_state_change: + count = len(await self.state.get_overlays(inst.name)) + await self.on_state_change(inst.name, "overlays_count", str(count)) + if self.on_event: + await self.on_event( + inst.name, + "overlay_added", + {"id": overlay.id, "type": overlay.type}, + ) + + async def _overlay_remove(self, inst: InstanceCfg, overlay_id: str) -> None: + existed = await self.state.remove_overlay(inst.name, overlay_id) + if not existed: + log.warning( + "dispatch.overlay_unknown", + instance=inst.name, + id=overlay_id, + ) + return + + client = self._client(inst) + try: + await client.send_command( + inst.filter_target, "remove_overlay", overlay_id + ) + except (TimeoutError, Exception) as e: + log.warning("dispatch.overlay_remove_zmq_fail", error=str(e)) + + log.info("dispatch.overlay_removed", instance=inst.name, id=overlay_id) + + if self.on_state_change: + count = len(await self.state.get_overlays(inst.name)) + await self.on_state_change(inst.name, "overlays_count", str(count)) + if self.on_event: + await self.on_event( + inst.name, "overlay_removed", {"id": overlay_id} + ) + + async def _overlay_clear(self, inst: InstanceCfg) -> None: + n = await self.state.clear_overlays(inst.name) + client = self._client(inst) + try: + await client.send_command(inst.filter_target, "clear_overlays", "") + except (TimeoutError, Exception) as e: + log.warning("dispatch.overlay_clear_zmq_fail", error=str(e)) + + log.info("dispatch.overlays_cleared", instance=inst.name, count=n) + + if self.on_state_change: + await self.on_state_change(inst.name, "overlays_count", "0") + if self.on_event: + await self.on_event(inst.name, "overlays_cleared", {"count": n}) + + # ─── Cleanup ─────────────────────────────────────────────────── + async def close(self) -> None: for c in self._zmq_clients.values(): await c.close() diff --git a/controller/cuda_grid_controller/frigate_bridge.py b/controller/cuda_grid_controller/frigate_bridge.py new file mode 100644 index 0000000..c2255db --- /dev/null +++ b/controller/cuda_grid_controller/frigate_bridge.py @@ -0,0 +1,98 @@ +"""Frigate MQTT bridge — subscribe `frigate/+/motion` + `frigate/events`, +маппит к overlay add/remove на configured instance. + +Phase 4a: skeleton — receives + logs events, не делает auto-overlay yet. +Phase 4b: actual auto-bbox overlays для detect events. +""" + +from __future__ import annotations + +import json + +import structlog +from pydantic import BaseModel, Field + +log = structlog.get_logger() + + +class FrigateCameraMapping(BaseModel): + """Mapping Frigate camera_name → vf-cuda-grid cell index в instance.""" + + frigate_camera: str + target_instance: str + cell: int = Field(default=0, description="Cell index в layout куда рисовать bbox") + enabled: bool = True + + +class FrigateBridgeCfg(BaseModel): + enabled: bool = False + base_topic: str = "frigate" + mappings: list[FrigateCameraMapping] = [] + + +class FrigateBridge: + """Подписывается на frigate/* topics; transforms к overlay commands. + + Phase 4a: just log events. Phase 4b will: + - на frigate//motion ON → add dim overlay на cell (highlight motion) + - на frigate/events object_detected → add rect+text overlay с label+confidence + - на event ended → remove overlay + """ + + def __init__(self, cfg: FrigateBridgeCfg) -> None: + self.cfg = cfg + # cam_name → mapping (для быстрого lookup) + self._by_camera = {m.frigate_camera: m for m in cfg.mappings if m.enabled} + + def topics_to_subscribe(self) -> list[str]: + if not self.cfg.enabled: + return [] + base = self.cfg.base_topic.rstrip("/") + return [f"{base}/+/motion", f"{base}/events"] + + def handle_message(self, topic: str, payload: str) -> None: + """Logging + future overlay generation.""" + if not self.cfg.enabled: + return + + base = self.cfg.base_topic.rstrip("/") + + # frigate//motion + if topic.startswith(f"{base}/") and topic.endswith("/motion"): + cam = topic[len(base) + 1 : -len("/motion")] + mapping = self._by_camera.get(cam) + if mapping is None: + return + state = payload.strip().upper() + log.info( + "frigate.motion", + camera=cam, + state=state, + target=mapping.target_instance, + cell=mapping.cell, + ) + # Phase 4b: добавлять/удалять dim overlay в зависимости от state + return + + # frigate/events — JSON с details + if topic == f"{base}/events": + try: + event = json.loads(payload) + cam = event.get("after", {}).get("camera") or event.get("before", {}).get("camera") + event_type = event.get("type", "?") + label = event.get("after", {}).get("label", "?") + mapping = self._by_camera.get(cam) if cam else None + if mapping is None: + return + log.info( + "frigate.event", + camera=cam, + type=event_type, + label=label, + target=mapping.target_instance, + cell=mapping.cell, + ) + # Phase 4b: для event_type=new → add rect+text overlay; + # для event_type=end → remove overlay + except json.JSONDecodeError as e: + log.warning("frigate.event_parse_fail", error=str(e)) diff --git a/controller/cuda_grid_controller/http_api.py b/controller/cuda_grid_controller/http_api.py index c5b0c09..c8f4366 100644 --- a/controller/cuda_grid_controller/http_api.py +++ b/controller/cuda_grid_controller/http_api.py @@ -5,12 +5,13 @@ from __future__ import annotations from typing import Any import structlog -from fastapi import FastAPI, HTTPException -from pydantic import BaseModel +from fastapi import Body, FastAPI, HTTPException +from pydantic import BaseModel, TypeAdapter from .config import Config from .dispatch import CommandDispatcher from .layouts import PREDEFINED_LAYOUTS +from .overlays import Overlay from .state import ControllerState log = structlog.get_logger() @@ -29,6 +30,12 @@ def create_app( description="Control plane для vf_cuda_grid FFmpeg filter", ) + def _check_instance(name: str): + inst = next((i for i in cfg.instances if i.name == name), None) + if inst is None: + raise HTTPException(404, f"unknown instance '{name}'") + return inst + @app.get("/health") async def health() -> dict[str, Any]: return {"status": "ok"} @@ -41,17 +48,17 @@ def create_app( async def get_state() -> dict[str, Any]: out = {} for inst in cfg.instances: + overlays = await state.get_overlays(inst.name) out[inst.name] = { "active_layout": await state.get_layout(inst.name), "zmq_endpoint": inst.zmq_endpoint, + "overlays_count": len(overlays), } return {"instances": out} @app.post("/layout/{instance}/set") async def set_layout(instance: str, req: LayoutSetReq) -> dict[str, Any]: - inst = next((i for i in cfg.instances if i.name == instance), None) - if inst is None: - raise HTTPException(404, f"unknown instance '{instance}'") + _check_instance(instance) if req.layout not in PREDEFINED_LAYOUTS: raise HTTPException( 400, f"unknown layout '{req.layout}'. Доступны: {PREDEFINED_LAYOUTS}" @@ -59,4 +66,51 @@ def create_app( await dispatcher.handle(instance, "layout.set", req.layout) return {"ok": True, "instance": instance, "layout": req.layout} + # ─── Overlays ────────────────────────────────────────────────── + + @app.post("/overlay/{instance}/add") + async def overlay_add( + instance: str, overlay: Overlay = Body(...) + ) -> dict[str, Any]: + _check_instance(instance) + await dispatcher.handle( + instance, "overlay.add", overlay.model_dump_json() + ) + return {"ok": True, "id": overlay.id, "type": overlay.type} + + @app.get("/overlay/{instance}") + async def overlay_list(instance: str) -> dict[str, Any]: + _check_instance(instance) + overlays = await state.get_overlays(instance) + return { + "instance": instance, + "count": len(overlays), + "overlays": [o.model_dump() for o in overlays], + } + + @app.delete("/overlay/{instance}/{overlay_id}") + async def overlay_remove(instance: str, overlay_id: str) -> dict[str, Any]: + _check_instance(instance) + await dispatcher.handle(instance, "overlay.remove", overlay_id) + return {"ok": True} + + @app.delete("/overlay/{instance}") + async def overlay_clear(instance: str) -> dict[str, Any]: + _check_instance(instance) + await dispatcher.handle(instance, "overlay.clear", "") + return {"ok": True} + + @app.patch("/overlay/{instance}/{overlay_id}") + async def overlay_update( + instance: str, overlay_id: str, overlay: Overlay = Body(...) + ) -> dict[str, Any]: + _check_instance(instance) + # Фиксируем id из URL — игнорируем body's id если отличается + overlay.id = overlay_id + await state.update_overlay(instance, overlay) + await dispatcher.handle( + instance, "overlay.add", overlay.model_dump_json() + ) + return {"ok": True, "id": overlay_id} + return app diff --git a/controller/cuda_grid_controller/mqtt_loop.py b/controller/cuda_grid_controller/mqtt_loop.py index 59cce16..f898a26 100644 --- a/controller/cuda_grid_controller/mqtt_loop.py +++ b/controller/cuda_grid_controller/mqtt_loop.py @@ -24,6 +24,7 @@ import aiomqtt import structlog from .config import Config +from .frigate_bridge import FrigateBridge from .ha_discovery import availability_topic, discovery_payloads from .state import ControllerState @@ -39,10 +40,12 @@ class MqttLoop: cfg: Config, state: ControllerState, command_handler: CommandHandler, + frigate_bridge: FrigateBridge | None = None, ) -> None: self.cfg = cfg self.state = state self.command_handler = command_handler + self.frigate_bridge = frigate_bridge self._client: aiomqtt.Client | None = None self._stop = asyncio.Event() @@ -77,6 +80,11 @@ class MqttLoop: ) # HA status — republish discovery если HA рестартанул await client.subscribe("homeassistant/status", qos=0) + # Frigate topics для bridge + if self.frigate_bridge: + for t in self.frigate_bridge.topics_to_subscribe(): + await client.subscribe(t, qos=0) + log.info("mqtt.frigate.subscribed", topic=t) async for msg in client.messages: await self._handle_message(msg) @@ -109,10 +117,15 @@ class MqttLoop: parts = topic.split("/") if len(parts) >= 5 and parts[0] == "cuda_grid" and parts[1] == "cmd": instance, scope, action = parts[2], parts[3], parts[4] - kind = f"{scope}.{action}" # e.g. "layout.set", "auto_mode.set" + kind = f"{scope}.{action}" # e.g. "layout.set", "overlay.add" await self.command_handler(instance, kind, payload) return + # Frigate bridge + if self.frigate_bridge and topic.startswith(self.frigate_bridge.cfg.base_topic + "/"): + self.frigate_bridge.handle_message(topic, payload) + return + log.warning("mqtt.unknown_topic", topic=topic, payload=payload) async def publish_state(self, instance: str, scope: str, value: str, retain: bool = True) -> None: diff --git a/controller/cuda_grid_controller/overlays.py b/controller/cuda_grid_controller/overlays.py new file mode 100644 index 0000000..83b6f20 --- /dev/null +++ b/controller/cuda_grid_controller/overlays.py @@ -0,0 +1,144 @@ +"""Overlay primitives — 7 типов через pydantic discriminated union. + +Все координаты **normalized** (0.0–1.0 относительно cell или output frame). +Color — hex RGB string + alpha как float. + +Phase 4a: data models + state. Rendering — Phase 4b (filter-side CUDA kernels). +""" + +from __future__ import annotations + +import uuid +from typing import Annotated, Literal, Union + +from pydantic import BaseModel, Field + + +# ─── Common ────────────────────────────────────────────────────────────── + +class OverlayBase(BaseModel): + """Общие поля всех overlay'ев.""" + + id: str = Field(default_factory=lambda: str(uuid.uuid4())[:8]) + cell: int | None = Field( + default=None, + description="Привязка к cell layout (0..N-1). None = относительно " + "всего output frame.", + ) + z_order: int = Field(default=0, description="Higher = поверх. Default 0.") + opacity: float = Field(default=1.0, ge=0.0, le=1.0) + visible: bool = True + + +# ─── Rect ──────────────────────────────────────────────────────────────── + +class RectOverlay(OverlayBase): + type: Literal["rect"] = "rect" + x: float = Field(ge=0.0, le=1.0) + y: float = Field(ge=0.0, le=1.0) + w: float = Field(gt=0.0, le=1.0) + h: float = Field(gt=0.0, le=1.0) + color: str = Field(default="#FF0000", description="HEX RGB e.g. #FF0000") + border_only: bool = Field(default=False, description="Если true — только рамка") + border_width: int = Field(default=2, ge=1, le=64) + + +# ─── Text ──────────────────────────────────────────────────────────────── + +class TextOverlay(OverlayBase): + type: Literal["text"] = "text" + x: float = Field(ge=0.0, le=1.0) + y: float = Field(ge=0.0, le=1.0) + text: str + font_size: int = Field(default=24, ge=8, le=256) + color: str = "#FFFFFF" + bg_color: str | None = Field( + default="#000000", description="Background — None = transparent" + ) + bg_opacity: float = Field(default=0.5, ge=0.0, le=1.0) + + +# ─── Icon ──────────────────────────────────────────────────────────────── + +class IconOverlay(OverlayBase): + type: Literal["icon"] = "icon" + name: str = Field(description="Имя из preloaded sprite sheet, e.g. 'warning', 'person'") + x: float = Field(ge=0.0, le=1.0) + y: float = Field(ge=0.0, le=1.0) + size: float = Field(default=0.05, gt=0.0, le=1.0, description="Размер относительно frame") + tint: str | None = Field(default=None, description="HEX RGB — None = original color") + + +# ─── Image (любой PNG/JPG как texture) ────────────────────────────────── + +class ImageOverlay(OverlayBase): + type: Literal["image"] = "image" + url: str = Field(description="file://path или http://... PNG/JPG") + x: float = Field(ge=0.0, le=1.0) + y: float = Field(ge=0.0, le=1.0) + w: float = Field(gt=0.0, le=1.0) + h: float = Field(gt=0.0, le=1.0) + + +# ─── Dim / privacy mask ────────────────────────────────────────────────── + +class DimOverlay(OverlayBase): + """Затемнение области — используется как privacy mask либо out-of-zone dim.""" + + type: Literal["dim"] = "dim" + x: float = Field(ge=0.0, le=1.0) + y: float = Field(ge=0.0, le=1.0) + w: float = Field(gt=0.0, le=1.0) + h: float = Field(gt=0.0, le=1.0) + color: str = "#000000" + dim_factor: float = Field(default=0.8, ge=0.0, le=1.0, description="0=без затемнения, 1=полное") + + +# ─── Graph / chart ─────────────────────────────────────────────────────── + +class GraphOverlay(OverlayBase): + """Live chart — controller рендерит CPU-side (Cairo) и uploads texture.""" + + type: Literal["graph"] = "graph" + x: float = Field(ge=0.0, le=1.0) + y: float = Field(ge=0.0, le=1.0) + w: float = Field(gt=0.0, le=1.0) + h: float = Field(gt=0.0, le=1.0) + data_topic: str = Field(description="MQTT topic с time-series данными") + chart_type: Literal["line", "bar", "histogram"] = "line" + style: dict = Field(default_factory=dict, description="Цвета, axis, тp.") + refresh_hz: float = Field(default=1.0, gt=0.0, le=10.0) + + +# ─── Chat / scrolling text ─────────────────────────────────────────────── + +class ChatOverlay(OverlayBase): + """Scrolling text — notifications, alerts, etc.""" + + type: Literal["chat"] = "chat" + x: float = Field(ge=0.0, le=1.0) + y: float = Field(ge=0.0, le=1.0) + w: float = Field(gt=0.0, le=1.0) + h: float = Field(gt=0.0, le=1.0) + source_topic: str = Field(description="MQTT topic для новых сообщений (newline-separated)") + font_size: int = Field(default=20, ge=8, le=128) + color: str = "#FFFFFF" + bg_opacity: float = Field(default=0.6, ge=0.0, le=1.0) + max_messages: int = Field(default=10, ge=1, le=100) + scroll_speed_px_s: int = Field(default=30, ge=0, le=1000) + + +# ─── Discriminated union ───────────────────────────────────────────────── + +Overlay = Annotated[ + Union[ + RectOverlay, + TextOverlay, + IconOverlay, + ImageOverlay, + DimOverlay, + GraphOverlay, + ChatOverlay, + ], + Field(discriminator="type"), +] diff --git a/controller/cuda_grid_controller/state.py b/controller/cuda_grid_controller/state.py index b14166e..94243ee 100644 --- a/controller/cuda_grid_controller/state.py +++ b/controller/cuda_grid_controller/state.py @@ -5,11 +5,14 @@ from __future__ import annotations import asyncio from dataclasses import dataclass, field +from .overlays import Overlay + @dataclass class InstanceState: name: str active_layout: str + overlays: dict[str, Overlay] = field(default_factory=dict) # Future: fps_out, dropped_frames, motion_cameras, last_event_ts @@ -31,3 +34,43 @@ class ControllerState: async with self._lock: st = self.instances.get(instance) return st.active_layout if st else None + + # ─── Overlay state ────────────────────────────────────────────── + + async def add_overlay(self, instance: str, overlay: Overlay) -> str: + async with self._lock: + st = self.instances.get(instance) + if st is None: + raise KeyError(f"unknown instance '{instance}'") + st.overlays[overlay.id] = overlay + return overlay.id + + async def remove_overlay(self, instance: str, overlay_id: str) -> bool: + async with self._lock: + st = self.instances.get(instance) + if st is None or overlay_id not in st.overlays: + return False + del st.overlays[overlay_id] + return True + + async def update_overlay(self, instance: str, overlay: Overlay) -> bool: + async with self._lock: + st = self.instances.get(instance) + if st is None or overlay.id not in st.overlays: + return False + st.overlays[overlay.id] = overlay + return True + + async def get_overlays(self, instance: str) -> list[Overlay]: + async with self._lock: + st = self.instances.get(instance) + return list(st.overlays.values()) if st else [] + + async def clear_overlays(self, instance: str) -> int: + async with self._lock: + st = self.instances.get(instance) + if st is None: + return 0 + n = len(st.overlays) + st.overlays.clear() + return n diff --git a/controller/examples/controller.yaml b/controller/examples/controller.yaml index 3d4c3a1..fba93c2 100644 --- a/controller/examples/controller.yaml +++ b/controller/examples/controller.yaml @@ -31,3 +31,21 @@ http: log: level: INFO + +# Phase 4a — Frigate bridge для auto-overlay (rendering в Phase 4b) +frigate: + enabled: true + base_topic: frigate + mappings: + - frigate_camera: parking_overview + target_instance: livingroom_tv + cell: 0 + - frigate_camera: front_yard + target_instance: livingroom_tv + cell: 1 + - frigate_camera: gate_lpr + target_instance: livingroom_tv + cell: 2 + - frigate_camera: back_yard + target_instance: livingroom_tv + cell: 3