diff --git a/controller/Dockerfile b/controller/Dockerfile new file mode 100644 index 0000000..a12afdf --- /dev/null +++ b/controller/Dockerfile @@ -0,0 +1,33 @@ +# cuda-grid-controller — Python sidecar для vf_cuda_grid filter. + +FROM python:3.11-slim AS base + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 \ + PIP_NO_CACHE_DIR=1 + +WORKDIR /app + +# Install deps первым layer — пересборка только при изменении pyproject.toml +COPY pyproject.toml ./ +RUN pip install --no-cache-dir \ + fastapi \ + "uvicorn[standard]" \ + pydantic \ + pydantic-settings \ + aiomqtt \ + pyzmq \ + pyyaml \ + structlog \ + typer \ + sse-starlette + +# Source code +COPY cuda_grid_controller ./cuda_grid_controller +COPY examples ./examples + +RUN pip install --no-cache-dir -e . + +EXPOSE 8080 +ENTRYPOINT ["cuda-grid-controller"] +CMD ["--config", "/app/controller.yaml"] diff --git a/controller/README.md b/controller/README.md new file mode 100644 index 0000000..690b658 --- /dev/null +++ b/controller/README.md @@ -0,0 +1,60 @@ +# cuda-grid-controller + +Control-plane sidecar для [`vf_cuda_grid`](../) FFmpeg filter. Transforms +MQTT / HTTP REST commands от Home Assistant / Node-RED / custom apps +в FFmpeg `process_command` через ZeroMQ. Publishes state + events наружу +для bidirectional integration. + +**Статус:** Phase 3 work-in-progress — basic skeleton + HA Discovery + ZMQ +command bridge. Phase 4+ добавит overlays, audio orchestration. + +## Что умеет (Phase 3) + +- ✅ MQTT subscribe `cuda_grid/cmd//layout/set` → ZMQ send → FFmpeg layout switch +- ✅ MQTT publish `cuda_grid/state//layout` (retained) + `cuda_grid/event/.../layout_switched` +- ✅ HA MQTT Discovery — `select.layout` + `sensor.current_layout` + `binary_sensor.online` per instance +- ✅ HTTP REST API: `/health`, `/layouts`, `/state`, `POST /layout/{instance}/set` +- ✅ Multi-instance (несколько FFmpeg pipelines с разными ZMQ endpoints) +- ✅ Auto-reconnect MQTT, LWT для availability tracking + +## Использование + +```bash +pip install -e . + +export MQTT_USERNAME=mqtt +export MQTT_PASSWORD=secret +cuda-grid-controller --config examples/controller.yaml +``` + +## FFmpeg side — как подключить cuda_grid + zmq filter + +```bash +ffmpeg -i cam1.mp4 -i cam2.mp4 -i cam3.mp4 -i cam4.mp4 \ + -filter_complex " + [0:v][1:v][2:v][3:v]cuda_grid=layout=quad, + zmq=bind_address=tcp\\\\://127.0.0.1\\\\:5555 + " \ + -c:v h264_nvenc out.mp4 +``` + +(После этого controller с `zmq_endpoint: tcp://127.0.0.1:5555` сможет +переключать layout командой `cuda_grid@... layout six_grid`.) + +## HA dashboard + +После startup controller'а — в Home Assistant появятся entities: +- `select.cuda_grid__layout` — dropdown с layouts +- `sensor.cuda_grid__current_layout` — текущий выбранный +- `binary_sensor.cuda_grid_controller_online` — online/offline + +## Roadmap + +| Phase | Что | +|---|---| +| 3 (this) | Basic skeleton + HA Discovery + ZMQ bridge для layout switching | +| 4 | Overlay API (rect/text/icon через side data) | +| 5 | Rich overlays (image/dim/graph/chat) + privacy filtering | +| 6 | Audio orchestration (state machine, domofon use case) | + +Полный design: [`docs/design.md`](../docs/design.md). diff --git a/controller/cuda_grid_controller/__init__.py b/controller/cuda_grid_controller/__init__.py new file mode 100644 index 0000000..e3b1781 --- /dev/null +++ b/controller/cuda_grid_controller/__init__.py @@ -0,0 +1,3 @@ +"""cuda-grid-controller — sidecar для vf_cuda_grid FFmpeg filter.""" + +__version__ = "0.1.0" diff --git a/controller/cuda_grid_controller/__main__.py b/controller/cuda_grid_controller/__main__.py new file mode 100644 index 0000000..1bfa919 --- /dev/null +++ b/controller/cuda_grid_controller/__main__.py @@ -0,0 +1,104 @@ +"""Entry point: `cuda-grid-controller --config controller.yaml`.""" + +from __future__ import annotations + +import asyncio +import logging +import sys +from pathlib import Path + +import structlog +import typer +import uvicorn + +from .config import Config +from .dispatch import CommandDispatcher +from .http_api import create_app +from .mqtt_loop import MqttLoop +from .state import ControllerState + +cli = typer.Typer(add_completion=False) + + +def _configure_logging(level: str) -> None: + logging.basicConfig( + format="%(message)s", + level=getattr(logging, level.upper(), logging.INFO), + ) + structlog.configure( + processors=[ + structlog.processors.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.dev.ConsoleRenderer(), + ] + ) + + +async def _run(cfg: Config) -> None: + state = ControllerState() + # Init active_layout = default_layout per instance + for inst in cfg.instances: + await state.set_layout(inst.name, inst.default_layout) + + dispatcher = CommandDispatcher(cfg, state) + mqtt = MqttLoop(cfg, state, dispatcher.handle) + + # Wire dispatcher events → MQTT publishes + dispatcher.on_state_change = mqtt.publish_state + dispatcher.on_event = mqtt.publish_event + + # HTTP REST + app = create_app(cfg, state, dispatcher) + server = uvicorn.Server( + uvicorn.Config( + app, + host=cfg.http.host, + port=cfg.http.port, + log_level=cfg.log.level.lower(), + ) + ) + + log = structlog.get_logger() + log.info( + "controller.starting", + instances=[i.name for i in cfg.instances], + mqtt=f"{cfg.broker.host}:{cfg.broker.port}", + http=f"{cfg.http.host}:{cfg.http.port}", + ) + + try: + await asyncio.gather( + mqtt.run(), + server.serve(), + ) + except asyncio.CancelledError: + log.info("controller.shutdown") + finally: + await dispatcher.close() + await mqtt.stop() + + +@cli.command() +def run( + config: Path = typer.Option( + Path("controller.yaml"), + "--config", + "-c", + help="YAML config path", + ), +) -> None: + """Запустить controller.""" + if not config.exists(): + typer.echo(f"config not found: {config}", err=True) + raise typer.Exit(1) + cfg = Config.from_yaml(config) + _configure_logging(cfg.log.level) + asyncio.run(_run(cfg)) + + +def main() -> None: + cli() + + +if __name__ == "__main__": + main() diff --git a/controller/cuda_grid_controller/config.py b/controller/cuda_grid_controller/config.py new file mode 100644 index 0000000..48781ad --- /dev/null +++ b/controller/cuda_grid_controller/config.py @@ -0,0 +1,103 @@ +"""Конфигурация — pydantic models + YAML loader. + +Структура YAML: + broker: + host: localhost + port: 1883 + username_env: MQTT_USERNAME + password_env: MQTT_PASSWORD + + instances: + - name: livingroom_tv + zmq_endpoint: tcp://127.0.0.1:5555 + default_layout: quad + + ha_discovery: + enabled: true + prefix: homeassistant + device_name: "CUDA Grid Composer" + + http: + host: 0.0.0.0 + port: 8080 + + log: + level: INFO +""" + +from __future__ import annotations + +import os +from pathlib import Path +from typing import Self + +import yaml +from pydantic import BaseModel, Field, field_validator + + +class BrokerCfg(BaseModel): + host: str = "localhost" + port: int = 1883 + client_id: str = "cuda-grid-controller" + username_env: str | None = None + password_env: str | None = None + keepalive_sec: int = 30 + + @property + def username(self) -> str | None: + return os.environ.get(self.username_env) if self.username_env else None + + @property + def password(self) -> str | None: + return os.environ.get(self.password_env) if self.password_env else None + + +class InstanceCfg(BaseModel): + """Один FFmpeg pipeline = одна cuda_grid filter instance.""" + + name: str = Field(description="уникальное имя — становится частью HA entity ID") + zmq_endpoint: str = Field( + description="ZMQ endpoint FFmpeg's zmq filter (tcp://host:port)" + ) + default_layout: str = "quad" + filter_target: str = Field( + default="Parsed_cuda_grid_0", + description="Filter target name в FFmpeg filter graph (для process_command)", + ) + + @field_validator("name") + @classmethod + def name_alnum(cls, v: str) -> str: + if not v.replace("_", "").isalnum(): + raise ValueError(f"instance name '{v}' must be alphanumeric + underscore") + return v + + +class HaDiscoveryCfg(BaseModel): + enabled: bool = True + prefix: str = "homeassistant" + device_name: str = "CUDA Grid Composer" + device_identifier: str = "cuda_grid_controller" + + +class HttpCfg(BaseModel): + host: str = "0.0.0.0" + port: int = 8080 + + +class LogCfg(BaseModel): + level: str = "INFO" + + +class Config(BaseModel): + broker: BrokerCfg = BrokerCfg() + instances: list[InstanceCfg] = [] + ha_discovery: HaDiscoveryCfg = HaDiscoveryCfg() + http: HttpCfg = HttpCfg() + log: LogCfg = LogCfg() + + @classmethod + def from_yaml(cls, path: Path | str) -> Self: + with open(path) as f: + data = yaml.safe_load(f) or {} + return cls.model_validate(data) diff --git a/controller/cuda_grid_controller/dispatch.py b/controller/cuda_grid_controller/dispatch.py new file mode 100644 index 0000000..893d8f0 --- /dev/null +++ b/controller/cuda_grid_controller/dispatch.py @@ -0,0 +1,89 @@ +"""Command dispatch — между MQTT/HTTP командами и ZMQ выходом. + +Action kinds: + layout.set — set_layout + (future Phase 4+: auto_mode.set, focus_camera.set, overlay.add, ...) +""" + +from __future__ import annotations + +import structlog + +from .config import Config, InstanceCfg +from .layouts import PREDEFINED_LAYOUTS +from .state import ControllerState +from .zmq_client import FFmpegZmqClient + +log = structlog.get_logger() + + +class CommandDispatcher: + def __init__(self, cfg: Config, state: ControllerState) -> None: + self.cfg = cfg + self.state = state + self._zmq_clients: dict[str, FFmpegZmqClient] = {} + # publish callback устанавливается из вне (MqttLoop) + self.on_state_change = None # type: ignore[var-annotated] + self.on_event = None # type: ignore[var-annotated] + + def _client(self, inst: InstanceCfg) -> FFmpegZmqClient: + c = self._zmq_clients.get(inst.name) + if c is None: + c = FFmpegZmqClient(inst.zmq_endpoint) + self._zmq_clients[inst.name] = c + return c + + def _find_instance(self, name: str) -> InstanceCfg | None: + return next((i for i in self.cfg.instances if i.name == name), None) + + async def handle(self, instance_name: str, kind: str, payload: str) -> None: + inst = self._find_instance(instance_name) + if inst is None: + log.warning("dispatch.unknown_instance", instance=instance_name) + return + + if kind == "layout.set": + await self._set_layout(inst, payload.strip()) + else: + log.warning("dispatch.unknown_kind", instance=instance_name, kind=kind) + + async def _set_layout(self, inst: InstanceCfg, layout: str) -> None: + if layout not in PREDEFINED_LAYOUTS: + log.warning( + "dispatch.unknown_layout", + instance=inst.name, + layout=layout, + available=PREDEFINED_LAYOUTS, + ) + return + + old = await self.state.get_layout(inst.name) + client = self._client(inst) + try: + reply = await client.send_command( + inst.filter_target, "layout", layout + ) + log.info( + "dispatch.layout_set", + instance=inst.name, + layout=layout, + ffmpeg_reply=reply, + ) + except (TimeoutError, Exception) as e: + log.error("dispatch.zmq_fail", instance=inst.name, error=str(e)) + return + + await self.state.set_layout(inst.name, layout) + + if self.on_state_change: + await self.on_state_change(inst.name, "layout", layout) + if self.on_event: + await self.on_event( + inst.name, + "layout_switched", + {"from": old, "to": layout, "reason": "mqtt"}, + ) + + async def close(self) -> None: + for c in self._zmq_clients.values(): + await c.close() diff --git a/controller/cuda_grid_controller/ha_discovery.py b/controller/cuda_grid_controller/ha_discovery.py new file mode 100644 index 0000000..7c6ed2c --- /dev/null +++ b/controller/cuda_grid_controller/ha_discovery.py @@ -0,0 +1,84 @@ +"""Home Assistant MQTT Discovery payloads. + +Создаём per-instance entities: + select.cuda_grid__layout — выбор активного layout + sensor.cuda_grid__state — текущий layout (для UI) + binary_sensor.cuda_grid_controller_online — availability + +См. https://www.home-assistant.io/integrations/mqtt/#mqtt-discovery +""" + +from __future__ import annotations + +import json +from typing import Any + +from .config import HaDiscoveryCfg, InstanceCfg +from .layouts import PREDEFINED_LAYOUTS + + +def _device_dict(ha: HaDiscoveryCfg) -> dict[str, Any]: + return { + "identifiers": [ha.device_identifier], + "name": ha.device_name, + "manufacturer": "gx/vf-cuda-grid", + "model": "cuda-grid-controller", + "sw_version": "0.1.0", + } + + +def availability_topic(prefix_base: str = "cuda_grid") -> str: + """Где controller публикует online/offline (LWT).""" + return f"{prefix_base}/state/online" + + +def discovery_payloads(ha: HaDiscoveryCfg, instances: list[InstanceCfg]) -> list[tuple[str, str]]: + """Список (discovery_topic, payload_json) для publish при startup.""" + out: list[tuple[str, str]] = [] + avail = availability_topic() + + # Per-instance entities + for inst in instances: + # select.layout + select_topic = f"{ha.prefix}/select/cuda_grid_{inst.name}/layout/config" + select_payload = { + "name": f"Layout ({inst.name})", + "unique_id": f"cuda_grid_{inst.name}_layout_select", + "command_topic": f"cuda_grid/cmd/{inst.name}/layout/set", + "state_topic": f"cuda_grid/state/{inst.name}/layout", + "options": PREDEFINED_LAYOUTS, + "availability_topic": avail, + "payload_available": "online", + "payload_not_available": "offline", + "device": _device_dict(ha), + } + out.append((select_topic, json.dumps(select_payload))) + + # sensor.current_layout (text — duplicate state, удобно для automations) + sensor_topic = f"{ha.prefix}/sensor/cuda_grid_{inst.name}/current_layout/config" + sensor_payload = { + "name": f"Current layout ({inst.name})", + "unique_id": f"cuda_grid_{inst.name}_layout_sensor", + "state_topic": f"cuda_grid/state/{inst.name}/layout", + "availability_topic": avail, + "payload_available": "online", + "payload_not_available": "offline", + "device": _device_dict(ha), + "icon": "mdi:view-grid", + } + out.append((sensor_topic, json.dumps(sensor_payload))) + + # Глобальный availability binary_sensor + online_topic = f"{ha.prefix}/binary_sensor/cuda_grid_controller/online/config" + online_payload = { + "name": "CUDA Grid Controller online", + "unique_id": "cuda_grid_controller_online", + "state_topic": avail, + "payload_on": "online", + "payload_off": "offline", + "device_class": "connectivity", + "device": _device_dict(ha), + } + out.append((online_topic, json.dumps(online_payload))) + + return out diff --git a/controller/cuda_grid_controller/http_api.py b/controller/cuda_grid_controller/http_api.py new file mode 100644 index 0000000..c5b0c09 --- /dev/null +++ b/controller/cuda_grid_controller/http_api.py @@ -0,0 +1,62 @@ +"""HTTP REST API (FastAPI).""" + +from __future__ import annotations + +from typing import Any + +import structlog +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel + +from .config import Config +from .dispatch import CommandDispatcher +from .layouts import PREDEFINED_LAYOUTS +from .state import ControllerState + +log = structlog.get_logger() + + +class LayoutSetReq(BaseModel): + layout: str + + +def create_app( + cfg: Config, state: ControllerState, dispatcher: CommandDispatcher +) -> FastAPI: + app = FastAPI( + title="cuda-grid-controller", + version="0.1.0", + description="Control plane для vf_cuda_grid FFmpeg filter", + ) + + @app.get("/health") + async def health() -> dict[str, Any]: + return {"status": "ok"} + + @app.get("/layouts") + async def layouts() -> dict[str, Any]: + return {"predefined": PREDEFINED_LAYOUTS} + + @app.get("/state") + async def get_state() -> dict[str, Any]: + out = {} + for inst in cfg.instances: + out[inst.name] = { + "active_layout": await state.get_layout(inst.name), + "zmq_endpoint": inst.zmq_endpoint, + } + return {"instances": out} + + @app.post("/layout/{instance}/set") + async def set_layout(instance: str, req: LayoutSetReq) -> dict[str, Any]: + inst = next((i for i in cfg.instances if i.name == instance), None) + if inst is None: + raise HTTPException(404, f"unknown instance '{instance}'") + if req.layout not in PREDEFINED_LAYOUTS: + raise HTTPException( + 400, f"unknown layout '{req.layout}'. Доступны: {PREDEFINED_LAYOUTS}" + ) + await dispatcher.handle(instance, "layout.set", req.layout) + return {"ok": True, "instance": instance, "layout": req.layout} + + return app diff --git a/controller/cuda_grid_controller/layouts.py b/controller/cuda_grid_controller/layouts.py new file mode 100644 index 0000000..7d2d52a --- /dev/null +++ b/controller/cuda_grid_controller/layouts.py @@ -0,0 +1,20 @@ +"""Известные layouts — сейчас захардкожены в C filter (vf_cuda_grid.c). + +Должно быть в sync с тем что в FFmpeg patch. Phase 4 будет dynamic layout +registry (created через runtime API). +""" + +from __future__ import annotations + +# Список синхронизирован с layouts[] в libavfilter/vf_cuda_grid.c (Phase 2). +PREDEFINED_LAYOUTS: list[str] = [ + "single", + "dual_horizontal", + "dual_vertical", + "quad", + "main_plus_preview", + "six_grid", + "nine_grid", + "sixteen_grid", + "panoramic", +] diff --git a/controller/cuda_grid_controller/mqtt_loop.py b/controller/cuda_grid_controller/mqtt_loop.py new file mode 100644 index 0000000..59cce16 --- /dev/null +++ b/controller/cuda_grid_controller/mqtt_loop.py @@ -0,0 +1,141 @@ +"""MQTT subscriber + publisher loop. + +Topics: + Subscribed: + cuda_grid/cmd//layout/set — payload = layout name + cuda_grid/cmd// — Phase 4+ + homeassistant/status — HA online → republish discovery + + Published (per instance): + cuda_grid/state//layout — текущий layout (retained) + cuda_grid/event//layout_switched — {from, to, reason} + + Global: + cuda_grid/state/online — LWT (online/offline) +""" + +from __future__ import annotations + +import asyncio +import json +from typing import Awaitable, Callable + +import aiomqtt +import structlog + +from .config import Config +from .ha_discovery import availability_topic, discovery_payloads +from .state import ControllerState + +log = structlog.get_logger() + +CommandHandler = Callable[[str, str, str], Awaitable[None]] +# args: (instance_name, command_kind, payload_str) + + +class MqttLoop: + def __init__( + self, + cfg: Config, + state: ControllerState, + command_handler: CommandHandler, + ) -> None: + self.cfg = cfg + self.state = state + self.command_handler = command_handler + self._client: aiomqtt.Client | None = None + self._stop = asyncio.Event() + + async def run(self) -> None: + """Main loop — connect + subscribe + dispatch. Re-connect при разрыве.""" + avail = availability_topic() + while not self._stop.is_set(): + try: + async with aiomqtt.Client( + hostname=self.cfg.broker.host, + port=self.cfg.broker.port, + username=self.cfg.broker.username, + password=self.cfg.broker.password, + identifier=self.cfg.broker.client_id, + keepalive=self.cfg.broker.keepalive_sec, + will=aiomqtt.Will( + topic=avail, payload=b"offline", qos=1, retain=True + ), + ) as client: + self._client = client + log.info("mqtt.connected", host=self.cfg.broker.host) + + # online + HA Discovery + await client.publish(avail, b"online", qos=1, retain=True) + if self.cfg.ha_discovery.enabled: + await self._publish_ha_discovery() + + # Subscribe commands per instance + for inst in self.cfg.instances: + await client.subscribe( + f"cuda_grid/cmd/{inst.name}/+/+", qos=1 + ) + # HA status — republish discovery если HA рестартанул + await client.subscribe("homeassistant/status", qos=0) + + async for msg in client.messages: + await self._handle_message(msg) + except aiomqtt.MqttError as e: + log.warning("mqtt.disconnected", error=str(e)) + self._client = None + await asyncio.sleep(5) + + async def _publish_ha_discovery(self) -> None: + assert self._client is not None + payloads = discovery_payloads(self.cfg.ha_discovery, self.cfg.instances) + for topic, payload in payloads: + await self._client.publish(topic, payload.encode(), qos=1, retain=True) + log.info("mqtt.ha_discovery.published", count=len(payloads)) + + async def _handle_message(self, msg: aiomqtt.Message) -> None: + topic = str(msg.topic) + try: + payload = msg.payload.decode() if isinstance(msg.payload, (bytes, bytearray)) else str(msg.payload) + except Exception: + payload = repr(msg.payload) + + if topic == "homeassistant/status" and payload == "online": + log.info("mqtt.ha.restarted — republish discovery") + if self.cfg.ha_discovery.enabled: + await self._publish_ha_discovery() + return + + # cuda_grid/cmd/// + parts = topic.split("/") + if len(parts) >= 5 and parts[0] == "cuda_grid" and parts[1] == "cmd": + instance, scope, action = parts[2], parts[3], parts[4] + kind = f"{scope}.{action}" # e.g. "layout.set", "auto_mode.set" + await self.command_handler(instance, kind, payload) + return + + log.warning("mqtt.unknown_topic", topic=topic, payload=payload) + + async def publish_state(self, instance: str, scope: str, value: str, retain: bool = True) -> None: + """Publish state — `cuda_grid/state//`.""" + if self._client is None: + return + await self._client.publish( + f"cuda_grid/state/{instance}/{scope}", + value.encode(), + qos=1, + retain=retain, + ) + + async def publish_event(self, instance: str, event_kind: str, data: dict) -> None: + """Publish event (non-retained) — `cuda_grid/event//`.""" + if self._client is None: + return + await self._client.publish( + f"cuda_grid/event/{instance}/{event_kind}", + json.dumps(data).encode(), + qos=0, + retain=False, + ) + + async def stop(self) -> None: + self._stop.set() diff --git a/controller/cuda_grid_controller/state.py b/controller/cuda_grid_controller/state.py new file mode 100644 index 0000000..b14166e --- /dev/null +++ b/controller/cuda_grid_controller/state.py @@ -0,0 +1,33 @@ +"""In-memory state controller'а.""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field + + +@dataclass +class InstanceState: + name: str + active_layout: str + # Future: fps_out, dropped_frames, motion_cameras, last_event_ts + + +@dataclass +class ControllerState: + instances: dict[str, InstanceState] = field(default_factory=dict) + _lock: asyncio.Lock = field(default_factory=asyncio.Lock) + + async def set_layout(self, instance: str, layout: str) -> None: + async with self._lock: + st = self.instances.get(instance) + if st is None: + st = InstanceState(name=instance, active_layout=layout) + self.instances[instance] = st + else: + st.active_layout = layout + + async def get_layout(self, instance: str) -> str | None: + async with self._lock: + st = self.instances.get(instance) + return st.active_layout if st else None diff --git a/controller/cuda_grid_controller/zmq_client.py b/controller/cuda_grid_controller/zmq_client.py new file mode 100644 index 0000000..4fe2c92 --- /dev/null +++ b/controller/cuda_grid_controller/zmq_client.py @@ -0,0 +1,66 @@ +"""ZMQ клиент к FFmpeg's `zmq` filter. + +FFmpeg zmq filter принимает строки формата: + `target command [args]` + +Например для cuda_grid в filter graph: + `cuda_grid@livingroom_tv set_layout quad` + +См. https://ffmpeg.org/ffmpeg-filters.html#zmq_002c-azmq +""" + +from __future__ import annotations + +import structlog +import zmq +import zmq.asyncio + +log = structlog.get_logger() + + +class FFmpegZmqClient: + """REQ-socket к FFmpeg zmq filter. Один client = один FFmpeg pipeline.""" + + def __init__(self, endpoint: str, request_timeout_ms: int = 2000) -> None: + self.endpoint = endpoint + self.request_timeout_ms = request_timeout_ms + self._ctx = zmq.asyncio.Context.instance() + self._sock: zmq.asyncio.Socket | None = None + + async def connect(self) -> None: + if self._sock is not None: + return + self._sock = self._ctx.socket(zmq.REQ) + self._sock.setsockopt(zmq.LINGER, 0) + self._sock.setsockopt(zmq.RCVTIMEO, self.request_timeout_ms) + self._sock.setsockopt(zmq.SNDTIMEO, self.request_timeout_ms) + self._sock.connect(self.endpoint) + log.info("zmq.connected", endpoint=self.endpoint) + + async def send_command(self, target: str, command: str, args: str | None = None) -> str: + """Отправить команду filter'у. Возвращает ответ от ffmpeg ('0 Success' / error string).""" + if self._sock is None: + await self.connect() + assert self._sock is not None + + cmd_str = f"{target} {command}" + if args: + cmd_str = f"{cmd_str} {args}" + + log.debug("zmq.send", endpoint=self.endpoint, cmd=cmd_str) + try: + await self._sock.send_string(cmd_str) + reply = await self._sock.recv_string() + log.debug("zmq.reply", reply=reply) + return reply + except zmq.error.Again: + log.warning("zmq.timeout", endpoint=self.endpoint, cmd=cmd_str) + # Reset REQ socket state — после timeout REQ нельзя re-use + self._sock.close(linger=0) + self._sock = None + raise TimeoutError(f"zmq command timeout: {cmd_str}") + + async def close(self) -> None: + if self._sock is not None: + self._sock.close(linger=0) + self._sock = None diff --git a/controller/examples/controller.yaml b/controller/examples/controller.yaml new file mode 100644 index 0000000..3d4c3a1 --- /dev/null +++ b/controller/examples/controller.yaml @@ -0,0 +1,33 @@ +# Sample config для cuda-grid-controller. + +broker: + host: localhost + port: 1883 + client_id: cuda-grid-controller + username_env: MQTT_USERNAME + password_env: MQTT_PASSWORD + keepalive_sec: 30 + +instances: + - name: livingroom_tv + zmq_endpoint: tcp://127.0.0.1:5555 + default_layout: quad + filter_target: Parsed_cuda_grid_0 + + - name: public_stream + zmq_endpoint: tcp://127.0.0.1:5556 + default_layout: dual_horizontal + filter_target: Parsed_cuda_grid_0 + +ha_discovery: + enabled: true + prefix: homeassistant + device_name: CUDA Grid Composer + device_identifier: cuda_grid_controller + +http: + host: 0.0.0.0 + port: 8080 + +log: + level: INFO diff --git a/controller/pyproject.toml b/controller/pyproject.toml new file mode 100644 index 0000000..c6b179d --- /dev/null +++ b/controller/pyproject.toml @@ -0,0 +1,47 @@ +[project] +name = "cuda-grid-controller" +version = "0.1.0" +description = "Control-plane sidecar для vf_cuda_grid FFmpeg filter — ZeroMQ + MQTT + HTTP REST + HA Discovery" +readme = "README.md" +license = { text = "LGPL-2.1-or-later" } +authors = [{ name = "gx", email = "gx@goldix.org" }] +requires-python = ">=3.11" + +dependencies = [ + "fastapi>=0.110", + "uvicorn[standard]>=0.27", + "pydantic>=2.5", + "pydantic-settings>=2.1", + "aiomqtt>=2.0", + "pyzmq>=25.0", + "pyyaml>=6.0", + "structlog>=24.1", + "typer>=0.9", + "sse-starlette>=2.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.23", + "mypy>=1.8", + "ruff>=0.2", +] + +[project.scripts] +cuda-grid-controller = "cuda_grid_controller.__main__:main" + +[build-system] +requires = ["setuptools>=68", "wheel"] +build-backend = "setuptools.build_meta" + +[tool.setuptools.packages.find] +where = ["."] +include = ["cuda_grid_controller*"] + +[tool.ruff] +line-length = 100 +target-version = "py311" + +[tool.pytest.ini_options] +asyncio_mode = "auto"