"""Command dispatch — между MQTT/HTTP командами и ZMQ выходом. Action kinds: layout.set — set_layout overlay.add — add overlay (JSON payload) overlay.remove — remove overlay by id overlay.clear — remove all overlays """ from __future__ import annotations import json import structlog from .config import Config, InstanceCfg from .layouts import PREDEFINED_LAYOUTS from .overlays import Overlay from .state import ControllerState from .zmq_client import FFmpegZmqClient log = structlog.get_logger() def _serialize_overlay_to_zmq(overlay: Overlay) -> str: """Сериализовать overlay в одну строку для FFmpeg process_command. Формат: ` = = ...` String values URL-encoded (spaces → %20), filter-side decode'ит inline в parse_overlay_args. Nested values (style и т.п.) skip'аются — Phase 4b их не поддерживает. """ from urllib.parse import quote parts = [overlay.id, overlay.type] data = overlay.model_dump() for key, value in data.items(): if key in {"id", "type"}: continue if value is None: continue if isinstance(value, (dict, list)): continue # Phase 4b skipped — Phase 5 для nested style if isinstance(value, bool): value = 1 if value else 0 if isinstance(value, str): value = quote(value, safe="") # encode spaces + всё кроме alnum parts.append(f"{key}={value}") return " ".join(parts) class CommandDispatcher: def __init__(self, cfg: Config, state: ControllerState) -> None: self.cfg = cfg self.state = state self._zmq_clients: dict[str, FFmpegZmqClient] = {} self.on_state_change = None # type: ignore[var-annotated] self.on_event = None # type: ignore[var-annotated] def _client(self, inst: InstanceCfg) -> FFmpegZmqClient: c = self._zmq_clients.get(inst.name) if c is None: c = FFmpegZmqClient(inst.zmq_endpoint) self._zmq_clients[inst.name] = c return c def _find_instance(self, name: str) -> InstanceCfg | None: return next((i for i in self.cfg.instances if i.name == name), None) async def handle(self, instance_name: str, kind: str, payload: str) -> None: inst = self._find_instance(instance_name) if inst is None: log.warning("dispatch.unknown_instance", instance=instance_name) return match kind: case "layout.set": await self._set_layout(inst, payload.strip()) case "overlay.add": await self._overlay_add(inst, payload) case "overlay.remove": await self._overlay_remove(inst, payload.strip()) case "overlay.clear": await self._overlay_clear(inst) case _: log.warning( "dispatch.unknown_kind", instance=instance_name, kind=kind ) # ─── Layout ──────────────────────────────────────────────────── async def _set_layout(self, inst: InstanceCfg, layout: str) -> None: if layout not in PREDEFINED_LAYOUTS: log.warning( "dispatch.unknown_layout", instance=inst.name, layout=layout, available=PREDEFINED_LAYOUTS, ) return old = await self.state.get_layout(inst.name) client = self._client(inst) try: reply = await client.send_command( inst.filter_target, "layout", layout ) log.info( "dispatch.layout_set", instance=inst.name, layout=layout, ffmpeg_reply=reply, ) except (TimeoutError, Exception) as e: log.error("dispatch.zmq_fail", instance=inst.name, error=str(e)) return await self.state.set_layout(inst.name, layout) if self.on_state_change: await self.on_state_change(inst.name, "layout", layout) if self.on_event: await self.on_event( inst.name, "layout_switched", {"from": old, "to": layout, "reason": "mqtt"}, ) # ─── Overlays ────────────────────────────────────────────────── async def _overlay_add(self, inst: InstanceCfg, payload: str) -> None: """Payload = JSON совместимый с Overlay discriminated union.""" from pydantic import TypeAdapter try: overlay: Overlay = TypeAdapter(Overlay).validate_json(payload) except Exception as e: log.warning("dispatch.overlay_parse_fail", error=str(e), payload=payload[:200]) return # ZMQ send → filter (Phase 4b will actually render) client = self._client(inst) try: zmq_arg = _serialize_overlay_to_zmq(overlay) reply = await client.send_command( inst.filter_target, "add_overlay", zmq_arg ) log.info( "dispatch.overlay_add", instance=inst.name, id=overlay.id, type=overlay.type, ffmpeg_reply=reply, ) except (TimeoutError, Exception) as e: # Filter side might not support yet — log warn but persist # в state (controller behaves correctly даже если filter ignore'ит). log.warning( "dispatch.overlay_zmq_fail", instance=inst.name, id=overlay.id, error=str(e), ) await self.state.add_overlay(inst.name, overlay) if self.on_state_change: count = len(await self.state.get_overlays(inst.name)) await self.on_state_change(inst.name, "overlays_count", str(count)) if self.on_event: await self.on_event( inst.name, "overlay_added", {"id": overlay.id, "type": overlay.type}, ) async def _overlay_remove(self, inst: InstanceCfg, overlay_id: str) -> None: existed = await self.state.remove_overlay(inst.name, overlay_id) if not existed: log.warning( "dispatch.overlay_unknown", instance=inst.name, id=overlay_id, ) return client = self._client(inst) try: await client.send_command( inst.filter_target, "remove_overlay", overlay_id ) except (TimeoutError, Exception) as e: log.warning("dispatch.overlay_remove_zmq_fail", error=str(e)) log.info("dispatch.overlay_removed", instance=inst.name, id=overlay_id) if self.on_state_change: count = len(await self.state.get_overlays(inst.name)) await self.on_state_change(inst.name, "overlays_count", str(count)) if self.on_event: await self.on_event( inst.name, "overlay_removed", {"id": overlay_id} ) async def _overlay_clear(self, inst: InstanceCfg) -> None: n = await self.state.clear_overlays(inst.name) client = self._client(inst) try: await client.send_command(inst.filter_target, "clear_overlays", "") except (TimeoutError, Exception) as e: log.warning("dispatch.overlay_clear_zmq_fail", error=str(e)) log.info("dispatch.overlays_cleared", instance=inst.name, count=n) if self.on_state_change: await self.on_state_change(inst.name, "overlays_count", "0") if self.on_event: await self.on_event(inst.name, "overlays_cleared", {"count": n}) # ─── Cleanup ─────────────────────────────────────────────────── async def close(self) -> None: for c in self._zmq_clients.values(): await c.close()