"""ZMQ клиент к FFmpeg's `zmq` filter. FFmpeg zmq filter принимает строки формата: `target command [args]` Например для cuda_grid в filter graph: `cuda_grid@livingroom_tv set_layout quad` См. https://ffmpeg.org/ffmpeg-filters.html#zmq_002c-azmq """ from __future__ import annotations import asyncio import structlog import zmq import zmq.asyncio log = structlog.get_logger() class FFmpegZmqClient: """REQ-socket к FFmpeg zmq filter. Один client = один FFmpeg pipeline.""" def __init__(self, endpoint: str, request_timeout_ms: int = 2000) -> None: self.endpoint = endpoint self.request_timeout_ms = request_timeout_ms self._ctx = zmq.asyncio.Context.instance() self._sock: zmq.asyncio.Socket | None = None # REQ socket требует strict send→recv→send→recv pattern. Без lock'а # concurrent send_command (overlay + audio) ломает state в "Operation # cannot be accomplished in current state". Serialize requests. self._lock = asyncio.Lock() async def connect(self) -> None: if self._sock is not None: return self._sock = self._ctx.socket(zmq.REQ) self._sock.setsockopt(zmq.LINGER, 0) self._sock.setsockopt(zmq.RCVTIMEO, self.request_timeout_ms) self._sock.setsockopt(zmq.SNDTIMEO, self.request_timeout_ms) self._sock.connect(self.endpoint) log.info("zmq.connected", endpoint=self.endpoint) async def send_command(self, target: str, command: str, args: str | None = None) -> str: """Отправить команду filter'у. Возвращает ответ от ffmpeg ('0 Success' / error string). FFmpeg's zmq filter parses через `av_get_token` (libavfilter/f_zmq.c) — берёт ОДИН token как arg. Если arg содержит пробелы (наш case для add_overlay), нужно обернуть в single-quotes. av_get_token honours quoting + escape '\\''. """ if self._sock is None: await self.connect() assert self._sock is not None cmd_str = f"{target} {command}" if args: # Escape embedded single-quotes (rare для наших args, но safe). escaped = args.replace("'", r"\'") cmd_str = f"{cmd_str} '{escaped}'" log.debug("zmq.send", endpoint=self.endpoint, cmd=cmd_str) async with self._lock: try: await self._sock.send_string(cmd_str) reply = await self._sock.recv_string() log.debug("zmq.reply", reply=reply) return reply except zmq.error.Again: log.warning("zmq.timeout", endpoint=self.endpoint, cmd=cmd_str) self._sock.close(linger=0) self._sock = None raise TimeoutError(f"zmq command timeout: {cmd_str}") except Exception as e: # Любая другая ошибка тоже ломает REQ state — сбрасываем socket log.warning("zmq.error", endpoint=self.endpoint, error=str(e)) if self._sock is not None: self._sock.close(linger=0) self._sock = None raise async def close(self) -> None: if self._sock is not None: self._sock.close(linger=0) self._sock = None