Files
vf-cuda-grid/controller/cuda_grid_controller/dispatch.py
T
gx e2764160b6 controller: Phase 7 dispatch — set_layout + cell_map (вместо streamselect)
Sync с n7.1-vf-cuda-grid-phase7 filter rework (один cuda_grid + native
runtime layout switching). Изменения:

  dispatch._set_layout:
    target: layout_filter_target (default cuda_grid@cg, был streamselect@layout)
    command: "set_layout <name>"  (был "map <index>")
    validation: PREDEFINED_LAYOUTS (был inst.layout_map)

  dispatch.set_main_cam:
    target: main_cam_filter_target (default cuda_grid@cg, был streamselect@main_cam)
    command: "cell_map <cell> <pad>" × max_cells (по одной команде на каждую cell)
    cell 0 = main camera; cells 1..N-1 = остальные pads по возрастанию индекса
    (исключая main, чтобы не дублировать его в preview cells)

  dispatch.set_mpp_main:
    alias к set_main_cam — main_plus_preview тоже использует cell 0 как main.

  config.InstanceCfg:
    layout_filter_target / main_cam_filter_target / mpp_main_filter_target
    default = cuda_grid@cg (Phase 7 single filter)
    new max_cells: int = 4 (соответствует filter max_cells option)
    layout_map deprecated — оставлен для UI subset visibility (HTTP /layouts/{inst})

  http_api.set_layout:
    validation против PREDEFINED_LAYOUTS вместо layout_map

Frigate bridge не меняется — set_main_cam подпись та же (instance, cam_index).

Compile-test OK. Pipeline image: gx/cuda-grid-pipeline:phase7. Controller
image rebuild требуется для deploy.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-21 20:11:04 +01:00

381 lines
16 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Command dispatch — между MQTT/HTTP командами и ZMQ выходом.
Action kinds:
layout.set — set_layout <name>
overlay.add — add overlay (JSON payload)
overlay.remove — remove overlay by id
overlay.clear — remove all overlays
"""
from __future__ import annotations
import json
import structlog
from .config import Config, InstanceCfg
from .layouts import PREDEFINED_LAYOUTS
from .overlays import Overlay
from .state import ControllerState
from .zmq_client import FFmpegZmqClient
log = structlog.get_logger()
def _hex_to_rgb(hex_color: str) -> tuple[int, int, int]:
"""'#FF8800' → (255, 136, 0). Fallback (255, 255, 255) при невалидном input."""
s = hex_color.lstrip("#")
if len(s) != 6:
return (255, 255, 255)
try:
return (int(s[0:2], 16), int(s[2:4], 16), int(s[4:6], 16))
except ValueError:
return (255, 255, 255)
def _serialize_overlay_to_zmq(overlay: Overlay) -> str:
"""Сериализовать overlay в строку для FFmpeg process_command.
Wire format: `<id> <type> <key>=<val>...`
Filter parser (libavfilter/vf_cuda_grid.c parse_overlay_args) ожидает
plain fields — поэтому translate:
color "#RRGGBB" → r=N g=N b=N
opacity 0..1.0 → opacity 0..255
border_only+width → thickness (0=filled, иначе border_width)
dim_factor 0..1 → amount 0..255
String values URL-encoded (%20 для space) — filter inline decode'ит.
"""
from urllib.parse import quote
r, g, b = 255, 255, 255 # default
parts: list[str] = [overlay.id, overlay.type]
# Common base fields
if overlay.cell is not None:
parts.append(f"cell={overlay.cell}")
parts.append(f"z_order={overlay.z_order}")
parts.append(f"opacity={int(round(overlay.opacity * 255))}")
parts.append(f"visible={1 if overlay.visible else 0}")
t = overlay.type
if t == "rect":
r, g, b = _hex_to_rgb(overlay.color)
thickness = overlay.border_width if overlay.border_only else 0
parts += [
f"x={overlay.x}", f"y={overlay.y}", f"w={overlay.w}", f"h={overlay.h}",
f"r={r}", f"g={g}", f"b={b}", f"thickness={thickness}",
]
elif t == "text":
r, g, b = _hex_to_rgb(overlay.color)
parts += [
f"x={overlay.x}", f"y={overlay.y}",
f"text={quote(overlay.text, safe='')}",
f"font_size={overlay.font_size}",
f"r={r}", f"g={g}", f"b={b}",
]
elif t == "icon":
parts += [
f"x={overlay.x}", f"y={overlay.y}",
f"icon_name={quote(overlay.name, safe='')}",
]
elif t == "dim":
# filter: amount = насколько затемнить (0..255). Маппится из dim_factor.
amount = int(round(overlay.dim_factor * 255))
parts += [
f"x={overlay.x}", f"y={overlay.y}", f"w={overlay.w}", f"h={overlay.h}",
f"amount={amount}",
]
elif t in ("image", "graph", "chat"):
# Phase 5+ — filter пока не support'ит. Просто отправляем что есть, filter
# skip-логирует (parse_overlay_args знает только rect/text/icon/dim).
pass
return " ".join(parts)
class CommandDispatcher:
def __init__(self, cfg: Config, state: ControllerState) -> None:
self.cfg = cfg
self.state = state
self._zmq_clients: dict[str, FFmpegZmqClient] = {}
self.on_state_change = None # type: ignore[var-annotated]
self.on_event = None # type: ignore[var-annotated]
def _client(self, inst: InstanceCfg) -> FFmpegZmqClient:
c = self._zmq_clients.get(inst.name)
if c is None:
c = FFmpegZmqClient(inst.zmq_endpoint)
self._zmq_clients[inst.name] = c
return c
def _audio_client(self, inst: InstanceCfg) -> FFmpegZmqClient:
"""ZMQ client к audio sidecar (Phase 5d). Fallback к video pipeline
если split-process не configured."""
endpoint = inst.audio_zmq_endpoint or inst.zmq_endpoint
key = f"{inst.name}:audio"
c = self._zmq_clients.get(key)
if c is None:
c = FFmpegZmqClient(endpoint)
self._zmq_clients[key] = c
return c
def _find_instance(self, name: str) -> InstanceCfg | None:
return next((i for i in self.cfg.instances if i.name == name), None)
async def handle(self, instance_name: str, kind: str, payload: str) -> None:
inst = self._find_instance(instance_name)
if inst is None:
log.warning("dispatch.unknown_instance", instance=instance_name)
return
match kind:
case "layout.set":
await self._set_layout(inst, payload.strip())
case "overlay.add":
await self._overlay_add(inst, payload)
case "overlay.remove":
await self._overlay_remove(inst, payload.strip())
case "overlay.clear":
await self._overlay_clear(inst)
case "audio.set":
await self._audio_set(inst, payload.strip())
case "intercom.start":
await self._intercom_set(inst, active=True)
case "intercom.end":
await self._intercom_set(inst, active=False)
case _:
log.warning(
"dispatch.unknown_kind", instance=instance_name, kind=kind
)
# ─── Audio ─────────────────────────────────────────────────────
async def _audio_set(self, inst: InstanceCfg, source_name: str) -> None:
"""Switch audio к source_name через ZMQ команду astreamselect map <index>.
Phase 5d: команда идёт в audio sidecar (отдельный ffmpeg)."""
if not inst.audio_sources:
log.warning("dispatch.no_audio_sources", instance=inst.name)
return
src = next((s for s in inst.audio_sources if s.name == source_name), None)
if src is None:
log.warning("dispatch.unknown_audio_source",
instance=inst.name, source=source_name,
available=[s.name for s in inst.audio_sources])
return
client = self._audio_client(inst)
try:
reply = await client.send_command(
inst.audio_filter_target, "map", str(src.index)
)
log.info("dispatch.audio_set", instance=inst.name,
source=source_name, index=src.index, ffmpeg_reply=reply)
except (TimeoutError, Exception) as e:
log.error("dispatch.audio_zmq_fail", instance=inst.name, error=str(e))
return
if self.on_state_change:
await self.on_state_change(inst.name, "audio_source", source_name)
if self.on_event:
await self.on_event(
inst.name, "audio_switched", {"to": source_name, "index": src.index}
)
async def set_main_cam(self, instance: str, cam_index: int) -> None:
"""Phase 7: main camera = cell 0. Sends cuda_grid `cell_map <cell> <pad>`
для всех cells: cell 0 = cam_index, остальные = identity rotation
(исключая cam_index чтобы не дублировать в preview cells).
Применимо к любому layout — non-visible cells просто игнорируются
при composition (active layout's nb_cells определяет сколько cells
фактически отрисовываются)."""
inst = self._find_instance(instance)
if inst is None:
return
if not (0 <= cam_index < inst.max_cells):
log.warning("dispatch.main_cam_out_of_range",
instance=instance, cam_index=cam_index, max_cells=inst.max_cells)
return
# cell 0 = main; cells 1..max-1 = другие pads без main (стабильный порядок)
other_pads = [i for i in range(inst.max_cells) if i != cam_index]
cell_assignment = [cam_index] + other_pads
client = self._client(inst)
try:
for cell_idx, pad_idx in enumerate(cell_assignment):
await client.send_command(
inst.main_cam_filter_target, "cell_map", f"{cell_idx} {pad_idx}"
)
log.info("dispatch.main_cam_set", instance=instance,
cam_index=cam_index, assignment=cell_assignment)
except (TimeoutError, Exception) as e:
log.warning("dispatch.main_cam_fail", instance=instance, error=str(e))
async def set_mpp_main(self, instance: str, cam_index: int) -> None:
"""Phase 7: alias к set_main_cam — main_plus_preview layout также
использует cell 0 как main slot. Семантика идентична: cell_map 0 = cam_index."""
await self.set_main_cam(instance, cam_index)
async def _reload_icon(self, instance: str, icon_name: str) -> None:
"""Invalidate cached icon atlas во всех cuda_grid instances — каждый
имеет свой кеш."""
inst = self._find_instance(instance)
if inst is None:
return
await self._overlay_broadcast(inst, "reload_icon", icon_name)
async def _intercom_set(self, inst: InstanceCfg, active: bool) -> None:
"""Ducking pattern: при intercom ON → music volume ↓ + intercom ↑.
После end — restore. Phase 5d: команды идут в audio sidecar."""
client = self._audio_client(inst)
music_vol = inst.music_ducked_volume if active else 1.0
intercom_vol = 1.0 if active else 0.0
try:
r1 = await client.send_command(inst.music_volume_target, "volume", str(music_vol))
r2 = await client.send_command(inst.intercom_volume_target, "volume", str(intercom_vol))
log.info("dispatch.intercom",
instance=inst.name, active=active,
music_vol=music_vol, intercom_vol=intercom_vol,
music_reply=r1, intercom_reply=r2)
except (TimeoutError, Exception) as e:
log.error("dispatch.intercom_zmq_fail", instance=inst.name, error=str(e))
return
if self.on_event:
await self.on_event(
inst.name, "intercom", {"active": active, "music_vol": music_vol}
)
# ─── Layout ────────────────────────────────────────────────────
async def _set_layout(self, inst: InstanceCfg, layout: str) -> None:
# Phase 7: validation против built-in layouts (mirror of C filter table).
# `inst.layout_map` keys могут быть subset (УI-доступные); фактически filter
# знает весь PREDEFINED_LAYOUTS и сам ответит ошибкой если name невалиден.
if layout not in PREDEFINED_LAYOUTS:
log.warning(
"dispatch.unknown_layout",
instance=inst.name,
layout=layout,
available=PREDEFINED_LAYOUTS,
)
return
old = await self.state.get_layout(inst.name)
client = self._client(inst)
try:
# Phase 7: cuda_grid имеет native `set_layout <name>` command.
# Layout resolve'ится по имени из встроенного PREDEFINED_LAYOUTS таблицы.
reply = await client.send_command(
inst.layout_filter_target, "set_layout", layout
)
log.info(
"dispatch.layout_set",
instance=inst.name,
layout=layout,
ffmpeg_reply=reply,
)
except (TimeoutError, Exception) as e:
log.error("dispatch.zmq_fail", instance=inst.name, error=str(e))
return
await self.state.set_layout(inst.name, layout)
if self.on_state_change:
await self.on_state_change(inst.name, "layout", layout)
if self.on_event:
await self.on_event(
inst.name,
"layout_switched",
{"from": old, "to": layout, "reason": "mqtt"},
)
# ─── Overlays ──────────────────────────────────────────────────
def _overlay_targets(self, inst: InstanceCfg) -> list[str]:
"""Если overlay_filter_targets настроен — broadcast'им ко всем
(нужно когда несколько cuda_grid instances в pipeline, например
layout switching). Иначе fallback к default filter_target."""
return inst.overlay_filter_targets or [inst.filter_target]
async def _overlay_broadcast(self, inst: InstanceCfg, cmd: str, arg: str) -> str | None:
"""Send command к каждому overlay target. Return last reply (если успешный)."""
client = self._client(inst)
last_reply = None
for target in self._overlay_targets(inst):
try:
last_reply = await client.send_command(target, cmd, arg)
except (TimeoutError, Exception) as e:
log.warning("dispatch.overlay_broadcast_fail",
instance=inst.name, target=target, error=str(e))
return last_reply
async def _overlay_add(self, inst: InstanceCfg, payload: str) -> None:
"""Payload = JSON совместимый с Overlay discriminated union."""
from pydantic import TypeAdapter
try:
overlay: Overlay = TypeAdapter(Overlay).validate_json(payload)
except Exception as e:
log.warning("dispatch.overlay_parse_fail", error=str(e), payload=payload[:200])
return
# Broadcast к ВСЕМ cuda_grid instances (если несколько в pipeline для
# layout switching) — каждый имеет свой overlay state.
zmq_arg = _serialize_overlay_to_zmq(overlay)
reply = await self._overlay_broadcast(inst, "add_overlay", zmq_arg)
log.info("dispatch.overlay_add", instance=inst.name,
id=overlay.id, type=overlay.type, ffmpeg_reply=reply)
await self.state.add_overlay(inst.name, overlay)
if self.on_state_change:
count = len(await self.state.get_overlays(inst.name))
await self.on_state_change(inst.name, "overlays_count", str(count))
if self.on_event:
await self.on_event(
inst.name,
"overlay_added",
{"id": overlay.id, "type": overlay.type},
)
async def _overlay_remove(self, inst: InstanceCfg, overlay_id: str) -> None:
existed = await self.state.remove_overlay(inst.name, overlay_id)
if not existed:
log.warning(
"dispatch.overlay_unknown",
instance=inst.name,
id=overlay_id,
)
return
await self._overlay_broadcast(inst, "remove_overlay", overlay_id)
log.info("dispatch.overlay_removed", instance=inst.name, id=overlay_id)
if self.on_state_change:
count = len(await self.state.get_overlays(inst.name))
await self.on_state_change(inst.name, "overlays_count", str(count))
if self.on_event:
await self.on_event(
inst.name, "overlay_removed", {"id": overlay_id}
)
async def _overlay_clear(self, inst: InstanceCfg) -> None:
n = await self.state.clear_overlays(inst.name)
await self._overlay_broadcast(inst, "clear_overlays", "")
log.info("dispatch.overlays_cleared", instance=inst.name, count=n)
if self.on_state_change:
await self.on_state_change(inst.name, "overlays_count", "0")
if self.on_event:
await self.on_event(inst.name, "overlays_cleared", {"count": n})
# ─── Cleanup ───────────────────────────────────────────────────
async def close(self) -> None:
for c in self._zmq_clients.values():
await c.close()