"""Command dispatch — между MQTT/HTTP командами и ZMQ выходом. Action kinds: layout.set — set_layout overlay.add — add overlay (JSON payload) overlay.remove — remove overlay by id overlay.clear — remove all overlays """ from __future__ import annotations import json import structlog from .config import Config, InstanceCfg from .layouts import PREDEFINED_LAYOUTS from .overlays import Overlay from .state import ControllerState from .zmq_client import FFmpegZmqClient log = structlog.get_logger() def _hex_to_rgb(hex_color: str) -> tuple[int, int, int]: """'#FF8800' → (255, 136, 0). Fallback (255, 255, 255) при невалидном input.""" s = hex_color.lstrip("#") if len(s) != 6: return (255, 255, 255) try: return (int(s[0:2], 16), int(s[2:4], 16), int(s[4:6], 16)) except ValueError: return (255, 255, 255) def _serialize_overlay_to_zmq(overlay: Overlay) -> str: """Сериализовать overlay в строку для FFmpeg process_command. Wire format: ` =...` Filter parser (libavfilter/vf_cuda_grid.c parse_overlay_args) ожидает plain fields — поэтому translate: color "#RRGGBB" → r=N g=N b=N opacity 0..1.0 → opacity 0..255 border_only+width → thickness (0=filled, иначе border_width) dim_factor 0..1 → amount 0..255 String values URL-encoded (%20 для space) — filter inline decode'ит. """ from urllib.parse import quote r, g, b = 255, 255, 255 # default parts: list[str] = [overlay.id, overlay.type] # Common base fields if overlay.cell is not None: parts.append(f"cell={overlay.cell}") parts.append(f"z_order={overlay.z_order}") parts.append(f"opacity={int(round(overlay.opacity * 255))}") parts.append(f"visible={1 if overlay.visible else 0}") t = overlay.type if t == "rect": r, g, b = _hex_to_rgb(overlay.color) thickness = overlay.border_width if overlay.border_only else 0 parts += [ f"x={overlay.x}", f"y={overlay.y}", f"w={overlay.w}", f"h={overlay.h}", f"r={r}", f"g={g}", f"b={b}", f"thickness={thickness}", ] elif t == "text": r, g, b = _hex_to_rgb(overlay.color) parts += [ f"x={overlay.x}", f"y={overlay.y}", f"text={quote(overlay.text, safe='')}", f"font_size={overlay.font_size}", f"r={r}", f"g={g}", f"b={b}", ] elif t == "icon": parts += [ f"x={overlay.x}", f"y={overlay.y}", f"icon_name={quote(overlay.name, safe='')}", ] elif t == "dim": # filter: amount = насколько затемнить (0..255). Маппится из dim_factor. amount = int(round(overlay.dim_factor * 255)) parts += [ f"x={overlay.x}", f"y={overlay.y}", f"w={overlay.w}", f"h={overlay.h}", f"amount={amount}", ] elif t in ("image", "graph", "chat"): # Phase 5+ — filter пока не support'ит. Просто отправляем что есть, filter # skip-логирует (parse_overlay_args знает только rect/text/icon/dim). pass return " ".join(parts) class CommandDispatcher: def __init__(self, cfg: Config, state: ControllerState) -> None: self.cfg = cfg self.state = state self._zmq_clients: dict[str, FFmpegZmqClient] = {} self.on_state_change = None # type: ignore[var-annotated] self.on_event = None # type: ignore[var-annotated] def _client(self, inst: InstanceCfg) -> FFmpegZmqClient: c = self._zmq_clients.get(inst.name) if c is None: c = FFmpegZmqClient(inst.zmq_endpoint) self._zmq_clients[inst.name] = c return c def _audio_client(self, inst: InstanceCfg) -> FFmpegZmqClient: """ZMQ client к audio sidecar (Phase 5d). Fallback к video pipeline если split-process не configured.""" endpoint = inst.audio_zmq_endpoint or inst.zmq_endpoint key = f"{inst.name}:audio" c = self._zmq_clients.get(key) if c is None: c = FFmpegZmqClient(endpoint) self._zmq_clients[key] = c return c def _find_instance(self, name: str) -> InstanceCfg | None: return next((i for i in self.cfg.instances if i.name == name), None) async def handle(self, instance_name: str, kind: str, payload: str) -> None: inst = self._find_instance(instance_name) if inst is None: log.warning("dispatch.unknown_instance", instance=instance_name) return match kind: case "layout.set": await self._set_layout(inst, payload.strip()) case "overlay.add": await self._overlay_add(inst, payload) case "overlay.remove": await self._overlay_remove(inst, payload.strip()) case "overlay.clear": await self._overlay_clear(inst) case "audio.set": await self._audio_set(inst, payload.strip()) case "intercom.start": await self._intercom_set(inst, active=True) case "intercom.end": await self._intercom_set(inst, active=False) case _: log.warning( "dispatch.unknown_kind", instance=instance_name, kind=kind ) # ─── Audio ───────────────────────────────────────────────────── async def _audio_set(self, inst: InstanceCfg, source_name: str) -> None: """Switch audio к source_name через ZMQ команду astreamselect map . Phase 5d: команда идёт в audio sidecar (отдельный ffmpeg).""" if not inst.audio_sources: log.warning("dispatch.no_audio_sources", instance=inst.name) return src = next((s for s in inst.audio_sources if s.name == source_name), None) if src is None: log.warning("dispatch.unknown_audio_source", instance=inst.name, source=source_name, available=[s.name for s in inst.audio_sources]) return client = self._audio_client(inst) try: reply = await client.send_command( inst.audio_filter_target, "map", str(src.index) ) log.info("dispatch.audio_set", instance=inst.name, source=source_name, index=src.index, ffmpeg_reply=reply) except (TimeoutError, Exception) as e: log.error("dispatch.audio_zmq_fail", instance=inst.name, error=str(e)) return if self.on_state_change: await self.on_state_change(inst.name, "audio_source", source_name) if self.on_event: await self.on_event( inst.name, "audio_switched", {"to": source_name, "index": src.index} ) async def set_main_cam(self, instance: str, cam_index: int) -> None: """Switch single layout main camera через streamselect@main_cam map.""" inst = self._find_instance(instance) if inst is None: return client = self._client(inst) try: reply = await client.send_command( inst.main_cam_filter_target, "map", str(cam_index) ) log.info("dispatch.main_cam_set", instance=instance, cam_index=cam_index, ffmpeg_reply=reply) except (TimeoutError, Exception) as e: log.warning("dispatch.main_cam_fail", instance=instance, error=str(e)) async def set_mpp_main(self, instance: str, cam_index: int) -> None: """Switch mpp main camera через streamselect@mpp_main map.""" inst = self._find_instance(instance) if inst is None: return client = self._client(inst) try: reply = await client.send_command( inst.mpp_main_filter_target, "map", str(cam_index) ) log.info("dispatch.mpp_main_set", instance=instance, cam_index=cam_index, ffmpeg_reply=reply) except (TimeoutError, Exception) as e: log.warning("dispatch.mpp_main_fail", instance=instance, error=str(e)) async def _reload_icon(self, instance: str, icon_name: str) -> None: """Invalidate cached icon atlas во всех cuda_grid instances — каждый имеет свой кеш.""" inst = self._find_instance(instance) if inst is None: return await self._overlay_broadcast(inst, "reload_icon", icon_name) async def _intercom_set(self, inst: InstanceCfg, active: bool) -> None: """Ducking pattern: при intercom ON → music volume ↓ + intercom ↑. После end — restore. Phase 5d: команды идут в audio sidecar.""" client = self._audio_client(inst) music_vol = inst.music_ducked_volume if active else 1.0 intercom_vol = 1.0 if active else 0.0 try: r1 = await client.send_command(inst.music_volume_target, "volume", str(music_vol)) r2 = await client.send_command(inst.intercom_volume_target, "volume", str(intercom_vol)) log.info("dispatch.intercom", instance=inst.name, active=active, music_vol=music_vol, intercom_vol=intercom_vol, music_reply=r1, intercom_reply=r2) except (TimeoutError, Exception) as e: log.error("dispatch.intercom_zmq_fail", instance=inst.name, error=str(e)) return if self.on_event: await self.on_event( inst.name, "intercom", {"active": active, "music_vol": music_vol} ) # ─── Layout ──────────────────────────────────────────────────── async def _set_layout(self, inst: InstanceCfg, layout: str) -> None: if layout not in inst.layout_map: log.warning( "dispatch.unknown_layout", instance=inst.name, layout=layout, available=list(inst.layout_map.keys()), ) return old = await self.state.get_layout(inst.name) client = self._client(inst) try: reply = await client.send_command( inst.layout_filter_target, "map", str(inst.layout_map[layout]) ) log.info( "dispatch.layout_set", instance=inst.name, layout=layout, index=inst.layout_map[layout], ffmpeg_reply=reply, ) except (TimeoutError, Exception) as e: log.error("dispatch.zmq_fail", instance=inst.name, error=str(e)) return await self.state.set_layout(inst.name, layout) if self.on_state_change: await self.on_state_change(inst.name, "layout", layout) if self.on_event: await self.on_event( inst.name, "layout_switched", {"from": old, "to": layout, "reason": "mqtt"}, ) # ─── Overlays ────────────────────────────────────────────────── def _overlay_targets(self, inst: InstanceCfg) -> list[str]: """Если overlay_filter_targets настроен — broadcast'им ко всем (нужно когда несколько cuda_grid instances в pipeline, например layout switching). Иначе fallback к default filter_target.""" return inst.overlay_filter_targets or [inst.filter_target] async def _overlay_broadcast(self, inst: InstanceCfg, cmd: str, arg: str) -> str | None: """Send command к каждому overlay target. Return last reply (если успешный).""" client = self._client(inst) last_reply = None for target in self._overlay_targets(inst): try: last_reply = await client.send_command(target, cmd, arg) except (TimeoutError, Exception) as e: log.warning("dispatch.overlay_broadcast_fail", instance=inst.name, target=target, error=str(e)) return last_reply async def _overlay_add(self, inst: InstanceCfg, payload: str) -> None: """Payload = JSON совместимый с Overlay discriminated union.""" from pydantic import TypeAdapter try: overlay: Overlay = TypeAdapter(Overlay).validate_json(payload) except Exception as e: log.warning("dispatch.overlay_parse_fail", error=str(e), payload=payload[:200]) return # Broadcast к ВСЕМ cuda_grid instances (если несколько в pipeline для # layout switching) — каждый имеет свой overlay state. zmq_arg = _serialize_overlay_to_zmq(overlay) reply = await self._overlay_broadcast(inst, "add_overlay", zmq_arg) log.info("dispatch.overlay_add", instance=inst.name, id=overlay.id, type=overlay.type, ffmpeg_reply=reply) await self.state.add_overlay(inst.name, overlay) if self.on_state_change: count = len(await self.state.get_overlays(inst.name)) await self.on_state_change(inst.name, "overlays_count", str(count)) if self.on_event: await self.on_event( inst.name, "overlay_added", {"id": overlay.id, "type": overlay.type}, ) async def _overlay_remove(self, inst: InstanceCfg, overlay_id: str) -> None: existed = await self.state.remove_overlay(inst.name, overlay_id) if not existed: log.warning( "dispatch.overlay_unknown", instance=inst.name, id=overlay_id, ) return await self._overlay_broadcast(inst, "remove_overlay", overlay_id) log.info("dispatch.overlay_removed", instance=inst.name, id=overlay_id) if self.on_state_change: count = len(await self.state.get_overlays(inst.name)) await self.on_state_change(inst.name, "overlays_count", str(count)) if self.on_event: await self.on_event( inst.name, "overlay_removed", {"id": overlay_id} ) async def _overlay_clear(self, inst: InstanceCfg) -> None: n = await self.state.clear_overlays(inst.name) await self._overlay_broadcast(inst, "clear_overlays", "") log.info("dispatch.overlays_cleared", instance=inst.name, count=n) if self.on_state_change: await self.on_state_change(inst.name, "overlays_count", "0") if self.on_event: await self.on_event(inst.name, "overlays_cleared", {"count": n}) # ─── Cleanup ─────────────────────────────────────────────────── async def close(self) -> None: for c in self._zmq_clients.values(): await c.close()