d7b3e34c6b
snapshot_history.py — async periodic capture per instance:
interval_sec / keep_last (FIFO eviction) / dir configurable
GET /snapshots/{instance}?limit=N → list metadata
GET /snapshots/{instance}/{filename} → image bytes
Persisted in /var/lib/cuda-grid/snapshots/{instance}/<ts>.png
layout_map / layout_filter_target in InstanceCfg — groundwork for a future runtime-switch
architecture (via streamselect or a filter rework — the choice is left to Phase 7).
The current _set_layout dispatches to layout_filter_target with the index taken from the map.
UI:
+ Layout buttons (quad/single/main_plus_preview placeholder)
+ Snapshot history grid with thumbnails (loaded from /snapshots/{inst}?limit=24)
+ "Reload history" button
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
129 lines
4.5 KiB
Python
129 lines
4.5 KiB
Python
"""Periodic snapshot capture + listing для каждой instance."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import time
|
|
from pathlib import Path
|
|
from typing import TYPE_CHECKING
|
|
|
|
import structlog
|
|
|
|
if TYPE_CHECKING:
|
|
from .config import Config, InstanceCfg
|
|
|
|
# Module-level structlog logger used for all snapshot-history events below.
log = structlog.get_logger()
|
|
|
|
|
|
class SnapshotHistory:
    """Capture, retain and serve periodic PNG snapshots per instance.

    For every configured instance whose ``snapshot_history.enabled`` flag is
    set, a background asyncio task grabs one frame from the instance's
    ``output_rtsp_url`` with ffmpeg every ``interval_sec`` seconds, stores it
    as ``<snapshot_history.dir>/<instance name>/<unix timestamp>.png`` and
    then deletes the oldest files so that at most ``keep_last`` remain.

    Only files whose stem is a plain decimal number are treated as
    snapshots; other ``*.png`` files in the directory are neither listed nor
    evicted.
    """

    # Upper bound, in seconds, for a single ffmpeg frame grab.
    _CAPTURE_TIMEOUT_SEC = 15

    def __init__(self, cfg: "Config") -> None:
        """Remember the configuration; no tasks run until ``start()``."""
        self.cfg = cfg
        self._tasks: list[asyncio.Task] = []

    def _dir_for(self, inst: "InstanceCfg") -> Path:
        """Return the directory that holds the snapshots of ``inst``."""
        return Path(inst.snapshot_history.dir) / inst.name

    @staticmethod
    def _snapshot_files(d: Path) -> list[tuple[int, Path]]:
        """Return ``(timestamp, path)`` for snapshots in ``d``, oldest first.

        Ordering is numeric on the timestamp rather than lexicographic on the
        file name, so ``99.png`` is correctly older than ``100.png``. Files
        whose stem is not an ASCII decimal number are ignored.
        """
        found = [
            (int(f.stem), f)
            for f in d.glob("*.png")
            if f.stem.isascii() and f.stem.isdigit()
        ]
        found.sort(key=lambda item: item[0])
        return found

    @staticmethod
    async def _reap(proc: asyncio.subprocess.Process) -> None:
        """Kill ``proc`` if it is still running and wait for it to exit."""
        if proc.returncode is not None:
            return
        try:
            proc.kill()
        except ProcessLookupError:
            # The process exited between the returncode check and kill().
            pass
        await proc.wait()

    async def start(self) -> None:
        """Create the snapshot directory and capture task for each instance.

        Instances with snapshot history disabled are skipped silently;
        enabled instances without ``output_rtsp_url`` are skipped with a
        warning.
        """
        for inst in self.cfg.instances:
            if not inst.snapshot_history.enabled:
                continue
            if not inst.output_rtsp_url:
                log.warning("snapshot.no_url", instance=inst.name)
                continue
            d = self._dir_for(inst)
            d.mkdir(parents=True, exist_ok=True)
            self._tasks.append(asyncio.create_task(self._loop(inst)))
            log.info("snapshot_history.started", instance=inst.name,
                     interval=inst.snapshot_history.interval_sec,
                     keep_last=inst.snapshot_history.keep_last, dir=str(d))

    async def stop(self) -> None:
        """Cancel all capture tasks and wait until they have finished."""
        for t in self._tasks:
            t.cancel()
        if self._tasks:
            # return_exceptions=True collects each task's CancelledError (or
            # any other failure) so every task is awaited, while a
            # cancellation of stop() itself still propagates to the caller.
            await asyncio.gather(*self._tasks, return_exceptions=True)
        self._tasks.clear()

    async def _loop(self, inst: "InstanceCfg") -> None:
        """Capture and evict forever, sleeping ``interval_sec`` between runs.

        Any failure of a single iteration is logged and the loop carries on;
        only cancellation ends it.
        """
        cfg = inst.snapshot_history
        while True:
            try:
                await self._capture_one(inst)
                self._evict(inst, cfg.keep_last)
            except asyncio.CancelledError:
                raise
            except Exception as e:
                log.error("snapshot_history.fail", instance=inst.name, error=str(e))
            await asyncio.sleep(cfg.interval_sec)

    async def _capture_one(self, inst: "InstanceCfg") -> None:
        """Grab one frame from the instance's RTSP output into a PNG file.

        ffmpeg writes to ``<ts>.png.tmp`` first; the file is moved to
        ``<ts>.png`` only after ffmpeg exits successfully, so readers never
        see a partially written image. On timeout, ffmpeg failure or task
        cancellation the temporary file is removed and no snapshot is kept.
        """
        d = self._dir_for(inst)
        ts = int(time.time())
        path = d / f"{ts}.png"
        tmp = path.with_suffix(".png.tmp")
        proc = await asyncio.create_subprocess_exec(
            "ffmpeg", "-hide_banner", "-loglevel", "error", "-y",
            "-rtsp_transport", "tcp",
            "-i", inst.output_rtsp_url,
            "-frames:v", "1",
            "-f", "image2", "-c:v", "png",
            str(tmp),
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            _, err = await asyncio.wait_for(
                proc.communicate(), timeout=self._CAPTURE_TIMEOUT_SEC
            )
        except asyncio.TimeoutError:
            # Reap before unlinking so ffmpeg cannot re-create the temp file
            # and no zombie process is left behind.
            await self._reap(proc)
            log.warning("snapshot_history.timeout", instance=inst.name)
            tmp.unlink(missing_ok=True)
            return
        except asyncio.CancelledError:
            # stop() was called mid-capture: do not leave ffmpeg running.
            await self._reap(proc)
            tmp.unlink(missing_ok=True)
            raise
        if proc.returncode != 0 or not tmp.exists():
            log.warning("snapshot_history.ffmpeg_fail",
                        instance=inst.name,
                        err=(err.decode(errors="replace")[:200] if err else ""))
            tmp.unlink(missing_ok=True)
            return
        # replace() overwrites an existing target on every platform.
        tmp.replace(path)
        log.debug("snapshot_history.saved", instance=inst.name,
                  file=path.name, size=path.stat().st_size)

    def _evict(self, inst: "InstanceCfg", keep_last: int) -> None:
        """Delete the oldest snapshots so that at most ``keep_last`` remain."""
        files = self._snapshot_files(self._dir_for(inst))
        excess = len(files) - keep_last
        if excess > 0:
            for _, old in files[:excess]:
                old.unlink(missing_ok=True)

    def list_snapshots(self, instance: str, limit: int = 50) -> list[dict]:
        """Return metadata for the newest snapshots of ``instance``.

        Args:
            instance: Instance name as given in the configuration.
            limit: Maximum number of entries; zero or a negative value yields
                an empty list.

        Returns:
            Newest-first list of dicts with ``filename``, ``timestamp``,
            ``size`` and ``url`` keys. Empty when the instance is unknown,
            has snapshot history disabled, or has no snapshot directory.
        """
        inst = next((i for i in self.cfg.instances if i.name == instance), None)
        if inst is None or not inst.snapshot_history.enabled:
            return []
        d = self._dir_for(inst)
        if limit <= 0 or not d.exists():
            return []
        entries: list[dict] = []
        for ts, f in reversed(self._snapshot_files(d)):
            if len(entries) >= limit:
                break
            try:
                size = f.stat().st_size
            except FileNotFoundError:
                # Evicted by the capture loop between glob() and stat().
                continue
            entries.append(
                {
                    "filename": f.name,
                    "timestamp": ts,
                    "size": size,
                    "url": f"/snapshots/{instance}/{f.name}",
                }
            )
        return entries

    def path(self, instance: str, filename: str) -> "Path | None":
        """Resolve a snapshot file name to its on-disk path.

        Returns ``None`` when the instance is unknown, the name is not a
        plain ``.png`` file name, or no regular file with that name exists.
        """
        inst = next((i for i in self.cfg.instances if i.name == instance), None)
        if inst is None:
            return None
        # Path-traversal guard: only plain ``.png`` file names are accepted,
        # with no path separators (either flavour), no "..", and no NUL.
        if (
            "/" in filename
            or "\\" in filename
            or "\x00" in filename
            or ".." in filename
            or not filename.endswith(".png")
        ):
            return None
        p = self._dir_for(inst) / filename
        # is_file() rather than exists(): never hand a directory to the caller.
        return p if p.is_file() else None