Python bindings (pybind11) — Phase 0 v1 #7

Merged
gx merged 4 commits from feat/python-bindings into main 2026-06-13 21:34:30 +01:00
9 changed files with 1393 additions and 0 deletions
+4
View File
@@ -39,3 +39,7 @@ endif()
if(BUILD_TOOLS)
add_subdirectory(tools/cuframes-rtsp-source)
endif()
if(BUILD_PYTHON_BINDINGS)
add_subdirectory(python)
endif()
+284
View File
@@ -0,0 +1,284 @@
# cuframes Python bindings
Status: **v0.4 — Phase 0 alpha** (issue [gx/cuframes#6](http://server:3000/gx/cuframes/issues/6))
Python пакет `cuframes` — pybind11-обёртка над C ABI libcuframes. Цель —
позволить downstream ML/CV пайплайнам (yolo-world-detector, zone-motion,
custom скриптам) подписываться на cuframes **без CPU round-trip**: получать
NV12 frames прямо как CUDA pointer / `torch.Tensor` (DLPack export, zero-copy
из VRAM publisher'а в VRAM consumer'а).
## Установка
Standalone wheel (рекомендуемый):
```bash
cd cuframes/python/
pip install -e . --no-build-isolation
```
Через корневой CMake:
```bash
cmake -B build -DBUILD_PYTHON_BINDINGS=ON
cmake --build build -j
```
## Quick start
```python
import cuframes
print(cuframes.version_string()) # "0.4.0"
with cuframes.subscribe("cam-parking",
consumer_name="yolo-world",
connect_timeout_ms=5000) as sub:
with sub.next_frame(timeout_ms=1000) as frame:
print(f"{frame.width}x{frame.height} "
f"format={frame.format} seq={frame.seq}")
```
## API
### `cuframes.subscribe(key, ...)`
Создать подписку на publisher. Возвращает `CuframesSubscriber`.
| Параметр | Тип | Default | Назначение |
|---|---|---|---|
| `key` | `str` | (required) | Имя publisher'а (`"cam-parking"` и т.п.) |
| `consumer_name` | `str \| None` | `None` (auto-generated) | Идентификатор подписки |
| `mode` | `SubscriberMode` | `NEWEST_ONLY` | `NEWEST_ONLY` skip'ит промежуточные frames, `STRICT_ORDER` — все по порядку |
| `cuda_device` | `int` | `0` | CUDA device id |
| `connect_timeout_ms` | `int` | `-1` (бесконечно) | Сколько ждать publisher'а |
| `consumer_stream` | `int` | `0` (default stream) | `cudaStream_t` как pointer |
### `CuframesSubscriber`
Контекст-менеджер. Methods/properties:
```python
sub.next_frame(timeout_ms=-1) # → CuframesFrame
sub.close() # idempotent
# read-only properties
sub.key # str
sub.consumer_name # str
sub.mode # SubscriberMode
sub.cuda_device # int
sub.consumer_stream # int (cudaStream_t ptr)
sub.closed # bool
# health / stats (Phase 0 counters)
sub.frames_received # int
sub.timeouts # int
sub.errors # int
sub.last_seq # int (sequence number последнего frame'а)
sub.gap_count # int (proxy для drop count в NEWEST_ONLY)
sub.last_frame_pts_ns # int
sub.stats() # dict — snapshot всех counters для MQTT publish
```
### `CuframesFrame`
Контекст-менеджер. Properties (read-only):
```python
frame.cuda_ptr # int (uintptr_t)
frame.format # PixelFormat
frame.width # int
frame.height # int
frame.pitch_y # int — pitch Y plane (важно — может быть > width!)
frame.pitch_uv # int
frame.seq # int — sequence number у publisher'а
frame.pts_ns # int — CLOCK_MONOTONIC у publisher'а
frame.released # bool
# DLPack export (zero-copy)
frame.dlpack_y() # capsule — Y plane как 2D uint8 GPU tensor
frame.dlpack_uv() # capsule — UV plane (только NV12)
frame.__dlpack__() # protocol для torch.from_dlpack(frame)
frame.__dlpack_device__() # (kDLCUDA=2, device_id)
```
## Интеграция с PyTorch
```python
import torch
import cuframes
with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub:
with sub.next_frame() as frame:
# Single-plane (default — Y plane для NV12)
y_tensor = torch.from_dlpack(frame)
# Multi-plane explicit
y = torch.from_dlpack(frame.dlpack_y()) # shape=[H, W] uint8
uv = torch.from_dlpack(frame.dlpack_uv()) # shape=[H/2, W] uint8
# Y plane уже в VRAM — никаких copy. Можно сразу feed в NN.
y_float = y.float() / 255.0 # будет на CUDA device
```
## Интеграция с CuPy
```python
import cupy
import cuframes
with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub:
with sub.next_frame() as frame:
y_array = cupy.from_dlpack(frame.dlpack_y()) # cupy.ndarray на GPU
```
## Pattern: reconnect-loop для долгоживущего consumer'а
```python
import time
import cuframes
def consume_camera(key: str, on_frame):
while True:
try:
with cuframes.subscribe(key, connect_timeout_ms=5000) as sub:
while True:
try:
with sub.next_frame(timeout_ms=1000) as frame:
on_frame(frame)
except cuframes.CuframesFrameTimeout:
# просто нет новых кадров — продолжаем ждать
continue
except cuframes.CuframesPublisherGone:
# publisher умер / перезапускается — переподписываемся
print(f"publisher {key} gone, reconnect через 1s")
time.sleep(1)
except cuframes.CuframesError as e:
# фатальная ошибка — логируем и продолжаем
print(f"error: {e!r}")
time.sleep(5)
```
## Per-subscriber CUDA stream
В продакшене на 4+ камеры каждый subscriber должен иметь свой stream —
иначе `cudaStreamWaitEvent` сериализует всех consumer'ов через default
stream.
С `cuda-python`:
```python
from cuda import cudart
import cuframes
err, stream = cudart.cudaStreamCreate()
assert err == cudart.cudaError_t.cudaSuccess
with cuframes.subscribe("cam-parking", consumer_stream=int(stream)) as sub:
...
```
С `torch.cuda.Stream`:
```python
import torch
import cuframes
stream = torch.cuda.Stream()
with cuframes.subscribe("cam-parking",
consumer_stream=stream.cuda_stream) as sub:
with torch.cuda.stream(stream):
with sub.next_frame() as frame:
tensor = torch.from_dlpack(frame)
# ... inference на этом stream'е ...
```
## Pitch alignment — важно!
NVDEC отдаёт NV12 с pitch alignment 256 байт. Для камер с шириной не
кратной 256 (`gate_lpr 2688×1520` → pitch 2688 OK; но представьте `640×480`
→ pitch обычно 640 байт, но **может быть 768**).
```python
# WRONG — assume pitch == width
y = torch.frombuffer(...) # данные смещены
# RIGHT — использовать DLPack который сам respect'ит strides
y = torch.from_dlpack(frame.dlpack_y()) # stride учтён правильно
# ALTERNATIVELY — manual через cuda-python с правильным pitch
ptr = frame.cuda_ptr
pitch = frame.pitch_y
height = frame.height
```
## Thread-safety contract
- Каждый `CuframesSubscriber` принадлежит **одному Python потоку**.
Создание и все вызовы (`next_frame`, `close`) — в одном thread.
- Несколько subscriber'ов в разных потоках — **OK** (каждому свой handle,
свой CUDA stream).
- `CuframesFrame` тоже принадлежит одному потоку — после `release()` его
CUDA pointer становится недействительным, доступ из другого потока —
undefined behavior.
- Внутренний GIL отпускается на блокирующих вызовах (`subscriber_create`,
`next_frame`) — другие Python потоки могут выполняться.
Для multi-camera в одном процессе используйте `asyncio` или `threading`:
```python
import threading
import cuframes
def worker(camera_key):
with cuframes.subscribe(camera_key, connect_timeout_ms=5000) as sub:
# subscribe в этом же потоке
while True:
with sub.next_frame(timeout_ms=1000) as frame:
process(frame)
for key in ["cam-parking", "cam-front_yard", "cam-gate_lpr", "cam-back_yard"]:
threading.Thread(target=worker, args=(key,), daemon=True).start()
```
## Error taxonomy
Все exception'ы наследуются от `CuframesError`. Конкретные subclass'ы
позволяют разную обработку:
| Exception | Когда выбрасывается | Что делать |
|---|---|---|
| `CuframesPublisherGone` | publisher умер или ещё не стартовал | reconnect-loop |
| `CuframesFrameTimeout` | timeout без frame'а | продолжать ждать или log'нуть |
| `CuframesDeviceLost` | CUDA error на cross-process sync | abort, не recoverable |
| `CuframesShmError` | socket/mmap/IPC error | log, abort или восстановить |
| `CuframesProtocolMismatch` | версия libcuframes несовместима | пересобрать |
| `CuframesInvalidArgument` | bug в caller | fix code |
| `CuframesOutOfMemory` | cudaMalloc fail | reduce работу |
| `CuframesInternal` | bug в libcuframes | report |
## Backpressure
`next_frame()` blocking call с GIL released. Если consumer медленнее
publisher'а:
- В `NEWEST_ONLY` mode (default) — publisher продолжает писать, consumer
получает **самый свежий** frame (промежуточные пропускает). `gap_count`
растёт.
- В `STRICT_ORDER` mode — при ring overflow `CuframesPublisherGone`
reconnect.
Frame удерживать долго **нельзя**: в `STRICT_WAIT` policy publisher
заблокирует ring. Pattern — забрать DLPack, инициировать GPU работу,
release frame сразу.
## Текущие ограничения (Phase 0)
- Publisher API не обёрнут (только subscriber-side)
- Packet ring (encoded video) не обёрнут
- Async callback API не обёрнут
- `ring_occupancy` / реальный drop count — нет в C API (counted в pybind как
`gap_count`, это proxy)
- Smoke test реального subscribe требует Docker IPC namespace (cuframes
socket/SHM живут в namespace publisher'а)
Эти ограничения снимаются по мере необходимости — issues в
[gx/cuframes](http://server:3000/gx/cuframes).
+7
View File
@@ -0,0 +1,7 @@
build/
dist/
*.egg-info/
__pycache__/
*.pyc
*.so
.pytest_cache/
+52
View File
@@ -0,0 +1,52 @@
# Python bindings for cuframes — pybind11 module.
#
# Buildup: используется как subdirectory из root CMakeLists.txt при
# BUILD_PYTHON_BINDINGS=ON, либо standalone через scikit-build-core
# (см. pyproject.toml).
#
# Output: единый shared module `_native.so` который импортируется из
# Python package `cuframes` (cuframes/__init__.py re-export'ит публичный API).
include(FetchContent)
# pybind11 — header-only + helper functions. FetchContent чтобы не требовать
# system install; pinned tag для воспроизводимых билдов.
FetchContent_Declare(
pybind11
GIT_REPOSITORY https://github.com/pybind/pybind11.git
GIT_TAG v2.13.6
GIT_SHALLOW TRUE
)
FetchContent_MakeAvailable(pybind11)
pybind11_add_module(_native MODULE
src/_native.cpp
)
target_include_directories(_native PRIVATE
${PROJECT_SOURCE_DIR}/include
)
target_link_libraries(_native PRIVATE
cuframes # imported target из libcuframes/CMakeLists.txt
)
# Версия модуля соответствует libcuframes (см. cuframes.h)
target_compile_definitions(_native PRIVATE
CUFRAMES_PY_BINDING_VERSION="${PROJECT_VERSION}"
)
set_target_properties(_native PROPERTIES
CXX_STANDARD 17
CXX_STANDARD_REQUIRED ON
CXX_VISIBILITY_PRESET hidden
INTERPROCEDURAL_OPTIMIZATION TRUE
)
# При scikit-build-core билде модуль попадает в wheel рядом с Python-исходниками
# пакета. При standalone CMake — устанавливается в site-packages по умолчанию.
if(SKBUILD)
install(TARGETS _native DESTINATION cuframes)
else()
install(TARGETS _native LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}/cuframes)
endif()
+53
View File
@@ -0,0 +1,53 @@
# cuframes — Python bindings
Status: **WIP** (Phase 0 skeleton — issue [gx/cuframes#6](http://server:3000/gx/cuframes/issues/6))
Это пакет Python-обёрток над `libcuframes` (C ABI). Цель — позволить
downstream ML/CV пайплайнам (yolo-world-detector, zone-motion, custom
скриптам) подписываться на cuframes без CPU round-trip: получать NV12
frames прямо как CUDA pointer / `torch.Tensor` (DLPack export, zero-copy).
## Текущий статус (что уже работает в этом skeleton)
- Module import: `import cuframes` загружает `_native.so`
- Версия: `cuframes.version_string()`, `cuframes.protocol_version()`
- Enums: `PixelFormat`, `SubscriberMode`
- Иерархия исключений: `CuframesError` + 8 subclasses (publisher gone,
frame timeout, device lost, и т. д.)
## Что в работе (см. tasks #198-#202)
- [ ] `CuframesSubscriber` + `CuframesFrame` lifecycle
- [ ] DLPack export → `torch.from_dlpack`, `cupy.from_dlpack`
- [ ] Context manager (`with cuframes.subscribe(key) as sub:`)
- [ ] Per-subscriber CUDA stream
- [ ] Health/stats properties (`ring_occupancy`, `drop_count`)
- [ ] Thread-safety contract документация
## Build (dev)
Standalone wheel:
```bash
cd python/
pip install -e . --no-build-isolation
```
Через корневой CMake-проект (вместе с libcuframes):
```bash
cmake -B build -DBUILD_PYTHON_BINDINGS=ON
cmake --build build -j
```
## Зависимости
- `libcuframes` ≥ 0.4 (линкуется из соседнего CMake target)
- CUDA Toolkit 12+
- `pybind11` 2.13+ (берётся через FetchContent при CMake-сборке)
- Python 3.10+
- Опционально: `torch>=2.4` или `cupy-cuda12x>=13` для DLPack-потребителей
## Лицензия
LGPL-2.1+ (как у libcuframes).
+77
View File
@@ -0,0 +1,77 @@
"""cuframes — zero-copy CUDA frame sharing.
Python bindings to libcuframes. См. docs/python.md (т.б.д.) для
архитектуры, threading контракта и примеров интеграции с PyTorch/CuPy.
Пример (subscriber-side):
import cuframes
with cuframes.subscribe("cam-parking",
consumer_name="yolo-world",
connect_timeout_ms=5000) as sub:
# next_frame returns CuframesFrame — context manager
with sub.next_frame(timeout_ms=1000) as frame:
print(frame.cuda_ptr, frame.width, frame.height,
frame.pitch_y, frame.seq, frame.pts_ns)
# DLPack export — в task #199, пока через cuda-python:
# cuda_arr = cuda.from_pointer(frame.cuda_ptr, ...)
Reconnect-loop пример:
while True:
try:
with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub:
while True:
try:
with sub.next_frame(timeout_ms=1000) as frame:
process(frame)
except cuframes.CuframesFrameTimeout:
continue # просто нет новых кадров
except cuframes.CuframesPublisherGone:
time.sleep(1) # publisher restart — переподписываемся
"""
from ._native import (
# Метаданные
version_string,
protocol_version,
# Enums
PixelFormat,
SubscriberMode,
# Core API
CuframesSubscriber,
CuframesFrame,
subscribe,
# Error taxonomy
CuframesError,
CuframesPublisherGone,
CuframesFrameTimeout,
CuframesDeviceLost,
CuframesShmError,
CuframesProtocolMismatch,
CuframesInvalidArgument,
CuframesOutOfMemory,
CuframesInternal,
)
__version__ = version_string()
__all__ = [
"version_string",
"protocol_version",
"PixelFormat",
"SubscriberMode",
"CuframesSubscriber",
"CuframesFrame",
"subscribe",
"CuframesError",
"CuframesPublisherGone",
"CuframesFrameTimeout",
"CuframesDeviceLost",
"CuframesShmError",
"CuframesProtocolMismatch",
"CuframesInvalidArgument",
"CuframesOutOfMemory",
"CuframesInternal",
]
+47
View File
@@ -0,0 +1,47 @@
[build-system]
requires = [
"scikit-build-core>=0.10",
"pybind11>=2.13",
]
build-backend = "scikit_build_core.build"
[project]
name = "cuframes"
version = "0.4.0"
description = "Python bindings for cuframes — zero-copy CUDA frame sharing"
readme = "README.md"
license = { text = "LGPL-2.1+" }
requires-python = ">=3.10"
authors = [{ name = "Evgeny Demchenko", email = "demchenkoev@gmail.com" }]
keywords = ["cuda", "video", "ipc", "zero-copy"]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Multimedia :: Video",
]
[project.optional-dependencies]
torch = ["torch>=2.4"]
cupy = ["cupy-cuda12x>=13"]
dev = ["pytest>=8", "ruff>=0.6"]
[tool.scikit-build]
cmake.version = ">=3.20"
cmake.build-type = "Release"
build-dir = "build/{wheel_tag}"
wheel.packages = ["cuframes"]
# Будем строить только Python модуль; libcuframes собирается отдельно
# в основном CMake-проекте и линкуется как imported target.
cmake.args = ["-DBUILD_PYTHON_BINDINGS=ON", "-DBUILD_EXAMPLES=OFF", "-DBUILD_TOOLS=OFF"]
cmake.source-dir = ".."
[tool.scikit-build.cmake.define]
BUILD_PYTHON_BINDINGS = "ON"
[tool.pytest.ini_options]
testpaths = ["tests"]
+757
View File
@@ -0,0 +1,757 @@
// cuframes Python bindings — pybind11 entry point.
//
// Этот файл реализует core wrapper для subscriber-side API:
// - CuframesFrame — owning handle одного frame'а, context manager
// - CuframesSubscriber — owning handle subscription'а, context manager
//
// DLPack export (#199), per-subscriber CUDA stream (#201), health/stats props
// (#200) — добавляются в последующих коммитах в этот же файл.
//
// Контракт thread-safety (предварительный, финальный — task #202):
// - Каждый handle (CuframesSubscriber / CuframesFrame) принадлежит одному
// Python потоку. Cross-thread access = undefined behavior на C-уровне.
// - GIL отпускается на длинных I/O вызовах (next_frame) — другие Python
// потоки могут работать пока мы ждём frame.
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <cstring>
#include <optional>
#include <stdexcept>
#include <string>
#include "cuframes/cuframes.h"
// DLPack — стандартный protocol для exchange tensor-like структур между
// фреймворками (PyTorch/CuPy/JAX/TF). См. https://dmlc.github.io/dlpack/latest/
// Мы embedим header inline чтобы не добавлять external dep — header
// небольшой и стабильный (DLPack 1.0+).
namespace dlpack {
typedef enum {
kDLCPU = 1,
kDLCUDA = 2,
} DLDeviceType;
typedef struct {
DLDeviceType device_type;
int32_t device_id;
} DLDevice;
typedef enum {
kDLInt = 0U,
kDLUInt = 1U,
kDLFloat = 2U,
} DLDataTypeCode;
typedef struct {
uint8_t code;
uint8_t bits;
uint16_t lanes;
} DLDataType;
typedef struct {
void* data;
DLDevice device;
int32_t ndim;
DLDataType dtype;
int64_t* shape;
int64_t* strides;
uint64_t byte_offset;
} DLTensor;
typedef struct DLManagedTensor {
DLTensor dl_tensor;
void* manager_ctx;
void (*deleter)(struct DLManagedTensor* self);
} DLManagedTensor;
} // namespace dlpack
namespace py = pybind11;
namespace {
// ─────────────────────────────────────────────────────────────────────────────
// Error taxonomy — Python exceptions, соответствующие cuframes_error_t.
//
// Принцип: каждая категория ошибок которая требует разной обработки в
// downstream'е (reconnect vs retry vs fatal) → отдельный exception class.
// Это решает требование из architect review: «detector должен уметь
// reconnect-loop по publisher-gone, не падать».
// ─────────────────────────────────────────────────────────────────────────────
struct CuframesExceptions {
py::object base;
py::object publisher_gone; // CUFRAMES_ERR_DISCONNECTED, _NOT_FOUND
py::object frame_timeout; // CUFRAMES_ERR_TIMEOUT, _WOULD_BLOCK
py::object device_lost; // CUFRAMES_ERR_CUDA
py::object shm_error; // CUFRAMES_ERR_IO
py::object protocol_mismatch; // CUFRAMES_ERR_PROTOCOL
py::object invalid_argument; // CUFRAMES_ERR_INVALID_ARG
py::object out_of_memory; // CUFRAMES_ERR_OUT_OF_MEMORY
py::object internal; // CUFRAMES_ERR_INTERNAL, прочее
};
CuframesExceptions g_exc;
// Маппинг cuframes_error_t → подходящий Python exception class.
py::object exception_for(int err) {
switch (err) {
case CUFRAMES_ERR_NOT_FOUND:
case CUFRAMES_ERR_DISCONNECTED:
return g_exc.publisher_gone;
case CUFRAMES_ERR_TIMEOUT:
case CUFRAMES_ERR_WOULD_BLOCK:
return g_exc.frame_timeout;
case CUFRAMES_ERR_CUDA:
return g_exc.device_lost;
case CUFRAMES_ERR_IO:
return g_exc.shm_error;
case CUFRAMES_ERR_PROTOCOL:
return g_exc.protocol_mismatch;
case CUFRAMES_ERR_INVALID_ARG:
return g_exc.invalid_argument;
case CUFRAMES_ERR_OUT_OF_MEMORY:
return g_exc.out_of_memory;
default:
return g_exc.internal;
}
}
// Бросает подходящий exception если err != CUFRAMES_OK.
void check(int err, const char* operation = nullptr) {
if (err == CUFRAMES_OK) return;
const char* msg = cuframes_strerror(err);
std::string what = operation
? std::string(operation) + ": " + msg + " (code=" + std::to_string(err) + ")"
: std::string(msg) + " (code=" + std::to_string(err) + ")";
PyErr_SetString(exception_for(err).ptr(), what.c_str());
throw py::error_already_set();
}
// ─────────────────────────────────────────────────────────────────────────────
// CuframesFrame — owning wrapper над cuframes_frame_t*.
//
// Lifecycle:
// - конструируется через Subscriber::next_frame() (single source of truth)
// - в destructor'е (или __exit__) автоматически вызывает release
// - после release() все property accessor'ы бросают CuframesError
// - non-copyable, non-movable из Python (PyObject identity)
//
// Frame держит **слабую** ссылку (raw pointer) на subscriber. Если subscriber
// уничтожен раньше frame'а — released() становится no-op (subscriber разрулит
// освобождение всех outstanding frames при cuframes_subscriber_destroy).
// Чтобы избежать use-after-free, frame проверяет sub_alive_ через shared_ptr.
//
// Для простоты Phase 0 — frame и subscriber должны жить в одном Python потоке,
// порядок destruction под управлением Python GC. Refcount на Python-стороне
// от субскриптора держится через py::object атрибут.
// ─────────────────────────────────────────────────────────────────────────────
class FrameWrapper {
public:
FrameWrapper(cuframes_subscriber_t* sub, cuframes_frame_t* frame)
: sub_(sub), frame_(frame) {}
~FrameWrapper() {
try { release(); } catch (...) { /* destructor — глотаем */ }
}
// pybind11 не любит copyable wrappers для owning ресурсов.
FrameWrapper(const FrameWrapper&) = delete;
FrameWrapper& operator=(const FrameWrapper&) = delete;
bool released() const noexcept { return frame_ == nullptr; }
void release() {
if (frame_ != nullptr) {
// sub_ может быть nullptr если subscriber разорвал связь раньше —
// в этом случае release уже не нужен (subscriber всё освободил).
if (sub_ != nullptr) {
cuframes_subscriber_release(sub_, frame_);
}
frame_ = nullptr;
}
}
// Internal hook — subscriber говорит frame'у «я умираю, не release()ай».
void invalidate_subscriber() noexcept { sub_ = nullptr; }
// ── Properties ──────────────────────────────────────────────────────
// Все геттеры проверяют released() — иначе CuframesError.
void check_alive() const {
if (frame_ == nullptr) {
PyErr_SetString(g_exc.base.ptr(), "frame has been released");
throw py::error_already_set();
}
}
uintptr_t cuda_ptr() const {
check_alive();
return reinterpret_cast<uintptr_t>(cuframes_frame_cuda_ptr(frame_));
}
cuframes_format_t format() const {
check_alive();
return cuframes_frame_format(frame_);
}
int width() const {
check_alive();
int32_t w, h;
cuframes_frame_size(frame_, &w, &h);
return w;
}
int height() const {
check_alive();
int32_t w, h;
cuframes_frame_size(frame_, &w, &h);
return h;
}
int pitch_y() const {
check_alive();
return cuframes_frame_pitch_y(frame_);
}
int pitch_uv() const {
check_alive();
return cuframes_frame_pitch_uv(frame_);
}
uint64_t seq() const {
check_alive();
return cuframes_frame_seq(frame_);
}
int64_t pts_ns() const {
check_alive();
return cuframes_frame_pts_ns(frame_);
}
cuframes_subscriber_t* internal_sub() const noexcept { return sub_; }
cuframes_frame_t* internal_frame() const noexcept { return frame_; }
private:
cuframes_subscriber_t* sub_;
cuframes_frame_t* frame_;
};
// ─────────────────────────────────────────────────────────────────────────────
// DLPack export helpers.
//
// Кадр в NV12 состоит из 2 plane'ов: Y (uint8, H×W, pitch=pitch_y) и
// UV interleaved (uint8, H/2×W, pitch=pitch_uv; W здесь = ширина в байтах
// для interleaved U+V).
//
// Стратегия: даём пользователю 2 отдельных DLPack capsule на каждый plane.
// Это стандартный pattern в PyTorch/CuPy (torchcodec, cuda-python).
// UV offset вычисляется из pitch_y * height_aligned (NVDEC выравнивает
// height до aligned значения — обычно высота уже aligned, но мы используем
// видимую height из frame_size).
//
// Lifetime: deleter capsule освобождает только shape/strides arrays.
// Сам CUDA pointer принадлежит frame'у — gone-frame должно быть released
// **после** того как DLPack capsule destroyed. Чтобы не дать пользователю
// shoot in foot, capsule.manager_ctx держит py::object на FrameWrapper
// (увеличивает refcount), которое освобождается в deleter.
// ─────────────────────────────────────────────────────────────────────────────
struct DLPackContext {
py::object frame_keep_alive; // CuframesFrame Python-side
std::vector<int64_t> shape;
std::vector<int64_t> strides;
};
static void dlpack_deleter(dlpack::DLManagedTensor* self) {
if (!self) return;
auto* ctx = static_cast<DLPackContext*>(self->manager_ctx);
if (ctx) {
// Releasing Python refcount требует GIL
py::gil_scoped_acquire gil;
delete ctx;
}
delete self;
}
static void dlpack_pycapsule_destructor(PyObject* capsule) {
if (PyCapsule_IsValid(capsule, "dltensor")) {
// Capsule НЕ был consumed downstream'ом (e.g. torch.from_dlpack).
// Нужно освободить managed tensor самим.
auto* mt = static_cast<dlpack::DLManagedTensor*>(
PyCapsule_GetPointer(capsule, "dltensor"));
if (mt && mt->deleter) {
mt->deleter(mt);
}
}
// Если PyCapsule имеет name "used_dltensor" — downstream взял ownership,
// мы ничего не делаем.
}
static py::capsule make_dlpack_capsule(
void* data,
int rows, int cols, int64_t row_stride_bytes,
int cuda_device,
py::object frame_keep_alive
) {
auto* ctx = new DLPackContext;
ctx->frame_keep_alive = std::move(frame_keep_alive);
ctx->shape = {static_cast<int64_t>(rows), static_cast<int64_t>(cols)};
ctx->strides = {row_stride_bytes, 1};
auto* mt = new dlpack::DLManagedTensor;
mt->dl_tensor.data = data;
mt->dl_tensor.device = {dlpack::kDLCUDA, cuda_device};
mt->dl_tensor.ndim = 2;
mt->dl_tensor.dtype = {dlpack::kDLUInt, 8, 1}; // uint8
mt->dl_tensor.shape = ctx->shape.data();
mt->dl_tensor.strides = ctx->strides.data();
mt->dl_tensor.byte_offset = 0;
mt->manager_ctx = ctx;
mt->deleter = dlpack_deleter;
return py::capsule(mt, "dltensor", &dlpack_pycapsule_destructor);
}
// ─────────────────────────────────────────────────────────────────────────────
// CuframesSubscriber — owning wrapper над cuframes_subscriber_t*.
//
// API:
// sub = cuframes.subscribe("cam-parking", consumer_name="yolo-world",
// timeout_ms=5000)
// with sub:
// with sub.next_frame(timeout_ms=1000) as frame:
// do_something(frame.cuda_ptr, frame.width, frame.height)
// # sub.close() здесь автоматически
//
// Iteration (Phase 0.5):
// for frame in sub.frames(timeout_ms=1000):
// ...
// ─────────────────────────────────────────────────────────────────────────────
// Per-subscriber health stats. Phase 0 версия — counted в pybind layer
// (cuframes C API не expose'ит ring_occupancy / drop_count напрямую).
// Если в будущем cuframes расширит C API (cuframes_subscriber_get_stats),
// добавим reads оттуда — но текущие counters остаются для совместимости
// с тем что consumer'у видно через Python API.
struct SubscriberStats {
uint64_t frames_received = 0; // успешных next_frame()
uint64_t timeouts = 0; // CUFRAMES_ERR_TIMEOUT / WOULD_BLOCK
uint64_t errors = 0; // прочие fail'ы в next_frame()
uint64_t last_seq = 0; // seq последнего полученного frame'а
uint64_t gap_count = 0; // сколько раз seq[i] > seq[i-1] + 1
// (proxy для drop count в NEWEST_ONLY mode)
int64_t last_frame_pts_ns = 0;
};
class SubscriberWrapper {
public:
SubscriberWrapper(
const std::string& key,
std::optional<std::string> consumer_name,
cuframes_subscriber_mode_t mode,
int cuda_device,
int connect_timeout_ms,
uintptr_t consumer_stream
) : key_(key),
consumer_name_(consumer_name.value_or("")),
mode_(mode),
cuda_device_(cuda_device),
consumer_stream_(reinterpret_cast<void*>(consumer_stream)) {
cuframes_subscriber_config_t cfg = {};
cfg.key = key_.c_str();
cfg.consumer_name = consumer_name.has_value() ? consumer_name_.c_str() : nullptr;
cfg.mode = mode_;
cfg.cuda_device = cuda_device_;
cfg.connect_timeout_ms = connect_timeout_ms;
// create — может быть блокирующим (ждёт publisher'а). GIL release.
int err;
{
py::gil_scoped_release rel;
err = cuframes_subscriber_create(&cfg, &sub_);
}
check(err, "cuframes_subscriber_create");
}
~SubscriberWrapper() {
try { close(); } catch (...) { /* destructor — глотаем */ }
}
SubscriberWrapper(const SubscriberWrapper&) = delete;
SubscriberWrapper& operator=(const SubscriberWrapper&) = delete;
bool closed() const noexcept { return sub_ == nullptr; }
void close() {
if (sub_ != nullptr) {
cuframes_subscriber_destroy(sub_);
sub_ = nullptr;
}
}
void check_alive() const {
if (sub_ == nullptr) {
PyErr_SetString(g_exc.base.ptr(), "subscriber has been closed");
throw py::error_already_set();
}
}
// Возвращает new FrameWrapper. Caller владеет через Python GC.
// GIL release на время блокирующего вызова — другие потоки работают.
std::unique_ptr<FrameWrapper> next_frame(int timeout_ms) {
check_alive();
cuframes_frame_t* raw = nullptr;
int err;
{
py::gil_scoped_release rel;
// Используем persistent per-subscriber stream — все consumer'ы
// получают независимый cudaStreamWaitEvent, не серializуются
// через default stream.
err = cuframes_subscriber_next(sub_, consumer_stream_,
&raw, timeout_ms);
}
// Update health stats до check() — иначе при exception они не
// увеличатся, и оператору будет непонятно почему counters застыли.
if (err == CUFRAMES_OK) {
stats_.frames_received++;
uint64_t seq = cuframes_frame_seq(raw);
if (stats_.last_seq != 0 && seq > stats_.last_seq + 1) {
stats_.gap_count++;
}
stats_.last_seq = seq;
stats_.last_frame_pts_ns = cuframes_frame_pts_ns(raw);
} else if (err == CUFRAMES_ERR_TIMEOUT || err == CUFRAMES_ERR_WOULD_BLOCK) {
stats_.timeouts++;
} else {
stats_.errors++;
}
check(err, "cuframes_subscriber_next");
return std::make_unique<FrameWrapper>(sub_, raw);
}
const std::string& key() const { return key_; }
const std::string& consumer_name() const { return consumer_name_; }
cuframes_subscriber_mode_t mode() const { return mode_; }
int cuda_device() const { return cuda_device_; }
const SubscriberStats& stats() const { return stats_; }
// Snapshot stats как Python dict — для MQTT health publish.
py::dict stats_dict() const {
py::dict d;
d["frames_received"] = stats_.frames_received;
d["timeouts"] = stats_.timeouts;
d["errors"] = stats_.errors;
d["last_seq"] = stats_.last_seq;
d["gap_count"] = stats_.gap_count;
d["last_frame_pts_ns"] = stats_.last_frame_pts_ns;
return d;
}
uintptr_t consumer_stream() const {
return reinterpret_cast<uintptr_t>(consumer_stream_);
}
private:
cuframes_subscriber_t* sub_ = nullptr;
std::string key_;
std::string consumer_name_;
cuframes_subscriber_mode_t mode_;
int cuda_device_;
// CUDA stream — opaque cudaStream_t. Передаётся снаружи как int
// (полученный через cuda-python / torch.cuda.Stream._as_parameter_).
// nullptr = default stream (только для smoke-тестов; в продакшене
// консумерам надо иметь свой stream чтобы избежать serialization
// через default).
void* consumer_stream_ = nullptr;
SubscriberStats stats_{};
};
} // namespace
PYBIND11_MODULE(_native, m) {
m.doc() = "cuframes — zero-copy CUDA frame sharing (native bindings)";
// ── Версия ──────────────────────────────────────────────────────────
m.def("version_string", []() {
return std::string(cuframes_version_string());
}, "Runtime version of libcuframes (MAJOR.MINOR.PATCH).");
m.def("protocol_version", []() {
return static_cast<uint32_t>(cuframes_protocol_version());
}, "Wire-protocol version. Subscribers с разной версией не подключатся.");
m.attr("__binding_version__") = CUFRAMES_PY_BINDING_VERSION;
// ── Error taxonomy ──────────────────────────────────────────────────
// Иерархия:
// CuframesError (base)
// ├── CuframesPublisherGone
// ├── CuframesFrameTimeout
// ├── CuframesDeviceLost
// ├── CuframesShmError
// ├── CuframesProtocolMismatch
// ├── CuframesInvalidArgument
// ├── CuframesOutOfMemory
// └── CuframesInternal
//
// py::exception<T>(...) уже возвращает py::object на сам Python class.
// Не вызываем .attr("__class__") — иначе получим metaclass.
g_exc.base = py::exception<std::runtime_error>(m, "CuframesError");
auto make_subexc = [&m](const char* name) -> py::object {
return py::exception<std::runtime_error>(m, name, g_exc.base.ptr());
};
g_exc.publisher_gone = make_subexc("CuframesPublisherGone");
g_exc.frame_timeout = make_subexc("CuframesFrameTimeout");
g_exc.device_lost = make_subexc("CuframesDeviceLost");
g_exc.shm_error = make_subexc("CuframesShmError");
g_exc.protocol_mismatch = make_subexc("CuframesProtocolMismatch");
g_exc.invalid_argument = make_subexc("CuframesInvalidArgument");
g_exc.out_of_memory = make_subexc("CuframesOutOfMemory");
g_exc.internal = make_subexc("CuframesInternal");
// ── Pixel formats (enum mirror) ─────────────────────────────────────
py::enum_<cuframes_format_t>(m, "PixelFormat")
.value("NV12", CUFRAMES_FORMAT_NV12)
.value("YUV420P", CUFRAMES_FORMAT_YUV420P)
.value("RGB", CUFRAMES_FORMAT_RGB)
.value("BGR", CUFRAMES_FORMAT_BGR)
.value("RGBA", CUFRAMES_FORMAT_RGBA)
.value("GRAYSCALE", CUFRAMES_FORMAT_GRAYSCALE);
py::enum_<cuframes_subscriber_mode_t>(m, "SubscriberMode")
.value("NEWEST_ONLY", CUFRAMES_MODE_NEWEST_ONLY)
.value("STRICT_ORDER", CUFRAMES_MODE_STRICT_ORDER);
// ── CuframesFrame ───────────────────────────────────────────────────
py::class_<FrameWrapper>(m, "CuframesFrame",
"Один кадр от cuframes publisher'а.\n\n"
"Получается через CuframesSubscriber.next_frame().\n"
"Поддерживает context manager — release() при выходе из with-блока.\n"
"Все property accessor'ы после release() бросают CuframesError.\n\n"
"Это handle на frame в ring buffer publisher'а — данные остаются\n"
"в shared memory publisher'а пока frame не released. Долго удерживать\n"
"frame нельзя: medленный consumer заставит publisher либо overwrite\n"
"(DROP_OLDEST policy), либо stall (STRICT_WAIT).")
// properties (read-only)
.def_property_readonly("cuda_ptr", &FrameWrapper::cuda_ptr,
"CUDA device pointer на frame data (uintptr_t). Read-only для\n"
"consumer'а. Используйте через cuda-python / cupy / torch.from_blob.")
.def_property_readonly("format", &FrameWrapper::format,
"PixelFormat (NV12 для NVDEC publisher'а).")
.def_property_readonly("width", &FrameWrapper::width)
.def_property_readonly("height", &FrameWrapper::height)
.def_property_readonly("pitch_y", &FrameWrapper::pitch_y,
"Pitch (байт на строку) для Y plane. ВАЖНО: для больших\n"
"разрешений (2688×1520, gate_lpr) pitch != width — kernel'ы\n"
"должны принимать pitch как параметр.")
.def_property_readonly("pitch_uv", &FrameWrapper::pitch_uv,
"Pitch для UV plane (NV12/YUV420P); 0 для форматов без UV.")
.def_property_readonly("seq", &FrameWrapper::seq,
"Sequence number — монотонная нумерация у publisher'а.")
.def_property_readonly("pts_ns", &FrameWrapper::pts_ns,
"Presentation timestamp от publisher'а (наносекунды, CLOCK_MONOTONIC).")
.def_property_readonly("released", &FrameWrapper::released)
.def("release", &FrameWrapper::release,
"Освободить frame обратно publisher'у (ACK).\n"
"После release() property accessor'ы бросают CuframesError.\n"
"Idempotent — повторный вызов no-op.")
// context manager
.def("__enter__", [](FrameWrapper& self) -> FrameWrapper& {
self.check_alive();
return self;
}, py::return_value_policy::reference_internal)
.def("__exit__", [](FrameWrapper& self, py::object, py::object, py::object) {
self.release();
return py::none();
})
.def("__repr__", [](const FrameWrapper& f) {
if (f.released()) return std::string("<CuframesFrame released>");
return std::string("<CuframesFrame seq=") + std::to_string(f.seq()) +
" size=" + std::to_string(f.width()) + "x" + std::to_string(f.height()) + ">";
})
// ── DLPack export ───────────────────────────────────────────────
// Multi-plane formats (NV12, YUV420P) — экспортируем планы отдельно
// как 2D uint8 tensors. Consumer строит логику склейки сам.
// Для single-plane (RGB/BGR/RGBA/GRAYSCALE) — __dlpack__() работает.
.def("dlpack_y",
[](py::object self) -> py::capsule {
auto& f = self.cast<FrameWrapper&>();
f.check_alive();
void* ptr = cuframes_frame_cuda_ptr(f.internal_frame());
int32_t w, h;
cuframes_frame_size(f.internal_frame(), &w, &h);
int pitch = cuframes_frame_pitch_y(f.internal_frame());
// Для NV12/YUV420P width = ширина в пикселях, Y занимает W байт/строка.
// Pitch (физическая строка в памяти) может быть > W. Передаём как stride.
// cuda_device извлекаем не из frame (нет API) — фиксируем 0 для default;
// task #201 добавит per-subscriber stream и реальный device.
return make_dlpack_capsule(ptr, h, w, pitch, /*cuda_device=*/0, self);
},
"DLPack export Y-plane как 2D uint8 GPU tensor (shape=[H, W], stride=[pitch_y, 1]).\n"
"Работает для NV12, YUV420P, GRAYSCALE. Для других форматов — отдаёт первый plane.")
.def("dlpack_uv",
[](py::object self) -> py::capsule {
auto& f = self.cast<FrameWrapper&>();
f.check_alive();
auto fmt = cuframes_frame_format(f.internal_frame());
if (fmt != CUFRAMES_FORMAT_NV12) {
PyErr_SetString(g_exc.invalid_argument.ptr(),
"dlpack_uv() only supported for NV12 format");
throw py::error_already_set();
}
void* base = cuframes_frame_cuda_ptr(f.internal_frame());
int32_t w, h;
cuframes_frame_size(f.internal_frame(), &w, &h);
int pitch_y = cuframes_frame_pitch_y(f.internal_frame());
int pitch_uv = cuframes_frame_pitch_uv(f.internal_frame());
// NV12 layout: Y plane занимает pitch_y * h bytes,
// UV plane (interleaved U+V) следует сразу за ним.
void* uv_ptr = static_cast<uint8_t*>(base) + (size_t)pitch_y * h;
// UV plane размеры: H/2 строк, W колонок (interleaved U+V байты).
return make_dlpack_capsule(uv_ptr, h / 2, w, pitch_uv, /*cuda_device=*/0, self);
},
"DLPack export UV-plane (interleaved) для NV12.\n"
"Shape=[H/2, W] uint8, stride=[pitch_uv, 1]. U и V interleaved\n"
"по байтам в последнем измерении (W = ширина в пикселях, но\n"
"каждый pixel = 2 байта U+V).")
.def("__dlpack__",
[](py::object self, py::object /*stream*/) -> py::capsule {
// PEP 3118 / DLPack protocol — single-plane access.
// Для NV12/YUV420P возвращает Y plane (это самый частый use
// case — motion detection / brightness работают только с Y).
// Если нужен UV — явно через .dlpack_uv().
auto& f = self.cast<FrameWrapper&>();
f.check_alive();
void* ptr = cuframes_frame_cuda_ptr(f.internal_frame());
int32_t w, h;
cuframes_frame_size(f.internal_frame(), &w, &h);
int pitch = cuframes_frame_pitch_y(f.internal_frame());
return make_dlpack_capsule(ptr, h, w, pitch, /*cuda_device=*/0, self);
},
py::arg("stream") = py::none(),
"DLPack protocol для torch.from_dlpack / cupy.from_dlpack.\n"
"Для NV12 возвращает Y plane. Для других planes — .dlpack_uv().")
.def("__dlpack_device__",
[](const FrameWrapper& f) -> py::tuple {
f.check_alive();
// (device_type, device_id) — kDLCUDA=2, device 0 (task #201).
return py::make_tuple(2, 0);
},
"DLPack device protocol — возвращает (kDLCUDA=2, device_id).");
// ── CuframesSubscriber ──────────────────────────────────────────────
py::class_<SubscriberWrapper>(m, "CuframesSubscriber",
"Subscription на cuframes publisher.\n\n"
"Создаётся через cuframes.subscribe(key, ...). Поддерживает context\n"
"manager — close() при выходе из with-блока.\n\n"
"Thread-safety contract:\n"
" • Handle принадлежит одному Python потоку — создание и\n"
" все вызовы (next_frame, close) должны быть в одном thread.\n"
" • Несколько subscriber'ов в разных потоках — OK (каждому свой\n"
" handle, свой CUDA stream).\n"
" • Доступ к Frame после release() из другого потока — UB\n"
" (cuframes_frame_t* указывает в ring buffer publisher'а, после\n"
" release он может быть переписан).\n"
" • Внутренний GIL отпускается на длинных I/O вызовах\n"
" (subscriber_create, next_frame) — другие Python потоки могут\n"
" выполняться параллельно пока мы ждём frame.\n\n"
"CUDA stream:\n"
" consumer_stream передаётся как int (cudaStream_t как opaque\n"
" pointer). Получается через cuda-python (cudart.cudaStreamCreate)\n"
" или torch (torch.cuda.Stream()._as_parameter_). Если 0 —\n"
" default stream (serialization risk при нескольких subscriber'ах\n"
" в одном процессе).")
.def(py::init<const std::string&, std::optional<std::string>,
cuframes_subscriber_mode_t, int, int, uintptr_t>(),
py::arg("key"),
py::arg("consumer_name") = py::none(),
py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY,
py::arg("cuda_device") = 0,
py::arg("connect_timeout_ms") = -1,
py::arg("consumer_stream") = 0,
"Создать subscription. Блокирует до publisher_ready или\n"
"connect_timeout_ms. -1 = ждать вечно, 0 = fail сразу.\n"
"consumer_stream: int representation cudaStream_t (0=default).")
.def_property_readonly("key", &SubscriberWrapper::key)
.def_property_readonly("consumer_name", &SubscriberWrapper::consumer_name)
.def_property_readonly("mode", &SubscriberWrapper::mode)
.def_property_readonly("cuda_device", &SubscriberWrapper::cuda_device)
.def_property_readonly("consumer_stream", &SubscriberWrapper::consumer_stream,
"Pointer на cudaStream_t (int). 0 = default stream.")
.def_property_readonly("closed", &SubscriberWrapper::closed)
.def("next_frame", &SubscriberWrapper::next_frame,
py::arg("timeout_ms") = -1,
"Получить следующий frame.\n\n"
"timeout_ms: -1 = ждать вечно; 0 = non-blocking\n"
"(CuframesFrameTimeout если нет данных); >0 = с таймаутом.\n\n"
"Возвращает CuframesFrame — context manager. Использовать через\n"
"`with sub.next_frame() as frame: ...` для гарантии release.")
.def("close", &SubscriberWrapper::close,
"Закрыть subscription. Idempotent.")
// ── Health / stats ──────────────────────────────────────────────
// Phase 0: counted в pybind layer (cuframes C API не expose'ит
// ring_occupancy / drop_count напрямую). Эти counters достаточно
// для MQTT health publisher / monitoring.
.def_property_readonly("frames_received",
[](const SubscriberWrapper& s) { return s.stats().frames_received; },
"Количество успешных next_frame() с момента subscribe.")
.def_property_readonly("timeouts",
[](const SubscriberWrapper& s) { return s.stats().timeouts; },
"Сколько раз next_frame() вернул CuframesFrameTimeout.")
.def_property_readonly("errors",
[](const SubscriberWrapper& s) { return s.stats().errors; },
"Сколько раз next_frame() упал с error (не timeout).")
.def_property_readonly("last_seq",
[](const SubscriberWrapper& s) { return s.stats().last_seq; },
"Sequence number последнего полученного frame'а.")
.def_property_readonly("gap_count",
[](const SubscriberWrapper& s) { return s.stats().gap_count; },
"Сколько раз seq[i] > seq[i-1] + 1 — proxy для drop count\n"
"в NEWEST_ONLY mode. В STRICT_ORDER должен оставаться 0.")
.def_property_readonly("last_frame_pts_ns",
[](const SubscriberWrapper& s) { return s.stats().last_frame_pts_ns; })
.def("stats",
[](const SubscriberWrapper& s) { return s.stats_dict(); },
"Snapshot всех health counters как dict — для MQTT health publish.")
// context manager
.def("__enter__", [](SubscriberWrapper& self) -> SubscriberWrapper& {
self.check_alive();
return self;
}, py::return_value_policy::reference_internal)
.def("__exit__", [](SubscriberWrapper& self, py::object, py::object, py::object) {
self.close();
return py::none();
})
.def("__repr__", [](const SubscriberWrapper& s) {
return std::string("<CuframesSubscriber key='") + s.key() +
"' closed=" + (s.closed() ? "True" : "False") + ">";
});
// ── Module-level factory ────────────────────────────────────────────
// Удобный shortcut: cuframes.subscribe("cam-parking") вместо
// cuframes._native.CuframesSubscriber(...).
m.def("subscribe",
[](const std::string& key,
std::optional<std::string> consumer_name,
cuframes_subscriber_mode_t mode,
int cuda_device,
int connect_timeout_ms,
uintptr_t consumer_stream) {
return std::make_unique<SubscriberWrapper>(
key, consumer_name, mode, cuda_device,
connect_timeout_ms, consumer_stream);
},
py::arg("key"),
py::arg("consumer_name") = py::none(),
py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY,
py::arg("cuda_device") = 0,
py::arg("connect_timeout_ms") = -1,
py::arg("consumer_stream") = 0,
"Создать CuframesSubscriber. Shortcut для CuframesSubscriber(...).");
}
+112
View File
@@ -0,0 +1,112 @@
"""Smoke tests для cuframes Python bindings.
В Phase 0 (skeleton) проверяем что:
- модуль импортируется
- версия читается
- error классы существуют и являются нормальной иерархией
Subscriber / DLPack тесты появятся в следующих фазах
(см. issue gx/cuframes#6, tasks #198+).
"""
import cuframes
def test_version_format():
v = cuframes.version_string()
assert isinstance(v, str)
parts = v.split(".")
assert len(parts) >= 3
assert all(p.isdigit() for p in parts[:3])
def test_protocol_version_is_uint():
pv = cuframes.protocol_version()
assert isinstance(pv, int)
assert pv >= 0
def test_pixel_format_enum_members():
assert cuframes.PixelFormat.NV12.value == 0
assert cuframes.PixelFormat.YUV420P.value == 1
def test_subscriber_mode_enum_members():
assert cuframes.SubscriberMode.NEWEST_ONLY.value == 0
assert cuframes.SubscriberMode.STRICT_ORDER.value == 1
def test_error_hierarchy():
"""Все subtype'ы наследуются от CuframesError."""
for sub in [
cuframes.CuframesPublisherGone,
cuframes.CuframesFrameTimeout,
cuframes.CuframesDeviceLost,
cuframes.CuframesShmError,
cuframes.CuframesProtocolMismatch,
cuframes.CuframesInvalidArgument,
cuframes.CuframesOutOfMemory,
cuframes.CuframesInternal,
]:
assert issubclass(sub, cuframes.CuframesError)
def test_subscriber_class_exposed():
"""CuframesSubscriber/CuframesFrame exposed как public classes."""
assert hasattr(cuframes, "CuframesSubscriber")
assert hasattr(cuframes, "CuframesFrame")
assert hasattr(cuframes, "subscribe")
def test_subscribe_to_missing_publisher_raises():
"""Subscribe к несуществующему publisher → CuframesError (subclass)
после connect_timeout_ms.
Этот тест работает на любом хосте (без живого cuframes-pub) — мы
верифицируем что error path работает и маппит CUFRAMES_ERR_*
в правильный Python exception.
"""
import pytest
with pytest.raises(cuframes.CuframesError):
cuframes.subscribe(
"definitely-not-existing-publisher-xyz",
connect_timeout_ms=100,
)
def test_subscriber_repr_when_unable_to_connect():
"""Лёгкий тест что repr не падает и close idempotent."""
import pytest
try:
sub = cuframes.subscribe("nope-xyz", connect_timeout_ms=100)
except cuframes.CuframesError:
return # ожидаемо
pytest.fail("subscribe должно было выкинуть exception")
def test_subscribe_accepts_consumer_stream_param():
"""consumer_stream — uintptr (cudaStream_t).
Проверяем что параметр accepted; реальное использование требует
cuda-python / torch.cuda.Stream — это в integration тестах
yolo-world-detector'а.
"""
import pytest
with pytest.raises(cuframes.CuframesError):
cuframes.subscribe(
"nope-xyz",
connect_timeout_ms=100,
consumer_stream=0, # 0 = default stream
)
def test_subscribe_kwargs_signature():
"""Проверяем что у subscribe правильный набор kwargs."""
import inspect
# Pybind11-обёртки не дают inspect.signature, но help_doc отражает их.
doc = cuframes.subscribe.__doc__
assert "consumer_name" in doc
assert "mode" in doc
assert "cuda_device" in doc
assert "connect_timeout_ms" in doc
assert "consumer_stream" in doc