"""MQTT subscriber + publisher loop. Topics: Subscribed: cuda_grid/cmd//layout/set — payload = layout name cuda_grid/cmd// — Phase 4+ homeassistant/status — HA online → republish discovery Published (per instance): cuda_grid/state//layout — текущий layout (retained) cuda_grid/event//layout_switched — {from, to, reason} Global: cuda_grid/state/online — LWT (online/offline) """ from __future__ import annotations import asyncio import json from typing import Awaitable, Callable import aiomqtt import structlog from .config import Config from .frigate_bridge import FrigateBridge from .ha_discovery import availability_topic, discovery_payloads from .state import ControllerState log = structlog.get_logger() CommandHandler = Callable[[str, str, str], Awaitable[None]] # args: (instance_name, command_kind, payload_str) class MqttLoop: def __init__( self, cfg: Config, state: ControllerState, command_handler: CommandHandler, frigate_bridge: FrigateBridge | None = None, ) -> None: self.cfg = cfg self.state = state self.command_handler = command_handler self.frigate_bridge = frigate_bridge self._client: aiomqtt.Client | None = None self._stop = asyncio.Event() async def run(self) -> None: """Main loop — connect + subscribe + dispatch. Re-connect при разрыве.""" avail = availability_topic() while not self._stop.is_set(): try: async with aiomqtt.Client( hostname=self.cfg.broker.host, port=self.cfg.broker.port, username=self.cfg.broker.username, password=self.cfg.broker.password, identifier=self.cfg.broker.client_id, keepalive=self.cfg.broker.keepalive_sec, will=aiomqtt.Will( topic=avail, payload=b"offline", qos=1, retain=True ), ) as client: self._client = client log.info("mqtt.connected", host=self.cfg.broker.host) # online + HA Discovery await client.publish(avail, b"online", qos=1, retain=True) if self.cfg.ha_discovery.enabled: await self._publish_ha_discovery() # Subscribe commands per instance for inst in self.cfg.instances: await client.subscribe( f"cuda_grid/cmd/{inst.name}/+/+", qos=1 ) # HA status — republish discovery если HA рестартанул await client.subscribe("homeassistant/status", qos=0) # Frigate topics для bridge if self.frigate_bridge: for t in self.frigate_bridge.topics_to_subscribe(): await client.subscribe(t, qos=0) log.info("mqtt.frigate.subscribed", topic=t) async for msg in client.messages: await self._handle_message(msg) except aiomqtt.MqttError as e: log.warning("mqtt.disconnected", error=str(e)) self._client = None await asyncio.sleep(5) async def _publish_ha_discovery(self) -> None: assert self._client is not None payloads = discovery_payloads(self.cfg.ha_discovery, self.cfg.instances) for topic, payload in payloads: await self._client.publish(topic, payload.encode(), qos=1, retain=True) log.info("mqtt.ha_discovery.published", count=len(payloads)) async def _handle_message(self, msg: aiomqtt.Message) -> None: topic = str(msg.topic) try: payload = msg.payload.decode() if isinstance(msg.payload, (bytes, bytearray)) else str(msg.payload) except Exception: payload = repr(msg.payload) if topic == "homeassistant/status" and payload == "online": log.info("mqtt.ha.restarted — republish discovery") if self.cfg.ha_discovery.enabled: await self._publish_ha_discovery() return # cuda_grid/cmd/// parts = topic.split("/") if len(parts) >= 5 and parts[0] == "cuda_grid" and parts[1] == "cmd": instance, scope, action = parts[2], parts[3], parts[4] kind = f"{scope}.{action}" # e.g. "layout.set", "overlay.add" await self.command_handler(instance, kind, payload) return # Frigate bridge if self.frigate_bridge and topic.startswith(self.frigate_bridge.cfg.base_topic + "/"): await self.frigate_bridge.handle_message(topic, payload) return log.warning("mqtt.unknown_topic", topic=topic, payload=payload) async def publish_state(self, instance: str, scope: str, value: str, retain: bool = True) -> None: """Publish state — `cuda_grid/state//`.""" if self._client is None: return await self._client.publish( f"cuda_grid/state/{instance}/{scope}", value.encode(), qos=1, retain=retain, ) async def publish_event(self, instance: str, event_kind: str, data: dict) -> None: """Publish event (non-retained) — `cuda_grid/event//`.""" if self._client is None: return await self._client.publish( f"cuda_grid/event/{instance}/{event_kind}", json.dumps(data).encode(), qos=0, retain=False, ) async def stop(self) -> None: self._stop.set()