#199 DLPack export: - frame.dlpack_y() / .dlpack_uv() — explicit multi-plane access для NV12 - frame.__dlpack__() / __dlpack_device__() — protocol для torch/cupy - Capsule deleter правильно держит refcount на frame_keep_alive, releases shape/strides arrays. CUDA pointer принадлежит frame. #200 Health/stats counters: - frames_received, timeouts, errors — per-call counters - last_seq, gap_count — proxy для drop count (NEWEST_ONLY mode) - last_frame_pts_ns - stats() — snapshot dict для MQTT health publish - counted в pybind layer т.к. C API не expose'ит ring_occupancy #201 Per-subscriber CUDA stream + thread-safety: - consumer_stream kwarg в subscribe() — int (cudaStream_t pointer) - subscriber.consumer_stream property - Thread-safety contract в docstring CuframesSubscriber - next_frame() передаёт consumer_stream_ в cuframes_subscriber_next #202 Smoke test + docs: - 10/10 pytest passed (расширен +2 теста на consumer_stream) - docs/python.md (~250 строк): quick start, API reference, integration с PyTorch/CuPy, reconnect-loop pattern, per-stream usage, pitch alignment, thread-safety, error taxonomy, backpressure, Phase 0 limitations Verify build + tests: cmake -B build-python -DBUILD_PYTHON_BINDINGS=ON cmake --build build-python -j pytest python/tests/ -v # 10/10 Закрывает Phase 0 issue gx/cuframes#6. Разблокирует goldix-smart-home/yolo-world-detector Phase 1. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
10 KiB
cuframes Python bindings
Status: v0.4 — Phase 0 alpha (issue gx/cuframes#6)
Python пакет cuframes — pybind11-обёртка над C ABI libcuframes. Цель —
позволить downstream ML/CV пайплайнам (yolo-world-detector, zone-motion,
custom скриптам) подписываться на cuframes без CPU round-trip: получать
NV12 frames прямо как CUDA pointer / torch.Tensor (DLPack export, zero-copy
из VRAM publisher'а в VRAM consumer'а).
Установка
Standalone wheel (рекомендуемый):
cd cuframes/python/
pip install -e . --no-build-isolation
Через корневой CMake:
cmake -B build -DBUILD_PYTHON_BINDINGS=ON
cmake --build build -j
Quick start
import cuframes
print(cuframes.version_string()) # "0.4.0"
with cuframes.subscribe("cam-parking",
consumer_name="yolo-world",
connect_timeout_ms=5000) as sub:
with sub.next_frame(timeout_ms=1000) as frame:
print(f"{frame.width}x{frame.height} "
f"format={frame.format} seq={frame.seq}")
API
cuframes.subscribe(key, ...)
Создать подписку на publisher. Возвращает CuframesSubscriber.
| Параметр | Тип | Default | Назначение |
|---|---|---|---|
key |
str |
(required) | Имя publisher'а ("cam-parking" и т.п.) |
consumer_name |
str | None |
None (auto-generated) |
Идентификатор подписки |
mode |
SubscriberMode |
NEWEST_ONLY |
NEWEST_ONLY skip'ит промежуточные frames, STRICT_ORDER — все по порядку |
cuda_device |
int |
0 |
CUDA device id |
connect_timeout_ms |
int |
-1 (бесконечно) |
Сколько ждать publisher'а |
consumer_stream |
int |
0 (default stream) |
cudaStream_t как pointer |
CuframesSubscriber
Контекст-менеджер. Methods/properties:
sub.next_frame(timeout_ms=-1) # → CuframesFrame
sub.close() # idempotent
# read-only properties
sub.key # str
sub.consumer_name # str
sub.mode # SubscriberMode
sub.cuda_device # int
sub.consumer_stream # int (cudaStream_t ptr)
sub.closed # bool
# health / stats (Phase 0 counters)
sub.frames_received # int
sub.timeouts # int
sub.errors # int
sub.last_seq # int (sequence number последнего frame'а)
sub.gap_count # int (proxy для drop count в NEWEST_ONLY)
sub.last_frame_pts_ns # int
sub.stats() # dict — snapshot всех counters для MQTT publish
CuframesFrame
Контекст-менеджер. Properties (read-only):
frame.cuda_ptr # int (uintptr_t)
frame.format # PixelFormat
frame.width # int
frame.height # int
frame.pitch_y # int — pitch Y plane (важно — может быть > width!)
frame.pitch_uv # int
frame.seq # int — sequence number у publisher'а
frame.pts_ns # int — CLOCK_MONOTONIC у publisher'а
frame.released # bool
# DLPack export (zero-copy)
frame.dlpack_y() # capsule — Y plane как 2D uint8 GPU tensor
frame.dlpack_uv() # capsule — UV plane (только NV12)
frame.__dlpack__() # protocol для torch.from_dlpack(frame)
frame.__dlpack_device__() # (kDLCUDA=2, device_id)
Интеграция с PyTorch
import torch
import cuframes
with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub:
with sub.next_frame() as frame:
# Single-plane (default — Y plane для NV12)
y_tensor = torch.from_dlpack(frame)
# Multi-plane explicit
y = torch.from_dlpack(frame.dlpack_y()) # shape=[H, W] uint8
uv = torch.from_dlpack(frame.dlpack_uv()) # shape=[H/2, W] uint8
# Y plane уже в VRAM — никаких copy. Можно сразу feed в NN.
y_float = y.float() / 255.0 # будет на CUDA device
Интеграция с CuPy
import cupy
import cuframes
with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub:
with sub.next_frame() as frame:
y_array = cupy.from_dlpack(frame.dlpack_y()) # cupy.ndarray на GPU
Pattern: reconnect-loop для долгоживущего consumer'а
import time
import cuframes
def consume_camera(key: str, on_frame):
while True:
try:
with cuframes.subscribe(key, connect_timeout_ms=5000) as sub:
while True:
try:
with sub.next_frame(timeout_ms=1000) as frame:
on_frame(frame)
except cuframes.CuframesFrameTimeout:
# просто нет новых кадров — продолжаем ждать
continue
except cuframes.CuframesPublisherGone:
# publisher умер / перезапускается — переподписываемся
print(f"publisher {key} gone, reconnect через 1s")
time.sleep(1)
except cuframes.CuframesError as e:
# фатальная ошибка — логируем и продолжаем
print(f"error: {e!r}")
time.sleep(5)
Per-subscriber CUDA stream
В продакшене на 4+ камеры каждый subscriber должен иметь свой stream —
иначе cudaStreamWaitEvent сериализует всех consumer'ов через default
stream.
С cuda-python:
from cuda import cudart
import cuframes
err, stream = cudart.cudaStreamCreate()
assert err == cudart.cudaError_t.cudaSuccess
with cuframes.subscribe("cam-parking", consumer_stream=int(stream)) as sub:
...
С torch.cuda.Stream:
import torch
import cuframes
stream = torch.cuda.Stream()
with cuframes.subscribe("cam-parking",
consumer_stream=stream.cuda_stream) as sub:
with torch.cuda.stream(stream):
with sub.next_frame() as frame:
tensor = torch.from_dlpack(frame)
# ... inference на этом stream'е ...
Pitch alignment — важно!
NVDEC отдаёт NV12 с pitch alignment 256 байт. Для камер с шириной не
кратной 256 (gate_lpr 2688×1520 → pitch 2688 OK; но представьте 640×480
→ pitch обычно 640 байт, но может быть 768).
# WRONG — assume pitch == width
y = torch.frombuffer(...) # данные смещены
# RIGHT — использовать DLPack который сам respect'ит strides
y = torch.from_dlpack(frame.dlpack_y()) # stride учтён правильно
# ALTERNATIVELY — manual через cuda-python с правильным pitch
ptr = frame.cuda_ptr
pitch = frame.pitch_y
height = frame.height
Thread-safety contract
- Каждый
CuframesSubscriberпринадлежит одному Python потоку. Создание и все вызовы (next_frame,close) — в одном thread. - Несколько subscriber'ов в разных потоках — OK (каждому свой handle, свой CUDA stream).
CuframesFrameтоже принадлежит одному потоку — послеrelease()его CUDA pointer становится недействительным, доступ из другого потока — undefined behavior.- Внутренний GIL отпускается на блокирующих вызовах (
subscriber_create,next_frame) — другие Python потоки могут выполняться.
Для multi-camera в одном процессе используйте asyncio или threading:
import threading
import cuframes
def worker(camera_key):
with cuframes.subscribe(camera_key, connect_timeout_ms=5000) as sub:
# subscribe в этом же потоке
while True:
with sub.next_frame(timeout_ms=1000) as frame:
process(frame)
for key in ["cam-parking", "cam-front_yard", "cam-gate_lpr", "cam-back_yard"]:
threading.Thread(target=worker, args=(key,), daemon=True).start()
Error taxonomy
Все exception'ы наследуются от CuframesError. Конкретные subclass'ы
позволяют разную обработку:
| Exception | Когда выбрасывается | Что делать |
|---|---|---|
CuframesPublisherGone |
publisher умер или ещё не стартовал | reconnect-loop |
CuframesFrameTimeout |
timeout без frame'а | продолжать ждать или log'нуть |
CuframesDeviceLost |
CUDA error на cross-process sync | abort, не recoverable |
CuframesShmError |
socket/mmap/IPC error | log, abort или восстановить |
CuframesProtocolMismatch |
версия libcuframes несовместима | пересобрать |
CuframesInvalidArgument |
bug в caller | fix code |
CuframesOutOfMemory |
cudaMalloc fail | reduce работу |
CuframesInternal |
bug в libcuframes | report |
Backpressure
next_frame() blocking call с GIL released. Если consumer медленнее
publisher'а:
- В
NEWEST_ONLYmode (default) — publisher продолжает писать, consumer получает самый свежий frame (промежуточные пропускает).gap_countрастёт. - В
STRICT_ORDERmode — при ring overflowCuframesPublisherGone→ reconnect.
Frame удерживать долго нельзя: в STRICT_WAIT policy publisher
заблокирует ring. Pattern — забрать DLPack, инициировать GPU работу,
release frame сразу.
Текущие ограничения (Phase 0)
- Publisher API не обёрнут (только subscriber-side)
- Packet ring (encoded video) не обёрнут
- Async callback API не обёрнут
ring_occupancy/ реальный drop count — нет в C API (counted в pybind какgap_count, это proxy)- Smoke test реального subscribe требует Docker IPC namespace (cuframes socket/SHM живут в namespace publisher'а)
Эти ограничения снимаются по мере необходимости — issues в gx/cuframes.