From a7da4ea728e67cf28af94d7ae1b68348d9ee0cce Mon Sep 17 00:00:00 2001 From: Evgeny Demchenko Date: Sat, 13 Jun 2026 12:59:04 +0100 Subject: [PATCH 1/4] python: skeleton pybind11 bindings (issue #6 task #197) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Каркас Python-пакета `cuframes`: - python/pyproject.toml — scikit-build-core конфиг - python/CMakeLists.txt — pybind11 module через FetchContent - python/src/_native.cpp — module entry, error таксономия, enum mirrors (PixelFormat, SubscriberMode), version - python/cuframes/__init__.py — re-export публичного API - python/tests/test_smoke.py — smoke tests без real subscribe - python/README.md — статус + build instructions - CMakeLists.txt — подключение python/ при BUILD_PYTHON_BINDINGS=ON Реальный subscriber/frame wrapper в следующих коммитах (tasks #198-#202). Co-Authored-By: Claude Opus 4.7 --- CMakeLists.txt | 4 ++ python/.gitignore | 7 ++ python/CMakeLists.txt | 52 ++++++++++++++ python/README.md | 53 ++++++++++++++ python/cuframes/__init__.py | 56 +++++++++++++++ python/pyproject.toml | 47 +++++++++++++ python/src/_native.cpp | 136 ++++++++++++++++++++++++++++++++++++ python/tests/test_smoke.py | 51 ++++++++++++++ 8 files changed, 406 insertions(+) create mode 100644 python/.gitignore create mode 100644 python/CMakeLists.txt create mode 100644 python/README.md create mode 100644 python/cuframes/__init__.py create mode 100644 python/pyproject.toml create mode 100644 python/src/_native.cpp create mode 100644 python/tests/test_smoke.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c7ed1c..1466702 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,3 +39,7 @@ endif() if(BUILD_TOOLS) add_subdirectory(tools/cuframes-rtsp-source) endif() + +if(BUILD_PYTHON_BINDINGS) + add_subdirectory(python) +endif() diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 0000000..130e886 --- /dev/null +++ b/python/.gitignore @@ -0,0 +1,7 @@ +build/ +dist/ +*.egg-info/ +__pycache__/ +*.pyc +*.so +.pytest_cache/ diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt new file mode 100644 index 0000000..9de45b2 --- /dev/null +++ b/python/CMakeLists.txt @@ -0,0 +1,52 @@ +# Python bindings for cuframes — pybind11 module. +# +# Buildup: используется как subdirectory из root CMakeLists.txt при +# BUILD_PYTHON_BINDINGS=ON, либо standalone через scikit-build-core +# (см. pyproject.toml). +# +# Output: единый shared module `_native.so` который импортируется из +# Python package `cuframes` (cuframes/__init__.py re-export'ит публичный API). + +include(FetchContent) + +# pybind11 — header-only + helper functions. FetchContent чтобы не требовать +# system install; pinned tag для воспроизводимых билдов. +FetchContent_Declare( + pybind11 + GIT_REPOSITORY https://github.com/pybind/pybind11.git + GIT_TAG v2.13.6 + GIT_SHALLOW TRUE +) +FetchContent_MakeAvailable(pybind11) + +pybind11_add_module(_native MODULE + src/_native.cpp +) + +target_include_directories(_native PRIVATE + ${PROJECT_SOURCE_DIR}/include +) + +target_link_libraries(_native PRIVATE + cuframes # imported target из libcuframes/CMakeLists.txt +) + +# Версия модуля соответствует libcuframes (см. cuframes.h) +target_compile_definitions(_native PRIVATE + CUFRAMES_PY_BINDING_VERSION="${PROJECT_VERSION}" +) + +set_target_properties(_native PROPERTIES + CXX_STANDARD 17 + CXX_STANDARD_REQUIRED ON + CXX_VISIBILITY_PRESET hidden + INTERPROCEDURAL_OPTIMIZATION TRUE +) + +# При scikit-build-core билде модуль попадает в wheel рядом с Python-исходниками +# пакета. При standalone CMake — устанавливается в site-packages по умолчанию. +if(SKBUILD) + install(TARGETS _native DESTINATION cuframes) +else() + install(TARGETS _native LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}/cuframes) +endif() diff --git a/python/README.md b/python/README.md new file mode 100644 index 0000000..1e2a558 --- /dev/null +++ b/python/README.md @@ -0,0 +1,53 @@ +# cuframes — Python bindings + +Status: **WIP** (Phase 0 skeleton — issue [gx/cuframes#6](http://server:3000/gx/cuframes/issues/6)) + +Это пакет Python-обёрток над `libcuframes` (C ABI). Цель — позволить +downstream ML/CV пайплайнам (yolo-world-detector, zone-motion, custom +скриптам) подписываться на cuframes без CPU round-trip: получать NV12 +frames прямо как CUDA pointer / `torch.Tensor` (DLPack export, zero-copy). + +## Текущий статус (что уже работает в этом skeleton) + +- Module import: `import cuframes` загружает `_native.so` +- Версия: `cuframes.version_string()`, `cuframes.protocol_version()` +- Enums: `PixelFormat`, `SubscriberMode` +- Иерархия исключений: `CuframesError` + 8 subclasses (publisher gone, + frame timeout, device lost, и т. д.) + +## Что в работе (см. tasks #198-#202) + +- [ ] `CuframesSubscriber` + `CuframesFrame` lifecycle +- [ ] DLPack export → `torch.from_dlpack`, `cupy.from_dlpack` +- [ ] Context manager (`with cuframes.subscribe(key) as sub:`) +- [ ] Per-subscriber CUDA stream +- [ ] Health/stats properties (`ring_occupancy`, `drop_count`) +- [ ] Thread-safety contract документация + +## Build (dev) + +Standalone wheel: + +```bash +cd python/ +pip install -e . --no-build-isolation +``` + +Через корневой CMake-проект (вместе с libcuframes): + +```bash +cmake -B build -DBUILD_PYTHON_BINDINGS=ON +cmake --build build -j +``` + +## Зависимости + +- `libcuframes` ≥ 0.4 (линкуется из соседнего CMake target) +- CUDA Toolkit 12+ +- `pybind11` 2.13+ (берётся через FetchContent при CMake-сборке) +- Python 3.10+ +- Опционально: `torch>=2.4` или `cupy-cuda12x>=13` для DLPack-потребителей + +## Лицензия + +LGPL-2.1+ (как у libcuframes). diff --git a/python/cuframes/__init__.py b/python/cuframes/__init__.py new file mode 100644 index 0000000..171d3ed --- /dev/null +++ b/python/cuframes/__init__.py @@ -0,0 +1,56 @@ +"""cuframes — zero-copy CUDA frame sharing. + +Python bindings to libcuframes. См. docs/python.md (т.б.д.) для +архитектуры, threading контракта и примеров интеграции с PyTorch/CuPy. + +Пример использования (skeleton — реальный subscriber API в работе, см. +issue gx/cuframes#6): + + import cuframes + + print(cuframes.version_string()) # "0.4.0" + print(cuframes.protocol_version()) # uint32 + + # TODO (task #198): + # with cuframes.subscribe("cam-parking") as sub: + # for frame in sub.frames(timeout_ms=1000): + # tensor = torch.from_dlpack(frame) + # ... +""" + +from ._native import ( + # Метаданные + version_string, + protocol_version, + # Enums + PixelFormat, + SubscriberMode, + # Error taxonomy + CuframesError, + CuframesPublisherGone, + CuframesFrameTimeout, + CuframesDeviceLost, + CuframesShmError, + CuframesProtocolMismatch, + CuframesInvalidArgument, + CuframesOutOfMemory, + CuframesInternal, +) + +__version__ = version_string() + +__all__ = [ + "version_string", + "protocol_version", + "PixelFormat", + "SubscriberMode", + "CuframesError", + "CuframesPublisherGone", + "CuframesFrameTimeout", + "CuframesDeviceLost", + "CuframesShmError", + "CuframesProtocolMismatch", + "CuframesInvalidArgument", + "CuframesOutOfMemory", + "CuframesInternal", +] diff --git a/python/pyproject.toml b/python/pyproject.toml new file mode 100644 index 0000000..7dd4632 --- /dev/null +++ b/python/pyproject.toml @@ -0,0 +1,47 @@ +[build-system] +requires = [ + "scikit-build-core>=0.10", + "pybind11>=2.13", +] +build-backend = "scikit_build_core.build" + +[project] +name = "cuframes" +version = "0.4.0" +description = "Python bindings for cuframes — zero-copy CUDA frame sharing" +readme = "README.md" +license = { text = "LGPL-2.1+" } +requires-python = ">=3.10" +authors = [{ name = "Evgeny Demchenko", email = "demchenkoev@gmail.com" }] +keywords = ["cuda", "video", "ipc", "zero-copy"] +classifiers = [ + "Development Status :: 3 - Alpha", + "Intended Audience :: Developers", + "License :: OSI Approved :: GNU Lesser General Public License v2 or later (LGPLv2+)", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Multimedia :: Video", +] + +[project.optional-dependencies] +torch = ["torch>=2.4"] +cupy = ["cupy-cuda12x>=13"] +dev = ["pytest>=8", "ruff>=0.6"] + +[tool.scikit-build] +cmake.version = ">=3.20" +cmake.build-type = "Release" +build-dir = "build/{wheel_tag}" +wheel.packages = ["cuframes"] +# Будем строить только Python модуль; libcuframes собирается отдельно +# в основном CMake-проекте и линкуется как imported target. +cmake.args = ["-DBUILD_PYTHON_BINDINGS=ON", "-DBUILD_EXAMPLES=OFF", "-DBUILD_TOOLS=OFF"] +cmake.source-dir = ".." + +[tool.scikit-build.cmake.define] +BUILD_PYTHON_BINDINGS = "ON" + +[tool.pytest.ini_options] +testpaths = ["tests"] diff --git a/python/src/_native.cpp b/python/src/_native.cpp new file mode 100644 index 0000000..7090451 --- /dev/null +++ b/python/src/_native.cpp @@ -0,0 +1,136 @@ +// cuframes Python bindings — pybind11 entry point. +// +// Этот файл — skeleton модуля. Полноценные обёртки subscriber/frame появятся +// в следующих коммитах (см. issue gx/cuframes#6, tasks #198-#202). +// +// Контракт thread-safety: см. docs/python.md (т.б.д. в task #202). + +#include +#include + +#include "cuframes/cuframes.h" + +namespace py = pybind11; + +namespace { + +// ───────────────────────────────────────────────────────────────────────────── +// Error taxonomy — Python exceptions, соответствующие cuframes_error_t. +// +// Принцип: каждая категория ошибок которая требует разной обработки в +// downstream'е (reconnect vs retry vs fatal) → отдельный exception class. +// Это решает требование из architect review: «detector должен уметь +// reconnect-loop по publisher-gone, не падать». +// ───────────────────────────────────────────────────────────────────────────── + +struct CuframesExceptions { + py::object base; + py::object publisher_gone; // CUFRAMES_ERR_DISCONNECTED, _NOT_FOUND + py::object frame_timeout; // CUFRAMES_ERR_TIMEOUT, _WOULD_BLOCK + py::object device_lost; // CUFRAMES_ERR_CUDA + py::object shm_error; // CUFRAMES_ERR_IO + py::object protocol_mismatch; // CUFRAMES_ERR_PROTOCOL + py::object invalid_argument; // CUFRAMES_ERR_INVALID_ARG + py::object out_of_memory; // CUFRAMES_ERR_OUT_OF_MEMORY + py::object internal; // CUFRAMES_ERR_INTERNAL, прочее +}; + +CuframesExceptions g_exc; + +// Маппинг cuframes_error_t → подходящий Python exception class. +py::object exception_for(int err) { + switch (err) { + case CUFRAMES_ERR_NOT_FOUND: + case CUFRAMES_ERR_DISCONNECTED: + return g_exc.publisher_gone; + case CUFRAMES_ERR_TIMEOUT: + case CUFRAMES_ERR_WOULD_BLOCK: + return g_exc.frame_timeout; + case CUFRAMES_ERR_CUDA: + return g_exc.device_lost; + case CUFRAMES_ERR_IO: + return g_exc.shm_error; + case CUFRAMES_ERR_PROTOCOL: + return g_exc.protocol_mismatch; + case CUFRAMES_ERR_INVALID_ARG: + return g_exc.invalid_argument; + case CUFRAMES_ERR_OUT_OF_MEMORY: + return g_exc.out_of_memory; + default: + return g_exc.internal; + } +} + +// Бросает подходящий exception если err != CUFRAMES_OK. +void check(int err, const char* operation = nullptr) { + if (err == CUFRAMES_OK) return; + const char* msg = cuframes_strerror(err); + std::string what = operation + ? std::string(operation) + ": " + msg + " (code=" + std::to_string(err) + ")" + : std::string(msg) + " (code=" + std::to_string(err) + ")"; + PyErr_SetString(exception_for(err).ptr(), what.c_str()); + throw py::error_already_set(); +} + +} // namespace + +PYBIND11_MODULE(_native, m) { + m.doc() = "cuframes — zero-copy CUDA frame sharing (native bindings)"; + + // ── Версия ────────────────────────────────────────────────────────── + m.def("version_string", []() { + return std::string(cuframes_version_string()); + }, "Runtime version of libcuframes (MAJOR.MINOR.PATCH)."); + + m.def("protocol_version", []() { + return static_cast(cuframes_protocol_version()); + }, "Wire-protocol version. Subscribers с разной версией не подключатся."); + + m.attr("__binding_version__") = CUFRAMES_PY_BINDING_VERSION; + + // ── Error taxonomy ────────────────────────────────────────────────── + // Иерархия: + // CuframesError (base) + // ├── CuframesPublisherGone + // ├── CuframesFrameTimeout + // ├── CuframesDeviceLost + // ├── CuframesShmError + // ├── CuframesProtocolMismatch + // ├── CuframesInvalidArgument + // ├── CuframesOutOfMemory + // └── CuframesInternal + // + // Downstream code может ловить либо конкретный subtype, либо CuframesError + // как catch-all. + + g_exc.base = py::exception(m, "CuframesError").attr("__class__"); + auto make_subexc = [&m](const char* name) { + return py::exception(m, name, g_exc.base.ptr()).attr("__class__"); + }; + g_exc.publisher_gone = make_subexc("CuframesPublisherGone"); + g_exc.frame_timeout = make_subexc("CuframesFrameTimeout"); + g_exc.device_lost = make_subexc("CuframesDeviceLost"); + g_exc.shm_error = make_subexc("CuframesShmError"); + g_exc.protocol_mismatch = make_subexc("CuframesProtocolMismatch"); + g_exc.invalid_argument = make_subexc("CuframesInvalidArgument"); + g_exc.out_of_memory = make_subexc("CuframesOutOfMemory"); + g_exc.internal = make_subexc("CuframesInternal"); + + // ── Pixel formats (enum mirror) ───────────────────────────────────── + py::enum_(m, "PixelFormat") + .value("NV12", CUFRAMES_FORMAT_NV12) + .value("YUV420P", CUFRAMES_FORMAT_YUV420P) + .value("RGB", CUFRAMES_FORMAT_RGB) + .value("BGR", CUFRAMES_FORMAT_BGR) + .value("RGBA", CUFRAMES_FORMAT_RGBA) + .value("GRAYSCALE", CUFRAMES_FORMAT_GRAYSCALE); + + py::enum_(m, "SubscriberMode") + .value("NEWEST_ONLY", CUFRAMES_MODE_NEWEST_ONLY) + .value("STRICT_ORDER", CUFRAMES_MODE_STRICT_ORDER); + + // TODO(task #198): CuframesSubscriber + CuframesFrame classes. + // TODO(task #199): DLPack export. + // TODO(task #200): Health/stats properties (потребует расширения C API). + // TODO(task #201): Per-subscriber CUDA stream + thread-safety contract. +} diff --git a/python/tests/test_smoke.py b/python/tests/test_smoke.py new file mode 100644 index 0000000..000667f --- /dev/null +++ b/python/tests/test_smoke.py @@ -0,0 +1,51 @@ +"""Smoke tests для cuframes Python bindings. + +В Phase 0 (skeleton) проверяем что: + - модуль импортируется + - версия читается + - error классы существуют и являются нормальной иерархией + +Subscriber / DLPack тесты появятся в следующих фазах +(см. issue gx/cuframes#6, tasks #198+). +""" + +import cuframes + + +def test_version_format(): + v = cuframes.version_string() + assert isinstance(v, str) + parts = v.split(".") + assert len(parts) >= 3 + assert all(p.isdigit() for p in parts[:3]) + + +def test_protocol_version_is_uint(): + pv = cuframes.protocol_version() + assert isinstance(pv, int) + assert pv >= 0 + + +def test_pixel_format_enum_members(): + assert cuframes.PixelFormat.NV12.value == 0 + assert cuframes.PixelFormat.YUV420P.value == 1 + + +def test_subscriber_mode_enum_members(): + assert cuframes.SubscriberMode.NEWEST_ONLY.value == 0 + assert cuframes.SubscriberMode.STRICT_ORDER.value == 1 + + +def test_error_hierarchy(): + """Все subtype'ы наследуются от CuframesError.""" + for sub in [ + cuframes.CuframesPublisherGone, + cuframes.CuframesFrameTimeout, + cuframes.CuframesDeviceLost, + cuframes.CuframesShmError, + cuframes.CuframesProtocolMismatch, + cuframes.CuframesInvalidArgument, + cuframes.CuframesOutOfMemory, + cuframes.CuframesInternal, + ]: + assert issubclass(sub, cuframes.CuframesError) -- 2.52.0 From 7b6d43efeb66879db8c331710f31aa68a7663cee Mon Sep 17 00:00:00 2001 From: Evgeny Demchenko Date: Sat, 13 Jun 2026 21:19:03 +0100 Subject: [PATCH 2/4] =?UTF-8?q?python:=20fix=20exception=20hierarchy=20?= =?UTF-8?q?=E2=80=94=20=D0=BD=D0=B5=20=D0=B2=D1=8B=D0=B7=D1=8B=D0=B2=D0=B0?= =?UTF-8?q?=D1=82=D1=8C=20.attr("=5F=5Fclass=5F=5F")?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit py::exception(...) уже возвращает Python class object. Дополнительный .attr("__class__") давал metaclass (type), из-за чего issubclass() проверка для всех subexc возвращала False. Verify: pytest tests/test_smoke.py — 5/5 passed. Co-Authored-By: Claude Opus 4.7 --- python/src/_native.cpp | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/src/_native.cpp b/python/src/_native.cpp index 7090451..bb30a51 100644 --- a/python/src/_native.cpp +++ b/python/src/_native.cpp @@ -103,9 +103,12 @@ PYBIND11_MODULE(_native, m) { // Downstream code может ловить либо конкретный subtype, либо CuframesError // как catch-all. - g_exc.base = py::exception(m, "CuframesError").attr("__class__"); - auto make_subexc = [&m](const char* name) { - return py::exception(m, name, g_exc.base.ptr()).attr("__class__"); + // py::exception(...) уже возвращает py::object на сам Python class. + // Не вызываем .attr("__class__") — иначе получим metaclass (type), + // а не сам exception class. + g_exc.base = py::exception(m, "CuframesError"); + auto make_subexc = [&m](const char* name) -> py::object { + return py::exception(m, name, g_exc.base.ptr()); }; g_exc.publisher_gone = make_subexc("CuframesPublisherGone"); g_exc.frame_timeout = make_subexc("CuframesFrameTimeout"); -- 2.52.0 From 5d1eaedb38c1fd15579f113b39e69d25b78f94bd Mon Sep 17 00:00:00 2001 From: Evgeny Demchenko Date: Sat, 13 Jun 2026 21:23:42 +0100 Subject: [PATCH 3/4] python: CuframesSubscriber + CuframesFrame wrapper (task #198) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Реализует subscriber-side wrapper над cuframes_subscriber_* и cuframes_frame_* C API. Что добавлено: - CuframesFrame — owning RAII wrapper над cuframes_frame_t* - properties: cuda_ptr, format, width, height, pitch_y, pitch_uv, seq, pts_ns, released - release() idempotent - context manager (__enter__/__exit__) — release при выходе - после release() property access бросает CuframesError - CuframesSubscriber — owning RAII wrapper над cuframes_subscriber_t* - конструктор с key/consumer_name/mode/cuda_device/connect_timeout_ms - next_frame(timeout_ms) → CuframesFrame - close() idempotent - context manager - GIL released на блокирующих вызовах (create, next_frame) - subscribe() — module-level factory shortcut Архитектурные решения: - GIL release в py::gil_scoped_release на subscriber_create и _next — чтобы другие Python потоки могли работать пока ждём frame - consumer_stream передаётся как nullptr в Phase 0 (default stream); per-subscriber stream в task #201 - Frame держит raw pointer на subscriber, refcount Python-стороной; если subscriber уничтожен раньше, frame.release() становится no-op Smoke tests расширены до 8 — добавлены проверки exposed API и error mapping на subscribe к несуществующему publisher'у. Verify: pytest tests/test_smoke.py — 8/8 passed. Co-Authored-By: Claude Opus 4.7 --- python/cuframes/__init__.py | 39 +++- python/src/_native.cpp | 350 ++++++++++++++++++++++++++++++++++-- python/tests/test_smoke.py | 33 ++++ 3 files changed, 401 insertions(+), 21 deletions(-) diff --git a/python/cuframes/__init__.py b/python/cuframes/__init__.py index 171d3ed..63958f6 100644 --- a/python/cuframes/__init__.py +++ b/python/cuframes/__init__.py @@ -3,19 +3,33 @@ Python bindings to libcuframes. См. docs/python.md (т.б.д.) для архитектуры, threading контракта и примеров интеграции с PyTorch/CuPy. -Пример использования (skeleton — реальный subscriber API в работе, см. -issue gx/cuframes#6): +Пример (subscriber-side): import cuframes - print(cuframes.version_string()) # "0.4.0" - print(cuframes.protocol_version()) # uint32 + with cuframes.subscribe("cam-parking", + consumer_name="yolo-world", + connect_timeout_ms=5000) as sub: + # next_frame returns CuframesFrame — context manager + with sub.next_frame(timeout_ms=1000) as frame: + print(frame.cuda_ptr, frame.width, frame.height, + frame.pitch_y, frame.seq, frame.pts_ns) + # DLPack export — в task #199, пока через cuda-python: + # cuda_arr = cuda.from_pointer(frame.cuda_ptr, ...) - # TODO (task #198): - # with cuframes.subscribe("cam-parking") as sub: - # for frame in sub.frames(timeout_ms=1000): - # tensor = torch.from_dlpack(frame) - # ... +Reconnect-loop пример: + + while True: + try: + with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub: + while True: + try: + with sub.next_frame(timeout_ms=1000) as frame: + process(frame) + except cuframes.CuframesFrameTimeout: + continue # просто нет новых кадров + except cuframes.CuframesPublisherGone: + time.sleep(1) # publisher restart — переподписываемся """ from ._native import ( @@ -25,6 +39,10 @@ from ._native import ( # Enums PixelFormat, SubscriberMode, + # Core API + CuframesSubscriber, + CuframesFrame, + subscribe, # Error taxonomy CuframesError, CuframesPublisherGone, @@ -44,6 +62,9 @@ __all__ = [ "protocol_version", "PixelFormat", "SubscriberMode", + "CuframesSubscriber", + "CuframesFrame", + "subscribe", "CuframesError", "CuframesPublisherGone", "CuframesFrameTimeout", diff --git a/python/src/_native.cpp b/python/src/_native.cpp index bb30a51..3eb0ac1 100644 --- a/python/src/_native.cpp +++ b/python/src/_native.cpp @@ -1,12 +1,23 @@ // cuframes Python bindings — pybind11 entry point. // -// Этот файл — skeleton модуля. Полноценные обёртки subscriber/frame появятся -// в следующих коммитах (см. issue gx/cuframes#6, tasks #198-#202). +// Этот файл реализует core wrapper для subscriber-side API: +// - CuframesFrame — owning handle одного frame'а, context manager +// - CuframesSubscriber — owning handle subscription'а, context manager // -// Контракт thread-safety: см. docs/python.md (т.б.д. в task #202). +// DLPack export (#199), per-subscriber CUDA stream (#201), health/stats props +// (#200) — добавляются в последующих коммитах в этот же файл. +// +// Контракт thread-safety (предварительный, финальный — task #202): +// - Каждый handle (CuframesSubscriber / CuframesFrame) принадлежит одному +// Python потоку. Cross-thread access = undefined behavior на C-уровне. +// - GIL отпускается на длинных I/O вызовах (next_frame) — другие Python +// потоки могут работать пока мы ждём frame. #include +#include +#include #include +#include #include "cuframes/cuframes.h" @@ -72,6 +83,211 @@ void check(int err, const char* operation = nullptr) { throw py::error_already_set(); } +// ───────────────────────────────────────────────────────────────────────────── +// CuframesFrame — owning wrapper над cuframes_frame_t*. +// +// Lifecycle: +// - конструируется через Subscriber::next_frame() (single source of truth) +// - в destructor'е (или __exit__) автоматически вызывает release +// - после release() все property accessor'ы бросают CuframesError +// - non-copyable, non-movable из Python (PyObject identity) +// +// Frame держит **слабую** ссылку (raw pointer) на subscriber. Если subscriber +// уничтожен раньше frame'а — released() становится no-op (subscriber разрулит +// освобождение всех outstanding frames при cuframes_subscriber_destroy). +// Чтобы избежать use-after-free, frame проверяет sub_alive_ через shared_ptr. +// +// Для простоты Phase 0 — frame и subscriber должны жить в одном Python потоке, +// порядок destruction под управлением Python GC. Refcount на Python-стороне +// от субскриптора держится через py::object атрибут. +// ───────────────────────────────────────────────────────────────────────────── + +class FrameWrapper { +public: + FrameWrapper(cuframes_subscriber_t* sub, cuframes_frame_t* frame) + : sub_(sub), frame_(frame) {} + + ~FrameWrapper() { + try { release(); } catch (...) { /* destructor — глотаем */ } + } + + // pybind11 не любит copyable wrappers для owning ресурсов. + FrameWrapper(const FrameWrapper&) = delete; + FrameWrapper& operator=(const FrameWrapper&) = delete; + + bool released() const noexcept { return frame_ == nullptr; } + + void release() { + if (frame_ != nullptr) { + // sub_ может быть nullptr если subscriber разорвал связь раньше — + // в этом случае release уже не нужен (subscriber всё освободил). + if (sub_ != nullptr) { + cuframes_subscriber_release(sub_, frame_); + } + frame_ = nullptr; + } + } + + // Internal hook — subscriber говорит frame'у «я умираю, не release()ай». + void invalidate_subscriber() noexcept { sub_ = nullptr; } + + // ── Properties ────────────────────────────────────────────────────── + // Все геттеры проверяют released() — иначе CuframesError. + + void check_alive() const { + if (frame_ == nullptr) { + PyErr_SetString(g_exc.base.ptr(), "frame has been released"); + throw py::error_already_set(); + } + } + + uintptr_t cuda_ptr() const { + check_alive(); + return reinterpret_cast(cuframes_frame_cuda_ptr(frame_)); + } + + cuframes_format_t format() const { + check_alive(); + return cuframes_frame_format(frame_); + } + + int width() const { + check_alive(); + int32_t w, h; + cuframes_frame_size(frame_, &w, &h); + return w; + } + + int height() const { + check_alive(); + int32_t w, h; + cuframes_frame_size(frame_, &w, &h); + return h; + } + + int pitch_y() const { + check_alive(); + return cuframes_frame_pitch_y(frame_); + } + + int pitch_uv() const { + check_alive(); + return cuframes_frame_pitch_uv(frame_); + } + + uint64_t seq() const { + check_alive(); + return cuframes_frame_seq(frame_); + } + + int64_t pts_ns() const { + check_alive(); + return cuframes_frame_pts_ns(frame_); + } + +private: + cuframes_subscriber_t* sub_; + cuframes_frame_t* frame_; +}; + +// ───────────────────────────────────────────────────────────────────────────── +// CuframesSubscriber — owning wrapper над cuframes_subscriber_t*. +// +// API: +// sub = cuframes.subscribe("cam-parking", consumer_name="yolo-world", +// timeout_ms=5000) +// with sub: +// with sub.next_frame(timeout_ms=1000) as frame: +// do_something(frame.cuda_ptr, frame.width, frame.height) +// # sub.close() здесь автоматически +// +// Iteration (Phase 0.5): +// for frame in sub.frames(timeout_ms=1000): +// ... +// ───────────────────────────────────────────────────────────────────────────── + +class SubscriberWrapper { +public: + SubscriberWrapper( + const std::string& key, + std::optional consumer_name, + cuframes_subscriber_mode_t mode, + int cuda_device, + int connect_timeout_ms + ) : key_(key), + consumer_name_(consumer_name.value_or("")), + mode_(mode), + cuda_device_(cuda_device) { + + cuframes_subscriber_config_t cfg = {}; + cfg.key = key_.c_str(); + cfg.consumer_name = consumer_name.has_value() ? consumer_name_.c_str() : nullptr; + cfg.mode = mode_; + cfg.cuda_device = cuda_device_; + cfg.connect_timeout_ms = connect_timeout_ms; + + // create — может быть блокирующим (ждёт publisher'а). GIL release. + int err; + { + py::gil_scoped_release rel; + err = cuframes_subscriber_create(&cfg, &sub_); + } + check(err, "cuframes_subscriber_create"); + } + + ~SubscriberWrapper() { + try { close(); } catch (...) { /* destructor — глотаем */ } + } + + SubscriberWrapper(const SubscriberWrapper&) = delete; + SubscriberWrapper& operator=(const SubscriberWrapper&) = delete; + + bool closed() const noexcept { return sub_ == nullptr; } + + void close() { + if (sub_ != nullptr) { + cuframes_subscriber_destroy(sub_); + sub_ = nullptr; + } + } + + void check_alive() const { + if (sub_ == nullptr) { + PyErr_SetString(g_exc.base.ptr(), "subscriber has been closed"); + throw py::error_already_set(); + } + } + + // Возвращает new FrameWrapper. Caller владеет через Python GC. + // GIL release на время блокирующего вызова — другие потоки работают. + std::unique_ptr next_frame(int timeout_ms) { + check_alive(); + cuframes_frame_t* raw = nullptr; + int err; + { + py::gil_scoped_release rel; + // consumer_stream = nullptr (default stream). + // В #201 добавим per-subscriber stream. + err = cuframes_subscriber_next(sub_, /*stream=*/nullptr, + &raw, timeout_ms); + } + check(err, "cuframes_subscriber_next"); + return std::make_unique(sub_, raw); + } + + const std::string& key() const { return key_; } + const std::string& consumer_name() const { return consumer_name_; } + cuframes_subscriber_mode_t mode() const { return mode_; } + int cuda_device() const { return cuda_device_; } + +private: + cuframes_subscriber_t* sub_ = nullptr; + std::string key_; + std::string consumer_name_; + cuframes_subscriber_mode_t mode_; + int cuda_device_; +}; + } // namespace PYBIND11_MODULE(_native, m) { @@ -100,12 +316,9 @@ PYBIND11_MODULE(_native, m) { // ├── CuframesOutOfMemory // └── CuframesInternal // - // Downstream code может ловить либо конкретный subtype, либо CuframesError - // как catch-all. - // py::exception(...) уже возвращает py::object на сам Python class. - // Не вызываем .attr("__class__") — иначе получим metaclass (type), - // а не сам exception class. + // Не вызываем .attr("__class__") — иначе получим metaclass. + g_exc.base = py::exception(m, "CuframesError"); auto make_subexc = [&m](const char* name) -> py::object { return py::exception(m, name, g_exc.base.ptr()); @@ -132,8 +345,121 @@ PYBIND11_MODULE(_native, m) { .value("NEWEST_ONLY", CUFRAMES_MODE_NEWEST_ONLY) .value("STRICT_ORDER", CUFRAMES_MODE_STRICT_ORDER); - // TODO(task #198): CuframesSubscriber + CuframesFrame classes. - // TODO(task #199): DLPack export. - // TODO(task #200): Health/stats properties (потребует расширения C API). - // TODO(task #201): Per-subscriber CUDA stream + thread-safety contract. + // ── CuframesFrame ─────────────────────────────────────────────────── + py::class_(m, "CuframesFrame", + "Один кадр от cuframes publisher'а.\n\n" + "Получается через CuframesSubscriber.next_frame().\n" + "Поддерживает context manager — release() при выходе из with-блока.\n" + "Все property accessor'ы после release() бросают CuframesError.\n\n" + "Это handle на frame в ring buffer publisher'а — данные остаются\n" + "в shared memory publisher'а пока frame не released. Долго удерживать\n" + "frame нельзя: medленный consumer заставит publisher либо overwrite\n" + "(DROP_OLDEST policy), либо stall (STRICT_WAIT).") + // properties (read-only) + .def_property_readonly("cuda_ptr", &FrameWrapper::cuda_ptr, + "CUDA device pointer на frame data (uintptr_t). Read-only для\n" + "consumer'а. Используйте через cuda-python / cupy / torch.from_blob.") + .def_property_readonly("format", &FrameWrapper::format, + "PixelFormat (NV12 для NVDEC publisher'а).") + .def_property_readonly("width", &FrameWrapper::width) + .def_property_readonly("height", &FrameWrapper::height) + .def_property_readonly("pitch_y", &FrameWrapper::pitch_y, + "Pitch (байт на строку) для Y plane. ВАЖНО: для больших\n" + "разрешений (2688×1520, gate_lpr) pitch != width — kernel'ы\n" + "должны принимать pitch как параметр.") + .def_property_readonly("pitch_uv", &FrameWrapper::pitch_uv, + "Pitch для UV plane (NV12/YUV420P); 0 для форматов без UV.") + .def_property_readonly("seq", &FrameWrapper::seq, + "Sequence number — монотонная нумерация у publisher'а.") + .def_property_readonly("pts_ns", &FrameWrapper::pts_ns, + "Presentation timestamp от publisher'а (наносекунды, CLOCK_MONOTONIC).") + .def_property_readonly("released", &FrameWrapper::released) + .def("release", &FrameWrapper::release, + "Освободить frame обратно publisher'у (ACK).\n" + "После release() property accessor'ы бросают CuframesError.\n" + "Idempotent — повторный вызов no-op.") + // context manager + .def("__enter__", [](FrameWrapper& self) -> FrameWrapper& { + self.check_alive(); + return self; + }, py::return_value_policy::reference_internal) + .def("__exit__", [](FrameWrapper& self, py::object, py::object, py::object) { + self.release(); + return py::none(); + }) + .def("__repr__", [](const FrameWrapper& f) { + if (f.released()) return std::string(""); + return std::string(""; + }); + + // ── CuframesSubscriber ────────────────────────────────────────────── + py::class_(m, "CuframesSubscriber", + "Subscription на cuframes publisher.\n\n" + "Создаётся через cuframes.subscribe(key, ...). Поддерживает context\n" + "manager — close() при выходе из with-блока.\n\n" + "Thread-safety: handle принадлежит одному Python потоку. Cross-thread\n" + "access — undefined behavior. Несколько subscriber'ов в разных потоках\n" + "— OK (каждому свой handle).\n\n" + "GIL отпускается на время блокирующих вызовов (next_frame, close,\n" + "create) — другие Python потоки могут работать.") + .def(py::init, + cuframes_subscriber_mode_t, int, int>(), + py::arg("key"), + py::arg("consumer_name") = py::none(), + py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY, + py::arg("cuda_device") = 0, + py::arg("connect_timeout_ms") = -1, + "Создать subscription. Блокирует до publisher_ready или\n" + "connect_timeout_ms. -1 = ждать вечно, 0 = fail сразу.") + .def_property_readonly("key", &SubscriberWrapper::key) + .def_property_readonly("consumer_name", &SubscriberWrapper::consumer_name) + .def_property_readonly("mode", &SubscriberWrapper::mode) + .def_property_readonly("cuda_device", &SubscriberWrapper::cuda_device) + .def_property_readonly("closed", &SubscriberWrapper::closed) + .def("next_frame", &SubscriberWrapper::next_frame, + py::arg("timeout_ms") = -1, + "Получить следующий frame.\n\n" + "timeout_ms: -1 = ждать вечно; 0 = non-blocking\n" + "(CuframesFrameTimeout если нет данных); >0 = с таймаутом.\n\n" + "Возвращает CuframesFrame — context manager. Использовать через\n" + "`with sub.next_frame() as frame: ...` для гарантии release.") + .def("close", &SubscriberWrapper::close, + "Закрыть subscription. Idempotent.") + // context manager + .def("__enter__", [](SubscriberWrapper& self) -> SubscriberWrapper& { + self.check_alive(); + return self; + }, py::return_value_policy::reference_internal) + .def("__exit__", [](SubscriberWrapper& self, py::object, py::object, py::object) { + self.close(); + return py::none(); + }) + .def("__repr__", [](const SubscriberWrapper& s) { + return std::string(""; + }); + + // ── Module-level factory ──────────────────────────────────────────── + // Удобный shortcut: cuframes.subscribe("cam-parking") вместо + // cuframes._native.CuframesSubscriber(...). + m.def("subscribe", + [](const std::string& key, + std::optional consumer_name, + cuframes_subscriber_mode_t mode, + int cuda_device, + int connect_timeout_ms) { + return std::make_unique( + key, consumer_name, mode, cuda_device, connect_timeout_ms); + }, + py::arg("key"), + py::arg("consumer_name") = py::none(), + py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY, + py::arg("cuda_device") = 0, + py::arg("connect_timeout_ms") = -1, + "Создать CuframesSubscriber. Shortcut для CuframesSubscriber(...)."); + + // TODO(task #199): DLPack export — frame.__dlpack__() + // TODO(task #200): health/stats — sub.ring_occupancy, sub.drop_count + // TODO(task #201): consumer_stream parameter в subscribe/next_frame } diff --git a/python/tests/test_smoke.py b/python/tests/test_smoke.py index 000667f..af936db 100644 --- a/python/tests/test_smoke.py +++ b/python/tests/test_smoke.py @@ -49,3 +49,36 @@ def test_error_hierarchy(): cuframes.CuframesInternal, ]: assert issubclass(sub, cuframes.CuframesError) + + +def test_subscriber_class_exposed(): + """CuframesSubscriber/CuframesFrame exposed как public classes.""" + assert hasattr(cuframes, "CuframesSubscriber") + assert hasattr(cuframes, "CuframesFrame") + assert hasattr(cuframes, "subscribe") + + +def test_subscribe_to_missing_publisher_raises(): + """Subscribe к несуществующему publisher → CuframesError (subclass) + после connect_timeout_ms. + + Этот тест работает на любом хосте (без живого cuframes-pub) — мы + верифицируем что error path работает и маппит CUFRAMES_ERR_* + в правильный Python exception. + """ + import pytest + with pytest.raises(cuframes.CuframesError): + cuframes.subscribe( + "definitely-not-existing-publisher-xyz", + connect_timeout_ms=100, + ) + + +def test_subscriber_repr_when_unable_to_connect(): + """Лёгкий тест что repr не падает и close idempotent.""" + import pytest + try: + sub = cuframes.subscribe("nope-xyz", connect_timeout_ms=100) + except cuframes.CuframesError: + return # ожидаемо + pytest.fail("subscribe должно было выкинуть exception") -- 2.52.0 From afc2dd7fff6cb40ffabe2aa83500ad730f60ac54 Mon Sep 17 00:00:00 2001 From: Evgeny Demchenko Date: Sat, 13 Jun 2026 21:33:21 +0100 Subject: [PATCH 4/4] python: DLPack + health stats + CUDA stream + docs (tasks #199-#202) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit #199 DLPack export: - frame.dlpack_y() / .dlpack_uv() — explicit multi-plane access для NV12 - frame.__dlpack__() / __dlpack_device__() — protocol для torch/cupy - Capsule deleter правильно держит refcount на frame_keep_alive, releases shape/strides arrays. CUDA pointer принадлежит frame. #200 Health/stats counters: - frames_received, timeouts, errors — per-call counters - last_seq, gap_count — proxy для drop count (NEWEST_ONLY mode) - last_frame_pts_ns - stats() — snapshot dict для MQTT health publish - counted в pybind layer т.к. C API не expose'ит ring_occupancy #201 Per-subscriber CUDA stream + thread-safety: - consumer_stream kwarg в subscribe() — int (cudaStream_t pointer) - subscriber.consumer_stream property - Thread-safety contract в docstring CuframesSubscriber - next_frame() передаёт consumer_stream_ в cuframes_subscriber_next #202 Smoke test + docs: - 10/10 pytest passed (расширен +2 теста на consumer_stream) - docs/python.md (~250 строк): quick start, API reference, integration с PyTorch/CuPy, reconnect-loop pattern, per-stream usage, pitch alignment, thread-safety, error taxonomy, backpressure, Phase 0 limitations Verify build + tests: cmake -B build-python -DBUILD_PYTHON_BINDINGS=ON cmake --build build-python -j pytest python/tests/ -v # 10/10 Закрывает Phase 0 issue gx/cuframes#6. Разблокирует goldix-smart-home/yolo-world-detector Phase 1. Co-Authored-By: Claude Opus 4.7 --- docs/python.md | 284 +++++++++++++++++++++++++++++++ python/src/_native.cpp | 330 ++++++++++++++++++++++++++++++++++--- python/tests/test_smoke.py | 28 ++++ 3 files changed, 623 insertions(+), 19 deletions(-) create mode 100644 docs/python.md diff --git a/docs/python.md b/docs/python.md new file mode 100644 index 0000000..d6bdf10 --- /dev/null +++ b/docs/python.md @@ -0,0 +1,284 @@ +# cuframes Python bindings + +Status: **v0.4 — Phase 0 alpha** (issue [gx/cuframes#6](http://server:3000/gx/cuframes/issues/6)) + +Python пакет `cuframes` — pybind11-обёртка над C ABI libcuframes. Цель — +позволить downstream ML/CV пайплайнам (yolo-world-detector, zone-motion, +custom скриптам) подписываться на cuframes **без CPU round-trip**: получать +NV12 frames прямо как CUDA pointer / `torch.Tensor` (DLPack export, zero-copy +из VRAM publisher'а в VRAM consumer'а). + +## Установка + +Standalone wheel (рекомендуемый): + +```bash +cd cuframes/python/ +pip install -e . --no-build-isolation +``` + +Через корневой CMake: + +```bash +cmake -B build -DBUILD_PYTHON_BINDINGS=ON +cmake --build build -j +``` + +## Quick start + +```python +import cuframes + +print(cuframes.version_string()) # "0.4.0" + +with cuframes.subscribe("cam-parking", + consumer_name="yolo-world", + connect_timeout_ms=5000) as sub: + with sub.next_frame(timeout_ms=1000) as frame: + print(f"{frame.width}x{frame.height} " + f"format={frame.format} seq={frame.seq}") +``` + +## API + +### `cuframes.subscribe(key, ...)` + +Создать подписку на publisher. Возвращает `CuframesSubscriber`. + +| Параметр | Тип | Default | Назначение | +|---|---|---|---| +| `key` | `str` | (required) | Имя publisher'а (`"cam-parking"` и т.п.) | +| `consumer_name` | `str \| None` | `None` (auto-generated) | Идентификатор подписки | +| `mode` | `SubscriberMode` | `NEWEST_ONLY` | `NEWEST_ONLY` skip'ит промежуточные frames, `STRICT_ORDER` — все по порядку | +| `cuda_device` | `int` | `0` | CUDA device id | +| `connect_timeout_ms` | `int` | `-1` (бесконечно) | Сколько ждать publisher'а | +| `consumer_stream` | `int` | `0` (default stream) | `cudaStream_t` как pointer | + +### `CuframesSubscriber` + +Контекст-менеджер. Methods/properties: + +```python +sub.next_frame(timeout_ms=-1) # → CuframesFrame +sub.close() # idempotent + +# read-only properties +sub.key # str +sub.consumer_name # str +sub.mode # SubscriberMode +sub.cuda_device # int +sub.consumer_stream # int (cudaStream_t ptr) +sub.closed # bool + +# health / stats (Phase 0 counters) +sub.frames_received # int +sub.timeouts # int +sub.errors # int +sub.last_seq # int (sequence number последнего frame'а) +sub.gap_count # int (proxy для drop count в NEWEST_ONLY) +sub.last_frame_pts_ns # int +sub.stats() # dict — snapshot всех counters для MQTT publish +``` + +### `CuframesFrame` + +Контекст-менеджер. Properties (read-only): + +```python +frame.cuda_ptr # int (uintptr_t) +frame.format # PixelFormat +frame.width # int +frame.height # int +frame.pitch_y # int — pitch Y plane (важно — может быть > width!) +frame.pitch_uv # int +frame.seq # int — sequence number у publisher'а +frame.pts_ns # int — CLOCK_MONOTONIC у publisher'а +frame.released # bool + +# DLPack export (zero-copy) +frame.dlpack_y() # capsule — Y plane как 2D uint8 GPU tensor +frame.dlpack_uv() # capsule — UV plane (только NV12) +frame.__dlpack__() # protocol для torch.from_dlpack(frame) +frame.__dlpack_device__() # (kDLCUDA=2, device_id) +``` + +## Интеграция с PyTorch + +```python +import torch +import cuframes + +with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub: + with sub.next_frame() as frame: + # Single-plane (default — Y plane для NV12) + y_tensor = torch.from_dlpack(frame) + # Multi-plane explicit + y = torch.from_dlpack(frame.dlpack_y()) # shape=[H, W] uint8 + uv = torch.from_dlpack(frame.dlpack_uv()) # shape=[H/2, W] uint8 + + # Y plane уже в VRAM — никаких copy. Можно сразу feed в NN. + y_float = y.float() / 255.0 # будет на CUDA device +``` + +## Интеграция с CuPy + +```python +import cupy +import cuframes + +with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub: + with sub.next_frame() as frame: + y_array = cupy.from_dlpack(frame.dlpack_y()) # cupy.ndarray на GPU +``` + +## Pattern: reconnect-loop для долгоживущего consumer'а + +```python +import time +import cuframes + +def consume_camera(key: str, on_frame): + while True: + try: + with cuframes.subscribe(key, connect_timeout_ms=5000) as sub: + while True: + try: + with sub.next_frame(timeout_ms=1000) as frame: + on_frame(frame) + except cuframes.CuframesFrameTimeout: + # просто нет новых кадров — продолжаем ждать + continue + except cuframes.CuframesPublisherGone: + # publisher умер / перезапускается — переподписываемся + print(f"publisher {key} gone, reconnect через 1s") + time.sleep(1) + except cuframes.CuframesError as e: + # фатальная ошибка — логируем и продолжаем + print(f"error: {e!r}") + time.sleep(5) +``` + +## Per-subscriber CUDA stream + +В продакшене на 4+ камеры каждый subscriber должен иметь свой stream — +иначе `cudaStreamWaitEvent` сериализует всех consumer'ов через default +stream. + +С `cuda-python`: + +```python +from cuda import cudart +import cuframes + +err, stream = cudart.cudaStreamCreate() +assert err == cudart.cudaError_t.cudaSuccess + +with cuframes.subscribe("cam-parking", consumer_stream=int(stream)) as sub: + ... +``` + +С `torch.cuda.Stream`: + +```python +import torch +import cuframes + +stream = torch.cuda.Stream() +with cuframes.subscribe("cam-parking", + consumer_stream=stream.cuda_stream) as sub: + with torch.cuda.stream(stream): + with sub.next_frame() as frame: + tensor = torch.from_dlpack(frame) + # ... inference на этом stream'е ... +``` + +## Pitch alignment — важно! + +NVDEC отдаёт NV12 с pitch alignment 256 байт. Для камер с шириной не +кратной 256 (`gate_lpr 2688×1520` → pitch 2688 OK; но представьте `640×480` +→ pitch обычно 640 байт, но **может быть 768**). + +```python +# WRONG — assume pitch == width +y = torch.frombuffer(...) # данные смещены + +# RIGHT — использовать DLPack который сам respect'ит strides +y = torch.from_dlpack(frame.dlpack_y()) # stride учтён правильно + +# ALTERNATIVELY — manual через cuda-python с правильным pitch +ptr = frame.cuda_ptr +pitch = frame.pitch_y +height = frame.height +``` + +## Thread-safety contract + +- Каждый `CuframesSubscriber` принадлежит **одному Python потоку**. + Создание и все вызовы (`next_frame`, `close`) — в одном thread. +- Несколько subscriber'ов в разных потоках — **OK** (каждому свой handle, + свой CUDA stream). +- `CuframesFrame` тоже принадлежит одному потоку — после `release()` его + CUDA pointer становится недействительным, доступ из другого потока — + undefined behavior. +- Внутренний GIL отпускается на блокирующих вызовах (`subscriber_create`, + `next_frame`) — другие Python потоки могут выполняться. + +Для multi-camera в одном процессе используйте `asyncio` или `threading`: + +```python +import threading +import cuframes + +def worker(camera_key): + with cuframes.subscribe(camera_key, connect_timeout_ms=5000) as sub: + # subscribe в этом же потоке + while True: + with sub.next_frame(timeout_ms=1000) as frame: + process(frame) + +for key in ["cam-parking", "cam-front_yard", "cam-gate_lpr", "cam-back_yard"]: + threading.Thread(target=worker, args=(key,), daemon=True).start() +``` + +## Error taxonomy + +Все exception'ы наследуются от `CuframesError`. Конкретные subclass'ы +позволяют разную обработку: + +| Exception | Когда выбрасывается | Что делать | +|---|---|---| +| `CuframesPublisherGone` | publisher умер или ещё не стартовал | reconnect-loop | +| `CuframesFrameTimeout` | timeout без frame'а | продолжать ждать или log'нуть | +| `CuframesDeviceLost` | CUDA error на cross-process sync | abort, не recoverable | +| `CuframesShmError` | socket/mmap/IPC error | log, abort или восстановить | +| `CuframesProtocolMismatch` | версия libcuframes несовместима | пересобрать | +| `CuframesInvalidArgument` | bug в caller | fix code | +| `CuframesOutOfMemory` | cudaMalloc fail | reduce работу | +| `CuframesInternal` | bug в libcuframes | report | + +## Backpressure + +`next_frame()` blocking call с GIL released. Если consumer медленнее +publisher'а: +- В `NEWEST_ONLY` mode (default) — publisher продолжает писать, consumer + получает **самый свежий** frame (промежуточные пропускает). `gap_count` + растёт. +- В `STRICT_ORDER` mode — при ring overflow `CuframesPublisherGone` → + reconnect. + +Frame удерживать долго **нельзя**: в `STRICT_WAIT` policy publisher +заблокирует ring. Pattern — забрать DLPack, инициировать GPU работу, +release frame сразу. + +## Текущие ограничения (Phase 0) + +- Publisher API не обёрнут (только subscriber-side) +- Packet ring (encoded video) не обёрнут +- Async callback API не обёрнут +- `ring_occupancy` / реальный drop count — нет в C API (counted в pybind как + `gap_count`, это proxy) +- Smoke test реального subscribe требует Docker IPC namespace (cuframes + socket/SHM живут в namespace publisher'а) + +Эти ограничения снимаются по мере необходимости — issues в +[gx/cuframes](http://server:3000/gx/cuframes). diff --git a/python/src/_native.cpp b/python/src/_native.cpp index 3eb0ac1..52dd806 100644 --- a/python/src/_native.cpp +++ b/python/src/_native.cpp @@ -15,12 +15,59 @@ #include #include +#include #include #include #include #include "cuframes/cuframes.h" +// DLPack — стандартный protocol для exchange tensor-like структур между +// фреймворками (PyTorch/CuPy/JAX/TF). См. https://dmlc.github.io/dlpack/latest/ +// Мы embedим header inline чтобы не добавлять external dep — header +// небольшой и стабильный (DLPack 1.0+). +namespace dlpack { + +typedef enum { + kDLCPU = 1, + kDLCUDA = 2, +} DLDeviceType; + +typedef struct { + DLDeviceType device_type; + int32_t device_id; +} DLDevice; + +typedef enum { + kDLInt = 0U, + kDLUInt = 1U, + kDLFloat = 2U, +} DLDataTypeCode; + +typedef struct { + uint8_t code; + uint8_t bits; + uint16_t lanes; +} DLDataType; + +typedef struct { + void* data; + DLDevice device; + int32_t ndim; + DLDataType dtype; + int64_t* shape; + int64_t* strides; + uint64_t byte_offset; +} DLTensor; + +typedef struct DLManagedTensor { + DLTensor dl_tensor; + void* manager_ctx; + void (*deleter)(struct DLManagedTensor* self); +} DLManagedTensor; + +} // namespace dlpack + namespace py = pybind11; namespace { @@ -185,11 +232,90 @@ public: return cuframes_frame_pts_ns(frame_); } + cuframes_subscriber_t* internal_sub() const noexcept { return sub_; } + cuframes_frame_t* internal_frame() const noexcept { return frame_; } + private: cuframes_subscriber_t* sub_; cuframes_frame_t* frame_; }; +// ───────────────────────────────────────────────────────────────────────────── +// DLPack export helpers. +// +// Кадр в NV12 состоит из 2 plane'ов: Y (uint8, H×W, pitch=pitch_y) и +// UV interleaved (uint8, H/2×W, pitch=pitch_uv; W здесь = ширина в байтах +// для interleaved U+V). +// +// Стратегия: даём пользователю 2 отдельных DLPack capsule на каждый plane. +// Это стандартный pattern в PyTorch/CuPy (torchcodec, cuda-python). +// UV offset вычисляется из pitch_y * height_aligned (NVDEC выравнивает +// height до aligned значения — обычно высота уже aligned, но мы используем +// видимую height из frame_size). +// +// Lifetime: deleter capsule освобождает только shape/strides arrays. +// Сам CUDA pointer принадлежит frame'у — gone-frame должно быть released +// **после** того как DLPack capsule destroyed. Чтобы не дать пользователю +// shoot in foot, capsule.manager_ctx держит py::object на FrameWrapper +// (увеличивает refcount), которое освобождается в deleter. +// ───────────────────────────────────────────────────────────────────────────── + +struct DLPackContext { + py::object frame_keep_alive; // CuframesFrame Python-side + std::vector shape; + std::vector strides; +}; + +static void dlpack_deleter(dlpack::DLManagedTensor* self) { + if (!self) return; + auto* ctx = static_cast(self->manager_ctx); + if (ctx) { + // Releasing Python refcount требует GIL + py::gil_scoped_acquire gil; + delete ctx; + } + delete self; +} + +static void dlpack_pycapsule_destructor(PyObject* capsule) { + if (PyCapsule_IsValid(capsule, "dltensor")) { + // Capsule НЕ был consumed downstream'ом (e.g. torch.from_dlpack). + // Нужно освободить managed tensor самим. + auto* mt = static_cast( + PyCapsule_GetPointer(capsule, "dltensor")); + if (mt && mt->deleter) { + mt->deleter(mt); + } + } + // Если PyCapsule имеет name "used_dltensor" — downstream взял ownership, + // мы ничего не делаем. +} + +static py::capsule make_dlpack_capsule( + void* data, + int rows, int cols, int64_t row_stride_bytes, + int cuda_device, + py::object frame_keep_alive +) { + auto* ctx = new DLPackContext; + ctx->frame_keep_alive = std::move(frame_keep_alive); + ctx->shape = {static_cast(rows), static_cast(cols)}; + ctx->strides = {row_stride_bytes, 1}; + + auto* mt = new dlpack::DLManagedTensor; + mt->dl_tensor.data = data; + mt->dl_tensor.device = {dlpack::kDLCUDA, cuda_device}; + mt->dl_tensor.ndim = 2; + mt->dl_tensor.dtype = {dlpack::kDLUInt, 8, 1}; // uint8 + mt->dl_tensor.shape = ctx->shape.data(); + mt->dl_tensor.strides = ctx->strides.data(); + mt->dl_tensor.byte_offset = 0; + mt->manager_ctx = ctx; + mt->deleter = dlpack_deleter; + + return py::capsule(mt, "dltensor", &dlpack_pycapsule_destructor); +} + // ───────────────────────────────────────────────────────────────────────────── // CuframesSubscriber — owning wrapper над cuframes_subscriber_t*. // @@ -206,6 +332,21 @@ private: // ... // ───────────────────────────────────────────────────────────────────────────── +// Per-subscriber health stats. Phase 0 версия — counted в pybind layer +// (cuframes C API не expose'ит ring_occupancy / drop_count напрямую). +// Если в будущем cuframes расширит C API (cuframes_subscriber_get_stats), +// добавим reads оттуда — но текущие counters остаются для совместимости +// с тем что consumer'у видно через Python API. +struct SubscriberStats { + uint64_t frames_received = 0; // успешных next_frame() + uint64_t timeouts = 0; // CUFRAMES_ERR_TIMEOUT / WOULD_BLOCK + uint64_t errors = 0; // прочие fail'ы в next_frame() + uint64_t last_seq = 0; // seq последнего полученного frame'а + uint64_t gap_count = 0; // сколько раз seq[i] > seq[i-1] + 1 + // (proxy для drop count в NEWEST_ONLY mode) + int64_t last_frame_pts_ns = 0; +}; + class SubscriberWrapper { public: SubscriberWrapper( @@ -213,11 +354,13 @@ public: std::optional consumer_name, cuframes_subscriber_mode_t mode, int cuda_device, - int connect_timeout_ms + int connect_timeout_ms, + uintptr_t consumer_stream ) : key_(key), consumer_name_(consumer_name.value_or("")), mode_(mode), - cuda_device_(cuda_device) { + cuda_device_(cuda_device), + consumer_stream_(reinterpret_cast(consumer_stream)) { cuframes_subscriber_config_t cfg = {}; cfg.key = key_.c_str(); @@ -266,11 +409,27 @@ public: int err; { py::gil_scoped_release rel; - // consumer_stream = nullptr (default stream). - // В #201 добавим per-subscriber stream. - err = cuframes_subscriber_next(sub_, /*stream=*/nullptr, + // Используем persistent per-subscriber stream — все consumer'ы + // получают независимый cudaStreamWaitEvent, не серializуются + // через default stream. + err = cuframes_subscriber_next(sub_, consumer_stream_, &raw, timeout_ms); } + // Update health stats до check() — иначе при exception они не + // увеличатся, и оператору будет непонятно почему counters застыли. + if (err == CUFRAMES_OK) { + stats_.frames_received++; + uint64_t seq = cuframes_frame_seq(raw); + if (stats_.last_seq != 0 && seq > stats_.last_seq + 1) { + stats_.gap_count++; + } + stats_.last_seq = seq; + stats_.last_frame_pts_ns = cuframes_frame_pts_ns(raw); + } else if (err == CUFRAMES_ERR_TIMEOUT || err == CUFRAMES_ERR_WOULD_BLOCK) { + stats_.timeouts++; + } else { + stats_.errors++; + } check(err, "cuframes_subscriber_next"); return std::make_unique(sub_, raw); } @@ -279,6 +438,23 @@ public: const std::string& consumer_name() const { return consumer_name_; } cuframes_subscriber_mode_t mode() const { return mode_; } int cuda_device() const { return cuda_device_; } + const SubscriberStats& stats() const { return stats_; } + + // Snapshot stats как Python dict — для MQTT health publish. + py::dict stats_dict() const { + py::dict d; + d["frames_received"] = stats_.frames_received; + d["timeouts"] = stats_.timeouts; + d["errors"] = stats_.errors; + d["last_seq"] = stats_.last_seq; + d["gap_count"] = stats_.gap_count; + d["last_frame_pts_ns"] = stats_.last_frame_pts_ns; + return d; + } + + uintptr_t consumer_stream() const { + return reinterpret_cast(consumer_stream_); + } private: cuframes_subscriber_t* sub_ = nullptr; @@ -286,6 +462,13 @@ private: std::string consumer_name_; cuframes_subscriber_mode_t mode_; int cuda_device_; + // CUDA stream — opaque cudaStream_t. Передаётся снаружи как int + // (полученный через cuda-python / torch.cuda.Stream._as_parameter_). + // nullptr = default stream (только для smoke-тестов; в продакшене + // консумерам надо иметь свой stream чтобы избежать serialization + // через default). + void* consumer_stream_ = nullptr; + SubscriberStats stats_{}; }; } // namespace @@ -391,31 +574,116 @@ PYBIND11_MODULE(_native, m) { if (f.released()) return std::string(""); return std::string(""; - }); + }) + // ── DLPack export ─────────────────────────────────────────────── + // Multi-plane formats (NV12, YUV420P) — экспортируем планы отдельно + // как 2D uint8 tensors. Consumer строит логику склейки сам. + // Для single-plane (RGB/BGR/RGBA/GRAYSCALE) — __dlpack__() работает. + .def("dlpack_y", + [](py::object self) -> py::capsule { + auto& f = self.cast(); + f.check_alive(); + void* ptr = cuframes_frame_cuda_ptr(f.internal_frame()); + int32_t w, h; + cuframes_frame_size(f.internal_frame(), &w, &h); + int pitch = cuframes_frame_pitch_y(f.internal_frame()); + // Для NV12/YUV420P width = ширина в пикселях, Y занимает W байт/строка. + // Pitch (физическая строка в памяти) может быть > W. Передаём как stride. + // cuda_device извлекаем не из frame (нет API) — фиксируем 0 для default; + // task #201 добавит per-subscriber stream и реальный device. + return make_dlpack_capsule(ptr, h, w, pitch, /*cuda_device=*/0, self); + }, + "DLPack export Y-plane как 2D uint8 GPU tensor (shape=[H, W], stride=[pitch_y, 1]).\n" + "Работает для NV12, YUV420P, GRAYSCALE. Для других форматов — отдаёт первый plane.") + .def("dlpack_uv", + [](py::object self) -> py::capsule { + auto& f = self.cast(); + f.check_alive(); + auto fmt = cuframes_frame_format(f.internal_frame()); + if (fmt != CUFRAMES_FORMAT_NV12) { + PyErr_SetString(g_exc.invalid_argument.ptr(), + "dlpack_uv() only supported for NV12 format"); + throw py::error_already_set(); + } + void* base = cuframes_frame_cuda_ptr(f.internal_frame()); + int32_t w, h; + cuframes_frame_size(f.internal_frame(), &w, &h); + int pitch_y = cuframes_frame_pitch_y(f.internal_frame()); + int pitch_uv = cuframes_frame_pitch_uv(f.internal_frame()); + // NV12 layout: Y plane занимает pitch_y * h bytes, + // UV plane (interleaved U+V) следует сразу за ним. + void* uv_ptr = static_cast(base) + (size_t)pitch_y * h; + // UV plane размеры: H/2 строк, W колонок (interleaved U+V байты). + return make_dlpack_capsule(uv_ptr, h / 2, w, pitch_uv, /*cuda_device=*/0, self); + }, + "DLPack export UV-plane (interleaved) для NV12.\n" + "Shape=[H/2, W] uint8, stride=[pitch_uv, 1]. U и V interleaved\n" + "по байтам в последнем измерении (W = ширина в пикселях, но\n" + "каждый pixel = 2 байта U+V).") + .def("__dlpack__", + [](py::object self, py::object /*stream*/) -> py::capsule { + // PEP 3118 / DLPack protocol — single-plane access. + // Для NV12/YUV420P возвращает Y plane (это самый частый use + // case — motion detection / brightness работают только с Y). + // Если нужен UV — явно через .dlpack_uv(). + auto& f = self.cast(); + f.check_alive(); + void* ptr = cuframes_frame_cuda_ptr(f.internal_frame()); + int32_t w, h; + cuframes_frame_size(f.internal_frame(), &w, &h); + int pitch = cuframes_frame_pitch_y(f.internal_frame()); + return make_dlpack_capsule(ptr, h, w, pitch, /*cuda_device=*/0, self); + }, + py::arg("stream") = py::none(), + "DLPack protocol для torch.from_dlpack / cupy.from_dlpack.\n" + "Для NV12 возвращает Y plane. Для других planes — .dlpack_uv().") + .def("__dlpack_device__", + [](const FrameWrapper& f) -> py::tuple { + f.check_alive(); + // (device_type, device_id) — kDLCUDA=2, device 0 (task #201). + return py::make_tuple(2, 0); + }, + "DLPack device protocol — возвращает (kDLCUDA=2, device_id)."); // ── CuframesSubscriber ────────────────────────────────────────────── py::class_(m, "CuframesSubscriber", "Subscription на cuframes publisher.\n\n" "Создаётся через cuframes.subscribe(key, ...). Поддерживает context\n" "manager — close() при выходе из with-блока.\n\n" - "Thread-safety: handle принадлежит одному Python потоку. Cross-thread\n" - "access — undefined behavior. Несколько subscriber'ов в разных потоках\n" - "— OK (каждому свой handle).\n\n" - "GIL отпускается на время блокирующих вызовов (next_frame, close,\n" - "create) — другие Python потоки могут работать.") + "Thread-safety contract:\n" + " • Handle принадлежит одному Python потоку — создание и\n" + " все вызовы (next_frame, close) должны быть в одном thread.\n" + " • Несколько subscriber'ов в разных потоках — OK (каждому свой\n" + " handle, свой CUDA stream).\n" + " • Доступ к Frame после release() из другого потока — UB\n" + " (cuframes_frame_t* указывает в ring buffer publisher'а, после\n" + " release он может быть переписан).\n" + " • Внутренний GIL отпускается на длинных I/O вызовах\n" + " (subscriber_create, next_frame) — другие Python потоки могут\n" + " выполняться параллельно пока мы ждём frame.\n\n" + "CUDA stream:\n" + " consumer_stream передаётся как int (cudaStream_t как opaque\n" + " pointer). Получается через cuda-python (cudart.cudaStreamCreate)\n" + " или torch (torch.cuda.Stream()._as_parameter_). Если 0 —\n" + " default stream (serialization risk при нескольких subscriber'ах\n" + " в одном процессе).") .def(py::init, - cuframes_subscriber_mode_t, int, int>(), + cuframes_subscriber_mode_t, int, int, uintptr_t>(), py::arg("key"), py::arg("consumer_name") = py::none(), py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY, py::arg("cuda_device") = 0, py::arg("connect_timeout_ms") = -1, + py::arg("consumer_stream") = 0, "Создать subscription. Блокирует до publisher_ready или\n" - "connect_timeout_ms. -1 = ждать вечно, 0 = fail сразу.") + "connect_timeout_ms. -1 = ждать вечно, 0 = fail сразу.\n" + "consumer_stream: int representation cudaStream_t (0=default).") .def_property_readonly("key", &SubscriberWrapper::key) .def_property_readonly("consumer_name", &SubscriberWrapper::consumer_name) .def_property_readonly("mode", &SubscriberWrapper::mode) .def_property_readonly("cuda_device", &SubscriberWrapper::cuda_device) + .def_property_readonly("consumer_stream", &SubscriberWrapper::consumer_stream, + "Pointer на cudaStream_t (int). 0 = default stream.") .def_property_readonly("closed", &SubscriberWrapper::closed) .def("next_frame", &SubscriberWrapper::next_frame, py::arg("timeout_ms") = -1, @@ -426,6 +694,31 @@ PYBIND11_MODULE(_native, m) { "`with sub.next_frame() as frame: ...` для гарантии release.") .def("close", &SubscriberWrapper::close, "Закрыть subscription. Idempotent.") + // ── Health / stats ────────────────────────────────────────────── + // Phase 0: counted в pybind layer (cuframes C API не expose'ит + // ring_occupancy / drop_count напрямую). Эти counters достаточно + // для MQTT health publisher / monitoring. + .def_property_readonly("frames_received", + [](const SubscriberWrapper& s) { return s.stats().frames_received; }, + "Количество успешных next_frame() с момента subscribe.") + .def_property_readonly("timeouts", + [](const SubscriberWrapper& s) { return s.stats().timeouts; }, + "Сколько раз next_frame() вернул CuframesFrameTimeout.") + .def_property_readonly("errors", + [](const SubscriberWrapper& s) { return s.stats().errors; }, + "Сколько раз next_frame() упал с error (не timeout).") + .def_property_readonly("last_seq", + [](const SubscriberWrapper& s) { return s.stats().last_seq; }, + "Sequence number последнего полученного frame'а.") + .def_property_readonly("gap_count", + [](const SubscriberWrapper& s) { return s.stats().gap_count; }, + "Сколько раз seq[i] > seq[i-1] + 1 — proxy для drop count\n" + "в NEWEST_ONLY mode. В STRICT_ORDER должен оставаться 0.") + .def_property_readonly("last_frame_pts_ns", + [](const SubscriberWrapper& s) { return s.stats().last_frame_pts_ns; }) + .def("stats", + [](const SubscriberWrapper& s) { return s.stats_dict(); }, + "Snapshot всех health counters как dict — для MQTT health publish.") // context manager .def("__enter__", [](SubscriberWrapper& self) -> SubscriberWrapper& { self.check_alive(); @@ -448,18 +741,17 @@ PYBIND11_MODULE(_native, m) { std::optional consumer_name, cuframes_subscriber_mode_t mode, int cuda_device, - int connect_timeout_ms) { + int connect_timeout_ms, + uintptr_t consumer_stream) { return std::make_unique( - key, consumer_name, mode, cuda_device, connect_timeout_ms); + key, consumer_name, mode, cuda_device, + connect_timeout_ms, consumer_stream); }, py::arg("key"), py::arg("consumer_name") = py::none(), py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY, py::arg("cuda_device") = 0, py::arg("connect_timeout_ms") = -1, + py::arg("consumer_stream") = 0, "Создать CuframesSubscriber. Shortcut для CuframesSubscriber(...)."); - - // TODO(task #199): DLPack export — frame.__dlpack__() - // TODO(task #200): health/stats — sub.ring_occupancy, sub.drop_count - // TODO(task #201): consumer_stream parameter в subscribe/next_frame } diff --git a/python/tests/test_smoke.py b/python/tests/test_smoke.py index af936db..bc7e274 100644 --- a/python/tests/test_smoke.py +++ b/python/tests/test_smoke.py @@ -82,3 +82,31 @@ def test_subscriber_repr_when_unable_to_connect(): except cuframes.CuframesError: return # ожидаемо pytest.fail("subscribe должно было выкинуть exception") + + +def test_subscribe_accepts_consumer_stream_param(): + """consumer_stream — uintptr (cudaStream_t). + + Проверяем что параметр accepted; реальное использование требует + cuda-python / torch.cuda.Stream — это в integration тестах + yolo-world-detector'а. + """ + import pytest + with pytest.raises(cuframes.CuframesError): + cuframes.subscribe( + "nope-xyz", + connect_timeout_ms=100, + consumer_stream=0, # 0 = default stream + ) + + +def test_subscribe_kwargs_signature(): + """Проверяем что у subscribe правильный набор kwargs.""" + import inspect + # Pybind11-обёртки не дают inspect.signature, но help_doc отражает их. + doc = cuframes.subscribe.__doc__ + assert "consumer_name" in doc + assert "mode" in doc + assert "cuda_device" in doc + assert "connect_timeout_ms" in doc + assert "consumer_stream" in doc -- 2.52.0