#!/usr/bin/env python3
"""
Reference Python consumer for cuframes (via a ctypes wrapper).

Until v0.3 (when first-class pybind11 bindings arrive) this is the minimal
working pattern for AI/ML scripts that want to subscribe to cuframes IPC.

Pattern:
  1. subscribe to cuframes (open libcuframes.so via ctypes)
  2. in a loop: fetch the next() frame
  3. cudaMemcpy -> host (via pycuda or a separate CUDA-Python library)
  4. hand it to your ML pipeline (ONNX/TensorRT/PyTorch)
  5. release the frame back to the publisher

Limitations:
  - This skeleton does NOT perform an actual CUDA copy
    (that needs pycuda / cupy / cuda-python)
  - Sync API only
  - NV12 only (v0.1)

Usage:
  python3 cuframes_consumer.py --key cam-parking --max-frames 100

Requirements (on the target host):
  - libcuframes.so on LD_LIBRARY_PATH (or apt install / docker)
  - a running publisher (cuframes-rtsp-source --key cam-parking ...)
  - the same IPC + PID namespace as the publisher
    (under docker: ipc:container: + pid:container:)

Exit status: 0 when --max-frames frames were received, 1 otherwise.
"""

import argparse
import ctypes
import sys
import time
from ctypes import c_int, c_int32, c_int64, c_uint64, c_uint32, c_char_p, c_void_p, c_size_t, POINTER, Structure

# ─── C API bindings ─────────────────────────────────────────────────────

# Error codes (values of the C API; keep in sync with the cuframes header)
CUFRAMES_OK = 0
CUFRAMES_ERR_TIMEOUT = -7
CUFRAMES_ERR_WOULD_BLOCK = -11
CUFRAMES_ERR_DISCONNECTED = -9

# Modes
CUFRAMES_MODE_NEWEST_ONLY = 0
CUFRAMES_MODE_STRICT_ORDER = 1

# Pixel format
CUFRAMES_FORMAT_NV12 = 0


class SubscriberConfig(Structure):
    """ctypes mirror of the C struct cuframes_subscriber_config.

    ctypes derives the memory layout solely from ``_fields_``, so the field
    order and types must match the C definition exactly.  New instances are
    zero-initialised by ctypes, which leaves ``_reserved`` all zeros.
    """

    _fields_ = [
        ("key", c_char_p),
        ("consumer_name", c_char_p),
        ("mode", c_int),
        ("cuda_device", c_int32),
        ("connect_timeout_ms", c_int32),
        ("_reserved", c_uint64 * 4),
    ]


def _load_libcuframes():
    """Load libcuframes.so.0 and declare ctypes signatures for the API used here.

    Returns:
        The loaded ``ctypes.CDLL`` with argtypes/restype configured.

    Exits the process with status 1 (after printing a hint to stderr) when
    the shared library cannot be loaded.
    """
    try:
        lib = ctypes.CDLL("libcuframes.so.0")
    except OSError as e:
        sys.stderr.write(f"Cannot load libcuframes.so.0: {e}\n")
        sys.stderr.write("Установи libcuframes (см. cuframes README) и убедись что .so в LD_LIBRARY_PATH.\n")
        sys.exit(1)

    # cuframes_strerror -- c_char_p restype: ctypes yields bytes, or None for NULL
    lib.cuframes_strerror.argtypes = [c_int]
    lib.cuframes_strerror.restype = c_char_p

    # cuframes_subscriber_create(config*, handle_out*)
    lib.cuframes_subscriber_create.argtypes = [POINTER(SubscriberConfig), POINTER(c_void_p)]
    lib.cuframes_subscriber_create.restype = c_int

    # cuframes_subscriber_next (consumer_stream=NULL -- sync API, default stream)
    lib.cuframes_subscriber_next.argtypes = [c_void_p, c_void_p, POINTER(c_void_p), c_int32]
    lib.cuframes_subscriber_next.restype = c_int

    # cuframes_subscriber_release(subscriber, frame)
    lib.cuframes_subscriber_release.argtypes = [c_void_p, c_void_p]
    lib.cuframes_subscriber_release.restype = c_int

    # cuframes_subscriber_destroy(subscriber)
    lib.cuframes_subscriber_destroy.argtypes = [c_void_p]
    lib.cuframes_subscriber_destroy.restype = c_int

    # cuframes_frame_* accessors
    # c_void_p restype: ctypes yields an int, or None for a NULL pointer
    lib.cuframes_frame_cuda_ptr.argtypes = [c_void_p]
    lib.cuframes_frame_cuda_ptr.restype = c_void_p
    lib.cuframes_frame_size.argtypes = [c_void_p, POINTER(c_int32), POINTER(c_int32)]
    lib.cuframes_frame_size.restype = None
    lib.cuframes_frame_pitch_y.argtypes = [c_void_p]
    lib.cuframes_frame_pitch_y.restype = c_int32
    lib.cuframes_frame_pitch_uv.argtypes = [c_void_p]
    lib.cuframes_frame_pitch_uv.restype = c_int32
    lib.cuframes_frame_seq.argtypes = [c_void_p]
    lib.cuframes_frame_seq.restype = c_uint64
    lib.cuframes_frame_pts_ns.argtypes = [c_void_p]
    lib.cuframes_frame_pts_ns.restype = c_int64

    return lib


def _error_message(lib, rc):
    """Return a printable description of cuframes status code *rc*.

    ``cuframes_strerror`` is declared with a ``c_char_p`` restype, which
    ctypes turns into ``None`` for a NULL return; fall back to the numeric
    code instead of crashing on ``None.decode()``.
    """
    msg = lib.cuframes_strerror(rc)
    if not msg:
        return f"cuframes error {rc}"
    return msg.decode("utf-8", errors="replace")


def _fps(frames, seconds):
    """Return ``frames / seconds``, or 0.0 when the interval is not positive."""
    return frames / seconds if seconds > 0 else 0.0


# ─── Main consumer loop ────────────────────────────────────────────────

def main():
    """Subscribe to a cuframes publisher and receive up to --max-frames frames.

    Prints progress and a throughput summary.  Exits with status 0 when
    --max-frames frames were received and 1 otherwise (subscribe failure,
    disconnect, API error or Ctrl+C).
    """
    ap = argparse.ArgumentParser(description="Reference cuframes Python consumer")
    ap.add_argument("--key", required=True, help="publisher key (e.g. cam-parking)")
    ap.add_argument("--max-frames", type=int, default=100, help="N frames to receive (default 100)")
    ap.add_argument("--cuda-device", type=int, default=0)
    ap.add_argument("--timeout-ms", type=int, default=1000, help="per-frame timeout")
    args = ap.parse_args()

    lib = _load_libcuframes()

    # Configure subscriber
    cfg = SubscriberConfig()
    cfg.key = args.key.encode("utf-8")  # the Structure keeps this bytes object alive
    cfg.consumer_name = None  # NULL -> auto-generated name
    cfg.mode = CUFRAMES_MODE_NEWEST_ONLY
    cfg.cuda_device = args.cuda_device
    cfg.connect_timeout_ms = 5000

    sub_handle = c_void_p()
    rc = lib.cuframes_subscriber_create(ctypes.byref(cfg), ctypes.byref(sub_handle))
    if rc != CUFRAMES_OK:
        sys.stderr.write(f"subscribe failed: {_error_message(lib, rc)}\n")
        sys.exit(1)

    print(f"[consumer] connected to '{args.key}'")

    received = 0
    first_wall = None  # monotonic timestamp of the first received frame
    last_wall = None   # monotonic timestamp of the most recent received frame

    try:
        while received < args.max_frames:
            frame_handle = c_void_p()
            rc = lib.cuframes_subscriber_next(sub_handle, None, ctypes.byref(frame_handle), args.timeout_ms)
            if rc in (CUFRAMES_ERR_TIMEOUT, CUFRAMES_ERR_WOULD_BLOCK):
                continue  # no frame yet -- keep waiting
            if rc == CUFRAMES_ERR_DISCONNECTED:
                print("[consumer] publisher disconnected — exit")
                break
            if rc != CUFRAMES_OK:
                sys.stderr.write(f"next failed: {_error_message(lib, rc)}\n")
                break
            if not frame_handle.value:
                sys.stderr.write("next failed: no frame returned\n")
                break

            # From here on we own a frame: release it even if processing raises.
            try:
                # Frame metadata
                w, h = c_int32(0), c_int32(0)
                lib.cuframes_frame_size(frame_handle, ctypes.byref(w), ctypes.byref(h))
                pitch_y = lib.cuframes_frame_pitch_y(frame_handle)
                pitch_uv = lib.cuframes_frame_pitch_uv(frame_handle)
                # c_void_p restype maps NULL to None; normalise so hex formatting works.
                cuda_ptr = lib.cuframes_frame_cuda_ptr(frame_handle) or 0
                seq = lib.cuframes_frame_seq(frame_handle)
                pts_ns = lib.cuframes_frame_pts_ns(frame_handle)

                now = time.monotonic()
                if first_wall is None:
                    first_wall = now
                    print(f"[consumer] first frame: {w.value}x{h.value} NV12, "
                          f"pitch_y={pitch_y}, pitch_uv={pitch_uv}, cuda_ptr=0x{cuda_ptr:x}")
                last_wall = now

                # ─── YOUR ML PIPELINE GOES HERE ───────────────────────
                # 1. cudaMemcpy NV12 frame -> host (or use pycuda / cupy for an in-GPU pipeline)
                # 2. NV12 -> RGB conversion (CPU or GPU)
                # 3. inference: model(frame) -> results
                # 4. publish results (mqtt / API / etc)
                #
                # In this skeleton it is just a counter.
                received += 1
                if received % 25 == 0:
                    print(f"[consumer] received={received} seq={seq} pts_ms={pts_ns // 1_000_000}")
            finally:
                # CRITICAL: the frame MUST be released -- otherwise the publisher
                # stalls (or drops new frames on ring overflow in STRICT_ORDER mode).
                release_rc = lib.cuframes_subscriber_release(sub_handle, frame_handle)
                if release_rc != CUFRAMES_OK:
                    sys.stderr.write(f"release failed: {_error_message(lib, release_rc)}\n")
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop early: still destroy and report.
        print("\n[consumer] interrupted")
    finally:
        lib.cuframes_subscriber_destroy(sub_handle)

    if received > 1 and first_wall is not None:
        # Measure over the span between the first and the last received frame
        # (received - 1 inter-frame intervals), excluding teardown and any
        # trailing timeout/disconnect wait.
        elapsed = last_wall - first_wall
        fps = _fps(received - 1, elapsed)
        print("\n=== RESULT ===")
        print(f"received: {received} / {args.max_frames}")
        print(f"elapsed:  {elapsed:.2f}s")
        print(f"avg_fps:  {fps:.2f}")

    sys.exit(0 if received >= args.max_frames else 1)


if __name__ == "__main__":
    main()