diff --git a/controller/Dockerfile b/controller/Dockerfile index a12afdf..378a60f 100644 --- a/controller/Dockerfile +++ b/controller/Dockerfile @@ -8,6 +8,12 @@ ENV PYTHONDONTWRITEBYTECODE=1 \ WORKDIR /app +# ffmpeg для snapshot endpoint — нужен basic CPU build (RTSP read + PNG output). +# Размер: ~120 MB additional (ffmpeg + libs); приемлемо для controller image. +RUN apt-get update && apt-get install -y --no-install-recommends \ + ffmpeg \ + && rm -rf /var/lib/apt/lists/* + # Install deps первым layer — пересборка только при изменении pyproject.toml COPY pyproject.toml ./ RUN pip install --no-cache-dir \ @@ -20,7 +26,8 @@ RUN pip install --no-cache-dir \ pyyaml \ structlog \ typer \ - sse-starlette + sse-starlette \ + pillow # Source code COPY cuda_grid_controller ./cuda_grid_controller diff --git a/controller/cuda_grid_controller/__main__.py b/controller/cuda_grid_controller/__main__.py index c13e271..432089a 100644 --- a/controller/cuda_grid_controller/__main__.py +++ b/controller/cuda_grid_controller/__main__.py @@ -13,6 +13,7 @@ import uvicorn from .config import Config from .dispatch import CommandDispatcher +from .dynamic_overlays import ChartCfg, ChatCfg, DynamicRenderer from .frigate_bridge import FrigateBridge, FrigateBridgeCfg from .http_api import create_app from .mqtt_loop import MqttLoop @@ -55,7 +56,26 @@ async def _run(cfg: Config) -> None: "frigate_bridge.config_invalid", error=str(e) ) - mqtt = MqttLoop(cfg, state, dispatcher.handle, frigate_bridge=frigate_bridge) + # Dynamic overlays (charts/chats) — Phase 6 + dynamic_renderer: DynamicRenderer | None = None + if cfg.dynamic_overlays: + try: + d = cfg.dynamic_overlays + charts = [ChartCfg.model_validate(c) for c in (d.get("charts") or [])] + chats = [ChatCfg.model_validate(c) for c in (d.get("chats") or [])] + if charts or chats: + dynamic_renderer = DynamicRenderer( + icon_dir=Path(cfg.icon_dir), + dispatcher=dispatcher, + charts=charts, + chats=chats, + ) + except Exception as e: + structlog.get_logger().warning("dynamic_overlays.config_invalid", error=str(e)) + + mqtt = MqttLoop(cfg, state, dispatcher.handle, + frigate_bridge=frigate_bridge, + dynamic_renderer=dynamic_renderer) # Wire dispatcher events → MQTT publishes dispatcher.on_state_change = mqtt.publish_state @@ -80,6 +100,10 @@ async def _run(cfg: Config) -> None: http=f"{cfg.http.host}:{cfg.http.port}", ) + # Start dynamic renderer задачи (если есть) + if dynamic_renderer: + await dynamic_renderer.start() + try: await asyncio.gather( mqtt.run(), @@ -88,6 +112,8 @@ async def _run(cfg: Config) -> None: except asyncio.CancelledError: log.info("controller.shutdown") finally: + if dynamic_renderer: + await dynamic_renderer.stop() await dispatcher.close() await mqtt.stop() diff --git a/controller/cuda_grid_controller/config.py b/controller/cuda_grid_controller/config.py index 3e61249..f53d5d2 100644 --- a/controller/cuda_grid_controller/config.py +++ b/controller/cuda_grid_controller/config.py @@ -64,6 +64,37 @@ class InstanceCfg(BaseModel): default="Parsed_cuda_grid_0", description="Filter target name в FFmpeg filter graph (для process_command)", ) + output_rtsp_url: str | None = Field( + default=None, + description="URL куда pipeline push'ит composed stream — controller read'ит для snapshot/preview endpoints", + ) + audio_sources: list["AudioSourceCfg"] = Field( + default_factory=list, + description="Audio sources для astreamselect switching (Phase 5b)", + ) + audio_filter_target: str = Field( + default="astreamselect@as", + description="Target filter для ZMQ команд переключения audio (должен соответствовать pipeline filter_complex)", + ) + music_volume_target: str = Field( + default="volume@music", + description="Target filter для управления громкостью music chain (Phase 5c ducking)", + ) + intercom_volume_target: str = Field( + default="volume@intercom", + description="Target filter для управления громкостью intercom (Phase 5c)", + ) + music_ducked_volume: float = Field( + default=0.2, ge=0.0, le=1.0, + description="Громкость music когда intercom активен (0.2 = -14 dB)", + ) + + +class AudioSourceCfg(BaseModel): + """Описание audio source в порядке как они добавлены в pipeline -i ...""" + name: str = Field(description="Уникальное имя для API (e.g. 'europa_plus')") + index: int = Field(ge=0, description="Index в astreamselect inputs (соответствует порядку -i в pipeline)") + label: str | None = Field(default=None, description="UI-friendly label (default = name)") @field_validator("name") @classmethod @@ -97,6 +128,12 @@ class Config(BaseModel): log: LogCfg = LogCfg() # Frigate bridge — late import чтобы избежать circular dep frigate: dict | None = None # parsed в FrigateBridgeCfg при runtime + # Dynamic overlays (charts/chats) — late import тоже + dynamic_overlays: dict | None = None # parsed в DynamicRenderer cfg + icon_dir: str = Field( + default="/var/lib/cuda-grid/icons", + description="Shared volume куда controller пишет dynamic PNG; filter (`icon_dir=` option) читает оттуда", + ) @classmethod def from_yaml(cls, path: Path | str) -> Self: diff --git a/controller/cuda_grid_controller/dispatch.py b/controller/cuda_grid_controller/dispatch.py index e68f9a7..47c0648 100644 --- a/controller/cuda_grid_controller/dispatch.py +++ b/controller/cuda_grid_controller/dispatch.py @@ -127,11 +127,83 @@ class CommandDispatcher: await self._overlay_remove(inst, payload.strip()) case "overlay.clear": await self._overlay_clear(inst) + case "audio.set": + await self._audio_set(inst, payload.strip()) + case "intercom.start": + await self._intercom_set(inst, active=True) + case "intercom.end": + await self._intercom_set(inst, active=False) case _: log.warning( "dispatch.unknown_kind", instance=instance_name, kind=kind ) + # ─── Audio ───────────────────────────────────────────────────── + + async def _audio_set(self, inst: InstanceCfg, source_name: str) -> None: + """Switch audio к source_name через ZMQ команду astreamselect map .""" + if not inst.audio_sources: + log.warning("dispatch.no_audio_sources", instance=inst.name) + return + src = next((s for s in inst.audio_sources if s.name == source_name), None) + if src is None: + log.warning("dispatch.unknown_audio_source", + instance=inst.name, source=source_name, + available=[s.name for s in inst.audio_sources]) + return + + client = self._client(inst) + try: + reply = await client.send_command( + inst.audio_filter_target, "map", str(src.index) + ) + log.info("dispatch.audio_set", instance=inst.name, + source=source_name, index=src.index, ffmpeg_reply=reply) + except (TimeoutError, Exception) as e: + log.error("dispatch.audio_zmq_fail", instance=inst.name, error=str(e)) + return + + if self.on_state_change: + await self.on_state_change(inst.name, "audio_source", source_name) + if self.on_event: + await self.on_event( + inst.name, "audio_switched", {"to": source_name, "index": src.index} + ) + + async def _reload_icon(self, instance: str, icon_name: str) -> None: + """Invalidate cached icon atlas в filter — used by DynamicRenderer.""" + inst = self._find_instance(instance) + if inst is None: + return + client = self._client(inst) + try: + await client.send_command(inst.filter_target, "reload_icon", icon_name) + except (TimeoutError, Exception) as e: + log.warning("dispatch.reload_icon_fail", instance=instance, icon=icon_name, error=str(e)) + + async def _intercom_set(self, inst: InstanceCfg, active: bool) -> None: + """Ducking pattern: при intercom ON → music volume ↓ + intercom ↑. + После end — restore. Volume commands отправляются параллельно.""" + client = self._client(inst) + music_vol = inst.music_ducked_volume if active else 1.0 + intercom_vol = 1.0 if active else 0.0 + + try: + r1 = await client.send_command(inst.music_volume_target, "volume", str(music_vol)) + r2 = await client.send_command(inst.intercom_volume_target, "volume", str(intercom_vol)) + log.info("dispatch.intercom", + instance=inst.name, active=active, + music_vol=music_vol, intercom_vol=intercom_vol, + music_reply=r1, intercom_reply=r2) + except (TimeoutError, Exception) as e: + log.error("dispatch.intercom_zmq_fail", instance=inst.name, error=str(e)) + return + + if self.on_event: + await self.on_event( + inst.name, "intercom", {"active": active, "music_vol": music_vol} + ) + # ─── Layout ──────────────────────────────────────────────────── async def _set_layout(self, inst: InstanceCfg, layout: str) -> None: diff --git a/controller/cuda_grid_controller/dynamic_overlays.py b/controller/cuda_grid_controller/dynamic_overlays.py new file mode 100644 index 0000000..d799b2b --- /dev/null +++ b/controller/cuda_grid_controller/dynamic_overlays.py @@ -0,0 +1,274 @@ +"""Dynamic overlay renderer (Phase 6). + +Controller рендерит chart/chat overlays через PIL → PNG в shared volume, +затем посылает filter команду `reload_icon` чтобы invalidate cache. Filter +re-reads PNG file при следующем render. + +Файлы кладутся в `${icon_dir}/${overlay_id}.png` где icon_dir = mount к +volume общий с pipeline (filter ищет тут). +""" + +from __future__ import annotations + +import asyncio +import math +import time +from collections import deque +from pathlib import Path +from typing import TYPE_CHECKING + +import structlog +from PIL import Image, ImageDraw, ImageFont +from pydantic import BaseModel, Field + +from .overlays import IconOverlay + +if TYPE_CHECKING: + from .dispatch import CommandDispatcher + +log = structlog.get_logger() + + +# ─── Config ─────────────────────────────────────────────────────────── + +class ChartCfg(BaseModel): + """Live chart overlay.""" + id: str = Field(description="Уникальное имя — становится PNG file name + overlay_id") + target_instance: str + cell: int | None = None # None = absolute + x: float = Field(ge=0.0, le=1.0) + y: float = Field(ge=0.0, le=1.0) + w_px: int = Field(default=320, ge=64, le=1920) + h_px: int = Field(default=120, ge=32, le=1080) + title: str = "" + data_topic: str | None = Field( + default=None, + description="MQTT topic с numeric payload. Если None — fake sine wave для demo.", + ) + refresh_sec: float = Field(default=2.0, ge=0.5, le=60.0) + max_points: int = Field(default=60, ge=10, le=600) + line_color: tuple[int, int, int] = (0, 255, 128) + bg_color: tuple[int, int, int, int] = (0, 0, 0, 180) + opacity: float = 1.0 + z_order: int = 25 + + +class ChatCfg(BaseModel): + """Scrolling text notifications.""" + id: str + target_instance: str + cell: int | None = None + x: float = Field(ge=0.0, le=1.0) + y: float = Field(ge=0.0, le=1.0) + w_px: int = Field(default=400, ge=64, le=1920) + h_px: int = Field(default=200, ge=32, le=1080) + source_topic: str | None = Field(default=None, description="MQTT topic — каждое сообщение = новая строка") + max_messages: int = Field(default=10, ge=1, le=50) + font_size: int = Field(default=18, ge=10, le=64) + text_color: tuple[int, int, int] = (255, 255, 255) + bg_color: tuple[int, int, int, int] = (0, 0, 0, 150) + opacity: float = 1.0 + z_order: int = 26 + + +# ─── Renderers ──────────────────────────────────────────────────────── + +class _FontCache: + _font: ImageFont.FreeTypeFont | None = None + + @classmethod + def get(cls, size: int) -> ImageFont.FreeTypeFont | ImageFont.ImageFont: + try: + return ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", size=size) + except OSError: + return ImageFont.load_default() + + +def render_chart(cfg: ChartCfg, data: list[float]) -> Image.Image: + """Line chart с title. Returns RGBA Image.""" + img = Image.new("RGBA", (cfg.w_px, cfg.h_px), cfg.bg_color) + draw = ImageDraw.Draw(img) + pad = 8 + + # Title + if cfg.title: + font = _FontCache.get(14) + draw.text((pad, 2), cfg.title, fill=(255, 255, 255), font=font) + chart_top = 20 + else: + chart_top = pad + chart_bottom = cfg.h_px - pad + chart_left = pad + chart_right = cfg.w_px - pad + chart_h = chart_bottom - chart_top + chart_w = chart_right - chart_left + + # Border + draw.rectangle([chart_left, chart_top, chart_right, chart_bottom], + outline=(128, 128, 128, 200), width=1) + + if len(data) >= 2: + lo, hi = min(data), max(data) + span = (hi - lo) or 1.0 + points: list[tuple[float, float]] = [] + for i, v in enumerate(data): + px = chart_left + (i / (len(data) - 1)) * chart_w + py = chart_bottom - ((v - lo) / span) * chart_h + points.append((px, py)) + draw.line(points, fill=cfg.line_color, width=2) + # Last value label + font = _FontCache.get(12) + last = data[-1] + draw.text((chart_right - 50, chart_top + 2), f"{last:.1f}", + fill=cfg.line_color, font=font) + + return img + + +def render_chat(cfg: ChatCfg, messages: list[str]) -> Image.Image: + """Vertical list — last N messages at bottom.""" + img = Image.new("RGBA", (cfg.w_px, cfg.h_px), cfg.bg_color) + draw = ImageDraw.Draw(img) + font = _FontCache.get(cfg.font_size) + pad = 6 + + line_h = cfg.font_size + 4 + visible = messages[-cfg.max_messages:] + y = cfg.h_px - pad - line_h + for msg in reversed(visible): + if y < pad: + break + draw.text((pad, y), msg[:80], fill=cfg.text_color, font=font) + y -= line_h + + return img + + +# ─── Runner ────────────────────────────────────────────────────────── + +class DynamicRenderer: + """Управляет рендерингом charts/chats — пишет PNG + посылает reload_icon.""" + + def __init__( + self, + icon_dir: Path, + dispatcher: "CommandDispatcher", + charts: list[ChartCfg], + chats: list[ChatCfg], + ) -> None: + self.icon_dir = icon_dir + self.dispatcher = dispatcher + self.charts = charts + self.chats = chats + # State + self._chart_data: dict[str, deque[float]] = { + c.id: deque(maxlen=c.max_points) for c in charts + } + self._chat_messages: dict[str, deque[str]] = { + c.id: deque(maxlen=c.max_messages) for c in chats + } + self._tasks: list[asyncio.Task] = [] + self._start_time = time.time() + + def topics_to_subscribe(self) -> list[str]: + topics = [] + for c in self.charts: + if c.data_topic: + topics.append(c.data_topic) + for c in self.chats: + if c.source_topic: + topics.append(c.source_topic) + return topics + + def handle_message(self, topic: str, payload: str) -> None: + """Update buffer when MQTT data arrives. Caller — mqtt_loop.""" + for c in self.charts: + if c.data_topic == topic: + try: + self._chart_data[c.id].append(float(payload.strip())) + except ValueError: + log.warning("dynamic.bad_chart_value", id=c.id, payload=payload[:50]) + return + for c in self.chats: + if c.source_topic == topic: + self._chat_messages[c.id].append(payload.strip()) + return + + async def start(self) -> None: + """Spawn render tasks per overlay.""" + self.icon_dir.mkdir(parents=True, exist_ok=True) + for cfg in self.charts: + self._tasks.append(asyncio.create_task(self._chart_loop(cfg))) + for cfg in self.chats: + self._tasks.append(asyncio.create_task(self._chat_loop(cfg))) + log.info("dynamic.started", charts=len(self.charts), chats=len(self.chats)) + + async def stop(self) -> None: + for t in self._tasks: + t.cancel() + for t in self._tasks: + try: + await t + except asyncio.CancelledError: + pass + self._tasks.clear() + + async def _chart_loop(self, cfg: ChartCfg) -> None: + registered = False + while True: + try: + buf = self._chart_data[cfg.id] + # Demo data если нет MQTT topic + if cfg.data_topic is None: + t = time.time() - self._start_time + buf.append(20.0 + 10.0 * math.sin(t / 5)) + await self._render_and_publish(cfg, lambda: render_chart(cfg, list(buf))) + if not registered: + await self._register_overlay(cfg.id, cfg.target_instance, cfg.cell, + cfg.x, cfg.y, cfg.opacity, cfg.z_order) + registered = True + await asyncio.sleep(cfg.refresh_sec) + except asyncio.CancelledError: + raise + except Exception as e: + log.error("dynamic.chart_loop_fail", id=cfg.id, error=str(e)) + await asyncio.sleep(5) + + async def _chat_loop(self, cfg: ChatCfg) -> None: + registered = False + last_signature: tuple[str, ...] = () + while True: + try: + msgs = list(self._chat_messages[cfg.id]) + sig = tuple(msgs) + if sig != last_signature or not registered: + await self._render_and_publish(cfg, lambda: render_chat(cfg, msgs)) + last_signature = sig + if not registered: + await self._register_overlay(cfg.id, cfg.target_instance, cfg.cell, + cfg.x, cfg.y, cfg.opacity, cfg.z_order) + registered = True + await asyncio.sleep(0.5) # tight check для chat reactivity + except asyncio.CancelledError: + raise + except Exception as e: + log.error("dynamic.chat_loop_fail", id=cfg.id, error=str(e)) + await asyncio.sleep(5) + + async def _render_and_publish(self, cfg, render_fn) -> None: + path = self.icon_dir / f"{cfg.id}.png" + tmp = path.with_suffix(".png.tmp") + img = render_fn() + img.save(tmp, "PNG") + tmp.replace(path) + # Tell filter to invalidate cached atlas — next render re-reads file + await self.dispatcher._reload_icon(cfg.target_instance, cfg.id) + + async def _register_overlay(self, ov_id, instance, cell, x, y, opacity, z_order) -> None: + """Initial register — add IconOverlay referencing icon_name=.""" + ov = IconOverlay( + id=ov_id, cell=cell, name=ov_id, # icon name = same as overlay id + x=x, y=y, size=0.1, # size игнорируется filter'ом — use native PNG dims + opacity=opacity, z_order=z_order, + ) + await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json()) diff --git a/controller/cuda_grid_controller/frigate_bridge.py b/controller/cuda_grid_controller/frigate_bridge.py index 679e774..f05ffc1 100644 --- a/controller/cuda_grid_controller/frigate_bridge.py +++ b/controller/cuda_grid_controller/frigate_bridge.py @@ -21,7 +21,7 @@ from typing import TYPE_CHECKING import structlog from pydantic import BaseModel, Field -from .overlays import RectOverlay, TextOverlay +from .overlays import DimOverlay, RectOverlay, TextOverlay if TYPE_CHECKING: from .dispatch import CommandDispatcher @@ -54,12 +54,24 @@ class BorderTheme(BaseModel): motion_opacity: float = Field(default=1.0, ge=0.0, le=1.0) +class FocusTheme(BaseModel): + """Auto-focus: при motion на одной cell — затемнение остальных. + Если motion на 2+ cells одновременно — focus не применяется (no obvious focus).""" + + enabled: bool = True + dim_color: str = "#000000" + dim_factor: float = Field(default=0.6, ge=0.0, le=1.0, + description="0.0=без затемнения, 1.0=полное черное") + + class FrigateBridgeCfg(BaseModel): enabled: bool = False base_topic: str = "frigate" mappings: list[FrigateCameraMapping] = [] border_theme: BorderTheme = Field(default_factory=BorderTheme, description="Стиль cell borders — idle/motion цвета") + focus_theme: FocusTheme = Field(default_factory=FocusTheme, + description="Auto-focus: dim non-active cells когда motion на одной") class FrigateBridge: @@ -78,6 +90,7 @@ class FrigateBridge: self._event_overlays: dict[str, tuple[str, str]] = {} self._borders_initialized: dict[str, bool] = {} # target_instance → bool self._cell_states: dict[str, set[str]] = {} # ":" → set of active cameras с motion + self._focus_dims: dict[str, set[int]] = {} # instance → set of cells which сейчас затемнены def topics_to_subscribe(self) -> list[str]: if not self.cfg.enabled: @@ -176,6 +189,57 @@ class FrigateBridge: if is_active != was_active: await self._set_border_state(mapping.target_instance, mapping.cell, motion=is_active) + await self._update_focus(mapping.target_instance) + + def _cell_dim_id(self, cell: int) -> str: + return f"cell_{cell}_focus_dim" + + async def _update_focus(self, instance: str) -> None: + """Auto-focus logic: + 0 active cells → no dim (remove all focus dims) + 1 active cell → dim non-focus cells (focus mode ON) + 2+ active cells → no dim (too many — no obvious single focus) + """ + if not self.dispatcher or not self.cfg.focus_theme.enabled: + return + + # Active cells для этого instance + active_cells = { + int(key.split(":", 1)[1]) + for key, cams in self._cell_states.items() + if cams and key.startswith(f"{instance}:") + } + all_cells = sorted({m.cell for m in self.cfg.mappings if m.target_instance == instance}) + currently_dimmed = self._focus_dims.setdefault(instance, set()) + + target_dimmed: set[int] + if len(active_cells) == 1: + focus_cell = next(iter(active_cells)) + target_dimmed = {c for c in all_cells if c != focus_cell} + else: + target_dimmed = set() + + to_add = target_dimmed - currently_dimmed + to_remove = currently_dimmed - target_dimmed + + theme = self.cfg.focus_theme + for cell in to_add: + ov = DimOverlay( + id=self._cell_dim_id(cell), + cell=cell, + x=0.0, y=0.0, w=1.0, h=1.0, + color=theme.dim_color, + dim_factor=theme.dim_factor, + z_order=1, # под bbox (20) и border (5) но над cell content + ) + await self.dispatcher.handle(instance, "overlay.add", ov.model_dump_json()) + currently_dimmed.add(cell) + for cell in to_remove: + await self.dispatcher.handle(instance, "overlay.remove", self._cell_dim_id(cell)) + currently_dimmed.discard(cell) + + if to_add or to_remove: + log.info("focus.updated", instance=instance, active=sorted(active_cells), dimmed=sorted(target_dimmed)) async def _handle_event(self, event: dict) -> None: event_type = event.get("type", "?") diff --git a/controller/cuda_grid_controller/http_api.py b/controller/cuda_grid_controller/http_api.py index c8f4366..8bfa20f 100644 --- a/controller/cuda_grid_controller/http_api.py +++ b/controller/cuda_grid_controller/http_api.py @@ -2,10 +2,12 @@ from __future__ import annotations +import asyncio from typing import Any import structlog from fastapi import Body, FastAPI, HTTPException +from fastapi.responses import Response from pydantic import BaseModel, TypeAdapter from .config import Config @@ -21,6 +23,10 @@ class LayoutSetReq(BaseModel): layout: str +class AudioSetReq(BaseModel): + source: str + + def create_app( cfg: Config, state: ControllerState, dispatcher: CommandDispatcher ) -> FastAPI: @@ -100,6 +106,80 @@ def create_app( await dispatcher.handle(instance, "overlay.clear", "") return {"ok": True} + # ─── Audio ───────────────────────────────────────────────────── + + @app.get("/audio/{instance}") + async def audio_list(instance: str) -> dict[str, Any]: + inst = _check_instance(instance) + return { + "instance": instance, + "sources": [ + {"name": s.name, "index": s.index, "label": s.label or s.name} + for s in inst.audio_sources + ], + } + + @app.post("/audio/{instance}/set") + async def audio_set(instance: str, req: AudioSetReq) -> dict[str, Any]: + inst = _check_instance(instance) + names = {s.name for s in inst.audio_sources} + if req.source not in names: + raise HTTPException( + 400, f"unknown audio source '{req.source}'. Доступны: {sorted(names)}" + ) + await dispatcher.handle(instance, "audio.set", req.source) + return {"ok": True, "instance": instance, "source": req.source} + + @app.post("/intercom/{instance}/start") + async def intercom_start(instance: str) -> dict[str, Any]: + _check_instance(instance) + await dispatcher.handle(instance, "intercom.start", "") + return {"ok": True, "instance": instance, "intercom": "active"} + + @app.post("/intercom/{instance}/end") + async def intercom_end(instance: str) -> dict[str, Any]: + _check_instance(instance) + await dispatcher.handle(instance, "intercom.end", "") + return {"ok": True, "instance": instance, "intercom": "idle"} + + # ─── Snapshot ────────────────────────────────────────────────── + + @app.post( + "/snapshot/{instance}", + responses={ + 200: {"content": {"image/png": {}}, "description": "PNG snapshot of live grid"}, + 504: {"description": "ffmpeg timeout"}, + }, + ) + async def snapshot(instance: str) -> Response: + """Capture single frame от output_rtsp_url. Returns PNG bytes (image/png).""" + inst = _check_instance(instance) + if not inst.output_rtsp_url: + raise HTTPException(400, f"instance '{instance}' has no output_rtsp_url configured") + + proc = await asyncio.create_subprocess_exec( + "ffmpeg", "-hide_banner", "-loglevel", "error", + "-rtsp_transport", "tcp", + "-i", inst.output_rtsp_url, + "-frames:v", "1", + "-f", "image2pipe", "-c:v", "png", "-", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + try: + png_data, err = await asyncio.wait_for(proc.communicate(), timeout=10) + except asyncio.TimeoutError: + proc.kill() + raise HTTPException(504, "snapshot timeout (ffmpeg >10s)") + + if proc.returncode != 0 or not png_data: + err_msg = err.decode(errors="replace")[:300] if err else "no output" + log.warning("snapshot.failed", instance=instance, err=err_msg) + raise HTTPException(500, f"ffmpeg failed: {err_msg}") + + log.info("snapshot.ok", instance=instance, bytes=len(png_data)) + return Response(content=png_data, media_type="image/png") + @app.patch("/overlay/{instance}/{overlay_id}") async def overlay_update( instance: str, overlay_id: str, overlay: Overlay = Body(...) diff --git a/controller/cuda_grid_controller/mqtt_loop.py b/controller/cuda_grid_controller/mqtt_loop.py index b2e0d4e..6e83497 100644 --- a/controller/cuda_grid_controller/mqtt_loop.py +++ b/controller/cuda_grid_controller/mqtt_loop.py @@ -24,6 +24,7 @@ import aiomqtt import structlog from .config import Config +from .dynamic_overlays import DynamicRenderer from .frigate_bridge import FrigateBridge from .ha_discovery import availability_topic, discovery_payloads from .state import ControllerState @@ -41,11 +42,13 @@ class MqttLoop: state: ControllerState, command_handler: CommandHandler, frigate_bridge: FrigateBridge | None = None, + dynamic_renderer: DynamicRenderer | None = None, ) -> None: self.cfg = cfg self.state = state self.command_handler = command_handler self.frigate_bridge = frigate_bridge + self.dynamic_renderer = dynamic_renderer self._client: aiomqtt.Client | None = None self._stop = asyncio.Event() @@ -85,6 +88,11 @@ class MqttLoop: for t in self.frigate_bridge.topics_to_subscribe(): await client.subscribe(t, qos=0) log.info("mqtt.frigate.subscribed", topic=t) + # Dynamic overlays MQTT data subscriptions + if self.dynamic_renderer: + for t in self.dynamic_renderer.topics_to_subscribe(): + await client.subscribe(t, qos=0) + log.info("mqtt.dynamic.subscribed", topic=t) async for msg in client.messages: await self._handle_message(msg) @@ -126,6 +134,11 @@ class MqttLoop: await self.frigate_bridge.handle_message(topic, payload) return + # Dynamic overlays — chart data + chat messages + if self.dynamic_renderer: + self.dynamic_renderer.handle_message(topic, payload) + return + log.warning("mqtt.unknown_topic", topic=topic, payload=payload) async def publish_state(self, instance: str, scope: str, value: str, retain: bool = True) -> None: diff --git a/controller/cuda_grid_controller/zmq_client.py b/controller/cuda_grid_controller/zmq_client.py index 8b51455..018caae 100644 --- a/controller/cuda_grid_controller/zmq_client.py +++ b/controller/cuda_grid_controller/zmq_client.py @@ -11,6 +11,8 @@ FFmpeg zmq filter принимает строки формата: from __future__ import annotations +import asyncio + import structlog import zmq import zmq.asyncio @@ -26,6 +28,10 @@ class FFmpegZmqClient: self.request_timeout_ms = request_timeout_ms self._ctx = zmq.asyncio.Context.instance() self._sock: zmq.asyncio.Socket | None = None + # REQ socket требует strict send→recv→send→recv pattern. Без lock'а + # concurrent send_command (overlay + audio) ломает state в "Operation + # cannot be accomplished in current state". Serialize requests. + self._lock = asyncio.Lock() async def connect(self) -> None: if self._sock is not None: @@ -55,17 +61,24 @@ class FFmpegZmqClient: cmd_str = f"{cmd_str} '{escaped}'" log.debug("zmq.send", endpoint=self.endpoint, cmd=cmd_str) - try: - await self._sock.send_string(cmd_str) - reply = await self._sock.recv_string() - log.debug("zmq.reply", reply=reply) - return reply - except zmq.error.Again: - log.warning("zmq.timeout", endpoint=self.endpoint, cmd=cmd_str) - # Reset REQ socket state — после timeout REQ нельзя re-use - self._sock.close(linger=0) - self._sock = None - raise TimeoutError(f"zmq command timeout: {cmd_str}") + async with self._lock: + try: + await self._sock.send_string(cmd_str) + reply = await self._sock.recv_string() + log.debug("zmq.reply", reply=reply) + return reply + except zmq.error.Again: + log.warning("zmq.timeout", endpoint=self.endpoint, cmd=cmd_str) + self._sock.close(linger=0) + self._sock = None + raise TimeoutError(f"zmq command timeout: {cmd_str}") + except Exception as e: + # Любая другая ошибка тоже ломает REQ state — сбрасываем socket + log.warning("zmq.error", endpoint=self.endpoint, error=str(e)) + if self._sock is not None: + self._sock.close(linger=0) + self._sock = None + raise async def close(self) -> None: if self._sock is not None: