a1090a5f4c
Phase 4a deliverable (no filter rendering yet — that is Phase 4b).
End-to-end pipeline: HA/HTTP/MQTT → controller → ZMQ → FFmpeg (logged).
Modules:
- overlays.py — 7 discriminated union types via pydantic:
rect, text, icon, image, dim, graph, chat. Normalized coords (0.0-1.0),
optional cell binding, z_order, opacity, visible.
- state.py — overlay storage per instance (CRUD: add/remove/update/get/clear)
- dispatch.py — overlay.add/remove/clear actions:
- parses the JSON payload into an Overlay via TypeAdapter
- serializes to ZMQ string: "<id> <type> <full-json>"
- sends via FFmpeg process_command (the filter will parse it in Phase 4b)
- updates state + publishes events (overlay_added, overlay_removed, overlays_cleared)
- http_api.py — REST endpoints:
- POST /overlay/{inst}/add (body = Overlay JSON, returns id)
- GET /overlay/{inst} — list all
- DELETE /overlay/{inst}/{id} — single
- DELETE /overlay/{inst} — clear all
- PATCH /overlay/{inst}/{id} — update
- mqtt_loop.py — already subscribes to cuda_grid/cmd/<inst>/+/+; now handles
overlay/add (JSON payload), overlay/remove (id), overlay/clear
- frigate_bridge.py — FrigateBridge skeleton:
- subscribe frigate/+/motion + frigate/events
- mapping camera_name → target_instance + cell index
- Phase 4a: log received events (rendering comes in Phase 4b)
- config.py — frigate: optional section
- examples/controller.yaml — frigate mappings for our 4 cameras
State management:
- ControllerState.add/remove/update/get/clear_overlay (asyncio.Lock guarded)
- InstanceState.overlays: dict[str, Overlay]
- IDs generated via uuid4()[:8]
Phase 4a limitations:
- Filter side renders nothing yet (it just logs ZMQ commands)
- Frigate bridge receives events but does not auto-generate overlays
- HA Discovery has no overlay-specific entities (overlays are managed through the REST API)
Phase 4b: filter-side AVFrame side data + CUDA kernels (rect first, NPP-based,
then text via a freetype atlas, then icon sprite blit).
155 lines
5.9 KiB
Python
155 lines
5.9 KiB
Python
"""MQTT subscriber + publisher loop.

Topics:

Subscribed:
    cuda_grid/cmd/<instance>/<scope>/<action>
        Commands, one wildcard subscription per configured instance. Each
        message is forwarded to the command handler as "<scope>.<action>"
        (e.g. layout/set with the layout name as payload, overlay/add).
    homeassistant/status
        HA online → republish discovery.
    <Frigate bridge topics>
        Only when a FrigateBridge is supplied; the bridge provides the list.

Published (per instance):
    cuda_grid/state/<instance>/layout            — current layout (retained)
    cuda_grid/event/<instance>/layout_switched   — {from, to, reason}

Global:
    cuda_grid/state/online                       — LWT (online/offline)
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
from typing import Awaitable, Callable
|
|
|
|
import aiomqtt
|
|
import structlog
|
|
|
|
from .config import Config
|
|
from .frigate_bridge import FrigateBridge
|
|
from .ha_discovery import availability_topic, discovery_payloads
|
|
from .state import ControllerState
|
|
|
|
# Module-level structlog logger used by everything in this file.
log = structlog.get_logger()
|
|
|
|
# Async callback invoked for every `cuda_grid/cmd/<instance>/<scope>/<action>` message.
CommandHandler = Callable[[str, str, str], Awaitable[None]]
|
|
# args: (instance_name, command_kind, payload_str); command_kind is "<scope>.<action>"
|
|
|
|
|
|
class MqttLoop:
    """Own the MQTT session: connect, subscribe, dispatch inbound messages, publish.

    Inbound `cuda_grid/cmd/<instance>/<scope>/<action>` messages are forwarded
    to ``command_handler`` as ``(instance, "<scope>.<action>", payload)``.
    Messages under the Frigate bridge's base topic are forwarded to the bridge.
    `homeassistant/status` == "online" triggers a re-publish of HA discovery.

    The loop reconnects after a broker error and terminates promptly once
    :meth:`stop` has been called.
    """

    # Pause between reconnect attempts after the broker connection is lost.
    RECONNECT_DELAY_SEC: float = 5.0

    def __init__(
        self,
        cfg: Config,
        state: ControllerState,
        command_handler: CommandHandler,
        frigate_bridge: FrigateBridge | None = None,
    ) -> None:
        """Store collaborators; no network activity happens until :meth:`run`.

        Args:
            cfg: Controller configuration (broker, instances, HA discovery).
            state: Shared controller state.
            command_handler: Async callback for `cuda_grid/cmd/...` messages.
            frigate_bridge: Optional bridge that receives Frigate messages.
        """
        self.cfg = cfg
        self.state = state
        self.command_handler = command_handler
        self.frigate_bridge = frigate_bridge
        # Set only while a broker connection is open; publish_* are no-ops otherwise.
        self._client: aiomqtt.Client | None = None
        self._stop = asyncio.Event()

    async def run(self) -> None:
        """Main loop — connect, subscribe, dispatch; reconnect on connection loss.

        Returns once :meth:`stop` has been called. On a graceful stop the
        availability topic is explicitly set to ``offline``: a clean MQTT
        DISCONNECT suppresses the last-will message, so the retained
        ``online`` value would otherwise stay on the broker.
        """
        avail = availability_topic()
        while not self._stop.is_set():
            try:
                async with aiomqtt.Client(
                    hostname=self.cfg.broker.host,
                    port=self.cfg.broker.port,
                    username=self.cfg.broker.username,
                    password=self.cfg.broker.password,
                    identifier=self.cfg.broker.client_id,
                    keepalive=self.cfg.broker.keepalive_sec,
                    will=aiomqtt.Will(
                        topic=avail, payload=b"offline", qos=1, retain=True
                    ),
                ) as client:
                    self._client = client
                    log.info("mqtt.connected", host=self.cfg.broker.host)

                    # Announce availability, then (optionally) HA discovery.
                    await client.publish(avail, b"online", qos=1, retain=True)
                    if self.cfg.ha_discovery.enabled:
                        await self._publish_ha_discovery()

                    await self._subscribe(client)
                    await self._consume_until_stopped(client)

                    if self._stop.is_set():
                        await client.publish(avail, b"offline", qos=1, retain=True)
            except aiomqtt.MqttError as e:
                log.warning("mqtt.disconnected", error=str(e))
                # Drop the dead client before sleeping so publish_* become no-ops.
                self._client = None
                await self._wait_for_stop(self.RECONNECT_DELAY_SEC)
            finally:
                # Also covers clean exits and non-MQTT exceptions.
                self._client = None

    async def _subscribe(self, client: aiomqtt.Client) -> None:
        """Subscribe to command, Home Assistant status and Frigate topics."""
        # Commands: one wildcard subscription per configured instance.
        for inst in self.cfg.instances:
            await client.subscribe(f"cuda_grid/cmd/{inst.name}/+/+", qos=1)
        # HA status — lets us republish discovery after HA restarts.
        await client.subscribe("homeassistant/status", qos=0)
        # Frigate topics for the bridge.
        if self.frigate_bridge:
            for t in self.frigate_bridge.topics_to_subscribe():
                await client.subscribe(t, qos=0)
                log.info("mqtt.frigate.subscribed", topic=t)

    async def _consume(self, client: aiomqtt.Client) -> None:
        """Dispatch every inbound message until the connection fails."""
        async for msg in client.messages:
            await self._handle_message(msg)

    async def _consume_until_stopped(self, client: aiomqtt.Client) -> None:
        """Run the message consumer until it fails or :meth:`stop` is called.

        Raises:
            Whatever the consumer raised (typically ``aiomqtt.MqttError``),
            so the caller's reconnect logic still sees connection failures.
        """
        consumer = asyncio.create_task(self._consume(client))
        stopper = asyncio.create_task(self._stop.wait())
        try:
            await asyncio.wait(
                {consumer, stopper}, return_when=asyncio.FIRST_COMPLETED
            )
        finally:
            # cancel() is a no-op on a task that has already finished.
            consumer.cancel()
            stopper.cancel()
            await asyncio.gather(consumer, stopper, return_exceptions=True)
        if not consumer.cancelled():
            consumer.result()  # re-raises the consumer's exception, if any

    async def _wait_for_stop(self, timeout: float) -> None:
        """Sleep up to ``timeout`` seconds, returning early if stop was requested."""
        try:
            await asyncio.wait_for(self._stop.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            pass

    async def _publish_ha_discovery(self) -> None:
        """Publish all Home Assistant discovery payloads (retained, QoS 1)."""
        client = self._client
        if client is None:
            # Explicit guard instead of `assert`, which is stripped under -O.
            return
        payloads = discovery_payloads(self.cfg.ha_discovery, self.cfg.instances)
        for topic, payload in payloads:
            await client.publish(topic, payload.encode(), qos=1, retain=True)
        log.info("mqtt.ha_discovery.published", count=len(payloads))

    @staticmethod
    def _decode_payload(raw: object) -> str:
        """Return the message payload as text.

        Bytes are decoded as UTF-8; undecodable bytes fall back to their
        ``repr``. Any other payload type is converted with ``str``.
        """
        if isinstance(raw, (bytes, bytearray)):
            try:
                return raw.decode()
            except UnicodeDecodeError:
                return repr(raw)
        return str(raw)

    async def _handle_message(self, msg: aiomqtt.Message) -> None:
        """Route one inbound message to the matching handler.

        Failures inside the command handler or the Frigate bridge are logged
        and swallowed so that one bad message cannot end the MQTT loop;
        ``aiomqtt.MqttError`` is re-raised so reconnect handling still works.
        """
        topic = str(msg.topic)
        payload = self._decode_payload(msg.payload)

        if topic == "homeassistant/status":
            # Only "online" needs action; other statuses (e.g. "offline") are
            # expected traffic and must not be reported as an unknown topic.
            if payload == "online":
                log.info("mqtt.ha.restarted — republish discovery")
                if self.cfg.ha_discovery.enabled:
                    await self._publish_ha_discovery()
            return

        # cuda_grid/cmd/<instance>/<scope>/<action>
        parts = topic.split("/")
        if len(parts) >= 5 and parts[0] == "cuda_grid" and parts[1] == "cmd":
            instance, scope, action = parts[2:5]
            kind = f"{scope}.{action}"  # e.g. "layout.set", "overlay.add"
            try:
                await self.command_handler(instance, kind, payload)
            except aiomqtt.MqttError:
                raise
            except Exception:
                log.exception("mqtt.command_failed", instance=instance, kind=kind)
            return

        # Frigate bridge
        bridge = self.frigate_bridge
        if bridge and topic.startswith(bridge.cfg.base_topic + "/"):
            try:
                bridge.handle_message(topic, payload)
            except Exception:
                log.exception("mqtt.frigate.handler_failed", topic=topic)
            return

        log.warning("mqtt.unknown_topic", topic=topic, payload=payload)

    async def publish_state(
        self, instance: str, scope: str, value: str, retain: bool = True
    ) -> None:
        """Publish state — `cuda_grid/state/<instance>/<scope>` (QoS 1).

        Silently does nothing while no broker connection is open.
        """
        client = self._client
        if client is None:
            return
        await client.publish(
            f"cuda_grid/state/{instance}/{scope}",
            value.encode(),
            qos=1,
            retain=retain,
        )

    async def publish_event(self, instance: str, event_kind: str, data: dict) -> None:
        """Publish event (non-retained, QoS 0) — `cuda_grid/event/<instance>/<kind>`.

        ``data`` is JSON-encoded. Silently does nothing while no broker
        connection is open.
        """
        client = self._client
        if client is None:
            return
        await client.publish(
            f"cuda_grid/event/{instance}/{event_kind}",
            json.dumps(data).encode(),
            qos=0,
            retain=False,
        )

    async def stop(self) -> None:
        """Request shutdown; :meth:`run` returns shortly afterwards."""
        self._stop.set()
|