diff --git a/docs/python.md b/docs/python.md new file mode 100644 index 0000000..d6bdf10 --- /dev/null +++ b/docs/python.md @@ -0,0 +1,284 @@ +# cuframes Python bindings + +Status: **v0.4 — Phase 0 alpha** (issue [gx/cuframes#6](http://server:3000/gx/cuframes/issues/6)) + +Python пакет `cuframes` — pybind11-обёртка над C ABI libcuframes. Цель — +позволить downstream ML/CV пайплайнам (yolo-world-detector, zone-motion, +custom скриптам) подписываться на cuframes **без CPU round-trip**: получать +NV12 frames прямо как CUDA pointer / `torch.Tensor` (DLPack export, zero-copy +из VRAM publisher'а в VRAM consumer'а). + +## Установка + +Standalone wheel (рекомендуемый): + +```bash +cd cuframes/python/ +pip install -e . --no-build-isolation +``` + +Через корневой CMake: + +```bash +cmake -B build -DBUILD_PYTHON_BINDINGS=ON +cmake --build build -j +``` + +## Quick start + +```python +import cuframes + +print(cuframes.version_string()) # "0.4.0" + +with cuframes.subscribe("cam-parking", + consumer_name="yolo-world", + connect_timeout_ms=5000) as sub: + with sub.next_frame(timeout_ms=1000) as frame: + print(f"{frame.width}x{frame.height} " + f"format={frame.format} seq={frame.seq}") +``` + +## API + +### `cuframes.subscribe(key, ...)` + +Создать подписку на publisher. Возвращает `CuframesSubscriber`. + +| Параметр | Тип | Default | Назначение | +|---|---|---|---| +| `key` | `str` | (required) | Имя publisher'а (`"cam-parking"` и т.п.) | +| `consumer_name` | `str \| None` | `None` (auto-generated) | Идентификатор подписки | +| `mode` | `SubscriberMode` | `NEWEST_ONLY` | `NEWEST_ONLY` skip'ит промежуточные frames, `STRICT_ORDER` — все по порядку | +| `cuda_device` | `int` | `0` | CUDA device id | +| `connect_timeout_ms` | `int` | `-1` (бесконечно) | Сколько ждать publisher'а | +| `consumer_stream` | `int` | `0` (default stream) | `cudaStream_t` как pointer | + +### `CuframesSubscriber` + +Контекст-менеджер. Methods/properties: + +```python +sub.next_frame(timeout_ms=-1) # → CuframesFrame +sub.close() # idempotent + +# read-only properties +sub.key # str +sub.consumer_name # str +sub.mode # SubscriberMode +sub.cuda_device # int +sub.consumer_stream # int (cudaStream_t ptr) +sub.closed # bool + +# health / stats (Phase 0 counters) +sub.frames_received # int +sub.timeouts # int +sub.errors # int +sub.last_seq # int (sequence number последнего frame'а) +sub.gap_count # int (proxy для drop count в NEWEST_ONLY) +sub.last_frame_pts_ns # int +sub.stats() # dict — snapshot всех counters для MQTT publish +``` + +### `CuframesFrame` + +Контекст-менеджер. Properties (read-only): + +```python +frame.cuda_ptr # int (uintptr_t) +frame.format # PixelFormat +frame.width # int +frame.height # int +frame.pitch_y # int — pitch Y plane (важно — может быть > width!) +frame.pitch_uv # int +frame.seq # int — sequence number у publisher'а +frame.pts_ns # int — CLOCK_MONOTONIC у publisher'а +frame.released # bool + +# DLPack export (zero-copy) +frame.dlpack_y() # capsule — Y plane как 2D uint8 GPU tensor +frame.dlpack_uv() # capsule — UV plane (только NV12) +frame.__dlpack__() # protocol для torch.from_dlpack(frame) +frame.__dlpack_device__() # (kDLCUDA=2, device_id) +``` + +## Интеграция с PyTorch + +```python +import torch +import cuframes + +with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub: + with sub.next_frame() as frame: + # Single-plane (default — Y plane для NV12) + y_tensor = torch.from_dlpack(frame) + # Multi-plane explicit + y = torch.from_dlpack(frame.dlpack_y()) # shape=[H, W] uint8 + uv = torch.from_dlpack(frame.dlpack_uv()) # shape=[H/2, W] uint8 + + # Y plane уже в VRAM — никаких copy. Можно сразу feed в NN. + y_float = y.float() / 255.0 # будет на CUDA device +``` + +## Интеграция с CuPy + +```python +import cupy +import cuframes + +with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub: + with sub.next_frame() as frame: + y_array = cupy.from_dlpack(frame.dlpack_y()) # cupy.ndarray на GPU +``` + +## Pattern: reconnect-loop для долгоживущего consumer'а + +```python +import time +import cuframes + +def consume_camera(key: str, on_frame): + while True: + try: + with cuframes.subscribe(key, connect_timeout_ms=5000) as sub: + while True: + try: + with sub.next_frame(timeout_ms=1000) as frame: + on_frame(frame) + except cuframes.CuframesFrameTimeout: + # просто нет новых кадров — продолжаем ждать + continue + except cuframes.CuframesPublisherGone: + # publisher умер / перезапускается — переподписываемся + print(f"publisher {key} gone, reconnect через 1s") + time.sleep(1) + except cuframes.CuframesError as e: + # фатальная ошибка — логируем и продолжаем + print(f"error: {e!r}") + time.sleep(5) +``` + +## Per-subscriber CUDA stream + +В продакшене на 4+ камеры каждый subscriber должен иметь свой stream — +иначе `cudaStreamWaitEvent` сериализует всех consumer'ов через default +stream. + +С `cuda-python`: + +```python +from cuda import cudart +import cuframes + +err, stream = cudart.cudaStreamCreate() +assert err == cudart.cudaError_t.cudaSuccess + +with cuframes.subscribe("cam-parking", consumer_stream=int(stream)) as sub: + ... +``` + +С `torch.cuda.Stream`: + +```python +import torch +import cuframes + +stream = torch.cuda.Stream() +with cuframes.subscribe("cam-parking", + consumer_stream=stream.cuda_stream) as sub: + with torch.cuda.stream(stream): + with sub.next_frame() as frame: + tensor = torch.from_dlpack(frame) + # ... inference на этом stream'е ... +``` + +## Pitch alignment — важно! + +NVDEC отдаёт NV12 с pitch alignment 256 байт. Для камер с шириной не +кратной 256 (`gate_lpr 2688×1520` → pitch 2688 OK; но представьте `640×480` +→ pitch обычно 640 байт, но **может быть 768**). + +```python +# WRONG — assume pitch == width +y = torch.frombuffer(...) # данные смещены + +# RIGHT — использовать DLPack который сам respect'ит strides +y = torch.from_dlpack(frame.dlpack_y()) # stride учтён правильно + +# ALTERNATIVELY — manual через cuda-python с правильным pitch +ptr = frame.cuda_ptr +pitch = frame.pitch_y +height = frame.height +``` + +## Thread-safety contract + +- Каждый `CuframesSubscriber` принадлежит **одному Python потоку**. + Создание и все вызовы (`next_frame`, `close`) — в одном thread. +- Несколько subscriber'ов в разных потоках — **OK** (каждому свой handle, + свой CUDA stream). +- `CuframesFrame` тоже принадлежит одному потоку — после `release()` его + CUDA pointer становится недействительным, доступ из другого потока — + undefined behavior. +- Внутренний GIL отпускается на блокирующих вызовах (`subscriber_create`, + `next_frame`) — другие Python потоки могут выполняться. + +Для multi-camera в одном процессе используйте `asyncio` или `threading`: + +```python +import threading +import cuframes + +def worker(camera_key): + with cuframes.subscribe(camera_key, connect_timeout_ms=5000) as sub: + # subscribe в этом же потоке + while True: + with sub.next_frame(timeout_ms=1000) as frame: + process(frame) + +for key in ["cam-parking", "cam-front_yard", "cam-gate_lpr", "cam-back_yard"]: + threading.Thread(target=worker, args=(key,), daemon=True).start() +``` + +## Error taxonomy + +Все exception'ы наследуются от `CuframesError`. Конкретные subclass'ы +позволяют разную обработку: + +| Exception | Когда выбрасывается | Что делать | +|---|---|---| +| `CuframesPublisherGone` | publisher умер или ещё не стартовал | reconnect-loop | +| `CuframesFrameTimeout` | timeout без frame'а | продолжать ждать или log'нуть | +| `CuframesDeviceLost` | CUDA error на cross-process sync | abort, не recoverable | +| `CuframesShmError` | socket/mmap/IPC error | log, abort или восстановить | +| `CuframesProtocolMismatch` | версия libcuframes несовместима | пересобрать | +| `CuframesInvalidArgument` | bug в caller | fix code | +| `CuframesOutOfMemory` | cudaMalloc fail | reduce работу | +| `CuframesInternal` | bug в libcuframes | report | + +## Backpressure + +`next_frame()` blocking call с GIL released. Если consumer медленнее +publisher'а: +- В `NEWEST_ONLY` mode (default) — publisher продолжает писать, consumer + получает **самый свежий** frame (промежуточные пропускает). `gap_count` + растёт. +- В `STRICT_ORDER` mode — при ring overflow `CuframesPublisherGone` → + reconnect. + +Frame удерживать долго **нельзя**: в `STRICT_WAIT` policy publisher +заблокирует ring. Pattern — забрать DLPack, инициировать GPU работу, +release frame сразу. + +## Текущие ограничения (Phase 0) + +- Publisher API не обёрнут (только subscriber-side) +- Packet ring (encoded video) не обёрнут +- Async callback API не обёрнут +- `ring_occupancy` / реальный drop count — нет в C API (counted в pybind как + `gap_count`, это proxy) +- Smoke test реального subscribe требует Docker IPC namespace (cuframes + socket/SHM живут в namespace publisher'а) + +Эти ограничения снимаются по мере необходимости — issues в +[gx/cuframes](http://server:3000/gx/cuframes). diff --git a/python/src/_native.cpp b/python/src/_native.cpp index 3eb0ac1..52dd806 100644 --- a/python/src/_native.cpp +++ b/python/src/_native.cpp @@ -15,12 +15,59 @@ #include #include +#include #include #include #include #include "cuframes/cuframes.h" +// DLPack — стандартный protocol для exchange tensor-like структур между +// фреймворками (PyTorch/CuPy/JAX/TF). См. https://dmlc.github.io/dlpack/latest/ +// Мы embedим header inline чтобы не добавлять external dep — header +// небольшой и стабильный (DLPack 1.0+). +namespace dlpack { + +typedef enum { + kDLCPU = 1, + kDLCUDA = 2, +} DLDeviceType; + +typedef struct { + DLDeviceType device_type; + int32_t device_id; +} DLDevice; + +typedef enum { + kDLInt = 0U, + kDLUInt = 1U, + kDLFloat = 2U, +} DLDataTypeCode; + +typedef struct { + uint8_t code; + uint8_t bits; + uint16_t lanes; +} DLDataType; + +typedef struct { + void* data; + DLDevice device; + int32_t ndim; + DLDataType dtype; + int64_t* shape; + int64_t* strides; + uint64_t byte_offset; +} DLTensor; + +typedef struct DLManagedTensor { + DLTensor dl_tensor; + void* manager_ctx; + void (*deleter)(struct DLManagedTensor* self); +} DLManagedTensor; + +} // namespace dlpack + namespace py = pybind11; namespace { @@ -185,11 +232,90 @@ public: return cuframes_frame_pts_ns(frame_); } + cuframes_subscriber_t* internal_sub() const noexcept { return sub_; } + cuframes_frame_t* internal_frame() const noexcept { return frame_; } + private: cuframes_subscriber_t* sub_; cuframes_frame_t* frame_; }; +// ───────────────────────────────────────────────────────────────────────────── +// DLPack export helpers. +// +// Кадр в NV12 состоит из 2 plane'ов: Y (uint8, H×W, pitch=pitch_y) и +// UV interleaved (uint8, H/2×W, pitch=pitch_uv; W здесь = ширина в байтах +// для interleaved U+V). +// +// Стратегия: даём пользователю 2 отдельных DLPack capsule на каждый plane. +// Это стандартный pattern в PyTorch/CuPy (torchcodec, cuda-python). +// UV offset вычисляется из pitch_y * height_aligned (NVDEC выравнивает +// height до aligned значения — обычно высота уже aligned, но мы используем +// видимую height из frame_size). +// +// Lifetime: deleter capsule освобождает только shape/strides arrays. +// Сам CUDA pointer принадлежит frame'у — gone-frame должно быть released +// **после** того как DLPack capsule destroyed. Чтобы не дать пользователю +// shoot in foot, capsule.manager_ctx держит py::object на FrameWrapper +// (увеличивает refcount), которое освобождается в deleter. +// ───────────────────────────────────────────────────────────────────────────── + +struct DLPackContext { + py::object frame_keep_alive; // CuframesFrame Python-side + std::vector shape; + std::vector strides; +}; + +static void dlpack_deleter(dlpack::DLManagedTensor* self) { + if (!self) return; + auto* ctx = static_cast(self->manager_ctx); + if (ctx) { + // Releasing Python refcount требует GIL + py::gil_scoped_acquire gil; + delete ctx; + } + delete self; +} + +static void dlpack_pycapsule_destructor(PyObject* capsule) { + if (PyCapsule_IsValid(capsule, "dltensor")) { + // Capsule НЕ был consumed downstream'ом (e.g. torch.from_dlpack). + // Нужно освободить managed tensor самим. + auto* mt = static_cast( + PyCapsule_GetPointer(capsule, "dltensor")); + if (mt && mt->deleter) { + mt->deleter(mt); + } + } + // Если PyCapsule имеет name "used_dltensor" — downstream взял ownership, + // мы ничего не делаем. +} + +static py::capsule make_dlpack_capsule( + void* data, + int rows, int cols, int64_t row_stride_bytes, + int cuda_device, + py::object frame_keep_alive +) { + auto* ctx = new DLPackContext; + ctx->frame_keep_alive = std::move(frame_keep_alive); + ctx->shape = {static_cast(rows), static_cast(cols)}; + ctx->strides = {row_stride_bytes, 1}; + + auto* mt = new dlpack::DLManagedTensor; + mt->dl_tensor.data = data; + mt->dl_tensor.device = {dlpack::kDLCUDA, cuda_device}; + mt->dl_tensor.ndim = 2; + mt->dl_tensor.dtype = {dlpack::kDLUInt, 8, 1}; // uint8 + mt->dl_tensor.shape = ctx->shape.data(); + mt->dl_tensor.strides = ctx->strides.data(); + mt->dl_tensor.byte_offset = 0; + mt->manager_ctx = ctx; + mt->deleter = dlpack_deleter; + + return py::capsule(mt, "dltensor", &dlpack_pycapsule_destructor); +} + // ───────────────────────────────────────────────────────────────────────────── // CuframesSubscriber — owning wrapper над cuframes_subscriber_t*. // @@ -206,6 +332,21 @@ private: // ... // ───────────────────────────────────────────────────────────────────────────── +// Per-subscriber health stats. Phase 0 версия — counted в pybind layer +// (cuframes C API не expose'ит ring_occupancy / drop_count напрямую). +// Если в будущем cuframes расширит C API (cuframes_subscriber_get_stats), +// добавим reads оттуда — но текущие counters остаются для совместимости +// с тем что consumer'у видно через Python API. +struct SubscriberStats { + uint64_t frames_received = 0; // успешных next_frame() + uint64_t timeouts = 0; // CUFRAMES_ERR_TIMEOUT / WOULD_BLOCK + uint64_t errors = 0; // прочие fail'ы в next_frame() + uint64_t last_seq = 0; // seq последнего полученного frame'а + uint64_t gap_count = 0; // сколько раз seq[i] > seq[i-1] + 1 + // (proxy для drop count в NEWEST_ONLY mode) + int64_t last_frame_pts_ns = 0; +}; + class SubscriberWrapper { public: SubscriberWrapper( @@ -213,11 +354,13 @@ public: std::optional consumer_name, cuframes_subscriber_mode_t mode, int cuda_device, - int connect_timeout_ms + int connect_timeout_ms, + uintptr_t consumer_stream ) : key_(key), consumer_name_(consumer_name.value_or("")), mode_(mode), - cuda_device_(cuda_device) { + cuda_device_(cuda_device), + consumer_stream_(reinterpret_cast(consumer_stream)) { cuframes_subscriber_config_t cfg = {}; cfg.key = key_.c_str(); @@ -266,11 +409,27 @@ public: int err; { py::gil_scoped_release rel; - // consumer_stream = nullptr (default stream). - // В #201 добавим per-subscriber stream. - err = cuframes_subscriber_next(sub_, /*stream=*/nullptr, + // Используем persistent per-subscriber stream — все consumer'ы + // получают независимый cudaStreamWaitEvent, не серializуются + // через default stream. + err = cuframes_subscriber_next(sub_, consumer_stream_, &raw, timeout_ms); } + // Update health stats до check() — иначе при exception они не + // увеличатся, и оператору будет непонятно почему counters застыли. + if (err == CUFRAMES_OK) { + stats_.frames_received++; + uint64_t seq = cuframes_frame_seq(raw); + if (stats_.last_seq != 0 && seq > stats_.last_seq + 1) { + stats_.gap_count++; + } + stats_.last_seq = seq; + stats_.last_frame_pts_ns = cuframes_frame_pts_ns(raw); + } else if (err == CUFRAMES_ERR_TIMEOUT || err == CUFRAMES_ERR_WOULD_BLOCK) { + stats_.timeouts++; + } else { + stats_.errors++; + } check(err, "cuframes_subscriber_next"); return std::make_unique(sub_, raw); } @@ -279,6 +438,23 @@ public: const std::string& consumer_name() const { return consumer_name_; } cuframes_subscriber_mode_t mode() const { return mode_; } int cuda_device() const { return cuda_device_; } + const SubscriberStats& stats() const { return stats_; } + + // Snapshot stats как Python dict — для MQTT health publish. + py::dict stats_dict() const { + py::dict d; + d["frames_received"] = stats_.frames_received; + d["timeouts"] = stats_.timeouts; + d["errors"] = stats_.errors; + d["last_seq"] = stats_.last_seq; + d["gap_count"] = stats_.gap_count; + d["last_frame_pts_ns"] = stats_.last_frame_pts_ns; + return d; + } + + uintptr_t consumer_stream() const { + return reinterpret_cast(consumer_stream_); + } private: cuframes_subscriber_t* sub_ = nullptr; @@ -286,6 +462,13 @@ private: std::string consumer_name_; cuframes_subscriber_mode_t mode_; int cuda_device_; + // CUDA stream — opaque cudaStream_t. Передаётся снаружи как int + // (полученный через cuda-python / torch.cuda.Stream._as_parameter_). + // nullptr = default stream (только для smoke-тестов; в продакшене + // консумерам надо иметь свой stream чтобы избежать serialization + // через default). + void* consumer_stream_ = nullptr; + SubscriberStats stats_{}; }; } // namespace @@ -391,31 +574,116 @@ PYBIND11_MODULE(_native, m) { if (f.released()) return std::string(""); return std::string(""; - }); + }) + // ── DLPack export ─────────────────────────────────────────────── + // Multi-plane formats (NV12, YUV420P) — экспортируем планы отдельно + // как 2D uint8 tensors. Consumer строит логику склейки сам. + // Для single-plane (RGB/BGR/RGBA/GRAYSCALE) — __dlpack__() работает. + .def("dlpack_y", + [](py::object self) -> py::capsule { + auto& f = self.cast(); + f.check_alive(); + void* ptr = cuframes_frame_cuda_ptr(f.internal_frame()); + int32_t w, h; + cuframes_frame_size(f.internal_frame(), &w, &h); + int pitch = cuframes_frame_pitch_y(f.internal_frame()); + // Для NV12/YUV420P width = ширина в пикселях, Y занимает W байт/строка. + // Pitch (физическая строка в памяти) может быть > W. Передаём как stride. + // cuda_device извлекаем не из frame (нет API) — фиксируем 0 для default; + // task #201 добавит per-subscriber stream и реальный device. + return make_dlpack_capsule(ptr, h, w, pitch, /*cuda_device=*/0, self); + }, + "DLPack export Y-plane как 2D uint8 GPU tensor (shape=[H, W], stride=[pitch_y, 1]).\n" + "Работает для NV12, YUV420P, GRAYSCALE. Для других форматов — отдаёт первый plane.") + .def("dlpack_uv", + [](py::object self) -> py::capsule { + auto& f = self.cast(); + f.check_alive(); + auto fmt = cuframes_frame_format(f.internal_frame()); + if (fmt != CUFRAMES_FORMAT_NV12) { + PyErr_SetString(g_exc.invalid_argument.ptr(), + "dlpack_uv() only supported for NV12 format"); + throw py::error_already_set(); + } + void* base = cuframes_frame_cuda_ptr(f.internal_frame()); + int32_t w, h; + cuframes_frame_size(f.internal_frame(), &w, &h); + int pitch_y = cuframes_frame_pitch_y(f.internal_frame()); + int pitch_uv = cuframes_frame_pitch_uv(f.internal_frame()); + // NV12 layout: Y plane занимает pitch_y * h bytes, + // UV plane (interleaved U+V) следует сразу за ним. + void* uv_ptr = static_cast(base) + (size_t)pitch_y * h; + // UV plane размеры: H/2 строк, W колонок (interleaved U+V байты). + return make_dlpack_capsule(uv_ptr, h / 2, w, pitch_uv, /*cuda_device=*/0, self); + }, + "DLPack export UV-plane (interleaved) для NV12.\n" + "Shape=[H/2, W] uint8, stride=[pitch_uv, 1]. U и V interleaved\n" + "по байтам в последнем измерении (W = ширина в пикселях, но\n" + "каждый pixel = 2 байта U+V).") + .def("__dlpack__", + [](py::object self, py::object /*stream*/) -> py::capsule { + // PEP 3118 / DLPack protocol — single-plane access. + // Для NV12/YUV420P возвращает Y plane (это самый частый use + // case — motion detection / brightness работают только с Y). + // Если нужен UV — явно через .dlpack_uv(). + auto& f = self.cast(); + f.check_alive(); + void* ptr = cuframes_frame_cuda_ptr(f.internal_frame()); + int32_t w, h; + cuframes_frame_size(f.internal_frame(), &w, &h); + int pitch = cuframes_frame_pitch_y(f.internal_frame()); + return make_dlpack_capsule(ptr, h, w, pitch, /*cuda_device=*/0, self); + }, + py::arg("stream") = py::none(), + "DLPack protocol для torch.from_dlpack / cupy.from_dlpack.\n" + "Для NV12 возвращает Y plane. Для других planes — .dlpack_uv().") + .def("__dlpack_device__", + [](const FrameWrapper& f) -> py::tuple { + f.check_alive(); + // (device_type, device_id) — kDLCUDA=2, device 0 (task #201). + return py::make_tuple(2, 0); + }, + "DLPack device protocol — возвращает (kDLCUDA=2, device_id)."); // ── CuframesSubscriber ────────────────────────────────────────────── py::class_(m, "CuframesSubscriber", "Subscription на cuframes publisher.\n\n" "Создаётся через cuframes.subscribe(key, ...). Поддерживает context\n" "manager — close() при выходе из with-блока.\n\n" - "Thread-safety: handle принадлежит одному Python потоку. Cross-thread\n" - "access — undefined behavior. Несколько subscriber'ов в разных потоках\n" - "— OK (каждому свой handle).\n\n" - "GIL отпускается на время блокирующих вызовов (next_frame, close,\n" - "create) — другие Python потоки могут работать.") + "Thread-safety contract:\n" + " • Handle принадлежит одному Python потоку — создание и\n" + " все вызовы (next_frame, close) должны быть в одном thread.\n" + " • Несколько subscriber'ов в разных потоках — OK (каждому свой\n" + " handle, свой CUDA stream).\n" + " • Доступ к Frame после release() из другого потока — UB\n" + " (cuframes_frame_t* указывает в ring buffer publisher'а, после\n" + " release он может быть переписан).\n" + " • Внутренний GIL отпускается на длинных I/O вызовах\n" + " (subscriber_create, next_frame) — другие Python потоки могут\n" + " выполняться параллельно пока мы ждём frame.\n\n" + "CUDA stream:\n" + " consumer_stream передаётся как int (cudaStream_t как opaque\n" + " pointer). Получается через cuda-python (cudart.cudaStreamCreate)\n" + " или torch (torch.cuda.Stream()._as_parameter_). Если 0 —\n" + " default stream (serialization risk при нескольких subscriber'ах\n" + " в одном процессе).") .def(py::init, - cuframes_subscriber_mode_t, int, int>(), + cuframes_subscriber_mode_t, int, int, uintptr_t>(), py::arg("key"), py::arg("consumer_name") = py::none(), py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY, py::arg("cuda_device") = 0, py::arg("connect_timeout_ms") = -1, + py::arg("consumer_stream") = 0, "Создать subscription. Блокирует до publisher_ready или\n" - "connect_timeout_ms. -1 = ждать вечно, 0 = fail сразу.") + "connect_timeout_ms. -1 = ждать вечно, 0 = fail сразу.\n" + "consumer_stream: int representation cudaStream_t (0=default).") .def_property_readonly("key", &SubscriberWrapper::key) .def_property_readonly("consumer_name", &SubscriberWrapper::consumer_name) .def_property_readonly("mode", &SubscriberWrapper::mode) .def_property_readonly("cuda_device", &SubscriberWrapper::cuda_device) + .def_property_readonly("consumer_stream", &SubscriberWrapper::consumer_stream, + "Pointer на cudaStream_t (int). 0 = default stream.") .def_property_readonly("closed", &SubscriberWrapper::closed) .def("next_frame", &SubscriberWrapper::next_frame, py::arg("timeout_ms") = -1, @@ -426,6 +694,31 @@ PYBIND11_MODULE(_native, m) { "`with sub.next_frame() as frame: ...` для гарантии release.") .def("close", &SubscriberWrapper::close, "Закрыть subscription. Idempotent.") + // ── Health / stats ────────────────────────────────────────────── + // Phase 0: counted в pybind layer (cuframes C API не expose'ит + // ring_occupancy / drop_count напрямую). Эти counters достаточно + // для MQTT health publisher / monitoring. + .def_property_readonly("frames_received", + [](const SubscriberWrapper& s) { return s.stats().frames_received; }, + "Количество успешных next_frame() с момента subscribe.") + .def_property_readonly("timeouts", + [](const SubscriberWrapper& s) { return s.stats().timeouts; }, + "Сколько раз next_frame() вернул CuframesFrameTimeout.") + .def_property_readonly("errors", + [](const SubscriberWrapper& s) { return s.stats().errors; }, + "Сколько раз next_frame() упал с error (не timeout).") + .def_property_readonly("last_seq", + [](const SubscriberWrapper& s) { return s.stats().last_seq; }, + "Sequence number последнего полученного frame'а.") + .def_property_readonly("gap_count", + [](const SubscriberWrapper& s) { return s.stats().gap_count; }, + "Сколько раз seq[i] > seq[i-1] + 1 — proxy для drop count\n" + "в NEWEST_ONLY mode. В STRICT_ORDER должен оставаться 0.") + .def_property_readonly("last_frame_pts_ns", + [](const SubscriberWrapper& s) { return s.stats().last_frame_pts_ns; }) + .def("stats", + [](const SubscriberWrapper& s) { return s.stats_dict(); }, + "Snapshot всех health counters как dict — для MQTT health publish.") // context manager .def("__enter__", [](SubscriberWrapper& self) -> SubscriberWrapper& { self.check_alive(); @@ -448,18 +741,17 @@ PYBIND11_MODULE(_native, m) { std::optional consumer_name, cuframes_subscriber_mode_t mode, int cuda_device, - int connect_timeout_ms) { + int connect_timeout_ms, + uintptr_t consumer_stream) { return std::make_unique( - key, consumer_name, mode, cuda_device, connect_timeout_ms); + key, consumer_name, mode, cuda_device, + connect_timeout_ms, consumer_stream); }, py::arg("key"), py::arg("consumer_name") = py::none(), py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY, py::arg("cuda_device") = 0, py::arg("connect_timeout_ms") = -1, + py::arg("consumer_stream") = 0, "Создать CuframesSubscriber. Shortcut для CuframesSubscriber(...)."); - - // TODO(task #199): DLPack export — frame.__dlpack__() - // TODO(task #200): health/stats — sub.ring_occupancy, sub.drop_count - // TODO(task #201): consumer_stream parameter в subscribe/next_frame } diff --git a/python/tests/test_smoke.py b/python/tests/test_smoke.py index af936db..bc7e274 100644 --- a/python/tests/test_smoke.py +++ b/python/tests/test_smoke.py @@ -82,3 +82,31 @@ def test_subscriber_repr_when_unable_to_connect(): except cuframes.CuframesError: return # ожидаемо pytest.fail("subscribe должно было выкинуть exception") + + +def test_subscribe_accepts_consumer_stream_param(): + """consumer_stream — uintptr (cudaStream_t). + + Проверяем что параметр accepted; реальное использование требует + cuda-python / torch.cuda.Stream — это в integration тестах + yolo-world-detector'а. + """ + import pytest + with pytest.raises(cuframes.CuframesError): + cuframes.subscribe( + "nope-xyz", + connect_timeout_ms=100, + consumer_stream=0, # 0 = default stream + ) + + +def test_subscribe_kwargs_signature(): + """Проверяем что у subscribe правильный набор kwargs.""" + import inspect + # Pybind11-обёртки не дают inspect.signature, но help_doc отражает их. + doc = cuframes.subscribe.__doc__ + assert "consumer_name" in doc + assert "mode" in doc + assert "cuda_device" in doc + assert "connect_timeout_ms" in doc + assert "consumer_stream" in doc