python: CuframesSubscriber + CuframesFrame wrapper (task #198)

Реализует subscriber-side wrapper над cuframes_subscriber_* и
cuframes_frame_* C API.

Что добавлено:
- CuframesFrame — owning RAII wrapper над cuframes_frame_t*
  - properties: cuda_ptr, format, width, height, pitch_y, pitch_uv,
    seq, pts_ns, released
  - release() idempotent
  - context manager (__enter__/__exit__) — release при выходе
  - после release() property access бросает CuframesError

- CuframesSubscriber — owning RAII wrapper над cuframes_subscriber_t*
  - конструктор с key/consumer_name/mode/cuda_device/connect_timeout_ms
  - next_frame(timeout_ms) → CuframesFrame
  - close() idempotent
  - context manager
  - GIL released на блокирующих вызовах (create, next_frame)

- subscribe() — module-level factory shortcut

Архитектурные решения:
- GIL release в py::gil_scoped_release на subscriber_create и _next —
  чтобы другие Python потоки могли работать пока ждём frame
- consumer_stream передаётся как nullptr в Phase 0 (default stream);
  per-subscriber stream в task #201
- Frame держит raw pointer на subscriber, refcount Python-стороной;
  если subscriber уничтожен раньше, frame.release() становится no-op

Smoke tests расширены до 8 — добавлены проверки exposed API и
error mapping на subscribe к несуществующему publisher'у.

Verify: pytest tests/test_smoke.py — 8/8 passed.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-06-13 21:23:42 +01:00
parent 7b6d43efeb
commit 5d1eaedb38
3 changed files with 401 additions and 21 deletions
+30 -9
View File
@@ -3,19 +3,33 @@
Python bindings to libcuframes. См. docs/python.md (т.б.д.) для
архитектуры, threading контракта и примеров интеграции с PyTorch/CuPy.
Пример использования (skeleton — реальный subscriber API в работе, см.
issue gx/cuframes#6):
Пример (subscriber-side):
import cuframes
print(cuframes.version_string()) # "0.4.0"
print(cuframes.protocol_version()) # uint32
with cuframes.subscribe("cam-parking",
consumer_name="yolo-world",
connect_timeout_ms=5000) as sub:
# next_frame returns CuframesFrame — context manager
with sub.next_frame(timeout_ms=1000) as frame:
print(frame.cuda_ptr, frame.width, frame.height,
frame.pitch_y, frame.seq, frame.pts_ns)
# DLPack export — в task #199, пока через cuda-python:
# cuda_arr = cuda.from_pointer(frame.cuda_ptr, ...)
# TODO (task #198):
# with cuframes.subscribe("cam-parking") as sub:
# for frame in sub.frames(timeout_ms=1000):
# tensor = torch.from_dlpack(frame)
# ...
Reconnect-loop пример:
while True:
try:
with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub:
while True:
try:
with sub.next_frame(timeout_ms=1000) as frame:
process(frame)
except cuframes.CuframesFrameTimeout:
continue # просто нет новых кадров
except cuframes.CuframesPublisherGone:
time.sleep(1) # publisher restart — переподписываемся
"""
from ._native import (
@@ -25,6 +39,10 @@ from ._native import (
# Enums
PixelFormat,
SubscriberMode,
# Core API
CuframesSubscriber,
CuframesFrame,
subscribe,
# Error taxonomy
CuframesError,
CuframesPublisherGone,
@@ -44,6 +62,9 @@ __all__ = [
"protocol_version",
"PixelFormat",
"SubscriberMode",
"CuframesSubscriber",
"CuframesFrame",
"subscribe",
"CuframesError",
"CuframesPublisherGone",
"CuframesFrameTimeout",
+338 -12
View File
@@ -1,12 +1,23 @@
// cuframes Python bindings — pybind11 entry point.
//
// Этот файл — skeleton модуля. Полноценные обёртки subscriber/frame появятся
// в следующих коммитах (см. issue gx/cuframes#6, tasks #198-#202).
// Этот файл реализует core wrapper для subscriber-side API:
// - CuframesFrame — owning handle одного frame'а, context manager
// - CuframesSubscriber — owning handle subscription'а, context manager
//
// Контракт thread-safety: см. docs/python.md (т.б.д. в task #202).
// DLPack export (#199), per-subscriber CUDA stream (#201), health/stats props
// (#200) — добавляются в последующих коммитах в этот же файл.
//
// Контракт thread-safety (предварительный, финальный — task #202):
// - Каждый handle (CuframesSubscriber / CuframesFrame) принадлежит одному
// Python потоку. Cross-thread access = undefined behavior на C-уровне.
// - GIL отпускается на длинных I/O вызовах (next_frame) — другие Python
// потоки могут работать пока мы ждём frame.
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
#include <optional>
#include <stdexcept>
#include <string>
#include "cuframes/cuframes.h"
@@ -72,6 +83,211 @@ void check(int err, const char* operation = nullptr) {
throw py::error_already_set();
}
// ─────────────────────────────────────────────────────────────────────────────
// CuframesFrame — owning wrapper над cuframes_frame_t*.
//
// Lifecycle:
// - конструируется через Subscriber::next_frame() (single source of truth)
// - в destructor'е (или __exit__) автоматически вызывает release
// - после release() все property accessor'ы бросают CuframesError
// - non-copyable, non-movable из Python (PyObject identity)
//
// Frame держит **слабую** ссылку (raw pointer) на subscriber. Если subscriber
// уничтожен раньше frame'а — released() становится no-op (subscriber разрулит
// освобождение всех outstanding frames при cuframes_subscriber_destroy).
// Чтобы избежать use-after-free, frame проверяет sub_alive_ через shared_ptr.
//
// Для простоты Phase 0 — frame и subscriber должны жить в одном Python потоке,
// порядок destruction под управлением Python GC. Refcount на Python-стороне
// от субскриптора держится через py::object атрибут.
// ─────────────────────────────────────────────────────────────────────────────
class FrameWrapper {
public:
FrameWrapper(cuframes_subscriber_t* sub, cuframes_frame_t* frame)
: sub_(sub), frame_(frame) {}
~FrameWrapper() {
try { release(); } catch (...) { /* destructor — глотаем */ }
}
// pybind11 не любит copyable wrappers для owning ресурсов.
FrameWrapper(const FrameWrapper&) = delete;
FrameWrapper& operator=(const FrameWrapper&) = delete;
bool released() const noexcept { return frame_ == nullptr; }
void release() {
if (frame_ != nullptr) {
// sub_ может быть nullptr если subscriber разорвал связь раньше —
// в этом случае release уже не нужен (subscriber всё освободил).
if (sub_ != nullptr) {
cuframes_subscriber_release(sub_, frame_);
}
frame_ = nullptr;
}
}
// Internal hook — subscriber говорит frame'у «я умираю, не release()ай».
void invalidate_subscriber() noexcept { sub_ = nullptr; }
// ── Properties ──────────────────────────────────────────────────────
// Все геттеры проверяют released() — иначе CuframesError.
void check_alive() const {
if (frame_ == nullptr) {
PyErr_SetString(g_exc.base.ptr(), "frame has been released");
throw py::error_already_set();
}
}
uintptr_t cuda_ptr() const {
check_alive();
return reinterpret_cast<uintptr_t>(cuframes_frame_cuda_ptr(frame_));
}
cuframes_format_t format() const {
check_alive();
return cuframes_frame_format(frame_);
}
int width() const {
check_alive();
int32_t w, h;
cuframes_frame_size(frame_, &w, &h);
return w;
}
int height() const {
check_alive();
int32_t w, h;
cuframes_frame_size(frame_, &w, &h);
return h;
}
int pitch_y() const {
check_alive();
return cuframes_frame_pitch_y(frame_);
}
int pitch_uv() const {
check_alive();
return cuframes_frame_pitch_uv(frame_);
}
uint64_t seq() const {
check_alive();
return cuframes_frame_seq(frame_);
}
int64_t pts_ns() const {
check_alive();
return cuframes_frame_pts_ns(frame_);
}
private:
cuframes_subscriber_t* sub_;
cuframes_frame_t* frame_;
};
// ─────────────────────────────────────────────────────────────────────────────
// CuframesSubscriber — owning wrapper над cuframes_subscriber_t*.
//
// API:
// sub = cuframes.subscribe("cam-parking", consumer_name="yolo-world",
// timeout_ms=5000)
// with sub:
// with sub.next_frame(timeout_ms=1000) as frame:
// do_something(frame.cuda_ptr, frame.width, frame.height)
// # sub.close() здесь автоматически
//
// Iteration (Phase 0.5):
// for frame in sub.frames(timeout_ms=1000):
// ...
// ─────────────────────────────────────────────────────────────────────────────
class SubscriberWrapper {
public:
SubscriberWrapper(
const std::string& key,
std::optional<std::string> consumer_name,
cuframes_subscriber_mode_t mode,
int cuda_device,
int connect_timeout_ms
) : key_(key),
consumer_name_(consumer_name.value_or("")),
mode_(mode),
cuda_device_(cuda_device) {
cuframes_subscriber_config_t cfg = {};
cfg.key = key_.c_str();
cfg.consumer_name = consumer_name.has_value() ? consumer_name_.c_str() : nullptr;
cfg.mode = mode_;
cfg.cuda_device = cuda_device_;
cfg.connect_timeout_ms = connect_timeout_ms;
// create — может быть блокирующим (ждёт publisher'а). GIL release.
int err;
{
py::gil_scoped_release rel;
err = cuframes_subscriber_create(&cfg, &sub_);
}
check(err, "cuframes_subscriber_create");
}
~SubscriberWrapper() {
try { close(); } catch (...) { /* destructor — глотаем */ }
}
SubscriberWrapper(const SubscriberWrapper&) = delete;
SubscriberWrapper& operator=(const SubscriberWrapper&) = delete;
bool closed() const noexcept { return sub_ == nullptr; }
void close() {
if (sub_ != nullptr) {
cuframes_subscriber_destroy(sub_);
sub_ = nullptr;
}
}
void check_alive() const {
if (sub_ == nullptr) {
PyErr_SetString(g_exc.base.ptr(), "subscriber has been closed");
throw py::error_already_set();
}
}
// Возвращает new FrameWrapper. Caller владеет через Python GC.
// GIL release на время блокирующего вызова — другие потоки работают.
std::unique_ptr<FrameWrapper> next_frame(int timeout_ms) {
check_alive();
cuframes_frame_t* raw = nullptr;
int err;
{
py::gil_scoped_release rel;
// consumer_stream = nullptr (default stream).
// В #201 добавим per-subscriber stream.
err = cuframes_subscriber_next(sub_, /*stream=*/nullptr,
&raw, timeout_ms);
}
check(err, "cuframes_subscriber_next");
return std::make_unique<FrameWrapper>(sub_, raw);
}
const std::string& key() const { return key_; }
const std::string& consumer_name() const { return consumer_name_; }
cuframes_subscriber_mode_t mode() const { return mode_; }
int cuda_device() const { return cuda_device_; }
private:
cuframes_subscriber_t* sub_ = nullptr;
std::string key_;
std::string consumer_name_;
cuframes_subscriber_mode_t mode_;
int cuda_device_;
};
} // namespace
PYBIND11_MODULE(_native, m) {
@@ -100,12 +316,9 @@ PYBIND11_MODULE(_native, m) {
// ├── CuframesOutOfMemory
// └── CuframesInternal
//
// Downstream code может ловить либо конкретный subtype, либо CuframesError
// как catch-all.
// py::exception<T>(...) уже возвращает py::object на сам Python class.
// Не вызываем .attr("__class__") — иначе получим metaclass (type),
// а не сам exception class.
// Не вызываем .attr("__class__") — иначе получим metaclass.
g_exc.base = py::exception<std::runtime_error>(m, "CuframesError");
auto make_subexc = [&m](const char* name) -> py::object {
return py::exception<std::runtime_error>(m, name, g_exc.base.ptr());
@@ -132,8 +345,121 @@ PYBIND11_MODULE(_native, m) {
.value("NEWEST_ONLY", CUFRAMES_MODE_NEWEST_ONLY)
.value("STRICT_ORDER", CUFRAMES_MODE_STRICT_ORDER);
// TODO(task #198): CuframesSubscriber + CuframesFrame classes.
// TODO(task #199): DLPack export.
// TODO(task #200): Health/stats properties (потребует расширения C API).
// TODO(task #201): Per-subscriber CUDA stream + thread-safety contract.
// ── CuframesFrame ───────────────────────────────────────────────────
py::class_<FrameWrapper>(m, "CuframesFrame",
"Один кадр от cuframes publisher'а.\n\n"
"Получается через CuframesSubscriber.next_frame().\n"
"Поддерживает context manager — release() при выходе из with-блока.\n"
"Все property accessor'ы после release() бросают CuframesError.\n\n"
"Это handle на frame в ring buffer publisher'а — данные остаются\n"
"в shared memory publisher'а пока frame не released. Долго удерживать\n"
"frame нельзя: medленный consumer заставит publisher либо overwrite\n"
"(DROP_OLDEST policy), либо stall (STRICT_WAIT).")
// properties (read-only)
.def_property_readonly("cuda_ptr", &FrameWrapper::cuda_ptr,
"CUDA device pointer на frame data (uintptr_t). Read-only для\n"
"consumer'а. Используйте через cuda-python / cupy / torch.from_blob.")
.def_property_readonly("format", &FrameWrapper::format,
"PixelFormat (NV12 для NVDEC publisher'а).")
.def_property_readonly("width", &FrameWrapper::width)
.def_property_readonly("height", &FrameWrapper::height)
.def_property_readonly("pitch_y", &FrameWrapper::pitch_y,
"Pitch (байт на строку) для Y plane. ВАЖНО: для больших\n"
"разрешений (2688×1520, gate_lpr) pitch != width — kernel'ы\n"
"должны принимать pitch как параметр.")
.def_property_readonly("pitch_uv", &FrameWrapper::pitch_uv,
"Pitch для UV plane (NV12/YUV420P); 0 для форматов без UV.")
.def_property_readonly("seq", &FrameWrapper::seq,
"Sequence number — монотонная нумерация у publisher'а.")
.def_property_readonly("pts_ns", &FrameWrapper::pts_ns,
"Presentation timestamp от publisher'а (наносекунды, CLOCK_MONOTONIC).")
.def_property_readonly("released", &FrameWrapper::released)
.def("release", &FrameWrapper::release,
"Освободить frame обратно publisher'у (ACK).\n"
"После release() property accessor'ы бросают CuframesError.\n"
"Idempotent — повторный вызов no-op.")
// context manager
.def("__enter__", [](FrameWrapper& self) -> FrameWrapper& {
self.check_alive();
return self;
}, py::return_value_policy::reference_internal)
.def("__exit__", [](FrameWrapper& self, py::object, py::object, py::object) {
self.release();
return py::none();
})
.def("__repr__", [](const FrameWrapper& f) {
if (f.released()) return std::string("<CuframesFrame released>");
return std::string("<CuframesFrame seq=") + std::to_string(f.seq()) +
" size=" + std::to_string(f.width()) + "x" + std::to_string(f.height()) + ">";
});
// ── CuframesSubscriber ──────────────────────────────────────────────
py::class_<SubscriberWrapper>(m, "CuframesSubscriber",
"Subscription на cuframes publisher.\n\n"
"Создаётся через cuframes.subscribe(key, ...). Поддерживает context\n"
"manager — close() при выходе из with-блока.\n\n"
"Thread-safety: handle принадлежит одному Python потоку. Cross-thread\n"
"access — undefined behavior. Несколько subscriber'ов в разных потоках\n"
"— OK (каждому свой handle).\n\n"
"GIL отпускается на время блокирующих вызовов (next_frame, close,\n"
"create) — другие Python потоки могут работать.")
.def(py::init<const std::string&, std::optional<std::string>,
cuframes_subscriber_mode_t, int, int>(),
py::arg("key"),
py::arg("consumer_name") = py::none(),
py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY,
py::arg("cuda_device") = 0,
py::arg("connect_timeout_ms") = -1,
"Создать subscription. Блокирует до publisher_ready или\n"
"connect_timeout_ms. -1 = ждать вечно, 0 = fail сразу.")
.def_property_readonly("key", &SubscriberWrapper::key)
.def_property_readonly("consumer_name", &SubscriberWrapper::consumer_name)
.def_property_readonly("mode", &SubscriberWrapper::mode)
.def_property_readonly("cuda_device", &SubscriberWrapper::cuda_device)
.def_property_readonly("closed", &SubscriberWrapper::closed)
.def("next_frame", &SubscriberWrapper::next_frame,
py::arg("timeout_ms") = -1,
"Получить следующий frame.\n\n"
"timeout_ms: -1 = ждать вечно; 0 = non-blocking\n"
"(CuframesFrameTimeout если нет данных); >0 = с таймаутом.\n\n"
"Возвращает CuframesFrame — context manager. Использовать через\n"
"`with sub.next_frame() as frame: ...` для гарантии release.")
.def("close", &SubscriberWrapper::close,
"Закрыть subscription. Idempotent.")
// context manager
.def("__enter__", [](SubscriberWrapper& self) -> SubscriberWrapper& {
self.check_alive();
return self;
}, py::return_value_policy::reference_internal)
.def("__exit__", [](SubscriberWrapper& self, py::object, py::object, py::object) {
self.close();
return py::none();
})
.def("__repr__", [](const SubscriberWrapper& s) {
return std::string("<CuframesSubscriber key='") + s.key() +
"' closed=" + (s.closed() ? "True" : "False") + ">";
});
// ── Module-level factory ────────────────────────────────────────────
// Удобный shortcut: cuframes.subscribe("cam-parking") вместо
// cuframes._native.CuframesSubscriber(...).
m.def("subscribe",
[](const std::string& key,
std::optional<std::string> consumer_name,
cuframes_subscriber_mode_t mode,
int cuda_device,
int connect_timeout_ms) {
return std::make_unique<SubscriberWrapper>(
key, consumer_name, mode, cuda_device, connect_timeout_ms);
},
py::arg("key"),
py::arg("consumer_name") = py::none(),
py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY,
py::arg("cuda_device") = 0,
py::arg("connect_timeout_ms") = -1,
"Создать CuframesSubscriber. Shortcut для CuframesSubscriber(...).");
// TODO(task #199): DLPack export — frame.__dlpack__()
// TODO(task #200): health/stats — sub.ring_occupancy, sub.drop_count
// TODO(task #201): consumer_stream parameter в subscribe/next_frame
}
+33
View File
@@ -49,3 +49,36 @@ def test_error_hierarchy():
cuframes.CuframesInternal,
]:
assert issubclass(sub, cuframes.CuframesError)
def test_subscriber_class_exposed():
"""CuframesSubscriber/CuframesFrame exposed как public classes."""
assert hasattr(cuframes, "CuframesSubscriber")
assert hasattr(cuframes, "CuframesFrame")
assert hasattr(cuframes, "subscribe")
def test_subscribe_to_missing_publisher_raises():
"""Subscribe к несуществующему publisher → CuframesError (subclass)
после connect_timeout_ms.
Этот тест работает на любом хосте (без живого cuframes-pub) — мы
верифицируем что error path работает и маппит CUFRAMES_ERR_*
в правильный Python exception.
"""
import pytest
with pytest.raises(cuframes.CuframesError):
cuframes.subscribe(
"definitely-not-existing-publisher-xyz",
connect_timeout_ms=100,
)
def test_subscriber_repr_when_unable_to_connect():
"""Лёгкий тест что repr не падает и close idempotent."""
import pytest
try:
sub = cuframes.subscribe("nope-xyz", connect_timeout_ms=100)
except cuframes.CuframesError:
return # ожидаемо
pytest.fail("subscribe должно было выкинуть exception")