From 46c2b94939c1133174419f1a2e9373a48a1aee78 Mon Sep 17 00:00:00 2001 From: Evgeny Demchenko Date: Thu, 14 May 2026 23:21:30 +0100 Subject: [PATCH] libcuframes v0.1: producer + consumer (sync + async) + tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements Steps 3-6 of Phase 1 according to docs/protocol.md. libcuframes/src/: - internal.h (184 lines) — shared structs (byte-exact protocol.md layout) + _Static_assert на offsets/sizes - utils.c — error strings, frame size calc, now_ns, key validation - protocol.c — TLV framing для Unix socket с poll-based timeout - producer.c (~700 lines) — Step 3: * LIBRARY mode: cudaMalloc pool, IPC handle export * EXTERNAL mode: register user-provided pointers * cudaIpcEventHandle_t для cross-process sync (R1/R2) * Unix socket accept thread, handshake state machine * Bit allocation 1..31, name collision check (Y5) * STRICT_WAIT policy: timeout with dead-subscriber eviction - consumer.c (~400 lines) — Step 4: * Synchronous next() with poll-based wait * cudaStreamWaitEvent на consumer-stream (R1/R2) * Opaque cuframes_frame_t с accessor functions (Y6) * NEWEST_ONLY и STRICT_ORDER modes * ACK via atomic_fetch_or на bitmap - consumer_async.c — Step 5: thread + callback wrapper над sync API libcuframes/tests/: - test_pingpong.cu — single producer × single consumer, 200 frames @ 60fps, verify через kernel-on-consumer-stream (правильный test для sync semantics, см. spike-v2) - test_multi.cu — 1 producer × 3 consumers через fork() Build: - Top-level CMakeLists.txt с options - libcuframes/CMakeLists.txt: shared + static library, c_std_11 - Suppress -Waddress-of-packed-member (известная безопасная warning x86_64) Results (внутри cuframes-dev container, RTX 5090): - pingpong_basic PASS 4.5s 200 frames, 0 torn - multi_consumer PASS 4.1s 1 × 3 consumers, all PASS Phase 1 Step 6 done. Дальше: Step 7 (C++ wrapper), Step 9 (FFmpeg filter). 
---
 CMakeLists.txt | 36 ++
 libcuframes/CMakeLists.txt | 64 +++
 libcuframes/src/consumer.c | 355 +++++++++++++++
 libcuframes/src/consumer_async.c | 82 ++++
 libcuframes/src/internal.h | 184 ++++++++
 libcuframes/src/producer.c | 667 +++++++++++++++++++++++++++++
 libcuframes/src/protocol.c | 99 +++++
 libcuframes/src/utils.c | 149 +++++++
 libcuframes/tests/CMakeLists.txt | 17 +
 libcuframes/tests/test_multi.cu | 142 ++++++
 libcuframes/tests/test_pingpong.cu | 219 ++++++++++
 11 files changed, 2014 insertions(+)
 create mode 100644 CMakeLists.txt
 create mode 100644 libcuframes/CMakeLists.txt
 create mode 100644 libcuframes/src/consumer.c
 create mode 100644 libcuframes/src/consumer_async.c
 create mode 100644 libcuframes/src/internal.h
 create mode 100644 libcuframes/src/producer.c
 create mode 100644 libcuframes/src/protocol.c
 create mode 100644 libcuframes/src/utils.c
 create mode 100644 libcuframes/tests/CMakeLists.txt
 create mode 100644 libcuframes/tests/test_multi.cu
 create mode 100644 libcuframes/tests/test_pingpong.cu
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..6e1c5c7
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,36 @@
+cmake_minimum_required(VERSION 3.20)
+
+# Default GPU targets. This must run before project(): enabling the CUDA
+# language initializes CMAKE_CUDA_ARCHITECTURES, so a NOT DEFINED check placed
+# after project() never fires. An explicit CUDAARCHS environment value wins.
+if(NOT DEFINED CMAKE_CUDA_ARCHITECTURES AND NOT DEFINED ENV{CUDAARCHS})
+    set(CMAKE_CUDA_ARCHITECTURES "75;86;89;90;120")
+endif()
+
+project(cuframes
+    VERSION 0.1.0
+    DESCRIPTION "Zero-copy frame sharing via CUDA IPC"
+    LANGUAGES C CXX CUDA
+)
+
+set(CMAKE_C_STANDARD 11)
+set(CMAKE_C_STANDARD_REQUIRED ON)
+set(CMAKE_CXX_STANDARD 17)
+set(CMAKE_CXX_STANDARD_REQUIRED ON)
+
+if(NOT CMAKE_BUILD_TYPE)
+    set(CMAKE_BUILD_TYPE Release)
+endif()
+
+option(BUILD_TESTING "Build tests" ON)
+option(BUILD_EXAMPLES "Build examples" ON)
+option(BUILD_FFMPEG_FILTER "Build FFmpeg vf_cuda_ipc_export filter" OFF)
+option(BUILD_PYTHON_BINDINGS "Build Python bindings" OFF)
+
+enable_testing()
+
+add_subdirectory(libcuframes)
+
+if(BUILD_FFMPEG_FILTER)
+    add_subdirectory(filter)
+endif()
diff --git a/libcuframes/CMakeLists.txt
b/libcuframes/CMakeLists.txt new file mode 100644 index 0000000..a5f6442 --- /dev/null +++ b/libcuframes/CMakeLists.txt @@ -0,0 +1,64 @@ +cmake_minimum_required(VERSION 3.20) + +# libcuframes — главная shared library +find_package(CUDAToolkit REQUIRED) +find_package(Threads REQUIRED) + +set(CUFRAMES_SOURCES + src/utils.c + src/protocol.c + src/producer.c + src/consumer.c + src/consumer_async.c +) + +add_library(cuframes SHARED ${CUFRAMES_SOURCES}) +add_library(cuframes_static STATIC ${CUFRAMES_SOURCES}) + +foreach(target cuframes cuframes_static) + target_include_directories(${target} + PUBLIC + $ + $ + PRIVATE + ${CMAKE_CURRENT_SOURCE_DIR}/src + ) + target_compile_features(${target} PRIVATE c_std_11) + target_compile_options(${target} PRIVATE + -Wall -Wextra -Wpedantic + -Wno-address-of-packed-member # известная проблема atomics на packed structs (x86_64 ok) + $<$:-O0 -g> + $<$:-O2 -g> + ) + target_link_libraries(${target} + PUBLIC + CUDA::cudart + Threads::Threads + rt # для shm_open + ) +endforeach() + +# Set SOVERSION на shared lib для ABI tracking +set_target_properties(cuframes PROPERTIES + VERSION 0.1.0 + SOVERSION 0 +) + +# Optionally — explicit visibility для exports. C-linkage уже handled extern "C" +# в header'е. Используем -fvisibility=hidden + указываем default для public symbols +# через __attribute__((visibility("default"))). Но мы это сделаем в v0.2 — пока +# default visibility внутри source files. 
+target_compile_definitions(cuframes PRIVATE CUFRAMES_BUILDING_DLL) +target_compile_definitions(cuframes_static PRIVATE CUFRAMES_STATIC_LIB) + +# Override default fvisibility (на текущем этапе экспортируем всё) +foreach(target cuframes cuframes_static) + set_target_properties(${target} PROPERTIES + C_VISIBILITY_PRESET default + ) +endforeach() + +# Tests +if(BUILD_TESTING) + add_subdirectory(tests) +endif() diff --git a/libcuframes/src/consumer.c b/libcuframes/src/consumer.c new file mode 100644 index 0000000..c6f4392 --- /dev/null +++ b/libcuframes/src/consumer.c @@ -0,0 +1,355 @@ +/* Subscriber implementation (sync). */ + +#include "internal.h" +#include +#include +#include +#include +#include +#include + +/* Opaque frame — выдаётся subscriber'у на next() */ +struct cuframes_frame { + void *cuda_ptr; + cuframes_format_t format; + int32_t width; + int32_t height; + int32_t pitch_y; + int32_t pitch_uv; + uint64_t seq; + int64_t pts_ns; + + uint32_t slot_idx; + void *subscriber; /* back-ref для release() */ +}; + +struct cuframes_subscriber { + cuframes_subscriber_config_t cfg; + char key[CUFRAMES_MAX_KEY_LEN + 1]; + + int sock_fd; + int shm_fd; + cuframes_shm_header_t *hdr; + char shm_name[80]; + + cudaEvent_t producer_event; + void *mapped_ptrs[CUFRAMES_MAX_RING]; + + uint32_t assigned_bit; + uint64_t last_seen_seq; + + /* Frame pool — переиспользуем одну frame_t structure (single-thread API). + * Опционально расширим до lock-free pool в v0.2 если нужен multi-frame. */ + struct cuframes_frame frame_obj; + int frame_busy; +}; + +/* ─── Frame accessors ────────────────────────────────────────────────── */ +void *cuframes_frame_cuda_ptr(const cuframes_frame_t *f) { return f ? f->cuda_ptr : NULL; } +cuframes_format_t cuframes_frame_format(const cuframes_frame_t *f) { return f ? 
f->format : 0; }
+void cuframes_frame_size(const cuframes_frame_t *f, int32_t *w, int32_t *h) {
+    if (!f) return;
+    if (w) *w = f->width;
+    if (h) *h = f->height;
+}
+int32_t cuframes_frame_pitch_y(const cuframes_frame_t *f) { return f ? f->pitch_y : 0; }
+int32_t cuframes_frame_pitch_uv(const cuframes_frame_t *f) { return f ? f->pitch_uv : 0; }
+uint64_t cuframes_frame_seq(const cuframes_frame_t *f) { return f ? f->seq : 0; }
+int64_t cuframes_frame_pts_ns(const cuframes_frame_t *f) { return f ? f->pts_ns : 0; }
+
+/* ─── Subscriber create ──────────────────────────────────────────────── */
+
+/* Client side of the socket handshake:
+ * HELLO_REQ -> HELLO_RESP -> SUBSCRIBE_REQ -> SUBSCRIBE_RESP.
+ * On success stores the assigned ACK bit and the starting sequence in `sub`.
+ * Returns CUFRAMES_OK, an error from the send/recv helpers, the error code
+ * reported by the publisher, or CUFRAMES_ERR_PROTOCOL for a malformed reply.
+ * NOTE(review): the length checks assume cuframes_internal_recv_msg stores the
+ * received payload length in *payload_len_inout — confirm in protocol.c. */
+static int do_handshake(struct cuframes_subscriber *sub, const char *name) {
+    /* Send HELLO_REQ: fixed header, name bytes, then cuda_device (4 bytes),
+     * mode (4 bytes) and 12 reserved zero bytes. */
+    uint8_t buf[CUFRAMES_MAX_MSG_PAYLOAD];
+    cuframes_msg_hello_req_t *hreq = (cuframes_msg_hello_req_t *)buf;
+    hreq->proto_version = CUFRAMES_PROTOCOL_V1;
+    uint32_t nl = name ? (uint32_t)strlen(name) : 0;
+    if (nl > CUFRAMES_MAX_NAME_LEN) nl = CUFRAMES_MAX_NAME_LEN;
+    hreq->consumer_name_len = nl;
+    if (nl > 0) memcpy(buf + sizeof(*hreq), name, nl);
+    uint8_t *tail = buf + sizeof(*hreq) + nl;
+    int32_t cuda_dev = sub->cfg.cuda_device;
+    uint32_t mode = (uint32_t)sub->cfg.mode;
+    memcpy(tail, &cuda_dev, 4);
+    memcpy(tail + 4, &mode, 4);
+    memset(tail + 8, 0, 12);
+    uint32_t plen = (uint32_t)(sizeof(*hreq) + nl + 20);
+
+    int r = cuframes_internal_send_msg(sub->sock_fd, CUFRAMES_MSG_HELLO_REQ,
+                                       buf, plen);
+    if (r != CUFRAMES_OK) return r;
+
+    /* Recv HELLO_RESP */
+    uint32_t rmt = 0, rpl = sizeof(buf);
+    r = cuframes_internal_recv_msg(sub->sock_fd, &rmt, buf, &rpl, 5000);
+    if (r != CUFRAMES_OK) return r;
+    if (rmt != CUFRAMES_MSG_HELLO_RESP) return CUFRAMES_ERR_PROTOCOL;
+    /* buf still contains our own request bytes beyond the received length, so
+     * a truncated reply must not be interpreted as a result code. */
+    if (rpl < sizeof(int32_t)) return CUFRAMES_ERR_PROTOCOL;
+
+    cuframes_msg_hello_resp_t *hresp = (cuframes_msg_hello_resp_t *)buf;
+    if (hresp->result != CUFRAMES_OK) return hresp->result;
+
+    /* Send SUBSCRIBE_REQ: protocol version followed by 28 zero bytes. */
+    uint32_t srbuf[8];
+    srbuf[0] = CUFRAMES_PROTOCOL_V1;
+    memset(srbuf + 1, 0, 28);
+    r = cuframes_internal_send_msg(sub->sock_fd, CUFRAMES_MSG_SUBSCRIBE_REQ,
+                                   srbuf, sizeof(srbuf));
+    if (r != CUFRAMES_OK) return r;
+
+    /* Recv SUBSCRIBE_RESP (zeroed first so a short reply never exposes
+     * uninitialized stack bytes). */
+    cuframes_msg_subscribe_resp_t sresp;
+    memset(&sresp, 0, sizeof(sresp));
+    rmt = 0; rpl = sizeof(sresp);
+    r = cuframes_internal_recv_msg(sub->sock_fd, &rmt, &sresp, &rpl, 5000);
+    if (r != CUFRAMES_OK) return r;
+    if (rmt != CUFRAMES_MSG_SUBSCRIBE_RESP) return CUFRAMES_ERR_PROTOCOL;
+    if (rpl < sizeof(int32_t)) return CUFRAMES_ERR_PROTOCOL;
+    if (sresp.result != CUFRAMES_OK) return sresp.result;
+    /* A successful reply must carry assigned_bit and initial_seq. */
+    if (rpl < offsetof(cuframes_msg_subscribe_resp_t, reserved))
+        return CUFRAMES_ERR_PROTOCOL;
+    /* The bit indexes subscribers[CUFRAMES_MAX_SUBSCRIBERS] in shared memory. */
+    if (sresp.assigned_bit >= CUFRAMES_MAX_SUBSCRIBERS) return CUFRAMES_ERR_PROTOCOL;
+
+    sub->assigned_bit = sresp.assigned_bit;
+    sub->last_seen_seq = sresp.initial_seq; /* start from the current point */
+    return CUFRAMES_OK;
+}
+
+/* Connects to the publisher identified by cfg->key, performs the handshake,
+ * maps the shared header and opens the producer's IPC event and ring buffers.
+ * connect_timeout_ms: 0 = single attempt, >0 = retry every 100 ms until the
+ * deadline, <0 = retry without a deadline.
+ * On success stores the new subscriber in *out and returns CUFRAMES_OK;
+ * otherwise releases everything acquired so far and returns an error code. */
+int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg,
+                               cuframes_subscriber_t **out) {
+    if (!cfg || !cfg->key || !out) return CUFRAMES_ERR_INVALID_ARG;
+    if (cuframes_internal_validate_key(cfg->key) != CUFRAMES_OK)
+        return CUFRAMES_ERR_INVALID_ARG;
+
+    struct cuframes_subscriber *sub = calloc(1, sizeof(*sub));
+    if (!sub) return CUFRAMES_ERR_OUT_OF_MEMORY;
+    sub->cfg = *cfg;
+    strncpy(sub->key, cfg->key, sizeof(sub->key) - 1);
+    sub->sock_fd = -1;
+    sub->shm_fd = -1;
+
+    /* Generate fallback name if NULL */
+    char name_buf[32];
+    const char *name = cfg->consumer_name;
+    if (!name) {
+        snprintf(name_buf, sizeof(name_buf), "sub-%d-%lx",
+                 (int)getpid(), (unsigned long)cuframes_now_ns() & 0xFFFFu);
+        name = name_buf;
+    }
+
+    /* Build paths */
+    char sock_path[128];
+    int r = cuframes_internal_socket_path(cfg->key, sock_path, sizeof(sock_path));
+    if (r != CUFRAMES_OK) { free(sub); return r; }
+
+    /* Connect with timeout retry */
+    int64_t deadline = cfg->connect_timeout_ms > 0
+        ? cuframes_now_ns() + (int64_t)cfg->connect_timeout_ms * 1000000LL
+        : 0;
+
+    while (1) {
+        sub->sock_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
+        if (sub->sock_fd < 0) { r = CUFRAMES_ERR_IO; goto fail; }
+        struct sockaddr_un sa = {.sun_family = AF_UNIX};
+        strncpy(sa.sun_path, sock_path, sizeof(sa.sun_path) - 1);
+        if (connect(sub->sock_fd, (struct sockaddr *)&sa, sizeof(sa)) == 0) break;
+        close(sub->sock_fd);
+        sub->sock_fd = -1;
+        if (cfg->connect_timeout_ms == 0) { r = CUFRAMES_ERR_NOT_FOUND; goto fail; }
+        if (deadline && cuframes_now_ns() > deadline) { r = CUFRAMES_ERR_TIMEOUT; goto fail; }
+        struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000000}; /* 100ms */
+        nanosleep(&ts, NULL);
+    }
+
+    /* Handshake */
+    r = do_handshake(sub, name);
+    if (r != CUFRAMES_OK) goto fail;
+
+    /* Open SHM */
+    r = cuframes_internal_shm_name(cfg->key, sub->shm_name, sizeof(sub->shm_name));
+    if (r != CUFRAMES_OK) goto fail;
+    sub->shm_fd = shm_open(sub->shm_name, O_RDWR, 0);
+    if (sub->shm_fd < 0) {
+        CUFRAMES_LOG_ERROR("shm_open %s: %s", sub->shm_name, strerror(errno));
+        r = CUFRAMES_ERR_IO; goto fail;
+    }
+    sub->hdr = mmap(NULL, sizeof(cuframes_shm_header_t),
+                    PROT_READ | PROT_WRITE, MAP_SHARED, sub->shm_fd, 0);
+    if (sub->hdr == MAP_FAILED) {
+        sub->hdr = NULL;
+        r = CUFRAMES_ERR_IO; goto fail;
+    }
+    if (sub->hdr->magic != CUFRAMES_MAGIC) {
+        /* Not a cuframes segment: unmap it now so the cleanup path never
+         * writes subscriber state into foreign memory. */
+        munmap(sub->hdr, sizeof(cuframes_shm_header_t));
+        sub->hdr = NULL;
+        r = CUFRAMES_ERR_PROTOCOL; goto fail;
+    }
+
+    /* ring_size is used as a modulus and as an index bound for slots[] and
+     * mapped_ptrs[], so reject anything outside 1..CUFRAMES_MAX_RING. */
+    uint64_t ring_size = sub->hdr->ring_size;
+    if (ring_size < 1 || ring_size > CUFRAMES_MAX_RING) {
+        CUFRAMES_LOG_ERROR("invalid ring_size %llu in shm header",
+                           (unsigned long long)ring_size);
+        r = CUFRAMES_ERR_PROTOCOL; goto fail;
+    }
+
+    /* CUDA setup */
+    cudaError_t cerr = cudaSetDevice(sub->cfg.cuda_device);
+    if (cerr != cudaSuccess) {
+        CUFRAMES_LOG_ERROR("cudaSetDevice: %s", cudaGetErrorString(cerr));
+        r = CUFRAMES_ERR_CUDA; goto fail;
+    }
+
+    /* Open producer's event */
+    cerr = cudaIpcOpenEventHandle(&sub->producer_event, sub->hdr->ipc_event_handle);
+    if (cerr != cudaSuccess) {
+        CUFRAMES_LOG_ERROR("cudaIpcOpenEventHandle: %s", cudaGetErrorString(cerr));
+        r = CUFRAMES_ERR_CUDA; goto fail;
+    }
+
+    /* Open mem handles */
+    int ring = (int)ring_size;
+    for (int i = 0; i < ring; ++i) {
+        cerr = cudaIpcOpenMemHandle(&sub->mapped_ptrs[i],
+                                    sub->hdr->slots[i].mem_handle,
+                                    cudaIpcMemLazyEnablePeerAccess);
+        if (cerr != cudaSuccess) {
+            CUFRAMES_LOG_ERROR("cudaIpcOpenMemHandle slot %d: %s",
+                               i, cudaGetErrorString(cerr));
+            r = CUFRAMES_ERR_CUDA; goto fail;
+        }
+    }
+
+    CUFRAMES_LOG_INFO("subscriber '%s' connected to '%s' (bit=%u, ring=%d)",
+                      name, sub->key, sub->assigned_bit, ring);
+    *out = sub;
+    return CUFRAMES_OK;
+
+fail:
+    cuframes_subscriber_destroy(sub);
+    return r;
+}
+
+/* Fetches the next frame.
+ * consumer_stream: if non-NULL, the producer's event wait is enqueued on this
+ *   stream; if NULL, the call blocks on the host until the event completes.
+ * timeout_ms: 0 = non-blocking (CUFRAMES_ERR_WOULD_BLOCK when nothing is
+ *   ready), >0 = wait up to that long (CUFRAMES_ERR_TIMEOUT), <0 = wait until
+ *   a frame arrives or the publisher shuts down.
+ * Only one frame may be outstanding: a second call before release() returns
+ * CUFRAMES_ERR_INVALID_ARG. In STRICT_ORDER mode CUFRAMES_ERR_DISCONNECTED is
+ * also returned when the producer has advanced more than ring_size frames. */
+int cuframes_subscriber_next(cuframes_subscriber_t *sub,
+                             void *consumer_stream,
+                             cuframes_frame_t **frame_out,
+                             int32_t timeout_ms) {
+    if (!sub || !sub->hdr || !frame_out) return CUFRAMES_ERR_INVALID_ARG;
+    if (sub->frame_busy) return CUFRAMES_ERR_INVALID_ARG;
+    if (atomic_load_explicit(&sub->hdr->shutdown_flag,
+                             memory_order_acquire) != 0) {
+        return CUFRAMES_ERR_DISCONNECTED;
+    }
+
+    /* The header lives in shared memory; never use ring_size as a modulus or
+     * index bound without checking it. */
+    const uint64_t ring = sub->hdr->ring_size;
+    if (ring < 1 || ring > CUFRAMES_MAX_RING) return CUFRAMES_ERR_PROTOCOL;
+
+    int64_t deadline = (timeout_ms > 0)
+        ? cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL
+        : 0;
+
+    while (1) {
+        uint64_t gs = atomic_load_explicit(&sub->hdr->global_seq,
+                                           memory_order_acquire);
+        if (gs != UINT64_MAX && gs != sub->last_seen_seq) {
+            uint64_t target_seq;
+            if (sub->cfg.mode == CUFRAMES_MODE_NEWEST_ONLY) {
+                target_seq = gs;
+            } else if (sub->last_seen_seq == UINT64_MAX) {
+                /* STRICT_ORDER, nothing consumed yet: start at the newest. */
+                target_seq = gs;
+            } else if (gs > sub->last_seen_seq + ring) {
+                /* Producer overran us. */
+                return CUFRAMES_ERR_DISCONNECTED;
+            } else {
+                target_seq = sub->last_seen_seq + 1;
+            }
+
+            uint32_t slot_idx = (uint32_t)(target_seq % ring);
+            uint64_t slot_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
+                                                     memory_order_acquire);
+            if (slot_seq == target_seq) {
+                int64_t pts = atomic_load_explicit(&sub->hdr->slots[slot_idx].pts_ns,
+                                                   memory_order_acquire);
+
+                /* Cross-process sync: wait event on consumer's stream */
+                if (consumer_stream) {
+                    cudaError_t cerr = cudaStreamWaitEvent((cudaStream_t)consumer_stream,
+                                                           sub->producer_event, 0);
+                    if (cerr != cudaSuccess) {
+                        CUFRAMES_LOG_WARN("cudaStreamWaitEvent: %s",
+                                          cudaGetErrorString(cerr));
+                        return CUFRAMES_ERR_CUDA;
+                    }
+                } else {
+                    /* Host-side wait — for cudaMemcpyDeviceToHost users */
+                    cudaError_t cerr = cudaEventSynchronize(sub->producer_event);
+                    if (cerr != cudaSuccess) return CUFRAMES_ERR_CUDA;
+                }
+
+                /* Fill frame_out */
+                struct cuframes_frame *f = &sub->frame_obj;
+                f->cuda_ptr = sub->mapped_ptrs[slot_idx];
+                f->format = (cuframes_format_t)sub->hdr->meta.format;
+                f->width = sub->hdr->meta.width;
+                f->height = sub->hdr->meta.height;
+                f->pitch_y = sub->hdr->meta.pitch_y;
+                f->pitch_uv = sub->hdr->meta.pitch_uv;
+                f->seq = target_seq;
+                f->pts_ns = pts;
+                f->slot_idx = slot_idx;
+                f->subscriber = sub;
+                sub->frame_busy = 1;
+                sub->last_seen_seq = target_seq;
+                *frame_out = f;
+                return CUFRAMES_OK;
+            }
+            /* The slot does not (or no longer) hold target_seq. Fall through
+             * to the bounded wait below instead of retrying immediately, so
+             * the timeout, non-blocking and shutdown checks still apply. */
+        }
+
+        /* No frame available */
+        if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK;
+        if (timeout_ms > 0 && cuframes_now_ns() > deadline) return CUFRAMES_ERR_TIMEOUT;
+
+        /* Poll-based wait (eventfd — v0.2). 50µs interval — a compromise
+         * between latency and CPU usage. */
+        struct timespec ts = {.tv_sec = 0, .tv_nsec = 50000};
+        nanosleep(&ts, NULL);
+
+        if (atomic_load_explicit(&sub->hdr->shutdown_flag,
+                                 memory_order_acquire) != 0) {
+            return CUFRAMES_ERR_DISCONNECTED;
+        }
+    }
+}
+
+/* Returns a frame obtained from cuframes_subscriber_next(): sets this
+ * subscriber's bit in the slot's ack_bitmap, records progress in its
+ * subscriber entry and allows the next call to next().
+ * A NULL frame is a no-op; a frame that does not belong to `sub` yields
+ * CUFRAMES_ERR_INVALID_ARG. */
+int cuframes_subscriber_release(cuframes_subscriber_t *sub,
+                                cuframes_frame_t *frame) {
+    if (!frame) return CUFRAMES_OK;
+    if (!sub || frame->subscriber != sub) return CUFRAMES_ERR_INVALID_ARG;
+
+    /* ACK via bitmap. The bit also indexes subscribers[], which has
+     * CUFRAMES_MAX_SUBSCRIBERS entries, so that is the upper bound. */
+    if (sub->assigned_bit > 0 && sub->assigned_bit < CUFRAMES_MAX_SUBSCRIBERS) {
+        atomic_fetch_or_explicit(&sub->hdr->slots[frame->slot_idx].ack_bitmap,
+                                 1ULL << sub->assigned_bit,
+                                 memory_order_release);
+        atomic_store_explicit(&sub->hdr->subscribers[sub->assigned_bit].last_seen_seq,
+                              frame->seq, memory_order_release);
+        atomic_store_explicit(&sub->hdr->subscribers[sub->assigned_bit].last_ack_ns,
+                              cuframes_now_ns(), memory_order_release);
+    }
+    frame->cuda_ptr = NULL;
+    frame->subscriber = NULL;
+    sub->frame_busy = 0;
+    return CUFRAMES_OK;
+}
+
+/* Releases every resource held by the subscriber; also used as the cleanup
+ * path of a partially constructed one. NULL is accepted. */
+int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
+    if (!sub) return CUFRAMES_OK;
+
+    /* Clear subscriber bit (bounded: it indexes subscribers[] and is a shift
+     * count on a 64-bit mask). */
+    if (sub->hdr && sub->assigned_bit > 0 &&
+        sub->assigned_bit < CUFRAMES_MAX_SUBSCRIBERS) {
+        atomic_fetch_and_explicit(&sub->hdr->subscriber_bitmap,
+                                  ~(1ULL << sub->assigned_bit),
+                                  memory_order_release);
+        atomic_store_explicit(&sub->hdr->subscribers[sub->assigned_bit].state,
+                              0, memory_order_release);
+    }
+
+    if (sub->producer_event) cudaEventDestroy(sub->producer_event);
+
+    int ring = sub->hdr ?
(int)sub->hdr->ring_size : 0; + if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING; + for (int i = 0; i < ring; ++i) { + if (sub->mapped_ptrs[i]) cudaIpcCloseMemHandle(sub->mapped_ptrs[i]); + } + + if (sub->hdr) munmap(sub->hdr, sizeof(cuframes_shm_header_t)); + if (sub->shm_fd >= 0) close(sub->shm_fd); + if (sub->sock_fd >= 0) close(sub->sock_fd); + free(sub); + return CUFRAMES_OK; +} diff --git a/libcuframes/src/consumer_async.c b/libcuframes/src/consumer_async.c new file mode 100644 index 0000000..d9fc850 --- /dev/null +++ b/libcuframes/src/consumer_async.c @@ -0,0 +1,82 @@ +/* Async-subscriber: thread + callback. Tonкий wrapper над sync API. */ + +#include "internal.h" +#include + +struct cuframes_async_subscriber { + cuframes_subscriber_t *inner; + pthread_t thread; + int thread_alive; + int stop_flag; + cuframes_frame_callback_t on_frame; + cuframes_error_callback_t on_error; + void *user_data; + cudaStream_t internal_stream; +}; + +static void *async_loop(void *arg) { + struct cuframes_async_subscriber *as = (struct cuframes_async_subscriber *)arg; + while (!as->stop_flag) { + cuframes_frame_t *frame = NULL; + int r = cuframes_subscriber_next(as->inner, + as->internal_stream, + &frame, 100); + if (r == CUFRAMES_OK) { + if (as->on_frame) as->on_frame(frame, as->user_data); + cuframes_subscriber_release(as->inner, frame); + } else if (r == CUFRAMES_ERR_TIMEOUT) { + /* normal — just poll again */ + } else if (r == CUFRAMES_ERR_WOULD_BLOCK) { + /* shouldn't with timeout>0, but be safe */ + struct timespec ts = {.tv_sec = 0, .tv_nsec = 1000000}; + nanosleep(&ts, NULL); + } else if (r == CUFRAMES_ERR_DISCONNECTED) { + if (as->on_error) as->on_error(r, "publisher disconnected", as->user_data); + break; + } else { + if (as->on_error) as->on_error(r, cuframes_strerror(r), as->user_data); + break; + } + } + return NULL; +} + +int cuframes_async_subscriber_create(const cuframes_subscriber_config_t *cfg, + cuframes_frame_callback_t on_frame, + 
cuframes_error_callback_t on_error, + void *user_data, + cuframes_async_subscriber_t **out) { + if (!cfg || !on_frame || !out) return CUFRAMES_ERR_INVALID_ARG; + struct cuframes_async_subscriber *as = calloc(1, sizeof(*as)); + if (!as) return CUFRAMES_ERR_OUT_OF_MEMORY; + as->on_frame = on_frame; + as->on_error = on_error; + as->user_data = user_data; + + int r = cuframes_subscriber_create(cfg, &as->inner); + if (r != CUFRAMES_OK) { free(as); return r; } + + cudaError_t cerr = cudaSetDevice(cfg->cuda_device); + if (cerr == cudaSuccess) cudaStreamCreate(&as->internal_stream); + + as->stop_flag = 0; + if (pthread_create(&as->thread, NULL, async_loop, as) != 0) { + cuframes_subscriber_destroy(as->inner); + if (as->internal_stream) cudaStreamDestroy(as->internal_stream); + free(as); + return CUFRAMES_ERR_INTERNAL; + } + as->thread_alive = 1; + *out = as; + return CUFRAMES_OK; +} + +int cuframes_async_subscriber_destroy(cuframes_async_subscriber_t *as) { + if (!as) return CUFRAMES_OK; + as->stop_flag = 1; + if (as->thread_alive) pthread_join(as->thread, NULL); + if (as->inner) cuframes_subscriber_destroy(as->inner); + if (as->internal_stream) cudaStreamDestroy(as->internal_stream); + free(as); + return CUFRAMES_OK; +} diff --git a/libcuframes/src/internal.h b/libcuframes/src/internal.h new file mode 100644 index 0000000..62b9090 --- /dev/null +++ b/libcuframes/src/internal.h @@ -0,0 +1,184 @@ +/* Internal shared types для libcuframes implementation. + * Не публикуется в include/. + * + * Layout соответствует docs/protocol.md (byte-exact). 
+ */ + +#ifndef CUFRAMES_INTERNAL_H +#define CUFRAMES_INTERNAL_H + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cuframes/cuframes.h" + +/* ─── Protocol constants ──────────────────────────────────────────────── */ + +#define CUFRAMES_MAGIC 0xCC7C1DCCu +#define CUFRAMES_PROTOCOL_V1 1u +#define CUFRAMES_MAX_SUBSCRIBERS 32 +#define CUFRAMES_MAX_RING 16 +#define CUFRAMES_MAX_KEY_LEN 63 +#define CUFRAMES_MAX_NAME_LEN 31 +#define CUFRAMES_RUNTIME_DIR "/run/cuframes" +#define CUFRAMES_SHM_PREFIX "/cuframes-" + +/* ─── Shared memory layout (см. docs/protocol.md §2) ──────────────────── */ + +/* Frame meta — packed 64 байт */ +typedef struct __attribute__((packed)) cuframes_shm_meta { + uint32_t format; + int32_t width; + int32_t height; + int32_t pitch_y; + int32_t pitch_uv; + uint32_t bits_per_pixel; + uint64_t frame_size_bytes; + uint8_t reserved[32]; +} cuframes_shm_meta_t; +_Static_assert(sizeof(cuframes_shm_meta_t) == 64, "shm meta must be 64 bytes"); + +/* Slot descriptor — packed 192 байт */ +typedef struct __attribute__((packed)) cuframes_shm_slot { + _Atomic uint64_t seq; /* UINT64_MAX = invalid */ + _Atomic int64_t pts_ns; + _Atomic uint64_t ack_bitmap; + uint64_t written_bytes; + cudaIpcMemHandle_t mem_handle; /* 64 байта */ + uint8_t cuda_ptr_external[32]; /* informative pointer бite-string */ + uint8_t reserved_a[16]; + uint8_t reserved_b[48]; +} cuframes_shm_slot_t; +_Static_assert(sizeof(cuframes_shm_slot_t) == 192, "slot descriptor must be 192 bytes"); + +/* Subscriber slot — packed 128 байт */ +typedef struct __attribute__((packed)) cuframes_shm_subscriber { + _Atomic uint64_t state; /* 0=free, 1=connecting, 2=active, 3=draining */ + uint64_t consumer_pid; + _Atomic uint64_t last_seen_seq; + _Atomic int64_t last_ack_ns; + char consumer_name[32]; /* null-terminated */ + uint8_t reserved[64]; +} cuframes_shm_subscriber_t; +_Static_assert(sizeof(cuframes_shm_subscriber_t) == 128, "subscriber 
slot must be 128 bytes"); + +/* Shared header (header + slots[N] + subscribers[M]). Total ≤ 8KB. */ +typedef struct __attribute__((packed)) cuframes_shm_header { + uint32_t magic; + uint32_t proto_version; + uint32_t lib_version_major; + uint32_t lib_version_minor; + uint32_t lib_version_patch; + uint32_t reserved_a; + uint64_t producer_pid; + uint64_t ring_size; + uint64_t ownership_mode; + uint64_t policy; + uint64_t max_subscribers; + cuframes_shm_meta_t meta; /* offset 0x40, 64 bytes */ + cudaIpcEventHandle_t ipc_event_handle; /* offset 0x80, 64 bytes */ + _Atomic uint64_t global_seq; /* offset 0xC0 */ + _Atomic uint64_t subscriber_bitmap; + _Atomic uint64_t shutdown_flag; + uint8_t reserved_b[40]; + /* offset 0x100 — variable-length tail */ + cuframes_shm_slot_t slots[CUFRAMES_MAX_RING]; /* 192 × 16 = 3072 */ + cuframes_shm_subscriber_t subscribers[CUFRAMES_MAX_SUBSCRIBERS]; /* 128 × 32 = 4096 */ +} cuframes_shm_header_t; + +/* Layout sanity checks (docs/protocol.md §2 table) */ +_Static_assert(offsetof(cuframes_shm_header_t, magic) == 0x0000, "magic offset"); +_Static_assert(offsetof(cuframes_shm_header_t, proto_version) == 0x0004, "proto_version offset"); +_Static_assert(offsetof(cuframes_shm_header_t, producer_pid) == 0x0018, "producer_pid offset"); +_Static_assert(offsetof(cuframes_shm_header_t, ring_size) == 0x0020, "ring_size offset"); +_Static_assert(offsetof(cuframes_shm_header_t, meta) == 0x0040, "meta offset"); +_Static_assert(offsetof(cuframes_shm_header_t, ipc_event_handle) == 0x0080, "event handle offset"); +_Static_assert(offsetof(cuframes_shm_header_t, global_seq) == 0x00C0, "global_seq offset"); +_Static_assert(offsetof(cuframes_shm_header_t, slots) == 0x0100, "slots offset"); + +/* ─── Socket protocol messages (docs/protocol.md §3) ───────────────────── */ + +#define CUFRAMES_MSG_HELLO_REQ 0x01 +#define CUFRAMES_MSG_HELLO_RESP 0x02 +#define CUFRAMES_MSG_SUBSCRIBE_REQ 0x03 +#define CUFRAMES_MSG_SUBSCRIBE_RESP 0x04 +#define 
CUFRAMES_MSG_UNSUBSCRIBE 0x10 +#define CUFRAMES_MSG_EVENT_FD 0x20 +#define CUFRAMES_MSG_SHUTDOWN 0x30 +#define CUFRAMES_MSG_PING 0xF0 +#define CUFRAMES_MSG_PONG 0xF1 +#define CUFRAMES_MSG_ERROR 0xFE + +#define CUFRAMES_MAX_MSG_PAYLOAD 4096 + +typedef struct __attribute__((packed)) cuframes_msg_header { + uint32_t msg_type; + uint32_t payload_length; +} cuframes_msg_header_t; + +typedef struct __attribute__((packed)) cuframes_msg_hello_req { + uint32_t proto_version; + uint32_t consumer_name_len; + /* followed by name bytes */ + /* + int32_t cuda_device + uint32_t mode + uint8_t reserved[12] */ +} cuframes_msg_hello_req_t; + +typedef struct __attribute__((packed)) cuframes_msg_hello_resp { + int32_t result; + uint32_t proto_version_actual; + uint32_t ring_size; + uint32_t ownership_mode; + cuframes_shm_meta_t meta; + uint32_t shm_path_len; + /* followed by path bytes */ + /* + uint8_t reserved[12] */ +} cuframes_msg_hello_resp_t; + +typedef struct __attribute__((packed)) cuframes_msg_subscribe_resp { + int32_t result; + uint32_t assigned_bit; + uint64_t initial_seq; + uint8_t reserved[12]; +} cuframes_msg_subscribe_resp_t; + +/* ─── Logging (minimal — to stderr) ────────────────────────────────────── */ + +#define CUFRAMES_LOG_ERROR(fmt, ...) \ + fprintf(stderr, "[cuframes ERROR] " fmt "\n", ##__VA_ARGS__) +#define CUFRAMES_LOG_WARN(fmt, ...) \ + fprintf(stderr, "[cuframes WARN] " fmt "\n", ##__VA_ARGS__) +#define CUFRAMES_LOG_INFO(fmt, ...) 
\ + do { if (getenv("CUFRAMES_DEBUG")) \ + fprintf(stderr, "[cuframes] " fmt "\n", ##__VA_ARGS__); } while (0) + +/* ─── Internal helpers ────────────────────────────────────────────────── */ + +/* Build path /run/cuframes/.sock — returns CUFRAMES_OK or INVALID_ARG */ +int cuframes_internal_socket_path(const char *key, char *out, size_t out_size); +/* Build /cuframes- (for shm_open) */ +int cuframes_internal_shm_name(const char *key, char *out, size_t out_size); +/* Validate key per protocol.md (alphanum/_/-, 1..63 chars) */ +int cuframes_internal_validate_key(const char *key); +/* Calculate frame size + pitch для format/W/H */ +int cuframes_internal_calc_size(cuframes_format_t format, int32_t w, int32_t h, + size_t *size_out, int32_t *pitch_y_out, int32_t *pitch_uv_out); +/* Ensure /run/cuframes exists */ +int cuframes_internal_ensure_runtime_dir(void); +/* Check if pid alive */ +int cuframes_internal_pid_alive(pid_t pid); + +/* TLV send/recv helpers — returns 0 on success, negative cuframes_error_t */ +int cuframes_internal_send_msg(int sock_fd, uint32_t msg_type, + const void *payload, uint32_t payload_len); +int cuframes_internal_recv_msg(int sock_fd, uint32_t *msg_type_out, + void *payload, uint32_t *payload_len_inout, + int32_t timeout_ms); + +#endif /* CUFRAMES_INTERNAL_H */ diff --git a/libcuframes/src/producer.c b/libcuframes/src/producer.c new file mode 100644 index 0000000..9011f17 --- /dev/null +++ b/libcuframes/src/producer.c @@ -0,0 +1,667 @@ +/* Publisher implementation (docs/protocol.md §1, §2, §3.2, §4.2, §5). 
*/ + +#include "internal.h" +#include +#include +#include +#include +#include +#include +#include + +struct cuframes_publisher { + cuframes_publisher_config_t cfg; + char key[CUFRAMES_MAX_KEY_LEN + 1]; + + /* IPC resources */ + int shm_fd; + cuframes_shm_header_t *hdr; + int listen_fd; + char socket_path[128]; + char shm_name[80]; + + /* CUDA */ + cudaEvent_t event; + cudaIpcMemHandle_t ipc_mem[CUFRAMES_MAX_RING]; + void *cuda_ptrs[CUFRAMES_MAX_RING]; /* mapped pointers */ + size_t frame_size_bytes; + int32_t ring_size_actual; + + /* Acquire/publish state — LIBRARY ownership */ + uint64_t next_seq; + int32_t current_slot; /* индекс slot'а полученного через acquire() */ + int has_acquired; + + /* EXTERNAL ownership: map user pointer → ring index */ + void *external_ptrs[CUFRAMES_MAX_RING]; + int32_t external_count; + + /* Subscriber-management thread */ + pthread_t accept_thread; + int accept_thread_alive; + int stop_flag; + pthread_mutex_t state_mu; /* protects subscriber connections */ +}; + +/* Forward decls */ +static void *accept_thread_main(void *arg); +static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd); + +/* ─── Internal: alloc/setup CUDA pool and SHM ─────────────────────────── */ + +static int alloc_library_pool(struct cuframes_publisher *pub) { + int r = cuframes_internal_calc_size(pub->cfg.format, + pub->cfg.width, pub->cfg.height, + &pub->frame_size_bytes, NULL, NULL); + if (r != CUFRAMES_OK) return r; + + pub->ring_size_actual = pub->cfg.ring_size; + + cudaError_t cerr = cudaSetDevice(pub->cfg.cuda_device); + if (cerr != cudaSuccess) { + CUFRAMES_LOG_ERROR("cudaSetDevice(%d): %s", + pub->cfg.cuda_device, cudaGetErrorString(cerr)); + return CUFRAMES_ERR_CUDA; + } + + for (int i = 0; i < pub->ring_size_actual; ++i) { + cerr = cudaMalloc(&pub->cuda_ptrs[i], pub->frame_size_bytes); + if (cerr != cudaSuccess) { + CUFRAMES_LOG_ERROR("cudaMalloc slot %d: %s", + i, cudaGetErrorString(cerr)); + return CUFRAMES_ERR_CUDA; + } + cerr = 
cudaIpcGetMemHandle(&pub->ipc_mem[i], pub->cuda_ptrs[i]);
+        if (cerr != cudaSuccess) {
+            CUFRAMES_LOG_ERROR("cudaIpcGetMemHandle slot %d: %s",
+                               i, cudaGetErrorString(cerr));
+            return CUFRAMES_ERR_CUDA;
+        }
+    }
+    return CUFRAMES_OK;
+}
+
+/* EXTERNAL ownership: records `count` caller-owned device pointers as the ring
+ * and exports an IPC memory handle for each.
+ * Returns CUFRAMES_ERR_INVALID_ARG for a NULL array, a NULL entry, a zero
+ * frame size or a count outside 1..CUFRAMES_MAX_RING; CUFRAMES_ERR_CUDA when
+ * a CUDA call fails. Nothing in `pub` is modified on an argument error. */
+static int register_external_pool(struct cuframes_publisher *pub,
+                                  void *const *ptrs, int32_t count,
+                                  size_t frame_size) {
+    if (!ptrs || frame_size == 0) return CUFRAMES_ERR_INVALID_ARG;
+    if (count < 1 || count > CUFRAMES_MAX_RING) return CUFRAMES_ERR_INVALID_ARG;
+    /* Validate every entry before touching publisher state. */
+    for (int i = 0; i < count; ++i) {
+        if (!ptrs[i]) return CUFRAMES_ERR_INVALID_ARG;
+    }
+
+    pub->frame_size_bytes = frame_size;
+    pub->ring_size_actual = count;
+    pub->external_count = count;
+
+    cudaError_t cerr = cudaSetDevice(pub->cfg.cuda_device);
+    if (cerr != cudaSuccess) {
+        CUFRAMES_LOG_ERROR("cudaSetDevice: %s", cudaGetErrorString(cerr));
+        return CUFRAMES_ERR_CUDA;
+    }
+    for (int i = 0; i < count; ++i) {
+        pub->cuda_ptrs[i] = ptrs[i];
+        pub->external_ptrs[i] = ptrs[i];
+        cerr = cudaIpcGetMemHandle(&pub->ipc_mem[i], ptrs[i]);
+        if (cerr != cudaSuccess) {
+            CUFRAMES_LOG_ERROR("cudaIpcGetMemHandle on external ptr %p: %s",
+                               ptrs[i], cudaGetErrorString(cerr));
+            return CUFRAMES_ERR_CUDA;
+        }
+    }
+    return CUFRAMES_OK;
+}
+
+/* Creates the event that is later exported to subscribers; it needs the
+ * interprocess flag, which in turn requires timing to be disabled. */
+static int create_event_handle(struct cuframes_publisher *pub) {
+    cudaError_t cerr = cudaEventCreateWithFlags(&pub->event,
+                                                cudaEventDisableTiming | cudaEventInterprocess);
+    if (cerr != cudaSuccess) {
+        CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags: %s",
+                           cudaGetErrorString(cerr));
+        return CUFRAMES_ERR_CUDA;
+    }
+    return CUFRAMES_OK;
+}
+
+/* Creates, sizes, maps and initializes the shared header for pub->key.
+ * An existing segment is reused only if its recorded producer pid is no
+ * longer alive; otherwise CUFRAMES_ERR_ALREADY_EXISTS is returned.
+ * Expects the pool (ring_size_actual, ipc_mem[]) and the event to be ready. */
+static int setup_shm(struct cuframes_publisher *pub) {
+    /* /dev/shm/cuframes-<key> */
+    int r = cuframes_internal_shm_name(pub->key, pub->shm_name,
+                                       sizeof(pub->shm_name));
+    if (r != CUFRAMES_OK) return r;
+
+    pub->shm_fd = shm_open(pub->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644);
+    if (pub->shm_fd < 0) {
+        if (errno == EEXIST) {
+            /* Check if previous owner alive */
+            int existing = shm_open(pub->shm_name, O_RDWR, 0);
+            if (existing >= 0) {
+                cuframes_shm_header_t tmp;
+                ssize_t rb = read(existing, &tmp, sizeof(tmp));
+                close(existing);
+                if (rb == (ssize_t)sizeof(tmp) && tmp.magic == CUFRAMES_MAGIC) {
+                    if (cuframes_internal_pid_alive((pid_t)tmp.producer_pid)) {
+                        CUFRAMES_LOG_ERROR("publisher with key=%s already running (pid %lu)",
+                                           pub->key, (unsigned long)tmp.producer_pid);
+                        return CUFRAMES_ERR_ALREADY_EXISTS;
+                    }
+                }
+            }
+            CUFRAMES_LOG_INFO("stale shm %s — unlinking", pub->shm_name);
+            shm_unlink(pub->shm_name);
+            pub->shm_fd = shm_open(pub->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644);
+            if (pub->shm_fd < 0) return CUFRAMES_ERR_IO;
+        } else {
+            CUFRAMES_LOG_ERROR("shm_open: %s", strerror(errno));
+            return CUFRAMES_ERR_IO;
+        }
+    }
+
+    if (ftruncate(pub->shm_fd, sizeof(cuframes_shm_header_t)) < 0) {
+        CUFRAMES_LOG_ERROR("ftruncate shm: %s", strerror(errno));
+        return CUFRAMES_ERR_IO;
+    }
+
+    pub->hdr = (cuframes_shm_header_t *)mmap(NULL, sizeof(cuframes_shm_header_t),
+                                             PROT_READ | PROT_WRITE, MAP_SHARED, pub->shm_fd, 0);
+    if (pub->hdr == MAP_FAILED) {
+        CUFRAMES_LOG_ERROR("mmap shm: %s", strerror(errno));
+        pub->hdr = NULL;
+        return CUFRAMES_ERR_IO;
+    }
+    memset(pub->hdr, 0, sizeof(cuframes_shm_header_t));
+
+    pub->hdr->magic = CUFRAMES_MAGIC;
+    pub->hdr->proto_version = CUFRAMES_PROTOCOL_V1;
+    pub->hdr->lib_version_major = CUFRAMES_VERSION_MAJOR;
+    pub->hdr->lib_version_minor = CUFRAMES_VERSION_MINOR;
+    pub->hdr->lib_version_patch = CUFRAMES_VERSION_PATCH;
+    pub->hdr->producer_pid = (uint64_t)getpid();
+    pub->hdr->ring_size = (uint64_t)pub->ring_size_actual;
+    pub->hdr->ownership_mode = (uint64_t)pub->cfg.ownership;
+    pub->hdr->policy = (uint64_t)pub->cfg.policy;
+    pub->hdr->max_subscribers = CUFRAMES_MAX_SUBSCRIBERS;
+
+    /* Pitches are advisory metadata here; keep publishing on failure (as
+     * before) but no longer hide it. */
+    int32_t py = 0, puv = 0;
+    r = cuframes_internal_calc_size(pub->cfg.format, pub->cfg.width, pub->cfg.height,
+                                    NULL, &py, &puv);
+    if (r != CUFRAMES_OK) {
+        CUFRAMES_LOG_WARN("calc_size failed (%d); publishing zero pitches", r);
+    }
+    pub->hdr->meta.format = (uint32_t)pub->cfg.format;
+    pub->hdr->meta.width = pub->cfg.width;
+    pub->hdr->meta.height = pub->cfg.height;
+    pub->hdr->meta.pitch_y = py;
+    pub->hdr->meta.pitch_uv = puv;
+    pub->hdr->meta.frame_size_bytes = pub->frame_size_bytes;
+
+    /* Export event handle */
+    cudaError_t cerr = cudaIpcGetEventHandle(&pub->hdr->ipc_event_handle, pub->event);
+    if (cerr != cudaSuccess) {
+        CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle: %s", cudaGetErrorString(cerr));
+        return CUFRAMES_ERR_CUDA;
+    }
+
+    /* Fill slot descriptors. Every slot, including those beyond the active
+     * ring, is marked invalid: after the memset above an unused slot would
+     * otherwise carry seq 0, which reads as a valid sequence number. */
+    for (int i = 0; i < CUFRAMES_MAX_RING; ++i) {
+        if (i < pub->ring_size_actual) {
+            pub->hdr->slots[i].mem_handle = pub->ipc_mem[i];
+        }
+        atomic_store_explicit(&pub->hdr->slots[i].seq, UINT64_MAX,
+                              memory_order_release);
+    }
+    atomic_store_explicit(&pub->hdr->global_seq, UINT64_MAX, memory_order_release);
+
+    return CUFRAMES_OK;
+}
+
+/* Creates the listening Unix socket for pub->key. A leftover socket file is
+ * removed unless something still accepts connections on it, in which case
+ * CUFRAMES_ERR_ALREADY_EXISTS is returned. */
+static int setup_socket(struct cuframes_publisher *pub) {
+    int r = cuframes_internal_ensure_runtime_dir();
+    if (r != CUFRAMES_OK) return r;
+
+    r = cuframes_internal_socket_path(pub->key, pub->socket_path,
+                                      sizeof(pub->socket_path));
+    if (r != CUFRAMES_OK) return r;
+
+    /* sun_path is smaller than socket_path; refuse rather than silently bind
+     * to a truncated (different) path. */
+    struct sockaddr_un sa = {.sun_family = AF_UNIX};
+    if (strlen(pub->socket_path) >= sizeof(sa.sun_path)) {
+        CUFRAMES_LOG_ERROR("socket path too long: %s", pub->socket_path);
+        return CUFRAMES_ERR_INVALID_ARG;
+    }
+    strncpy(sa.sun_path, pub->socket_path, sizeof(sa.sun_path) - 1);
+
+    /* Cleanup stale socket */
+    struct stat st;
+    if (stat(pub->socket_path, &st) == 0) {
+        /* Try connect — if works, busy */
+        int probe = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
+        if (probe >= 0) {
+            int c = connect(probe, (struct sockaddr *)&sa, sizeof(sa));
+            close(probe);
+            if (c == 0) {
+                CUFRAMES_LOG_ERROR("socket %s is in use", pub->socket_path);
+                return CUFRAMES_ERR_ALREADY_EXISTS;
+            }
+        }
+        unlink(pub->socket_path);
+    }
+
+    pub->listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
+    if (pub->listen_fd < 0) {
+        CUFRAMES_LOG_ERROR("socket: %s", strerror(errno));
+        return CUFRAMES_ERR_IO;
+    }
+    if (bind(pub->listen_fd, (struct sockaddr *)&sa, sizeof(sa)) < 0) {
+        CUFRAMES_LOG_ERROR("bind %s: %s", pub->socket_path, strerror(errno));
+        return CUFRAMES_ERR_IO;
+    }
+    if (listen(pub->listen_fd, 8) < 0) {
+        CUFRAMES_LOG_ERROR("listen: %s", strerror(errno));
+        return CUFRAMES_ERR_IO;
+    }
+    return CUFRAMES_OK;
+}
+
+/* ─── Public API ──────────────────────────────────────────────────────── */
+
+/* Checks the parts of the config that do not depend on the ownership-specific
+ * pool: key syntax, positive dimensions, known ownership and policy values,
+ * and (LIBRARY ownership only) ring_size within 2..CUFRAMES_MAX_RING. */
+static int validate_config(const cuframes_publisher_config_t *cfg) {
+    if (!cfg) return CUFRAMES_ERR_INVALID_ARG;
+    if (!cfg->key) return CUFRAMES_ERR_INVALID_ARG;
+    if (cuframes_internal_validate_key(cfg->key) != CUFRAMES_OK)
+        return CUFRAMES_ERR_INVALID_ARG;
+    if (cfg->width <= 0 || cfg->height <= 0) return CUFRAMES_ERR_INVALID_ARG;
+    if (cfg->ownership != CUFRAMES_OWNERSHIP_LIBRARY &&
+        cfg->ownership != CUFRAMES_OWNERSHIP_EXTERNAL)
+        return CUFRAMES_ERR_INVALID_ARG;
+    if (cfg->ownership == CUFRAMES_OWNERSHIP_LIBRARY) {
+        if (cfg->ring_size < 2 || cfg->ring_size > CUFRAMES_MAX_RING)
+            return CUFRAMES_ERR_INVALID_ARG;
+    }
+    if (cfg->policy != CUFRAMES_POLICY_DROP_OLDEST &&
+        cfg->policy != CUFRAMES_POLICY_STRICT_WAIT)
+        return CUFRAMES_ERR_INVALID_ARG;
+    return CUFRAMES_OK;
+}
+
+/* Copies the config into a zero-initialized publisher and puts descriptors
+ * and acquire state into their "nothing open / nothing acquired" values.
+ * Returns CUFRAMES_ERR_INTERNAL if the state mutex cannot be initialized. */
+static int common_init(struct cuframes_publisher *pub,
+                       const cuframes_publisher_config_t *cfg) {
+    pub->cfg = *cfg;
+    strncpy(pub->key, cfg->key, sizeof(pub->key) - 1);
+    pub->key[sizeof(pub->key) - 1] = '\0';
+    pub->shm_fd = -1;
+    pub->listen_fd = -1;
+    pub->next_seq = 0;
+    pub->current_slot = -1;
+    pub->has_acquired = 0;
+    if (pthread_mutex_init(&pub->state_mu, NULL) != 0) return CUFRAMES_ERR_INTERNAL;
+    return CUFRAMES_OK;
+}
+
+int cuframes_publisher_create(const cuframes_publisher_config_t *cfg,
+                              cuframes_publisher_t **out) {
+    int r = validate_config(cfg);
+    if (r != CUFRAMES_OK) return r;
+    if (cfg->ownership != CUFRAMES_OWNERSHIP_LIBRARY) return CUFRAMES_ERR_INVALID_ARG;
+
+    struct cuframes_publisher *pub = calloc(1, sizeof(*pub));
+    if (!pub) return CUFRAMES_ERR_OUT_OF_MEMORY;
+    common_init(pub, cfg);
+
+    if ((r = alloc_library_pool(pub)) != CUFRAMES_OK) goto fail;
+    if ((r = create_event_handle(pub)) != CUFRAMES_OK) goto fail;
+    if ((r = setup_shm(pub)) != CUFRAMES_OK) goto fail;
+    if ((r = setup_socket(pub)) != CUFRAMES_OK)
goto fail; + + /* Start accept thread */ + pub->stop_flag = 0; + if (pthread_create(&pub->accept_thread, NULL, accept_thread_main, pub) != 0) { + r = CUFRAMES_ERR_INTERNAL; + goto fail; + } + pub->accept_thread_alive = 1; + + CUFRAMES_LOG_INFO("publisher '%s' ready (ring=%d, %dx%d, fmt=%d, lib-owned)", + pub->key, pub->ring_size_actual, + pub->cfg.width, pub->cfg.height, (int)pub->cfg.format); + *out = pub; + return CUFRAMES_OK; + +fail: + cuframes_publisher_destroy(pub); + return r; +} + +int cuframes_publisher_create_external(const cuframes_publisher_config_t *cfg, + void *const *cuda_ptrs, + int32_t ptr_count, + size_t frame_size, + cuframes_publisher_t **out) { + int r = validate_config(cfg); + if (r != CUFRAMES_OK) return r; + if (cfg->ownership != CUFRAMES_OWNERSHIP_EXTERNAL) return CUFRAMES_ERR_INVALID_ARG; + if (!cuda_ptrs || ptr_count < 1) return CUFRAMES_ERR_INVALID_ARG; + if (frame_size == 0) return CUFRAMES_ERR_INVALID_ARG; + + struct cuframes_publisher *pub = calloc(1, sizeof(*pub)); + if (!pub) return CUFRAMES_ERR_OUT_OF_MEMORY; + common_init(pub, cfg); + + if ((r = register_external_pool(pub, cuda_ptrs, ptr_count, frame_size)) != CUFRAMES_OK) + goto fail; + if ((r = create_event_handle(pub)) != CUFRAMES_OK) goto fail; + if ((r = setup_shm(pub)) != CUFRAMES_OK) goto fail; + if ((r = setup_socket(pub)) != CUFRAMES_OK) goto fail; + + pub->stop_flag = 0; + if (pthread_create(&pub->accept_thread, NULL, accept_thread_main, pub) != 0) { + r = CUFRAMES_ERR_INTERNAL; + goto fail; + } + pub->accept_thread_alive = 1; + + CUFRAMES_LOG_INFO("publisher '%s' ready (external pool=%d, %dx%d, fmt=%d)", + pub->key, ptr_count, + pub->cfg.width, pub->cfg.height, (int)pub->cfg.format); + *out = pub; + return CUFRAMES_OK; +fail: + cuframes_publisher_destroy(pub); + return r; +} + +int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) { + if (!pub || !cuda_ptr_out) return CUFRAMES_ERR_INVALID_ARG; + if (pub->cfg.ownership != 
/*
 * Publish the frame held in ring slot `slot`.
 *
 * pub    - publisher whose shared header (pub->hdr) is updated.
 * slot   - index of the slot being published.
 * stream - producer's CUDA stream (cast to cudaStream_t); the shared event
 *          is recorded on it.
 * pts_ns - presentation timestamp stored with the slot.
 *
 * Returns CUFRAMES_OK, or CUFRAMES_ERR_CUDA if the event cannot be recorded
 * (in which case no shared state is modified and next_seq is not advanced).
 *
 * The stores below are deliberately ordered: per-slot fields first, then
 * global_seq last, all with release ordering.
 * NOTE(review): presumably consumers load global_seq with acquire ordering
 * before reading the slot fields — confirm against consumer.c.
 */
static int do_publish(cuframes_publisher_t *pub, int32_t slot,
                      void *stream, int64_t pts_ns) {
    /* Record event on producer's stream */
    cudaError_t cerr = cudaEventRecord(pub->event, (cudaStream_t)stream);
    if (cerr != cudaSuccess) {
        CUFRAMES_LOG_ERROR("cudaEventRecord: %s", cudaGetErrorString(cerr));
        return CUFRAMES_ERR_CUDA;
    }

    /* Reset the ack bitmap for the new frame */
    atomic_store_explicit(&pub->hdr->slots[slot].ack_bitmap, 0,
                          memory_order_release);
    atomic_store_explicit(&pub->hdr->slots[slot].pts_ns, pts_ns,
                          memory_order_release);
    atomic_store_explicit(&pub->hdr->slots[slot].seq, pub->next_seq,
                          memory_order_release);

    /* Publication barrier */
    atomic_store_explicit(&pub->hdr->global_seq, pub->next_seq,
                          memory_order_release);

    pub->next_seq++;
    return CUFRAMES_OK;
}
cuframes_now_ns() + (int64_t)pub->cfg.consumer_ack_timeout_ms * 1000000LL + : 0; + while (1) { + uint64_t ack = atomic_load_explicit(&pub->hdr->slots[slot].ack_bitmap, + memory_order_acquire); + uint64_t cur_seq = atomic_load_explicit(&pub->hdr->slots[slot].seq, + memory_order_acquire); + if (cur_seq == UINT64_MAX || (ack & bitmap) == bitmap) break; + if (deadline && cuframes_now_ns() > deadline) { + uint64_t missing = bitmap & ~ack; + atomic_fetch_and_explicit(&pub->hdr->subscriber_bitmap, + ~missing, memory_order_release); + break; + } + struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000}; + nanosleep(&ts, NULL); + } + } + } + return do_publish(pub, slot, stream, pts_ns); +} + +int cuframes_publisher_destroy(cuframes_publisher_t *pub) { + if (!pub) return CUFRAMES_OK; + + if (pub->hdr) { + atomic_store_explicit(&pub->hdr->shutdown_flag, 1, memory_order_release); + } + + /* Stop accept thread */ + pub->stop_flag = 1; + if (pub->listen_fd >= 0) { + shutdown(pub->listen_fd, SHUT_RDWR); + close(pub->listen_fd); + pub->listen_fd = -1; + } + if (pub->accept_thread_alive) { + pthread_join(pub->accept_thread, NULL); + pub->accept_thread_alive = 0; + } + + /* Free CUDA */ + if (pub->cfg.ownership == CUFRAMES_OWNERSHIP_LIBRARY) { + for (int i = 0; i < pub->ring_size_actual; ++i) { + if (pub->cuda_ptrs[i]) cudaFree(pub->cuda_ptrs[i]); + } + } + if (pub->event) cudaEventDestroy(pub->event); + + /* Unlink resources */ + if (pub->hdr) { + munmap(pub->hdr, sizeof(cuframes_shm_header_t)); + pub->hdr = NULL; + } + if (pub->shm_fd >= 0) { + close(pub->shm_fd); + shm_unlink(pub->shm_name); + pub->shm_fd = -1; + } + if (pub->socket_path[0]) { + unlink(pub->socket_path); + } + pthread_mutex_destroy(&pub->state_mu); + free(pub); + return CUFRAMES_OK; +} + +/* ─── Accept thread + handshake ──────────────────────────────────────── */ + +static void *accept_thread_main(void *arg) { + struct cuframes_publisher *pub = (struct cuframes_publisher *)arg; + while (!pub->stop_flag) { + struct 
sockaddr_un sa; + socklen_t sl = sizeof(sa); + int client = accept(pub->listen_fd, (struct sockaddr *)&sa, &sl); + if (client < 0) { + if (pub->stop_flag) break; + if (errno == EINTR) continue; + CUFRAMES_LOG_WARN("accept: %s", strerror(errno)); + continue; + } + /* Synchronous handshake — после ответа socket остаётся открытым для + * lifetime signals (SHUTDOWN, PING). Close на error. */ + int r = handshake_subscriber(pub, client); + if (r != CUFRAMES_OK) { + close(client); + } + /* TODO v0.2: track client fds для broadcast SHUTDOWN. Сейчас clients + * сами detect socket EOF при publisher_destroy через shutdown(). */ + } + return NULL; +} + +static int allocate_subscriber_bit(struct cuframes_publisher *pub, + const char *name, uint32_t *bit_out) { + /* Bit 0 reserved (sentinel). Bits 1..31. */ + pthread_mutex_lock(&pub->state_mu); + for (uint32_t bit = 1; bit < CUFRAMES_MAX_SUBSCRIBERS; ++bit) { + uint64_t state = atomic_load_explicit(&pub->hdr->subscribers[bit].state, + memory_order_acquire); + if (state == 0) { + atomic_store_explicit(&pub->hdr->subscribers[bit].state, 1, + memory_order_release); + memset(pub->hdr->subscribers[bit].consumer_name, 0, + sizeof(pub->hdr->subscribers[bit].consumer_name)); + if (name) { + strncpy(pub->hdr->subscribers[bit].consumer_name, name, + sizeof(pub->hdr->subscribers[bit].consumer_name) - 1); + } + atomic_fetch_or_explicit(&pub->hdr->subscriber_bitmap, + 1ULL << bit, memory_order_release); + *bit_out = bit; + pthread_mutex_unlock(&pub->state_mu); + return CUFRAMES_OK; + } + /* Check for name collision */ + if (name && state >= 2 && + strncmp(pub->hdr->subscribers[bit].consumer_name, name, + sizeof(pub->hdr->subscribers[bit].consumer_name)) == 0) { + pthread_mutex_unlock(&pub->state_mu); + return CUFRAMES_ERR_ALREADY_EXISTS; + } + } + pthread_mutex_unlock(&pub->state_mu); + return CUFRAMES_ERR_TOO_MANY; +} + +static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) { + /* recv HELLO_REQ */ + uint8_t 
buf[CUFRAMES_MAX_MSG_PAYLOAD]; + uint32_t plen = sizeof(buf); + uint32_t mtype = 0; + int r = cuframes_internal_recv_msg(client_fd, &mtype, buf, &plen, 5000); + if (r != CUFRAMES_OK) { + CUFRAMES_LOG_WARN("handshake recv HELLO: %s", cuframes_strerror(r)); + return r; + } + if (mtype != CUFRAMES_MSG_HELLO_REQ) { + CUFRAMES_LOG_WARN("handshake expected HELLO_REQ got 0x%x", mtype); + return CUFRAMES_ERR_PROTOCOL; + } + + /* Parse HELLO_REQ: proto_version + name_len + name + cuda_device + mode */ + if (plen < sizeof(cuframes_msg_hello_req_t) + 20) return CUFRAMES_ERR_PROTOCOL; + cuframes_msg_hello_req_t *hreq = (cuframes_msg_hello_req_t *)buf; + uint32_t want_proto = hreq->proto_version; + uint32_t name_len = hreq->consumer_name_len; + if (name_len > 31) name_len = 31; + + char name[32] = {0}; + memcpy(name, buf + sizeof(*hreq), name_len); + + int proto_match = (want_proto == CUFRAMES_PROTOCOL_V1); + + /* Send HELLO_RESP */ + uint8_t resp_buf[CUFRAMES_MAX_MSG_PAYLOAD]; + cuframes_msg_hello_resp_t *resp = (cuframes_msg_hello_resp_t *)resp_buf; + memset(resp, 0, sizeof(*resp)); + resp->result = proto_match ? 
CUFRAMES_OK : CUFRAMES_ERR_PROTOCOL; + resp->proto_version_actual = CUFRAMES_PROTOCOL_V1; + resp->ring_size = (uint32_t)pub->ring_size_actual; + resp->ownership_mode = (uint32_t)pub->cfg.ownership; + resp->meta = pub->hdr->meta; + /* shm_path */ + int slen = snprintf((char *)(resp_buf + sizeof(*resp)), + sizeof(resp_buf) - sizeof(*resp) - 12, + "%s", pub->shm_name); + resp->shm_path_len = (uint32_t)slen; + uint32_t total = sizeof(*resp) + (uint32_t)slen + 12; + + r = cuframes_internal_send_msg(client_fd, CUFRAMES_MSG_HELLO_RESP, + resp_buf, total); + if (r != CUFRAMES_OK) { + CUFRAMES_LOG_WARN("send HELLO_RESP: %s", cuframes_strerror(r)); + return r; + } + if (!proto_match) return CUFRAMES_ERR_PROTOCOL; + + /* recv SUBSCRIBE_REQ */ + plen = sizeof(buf); + r = cuframes_internal_recv_msg(client_fd, &mtype, buf, &plen, 5000); + if (r != CUFRAMES_OK) return r; + if (mtype != CUFRAMES_MSG_SUBSCRIBE_REQ) return CUFRAMES_ERR_PROTOCOL; + + /* Allocate subscriber bit */ + uint32_t bit = 0; + int alloc_r = allocate_subscriber_bit(pub, name, &bit); + + /* Send SUBSCRIBE_RESP */ + cuframes_msg_subscribe_resp_t sresp = {0}; + sresp.result = alloc_r; + sresp.assigned_bit = bit; + sresp.initial_seq = atomic_load_explicit(&pub->hdr->global_seq, + memory_order_acquire); + + r = cuframes_internal_send_msg(client_fd, CUFRAMES_MSG_SUBSCRIBE_RESP, + &sresp, sizeof(sresp)); + if (r != CUFRAMES_OK || alloc_r != CUFRAMES_OK) return r ? r : alloc_r; + + /* Activate subscriber slot */ + atomic_store_explicit(&pub->hdr->subscribers[bit].state, 2, + memory_order_release); + + CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u)", name, bit); + + /* TODO v0.2: spawn per-client thread для liveness/PING/UNSUBSCRIBE. + * Сейчас socket остаётся открытым на heap'е до publisher_destroy. 
*/ + return CUFRAMES_OK; +} diff --git a/libcuframes/src/protocol.c b/libcuframes/src/protocol.c new file mode 100644 index 0000000..b2ce12a --- /dev/null +++ b/libcuframes/src/protocol.c @@ -0,0 +1,99 @@ +/* Socket TLV framing (docs/protocol.md §3). */ + +#include "internal.h" +#include +#include +#include +#include + +/* Read exactly N bytes from socket, with poll-based timeout. */ +static int recv_all(int fd, void *buf, size_t n, int32_t timeout_ms) { + uint8_t *p = (uint8_t *)buf; + size_t got = 0; + int64_t deadline_ns = (timeout_ms < 0) + ? INT64_MAX + : cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL; + + while (got < n) { + if (timeout_ms >= 0) { + int64_t remain_ns = deadline_ns - cuframes_now_ns(); + if (remain_ns <= 0) return CUFRAMES_ERR_TIMEOUT; + struct pollfd pfd = {.fd = fd, .events = POLLIN}; + int pr = poll(&pfd, 1, (int)(remain_ns / 1000000LL)); + if (pr == 0) return CUFRAMES_ERR_TIMEOUT; + if (pr < 0) { + if (errno == EINTR) continue; + return CUFRAMES_ERR_IO; + } + } + ssize_t r = recv(fd, p + got, n - got, 0); + if (r == 0) return CUFRAMES_ERR_DISCONNECTED; + if (r < 0) { + if (errno == EINTR) continue; + if (errno == EAGAIN || errno == EWOULDBLOCK) { + if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK; + continue; + } + return CUFRAMES_ERR_IO; + } + got += (size_t)r; + } + return CUFRAMES_OK; +} + +static int send_all(int fd, const void *buf, size_t n) { + const uint8_t *p = (const uint8_t *)buf; + size_t sent = 0; + while (sent < n) { + ssize_t s = send(fd, p + sent, n - sent, MSG_NOSIGNAL); + if (s < 0) { + if (errno == EINTR) continue; + if (errno == EPIPE) return CUFRAMES_ERR_DISCONNECTED; + return CUFRAMES_ERR_IO; + } + sent += (size_t)s; + } + return CUFRAMES_OK; +} + +int cuframes_internal_send_msg(int fd, uint32_t msg_type, + const void *payload, uint32_t payload_len) { + if (payload_len > CUFRAMES_MAX_MSG_PAYLOAD) return CUFRAMES_ERR_INVALID_ARG; + cuframes_msg_header_t h = {.msg_type = msg_type, .payload_length = 
payload_len}; + int r = send_all(fd, &h, sizeof(h)); + if (r != CUFRAMES_OK) return r; + if (payload_len > 0 && payload) { + r = send_all(fd, payload, payload_len); + } + return r; +} + +int cuframes_internal_recv_msg(int fd, uint32_t *msg_type_out, + void *payload, uint32_t *payload_len_inout, + int32_t timeout_ms) { + cuframes_msg_header_t h; + int r = recv_all(fd, &h, sizeof(h), timeout_ms); + if (r != CUFRAMES_OK) return r; + if (h.payload_length > CUFRAMES_MAX_MSG_PAYLOAD) return CUFRAMES_ERR_PROTOCOL; + + if (msg_type_out) *msg_type_out = h.msg_type; + if (h.payload_length > 0) { + if (!payload || !payload_len_inout || *payload_len_inout < h.payload_length) { + /* Drain unread bytes to keep socket in sync */ + uint8_t drain[256]; + uint32_t left = h.payload_length; + while (left > 0) { + size_t take = left < sizeof(drain) ? left : sizeof(drain); + int dr = recv_all(fd, drain, take, timeout_ms); + if (dr != CUFRAMES_OK) return dr; + left -= (uint32_t)take; + } + if (payload_len_inout) *payload_len_inout = h.payload_length; + return CUFRAMES_ERR_INVALID_ARG; + } + r = recv_all(fd, payload, h.payload_length, timeout_ms); + if (r != CUFRAMES_OK) return r; + } + if (payload_len_inout) *payload_len_inout = h.payload_length; + return CUFRAMES_OK; +} diff --git a/libcuframes/src/utils.c b/libcuframes/src/utils.c new file mode 100644 index 0000000..aaf576d --- /dev/null +++ b/libcuframes/src/utils.c @@ -0,0 +1,149 @@ +/* Utility functions: error strings, frame size calc, timestamps. 
*/ + +#include "internal.h" +#include +#include +#include +#include +#include +#include + +const char *cuframes_version_string(void) { + static char buf[32]; + snprintf(buf, sizeof(buf), "%d.%d.%d", + CUFRAMES_VERSION_MAJOR, CUFRAMES_VERSION_MINOR, CUFRAMES_VERSION_PATCH); + return buf; +} + +uint32_t cuframes_protocol_version(void) { return CUFRAMES_PROTOCOL_V1; } + +const char *cuframes_strerror(int err) { + switch (err) { + case CUFRAMES_OK: return "ok"; + case CUFRAMES_ERR_INVALID_ARG: return "invalid argument"; + case CUFRAMES_ERR_OUT_OF_MEMORY: return "out of memory"; + case CUFRAMES_ERR_CUDA: return "cuda error"; + case CUFRAMES_ERR_IO: return "I/O error (socket/mmap/eventfd)"; + case CUFRAMES_ERR_NOT_FOUND: return "publisher not found"; + case CUFRAMES_ERR_ALREADY_EXISTS: return "publisher/subscriber-name already exists"; + case CUFRAMES_ERR_TIMEOUT: return "timeout"; + case CUFRAMES_ERR_PROTOCOL: return "wire protocol mismatch"; + case CUFRAMES_ERR_DISCONNECTED: return "disconnected"; + case CUFRAMES_ERR_FORMAT: return "unsupported format or size mismatch"; + case CUFRAMES_ERR_WOULD_BLOCK: return "would block"; + case CUFRAMES_ERR_TOO_MANY: return "too many subscribers (max 32)"; + case CUFRAMES_ERR_INTERNAL: return "internal error (please report)"; + default: return "unknown error"; + } +} + +int64_t cuframes_now_ns(void) { + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (int64_t)ts.tv_sec * 1000000000LL + ts.tv_nsec; +} + +int cuframes_calc_frame_size(cuframes_format_t format, + int32_t width, int32_t height, + size_t *size_out, + int32_t *pitch_y_out, + int32_t *pitch_uv_out) { + return cuframes_internal_calc_size(format, width, height, + size_out, pitch_y_out, pitch_uv_out); +} + +/* ─── Internal helpers ────────────────────────────────────────────────── */ + +int cuframes_internal_validate_key(const char *key) { + if (!key) return CUFRAMES_ERR_INVALID_ARG; + size_t len = strlen(key); + if (len < 1 || len > CUFRAMES_MAX_KEY_LEN) 
return CUFRAMES_ERR_INVALID_ARG; + for (size_t i = 0; i < len; ++i) { + char c = key[i]; + if (!(isalnum((unsigned char)c) || c == '_' || c == '-')) { + return CUFRAMES_ERR_INVALID_ARG; + } + } + return CUFRAMES_OK; +} + +int cuframes_internal_socket_path(const char *key, char *out, size_t out_size) { + int r = cuframes_internal_validate_key(key); + if (r != CUFRAMES_OK) return r; + int n = snprintf(out, out_size, "%s/%s.sock", CUFRAMES_RUNTIME_DIR, key); + if (n < 0 || (size_t)n >= out_size) return CUFRAMES_ERR_INVALID_ARG; + return CUFRAMES_OK; +} + +int cuframes_internal_shm_name(const char *key, char *out, size_t out_size) { + int r = cuframes_internal_validate_key(key); + if (r != CUFRAMES_OK) return r; + int n = snprintf(out, out_size, "%s%s", CUFRAMES_SHM_PREFIX, key); + if (n < 0 || (size_t)n >= out_size) return CUFRAMES_ERR_INVALID_ARG; + return CUFRAMES_OK; +} + +int cuframes_internal_ensure_runtime_dir(void) { + if (mkdir(CUFRAMES_RUNTIME_DIR, 0755) == 0) return CUFRAMES_OK; + if (errno == EEXIST) return CUFRAMES_OK; + CUFRAMES_LOG_ERROR("cannot create %s: %s", + CUFRAMES_RUNTIME_DIR, strerror(errno)); + return CUFRAMES_ERR_IO; +} + +int cuframes_internal_pid_alive(pid_t pid) { + if (pid <= 0) return 0; + if (kill(pid, 0) == 0) return 1; + return (errno == EPERM) ? 1 : 0; +} + +/* Возвращает размер frame'а + pitch'ы. Pitch aligned 256. + * NV12: Y w×h + UV w×h/2 (interleaved) — оба с тем же pitch_y. + * YUV420P: Y w×h + U w/2×h/2 + V w/2×h/2. + * RGB/BGR: 3 bytes per pixel, single plane. + * RGBA: 4 bytes per pixel. + * GRAYSCALE: 1 byte per pixel. 
+ */ +int cuframes_internal_calc_size(cuframes_format_t format, int32_t w, int32_t h, + size_t *size_out, int32_t *pitch_y_out, int32_t *pitch_uv_out) { + if (w <= 0 || h <= 0) return CUFRAMES_ERR_INVALID_ARG; + + int32_t py = 0, puv = 0; + size_t total = 0; + + /* round up to 256-byte alignment for CUDA */ + #define ALIGN256(x) (((x) + 255u) & ~255u) + + switch (format) { + case CUFRAMES_FORMAT_NV12: + py = (int32_t)ALIGN256((uint32_t)w); + puv = py; + total = (size_t)py * (size_t)h + (size_t)puv * (size_t)(h / 2); + break; + case CUFRAMES_FORMAT_YUV420P: + py = (int32_t)ALIGN256((uint32_t)w); + puv = (int32_t)ALIGN256((uint32_t)(w / 2)); + total = (size_t)py * (size_t)h + 2 * (size_t)puv * (size_t)(h / 2); + break; + case CUFRAMES_FORMAT_RGB: + case CUFRAMES_FORMAT_BGR: + py = (int32_t)ALIGN256((uint32_t)(w * 3)); + total = (size_t)py * (size_t)h; + break; + case CUFRAMES_FORMAT_RGBA: + py = (int32_t)ALIGN256((uint32_t)(w * 4)); + total = (size_t)py * (size_t)h; + break; + case CUFRAMES_FORMAT_GRAYSCALE: + py = (int32_t)ALIGN256((uint32_t)w); + total = (size_t)py * (size_t)h; + break; + default: + return CUFRAMES_ERR_FORMAT; + } + + if (size_out) *size_out = total; + if (pitch_y_out) *pitch_y_out = py; + if (pitch_uv_out) *pitch_uv_out = puv; + return CUFRAMES_OK; +} diff --git a/libcuframes/tests/CMakeLists.txt b/libcuframes/tests/CMakeLists.txt new file mode 100644 index 0000000..de24573 --- /dev/null +++ b/libcuframes/tests/CMakeLists.txt @@ -0,0 +1,17 @@ +find_package(CUDAToolkit REQUIRED) + +# Standalone test executables (без catch2 — простой ctest) + +add_executable(test_pingpong test_pingpong.cu) +target_link_libraries(test_pingpong PRIVATE cuframes CUDA::cudart) +target_include_directories(test_pingpong PRIVATE + ${CMAKE_SOURCE_DIR}/include) +add_test(NAME pingpong_basic COMMAND test_pingpong) +set_tests_properties(pingpong_basic PROPERTIES TIMEOUT 60) + +add_executable(test_multi test_multi.cu) +target_link_libraries(test_multi PRIVATE cuframes 
/*
 * Consumer side of the multi-consumer test (runs in a forked child).
 *
 * Subscribes to KEY in NEWEST_ONLY mode under `name`, then pulls frames until
 * cuframes_subscriber_next() reports a timeout (2000 ms) or a disconnect.
 * Every frame is checked on the consumer's own stream `s` by verify_y, which
 * compares the Y plane against pat(seq, row); a frame with any mismatching
 * byte counts as torn.
 *
 * Launch layout: 2D grid of 32x8 blocks covering W x H; verify_y guards the
 * tail itself. No dynamic shared memory.
 *
 * Returns 0 when at least N/3 frames arrived and none were torn, else 1.
 * Any API or CUDA error aborts the process through CHECK / CHECK_CUDA.
 */
int run_consumer(const char *name) {
    cuframes_subscriber_config_t cfg = {};
    cfg.key = KEY;
    cfg.consumer_name = name;
    cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
    cfg.connect_timeout_ms = 5000;

    cuframes_subscriber_t *sub = NULL;
    CHECK(cuframes_subscriber_create(&cfg, &sub));

    cudaStream_t s;
    CHECK_CUDA(cudaStreamCreate(&s));
    int *d_bad = NULL;
    CHECK_CUDA(cudaMalloc(&d_bad, sizeof(int)));

    dim3 b(32, 8);
    dim3 g((W + b.x - 1) / b.x, (H + b.y - 1) / b.y);

    int recv = 0, torn = 0;
    while (1) {
        cuframes_frame_t *f = NULL;
        int r = cuframes_subscriber_next(sub, s, &f, 2000);
        if (r == CUFRAMES_ERR_TIMEOUT || r == CUFRAMES_ERR_DISCONNECTED) break;
        if (r != 0) {
            fprintf(stderr, "[%s] next: %s\n", name, cuframes_strerror(r));
            std::exit(2);
        }

        /* Everything is queued on `s`, the stream handed to next(), so the
         * check runs in stream order after whatever next() enqueued. */
        CHECK_CUDA(cudaMemsetAsync(d_bad, 0, sizeof(int), s));
        verify_y<<<g, b, 0, s>>>((const uint8_t *)cuframes_frame_cuda_ptr(f),
                                 W, H, cuframes_frame_pitch_y(f),
                                 cuframes_frame_seq(f), d_bad);
        /* A bad launch configuration is only reported here, not by the
         * launch statement itself. */
        CHECK_CUDA(cudaGetLastError());

        int bad = 0;
        CHECK_CUDA(cudaMemcpyAsync(&bad, d_bad, sizeof(int), cudaMemcpyDeviceToHost, s));
        CHECK_CUDA(cudaStreamSynchronize(s));
        if (bad > 0) torn++;
        recv++;
        CHECK(cuframes_subscriber_release(sub, f));
    }

    fprintf(stderr, "[%s] received=%d torn=%d\n", name, recv, torn);

    cudaFree(d_bad);
    cudaStreamDestroy(s);
    cuframes_subscriber_destroy(sub);
    return (recv >= N / 3 && torn == 0) ? 0 : 1;
}
/*
 * Producer side of the ping-pong test (runs in the parent process).
 *
 * Creates a library-owned NV12 publisher (ring of 4, DROP_OLDEST) on device
 * 0, waits 500 ms so the forked consumer can attach, then publishes N_FRAMES
 * frames paced at 60 fps. Each frame's Y plane is filled by fill_y with
 * pat(seq, row) on `stream`, and the frame is published on that same stream.
 *
 * Launch layout: 2D grid of 32x8 blocks covering W x H; fill_y guards the
 * tail itself. No dynamic shared memory.
 *
 * Returns 0. Any API or CUDA error aborts the process through CHECK /
 * CHECK_CUDA.
 */
int run_producer() {
    cuframes_publisher_config_t cfg = {};
    cfg.key = KEY;
    cfg.width = W;
    cfg.height = H;
    cfg.format = CUFRAMES_FORMAT_NV12;
    cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
    cfg.ring_size = 4;
    cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;
    cfg.cuda_device = 0;

    cuframes_publisher_t *pub = NULL;
    CHECK(cuframes_publisher_create(&cfg, &pub));

    int32_t pitch_y = 0;
    CHECK(cuframes_calc_frame_size(CUFRAMES_FORMAT_NV12, W, H, NULL, &pitch_y, NULL));

    cudaStream_t stream;
    CHECK_CUDA(cudaStreamCreate(&stream));

    dim3 block(32, 8);
    dim3 grid((W + block.x - 1) / block.x, (H + block.y - 1) / block.y);

    /* Give the subscriber time to connect. */
    std::this_thread::sleep_for(std::chrono::milliseconds(500));

    auto frame_interval = std::chrono::nanoseconds(1000000000LL / 60); /* 60 fps */
    auto next_t = std::chrono::steady_clock::now();

    for (int i = 0; i < N_FRAMES; ++i) {
        void *cuda_ptr = NULL;
        CHECK(cuframes_publisher_acquire(pub, &cuda_ptr));
        fill_y<<<grid, block, 0, stream>>>((uint8_t *)cuda_ptr, W, H, pitch_y, (uint64_t)i);
        /* Launch-configuration errors surface only through the last-error
         * state, so check before publishing the slot. */
        CHECK_CUDA(cudaGetLastError());
        CHECK(cuframes_publisher_publish(pub, stream, cuframes_now_ns()));
        next_t += frame_interval;
        std::this_thread::sleep_until(next_t);
    }

    /* Surface any asynchronous kernel fault before tearing things down. */
    CHECK_CUDA(cudaStreamSynchronize(stream));

    /* Let the consumer finish reading. */
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    cuframes_publisher_destroy(pub);
    cudaStreamDestroy(stream);
    return 0;
}
"[consumer] no more frames\n"); + break; + } + if (r == CUFRAMES_ERR_DISCONNECTED) { + fprintf(stderr, "[consumer] disconnected\n"); + break; + } + if (r != 0) { + fprintf(stderr, "[consumer] next: %s\n", cuframes_strerror(r)); + std::exit(2); + } + + CHECK_CUDA(cudaMemsetAsync(d_bad, 0, sizeof(int), stream)); + verify_y<<>>( + (const uint8_t *)cuframes_frame_cuda_ptr(f), + W, H, cuframes_frame_pitch_y(f), + cuframes_frame_seq(f), d_bad); + int bad = 0; + CHECK_CUDA(cudaMemcpyAsync(&bad, d_bad, sizeof(int), + cudaMemcpyDeviceToHost, stream)); + CHECK_CUDA(cudaStreamSynchronize(stream)); + + int64_t lat = cuframes_now_ns() - cuframes_frame_pts_ns(f); + lat_sum += lat; + if (lat > lat_max) lat_max = lat; + if (bad > 0) torn++; + received++; + + CHECK(cuframes_subscriber_release(sub, f)); + } + + fprintf(stderr, "\n=== test_pingpong consumer summary ===\n"); + fprintf(stderr, "received: %d\n", received); + fprintf(stderr, "torn: %d\n", torn); + fprintf(stderr, "lat mean: %ld us\n", received > 0 ? lat_sum / 1000 / received : 0); + fprintf(stderr, "lat max: %ld us\n", lat_max / 1000); + + cudaFree(d_bad); + cudaStreamDestroy(stream); + cuframes_subscriber_destroy(sub); + + if (received < N_FRAMES / 2) { /* допустимо терять немного на newest-only */ + fprintf(stderr, "FAIL: received only %d / %d\n", received, N_FRAMES); + return 1; + } + if (torn > 0) { + fprintf(stderr, "FAIL: %d torn frames\n", torn); + return 1; + } + return 0; +} + +int main() { + /* Cleanup от prev runs */ + char shm[80]; + snprintf(shm, sizeof(shm), "/dev/shm/cuframes-%s", KEY); + unlink(shm); + char sock[128]; + snprintf(sock, sizeof(sock), "/run/cuframes/%s.sock", KEY); + unlink(sock); + + pid_t pid = fork(); + if (pid < 0) { perror("fork"); return 2; } + if (pid == 0) { + return run_consumer(); + } + + /* Parent — producer */ + int prod_r = run_producer(); + int status = 0; + waitpid(pid, &status, 0); + int cons_r = WIFEXITED(status) ? 
WEXITSTATUS(status) : 99; + + if (prod_r != 0 || cons_r != 0) { + fprintf(stderr, "test FAILED: producer=%d consumer=%d\n", prod_r, cons_r); + return 1; + } + fprintf(stderr, "test_pingpong PASS\n"); + return 0; +}