Files
vf-cuda-grid/controller/cuda_grid_controller/mqtt_loop.py
T
gx d807cd2c23 controller: Phase 5b+5c+6 — multi-audio + intercom ducking + dynamic overlays
5b — audio source switching:
  AudioSourceCfg list + audio_filter_target в InstanceCfg
  CommandDispatcher._audio_set → ZMQ astreamselect@as map <index>
  REST: GET /audio/{inst}, POST /audio/{inst}/set
  MQTT: cuda_grid/cmd/<inst>/audio/set <source_name>

5c — intercom ducking:
  music_volume_target / intercom_volume_target / music_ducked_volume в InstanceCfg
  CommandDispatcher._intercom_set → 2× ZMQ volume@music/@intercom commands
  REST: POST /intercom/{inst}/start (music↓ + intercom↑) + /end (restore)
  MQTT: cuda_grid/cmd/<inst>/intercom/start|end

6 — dynamic overlays (charts/chats):
  dynamic_overlays.py: ChartCfg/ChatCfg + DynamicRenderer
  PIL rendering: line chart + scrolling text list
  Async loops пишут PNG в icon_dir + invalidate filter cache via reload_icon ZMQ
  MQTT subscriptions для real data (charts: numeric topic, chats: text topic)
  Demo: chart sine wave если data_topic=null
  Wired в __main__.py + mqtt_loop dispatch

+ ZMQ client asyncio.Lock — REQ socket strict send/recv pattern требует
  serialize requests (overlay/audio/intercom concurrent ломали "Operation
  cannot be accomplished in current state")
+ Pillow в Dockerfile (для PIL render)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-20 21:55:33 +01:00

168 lines
6.5 KiB
Python

"""MQTT subscriber + publisher loop.
Topics:
Subscribed:
cuda_grid/cmd/<instance>/layout/set — payload = layout name
cuda_grid/cmd/<instance>/<future commands> — Phase 4+
homeassistant/status — HA online → republish discovery
Published (per instance):
cuda_grid/state/<instance>/layout — текущий layout (retained)
cuda_grid/event/<instance>/layout_switched — {from, to, reason}
Global:
cuda_grid/state/online — LWT (online/offline)
"""
from __future__ import annotations
import asyncio
import json
from typing import Awaitable, Callable
import aiomqtt
import structlog
from .config import Config
from .dynamic_overlays import DynamicRenderer
from .frigate_bridge import FrigateBridge
from .ha_discovery import availability_topic, discovery_payloads
from .state import ControllerState
log = structlog.get_logger()
CommandHandler = Callable[[str, str, str], Awaitable[None]]
# args: (instance_name, command_kind, payload_str)
class MqttLoop:
def __init__(
self,
cfg: Config,
state: ControllerState,
command_handler: CommandHandler,
frigate_bridge: FrigateBridge | None = None,
dynamic_renderer: DynamicRenderer | None = None,
) -> None:
self.cfg = cfg
self.state = state
self.command_handler = command_handler
self.frigate_bridge = frigate_bridge
self.dynamic_renderer = dynamic_renderer
self._client: aiomqtt.Client | None = None
self._stop = asyncio.Event()
async def run(self) -> None:
"""Main loop — connect + subscribe + dispatch. Re-connect при разрыве."""
avail = availability_topic()
while not self._stop.is_set():
try:
async with aiomqtt.Client(
hostname=self.cfg.broker.host,
port=self.cfg.broker.port,
username=self.cfg.broker.username,
password=self.cfg.broker.password,
identifier=self.cfg.broker.client_id,
keepalive=self.cfg.broker.keepalive_sec,
will=aiomqtt.Will(
topic=avail, payload=b"offline", qos=1, retain=True
),
) as client:
self._client = client
log.info("mqtt.connected", host=self.cfg.broker.host)
# online + HA Discovery
await client.publish(avail, b"online", qos=1, retain=True)
if self.cfg.ha_discovery.enabled:
await self._publish_ha_discovery()
# Subscribe commands per instance
for inst in self.cfg.instances:
await client.subscribe(
f"cuda_grid/cmd/{inst.name}/+/+", qos=1
)
# HA status — republish discovery если HA рестартанул
await client.subscribe("homeassistant/status", qos=0)
# Frigate topics для bridge
if self.frigate_bridge:
for t in self.frigate_bridge.topics_to_subscribe():
await client.subscribe(t, qos=0)
log.info("mqtt.frigate.subscribed", topic=t)
# Dynamic overlays MQTT data subscriptions
if self.dynamic_renderer:
for t in self.dynamic_renderer.topics_to_subscribe():
await client.subscribe(t, qos=0)
log.info("mqtt.dynamic.subscribed", topic=t)
async for msg in client.messages:
await self._handle_message(msg)
except aiomqtt.MqttError as e:
log.warning("mqtt.disconnected", error=str(e))
self._client = None
await asyncio.sleep(5)
async def _publish_ha_discovery(self) -> None:
assert self._client is not None
payloads = discovery_payloads(self.cfg.ha_discovery, self.cfg.instances)
for topic, payload in payloads:
await self._client.publish(topic, payload.encode(), qos=1, retain=True)
log.info("mqtt.ha_discovery.published", count=len(payloads))
async def _handle_message(self, msg: aiomqtt.Message) -> None:
topic = str(msg.topic)
try:
payload = msg.payload.decode() if isinstance(msg.payload, (bytes, bytearray)) else str(msg.payload)
except Exception:
payload = repr(msg.payload)
if topic == "homeassistant/status" and payload == "online":
log.info("mqtt.ha.restarted — republish discovery")
if self.cfg.ha_discovery.enabled:
await self._publish_ha_discovery()
return
# cuda_grid/cmd/<instance>/<scope>/<action>
parts = topic.split("/")
if len(parts) >= 5 and parts[0] == "cuda_grid" and parts[1] == "cmd":
instance, scope, action = parts[2], parts[3], parts[4]
kind = f"{scope}.{action}" # e.g. "layout.set", "overlay.add"
await self.command_handler(instance, kind, payload)
return
# Frigate bridge
if self.frigate_bridge and topic.startswith(self.frigate_bridge.cfg.base_topic + "/"):
await self.frigate_bridge.handle_message(topic, payload)
return
# Dynamic overlays — chart data + chat messages
if self.dynamic_renderer:
self.dynamic_renderer.handle_message(topic, payload)
return
log.warning("mqtt.unknown_topic", topic=topic, payload=payload)
async def publish_state(self, instance: str, scope: str, value: str, retain: bool = True) -> None:
"""Publish state — `cuda_grid/state/<instance>/<scope>`."""
if self._client is None:
return
await self._client.publish(
f"cuda_grid/state/{instance}/{scope}",
value.encode(),
qos=1,
retain=retain,
)
async def publish_event(self, instance: str, event_kind: str, data: dict) -> None:
"""Publish event (non-retained) — `cuda_grid/event/<instance>/<kind>`."""
if self._client is None:
return
await self._client.publish(
f"cuda_grid/event/{instance}/{event_kind}",
json.dumps(data).encode(),
qos=0,
retain=False,
)
async def stop(self) -> None:
self._stop.set()