diff --git a/python/cuframes/__init__.py b/python/cuframes/__init__.py index 171d3ed..63958f6 100644 --- a/python/cuframes/__init__.py +++ b/python/cuframes/__init__.py @@ -3,19 +3,33 @@ Python bindings to libcuframes. См. docs/python.md (т.б.д.) для архитектуры, threading контракта и примеров интеграции с PyTorch/CuPy. -Пример использования (skeleton — реальный subscriber API в работе, см. -issue gx/cuframes#6): +Пример (subscriber-side): import cuframes - print(cuframes.version_string()) # "0.4.0" - print(cuframes.protocol_version()) # uint32 + with cuframes.subscribe("cam-parking", + consumer_name="yolo-world", + connect_timeout_ms=5000) as sub: + # next_frame returns CuframesFrame — context manager + with sub.next_frame(timeout_ms=1000) as frame: + print(frame.cuda_ptr, frame.width, frame.height, + frame.pitch_y, frame.seq, frame.pts_ns) + # DLPack export — в task #199, пока через cuda-python: + # cuda_arr = cuda.from_pointer(frame.cuda_ptr, ...) - # TODO (task #198): - # with cuframes.subscribe("cam-parking") as sub: - # for frame in sub.frames(timeout_ms=1000): - # tensor = torch.from_dlpack(frame) - # ... +Reconnect-loop пример: + + while True: + try: + with cuframes.subscribe("cam-parking", connect_timeout_ms=5000) as sub: + while True: + try: + with sub.next_frame(timeout_ms=1000) as frame: + process(frame) + except cuframes.CuframesFrameTimeout: + continue # просто нет новых кадров + except cuframes.CuframesPublisherGone: + time.sleep(1) # publisher restart — переподписываемся """ from ._native import ( @@ -25,6 +39,10 @@ from ._native import ( # Enums PixelFormat, SubscriberMode, + # Core API + CuframesSubscriber, + CuframesFrame, + subscribe, # Error taxonomy CuframesError, CuframesPublisherGone, @@ -44,6 +62,9 @@ __all__ = [ "protocol_version", "PixelFormat", "SubscriberMode", + "CuframesSubscriber", + "CuframesFrame", + "subscribe", "CuframesError", "CuframesPublisherGone", "CuframesFrameTimeout", diff --git a/python/src/_native.cpp b/python/src/_native.cpp index bb30a51..3eb0ac1 100644 --- a/python/src/_native.cpp +++ b/python/src/_native.cpp @@ -1,12 +1,23 @@ // cuframes Python bindings — pybind11 entry point. // -// Этот файл — skeleton модуля. Полноценные обёртки subscriber/frame появятся -// в следующих коммитах (см. issue gx/cuframes#6, tasks #198-#202). +// Этот файл реализует core wrapper для subscriber-side API: +// - CuframesFrame — owning handle одного frame'а, context manager +// - CuframesSubscriber — owning handle subscription'а, context manager // -// Контракт thread-safety: см. docs/python.md (т.б.д. в task #202). +// DLPack export (#199), per-subscriber CUDA stream (#201), health/stats props +// (#200) — добавляются в последующих коммитах в этот же файл. +// +// Контракт thread-safety (предварительный, финальный — task #202): +// - Каждый handle (CuframesSubscriber / CuframesFrame) принадлежит одному +// Python потоку. Cross-thread access = undefined behavior на C-уровне. +// - GIL отпускается на длинных I/O вызовах (next_frame) — другие Python +// потоки могут работать пока мы ждём frame. #include +#include +#include #include +#include #include "cuframes/cuframes.h" @@ -72,6 +83,211 @@ void check(int err, const char* operation = nullptr) { throw py::error_already_set(); } +// ───────────────────────────────────────────────────────────────────────────── +// CuframesFrame — owning wrapper над cuframes_frame_t*. +// +// Lifecycle: +// - конструируется через Subscriber::next_frame() (single source of truth) +// - в destructor'е (или __exit__) автоматически вызывает release +// - после release() все property accessor'ы бросают CuframesError +// - non-copyable, non-movable из Python (PyObject identity) +// +// Frame держит **слабую** ссылку (raw pointer) на subscriber. Если subscriber +// уничтожен раньше frame'а — released() становится no-op (subscriber разрулит +// освобождение всех outstanding frames при cuframes_subscriber_destroy). +// Чтобы избежать use-after-free, frame проверяет sub_alive_ через shared_ptr. +// +// Для простоты Phase 0 — frame и subscriber должны жить в одном Python потоке, +// порядок destruction под управлением Python GC. Refcount на Python-стороне +// от субскриптора держится через py::object атрибут. +// ───────────────────────────────────────────────────────────────────────────── + +class FrameWrapper { +public: + FrameWrapper(cuframes_subscriber_t* sub, cuframes_frame_t* frame) + : sub_(sub), frame_(frame) {} + + ~FrameWrapper() { + try { release(); } catch (...) { /* destructor — глотаем */ } + } + + // pybind11 не любит copyable wrappers для owning ресурсов. + FrameWrapper(const FrameWrapper&) = delete; + FrameWrapper& operator=(const FrameWrapper&) = delete; + + bool released() const noexcept { return frame_ == nullptr; } + + void release() { + if (frame_ != nullptr) { + // sub_ может быть nullptr если subscriber разорвал связь раньше — + // в этом случае release уже не нужен (subscriber всё освободил). + if (sub_ != nullptr) { + cuframes_subscriber_release(sub_, frame_); + } + frame_ = nullptr; + } + } + + // Internal hook — subscriber говорит frame'у «я умираю, не release()ай». + void invalidate_subscriber() noexcept { sub_ = nullptr; } + + // ── Properties ────────────────────────────────────────────────────── + // Все геттеры проверяют released() — иначе CuframesError. + + void check_alive() const { + if (frame_ == nullptr) { + PyErr_SetString(g_exc.base.ptr(), "frame has been released"); + throw py::error_already_set(); + } + } + + uintptr_t cuda_ptr() const { + check_alive(); + return reinterpret_cast(cuframes_frame_cuda_ptr(frame_)); + } + + cuframes_format_t format() const { + check_alive(); + return cuframes_frame_format(frame_); + } + + int width() const { + check_alive(); + int32_t w, h; + cuframes_frame_size(frame_, &w, &h); + return w; + } + + int height() const { + check_alive(); + int32_t w, h; + cuframes_frame_size(frame_, &w, &h); + return h; + } + + int pitch_y() const { + check_alive(); + return cuframes_frame_pitch_y(frame_); + } + + int pitch_uv() const { + check_alive(); + return cuframes_frame_pitch_uv(frame_); + } + + uint64_t seq() const { + check_alive(); + return cuframes_frame_seq(frame_); + } + + int64_t pts_ns() const { + check_alive(); + return cuframes_frame_pts_ns(frame_); + } + +private: + cuframes_subscriber_t* sub_; + cuframes_frame_t* frame_; +}; + +// ───────────────────────────────────────────────────────────────────────────── +// CuframesSubscriber — owning wrapper над cuframes_subscriber_t*. +// +// API: +// sub = cuframes.subscribe("cam-parking", consumer_name="yolo-world", +// timeout_ms=5000) +// with sub: +// with sub.next_frame(timeout_ms=1000) as frame: +// do_something(frame.cuda_ptr, frame.width, frame.height) +// # sub.close() здесь автоматически +// +// Iteration (Phase 0.5): +// for frame in sub.frames(timeout_ms=1000): +// ... +// ───────────────────────────────────────────────────────────────────────────── + +class SubscriberWrapper { +public: + SubscriberWrapper( + const std::string& key, + std::optional consumer_name, + cuframes_subscriber_mode_t mode, + int cuda_device, + int connect_timeout_ms + ) : key_(key), + consumer_name_(consumer_name.value_or("")), + mode_(mode), + cuda_device_(cuda_device) { + + cuframes_subscriber_config_t cfg = {}; + cfg.key = key_.c_str(); + cfg.consumer_name = consumer_name.has_value() ? consumer_name_.c_str() : nullptr; + cfg.mode = mode_; + cfg.cuda_device = cuda_device_; + cfg.connect_timeout_ms = connect_timeout_ms; + + // create — может быть блокирующим (ждёт publisher'а). GIL release. + int err; + { + py::gil_scoped_release rel; + err = cuframes_subscriber_create(&cfg, &sub_); + } + check(err, "cuframes_subscriber_create"); + } + + ~SubscriberWrapper() { + try { close(); } catch (...) { /* destructor — глотаем */ } + } + + SubscriberWrapper(const SubscriberWrapper&) = delete; + SubscriberWrapper& operator=(const SubscriberWrapper&) = delete; + + bool closed() const noexcept { return sub_ == nullptr; } + + void close() { + if (sub_ != nullptr) { + cuframes_subscriber_destroy(sub_); + sub_ = nullptr; + } + } + + void check_alive() const { + if (sub_ == nullptr) { + PyErr_SetString(g_exc.base.ptr(), "subscriber has been closed"); + throw py::error_already_set(); + } + } + + // Возвращает new FrameWrapper. Caller владеет через Python GC. + // GIL release на время блокирующего вызова — другие потоки работают. + std::unique_ptr next_frame(int timeout_ms) { + check_alive(); + cuframes_frame_t* raw = nullptr; + int err; + { + py::gil_scoped_release rel; + // consumer_stream = nullptr (default stream). + // В #201 добавим per-subscriber stream. + err = cuframes_subscriber_next(sub_, /*stream=*/nullptr, + &raw, timeout_ms); + } + check(err, "cuframes_subscriber_next"); + return std::make_unique(sub_, raw); + } + + const std::string& key() const { return key_; } + const std::string& consumer_name() const { return consumer_name_; } + cuframes_subscriber_mode_t mode() const { return mode_; } + int cuda_device() const { return cuda_device_; } + +private: + cuframes_subscriber_t* sub_ = nullptr; + std::string key_; + std::string consumer_name_; + cuframes_subscriber_mode_t mode_; + int cuda_device_; +}; + } // namespace PYBIND11_MODULE(_native, m) { @@ -100,12 +316,9 @@ PYBIND11_MODULE(_native, m) { // ├── CuframesOutOfMemory // └── CuframesInternal // - // Downstream code может ловить либо конкретный subtype, либо CuframesError - // как catch-all. - // py::exception(...) уже возвращает py::object на сам Python class. - // Не вызываем .attr("__class__") — иначе получим metaclass (type), - // а не сам exception class. + // Не вызываем .attr("__class__") — иначе получим metaclass. + g_exc.base = py::exception(m, "CuframesError"); auto make_subexc = [&m](const char* name) -> py::object { return py::exception(m, name, g_exc.base.ptr()); @@ -132,8 +345,121 @@ PYBIND11_MODULE(_native, m) { .value("NEWEST_ONLY", CUFRAMES_MODE_NEWEST_ONLY) .value("STRICT_ORDER", CUFRAMES_MODE_STRICT_ORDER); - // TODO(task #198): CuframesSubscriber + CuframesFrame classes. - // TODO(task #199): DLPack export. - // TODO(task #200): Health/stats properties (потребует расширения C API). - // TODO(task #201): Per-subscriber CUDA stream + thread-safety contract. + // ── CuframesFrame ─────────────────────────────────────────────────── + py::class_(m, "CuframesFrame", + "Один кадр от cuframes publisher'а.\n\n" + "Получается через CuframesSubscriber.next_frame().\n" + "Поддерживает context manager — release() при выходе из with-блока.\n" + "Все property accessor'ы после release() бросают CuframesError.\n\n" + "Это handle на frame в ring buffer publisher'а — данные остаются\n" + "в shared memory publisher'а пока frame не released. Долго удерживать\n" + "frame нельзя: medленный consumer заставит publisher либо overwrite\n" + "(DROP_OLDEST policy), либо stall (STRICT_WAIT).") + // properties (read-only) + .def_property_readonly("cuda_ptr", &FrameWrapper::cuda_ptr, + "CUDA device pointer на frame data (uintptr_t). Read-only для\n" + "consumer'а. Используйте через cuda-python / cupy / torch.from_blob.") + .def_property_readonly("format", &FrameWrapper::format, + "PixelFormat (NV12 для NVDEC publisher'а).") + .def_property_readonly("width", &FrameWrapper::width) + .def_property_readonly("height", &FrameWrapper::height) + .def_property_readonly("pitch_y", &FrameWrapper::pitch_y, + "Pitch (байт на строку) для Y plane. ВАЖНО: для больших\n" + "разрешений (2688×1520, gate_lpr) pitch != width — kernel'ы\n" + "должны принимать pitch как параметр.") + .def_property_readonly("pitch_uv", &FrameWrapper::pitch_uv, + "Pitch для UV plane (NV12/YUV420P); 0 для форматов без UV.") + .def_property_readonly("seq", &FrameWrapper::seq, + "Sequence number — монотонная нумерация у publisher'а.") + .def_property_readonly("pts_ns", &FrameWrapper::pts_ns, + "Presentation timestamp от publisher'а (наносекунды, CLOCK_MONOTONIC).") + .def_property_readonly("released", &FrameWrapper::released) + .def("release", &FrameWrapper::release, + "Освободить frame обратно publisher'у (ACK).\n" + "После release() property accessor'ы бросают CuframesError.\n" + "Idempotent — повторный вызов no-op.") + // context manager + .def("__enter__", [](FrameWrapper& self) -> FrameWrapper& { + self.check_alive(); + return self; + }, py::return_value_policy::reference_internal) + .def("__exit__", [](FrameWrapper& self, py::object, py::object, py::object) { + self.release(); + return py::none(); + }) + .def("__repr__", [](const FrameWrapper& f) { + if (f.released()) return std::string(""); + return std::string(""; + }); + + // ── CuframesSubscriber ────────────────────────────────────────────── + py::class_(m, "CuframesSubscriber", + "Subscription на cuframes publisher.\n\n" + "Создаётся через cuframes.subscribe(key, ...). Поддерживает context\n" + "manager — close() при выходе из with-блока.\n\n" + "Thread-safety: handle принадлежит одному Python потоку. Cross-thread\n" + "access — undefined behavior. Несколько subscriber'ов в разных потоках\n" + "— OK (каждому свой handle).\n\n" + "GIL отпускается на время блокирующих вызовов (next_frame, close,\n" + "create) — другие Python потоки могут работать.") + .def(py::init, + cuframes_subscriber_mode_t, int, int>(), + py::arg("key"), + py::arg("consumer_name") = py::none(), + py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY, + py::arg("cuda_device") = 0, + py::arg("connect_timeout_ms") = -1, + "Создать subscription. Блокирует до publisher_ready или\n" + "connect_timeout_ms. -1 = ждать вечно, 0 = fail сразу.") + .def_property_readonly("key", &SubscriberWrapper::key) + .def_property_readonly("consumer_name", &SubscriberWrapper::consumer_name) + .def_property_readonly("mode", &SubscriberWrapper::mode) + .def_property_readonly("cuda_device", &SubscriberWrapper::cuda_device) + .def_property_readonly("closed", &SubscriberWrapper::closed) + .def("next_frame", &SubscriberWrapper::next_frame, + py::arg("timeout_ms") = -1, + "Получить следующий frame.\n\n" + "timeout_ms: -1 = ждать вечно; 0 = non-blocking\n" + "(CuframesFrameTimeout если нет данных); >0 = с таймаутом.\n\n" + "Возвращает CuframesFrame — context manager. Использовать через\n" + "`with sub.next_frame() as frame: ...` для гарантии release.") + .def("close", &SubscriberWrapper::close, + "Закрыть subscription. Idempotent.") + // context manager + .def("__enter__", [](SubscriberWrapper& self) -> SubscriberWrapper& { + self.check_alive(); + return self; + }, py::return_value_policy::reference_internal) + .def("__exit__", [](SubscriberWrapper& self, py::object, py::object, py::object) { + self.close(); + return py::none(); + }) + .def("__repr__", [](const SubscriberWrapper& s) { + return std::string(""; + }); + + // ── Module-level factory ──────────────────────────────────────────── + // Удобный shortcut: cuframes.subscribe("cam-parking") вместо + // cuframes._native.CuframesSubscriber(...). + m.def("subscribe", + [](const std::string& key, + std::optional consumer_name, + cuframes_subscriber_mode_t mode, + int cuda_device, + int connect_timeout_ms) { + return std::make_unique( + key, consumer_name, mode, cuda_device, connect_timeout_ms); + }, + py::arg("key"), + py::arg("consumer_name") = py::none(), + py::arg("mode") = CUFRAMES_MODE_NEWEST_ONLY, + py::arg("cuda_device") = 0, + py::arg("connect_timeout_ms") = -1, + "Создать CuframesSubscriber. Shortcut для CuframesSubscriber(...)."); + + // TODO(task #199): DLPack export — frame.__dlpack__() + // TODO(task #200): health/stats — sub.ring_occupancy, sub.drop_count + // TODO(task #201): consumer_stream parameter в subscribe/next_frame } diff --git a/python/tests/test_smoke.py b/python/tests/test_smoke.py index 000667f..af936db 100644 --- a/python/tests/test_smoke.py +++ b/python/tests/test_smoke.py @@ -49,3 +49,36 @@ def test_error_hierarchy(): cuframes.CuframesInternal, ]: assert issubclass(sub, cuframes.CuframesError) + + +def test_subscriber_class_exposed(): + """CuframesSubscriber/CuframesFrame exposed как public classes.""" + assert hasattr(cuframes, "CuframesSubscriber") + assert hasattr(cuframes, "CuframesFrame") + assert hasattr(cuframes, "subscribe") + + +def test_subscribe_to_missing_publisher_raises(): + """Subscribe к несуществующему publisher → CuframesError (subclass) + после connect_timeout_ms. + + Этот тест работает на любом хосте (без живого cuframes-pub) — мы + верифицируем что error path работает и маппит CUFRAMES_ERR_* + в правильный Python exception. + """ + import pytest + with pytest.raises(cuframes.CuframesError): + cuframes.subscribe( + "definitely-not-existing-publisher-xyz", + connect_timeout_ms=100, + ) + + +def test_subscriber_repr_when_unable_to_connect(): + """Лёгкий тест что repr не падает и close idempotent.""" + import pytest + try: + sub = cuframes.subscribe("nope-xyz", connect_timeout_ms=100) + except cuframes.CuframesError: + return # ожидаемо + pytest.fail("subscribe должно было выкинуть exception")