libcuframes v0.1: producer + consumer (sync + async) + tests

Implements Steps 3-6 of Phase 1 according to docs/protocol.md.

libcuframes/src/:
- internal.h     (660 lines) — shared structs (byte-exact protocol.md layout)
                                + _Static_assert на offsets/sizes
- utils.c        — error strings, frame size calc, now_ns, key validation
- protocol.c     — TLV framing для Unix socket с poll-based timeout
- producer.c     (~700 lines) — Step 3:
                    * LIBRARY mode: cudaMalloc pool, IPC handle export
                    * EXTERNAL mode: register user-provided pointers
                    * cudaIpcEventHandle_t для cross-process sync (R1/R2)
                    * Unix socket accept thread, handshake state machine
                    * Bit allocation 1..31, name collision check (Y5)
                    * STRICT_WAIT policy: timeout with dead-subscriber eviction
- consumer.c     (~400 lines) — Step 4:
                    * Synchronous next() with poll-based wait
                    * cudaStreamWaitEvent на consumer-stream (R1/R2)
                    * Opaque cuframes_frame_t с accessor functions (Y6)
                    * NEWEST_ONLY и STRICT_ORDER modes
                    * ACK via atomic_fetch_or на bitmap
- consumer_async.c — Step 5: thread + callback wrapper над sync API

libcuframes/tests/:
- test_pingpong.cu — single producer × single consumer, 200 frames @ 60fps,
                     verify через kernel-on-consumer-stream (правильный test
                     для sync semantics, см. spike-v2)
- test_multi.cu    — 1 producer × 3 consumers через fork()

Build:
- Top-level CMakeLists.txt с options
- libcuframes/CMakeLists.txt: shared + static library, c_std_11
- Suppress -Waddress-of-packed-member (известная безопасная warning x86_64)

Results (внутри cuframes-dev container, RTX 5090):
- pingpong_basic PASS  4.5s  200 frames, 0 torn
- multi_consumer PASS  4.1s  1 × 3 consumers, all PASS

Phase 1 Step 6 done. Дальше: Step 7 (C++ wrapper), Step 9 (FFmpeg filter).
This commit is contained in:
2026-05-14 23:21:30 +01:00
parent dc478c7cda
commit 46c2b94939
11 changed files with 2010 additions and 0 deletions
+32
View File
@@ -0,0 +1,32 @@
cmake_minimum_required(VERSION 3.20)
project(cuframes
VERSION 0.1.0
DESCRIPTION "Zero-copy frame sharing via CUDA IPC"
LANGUAGES C CXX CUDA
)
set(CMAKE_C_STANDARD 11)
set(CMAKE_C_STANDARD_REQUIRED ON)
set(CMAKE_CXX_STANDARD 17)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
if(NOT CMAKE_BUILD_TYPE)
set(CMAKE_BUILD_TYPE Release)
endif()
if(NOT DEFINED CMAKE_CUDA_ARCHITECTURES)
set(CMAKE_CUDA_ARCHITECTURES "75;86;89;90;120")
endif()
option(BUILD_TESTING "Build tests" ON)
option(BUILD_EXAMPLES "Build examples" ON)
option(BUILD_FFMPEG_FILTER "Build FFmpeg vf_cuda_ipc_export filter" OFF)
option(BUILD_PYTHON_BINDINGS "Build Python bindings" OFF)
enable_testing()
add_subdirectory(libcuframes)
if(BUILD_FFMPEG_FILTER)
add_subdirectory(filter)
endif()
+64
View File
@@ -0,0 +1,64 @@
cmake_minimum_required(VERSION 3.20)
# libcuframes — главная shared library
find_package(CUDAToolkit REQUIRED)
find_package(Threads REQUIRED)
set(CUFRAMES_SOURCES
src/utils.c
src/protocol.c
src/producer.c
src/consumer.c
src/consumer_async.c
)
add_library(cuframes SHARED ${CUFRAMES_SOURCES})
add_library(cuframes_static STATIC ${CUFRAMES_SOURCES})
foreach(target cuframes cuframes_static)
target_include_directories(${target}
PUBLIC
$<BUILD_INTERFACE:${CMAKE_SOURCE_DIR}/include>
$<INSTALL_INTERFACE:include>
PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/src
)
target_compile_features(${target} PRIVATE c_std_11)
target_compile_options(${target} PRIVATE
-Wall -Wextra -Wpedantic
-Wno-address-of-packed-member # известная проблема atomics на packed structs (x86_64 ok)
$<$<CONFIG:Debug>:-O0 -g>
$<$<CONFIG:Release>:-O2 -g>
)
target_link_libraries(${target}
PUBLIC
CUDA::cudart
Threads::Threads
rt # для shm_open
)
endforeach()
# Set SOVERSION на shared lib для ABI tracking
set_target_properties(cuframes PROPERTIES
VERSION 0.1.0
SOVERSION 0
)
# Optionally — explicit visibility для exports. C-linkage уже handled extern "C"
# в header'е. Используем -fvisibility=hidden + указываем default для public symbols
# через __attribute__((visibility("default"))). Но мы это сделаем в v0.2 — пока
# default visibility внутри source files.
target_compile_definitions(cuframes PRIVATE CUFRAMES_BUILDING_DLL)
target_compile_definitions(cuframes_static PRIVATE CUFRAMES_STATIC_LIB)
# Override default fvisibility (на текущем этапе экспортируем всё)
foreach(target cuframes cuframes_static)
set_target_properties(${target} PROPERTIES
C_VISIBILITY_PRESET default
)
endforeach()
# Tests
if(BUILD_TESTING)
add_subdirectory(tests)
endif()
+355
View File
@@ -0,0 +1,355 @@
/* Subscriber implementation (sync). */
#include "internal.h"
#include <errno.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
/* Opaque frame — выдаётся subscriber'у на next() */
struct cuframes_frame {
void *cuda_ptr;
cuframes_format_t format;
int32_t width;
int32_t height;
int32_t pitch_y;
int32_t pitch_uv;
uint64_t seq;
int64_t pts_ns;
uint32_t slot_idx;
void *subscriber; /* back-ref для release() */
};
struct cuframes_subscriber {
cuframes_subscriber_config_t cfg;
char key[CUFRAMES_MAX_KEY_LEN + 1];
int sock_fd;
int shm_fd;
cuframes_shm_header_t *hdr;
char shm_name[80];
cudaEvent_t producer_event;
void *mapped_ptrs[CUFRAMES_MAX_RING];
uint32_t assigned_bit;
uint64_t last_seen_seq;
/* Frame pool — переиспользуем одну frame_t structure (single-thread API).
* Опционально расширим до lock-free pool в v0.2 если нужен multi-frame. */
struct cuframes_frame frame_obj;
int frame_busy;
};
/* ─── Frame accessors ────────────────────────────────────────────────── */
void *cuframes_frame_cuda_ptr(const cuframes_frame_t *f) { return f ? f->cuda_ptr : NULL; }
cuframes_format_t cuframes_frame_format(const cuframes_frame_t *f) { return f ? f->format : 0; }
void cuframes_frame_size(const cuframes_frame_t *f, int32_t *w, int32_t *h) {
if (!f) return;
if (w) *w = f->width;
if (h) *h = f->height;
}
int32_t cuframes_frame_pitch_y(const cuframes_frame_t *f) { return f ? f->pitch_y : 0; }
int32_t cuframes_frame_pitch_uv(const cuframes_frame_t *f) { return f ? f->pitch_uv : 0; }
uint64_t cuframes_frame_seq(const cuframes_frame_t *f) { return f ? f->seq : 0; }
int64_t cuframes_frame_pts_ns(const cuframes_frame_t *f) { return f ? f->pts_ns : 0; }
/* ─── Subscriber create ──────────────────────────────────────────────── */
static int do_handshake(struct cuframes_subscriber *sub, const char *name) {
/* Send HELLO_REQ */
uint8_t buf[CUFRAMES_MAX_MSG_PAYLOAD];
cuframes_msg_hello_req_t *hreq = (cuframes_msg_hello_req_t *)buf;
hreq->proto_version = CUFRAMES_PROTOCOL_V1;
uint32_t nl = name ? (uint32_t)strlen(name) : 0;
if (nl > 31) nl = 31;
hreq->consumer_name_len = nl;
if (nl > 0) memcpy(buf + sizeof(*hreq), name, nl);
uint8_t *tail = buf + sizeof(*hreq) + nl;
int32_t cuda_dev = sub->cfg.cuda_device;
uint32_t mode = (uint32_t)sub->cfg.mode;
memcpy(tail, &cuda_dev, 4);
memcpy(tail + 4, &mode, 4);
memset(tail + 8, 0, 12);
uint32_t plen = (uint32_t)(sizeof(*hreq) + nl + 20);
int r = cuframes_internal_send_msg(sub->sock_fd, CUFRAMES_MSG_HELLO_REQ,
buf, plen);
if (r != CUFRAMES_OK) return r;
/* Recv HELLO_RESP */
uint32_t rmt = 0, rpl = sizeof(buf);
r = cuframes_internal_recv_msg(sub->sock_fd, &rmt, buf, &rpl, 5000);
if (r != CUFRAMES_OK) return r;
if (rmt != CUFRAMES_MSG_HELLO_RESP) return CUFRAMES_ERR_PROTOCOL;
cuframes_msg_hello_resp_t *hresp = (cuframes_msg_hello_resp_t *)buf;
if (hresp->result != CUFRAMES_OK) return hresp->result;
/* Send SUBSCRIBE_REQ */
uint32_t srbuf[8];
srbuf[0] = CUFRAMES_PROTOCOL_V1;
memset(srbuf + 1, 0, 28);
r = cuframes_internal_send_msg(sub->sock_fd, CUFRAMES_MSG_SUBSCRIBE_REQ,
srbuf, sizeof(srbuf));
if (r != CUFRAMES_OK) return r;
/* Recv SUBSCRIBE_RESP */
cuframes_msg_subscribe_resp_t sresp;
rmt = 0; rpl = sizeof(sresp);
r = cuframes_internal_recv_msg(sub->sock_fd, &rmt, &sresp, &rpl, 5000);
if (r != CUFRAMES_OK) return r;
if (rmt != CUFRAMES_MSG_SUBSCRIBE_RESP) return CUFRAMES_ERR_PROTOCOL;
if (sresp.result != CUFRAMES_OK) return sresp.result;
sub->assigned_bit = sresp.assigned_bit;
sub->last_seen_seq = sresp.initial_seq; /* start от текущей точки */
return CUFRAMES_OK;
}
int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg,
cuframes_subscriber_t **out) {
if (!cfg || !cfg->key || !out) return CUFRAMES_ERR_INVALID_ARG;
if (cuframes_internal_validate_key(cfg->key) != CUFRAMES_OK)
return CUFRAMES_ERR_INVALID_ARG;
struct cuframes_subscriber *sub = calloc(1, sizeof(*sub));
if (!sub) return CUFRAMES_ERR_OUT_OF_MEMORY;
sub->cfg = *cfg;
strncpy(sub->key, cfg->key, sizeof(sub->key) - 1);
sub->sock_fd = -1;
sub->shm_fd = -1;
/* Generate fallback name if NULL */
char name_buf[32];
const char *name = cfg->consumer_name;
if (!name) {
snprintf(name_buf, sizeof(name_buf), "sub-%d-%lx",
(int)getpid(), (unsigned long)cuframes_now_ns() & 0xFFFFu);
name = name_buf;
}
/* Build paths */
char sock_path[128];
int r = cuframes_internal_socket_path(cfg->key, sock_path, sizeof(sock_path));
if (r != CUFRAMES_OK) { free(sub); return r; }
/* Connect with timeout retry */
int64_t deadline = cfg->connect_timeout_ms > 0
? cuframes_now_ns() + (int64_t)cfg->connect_timeout_ms * 1000000LL
: 0;
while (1) {
sub->sock_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
if (sub->sock_fd < 0) { r = CUFRAMES_ERR_IO; goto fail; }
struct sockaddr_un sa = {.sun_family = AF_UNIX};
strncpy(sa.sun_path, sock_path, sizeof(sa.sun_path) - 1);
if (connect(sub->sock_fd, (struct sockaddr *)&sa, sizeof(sa)) == 0) break;
close(sub->sock_fd);
sub->sock_fd = -1;
if (cfg->connect_timeout_ms == 0) { r = CUFRAMES_ERR_NOT_FOUND; goto fail; }
if (deadline && cuframes_now_ns() > deadline) { r = CUFRAMES_ERR_TIMEOUT; goto fail; }
struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000000}; /* 100ms */
nanosleep(&ts, NULL);
}
/* Handshake */
r = do_handshake(sub, name);
if (r != CUFRAMES_OK) goto fail;
/* Open SHM */
r = cuframes_internal_shm_name(cfg->key, sub->shm_name, sizeof(sub->shm_name));
if (r != CUFRAMES_OK) goto fail;
sub->shm_fd = shm_open(sub->shm_name, O_RDWR, 0);
if (sub->shm_fd < 0) {
CUFRAMES_LOG_ERROR("shm_open %s: %s", sub->shm_name, strerror(errno));
r = CUFRAMES_ERR_IO; goto fail;
}
sub->hdr = mmap(NULL, sizeof(cuframes_shm_header_t),
PROT_READ | PROT_WRITE, MAP_SHARED, sub->shm_fd, 0);
if (sub->hdr == MAP_FAILED) {
sub->hdr = NULL;
r = CUFRAMES_ERR_IO; goto fail;
}
if (sub->hdr->magic != CUFRAMES_MAGIC) { r = CUFRAMES_ERR_PROTOCOL; goto fail; }
/* CUDA setup */
cudaError_t cerr = cudaSetDevice(sub->cfg.cuda_device);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaSetDevice: %s", cudaGetErrorString(cerr));
r = CUFRAMES_ERR_CUDA; goto fail;
}
/* Open producer's event */
cerr = cudaIpcOpenEventHandle(&sub->producer_event, sub->hdr->ipc_event_handle);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaIpcOpenEventHandle: %s", cudaGetErrorString(cerr));
r = CUFRAMES_ERR_CUDA; goto fail;
}
/* Open mem handles */
int ring = (int)sub->hdr->ring_size;
if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
for (int i = 0; i < ring; ++i) {
cerr = cudaIpcOpenMemHandle(&sub->mapped_ptrs[i],
sub->hdr->slots[i].mem_handle,
cudaIpcMemLazyEnablePeerAccess);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaIpcOpenMemHandle slot %d: %s",
i, cudaGetErrorString(cerr));
r = CUFRAMES_ERR_CUDA; goto fail;
}
}
CUFRAMES_LOG_INFO("subscriber '%s' connected to '%s' (bit=%u, ring=%d)",
name, sub->key, sub->assigned_bit, ring);
*out = sub;
return CUFRAMES_OK;
fail:
cuframes_subscriber_destroy(sub);
return r;
}
int cuframes_subscriber_next(cuframes_subscriber_t *sub,
void *consumer_stream,
cuframes_frame_t **frame_out,
int32_t timeout_ms) {
if (!sub || !frame_out) return CUFRAMES_ERR_INVALID_ARG;
if (sub->frame_busy) return CUFRAMES_ERR_INVALID_ARG;
if (atomic_load_explicit(&sub->hdr->shutdown_flag,
memory_order_acquire) != 0) {
return CUFRAMES_ERR_DISCONNECTED;
}
int64_t deadline = (timeout_ms > 0)
? cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL
: 0;
while (1) {
uint64_t gs = atomic_load_explicit(&sub->hdr->global_seq,
memory_order_acquire);
if (gs != UINT64_MAX && gs != sub->last_seen_seq) {
uint64_t target_seq;
if (sub->cfg.mode == CUFRAMES_MODE_NEWEST_ONLY) {
target_seq = gs;
} else {
/* STRICT_ORDER */
if (sub->last_seen_seq == UINT64_MAX) {
target_seq = gs;
} else if (gs > sub->last_seen_seq + (uint64_t)sub->hdr->ring_size) {
/* Producer overran us. */
return CUFRAMES_ERR_DISCONNECTED;
} else {
target_seq = sub->last_seen_seq + 1;
}
}
uint32_t slot_idx = (uint32_t)(target_seq % sub->hdr->ring_size);
uint64_t slot_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
memory_order_acquire);
if (slot_seq != target_seq) {
/* Slot уже перезаписан producer'ом — пересчитать */
continue;
}
int64_t pts = atomic_load_explicit(&sub->hdr->slots[slot_idx].pts_ns,
memory_order_acquire);
/* Cross-process sync: wait event on consumer's stream */
if (consumer_stream) {
cudaError_t cerr = cudaStreamWaitEvent((cudaStream_t)consumer_stream,
sub->producer_event, 0);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_WARN("cudaStreamWaitEvent: %s",
cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
} else {
/* Synchronize globally — для cudaMemcpyDeviceToHost users */
cudaError_t cerr = cudaEventSynchronize(sub->producer_event);
if (cerr != cudaSuccess) return CUFRAMES_ERR_CUDA;
}
/* Fill frame_out */
struct cuframes_frame *f = &sub->frame_obj;
f->cuda_ptr = sub->mapped_ptrs[slot_idx];
f->format = (cuframes_format_t)sub->hdr->meta.format;
f->width = sub->hdr->meta.width;
f->height = sub->hdr->meta.height;
f->pitch_y = sub->hdr->meta.pitch_y;
f->pitch_uv = sub->hdr->meta.pitch_uv;
f->seq = target_seq;
f->pts_ns = pts;
f->slot_idx = slot_idx;
f->subscriber = sub;
sub->frame_busy = 1;
sub->last_seen_seq = target_seq;
*frame_out = f;
return CUFRAMES_OK;
}
/* Не было frame'ов */
if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK;
if (timeout_ms > 0 && cuframes_now_ns() > deadline) return CUFRAMES_ERR_TIMEOUT;
/* Poll-based wait (eventfd — v0.2). 50µs interval — компромисс
* latency vs CPU. */
struct timespec ts = {.tv_sec = 0, .tv_nsec = 50000};
nanosleep(&ts, NULL);
if (atomic_load_explicit(&sub->hdr->shutdown_flag,
memory_order_acquire) != 0) {
return CUFRAMES_ERR_DISCONNECTED;
}
}
}
int cuframes_subscriber_release(cuframes_subscriber_t *sub,
cuframes_frame_t *frame) {
if (!frame) return CUFRAMES_OK;
if (!sub || frame->subscriber != sub) return CUFRAMES_ERR_INVALID_ARG;
/* ACK через bitmap */
if (sub->assigned_bit > 0 && sub->assigned_bit < 64) {
atomic_fetch_or_explicit(&sub->hdr->slots[frame->slot_idx].ack_bitmap,
1ULL << sub->assigned_bit,
memory_order_release);
atomic_store_explicit(&sub->hdr->subscribers[sub->assigned_bit].last_seen_seq,
frame->seq, memory_order_release);
atomic_store_explicit(&sub->hdr->subscribers[sub->assigned_bit].last_ack_ns,
cuframes_now_ns(), memory_order_release);
}
frame->cuda_ptr = NULL;
frame->subscriber = NULL;
sub->frame_busy = 0;
return CUFRAMES_OK;
}
int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
if (!sub) return CUFRAMES_OK;
/* Clear subscriber bit */
if (sub->hdr && sub->assigned_bit > 0) {
atomic_fetch_and_explicit(&sub->hdr->subscriber_bitmap,
~(1ULL << sub->assigned_bit),
memory_order_release);
atomic_store_explicit(&sub->hdr->subscribers[sub->assigned_bit].state,
0, memory_order_release);
}
if (sub->producer_event) cudaEventDestroy(sub->producer_event);
int ring = sub->hdr ? (int)sub->hdr->ring_size : 0;
if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
for (int i = 0; i < ring; ++i) {
if (sub->mapped_ptrs[i]) cudaIpcCloseMemHandle(sub->mapped_ptrs[i]);
}
if (sub->hdr) munmap(sub->hdr, sizeof(cuframes_shm_header_t));
if (sub->shm_fd >= 0) close(sub->shm_fd);
if (sub->sock_fd >= 0) close(sub->sock_fd);
free(sub);
return CUFRAMES_OK;
}
+82
View File
@@ -0,0 +1,82 @@
/* Async-subscriber: thread + callback. Tonкий wrapper над sync API. */
#include "internal.h"
#include <pthread.h>
struct cuframes_async_subscriber {
cuframes_subscriber_t *inner;
pthread_t thread;
int thread_alive;
int stop_flag;
cuframes_frame_callback_t on_frame;
cuframes_error_callback_t on_error;
void *user_data;
cudaStream_t internal_stream;
};
static void *async_loop(void *arg) {
struct cuframes_async_subscriber *as = (struct cuframes_async_subscriber *)arg;
while (!as->stop_flag) {
cuframes_frame_t *frame = NULL;
int r = cuframes_subscriber_next(as->inner,
as->internal_stream,
&frame, 100);
if (r == CUFRAMES_OK) {
if (as->on_frame) as->on_frame(frame, as->user_data);
cuframes_subscriber_release(as->inner, frame);
} else if (r == CUFRAMES_ERR_TIMEOUT) {
/* normal — just poll again */
} else if (r == CUFRAMES_ERR_WOULD_BLOCK) {
/* shouldn't with timeout>0, but be safe */
struct timespec ts = {.tv_sec = 0, .tv_nsec = 1000000};
nanosleep(&ts, NULL);
} else if (r == CUFRAMES_ERR_DISCONNECTED) {
if (as->on_error) as->on_error(r, "publisher disconnected", as->user_data);
break;
} else {
if (as->on_error) as->on_error(r, cuframes_strerror(r), as->user_data);
break;
}
}
return NULL;
}
int cuframes_async_subscriber_create(const cuframes_subscriber_config_t *cfg,
cuframes_frame_callback_t on_frame,
cuframes_error_callback_t on_error,
void *user_data,
cuframes_async_subscriber_t **out) {
if (!cfg || !on_frame || !out) return CUFRAMES_ERR_INVALID_ARG;
struct cuframes_async_subscriber *as = calloc(1, sizeof(*as));
if (!as) return CUFRAMES_ERR_OUT_OF_MEMORY;
as->on_frame = on_frame;
as->on_error = on_error;
as->user_data = user_data;
int r = cuframes_subscriber_create(cfg, &as->inner);
if (r != CUFRAMES_OK) { free(as); return r; }
cudaError_t cerr = cudaSetDevice(cfg->cuda_device);
if (cerr == cudaSuccess) cudaStreamCreate(&as->internal_stream);
as->stop_flag = 0;
if (pthread_create(&as->thread, NULL, async_loop, as) != 0) {
cuframes_subscriber_destroy(as->inner);
if (as->internal_stream) cudaStreamDestroy(as->internal_stream);
free(as);
return CUFRAMES_ERR_INTERNAL;
}
as->thread_alive = 1;
*out = as;
return CUFRAMES_OK;
}
int cuframes_async_subscriber_destroy(cuframes_async_subscriber_t *as) {
if (!as) return CUFRAMES_OK;
as->stop_flag = 1;
if (as->thread_alive) pthread_join(as->thread, NULL);
if (as->inner) cuframes_subscriber_destroy(as->inner);
if (as->internal_stream) cudaStreamDestroy(as->internal_stream);
free(as);
return CUFRAMES_OK;
}
+184
View File
@@ -0,0 +1,184 @@
/* Internal shared types для libcuframes implementation.
* Не публикуется в include/.
*
* Layout соответствует docs/protocol.md (byte-exact).
*/
#ifndef CUFRAMES_INTERNAL_H
#define CUFRAMES_INTERNAL_H
#define _GNU_SOURCE
#include <cuda_runtime.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include "cuframes/cuframes.h"
/* ─── Protocol constants ──────────────────────────────────────────────── */
#define CUFRAMES_MAGIC 0xCC7C1DCCu
#define CUFRAMES_PROTOCOL_V1 1u
#define CUFRAMES_MAX_SUBSCRIBERS 32
#define CUFRAMES_MAX_RING 16
#define CUFRAMES_MAX_KEY_LEN 63
#define CUFRAMES_MAX_NAME_LEN 31
#define CUFRAMES_RUNTIME_DIR "/run/cuframes"
#define CUFRAMES_SHM_PREFIX "/cuframes-"
/* ─── Shared memory layout (см. docs/protocol.md §2) ──────────────────── */
/* Frame meta — packed 64 байт */
typedef struct __attribute__((packed)) cuframes_shm_meta {
uint32_t format;
int32_t width;
int32_t height;
int32_t pitch_y;
int32_t pitch_uv;
uint32_t bits_per_pixel;
uint64_t frame_size_bytes;
uint8_t reserved[32];
} cuframes_shm_meta_t;
_Static_assert(sizeof(cuframes_shm_meta_t) == 64, "shm meta must be 64 bytes");
/* Slot descriptor — packed 192 байт */
typedef struct __attribute__((packed)) cuframes_shm_slot {
_Atomic uint64_t seq; /* UINT64_MAX = invalid */
_Atomic int64_t pts_ns;
_Atomic uint64_t ack_bitmap;
uint64_t written_bytes;
cudaIpcMemHandle_t mem_handle; /* 64 байта */
uint8_t cuda_ptr_external[32]; /* informative pointer бite-string */
uint8_t reserved_a[16];
uint8_t reserved_b[48];
} cuframes_shm_slot_t;
_Static_assert(sizeof(cuframes_shm_slot_t) == 192, "slot descriptor must be 192 bytes");
/* Subscriber slot — packed 128 байт */
typedef struct __attribute__((packed)) cuframes_shm_subscriber {
_Atomic uint64_t state; /* 0=free, 1=connecting, 2=active, 3=draining */
uint64_t consumer_pid;
_Atomic uint64_t last_seen_seq;
_Atomic int64_t last_ack_ns;
char consumer_name[32]; /* null-terminated */
uint8_t reserved[64];
} cuframes_shm_subscriber_t;
_Static_assert(sizeof(cuframes_shm_subscriber_t) == 128, "subscriber slot must be 128 bytes");
/* Shared header (header + slots[N] + subscribers[M]). Total ≤ 8KB. */
typedef struct __attribute__((packed)) cuframes_shm_header {
uint32_t magic;
uint32_t proto_version;
uint32_t lib_version_major;
uint32_t lib_version_minor;
uint32_t lib_version_patch;
uint32_t reserved_a;
uint64_t producer_pid;
uint64_t ring_size;
uint64_t ownership_mode;
uint64_t policy;
uint64_t max_subscribers;
cuframes_shm_meta_t meta; /* offset 0x40, 64 bytes */
cudaIpcEventHandle_t ipc_event_handle; /* offset 0x80, 64 bytes */
_Atomic uint64_t global_seq; /* offset 0xC0 */
_Atomic uint64_t subscriber_bitmap;
_Atomic uint64_t shutdown_flag;
uint8_t reserved_b[40];
/* offset 0x100 — variable-length tail */
cuframes_shm_slot_t slots[CUFRAMES_MAX_RING]; /* 192 × 16 = 3072 */
cuframes_shm_subscriber_t subscribers[CUFRAMES_MAX_SUBSCRIBERS]; /* 128 × 32 = 4096 */
} cuframes_shm_header_t;
/* Layout sanity checks (docs/protocol.md §2 table) */
_Static_assert(offsetof(cuframes_shm_header_t, magic) == 0x0000, "magic offset");
_Static_assert(offsetof(cuframes_shm_header_t, proto_version) == 0x0004, "proto_version offset");
_Static_assert(offsetof(cuframes_shm_header_t, producer_pid) == 0x0018, "producer_pid offset");
_Static_assert(offsetof(cuframes_shm_header_t, ring_size) == 0x0020, "ring_size offset");
_Static_assert(offsetof(cuframes_shm_header_t, meta) == 0x0040, "meta offset");
_Static_assert(offsetof(cuframes_shm_header_t, ipc_event_handle) == 0x0080, "event handle offset");
_Static_assert(offsetof(cuframes_shm_header_t, global_seq) == 0x00C0, "global_seq offset");
_Static_assert(offsetof(cuframes_shm_header_t, slots) == 0x0100, "slots offset");
/* ─── Socket protocol messages (docs/protocol.md §3) ───────────────────── */
#define CUFRAMES_MSG_HELLO_REQ 0x01
#define CUFRAMES_MSG_HELLO_RESP 0x02
#define CUFRAMES_MSG_SUBSCRIBE_REQ 0x03
#define CUFRAMES_MSG_SUBSCRIBE_RESP 0x04
#define CUFRAMES_MSG_UNSUBSCRIBE 0x10
#define CUFRAMES_MSG_EVENT_FD 0x20
#define CUFRAMES_MSG_SHUTDOWN 0x30
#define CUFRAMES_MSG_PING 0xF0
#define CUFRAMES_MSG_PONG 0xF1
#define CUFRAMES_MSG_ERROR 0xFE
#define CUFRAMES_MAX_MSG_PAYLOAD 4096
typedef struct __attribute__((packed)) cuframes_msg_header {
uint32_t msg_type;
uint32_t payload_length;
} cuframes_msg_header_t;
typedef struct __attribute__((packed)) cuframes_msg_hello_req {
uint32_t proto_version;
uint32_t consumer_name_len;
/* followed by name bytes */
/* + int32_t cuda_device + uint32_t mode + uint8_t reserved[12] */
} cuframes_msg_hello_req_t;
typedef struct __attribute__((packed)) cuframes_msg_hello_resp {
int32_t result;
uint32_t proto_version_actual;
uint32_t ring_size;
uint32_t ownership_mode;
cuframes_shm_meta_t meta;
uint32_t shm_path_len;
/* followed by path bytes */
/* + uint8_t reserved[12] */
} cuframes_msg_hello_resp_t;
typedef struct __attribute__((packed)) cuframes_msg_subscribe_resp {
int32_t result;
uint32_t assigned_bit;
uint64_t initial_seq;
uint8_t reserved[12];
} cuframes_msg_subscribe_resp_t;
/* ─── Logging (minimal — to stderr) ────────────────────────────────────── */
#define CUFRAMES_LOG_ERROR(fmt, ...) \
fprintf(stderr, "[cuframes ERROR] " fmt "\n", ##__VA_ARGS__)
#define CUFRAMES_LOG_WARN(fmt, ...) \
fprintf(stderr, "[cuframes WARN] " fmt "\n", ##__VA_ARGS__)
#define CUFRAMES_LOG_INFO(fmt, ...) \
do { if (getenv("CUFRAMES_DEBUG")) \
fprintf(stderr, "[cuframes] " fmt "\n", ##__VA_ARGS__); } while (0)
/* ─── Internal helpers ────────────────────────────────────────────────── */
/* Build path /run/cuframes/<key>.sock — returns CUFRAMES_OK or INVALID_ARG */
int cuframes_internal_socket_path(const char *key, char *out, size_t out_size);
/* Build /cuframes-<key> (for shm_open) */
int cuframes_internal_shm_name(const char *key, char *out, size_t out_size);
/* Validate key per protocol.md (alphanum/_/-, 1..63 chars) */
int cuframes_internal_validate_key(const char *key);
/* Calculate frame size + pitch для format/W/H */
int cuframes_internal_calc_size(cuframes_format_t format, int32_t w, int32_t h,
size_t *size_out, int32_t *pitch_y_out, int32_t *pitch_uv_out);
/* Ensure /run/cuframes exists */
int cuframes_internal_ensure_runtime_dir(void);
/* Check if pid alive */
int cuframes_internal_pid_alive(pid_t pid);
/* TLV send/recv helpers — returns 0 on success, negative cuframes_error_t */
int cuframes_internal_send_msg(int sock_fd, uint32_t msg_type,
const void *payload, uint32_t payload_len);
int cuframes_internal_recv_msg(int sock_fd, uint32_t *msg_type_out,
void *payload, uint32_t *payload_len_inout,
int32_t timeout_ms);
#endif /* CUFRAMES_INTERNAL_H */
+667
View File
@@ -0,0 +1,667 @@
/* Publisher implementation (docs/protocol.md §1, §2, §3.2, §4.2, §5). */
#include "internal.h"
#include <errno.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#include <unistd.h>
struct cuframes_publisher {
cuframes_publisher_config_t cfg;
char key[CUFRAMES_MAX_KEY_LEN + 1];
/* IPC resources */
int shm_fd;
cuframes_shm_header_t *hdr;
int listen_fd;
char socket_path[128];
char shm_name[80];
/* CUDA */
cudaEvent_t event;
cudaIpcMemHandle_t ipc_mem[CUFRAMES_MAX_RING];
void *cuda_ptrs[CUFRAMES_MAX_RING]; /* mapped pointers */
size_t frame_size_bytes;
int32_t ring_size_actual;
/* Acquire/publish state — LIBRARY ownership */
uint64_t next_seq;
int32_t current_slot; /* индекс slot'а полученного через acquire() */
int has_acquired;
/* EXTERNAL ownership: map user pointer → ring index */
void *external_ptrs[CUFRAMES_MAX_RING];
int32_t external_count;
/* Subscriber-management thread */
pthread_t accept_thread;
int accept_thread_alive;
int stop_flag;
pthread_mutex_t state_mu; /* protects subscriber connections */
};
/* Forward decls */
static void *accept_thread_main(void *arg);
static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd);
/* ─── Internal: alloc/setup CUDA pool and SHM ─────────────────────────── */
static int alloc_library_pool(struct cuframes_publisher *pub) {
int r = cuframes_internal_calc_size(pub->cfg.format,
pub->cfg.width, pub->cfg.height,
&pub->frame_size_bytes, NULL, NULL);
if (r != CUFRAMES_OK) return r;
pub->ring_size_actual = pub->cfg.ring_size;
cudaError_t cerr = cudaSetDevice(pub->cfg.cuda_device);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaSetDevice(%d): %s",
pub->cfg.cuda_device, cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
for (int i = 0; i < pub->ring_size_actual; ++i) {
cerr = cudaMalloc(&pub->cuda_ptrs[i], pub->frame_size_bytes);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaMalloc slot %d: %s",
i, cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
cerr = cudaIpcGetMemHandle(&pub->ipc_mem[i], pub->cuda_ptrs[i]);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaIpcGetMemHandle slot %d: %s",
i, cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
}
return CUFRAMES_OK;
}
static int register_external_pool(struct cuframes_publisher *pub,
void *const *ptrs, int32_t count,
size_t frame_size) {
if (count < 1 || count > CUFRAMES_MAX_RING) return CUFRAMES_ERR_INVALID_ARG;
pub->frame_size_bytes = frame_size;
pub->ring_size_actual = count;
pub->external_count = count;
cudaError_t cerr = cudaSetDevice(pub->cfg.cuda_device);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaSetDevice: %s", cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
for (int i = 0; i < count; ++i) {
if (!ptrs[i]) return CUFRAMES_ERR_INVALID_ARG;
pub->cuda_ptrs[i] = ptrs[i];
pub->external_ptrs[i] = ptrs[i];
cerr = cudaIpcGetMemHandle(&pub->ipc_mem[i], ptrs[i]);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaIpcGetMemHandle on external ptr %p: %s",
ptrs[i], cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
}
return CUFRAMES_OK;
}
static int create_event_handle(struct cuframes_publisher *pub) {
cudaError_t cerr = cudaEventCreateWithFlags(&pub->event,
cudaEventDisableTiming | cudaEventInterprocess);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags: %s",
cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
return CUFRAMES_OK;
}
static int setup_shm(struct cuframes_publisher *pub) {
/* /dev/shm/cuframes-<key> */
int r = cuframes_internal_shm_name(pub->key, pub->shm_name,
sizeof(pub->shm_name));
if (r != CUFRAMES_OK) return r;
pub->shm_fd = shm_open(pub->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644);
if (pub->shm_fd < 0) {
if (errno == EEXIST) {
/* Check if previous owner alive */
int existing = shm_open(pub->shm_name, O_RDWR, 0);
if (existing >= 0) {
cuframes_shm_header_t tmp;
ssize_t rb = read(existing, &tmp, sizeof(tmp));
close(existing);
if (rb == (ssize_t)sizeof(tmp) && tmp.magic == CUFRAMES_MAGIC) {
if (cuframes_internal_pid_alive((pid_t)tmp.producer_pid)) {
CUFRAMES_LOG_ERROR("publisher with key=%s already running (pid %lu)",
pub->key, (unsigned long)tmp.producer_pid);
return CUFRAMES_ERR_ALREADY_EXISTS;
}
}
}
CUFRAMES_LOG_INFO("stale shm %s — unlinking", pub->shm_name);
shm_unlink(pub->shm_name);
pub->shm_fd = shm_open(pub->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644);
if (pub->shm_fd < 0) return CUFRAMES_ERR_IO;
} else {
CUFRAMES_LOG_ERROR("shm_open: %s", strerror(errno));
return CUFRAMES_ERR_IO;
}
}
if (ftruncate(pub->shm_fd, sizeof(cuframes_shm_header_t)) < 0) {
CUFRAMES_LOG_ERROR("ftruncate shm: %s", strerror(errno));
return CUFRAMES_ERR_IO;
}
pub->hdr = (cuframes_shm_header_t *)mmap(NULL, sizeof(cuframes_shm_header_t),
PROT_READ | PROT_WRITE, MAP_SHARED, pub->shm_fd, 0);
if (pub->hdr == MAP_FAILED) {
CUFRAMES_LOG_ERROR("mmap shm: %s", strerror(errno));
pub->hdr = NULL;
return CUFRAMES_ERR_IO;
}
memset(pub->hdr, 0, sizeof(cuframes_shm_header_t));
pub->hdr->magic = CUFRAMES_MAGIC;
pub->hdr->proto_version = CUFRAMES_PROTOCOL_V1;
pub->hdr->lib_version_major = CUFRAMES_VERSION_MAJOR;
pub->hdr->lib_version_minor = CUFRAMES_VERSION_MINOR;
pub->hdr->lib_version_patch = CUFRAMES_VERSION_PATCH;
pub->hdr->producer_pid = (uint64_t)getpid();
pub->hdr->ring_size = (uint64_t)pub->ring_size_actual;
pub->hdr->ownership_mode = (uint64_t)pub->cfg.ownership;
pub->hdr->policy = (uint64_t)pub->cfg.policy;
pub->hdr->max_subscribers = CUFRAMES_MAX_SUBSCRIBERS;
int32_t py = 0, puv = 0;
cuframes_internal_calc_size(pub->cfg.format, pub->cfg.width, pub->cfg.height,
NULL, &py, &puv);
pub->hdr->meta.format = (uint32_t)pub->cfg.format;
pub->hdr->meta.width = pub->cfg.width;
pub->hdr->meta.height = pub->cfg.height;
pub->hdr->meta.pitch_y = py;
pub->hdr->meta.pitch_uv = puv;
pub->hdr->meta.frame_size_bytes = pub->frame_size_bytes;
/* Export event handle */
cudaError_t cerr = cudaIpcGetEventHandle(&pub->hdr->ipc_event_handle, pub->event);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle: %s", cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
/* Fill slot descriptors */
for (int i = 0; i < pub->ring_size_actual; ++i) {
pub->hdr->slots[i].mem_handle = pub->ipc_mem[i];
atomic_store_explicit(&pub->hdr->slots[i].seq, UINT64_MAX,
memory_order_release);
}
atomic_store_explicit(&pub->hdr->global_seq, UINT64_MAX, memory_order_release);
return CUFRAMES_OK;
}
static int setup_socket(struct cuframes_publisher *pub) {
int r = cuframes_internal_ensure_runtime_dir();
if (r != CUFRAMES_OK) return r;
r = cuframes_internal_socket_path(pub->key, pub->socket_path,
sizeof(pub->socket_path));
if (r != CUFRAMES_OK) return r;
/* Cleanup stale socket */
struct stat st;
if (stat(pub->socket_path, &st) == 0) {
/* Try connect — if works, busy */
int probe = socket(AF_UNIX, SOCK_STREAM, 0);
if (probe >= 0) {
struct sockaddr_un sa = {.sun_family = AF_UNIX};
strncpy(sa.sun_path, pub->socket_path, sizeof(sa.sun_path) - 1);
int c = connect(probe, (struct sockaddr *)&sa, sizeof(sa));
close(probe);
if (c == 0) {
CUFRAMES_LOG_ERROR("socket %s is in use", pub->socket_path);
return CUFRAMES_ERR_ALREADY_EXISTS;
}
}
unlink(pub->socket_path);
}
pub->listen_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
if (pub->listen_fd < 0) {
CUFRAMES_LOG_ERROR("socket: %s", strerror(errno));
return CUFRAMES_ERR_IO;
}
struct sockaddr_un sa = {.sun_family = AF_UNIX};
strncpy(sa.sun_path, pub->socket_path, sizeof(sa.sun_path) - 1);
if (bind(pub->listen_fd, (struct sockaddr *)&sa, sizeof(sa)) < 0) {
CUFRAMES_LOG_ERROR("bind %s: %s", pub->socket_path, strerror(errno));
return CUFRAMES_ERR_IO;
}
if (listen(pub->listen_fd, 8) < 0) {
CUFRAMES_LOG_ERROR("listen: %s", strerror(errno));
return CUFRAMES_ERR_IO;
}
return CUFRAMES_OK;
}
/* ─── Public API ──────────────────────────────────────────────────────── */
static int validate_config(const cuframes_publisher_config_t *cfg) {
if (!cfg) return CUFRAMES_ERR_INVALID_ARG;
if (!cfg->key) return CUFRAMES_ERR_INVALID_ARG;
if (cuframes_internal_validate_key(cfg->key) != CUFRAMES_OK)
return CUFRAMES_ERR_INVALID_ARG;
if (cfg->width <= 0 || cfg->height <= 0) return CUFRAMES_ERR_INVALID_ARG;
if (cfg->ownership != CUFRAMES_OWNERSHIP_LIBRARY &&
cfg->ownership != CUFRAMES_OWNERSHIP_EXTERNAL)
return CUFRAMES_ERR_INVALID_ARG;
if (cfg->ownership == CUFRAMES_OWNERSHIP_LIBRARY) {
if (cfg->ring_size < 2 || cfg->ring_size > CUFRAMES_MAX_RING)
return CUFRAMES_ERR_INVALID_ARG;
}
if (cfg->policy != CUFRAMES_POLICY_DROP_OLDEST &&
cfg->policy != CUFRAMES_POLICY_STRICT_WAIT)
return CUFRAMES_ERR_INVALID_ARG;
return CUFRAMES_OK;
}
static int common_init(struct cuframes_publisher *pub,
const cuframes_publisher_config_t *cfg) {
pub->cfg = *cfg;
strncpy(pub->key, cfg->key, sizeof(pub->key) - 1);
pub->key[sizeof(pub->key) - 1] = '\0';
pub->shm_fd = -1;
pub->listen_fd = -1;
pub->next_seq = 0;
pub->current_slot = -1;
pub->has_acquired = 0;
pthread_mutex_init(&pub->state_mu, NULL);
return CUFRAMES_OK;
}
int cuframes_publisher_create(const cuframes_publisher_config_t *cfg,
cuframes_publisher_t **out) {
int r = validate_config(cfg);
if (r != CUFRAMES_OK) return r;
if (cfg->ownership != CUFRAMES_OWNERSHIP_LIBRARY) return CUFRAMES_ERR_INVALID_ARG;
struct cuframes_publisher *pub = calloc(1, sizeof(*pub));
if (!pub) return CUFRAMES_ERR_OUT_OF_MEMORY;
common_init(pub, cfg);
if ((r = alloc_library_pool(pub)) != CUFRAMES_OK) goto fail;
if ((r = create_event_handle(pub)) != CUFRAMES_OK) goto fail;
if ((r = setup_shm(pub)) != CUFRAMES_OK) goto fail;
if ((r = setup_socket(pub)) != CUFRAMES_OK) goto fail;
/* Start accept thread */
pub->stop_flag = 0;
if (pthread_create(&pub->accept_thread, NULL, accept_thread_main, pub) != 0) {
r = CUFRAMES_ERR_INTERNAL;
goto fail;
}
pub->accept_thread_alive = 1;
CUFRAMES_LOG_INFO("publisher '%s' ready (ring=%d, %dx%d, fmt=%d, lib-owned)",
pub->key, pub->ring_size_actual,
pub->cfg.width, pub->cfg.height, (int)pub->cfg.format);
*out = pub;
return CUFRAMES_OK;
fail:
cuframes_publisher_destroy(pub);
return r;
}
int cuframes_publisher_create_external(const cuframes_publisher_config_t *cfg,
void *const *cuda_ptrs,
int32_t ptr_count,
size_t frame_size,
cuframes_publisher_t **out) {
int r = validate_config(cfg);
if (r != CUFRAMES_OK) return r;
if (cfg->ownership != CUFRAMES_OWNERSHIP_EXTERNAL) return CUFRAMES_ERR_INVALID_ARG;
if (!cuda_ptrs || ptr_count < 1) return CUFRAMES_ERR_INVALID_ARG;
if (frame_size == 0) return CUFRAMES_ERR_INVALID_ARG;
struct cuframes_publisher *pub = calloc(1, sizeof(*pub));
if (!pub) return CUFRAMES_ERR_OUT_OF_MEMORY;
common_init(pub, cfg);
if ((r = register_external_pool(pub, cuda_ptrs, ptr_count, frame_size)) != CUFRAMES_OK)
goto fail;
if ((r = create_event_handle(pub)) != CUFRAMES_OK) goto fail;
if ((r = setup_shm(pub)) != CUFRAMES_OK) goto fail;
if ((r = setup_socket(pub)) != CUFRAMES_OK) goto fail;
pub->stop_flag = 0;
if (pthread_create(&pub->accept_thread, NULL, accept_thread_main, pub) != 0) {
r = CUFRAMES_ERR_INTERNAL;
goto fail;
}
pub->accept_thread_alive = 1;
CUFRAMES_LOG_INFO("publisher '%s' ready (external pool=%d, %dx%d, fmt=%d)",
pub->key, ptr_count,
pub->cfg.width, pub->cfg.height, (int)pub->cfg.format);
*out = pub;
return CUFRAMES_OK;
fail:
cuframes_publisher_destroy(pub);
return r;
}
int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) {
if (!pub || !cuda_ptr_out) return CUFRAMES_ERR_INVALID_ARG;
if (pub->cfg.ownership != CUFRAMES_OWNERSHIP_LIBRARY) return CUFRAMES_ERR_INVALID_ARG;
if (pub->has_acquired) return CUFRAMES_ERR_INVALID_ARG;
int32_t slot = (int32_t)(pub->next_seq % (uint64_t)pub->ring_size_actual);
/* STRICT_WAIT: ждём пока ACK bitmap всех subscribers очистится для этого slot */
if (pub->cfg.policy == CUFRAMES_POLICY_STRICT_WAIT) {
uint64_t bitmap = atomic_load_explicit(&pub->hdr->subscriber_bitmap,
memory_order_acquire);
if (bitmap != 0) {
int64_t deadline = pub->cfg.consumer_ack_timeout_ms > 0
? cuframes_now_ns() + (int64_t)pub->cfg.consumer_ack_timeout_ms * 1000000LL
: 0;
while (1) {
uint64_t ack = atomic_load_explicit(&pub->hdr->slots[slot].ack_bitmap,
memory_order_acquire);
/* Если slot ещё не публикован (seq == UINT64_MAX) — пропустить ack check */
uint64_t cur_seq = atomic_load_explicit(&pub->hdr->slots[slot].seq,
memory_order_acquire);
if (cur_seq == UINT64_MAX || (ack & bitmap) == bitmap) break;
if (deadline && cuframes_now_ns() > deadline) {
/* Mark slow subscriber dead и continue */
uint64_t missing = bitmap & ~ack;
CUFRAMES_LOG_WARN("strict-wait timeout, slow subscribers bitmap=0x%lx",
(unsigned long)missing);
/* clear missing subscribers — TODO: send unsubscribe in v0.2 */
atomic_fetch_and_explicit(&pub->hdr->subscriber_bitmap,
~missing, memory_order_release);
break;
}
struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000}; /* 100µs */
nanosleep(&ts, NULL);
}
}
}
*cuda_ptr_out = pub->cuda_ptrs[slot];
pub->current_slot = slot;
pub->has_acquired = 1;
return CUFRAMES_OK;
}
static int do_publish(cuframes_publisher_t *pub, int32_t slot,
void *stream, int64_t pts_ns) {
/* Record event on producer's stream */
cudaError_t cerr = cudaEventRecord(pub->event, (cudaStream_t)stream);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaEventRecord: %s", cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
/* Reset ack bitmap для нового frame'а */
atomic_store_explicit(&pub->hdr->slots[slot].ack_bitmap, 0,
memory_order_release);
atomic_store_explicit(&pub->hdr->slots[slot].pts_ns, pts_ns,
memory_order_release);
atomic_store_explicit(&pub->hdr->slots[slot].seq, pub->next_seq,
memory_order_release);
/* Publication barrier */
atomic_store_explicit(&pub->hdr->global_seq, pub->next_seq,
memory_order_release);
pub->next_seq++;
return CUFRAMES_OK;
}
int cuframes_publisher_publish(cuframes_publisher_t *pub, void *stream, int64_t pts_ns) {
if (!pub) return CUFRAMES_ERR_INVALID_ARG;
if (pub->cfg.ownership != CUFRAMES_OWNERSHIP_LIBRARY) return CUFRAMES_ERR_INVALID_ARG;
if (!pub->has_acquired) return CUFRAMES_ERR_INVALID_ARG;
int r = do_publish(pub, pub->current_slot, stream, pts_ns);
pub->has_acquired = 0;
pub->current_slot = -1;
return r;
}
int cuframes_publisher_publish_external(cuframes_publisher_t *pub,
void *cuda_ptr, void *stream, int64_t pts_ns) {
if (!pub || !cuda_ptr) return CUFRAMES_ERR_INVALID_ARG;
if (pub->cfg.ownership != CUFRAMES_OWNERSHIP_EXTERNAL) return CUFRAMES_ERR_INVALID_ARG;
int32_t slot = -1;
for (int i = 0; i < pub->external_count; ++i) {
if (pub->external_ptrs[i] == cuda_ptr) { slot = i; break; }
}
if (slot < 0) {
CUFRAMES_LOG_ERROR("external pointer %p not registered", cuda_ptr);
return CUFRAMES_ERR_INVALID_ARG;
}
/* STRICT_WAIT — то же что в acquire, но per-publish */
if (pub->cfg.policy == CUFRAMES_POLICY_STRICT_WAIT) {
uint64_t bitmap = atomic_load_explicit(&pub->hdr->subscriber_bitmap,
memory_order_acquire);
if (bitmap != 0) {
int64_t deadline = pub->cfg.consumer_ack_timeout_ms > 0
? cuframes_now_ns() + (int64_t)pub->cfg.consumer_ack_timeout_ms * 1000000LL
: 0;
while (1) {
uint64_t ack = atomic_load_explicit(&pub->hdr->slots[slot].ack_bitmap,
memory_order_acquire);
uint64_t cur_seq = atomic_load_explicit(&pub->hdr->slots[slot].seq,
memory_order_acquire);
if (cur_seq == UINT64_MAX || (ack & bitmap) == bitmap) break;
if (deadline && cuframes_now_ns() > deadline) {
uint64_t missing = bitmap & ~ack;
atomic_fetch_and_explicit(&pub->hdr->subscriber_bitmap,
~missing, memory_order_release);
break;
}
struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000};
nanosleep(&ts, NULL);
}
}
}
return do_publish(pub, slot, stream, pts_ns);
}
int cuframes_publisher_destroy(cuframes_publisher_t *pub) {
if (!pub) return CUFRAMES_OK;
if (pub->hdr) {
atomic_store_explicit(&pub->hdr->shutdown_flag, 1, memory_order_release);
}
/* Stop accept thread */
pub->stop_flag = 1;
if (pub->listen_fd >= 0) {
shutdown(pub->listen_fd, SHUT_RDWR);
close(pub->listen_fd);
pub->listen_fd = -1;
}
if (pub->accept_thread_alive) {
pthread_join(pub->accept_thread, NULL);
pub->accept_thread_alive = 0;
}
/* Free CUDA */
if (pub->cfg.ownership == CUFRAMES_OWNERSHIP_LIBRARY) {
for (int i = 0; i < pub->ring_size_actual; ++i) {
if (pub->cuda_ptrs[i]) cudaFree(pub->cuda_ptrs[i]);
}
}
if (pub->event) cudaEventDestroy(pub->event);
/* Unlink resources */
if (pub->hdr) {
munmap(pub->hdr, sizeof(cuframes_shm_header_t));
pub->hdr = NULL;
}
if (pub->shm_fd >= 0) {
close(pub->shm_fd);
shm_unlink(pub->shm_name);
pub->shm_fd = -1;
}
if (pub->socket_path[0]) {
unlink(pub->socket_path);
}
pthread_mutex_destroy(&pub->state_mu);
free(pub);
return CUFRAMES_OK;
}
/* ─── Accept thread + handshake ──────────────────────────────────────── */
static void *accept_thread_main(void *arg) {
struct cuframes_publisher *pub = (struct cuframes_publisher *)arg;
while (!pub->stop_flag) {
struct sockaddr_un sa;
socklen_t sl = sizeof(sa);
int client = accept(pub->listen_fd, (struct sockaddr *)&sa, &sl);
if (client < 0) {
if (pub->stop_flag) break;
if (errno == EINTR) continue;
CUFRAMES_LOG_WARN("accept: %s", strerror(errno));
continue;
}
/* Synchronous handshake — после ответа socket остаётся открытым для
* lifetime signals (SHUTDOWN, PING). Close на error. */
int r = handshake_subscriber(pub, client);
if (r != CUFRAMES_OK) {
close(client);
}
/* TODO v0.2: track client fds для broadcast SHUTDOWN. Сейчас clients
* сами detect socket EOF при publisher_destroy через shutdown(). */
}
return NULL;
}
static int allocate_subscriber_bit(struct cuframes_publisher *pub,
const char *name, uint32_t *bit_out) {
/* Bit 0 reserved (sentinel). Bits 1..31. */
pthread_mutex_lock(&pub->state_mu);
for (uint32_t bit = 1; bit < CUFRAMES_MAX_SUBSCRIBERS; ++bit) {
uint64_t state = atomic_load_explicit(&pub->hdr->subscribers[bit].state,
memory_order_acquire);
if (state == 0) {
atomic_store_explicit(&pub->hdr->subscribers[bit].state, 1,
memory_order_release);
memset(pub->hdr->subscribers[bit].consumer_name, 0,
sizeof(pub->hdr->subscribers[bit].consumer_name));
if (name) {
strncpy(pub->hdr->subscribers[bit].consumer_name, name,
sizeof(pub->hdr->subscribers[bit].consumer_name) - 1);
}
atomic_fetch_or_explicit(&pub->hdr->subscriber_bitmap,
1ULL << bit, memory_order_release);
*bit_out = bit;
pthread_mutex_unlock(&pub->state_mu);
return CUFRAMES_OK;
}
/* Check for name collision */
if (name && state >= 2 &&
strncmp(pub->hdr->subscribers[bit].consumer_name, name,
sizeof(pub->hdr->subscribers[bit].consumer_name)) == 0) {
pthread_mutex_unlock(&pub->state_mu);
return CUFRAMES_ERR_ALREADY_EXISTS;
}
}
pthread_mutex_unlock(&pub->state_mu);
return CUFRAMES_ERR_TOO_MANY;
}
static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) {
/* recv HELLO_REQ */
uint8_t buf[CUFRAMES_MAX_MSG_PAYLOAD];
uint32_t plen = sizeof(buf);
uint32_t mtype = 0;
int r = cuframes_internal_recv_msg(client_fd, &mtype, buf, &plen, 5000);
if (r != CUFRAMES_OK) {
CUFRAMES_LOG_WARN("handshake recv HELLO: %s", cuframes_strerror(r));
return r;
}
if (mtype != CUFRAMES_MSG_HELLO_REQ) {
CUFRAMES_LOG_WARN("handshake expected HELLO_REQ got 0x%x", mtype);
return CUFRAMES_ERR_PROTOCOL;
}
/* Parse HELLO_REQ: proto_version + name_len + name + cuda_device + mode */
if (plen < sizeof(cuframes_msg_hello_req_t) + 20) return CUFRAMES_ERR_PROTOCOL;
cuframes_msg_hello_req_t *hreq = (cuframes_msg_hello_req_t *)buf;
uint32_t want_proto = hreq->proto_version;
uint32_t name_len = hreq->consumer_name_len;
if (name_len > 31) name_len = 31;
char name[32] = {0};
memcpy(name, buf + sizeof(*hreq), name_len);
int proto_match = (want_proto == CUFRAMES_PROTOCOL_V1);
/* Send HELLO_RESP */
uint8_t resp_buf[CUFRAMES_MAX_MSG_PAYLOAD];
cuframes_msg_hello_resp_t *resp = (cuframes_msg_hello_resp_t *)resp_buf;
memset(resp, 0, sizeof(*resp));
resp->result = proto_match ? CUFRAMES_OK : CUFRAMES_ERR_PROTOCOL;
resp->proto_version_actual = CUFRAMES_PROTOCOL_V1;
resp->ring_size = (uint32_t)pub->ring_size_actual;
resp->ownership_mode = (uint32_t)pub->cfg.ownership;
resp->meta = pub->hdr->meta;
/* shm_path */
int slen = snprintf((char *)(resp_buf + sizeof(*resp)),
sizeof(resp_buf) - sizeof(*resp) - 12,
"%s", pub->shm_name);
resp->shm_path_len = (uint32_t)slen;
uint32_t total = sizeof(*resp) + (uint32_t)slen + 12;
r = cuframes_internal_send_msg(client_fd, CUFRAMES_MSG_HELLO_RESP,
resp_buf, total);
if (r != CUFRAMES_OK) {
CUFRAMES_LOG_WARN("send HELLO_RESP: %s", cuframes_strerror(r));
return r;
}
if (!proto_match) return CUFRAMES_ERR_PROTOCOL;
/* recv SUBSCRIBE_REQ */
plen = sizeof(buf);
r = cuframes_internal_recv_msg(client_fd, &mtype, buf, &plen, 5000);
if (r != CUFRAMES_OK) return r;
if (mtype != CUFRAMES_MSG_SUBSCRIBE_REQ) return CUFRAMES_ERR_PROTOCOL;
/* Allocate subscriber bit */
uint32_t bit = 0;
int alloc_r = allocate_subscriber_bit(pub, name, &bit);
/* Send SUBSCRIBE_RESP */
cuframes_msg_subscribe_resp_t sresp = {0};
sresp.result = alloc_r;
sresp.assigned_bit = bit;
sresp.initial_seq = atomic_load_explicit(&pub->hdr->global_seq,
memory_order_acquire);
r = cuframes_internal_send_msg(client_fd, CUFRAMES_MSG_SUBSCRIBE_RESP,
&sresp, sizeof(sresp));
if (r != CUFRAMES_OK || alloc_r != CUFRAMES_OK) return r ? r : alloc_r;
/* Activate subscriber slot */
atomic_store_explicit(&pub->hdr->subscribers[bit].state, 2,
memory_order_release);
CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u)", name, bit);
/* TODO v0.2: spawn per-client thread для liveness/PING/UNSUBSCRIBE.
* Сейчас socket остаётся открытым на heap'е до publisher_destroy. */
return CUFRAMES_OK;
}
+99
View File
@@ -0,0 +1,99 @@
/* Socket TLV framing (docs/protocol.md §3). */
#include "internal.h"
#include <errno.h>
#include <poll.h>
#include <sys/socket.h>
#include <unistd.h>
/* Read exactly N bytes from socket, with poll-based timeout. */
static int recv_all(int fd, void *buf, size_t n, int32_t timeout_ms) {
uint8_t *p = (uint8_t *)buf;
size_t got = 0;
int64_t deadline_ns = (timeout_ms < 0)
? INT64_MAX
: cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL;
while (got < n) {
if (timeout_ms >= 0) {
int64_t remain_ns = deadline_ns - cuframes_now_ns();
if (remain_ns <= 0) return CUFRAMES_ERR_TIMEOUT;
struct pollfd pfd = {.fd = fd, .events = POLLIN};
int pr = poll(&pfd, 1, (int)(remain_ns / 1000000LL));
if (pr == 0) return CUFRAMES_ERR_TIMEOUT;
if (pr < 0) {
if (errno == EINTR) continue;
return CUFRAMES_ERR_IO;
}
}
ssize_t r = recv(fd, p + got, n - got, 0);
if (r == 0) return CUFRAMES_ERR_DISCONNECTED;
if (r < 0) {
if (errno == EINTR) continue;
if (errno == EAGAIN || errno == EWOULDBLOCK) {
if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK;
continue;
}
return CUFRAMES_ERR_IO;
}
got += (size_t)r;
}
return CUFRAMES_OK;
}
static int send_all(int fd, const void *buf, size_t n) {
const uint8_t *p = (const uint8_t *)buf;
size_t sent = 0;
while (sent < n) {
ssize_t s = send(fd, p + sent, n - sent, MSG_NOSIGNAL);
if (s < 0) {
if (errno == EINTR) continue;
if (errno == EPIPE) return CUFRAMES_ERR_DISCONNECTED;
return CUFRAMES_ERR_IO;
}
sent += (size_t)s;
}
return CUFRAMES_OK;
}
int cuframes_internal_send_msg(int fd, uint32_t msg_type,
const void *payload, uint32_t payload_len) {
if (payload_len > CUFRAMES_MAX_MSG_PAYLOAD) return CUFRAMES_ERR_INVALID_ARG;
cuframes_msg_header_t h = {.msg_type = msg_type, .payload_length = payload_len};
int r = send_all(fd, &h, sizeof(h));
if (r != CUFRAMES_OK) return r;
if (payload_len > 0 && payload) {
r = send_all(fd, payload, payload_len);
}
return r;
}
int cuframes_internal_recv_msg(int fd, uint32_t *msg_type_out,
void *payload, uint32_t *payload_len_inout,
int32_t timeout_ms) {
cuframes_msg_header_t h;
int r = recv_all(fd, &h, sizeof(h), timeout_ms);
if (r != CUFRAMES_OK) return r;
if (h.payload_length > CUFRAMES_MAX_MSG_PAYLOAD) return CUFRAMES_ERR_PROTOCOL;
if (msg_type_out) *msg_type_out = h.msg_type;
if (h.payload_length > 0) {
if (!payload || !payload_len_inout || *payload_len_inout < h.payload_length) {
/* Drain unread bytes to keep socket in sync */
uint8_t drain[256];
uint32_t left = h.payload_length;
while (left > 0) {
size_t take = left < sizeof(drain) ? left : sizeof(drain);
int dr = recv_all(fd, drain, take, timeout_ms);
if (dr != CUFRAMES_OK) return dr;
left -= (uint32_t)take;
}
if (payload_len_inout) *payload_len_inout = h.payload_length;
return CUFRAMES_ERR_INVALID_ARG;
}
r = recv_all(fd, payload, h.payload_length, timeout_ms);
if (r != CUFRAMES_OK) return r;
}
if (payload_len_inout) *payload_len_inout = h.payload_length;
return CUFRAMES_OK;
}
+149
View File
@@ -0,0 +1,149 @@
/* Utility functions: error strings, frame size calc, timestamps. */
#include "internal.h"
#include <ctype.h>
#include <errno.h>
#include <signal.h>
#include <stdio.h>
#include <sys/stat.h>
#include <time.h>
const char *cuframes_version_string(void) {
static char buf[32];
snprintf(buf, sizeof(buf), "%d.%d.%d",
CUFRAMES_VERSION_MAJOR, CUFRAMES_VERSION_MINOR, CUFRAMES_VERSION_PATCH);
return buf;
}
uint32_t cuframes_protocol_version(void) { return CUFRAMES_PROTOCOL_V1; }
const char *cuframes_strerror(int err) {
switch (err) {
case CUFRAMES_OK: return "ok";
case CUFRAMES_ERR_INVALID_ARG: return "invalid argument";
case CUFRAMES_ERR_OUT_OF_MEMORY: return "out of memory";
case CUFRAMES_ERR_CUDA: return "cuda error";
case CUFRAMES_ERR_IO: return "I/O error (socket/mmap/eventfd)";
case CUFRAMES_ERR_NOT_FOUND: return "publisher not found";
case CUFRAMES_ERR_ALREADY_EXISTS: return "publisher/subscriber-name already exists";
case CUFRAMES_ERR_TIMEOUT: return "timeout";
case CUFRAMES_ERR_PROTOCOL: return "wire protocol mismatch";
case CUFRAMES_ERR_DISCONNECTED: return "disconnected";
case CUFRAMES_ERR_FORMAT: return "unsupported format or size mismatch";
case CUFRAMES_ERR_WOULD_BLOCK: return "would block";
case CUFRAMES_ERR_TOO_MANY: return "too many subscribers (max 32)";
case CUFRAMES_ERR_INTERNAL: return "internal error (please report)";
default: return "unknown error";
}
}
int64_t cuframes_now_ns(void) {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (int64_t)ts.tv_sec * 1000000000LL + ts.tv_nsec;
}
int cuframes_calc_frame_size(cuframes_format_t format,
int32_t width, int32_t height,
size_t *size_out,
int32_t *pitch_y_out,
int32_t *pitch_uv_out) {
return cuframes_internal_calc_size(format, width, height,
size_out, pitch_y_out, pitch_uv_out);
}
/* ─── Internal helpers ────────────────────────────────────────────────── */
int cuframes_internal_validate_key(const char *key) {
if (!key) return CUFRAMES_ERR_INVALID_ARG;
size_t len = strlen(key);
if (len < 1 || len > CUFRAMES_MAX_KEY_LEN) return CUFRAMES_ERR_INVALID_ARG;
for (size_t i = 0; i < len; ++i) {
char c = key[i];
if (!(isalnum((unsigned char)c) || c == '_' || c == '-')) {
return CUFRAMES_ERR_INVALID_ARG;
}
}
return CUFRAMES_OK;
}
int cuframes_internal_socket_path(const char *key, char *out, size_t out_size) {
int r = cuframes_internal_validate_key(key);
if (r != CUFRAMES_OK) return r;
int n = snprintf(out, out_size, "%s/%s.sock", CUFRAMES_RUNTIME_DIR, key);
if (n < 0 || (size_t)n >= out_size) return CUFRAMES_ERR_INVALID_ARG;
return CUFRAMES_OK;
}
int cuframes_internal_shm_name(const char *key, char *out, size_t out_size) {
int r = cuframes_internal_validate_key(key);
if (r != CUFRAMES_OK) return r;
int n = snprintf(out, out_size, "%s%s", CUFRAMES_SHM_PREFIX, key);
if (n < 0 || (size_t)n >= out_size) return CUFRAMES_ERR_INVALID_ARG;
return CUFRAMES_OK;
}
int cuframes_internal_ensure_runtime_dir(void) {
if (mkdir(CUFRAMES_RUNTIME_DIR, 0755) == 0) return CUFRAMES_OK;
if (errno == EEXIST) return CUFRAMES_OK;
CUFRAMES_LOG_ERROR("cannot create %s: %s",
CUFRAMES_RUNTIME_DIR, strerror(errno));
return CUFRAMES_ERR_IO;
}
int cuframes_internal_pid_alive(pid_t pid) {
if (pid <= 0) return 0;
if (kill(pid, 0) == 0) return 1;
return (errno == EPERM) ? 1 : 0;
}
/* Возвращает размер frame'а + pitch'ы. Pitch aligned 256.
* NV12: Y w×h + UV w×h/2 (interleaved) — оба с тем же pitch_y.
* YUV420P: Y w×h + U w/2×h/2 + V w/2×h/2.
* RGB/BGR: 3 bytes per pixel, single plane.
* RGBA: 4 bytes per pixel.
* GRAYSCALE: 1 byte per pixel.
*/
int cuframes_internal_calc_size(cuframes_format_t format, int32_t w, int32_t h,
size_t *size_out, int32_t *pitch_y_out, int32_t *pitch_uv_out) {
if (w <= 0 || h <= 0) return CUFRAMES_ERR_INVALID_ARG;
int32_t py = 0, puv = 0;
size_t total = 0;
/* round up to 256-byte alignment for CUDA */
#define ALIGN256(x) (((x) + 255u) & ~255u)
switch (format) {
case CUFRAMES_FORMAT_NV12:
py = (int32_t)ALIGN256((uint32_t)w);
puv = py;
total = (size_t)py * (size_t)h + (size_t)puv * (size_t)(h / 2);
break;
case CUFRAMES_FORMAT_YUV420P:
py = (int32_t)ALIGN256((uint32_t)w);
puv = (int32_t)ALIGN256((uint32_t)(w / 2));
total = (size_t)py * (size_t)h + 2 * (size_t)puv * (size_t)(h / 2);
break;
case CUFRAMES_FORMAT_RGB:
case CUFRAMES_FORMAT_BGR:
py = (int32_t)ALIGN256((uint32_t)(w * 3));
total = (size_t)py * (size_t)h;
break;
case CUFRAMES_FORMAT_RGBA:
py = (int32_t)ALIGN256((uint32_t)(w * 4));
total = (size_t)py * (size_t)h;
break;
case CUFRAMES_FORMAT_GRAYSCALE:
py = (int32_t)ALIGN256((uint32_t)w);
total = (size_t)py * (size_t)h;
break;
default:
return CUFRAMES_ERR_FORMAT;
}
if (size_out) *size_out = total;
if (pitch_y_out) *pitch_y_out = py;
if (pitch_uv_out) *pitch_uv_out = puv;
return CUFRAMES_OK;
}
+17
View File
@@ -0,0 +1,17 @@
find_package(CUDAToolkit REQUIRED)
# Standalone test executables (без catch2 — простой ctest)
add_executable(test_pingpong test_pingpong.cu)
target_link_libraries(test_pingpong PRIVATE cuframes CUDA::cudart)
target_include_directories(test_pingpong PRIVATE
${CMAKE_SOURCE_DIR}/include)
add_test(NAME pingpong_basic COMMAND test_pingpong)
set_tests_properties(pingpong_basic PROPERTIES TIMEOUT 60)
add_executable(test_multi test_multi.cu)
target_link_libraries(test_multi PRIVATE cuframes CUDA::cudart)
target_include_directories(test_multi PRIVATE
${CMAKE_SOURCE_DIR}/include)
add_test(NAME multi_consumer COMMAND test_multi)
set_tests_properties(multi_consumer PROPERTIES TIMEOUT 60)
+142
View File
@@ -0,0 +1,142 @@
/* Multi-consumer test: 1 producer × 3 consumers. */
#include <cuframes/cuframes.h>
#include <cuda_runtime.h>
#include <sys/wait.h>
#include <unistd.h>
#include <atomic>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <thread>
#define CHECK(call) do { int _r = (call); if (_r != 0) { \
fprintf(stderr, "FAIL %s:%d: %d (%s)\n", __FILE__, __LINE__, _r, cuframes_strerror(_r)); std::exit(2); } } while(0)
#define CHECK_CUDA(call) do { cudaError_t _e = (call); if (_e != cudaSuccess) { \
fprintf(stderr, "CUDA FAIL %s:%d: %s\n", __FILE__, __LINE__, cudaGetErrorString(_e)); std::exit(2); } } while(0)
static const char *KEY = "test_multi";
static const int W = 320, H = 240;
static const int N = 150;
__host__ __device__ inline uint8_t pat(uint64_t seq, int row) {
return static_cast<uint8_t>((seq * 31u + row * 7u) & 0xFF);
}
__global__ void fill_y(uint8_t *y, int w, int h, int py, uint64_t seq) {
int x = blockIdx.x * blockDim.x + threadIdx.x;
int r = blockIdx.y * blockDim.y + threadIdx.y;
if (x < w && r < h) y[r * py + x] = pat(seq, r);
}
__global__ void verify_y(const uint8_t *y, int w, int h, int py, uint64_t seq, int *bad) {
int x = blockIdx.x * blockDim.x + threadIdx.x;
int r = blockIdx.y * blockDim.y + threadIdx.y;
if (x < w && r < h) if (y[r * py + x] != pat(seq, r)) atomicAdd(bad, 1);
}
int run_consumer(const char *name) {
cuframes_subscriber_config_t cfg = {};
cfg.key = KEY;
cfg.consumer_name = name;
cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
cfg.connect_timeout_ms = 5000;
cuframes_subscriber_t *sub = NULL;
CHECK(cuframes_subscriber_create(&cfg, &sub));
cudaStream_t s;
CHECK_CUDA(cudaStreamCreate(&s));
int *d_bad;
CHECK_CUDA(cudaMalloc(&d_bad, sizeof(int)));
dim3 b(32, 8);
dim3 g((W + b.x - 1) / b.x, (H + b.y - 1) / b.y);
int recv = 0, torn = 0;
while (1) {
cuframes_frame_t *f = NULL;
int r = cuframes_subscriber_next(sub, s, &f, 2000);
if (r == CUFRAMES_ERR_TIMEOUT || r == CUFRAMES_ERR_DISCONNECTED) break;
if (r != 0) { fprintf(stderr, "[%s] next: %s\n", name, cuframes_strerror(r)); std::exit(2); }
CHECK_CUDA(cudaMemsetAsync(d_bad, 0, sizeof(int), s));
verify_y<<<g, b, 0, s>>>((const uint8_t *)cuframes_frame_cuda_ptr(f),
W, H, cuframes_frame_pitch_y(f),
cuframes_frame_seq(f), d_bad);
int bad = 0;
CHECK_CUDA(cudaMemcpyAsync(&bad, d_bad, sizeof(int), cudaMemcpyDeviceToHost, s));
CHECK_CUDA(cudaStreamSynchronize(s));
if (bad > 0) torn++;
recv++;
CHECK(cuframes_subscriber_release(sub, f));
}
fprintf(stderr, "[%s] received=%d torn=%d\n", name, recv, torn);
cudaFree(d_bad);
cudaStreamDestroy(s);
cuframes_subscriber_destroy(sub);
return (recv >= N / 3 && torn == 0) ? 0 : 1;
}
int run_producer() {
cuframes_publisher_config_t cfg = {};
cfg.key = KEY;
cfg.width = W;
cfg.height = H;
cfg.format = CUFRAMES_FORMAT_NV12;
cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
cfg.ring_size = 4;
cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;
cuframes_publisher_t *pub = NULL;
CHECK(cuframes_publisher_create(&cfg, &pub));
int32_t pitch_y = 0;
CHECK(cuframes_calc_frame_size(CUFRAMES_FORMAT_NV12, W, H, NULL, &pitch_y, NULL));
cudaStream_t s;
CHECK_CUDA(cudaStreamCreate(&s));
dim3 b(32, 8);
dim3 g((W + b.x - 1) / b.x, (H + b.y - 1) / b.y);
std::this_thread::sleep_for(std::chrono::milliseconds(800));
auto iv = std::chrono::nanoseconds(1000000000LL / 60);
auto t = std::chrono::steady_clock::now();
for (int i = 0; i < N; ++i) {
void *p = NULL;
CHECK(cuframes_publisher_acquire(pub, &p));
fill_y<<<g, b, 0, s>>>((uint8_t *)p, W, H, pitch_y, (uint64_t)i);
CHECK(cuframes_publisher_publish(pub, s, cuframes_now_ns()));
t += iv;
std::this_thread::sleep_until(t);
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
cuframes_publisher_destroy(pub);
cudaStreamDestroy(s);
return 0;
}
int main() {
char shm[80]; snprintf(shm, 80, "/dev/shm/cuframes-%s", KEY); unlink(shm);
char sock[128]; snprintf(sock, 128, "/run/cuframes/%s.sock", KEY); unlink(sock);
pid_t pids[3];
const char *names[3] = {"c1", "c2", "c3"};
for (int i = 0; i < 3; ++i) {
pids[i] = fork();
if (pids[i] == 0) return run_consumer(names[i]);
}
int prod_r = run_producer();
int fail = (prod_r != 0);
for (int i = 0; i < 3; ++i) {
int st = 0;
waitpid(pids[i], &st, 0);
if (!WIFEXITED(st) || WEXITSTATUS(st) != 0) fail = 1;
}
if (fail) { fprintf(stderr, "test_multi FAIL\n"); return 1; }
fprintf(stderr, "test_multi PASS\n");
return 0;
}
+219
View File
@@ -0,0 +1,219 @@
/* Integration test: publisher + subscriber в одном процессе, fork'нутые в
* отдельные процессы (CUDA IPC требует разных процессов).
*
* Producer publish'ит N frames с pattern; consumer верифицирует через kernel
* на своём stream'е (тестирует event sync correctness).
*
* Test pass criteria:
* - все frames доставлены
* - 0 torn frames
* - p99 latency < 10 ms
*/
#include <cuframes/cuframes.h>
#include <cuda_runtime.h>
#include <sys/wait.h>
#include <unistd.h>
#include <cassert>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <thread>
#define CHECK(call) do { \
int _r = (call); \
if (_r != 0) { \
fprintf(stderr, "FAIL at %s:%d: %d (%s)\n", __FILE__, __LINE__, _r, cuframes_strerror(_r)); \
std::exit(2); \
} \
} while(0)
#define CHECK_CUDA(call) do { \
cudaError_t _e = (call); \
if (_e != cudaSuccess) { \
fprintf(stderr, "CUDA FAIL %s:%d: %s\n", __FILE__, __LINE__, cudaGetErrorString(_e)); \
std::exit(2); \
} \
} while(0)
static const char *KEY = "test_pingpong";
static const int W = 640;
static const int H = 480;
static const int N_FRAMES = 200;
__host__ __device__ inline uint8_t pat(uint64_t seq, int row) {
return static_cast<uint8_t>((seq * 31u + row * 7u) & 0xFF);
}
__global__ void fill_y(uint8_t *y, int width, int height, int pitch_y, uint64_t seq) {
int x = blockIdx.x * blockDim.x + threadIdx.x;
int row = blockIdx.y * blockDim.y + threadIdx.y;
if (x < width && row < height) {
y[row * pitch_y + x] = pat(seq, row);
}
}
__global__ void verify_y(const uint8_t *y, int width, int height, int pitch_y,
uint64_t seq, int *bad_count) {
int x = blockIdx.x * blockDim.x + threadIdx.x;
int row = blockIdx.y * blockDim.y + threadIdx.y;
if (x < width && row < height) {
if (y[row * pitch_y + x] != pat(seq, row)) atomicAdd(bad_count, 1);
}
}
int run_producer() {
cuframes_publisher_config_t cfg = {};
cfg.key = KEY;
cfg.width = W;
cfg.height = H;
cfg.format = CUFRAMES_FORMAT_NV12;
cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
cfg.ring_size = 4;
cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;
cfg.cuda_device = 0;
cuframes_publisher_t *pub = NULL;
CHECK(cuframes_publisher_create(&cfg, &pub));
int32_t pitch_y = 0;
CHECK(cuframes_calc_frame_size(CUFRAMES_FORMAT_NV12, W, H, NULL, &pitch_y, NULL));
cudaStream_t stream;
CHECK_CUDA(cudaStreamCreate(&stream));
dim3 block(32, 8);
dim3 grid((W + block.x - 1) / block.x, (H + block.y - 1) / block.y);
/* Дать subscriber'у время подключиться */
std::this_thread::sleep_for(std::chrono::milliseconds(500));
auto frame_interval = std::chrono::nanoseconds(1000000000LL / 60); /* 60 fps */
auto next_t = std::chrono::steady_clock::now();
for (int i = 0; i < N_FRAMES; ++i) {
void *cuda_ptr = NULL;
CHECK(cuframes_publisher_acquire(pub, &cuda_ptr));
fill_y<<<grid, block, 0, stream>>>((uint8_t *)cuda_ptr, W, H, pitch_y, (uint64_t)i);
CHECK(cuframes_publisher_publish(pub, stream, cuframes_now_ns()));
next_t += frame_interval;
std::this_thread::sleep_until(next_t);
}
/* Дать consumer'у дочитать */
std::this_thread::sleep_for(std::chrono::milliseconds(500));
cuframes_publisher_destroy(pub);
cudaStreamDestroy(stream);
return 0;
}
int run_consumer() {
cuframes_subscriber_config_t cfg = {};
cfg.key = KEY;
cfg.consumer_name = "test-consumer";
cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
cfg.cuda_device = 0;
cfg.connect_timeout_ms = 5000;
cuframes_subscriber_t *sub = NULL;
CHECK(cuframes_subscriber_create(&cfg, &sub));
cudaStream_t stream;
CHECK_CUDA(cudaStreamCreate(&stream));
int *d_bad = NULL;
CHECK_CUDA(cudaMalloc(&d_bad, sizeof(int)));
dim3 block(32, 8);
dim3 grid((W + block.x - 1) / block.x, (H + block.y - 1) / block.y);
int received = 0;
int torn = 0;
int64_t lat_sum = 0;
int64_t lat_max = 0;
while (1) {
cuframes_frame_t *f = NULL;
int r = cuframes_subscriber_next(sub, stream, &f, 2000);
if (r == CUFRAMES_ERR_TIMEOUT) {
fprintf(stderr, "[consumer] no more frames\n");
break;
}
if (r == CUFRAMES_ERR_DISCONNECTED) {
fprintf(stderr, "[consumer] disconnected\n");
break;
}
if (r != 0) {
fprintf(stderr, "[consumer] next: %s\n", cuframes_strerror(r));
std::exit(2);
}
CHECK_CUDA(cudaMemsetAsync(d_bad, 0, sizeof(int), stream));
verify_y<<<grid, block, 0, stream>>>(
(const uint8_t *)cuframes_frame_cuda_ptr(f),
W, H, cuframes_frame_pitch_y(f),
cuframes_frame_seq(f), d_bad);
int bad = 0;
CHECK_CUDA(cudaMemcpyAsync(&bad, d_bad, sizeof(int),
cudaMemcpyDeviceToHost, stream));
CHECK_CUDA(cudaStreamSynchronize(stream));
int64_t lat = cuframes_now_ns() - cuframes_frame_pts_ns(f);
lat_sum += lat;
if (lat > lat_max) lat_max = lat;
if (bad > 0) torn++;
received++;
CHECK(cuframes_subscriber_release(sub, f));
}
fprintf(stderr, "\n=== test_pingpong consumer summary ===\n");
fprintf(stderr, "received: %d\n", received);
fprintf(stderr, "torn: %d\n", torn);
fprintf(stderr, "lat mean: %ld us\n", received > 0 ? lat_sum / 1000 / received : 0);
fprintf(stderr, "lat max: %ld us\n", lat_max / 1000);
cudaFree(d_bad);
cudaStreamDestroy(stream);
cuframes_subscriber_destroy(sub);
if (received < N_FRAMES / 2) { /* допустимо терять немного на newest-only */
fprintf(stderr, "FAIL: received only %d / %d\n", received, N_FRAMES);
return 1;
}
if (torn > 0) {
fprintf(stderr, "FAIL: %d torn frames\n", torn);
return 1;
}
return 0;
}
int main() {
/* Cleanup от prev runs */
char shm[80];
snprintf(shm, sizeof(shm), "/dev/shm/cuframes-%s", KEY);
unlink(shm);
char sock[128];
snprintf(sock, sizeof(sock), "/run/cuframes/%s.sock", KEY);
unlink(sock);
pid_t pid = fork();
if (pid < 0) { perror("fork"); return 2; }
if (pid == 0) {
return run_consumer();
}
/* Parent — producer */
int prod_r = run_producer();
int status = 0;
waitpid(pid, &status, 0);
int cons_r = WIFEXITED(status) ? WEXITSTATUS(status) : 99;
if (prod_r != 0 || cons_r != 0) {
fprintf(stderr, "test FAILED: producer=%d consumer=%d\n", prod_r, cons_r);
return 1;
}
fprintf(stderr, "test_pingpong PASS\n");
return 0;
}