v0.4: VMM + POSIX FD — namespace decoupling (no pid share required)
Заменяет cudaMalloc + cudaIpcGetMemHandle на cuMemCreate (VMM) +
cuMemExportToShareableHandle(POSIX_FILE_DESCRIPTOR). FDs передаются consumer'у
через sendmsg(SCM_RIGHTS) в handshake. Frigate (s6-overlay не даёт share PID)
и любой другой consumer работают БЕЗ pid namespace share — только volume mount
unix socket'a /run/cuframes и IPC share для /dev/shm header.
Sync: cudaEventRecord+IPC events → cuStreamSynchronize в do_publish.
Producer ждёт ~1 ms что stream flush'нулся, потом atomic_store(seq).
Consumer читает seq через memory_order_acquire и копирует DtoD без
event wait — HW coherence гарантирована на одном GPU.
ABI break (согласован с user'ом):
- magic 0xCC7C1DCC → 0xCC7C1DCE (старые consumers fail cleanly)
- protocol V3 → V4
- libcuframes.so.0 SOVERSION остаётся, но .so.0.3.0 → .so.0.4.0
- EXTERNAL ownership убран (VMM требует cuMemCreate-allocated memory,
нельзя export'нуть произвольный cudaMalloc-pointer как POSIX FD)
- cuframes-rtsp-source переведён на LIBRARY mode + один D2D memcpy
в acquire'нутый slot (overhead малый — публишер всё равно делал такой
D2D из FFmpeg hwframe pool в EXTERNAL pool раньше)
Размер: granularity 2 MB на 5090 → NV12 1920×1080 (~3.1 MB) округляется до
4 MB, +1 MB на slot × 16 × 4 камеры = +64 MB VRAM. Терпимо.
Packet ring (cuframes_packets://) НЕ затронут — отдельный SHM с своим
magic, работает как раньше.
PoC + smoke в spike/:
- vmm_fd_pingpong/ — minimal cuMemCreate+FD round-trip
- smoke_v04/ — full publisher+subscriber, 100/100 frames без pid share
Base image: Dockerfile.runtime → CUDA 12.4 (был 13.0). Matching prod
pipeline + Frigate base, иначе libcudart conflict при load.
Compose stack (localhost-infra repo) — параллельный commit:
- убран pid: container:cuframes-pub-parking из subscribers
- image теги: gx/cuframes:0.4, gx/cuda-grid-pipeline:phase8,
gx/frigate:cuframes-v0.4
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
+2
-2
@@ -1,7 +1,7 @@
|
|||||||
cmake_minimum_required(VERSION 3.20)
|
cmake_minimum_required(VERSION 3.20)
|
||||||
project(cuframes
|
project(cuframes
|
||||||
VERSION 0.3.0
|
VERSION 0.4.0
|
||||||
DESCRIPTION "Zero-copy frame sharing via CUDA IPC"
|
DESCRIPTION "Zero-copy frame sharing via CUDA VMM + POSIX FD"
|
||||||
LANGUAGES C CXX CUDA
|
LANGUAGES C CXX CUDA
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -16,7 +16,8 @@
|
|||||||
# /usr/local/bin/cuframes-rtsp-source --rtsp ... --key ...
|
# /usr/local/bin/cuframes-rtsp-source --rtsp ... --key ...
|
||||||
|
|
||||||
# ─── Build stage ─────────────────────────────────────────────────────────
|
# ─── Build stage ─────────────────────────────────────────────────────────
|
||||||
FROM nvidia/cuda:13.0.3-cudnn-devel-ubuntu24.04 AS build
|
# CUDA 12.4 — matching ffmpeg-vf-cuda-grid base + Frigate stable-tensorrt
|
||||||
|
FROM nvidia/cuda:12.4.1-devel-ubuntu22.04 AS build
|
||||||
|
|
||||||
ENV DEBIAN_FRONTEND=noninteractive
|
ENV DEBIAN_FRONTEND=noninteractive
|
||||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
@@ -36,11 +37,11 @@ RUN cmake -B build -S . -G Ninja \
|
|||||||
&& cmake --build build --parallel
|
&& cmake --build build --parallel
|
||||||
|
|
||||||
# ─── Runtime stage ────────────────────────────────────────────────────────
|
# ─── Runtime stage ────────────────────────────────────────────────────────
|
||||||
FROM nvidia/cuda:13.0.3-cudnn-runtime-ubuntu24.04 AS runtime
|
FROM nvidia/cuda:12.4.1-runtime-ubuntu22.04 AS runtime
|
||||||
|
|
||||||
ENV DEBIAN_FRONTEND=noninteractive
|
ENV DEBIAN_FRONTEND=noninteractive
|
||||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||||
libavcodec60 libavformat60 libavutil58 \
|
libavcodec58 libavformat58 libavutil56 \
|
||||||
ca-certificates \
|
ca-certificates \
|
||||||
&& rm -rf /var/lib/apt/lists/*
|
&& rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ extern "C" {
|
|||||||
/* ─────────────────────────────────────────────────────────────────────── */
|
/* ─────────────────────────────────────────────────────────────────────── */
|
||||||
|
|
||||||
#define CUFRAMES_VERSION_MAJOR 0
|
#define CUFRAMES_VERSION_MAJOR 0
|
||||||
#define CUFRAMES_VERSION_MINOR 3
|
#define CUFRAMES_VERSION_MINOR 4
|
||||||
#define CUFRAMES_VERSION_PATCH 0
|
#define CUFRAMES_VERSION_PATCH 0
|
||||||
|
|
||||||
/** @brief Runtime-версия библиотеки в формате "MAJOR.MINOR.PATCH". */
|
/** @brief Runtime-версия библиотеки в формате "MAJOR.MINOR.PATCH". */
|
||||||
|
|||||||
@@ -34,6 +34,7 @@ foreach(target cuframes cuframes_static)
|
|||||||
target_link_libraries(${target}
|
target_link_libraries(${target}
|
||||||
PUBLIC
|
PUBLIC
|
||||||
CUDA::cudart
|
CUDA::cudart
|
||||||
|
CUDA::cuda_driver # v0.4 — cuMemCreate/cuMemMap/cuMemExportToShareableHandle
|
||||||
Threads::Threads
|
Threads::Threads
|
||||||
rt # для shm_open
|
rt # для shm_open
|
||||||
)
|
)
|
||||||
@@ -41,7 +42,7 @@ endforeach()
|
|||||||
|
|
||||||
# Set SOVERSION на shared lib для ABI tracking
|
# Set SOVERSION на shared lib для ABI tracking
|
||||||
set_target_properties(cuframes PROPERTIES
|
set_target_properties(cuframes PROPERTIES
|
||||||
VERSION 0.3.0
|
VERSION 0.4.0
|
||||||
SOVERSION 0
|
SOVERSION 0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
+161
-146
@@ -1,4 +1,13 @@
|
|||||||
/* Subscriber implementation (sync). */
|
/* Subscriber implementation (sync).
|
||||||
|
*
|
||||||
|
* v0.4 — VMM + POSIX FD. Принимает FDs через SCM_RIGHTS в handshake,
|
||||||
|
* импортирует через cuMemImportFromShareableHandle + cuMemMap. Не требует
|
||||||
|
* shared pid/ipc namespace с producer'ом.
|
||||||
|
*
|
||||||
|
* Sync: producer cuStreamSynchronize'ит свой stream перед atomic_store(seq).
|
||||||
|
* Consumer просто читает seq (acquire) и копирует данные через DtoD memcpy —
|
||||||
|
* никаких cudaEventWait не нужно (HW coherence на одном GPU).
|
||||||
|
*/
|
||||||
|
|
||||||
#include "internal.h"
|
#include "internal.h"
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
@@ -21,14 +30,13 @@ struct cuframes_frame {
|
|||||||
int64_t pts_ns;
|
int64_t pts_ns;
|
||||||
|
|
||||||
uint32_t slot_idx;
|
uint32_t slot_idx;
|
||||||
void *subscriber; /* back-ref для release() */
|
void *subscriber;
|
||||||
};
|
};
|
||||||
|
|
||||||
/* Opaque packet handle — single-packet pattern (как frame_obj). */
|
|
||||||
struct cuframes_packet {
|
struct cuframes_packet {
|
||||||
uint8_t *data; /* heap buffer, allocated by subscriber на enable_packets */
|
uint8_t *data;
|
||||||
size_t capacity; /* size of allocation */
|
size_t capacity;
|
||||||
size_t size; /* actual payload size */
|
size_t size;
|
||||||
int64_t pts_ns;
|
int64_t pts_ns;
|
||||||
int64_t dts_ns;
|
int64_t dts_ns;
|
||||||
uint32_t flags;
|
uint32_t flags;
|
||||||
@@ -44,26 +52,31 @@ struct cuframes_subscriber {
|
|||||||
cuframes_shm_header_t *hdr;
|
cuframes_shm_header_t *hdr;
|
||||||
char shm_name[80];
|
char shm_name[80];
|
||||||
|
|
||||||
cudaEvent_t producer_event; /* legacy fallback (v0.2 proto) */
|
/* v0.4 — VMM imported slots */
|
||||||
cudaEvent_t slot_events[CUFRAMES_MAX_RING]; /* v0.3 — per-slot events */
|
CUmemGenericAllocationHandle vmm_handles[CUFRAMES_MAX_RING];
|
||||||
int has_slot_events; /* 1 if v0.3 events opened OK */
|
CUdeviceptr vmm_ptrs[CUFRAMES_MAX_RING];
|
||||||
void *mapped_ptrs[CUFRAMES_MAX_RING];
|
size_t vmm_slot_size;
|
||||||
|
int imported_count;
|
||||||
|
|
||||||
uint32_t assigned_bit;
|
uint32_t assigned_bit;
|
||||||
uint64_t last_seen_seq;
|
uint64_t last_seen_seq;
|
||||||
|
|
||||||
/* Frame pool — переиспользуем одну frame_t structure (single-thread API). */
|
|
||||||
struct cuframes_frame frame_obj;
|
struct cuframes_frame frame_obj;
|
||||||
int frame_busy;
|
int frame_busy;
|
||||||
|
|
||||||
/* v0.2 — packet ring (optional, opened via enable_packets). */
|
|
||||||
int has_pkt_ring;
|
int has_pkt_ring;
|
||||||
cuframes_pkt_ring_t pkt_ring;
|
cuframes_pkt_ring_t pkt_ring;
|
||||||
uint64_t last_packet_seq; /* UINT64_MAX = no packet read yet */
|
uint64_t last_packet_seq;
|
||||||
struct cuframes_packet packet_obj;
|
struct cuframes_packet packet_obj;
|
||||||
int packet_busy;
|
int packet_busy;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static const char *cu_err_str(CUresult r) {
|
||||||
|
const char *s = NULL;
|
||||||
|
cuGetErrorString(r, &s);
|
||||||
|
return s ? s : "?";
|
||||||
|
}
|
||||||
|
|
||||||
/* ─── Frame accessors ────────────────────────────────────────────────── */
|
/* ─── Frame accessors ────────────────────────────────────────────────── */
|
||||||
void *cuframes_frame_cuda_ptr(const cuframes_frame_t *f) { return f ? f->cuda_ptr : NULL; }
|
void *cuframes_frame_cuda_ptr(const cuframes_frame_t *f) { return f ? f->cuda_ptr : NULL; }
|
||||||
cuframes_format_t cuframes_frame_format(const cuframes_frame_t *f) { return f ? f->format : 0; }
|
cuframes_format_t cuframes_frame_format(const cuframes_frame_t *f) { return f ? f->format : 0; }
|
||||||
@@ -79,11 +92,13 @@ int64_t cuframes_frame_pts_ns(const cuframes_frame_t *f) { return f ? f->pts_ns
|
|||||||
|
|
||||||
/* ─── Subscriber create ──────────────────────────────────────────────── */
|
/* ─── Subscriber create ──────────────────────────────────────────────── */
|
||||||
|
|
||||||
static int do_handshake(struct cuframes_subscriber *sub, const char *name) {
|
static int do_handshake(struct cuframes_subscriber *sub, const char *name,
|
||||||
/* Send HELLO_REQ */
|
int *fds_out, uint32_t *fd_count_inout,
|
||||||
|
uint64_t *slot_size_out) {
|
||||||
|
/* Send HELLO_REQ — proto v4 */
|
||||||
uint8_t buf[CUFRAMES_MAX_MSG_PAYLOAD];
|
uint8_t buf[CUFRAMES_MAX_MSG_PAYLOAD];
|
||||||
cuframes_msg_hello_req_t *hreq = (cuframes_msg_hello_req_t *)buf;
|
cuframes_msg_hello_req_t *hreq = (cuframes_msg_hello_req_t *)buf;
|
||||||
hreq->proto_version = CUFRAMES_PROTOCOL_V1;
|
hreq->proto_version = CUFRAMES_PROTOCOL_V4;
|
||||||
uint32_t nl = name ? (uint32_t)strlen(name) : 0;
|
uint32_t nl = name ? (uint32_t)strlen(name) : 0;
|
||||||
if (nl > 31) nl = 31;
|
if (nl > 31) nl = 31;
|
||||||
hreq->consumer_name_len = nl;
|
hreq->consumer_name_len = nl;
|
||||||
@@ -100,7 +115,6 @@ static int do_handshake(struct cuframes_subscriber *sub, const char *name) {
|
|||||||
buf, plen);
|
buf, plen);
|
||||||
if (r != CUFRAMES_OK) return r;
|
if (r != CUFRAMES_OK) return r;
|
||||||
|
|
||||||
/* Recv HELLO_RESP */
|
|
||||||
uint32_t rmt = 0, rpl = sizeof(buf);
|
uint32_t rmt = 0, rpl = sizeof(buf);
|
||||||
r = cuframes_internal_recv_msg(sub->sock_fd, &rmt, buf, &rpl, 5000);
|
r = cuframes_internal_recv_msg(sub->sock_fd, &rmt, buf, &rpl, 5000);
|
||||||
if (r != CUFRAMES_OK) return r;
|
if (r != CUFRAMES_OK) return r;
|
||||||
@@ -108,10 +122,15 @@ static int do_handshake(struct cuframes_subscriber *sub, const char *name) {
|
|||||||
|
|
||||||
cuframes_msg_hello_resp_t *hresp = (cuframes_msg_hello_resp_t *)buf;
|
cuframes_msg_hello_resp_t *hresp = (cuframes_msg_hello_resp_t *)buf;
|
||||||
if (hresp->result != CUFRAMES_OK) return hresp->result;
|
if (hresp->result != CUFRAMES_OK) return hresp->result;
|
||||||
|
if (hresp->proto_version_actual != CUFRAMES_PROTOCOL_V4) {
|
||||||
|
CUFRAMES_LOG_ERROR("publisher proto v%u — нужен v%u (v0.4)",
|
||||||
|
hresp->proto_version_actual, CUFRAMES_PROTOCOL_V4);
|
||||||
|
return CUFRAMES_ERR_PROTOCOL;
|
||||||
|
}
|
||||||
|
|
||||||
/* Send SUBSCRIBE_REQ */
|
/* Send SUBSCRIBE_REQ */
|
||||||
uint32_t srbuf[8];
|
uint32_t srbuf[8];
|
||||||
srbuf[0] = CUFRAMES_PROTOCOL_V1;
|
srbuf[0] = CUFRAMES_PROTOCOL_V4;
|
||||||
memset(srbuf + 1, 0, 28);
|
memset(srbuf + 1, 0, 28);
|
||||||
r = cuframes_internal_send_msg(sub->sock_fd, CUFRAMES_MSG_SUBSCRIBE_REQ,
|
r = cuframes_internal_send_msg(sub->sock_fd, CUFRAMES_MSG_SUBSCRIBE_REQ,
|
||||||
srbuf, sizeof(srbuf));
|
srbuf, sizeof(srbuf));
|
||||||
@@ -126,7 +145,29 @@ static int do_handshake(struct cuframes_subscriber *sub, const char *name) {
|
|||||||
if (sresp.result != CUFRAMES_OK) return sresp.result;
|
if (sresp.result != CUFRAMES_OK) return sresp.result;
|
||||||
|
|
||||||
sub->assigned_bit = sresp.assigned_bit;
|
sub->assigned_bit = sresp.assigned_bit;
|
||||||
sub->last_seen_seq = sresp.initial_seq; /* start от текущей точки */
|
sub->last_seen_seq = sresp.initial_seq;
|
||||||
|
|
||||||
|
/* Recv VMM_FDS */
|
||||||
|
cuframes_msg_vmm_fds_t vmm_payload = {0};
|
||||||
|
uint32_t vmm_plen = sizeof(vmm_payload);
|
||||||
|
rmt = 0;
|
||||||
|
r = cuframes_internal_recv_msg_with_fds(sub->sock_fd, &rmt,
|
||||||
|
&vmm_payload, &vmm_plen,
|
||||||
|
fds_out, fd_count_inout, 5000);
|
||||||
|
if (r != CUFRAMES_OK) {
|
||||||
|
CUFRAMES_LOG_ERROR("recv VMM_FDS: %s", cuframes_strerror(r));
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
if (rmt != CUFRAMES_MSG_VMM_FDS) {
|
||||||
|
CUFRAMES_LOG_ERROR("expected VMM_FDS got 0x%x", rmt);
|
||||||
|
return CUFRAMES_ERR_PROTOCOL;
|
||||||
|
}
|
||||||
|
if (vmm_payload.fd_count != *fd_count_inout) {
|
||||||
|
CUFRAMES_LOG_ERROR("VMM_FDS: payload fd_count=%u, received %u",
|
||||||
|
vmm_payload.fd_count, *fd_count_inout);
|
||||||
|
return CUFRAMES_ERR_PROTOCOL;
|
||||||
|
}
|
||||||
|
*slot_size_out = vmm_payload.slot_size_bytes;
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -143,7 +184,6 @@ int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg,
|
|||||||
sub->sock_fd = -1;
|
sub->sock_fd = -1;
|
||||||
sub->shm_fd = -1;
|
sub->shm_fd = -1;
|
||||||
|
|
||||||
/* Generate fallback name if NULL */
|
|
||||||
char name_buf[32];
|
char name_buf[32];
|
||||||
const char *name = cfg->consumer_name;
|
const char *name = cfg->consumer_name;
|
||||||
if (!name) {
|
if (!name) {
|
||||||
@@ -152,12 +192,10 @@ int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg,
|
|||||||
name = name_buf;
|
name = name_buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Build paths */
|
|
||||||
char sock_path[128];
|
char sock_path[128];
|
||||||
int r = cuframes_internal_socket_path(cfg->key, sock_path, sizeof(sock_path));
|
int r = cuframes_internal_socket_path(cfg->key, sock_path, sizeof(sock_path));
|
||||||
if (r != CUFRAMES_OK) { free(sub); return r; }
|
if (r != CUFRAMES_OK) { free(sub); return r; }
|
||||||
|
|
||||||
/* Connect with timeout retry */
|
|
||||||
int64_t deadline = cfg->connect_timeout_ms > 0
|
int64_t deadline = cfg->connect_timeout_ms > 0
|
||||||
? cuframes_now_ns() + (int64_t)cfg->connect_timeout_ms * 1000000LL
|
? cuframes_now_ns() + (int64_t)cfg->connect_timeout_ms * 1000000LL
|
||||||
: 0;
|
: 0;
|
||||||
@@ -172,87 +210,117 @@ int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg,
|
|||||||
sub->sock_fd = -1;
|
sub->sock_fd = -1;
|
||||||
if (cfg->connect_timeout_ms == 0) { r = CUFRAMES_ERR_NOT_FOUND; goto fail; }
|
if (cfg->connect_timeout_ms == 0) { r = CUFRAMES_ERR_NOT_FOUND; goto fail; }
|
||||||
if (deadline && cuframes_now_ns() > deadline) { r = CUFRAMES_ERR_TIMEOUT; goto fail; }
|
if (deadline && cuframes_now_ns() > deadline) { r = CUFRAMES_ERR_TIMEOUT; goto fail; }
|
||||||
struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000000}; /* 100ms */
|
struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000000};
|
||||||
nanosleep(&ts, NULL);
|
nanosleep(&ts, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Handshake */
|
/* Handshake (включая VMM_FDS) */
|
||||||
r = do_handshake(sub, name);
|
int fds[CUFRAMES_MAX_RING];
|
||||||
|
for (int i = 0; i < CUFRAMES_MAX_RING; i++) fds[i] = -1;
|
||||||
|
uint32_t fd_count = CUFRAMES_MAX_RING;
|
||||||
|
uint64_t slot_size = 0;
|
||||||
|
r = do_handshake(sub, name, fds, &fd_count, &slot_size);
|
||||||
if (r != CUFRAMES_OK) goto fail;
|
if (r != CUFRAMES_OK) goto fail;
|
||||||
|
|
||||||
/* Open SHM */
|
/* Open SHM (для seq atomics + meta) */
|
||||||
r = cuframes_internal_shm_name(cfg->key, sub->shm_name, sizeof(sub->shm_name));
|
r = cuframes_internal_shm_name(cfg->key, sub->shm_name, sizeof(sub->shm_name));
|
||||||
if (r != CUFRAMES_OK) goto fail;
|
if (r != CUFRAMES_OK) goto fail_close_fds;
|
||||||
sub->shm_fd = shm_open(sub->shm_name, O_RDWR, 0);
|
sub->shm_fd = shm_open(sub->shm_name, O_RDWR, 0);
|
||||||
if (sub->shm_fd < 0) {
|
if (sub->shm_fd < 0) {
|
||||||
CUFRAMES_LOG_ERROR("shm_open %s: %s", sub->shm_name, strerror(errno));
|
CUFRAMES_LOG_ERROR("shm_open %s: %s", sub->shm_name, strerror(errno));
|
||||||
r = CUFRAMES_ERR_IO; goto fail;
|
r = CUFRAMES_ERR_IO; goto fail_close_fds;
|
||||||
}
|
}
|
||||||
sub->hdr = mmap(NULL, sizeof(cuframes_shm_header_t),
|
sub->hdr = mmap(NULL, sizeof(cuframes_shm_header_t),
|
||||||
PROT_READ | PROT_WRITE, MAP_SHARED, sub->shm_fd, 0);
|
PROT_READ | PROT_WRITE, MAP_SHARED, sub->shm_fd, 0);
|
||||||
if (sub->hdr == MAP_FAILED) {
|
if (sub->hdr == MAP_FAILED) {
|
||||||
sub->hdr = NULL;
|
sub->hdr = NULL;
|
||||||
r = CUFRAMES_ERR_IO; goto fail;
|
r = CUFRAMES_ERR_IO; goto fail_close_fds;
|
||||||
|
}
|
||||||
|
if (sub->hdr->magic != CUFRAMES_MAGIC) {
|
||||||
|
if (sub->hdr->magic == CUFRAMES_MAGIC_LEGACY) {
|
||||||
|
CUFRAMES_LOG_ERROR("publisher uses legacy v0.1-v0.3 SHM — нужен v0.4 publisher");
|
||||||
|
} else {
|
||||||
|
CUFRAMES_LOG_ERROR("SHM magic mismatch: 0x%x", sub->hdr->magic);
|
||||||
|
}
|
||||||
|
r = CUFRAMES_ERR_PROTOCOL; goto fail_close_fds;
|
||||||
|
}
|
||||||
|
if (sub->hdr->proto_version != CUFRAMES_PROTOCOL_V4) {
|
||||||
|
CUFRAMES_LOG_ERROR("SHM proto v%u — нужен v%u",
|
||||||
|
sub->hdr->proto_version, CUFRAMES_PROTOCOL_V4);
|
||||||
|
r = CUFRAMES_ERR_PROTOCOL; goto fail_close_fds;
|
||||||
}
|
}
|
||||||
if (sub->hdr->magic != CUFRAMES_MAGIC) { r = CUFRAMES_ERR_PROTOCOL; goto fail; }
|
|
||||||
|
|
||||||
/* CUDA setup */
|
/* CUDA driver init + import VMM handles */
|
||||||
|
CUresult cr = cuInit(0);
|
||||||
|
if (cr != CUDA_SUCCESS) {
|
||||||
|
CUFRAMES_LOG_ERROR("cuInit: %s", cu_err_str(cr));
|
||||||
|
r = CUFRAMES_ERR_CUDA; goto fail_close_fds;
|
||||||
|
}
|
||||||
|
/* Ensure a runtime context exists (cudaMemcpyAsync from this pool needs it) */
|
||||||
cudaError_t cerr = cudaSetDevice(sub->cfg.cuda_device);
|
cudaError_t cerr = cudaSetDevice(sub->cfg.cuda_device);
|
||||||
if (cerr != cudaSuccess) {
|
if (cerr != cudaSuccess) {
|
||||||
CUFRAMES_LOG_ERROR("cudaSetDevice: %s", cudaGetErrorString(cerr));
|
CUFRAMES_LOG_ERROR("cudaSetDevice: %s", cudaGetErrorString(cerr));
|
||||||
r = CUFRAMES_ERR_CUDA; goto fail;
|
r = CUFRAMES_ERR_CUDA; goto fail_close_fds;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Open producer's event (legacy single — v0.2 compat fallback) */
|
CUmemAccessDesc access = {0};
|
||||||
cerr = cudaIpcOpenEventHandle(&sub->producer_event, sub->hdr->ipc_event_handle);
|
access.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
|
||||||
if (cerr != cudaSuccess) {
|
access.location.id = sub->cfg.cuda_device;
|
||||||
CUFRAMES_LOG_ERROR("cudaIpcOpenEventHandle: %s", cudaGetErrorString(cerr));
|
access.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
|
||||||
r = CUFRAMES_ERR_CUDA; goto fail;
|
|
||||||
|
sub->vmm_slot_size = (size_t)slot_size;
|
||||||
|
sub->imported_count = 0;
|
||||||
|
for (uint32_t i = 0; i < fd_count; ++i) {
|
||||||
|
cr = cuMemImportFromShareableHandle(&sub->vmm_handles[i],
|
||||||
|
(void *)(uintptr_t)fds[i],
|
||||||
|
CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR);
|
||||||
|
if (cr != CUDA_SUCCESS) {
|
||||||
|
CUFRAMES_LOG_ERROR("cuMemImportFromShareableHandle slot %u: %s",
|
||||||
|
i, cu_err_str(cr));
|
||||||
|
r = CUFRAMES_ERR_CUDA; goto fail_unmap;
|
||||||
|
}
|
||||||
|
/* После import можно закрыть FD — kernel держит reference через handle */
|
||||||
|
close(fds[i]);
|
||||||
|
fds[i] = -1;
|
||||||
|
|
||||||
|
cr = cuMemAddressReserve(&sub->vmm_ptrs[i], sub->vmm_slot_size, 0, 0, 0);
|
||||||
|
if (cr != CUDA_SUCCESS) {
|
||||||
|
CUFRAMES_LOG_ERROR("cuMemAddressReserve slot %u: %s",
|
||||||
|
i, cu_err_str(cr));
|
||||||
|
r = CUFRAMES_ERR_CUDA; goto fail_unmap;
|
||||||
|
}
|
||||||
|
cr = cuMemMap(sub->vmm_ptrs[i], sub->vmm_slot_size, 0,
|
||||||
|
sub->vmm_handles[i], 0);
|
||||||
|
if (cr != CUDA_SUCCESS) {
|
||||||
|
CUFRAMES_LOG_ERROR("cuMemMap slot %u: %s", i, cu_err_str(cr));
|
||||||
|
r = CUFRAMES_ERR_CUDA; goto fail_unmap;
|
||||||
|
}
|
||||||
|
cr = cuMemSetAccess(sub->vmm_ptrs[i], sub->vmm_slot_size, &access, 1);
|
||||||
|
if (cr != CUDA_SUCCESS) {
|
||||||
|
CUFRAMES_LOG_ERROR("cuMemSetAccess slot %u: %s", i, cu_err_str(cr));
|
||||||
|
r = CUFRAMES_ERR_CUDA; goto fail_unmap;
|
||||||
|
}
|
||||||
|
sub->imported_count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* v0.3 — open per-slot events если protocol supports. */
|
CUFRAMES_LOG_INFO("subscriber '%s' connected to '%s' (bit=%u, ring=%u, v0.4 VMM)",
|
||||||
sub->has_slot_events = 0;
|
name, sub->key, sub->assigned_bit, fd_count);
|
||||||
if (sub->hdr->proto_version >= CUFRAMES_PROTOCOL_V3) {
|
|
||||||
int ring_evt = (int)sub->hdr->ring_size;
|
|
||||||
if (ring_evt > CUFRAMES_MAX_RING) ring_evt = CUFRAMES_MAX_RING;
|
|
||||||
int evt_ok = 1;
|
|
||||||
for (int i = 0; i < ring_evt; i++) {
|
|
||||||
cerr = cudaIpcOpenEventHandle(&sub->slot_events[i],
|
|
||||||
sub->hdr->slot_event_handles[i]);
|
|
||||||
if (cerr != cudaSuccess) {
|
|
||||||
CUFRAMES_LOG_WARN("cudaIpcOpenEventHandle slot %d: %s — "
|
|
||||||
"fallback к legacy single event",
|
|
||||||
i, cudaGetErrorString(cerr));
|
|
||||||
for (int j = 0; j < i; j++) cudaEventDestroy(sub->slot_events[j]);
|
|
||||||
evt_ok = 0;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (evt_ok) {
|
|
||||||
sub->has_slot_events = 1;
|
|
||||||
CUFRAMES_LOG_INFO("subscribed с per-slot events (v0.3 proto)");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Open mem handles */
|
|
||||||
int ring = (int)sub->hdr->ring_size;
|
|
||||||
if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
|
|
||||||
for (int i = 0; i < ring; ++i) {
|
|
||||||
cerr = cudaIpcOpenMemHandle(&sub->mapped_ptrs[i],
|
|
||||||
sub->hdr->slots[i].mem_handle,
|
|
||||||
cudaIpcMemLazyEnablePeerAccess);
|
|
||||||
if (cerr != cudaSuccess) {
|
|
||||||
CUFRAMES_LOG_ERROR("cudaIpcOpenMemHandle slot %d: %s",
|
|
||||||
i, cudaGetErrorString(cerr));
|
|
||||||
r = CUFRAMES_ERR_CUDA; goto fail;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
CUFRAMES_LOG_INFO("subscriber '%s' connected to '%s' (bit=%u, ring=%d)",
|
|
||||||
name, sub->key, sub->assigned_bit, ring);
|
|
||||||
*out = sub;
|
*out = sub;
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
|
|
||||||
|
fail_unmap:
|
||||||
|
/* Cleanup partial VMM */
|
||||||
|
for (int i = 0; i < sub->imported_count; i++) {
|
||||||
|
if (sub->vmm_ptrs[i]) {
|
||||||
|
cuMemUnmap(sub->vmm_ptrs[i], sub->vmm_slot_size);
|
||||||
|
cuMemAddressFree(sub->vmm_ptrs[i], sub->vmm_slot_size);
|
||||||
|
}
|
||||||
|
if (sub->vmm_handles[i]) cuMemRelease(sub->vmm_handles[i]);
|
||||||
|
}
|
||||||
|
fail_close_fds:
|
||||||
|
for (int i = 0; i < CUFRAMES_MAX_RING; i++) {
|
||||||
|
if (fds[i] >= 0) close(fds[i]);
|
||||||
|
}
|
||||||
fail:
|
fail:
|
||||||
cuframes_subscriber_destroy(sub);
|
cuframes_subscriber_destroy(sub);
|
||||||
return r;
|
return r;
|
||||||
@@ -268,6 +336,7 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
|
|||||||
memory_order_acquire) != 0) {
|
memory_order_acquire) != 0) {
|
||||||
return CUFRAMES_ERR_DISCONNECTED;
|
return CUFRAMES_ERR_DISCONNECTED;
|
||||||
}
|
}
|
||||||
|
(void)consumer_stream; /* v0.4: producer уже StreamSync'нул, sync не нужен */
|
||||||
|
|
||||||
int64_t deadline = (timeout_ms > 0)
|
int64_t deadline = (timeout_ms > 0)
|
||||||
? cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL
|
? cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL
|
||||||
@@ -281,11 +350,9 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
|
|||||||
if (sub->cfg.mode == CUFRAMES_MODE_NEWEST_ONLY) {
|
if (sub->cfg.mode == CUFRAMES_MODE_NEWEST_ONLY) {
|
||||||
target_seq = gs;
|
target_seq = gs;
|
||||||
} else {
|
} else {
|
||||||
/* STRICT_ORDER */
|
|
||||||
if (sub->last_seen_seq == UINT64_MAX) {
|
if (sub->last_seen_seq == UINT64_MAX) {
|
||||||
target_seq = gs;
|
target_seq = gs;
|
||||||
} else if (gs > sub->last_seen_seq + (uint64_t)sub->hdr->ring_size) {
|
} else if (gs > sub->last_seen_seq + (uint64_t)sub->hdr->ring_size) {
|
||||||
/* Producer overran us. */
|
|
||||||
return CUFRAMES_ERR_DISCONNECTED;
|
return CUFRAMES_ERR_DISCONNECTED;
|
||||||
} else {
|
} else {
|
||||||
target_seq = sub->last_seen_seq + 1;
|
target_seq = sub->last_seen_seq + 1;
|
||||||
@@ -295,54 +362,22 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
|
|||||||
uint64_t slot_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
|
uint64_t slot_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
|
||||||
memory_order_acquire);
|
memory_order_acquire);
|
||||||
if (slot_seq != target_seq) {
|
if (slot_seq != target_seq) {
|
||||||
/* Slot уже перезаписан producer'ом — пересчитать */
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
int64_t pts = atomic_load_explicit(&sub->hdr->slots[slot_idx].pts_ns,
|
int64_t pts = atomic_load_explicit(&sub->hdr->slots[slot_idx].pts_ns,
|
||||||
memory_order_acquire);
|
memory_order_acquire);
|
||||||
|
|
||||||
/* Cross-process sync: wait event on consumer's stream.
|
/* v0.4: producer уже cuStreamSynchronize'нул перед atomic_store seq.
|
||||||
* v0.3: per-slot event точно соответствует slot[slot_idx] —
|
* Данные физически в GPU memory к моменту acquire fence. Post-sync
|
||||||
* no TOCTOU race possible. v0.2 fallback: single global event +
|
* verify оставляем — defending against ring wrap pока мы читали pts. */
|
||||||
* post-sync verify (less precise, but still correct). */
|
|
||||||
cudaEvent_t sync_event = sub->has_slot_events
|
|
||||||
? sub->slot_events[slot_idx]
|
|
||||||
: sub->producer_event;
|
|
||||||
if (consumer_stream) {
|
|
||||||
cudaError_t cerr = cudaStreamWaitEvent((cudaStream_t)consumer_stream,
|
|
||||||
sync_event, 0);
|
|
||||||
if (cerr != cudaSuccess) {
|
|
||||||
CUFRAMES_LOG_WARN("cudaStreamWaitEvent: %s",
|
|
||||||
cudaGetErrorString(cerr));
|
|
||||||
return CUFRAMES_ERR_CUDA;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
/* Synchronize globally — для cudaMemcpyDeviceToHost users */
|
|
||||||
cudaError_t cerr = cudaEventSynchronize(sync_event);
|
|
||||||
if (cerr != cudaSuccess) return CUFRAMES_ERR_CUDA;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* TOCTOU защита — unconditional (v0.2 и v0.3 обa). v0.3 per-slot
|
|
||||||
* events НЕ guaranty ordering: cudaEventRecord overwrites previous
|
|
||||||
* state каждый publish. Если producer wrapped ring пока consumer
|
|
||||||
* ждал event sync, slot[slot_idx] уже содержит seq > target.
|
|
||||||
* Event signal от nового publish satisfies stale wait — consumer
|
|
||||||
* читает new content thinking it's old (lazy consumption).
|
|
||||||
*
|
|
||||||
* Симптом в long-running pipeline: 50k+ ring wraps накапливают drift,
|
|
||||||
* output stream duplicates 60-70% frames despite stable encoder fps.
|
|
||||||
*
|
|
||||||
* Proper v0.4 fix: per-slot+per-publish event handle (event pool).
|
|
||||||
* Сейчас — post-sync verify catches main race window. */
|
|
||||||
uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
|
uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
|
||||||
memory_order_acquire);
|
memory_order_acquire);
|
||||||
if (verify_seq != target_seq) {
|
if (verify_seq != target_seq) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Fill frame_out */
|
|
||||||
struct cuframes_frame *f = &sub->frame_obj;
|
struct cuframes_frame *f = &sub->frame_obj;
|
||||||
f->cuda_ptr = sub->mapped_ptrs[slot_idx];
|
f->cuda_ptr = (void *)(uintptr_t)sub->vmm_ptrs[slot_idx];
|
||||||
f->format = (cuframes_format_t)sub->hdr->meta.format;
|
f->format = (cuframes_format_t)sub->hdr->meta.format;
|
||||||
f->width = sub->hdr->meta.width;
|
f->width = sub->hdr->meta.width;
|
||||||
f->height = sub->hdr->meta.height;
|
f->height = sub->hdr->meta.height;
|
||||||
@@ -358,12 +393,9 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
|
|||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Не было frame'ов */
|
|
||||||
if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK;
|
if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK;
|
||||||
if (timeout_ms > 0 && cuframes_now_ns() > deadline) return CUFRAMES_ERR_TIMEOUT;
|
if (timeout_ms > 0 && cuframes_now_ns() > deadline) return CUFRAMES_ERR_TIMEOUT;
|
||||||
|
|
||||||
/* Poll-based wait (eventfd — v0.2). 50µs interval — компромисс
|
|
||||||
* latency vs CPU. */
|
|
||||||
struct timespec ts = {.tv_sec = 0, .tv_nsec = 50000};
|
struct timespec ts = {.tv_sec = 0, .tv_nsec = 50000};
|
||||||
nanosleep(&ts, NULL);
|
nanosleep(&ts, NULL);
|
||||||
|
|
||||||
@@ -379,7 +411,6 @@ int cuframes_subscriber_release(cuframes_subscriber_t *sub,
|
|||||||
if (!frame) return CUFRAMES_OK;
|
if (!frame) return CUFRAMES_OK;
|
||||||
if (!sub || frame->subscriber != sub) return CUFRAMES_ERR_INVALID_ARG;
|
if (!sub || frame->subscriber != sub) return CUFRAMES_ERR_INVALID_ARG;
|
||||||
|
|
||||||
/* ACK через bitmap */
|
|
||||||
if (sub->assigned_bit > 0 && sub->assigned_bit < 64) {
|
if (sub->assigned_bit > 0 && sub->assigned_bit < 64) {
|
||||||
atomic_fetch_or_explicit(&sub->hdr->slots[frame->slot_idx].ack_bitmap,
|
atomic_fetch_or_explicit(&sub->hdr->slots[frame->slot_idx].ack_bitmap,
|
||||||
1ULL << sub->assigned_bit,
|
1ULL << sub->assigned_bit,
|
||||||
@@ -398,7 +429,6 @@ int cuframes_subscriber_release(cuframes_subscriber_t *sub,
|
|||||||
int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
|
int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
|
||||||
if (!sub) return CUFRAMES_OK;
|
if (!sub) return CUFRAMES_OK;
|
||||||
|
|
||||||
/* Clear subscriber bit */
|
|
||||||
if (sub->hdr && sub->assigned_bit > 0) {
|
if (sub->hdr && sub->assigned_bit > 0) {
|
||||||
atomic_fetch_and_explicit(&sub->hdr->subscriber_bitmap,
|
atomic_fetch_and_explicit(&sub->hdr->subscriber_bitmap,
|
||||||
~(1ULL << sub->assigned_bit),
|
~(1ULL << sub->assigned_bit),
|
||||||
@@ -407,22 +437,15 @@ int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
|
|||||||
0, memory_order_release);
|
0, memory_order_release);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sub->producer_event) cudaEventDestroy(sub->producer_event);
|
/* VMM cleanup */
|
||||||
if (sub->has_slot_events) {
|
for (int i = 0; i < sub->imported_count; i++) {
|
||||||
int ring_evt = (int)sub->hdr->ring_size;
|
if (sub->vmm_ptrs[i]) {
|
||||||
if (ring_evt > CUFRAMES_MAX_RING) ring_evt = CUFRAMES_MAX_RING;
|
cuMemUnmap(sub->vmm_ptrs[i], sub->vmm_slot_size);
|
||||||
for (int i = 0; i < ring_evt; i++) {
|
cuMemAddressFree(sub->vmm_ptrs[i], sub->vmm_slot_size);
|
||||||
if (sub->slot_events[i]) cudaEventDestroy(sub->slot_events[i]);
|
|
||||||
}
|
}
|
||||||
|
if (sub->vmm_handles[i]) cuMemRelease(sub->vmm_handles[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
int ring = sub->hdr ? (int)sub->hdr->ring_size : 0;
|
|
||||||
if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
|
|
||||||
for (int i = 0; i < ring; ++i) {
|
|
||||||
if (sub->mapped_ptrs[i]) cudaIpcCloseMemHandle(sub->mapped_ptrs[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Packet ring cleanup */
|
|
||||||
if (sub->has_pkt_ring) {
|
if (sub->has_pkt_ring) {
|
||||||
cuframes_internal_pkt_ring_destroy(&sub->pkt_ring);
|
cuframes_internal_pkt_ring_destroy(&sub->pkt_ring);
|
||||||
}
|
}
|
||||||
@@ -442,7 +465,6 @@ int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
|
|||||||
/* v0.2 — encoded packet ring API (см. docs/protocol.md §10) */
|
/* v0.2 — encoded packet ring API (см. docs/protocol.md §10) */
|
||||||
/* ─────────────────────────────────────────────────────────────────────── */
|
/* ─────────────────────────────────────────────────────────────────────── */
|
||||||
|
|
||||||
/* Packet accessors */
|
|
||||||
const void *cuframes_packet_data(const cuframes_packet_t *p) { return p ? p->data : NULL; }
|
const void *cuframes_packet_data(const cuframes_packet_t *p) { return p ? p->data : NULL; }
|
||||||
size_t cuframes_packet_size(const cuframes_packet_t *p) { return p ? p->size : 0; }
|
size_t cuframes_packet_size(const cuframes_packet_t *p) { return p ? p->size : 0; }
|
||||||
int64_t cuframes_packet_pts(const cuframes_packet_t *p) { return p ? p->pts_ns : 0; }
|
int64_t cuframes_packet_pts(const cuframes_packet_t *p) { return p ? p->pts_ns : 0; }
|
||||||
@@ -452,7 +474,7 @@ uint64_t cuframes_packet_seq(const cuframes_packet_t *p) { return p ? p->se
|
|||||||
|
|
||||||
int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub) {
|
int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub) {
|
||||||
if (!sub) return CUFRAMES_ERR_INVALID_ARG;
|
if (!sub) return CUFRAMES_ERR_INVALID_ARG;
|
||||||
if (sub->has_pkt_ring) return CUFRAMES_OK; /* idempotent */
|
if (sub->has_pkt_ring) return CUFRAMES_OK;
|
||||||
|
|
||||||
char pkt_name[128];
|
char pkt_name[128];
|
||||||
int r = cuframes_internal_pkt_shm_name(sub->key, pkt_name, sizeof(pkt_name));
|
int r = cuframes_internal_pkt_shm_name(sub->key, pkt_name, sizeof(pkt_name));
|
||||||
@@ -461,8 +483,6 @@ int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub) {
|
|||||||
r = cuframes_internal_pkt_ring_open(pkt_name, &sub->pkt_ring);
|
r = cuframes_internal_pkt_ring_open(pkt_name, &sub->pkt_ring);
|
||||||
if (r != CUFRAMES_OK) return r;
|
if (r != CUFRAMES_OK) return r;
|
||||||
|
|
||||||
/* Allocate copy-buffer (max packet size). Используем data_size как
|
|
||||||
* conservative upper bound (publisher гарантирует data_size >= max_packet_size). */
|
|
||||||
size_t capacity = sub->pkt_ring.hdr->data_size;
|
size_t capacity = sub->pkt_ring.hdr->data_size;
|
||||||
sub->packet_obj.data = (uint8_t *)malloc(capacity);
|
sub->packet_obj.data = (uint8_t *)malloc(capacity);
|
||||||
if (!sub->packet_obj.data) {
|
if (!sub->packet_obj.data) {
|
||||||
@@ -471,7 +491,6 @@ int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub) {
|
|||||||
}
|
}
|
||||||
sub->packet_obj.capacity = capacity;
|
sub->packet_obj.capacity = capacity;
|
||||||
|
|
||||||
/* Start с last_keyframe_seq - 1 → первый read даст IDR (§10.14). */
|
|
||||||
uint64_t kf = atomic_load_explicit(&sub->pkt_ring.hdr->last_keyframe_seq,
|
uint64_t kf = atomic_load_explicit(&sub->pkt_ring.hdr->last_keyframe_seq,
|
||||||
memory_order_acquire);
|
memory_order_acquire);
|
||||||
sub->last_packet_seq = (kf == UINT64_MAX) ? UINT64_MAX : kf - 1;
|
sub->last_packet_seq = (kf == UINT64_MAX) ? UINT64_MAX : kf - 1;
|
||||||
@@ -484,7 +503,7 @@ int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub,
|
|||||||
int32_t timeout_ms) {
|
int32_t timeout_ms) {
|
||||||
if (!sub || !pkt_out) return CUFRAMES_ERR_INVALID_ARG;
|
if (!sub || !pkt_out) return CUFRAMES_ERR_INVALID_ARG;
|
||||||
if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING;
|
if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING;
|
||||||
if (sub->packet_busy) return CUFRAMES_ERR_INVALID_ARG; /* previous packet not released */
|
if (sub->packet_busy) return CUFRAMES_ERR_INVALID_ARG;
|
||||||
|
|
||||||
int64_t deadline_ns = (timeout_ms > 0) ?
|
int64_t deadline_ns = (timeout_ms > 0) ?
|
||||||
cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL : 0;
|
cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL : 0;
|
||||||
@@ -513,28 +532,25 @@ int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub,
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (r == CUFRAMES_ERR_PACKET_OVERRUN) {
|
if (r == CUFRAMES_ERR_PACKET_OVERRUN) {
|
||||||
/* Resync — установить last_seq = last_keyframe_seq - 1, повторить. */
|
|
||||||
uint64_t kf = atomic_load_explicit(
|
uint64_t kf = atomic_load_explicit(
|
||||||
&sub->pkt_ring.hdr->last_keyframe_seq, memory_order_acquire);
|
&sub->pkt_ring.hdr->last_keyframe_seq, memory_order_acquire);
|
||||||
if (kf != UINT64_MAX) {
|
if (kf != UINT64_MAX) {
|
||||||
sub->last_packet_seq = kf - 1;
|
sub->last_packet_seq = kf - 1;
|
||||||
}
|
}
|
||||||
/* Возвращаем OVERRUN наружу — caller знает что был discontinuity. */
|
|
||||||
*pkt_out = NULL;
|
*pkt_out = NULL;
|
||||||
return CUFRAMES_ERR_PACKET_OVERRUN;
|
return CUFRAMES_ERR_PACKET_OVERRUN;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (r != CUFRAMES_ERR_TIMEOUT) {
|
if (r != CUFRAMES_ERR_TIMEOUT) {
|
||||||
*pkt_out = NULL;
|
*pkt_out = NULL;
|
||||||
return r; /* DISCONNECTED, INVALID_ARG, etc. */
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TIMEOUT branch — poll/sleep */
|
|
||||||
if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK;
|
if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK;
|
||||||
if (timeout_ms > 0 && cuframes_now_ns() >= deadline_ns) {
|
if (timeout_ms > 0 && cuframes_now_ns() >= deadline_ns) {
|
||||||
return CUFRAMES_ERR_TIMEOUT;
|
return CUFRAMES_ERR_TIMEOUT;
|
||||||
}
|
}
|
||||||
struct timespec ts = {0, 1 * 1000 * 1000}; /* 1 ms poll interval */
|
struct timespec ts = {0, 1 * 1000 * 1000};
|
||||||
nanosleep(&ts, NULL);
|
nanosleep(&ts, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -542,7 +558,7 @@ int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub,
|
|||||||
int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub,
|
int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub,
|
||||||
cuframes_packet_t *pkt) {
|
cuframes_packet_t *pkt) {
|
||||||
if (!sub) return CUFRAMES_ERR_INVALID_ARG;
|
if (!sub) return CUFRAMES_ERR_INVALID_ARG;
|
||||||
if (!pkt) return CUFRAMES_OK; /* NULL-safe */
|
if (!pkt) return CUFRAMES_OK;
|
||||||
if (pkt != &sub->packet_obj) return CUFRAMES_ERR_INVALID_ARG;
|
if (pkt != &sub->packet_obj) return CUFRAMES_ERR_INVALID_ARG;
|
||||||
sub->packet_busy = 0;
|
sub->packet_busy = 0;
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
@@ -556,7 +572,6 @@ int cuframes_subscriber_get_codec_params(cuframes_subscriber_t *sub,
|
|||||||
if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING;
|
if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING;
|
||||||
cuframes_pkt_header_t *hdr = sub->pkt_ring.hdr;
|
cuframes_pkt_header_t *hdr = sub->pkt_ring.hdr;
|
||||||
if (codec_id_out) *codec_id_out = hdr->codec_id;
|
if (codec_id_out) *codec_id_out = hdr->codec_id;
|
||||||
/* Если extradata ещё не выставлен publisher'ом — size=0, pointer ok но empty. */
|
|
||||||
if (extradata_out) *extradata_out = hdr->codec_extradata;
|
if (extradata_out) *extradata_out = hdr->codec_extradata;
|
||||||
if (extradata_size_out) *extradata_size_out = hdr->codec_extradata_size;
|
if (extradata_size_out) *extradata_size_out = hdr->codec_extradata_size;
|
||||||
if (hdr->codec_extradata_size == 0) return CUFRAMES_ERR_NO_CODEC_PARAMS;
|
if (hdr->codec_extradata_size == 0) return CUFRAMES_ERR_NO_CODEC_PARAMS;
|
||||||
|
|||||||
@@ -8,6 +8,7 @@
|
|||||||
#define CUFRAMES_INTERNAL_H
|
#define CUFRAMES_INTERNAL_H
|
||||||
|
|
||||||
#define _GNU_SOURCE
|
#define _GNU_SOURCE
|
||||||
|
#include <cuda.h> /* v0.4 — driver API: cuMemCreate/cuMemMap/cuMemExportToShareableHandle */
|
||||||
#include <cuda_runtime.h>
|
#include <cuda_runtime.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <stdatomic.h>
|
#include <stdatomic.h>
|
||||||
@@ -21,10 +22,12 @@
|
|||||||
|
|
||||||
/* ─── Protocol constants ──────────────────────────────────────────────── */
|
/* ─── Protocol constants ──────────────────────────────────────────────── */
|
||||||
|
|
||||||
#define CUFRAMES_MAGIC 0xCC7C1DCCu
|
#define CUFRAMES_MAGIC 0xCC7C1DCEu /* v0.4 — bumped с 0xCC7C1DCC (full ABI break) */
|
||||||
|
#define CUFRAMES_MAGIC_LEGACY 0xCC7C1DCCu /* v0.1—v0.3 magic; ловится consumer'ом как clean PROTOCOL error */
|
||||||
#define CUFRAMES_PROTOCOL_V1 1u
|
#define CUFRAMES_PROTOCOL_V1 1u
|
||||||
#define CUFRAMES_PROTOCOL_V2 2u /* v0.2 — packet ring support */
|
#define CUFRAMES_PROTOCOL_V2 2u /* v0.2 — packet ring support */
|
||||||
#define CUFRAMES_PROTOCOL_V3 3u /* v0.3 — per-slot CUDA events (no TOCTOU race) */
|
#define CUFRAMES_PROTOCOL_V3 3u /* v0.3 — per-slot CUDA events (deprecated; не работает без pid share) */
|
||||||
|
#define CUFRAMES_PROTOCOL_V4 4u /* v0.4 — VMM + POSIX FD: pid/ipc namespace share не требуется */
|
||||||
#define CUFRAMES_MAX_SUBSCRIBERS 32
|
#define CUFRAMES_MAX_SUBSCRIBERS 32
|
||||||
#define CUFRAMES_MAX_RING 16
|
#define CUFRAMES_MAX_RING 16
|
||||||
#define CUFRAMES_MAX_KEY_LEN 63
|
#define CUFRAMES_MAX_KEY_LEN 63
|
||||||
@@ -204,6 +207,10 @@ typedef struct cuframes_pkt_ring {
|
|||||||
#define CUFRAMES_MSG_PING 0xF0
|
#define CUFRAMES_MSG_PING 0xF0
|
||||||
#define CUFRAMES_MSG_PONG 0xF1
|
#define CUFRAMES_MSG_PONG 0xF1
|
||||||
#define CUFRAMES_MSG_ERROR 0xFE
|
#define CUFRAMES_MSG_ERROR 0xFE
|
||||||
|
/* v0.4: после SUBSCRIBE_RESP publisher шлёт VMM_FDS с N posix FD handles в
|
||||||
|
* SCM_RIGHTS control. Payload: uint64_t slot_size + uint32_t fd_count +
|
||||||
|
* uint32_t reserved (для alignment). FDs приходят отдельным контрол-блоком. */
|
||||||
|
#define CUFRAMES_MSG_VMM_FDS 0x05
|
||||||
|
|
||||||
#define CUFRAMES_MAX_MSG_PAYLOAD 4096
|
#define CUFRAMES_MAX_MSG_PAYLOAD 4096
|
||||||
|
|
||||||
@@ -237,6 +244,14 @@ typedef struct __attribute__((packed)) cuframes_msg_subscribe_resp {
|
|||||||
uint8_t reserved[12];
|
uint8_t reserved[12];
|
||||||
} cuframes_msg_subscribe_resp_t;
|
} cuframes_msg_subscribe_resp_t;
|
||||||
|
|
||||||
|
/* v0.4: payload VMM_FDS message. Сами FDs идут в SCM_RIGHTS control-msg
|
||||||
|
* (см. cuframes_internal_send_msg_with_fds). */
|
||||||
|
typedef struct __attribute__((packed)) cuframes_msg_vmm_fds {
|
||||||
|
uint64_t slot_size_bytes; /* физический размер одного slot после round-up к granularity */
|
||||||
|
uint32_t fd_count; /* должно совпадать с ring_size */
|
||||||
|
uint32_t reserved;
|
||||||
|
} cuframes_msg_vmm_fds_t;
|
||||||
|
|
||||||
/* ─── Logging (minimal — to stderr) ────────────────────────────────────── */
|
/* ─── Logging (minimal — to stderr) ────────────────────────────────────── */
|
||||||
|
|
||||||
#define CUFRAMES_LOG_ERROR(fmt, ...) \
|
#define CUFRAMES_LOG_ERROR(fmt, ...) \
|
||||||
@@ -272,6 +287,15 @@ int cuframes_internal_recv_msg(int sock_fd, uint32_t *msg_type_out,
|
|||||||
void *payload, uint32_t *payload_len_inout,
|
void *payload, uint32_t *payload_len_inout,
|
||||||
int32_t timeout_ms);
|
int32_t timeout_ms);
|
||||||
|
|
||||||
|
/* v0.4 — send/recv с FD-attached. Используется только для VMM_FDS message. */
|
||||||
|
int cuframes_internal_send_msg_with_fds(int sock_fd, uint32_t msg_type,
|
||||||
|
const void *payload, uint32_t payload_len,
|
||||||
|
const int *fds, uint32_t fd_count);
|
||||||
|
int cuframes_internal_recv_msg_with_fds(int sock_fd, uint32_t *msg_type_out,
|
||||||
|
void *payload, uint32_t *payload_len_inout,
|
||||||
|
int *fds_out, uint32_t *fd_count_inout,
|
||||||
|
int32_t timeout_ms);
|
||||||
|
|
||||||
/* ─── Packet ring helpers (libcuframes/src/packet_ring.c) ─────────────── */
|
/* ─── Packet ring helpers (libcuframes/src/packet_ring.c) ─────────────── */
|
||||||
|
|
||||||
/* Publisher: create SHM + initialize header + slots. Stale recovery как у frames. */
|
/* Publisher: create SHM + initialize header + slots. Stale recovery как у frames. */
|
||||||
|
|||||||
+172
-217
@@ -1,4 +1,14 @@
|
|||||||
/* Publisher implementation (docs/protocol.md §1, §2, §3.2, §4.2, §5). */
|
/* Publisher implementation (docs/protocol.md §1, §2, §3.2, §4.2, §5).
|
||||||
|
*
|
||||||
|
* v0.4 — VMM + POSIX FD. Заменяет cudaMalloc+cudaIpcGetMemHandle на
|
||||||
|
* cuMemCreate + cuMemExportToShareableHandle(POSIX_FILE_DESCRIPTOR). FDs
|
||||||
|
* передаются consumer'у через SCM_RIGHTS, не нужны shared pid/ipc namespace.
|
||||||
|
*
|
||||||
|
* Sync (вместо cudaEventRecord+cudaIpcEventHandle): cuStreamSynchronize в
|
||||||
|
* do_publish — producer ждёт ~ms что stream flush'нулся, потом publishes seq.
|
||||||
|
* Consumer читает данные через DtoD копию без event wait — HW coherence
|
||||||
|
* гарантирована на одном GPU.
|
||||||
|
*/
|
||||||
|
|
||||||
#include "internal.h"
|
#include "internal.h"
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
@@ -20,11 +30,18 @@ struct cuframes_publisher {
|
|||||||
char socket_path[128];
|
char socket_path[128];
|
||||||
char shm_name[80];
|
char shm_name[80];
|
||||||
|
|
||||||
/* CUDA */
|
/* v0.4 — VMM-allocated pool. Каждый slot: cuMemCreate → cuMemAddressReserve
|
||||||
cudaEvent_t event; /* legacy single event (v0.2 compat) */
|
* → cuMemMap → cuMemSetAccess. FD экспортируется один раз и передаётся всем
|
||||||
cudaEvent_t slot_events[CUFRAMES_MAX_RING]; /* v0.3 — per-slot events */
|
* subscribers через SCM_RIGHTS. */
|
||||||
cudaIpcMemHandle_t ipc_mem[CUFRAMES_MAX_RING];
|
CUmemGenericAllocationHandle vmm_handles[CUFRAMES_MAX_RING];
|
||||||
void *cuda_ptrs[CUFRAMES_MAX_RING]; /* mapped pointers */
|
CUdeviceptr vmm_ptrs[CUFRAMES_MAX_RING];
|
||||||
|
int vmm_fds[CUFRAMES_MAX_RING];
|
||||||
|
size_t vmm_slot_size; /* rounded к granularity */
|
||||||
|
int has_vmm_pool;
|
||||||
|
|
||||||
|
/* CUDA stream sync — заменяет per-slot events. Producer перед каждым publish
|
||||||
|
* вызывает cuStreamSynchronize чтобы гарантировать что previous writes
|
||||||
|
* завершены (data visible для consumer'ов на любом GPU stream). */
|
||||||
size_t frame_size_bytes;
|
size_t frame_size_bytes;
|
||||||
int32_t ring_size_actual;
|
int32_t ring_size_actual;
|
||||||
|
|
||||||
@@ -33,10 +50,6 @@ struct cuframes_publisher {
|
|||||||
int32_t current_slot; /* индекс slot'а полученного через acquire() */
|
int32_t current_slot; /* индекс slot'а полученного через acquire() */
|
||||||
int has_acquired;
|
int has_acquired;
|
||||||
|
|
||||||
/* EXTERNAL ownership: map user pointer → ring index */
|
|
||||||
void *external_ptrs[CUFRAMES_MAX_RING];
|
|
||||||
int32_t external_count;
|
|
||||||
|
|
||||||
/* Subscriber-management thread */
|
/* Subscriber-management thread */
|
||||||
pthread_t accept_thread;
|
pthread_t accept_thread;
|
||||||
int accept_thread_alive;
|
int accept_thread_alive;
|
||||||
@@ -52,8 +65,16 @@ struct cuframes_publisher {
|
|||||||
/* Forward decls */
|
/* Forward decls */
|
||||||
static void *accept_thread_main(void *arg);
|
static void *accept_thread_main(void *arg);
|
||||||
static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd);
|
static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd);
|
||||||
|
static void free_vmm_pool(struct cuframes_publisher *pub);
|
||||||
|
|
||||||
/* ─── Internal: alloc/setup CUDA pool and SHM ─────────────────────────── */
|
/* Helper: format CUresult error для CUFRAMES_LOG_ERROR */
|
||||||
|
static const char *cu_err_str(CUresult r) {
|
||||||
|
const char *s = NULL;
|
||||||
|
cuGetErrorString(r, &s);
|
||||||
|
return s ? s : "?";
|
||||||
|
}
|
||||||
|
|
||||||
|
/* ─── Internal: alloc VMM pool + export POSIX FDs ─────────────────────── */
|
||||||
|
|
||||||
static int alloc_library_pool(struct cuframes_publisher *pub) {
|
static int alloc_library_pool(struct cuframes_publisher *pub) {
|
||||||
int r = cuframes_internal_calc_size(pub->cfg.format,
|
int r = cuframes_internal_calc_size(pub->cfg.format,
|
||||||
@@ -62,7 +83,37 @@ static int alloc_library_pool(struct cuframes_publisher *pub) {
|
|||||||
if (r != CUFRAMES_OK) return r;
|
if (r != CUFRAMES_OK) return r;
|
||||||
|
|
||||||
pub->ring_size_actual = pub->cfg.ring_size;
|
pub->ring_size_actual = pub->cfg.ring_size;
|
||||||
|
for (int i = 0; i < CUFRAMES_MAX_RING; i++) pub->vmm_fds[i] = -1;
|
||||||
|
|
||||||
|
/* Initialize CUDA driver API context */
|
||||||
|
CUresult cr = cuInit(0);
|
||||||
|
if (cr != CUDA_SUCCESS) {
|
||||||
|
CUFRAMES_LOG_ERROR("cuInit: %s", cu_err_str(cr));
|
||||||
|
return CUFRAMES_ERR_CUDA;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Pick allocation prop: pinned device memory с POSIX FD handle */
|
||||||
|
CUmemAllocationProp prop = {0};
|
||||||
|
prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
|
||||||
|
prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
|
||||||
|
prop.location.id = pub->cfg.cuda_device;
|
||||||
|
prop.requestedHandleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;
|
||||||
|
|
||||||
|
/* Round slot size up to granularity */
|
||||||
|
size_t granularity = 0;
|
||||||
|
cr = cuMemGetAllocationGranularity(&granularity, &prop,
|
||||||
|
CU_MEM_ALLOC_GRANULARITY_MINIMUM);
|
||||||
|
if (cr != CUDA_SUCCESS) {
|
||||||
|
CUFRAMES_LOG_ERROR("cuMemGetAllocationGranularity: %s", cu_err_str(cr));
|
||||||
|
return CUFRAMES_ERR_CUDA;
|
||||||
|
}
|
||||||
|
pub->vmm_slot_size = ((pub->frame_size_bytes + granularity - 1) / granularity)
|
||||||
|
* granularity;
|
||||||
|
CUFRAMES_LOG_INFO("VMM granularity=%zu frame=%zu slot=%zu",
|
||||||
|
granularity, pub->frame_size_bytes, pub->vmm_slot_size);
|
||||||
|
|
||||||
|
/* Required: also need a runtime API context so that cudaMemcpyAsync from
|
||||||
|
* user works on this allocation. cudaSetDevice достаточно. */
|
||||||
cudaError_t cerr = cudaSetDevice(pub->cfg.cuda_device);
|
cudaError_t cerr = cudaSetDevice(pub->cfg.cuda_device);
|
||||||
if (cerr != cudaSuccess) {
|
if (cerr != cudaSuccess) {
|
||||||
CUFRAMES_LOG_ERROR("cudaSetDevice(%d): %s",
|
CUFRAMES_LOG_ERROR("cudaSetDevice(%d): %s",
|
||||||
@@ -70,74 +121,68 @@ static int alloc_library_pool(struct cuframes_publisher *pub) {
|
|||||||
return CUFRAMES_ERR_CUDA;
|
return CUFRAMES_ERR_CUDA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
CUmemAccessDesc access = {0};
|
||||||
|
access.location = prop.location;
|
||||||
|
access.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
|
||||||
|
|
||||||
for (int i = 0; i < pub->ring_size_actual; ++i) {
|
for (int i = 0; i < pub->ring_size_actual; ++i) {
|
||||||
cerr = cudaMalloc(&pub->cuda_ptrs[i], pub->frame_size_bytes);
|
cr = cuMemCreate(&pub->vmm_handles[i], pub->vmm_slot_size, &prop, 0);
|
||||||
if (cerr != cudaSuccess) {
|
if (cr != CUDA_SUCCESS) {
|
||||||
CUFRAMES_LOG_ERROR("cudaMalloc slot %d: %s",
|
CUFRAMES_LOG_ERROR("cuMemCreate slot %d: %s", i, cu_err_str(cr));
|
||||||
i, cudaGetErrorString(cerr));
|
free_vmm_pool(pub);
|
||||||
return CUFRAMES_ERR_CUDA;
|
return CUFRAMES_ERR_CUDA;
|
||||||
}
|
}
|
||||||
cerr = cudaIpcGetMemHandle(&pub->ipc_mem[i], pub->cuda_ptrs[i]);
|
cr = cuMemAddressReserve(&pub->vmm_ptrs[i], pub->vmm_slot_size, 0, 0, 0);
|
||||||
if (cerr != cudaSuccess) {
|
if (cr != CUDA_SUCCESS) {
|
||||||
CUFRAMES_LOG_ERROR("cudaIpcGetMemHandle slot %d: %s",
|
CUFRAMES_LOG_ERROR("cuMemAddressReserve slot %d: %s", i, cu_err_str(cr));
|
||||||
i, cudaGetErrorString(cerr));
|
free_vmm_pool(pub);
|
||||||
|
return CUFRAMES_ERR_CUDA;
|
||||||
|
}
|
||||||
|
cr = cuMemMap(pub->vmm_ptrs[i], pub->vmm_slot_size, 0,
|
||||||
|
pub->vmm_handles[i], 0);
|
||||||
|
if (cr != CUDA_SUCCESS) {
|
||||||
|
CUFRAMES_LOG_ERROR("cuMemMap slot %d: %s", i, cu_err_str(cr));
|
||||||
|
free_vmm_pool(pub);
|
||||||
|
return CUFRAMES_ERR_CUDA;
|
||||||
|
}
|
||||||
|
cr = cuMemSetAccess(pub->vmm_ptrs[i], pub->vmm_slot_size, &access, 1);
|
||||||
|
if (cr != CUDA_SUCCESS) {
|
||||||
|
CUFRAMES_LOG_ERROR("cuMemSetAccess slot %d: %s", i, cu_err_str(cr));
|
||||||
|
free_vmm_pool(pub);
|
||||||
|
return CUFRAMES_ERR_CUDA;
|
||||||
|
}
|
||||||
|
/* Export POSIX FD — будет shared с consumers через SCM_RIGHTS */
|
||||||
|
cr = cuMemExportToShareableHandle((void *)&pub->vmm_fds[i],
|
||||||
|
pub->vmm_handles[i],
|
||||||
|
CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR, 0);
|
||||||
|
if (cr != CUDA_SUCCESS) {
|
||||||
|
CUFRAMES_LOG_ERROR("cuMemExportToShareableHandle slot %d: %s",
|
||||||
|
i, cu_err_str(cr));
|
||||||
|
free_vmm_pool(pub);
|
||||||
return CUFRAMES_ERR_CUDA;
|
return CUFRAMES_ERR_CUDA;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pub->has_vmm_pool = 1;
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int register_external_pool(struct cuframes_publisher *pub,
|
static void free_vmm_pool(struct cuframes_publisher *pub) {
|
||||||
void *const *ptrs, int32_t count,
|
for (int i = 0; i < CUFRAMES_MAX_RING; i++) {
|
||||||
size_t frame_size) {
|
if (pub->vmm_fds[i] >= 0) {
|
||||||
if (count < 1 || count > CUFRAMES_MAX_RING) return CUFRAMES_ERR_INVALID_ARG;
|
close(pub->vmm_fds[i]);
|
||||||
pub->frame_size_bytes = frame_size;
|
pub->vmm_fds[i] = -1;
|
||||||
pub->ring_size_actual = count;
|
|
||||||
pub->external_count = count;
|
|
||||||
|
|
||||||
cudaError_t cerr = cudaSetDevice(pub->cfg.cuda_device);
|
|
||||||
if (cerr != cudaSuccess) {
|
|
||||||
CUFRAMES_LOG_ERROR("cudaSetDevice: %s", cudaGetErrorString(cerr));
|
|
||||||
return CUFRAMES_ERR_CUDA;
|
|
||||||
}
|
}
|
||||||
for (int i = 0; i < count; ++i) {
|
if (pub->vmm_ptrs[i]) {
|
||||||
if (!ptrs[i]) return CUFRAMES_ERR_INVALID_ARG;
|
cuMemUnmap(pub->vmm_ptrs[i], pub->vmm_slot_size);
|
||||||
pub->cuda_ptrs[i] = ptrs[i];
|
cuMemAddressFree(pub->vmm_ptrs[i], pub->vmm_slot_size);
|
||||||
pub->external_ptrs[i] = ptrs[i];
|
pub->vmm_ptrs[i] = 0;
|
||||||
cerr = cudaIpcGetMemHandle(&pub->ipc_mem[i], ptrs[i]);
|
}
|
||||||
if (cerr != cudaSuccess) {
|
if (pub->vmm_handles[i]) {
|
||||||
CUFRAMES_LOG_ERROR("cudaIpcGetMemHandle on external ptr %p: %s",
|
cuMemRelease(pub->vmm_handles[i]);
|
||||||
ptrs[i], cudaGetErrorString(cerr));
|
pub->vmm_handles[i] = 0;
|
||||||
return CUFRAMES_ERR_CUDA;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return CUFRAMES_OK;
|
pub->has_vmm_pool = 0;
|
||||||
}
|
|
||||||
|
|
||||||
static int create_event_handle(struct cuframes_publisher *pub) {
|
|
||||||
/* Legacy single event — keep для v0.2 consumer compat fallback */
|
|
||||||
cudaError_t cerr = cudaEventCreateWithFlags(&pub->event,
|
|
||||||
cudaEventDisableTiming | cudaEventInterprocess);
|
|
||||||
if (cerr != cudaSuccess) {
|
|
||||||
CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags (legacy): %s",
|
|
||||||
cudaGetErrorString(cerr));
|
|
||||||
return CUFRAMES_ERR_CUDA;
|
|
||||||
}
|
|
||||||
/* v0.3 — per-slot events. Каждый publish записывает event на свой slot;
|
|
||||||
* consumer waits event[slot_idx] specifically — закрывает TOCTOU race
|
|
||||||
* (один global event может signal'ить для другого frame). */
|
|
||||||
for (int32_t i = 0; i < pub->ring_size_actual; i++) {
|
|
||||||
cerr = cudaEventCreateWithFlags(&pub->slot_events[i],
|
|
||||||
cudaEventDisableTiming | cudaEventInterprocess);
|
|
||||||
if (cerr != cudaSuccess) {
|
|
||||||
CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags (slot %d): %s",
|
|
||||||
i, cudaGetErrorString(cerr));
|
|
||||||
for (int32_t j = 0; j < i; j++) cudaEventDestroy(pub->slot_events[j]);
|
|
||||||
cudaEventDestroy(pub->event);
|
|
||||||
return CUFRAMES_ERR_CUDA;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return CUFRAMES_OK;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int setup_shm(struct cuframes_publisher *pub) {
|
static int setup_shm(struct cuframes_publisher *pub) {
|
||||||
@@ -155,7 +200,8 @@ static int setup_shm(struct cuframes_publisher *pub) {
|
|||||||
cuframes_shm_header_t tmp;
|
cuframes_shm_header_t tmp;
|
||||||
ssize_t rb = read(existing, &tmp, sizeof(tmp));
|
ssize_t rb = read(existing, &tmp, sizeof(tmp));
|
||||||
close(existing);
|
close(existing);
|
||||||
if (rb == (ssize_t)sizeof(tmp) && tmp.magic == CUFRAMES_MAGIC) {
|
if (rb == (ssize_t)sizeof(tmp) &&
|
||||||
|
(tmp.magic == CUFRAMES_MAGIC || tmp.magic == CUFRAMES_MAGIC_LEGACY)) {
|
||||||
if (cuframes_internal_pid_alive((pid_t)tmp.producer_pid)) {
|
if (cuframes_internal_pid_alive((pid_t)tmp.producer_pid)) {
|
||||||
CUFRAMES_LOG_ERROR("publisher with key=%s already running (pid %lu)",
|
CUFRAMES_LOG_ERROR("publisher with key=%s already running (pid %lu)",
|
||||||
pub->key, (unsigned long)tmp.producer_pid);
|
pub->key, (unsigned long)tmp.producer_pid);
|
||||||
@@ -188,7 +234,7 @@ static int setup_shm(struct cuframes_publisher *pub) {
|
|||||||
memset(pub->hdr, 0, sizeof(cuframes_shm_header_t));
|
memset(pub->hdr, 0, sizeof(cuframes_shm_header_t));
|
||||||
|
|
||||||
pub->hdr->magic = CUFRAMES_MAGIC;
|
pub->hdr->magic = CUFRAMES_MAGIC;
|
||||||
pub->hdr->proto_version = CUFRAMES_PROTOCOL_V3;
|
pub->hdr->proto_version = CUFRAMES_PROTOCOL_V4;
|
||||||
pub->hdr->lib_version_major = CUFRAMES_VERSION_MAJOR;
|
pub->hdr->lib_version_major = CUFRAMES_VERSION_MAJOR;
|
||||||
pub->hdr->lib_version_minor = CUFRAMES_VERSION_MINOR;
|
pub->hdr->lib_version_minor = CUFRAMES_VERSION_MINOR;
|
||||||
pub->hdr->lib_version_patch = CUFRAMES_VERSION_PATCH;
|
pub->hdr->lib_version_patch = CUFRAMES_VERSION_PATCH;
|
||||||
@@ -208,25 +254,11 @@ static int setup_shm(struct cuframes_publisher *pub) {
|
|||||||
pub->hdr->meta.pitch_uv = puv;
|
pub->hdr->meta.pitch_uv = puv;
|
||||||
pub->hdr->meta.frame_size_bytes = pub->frame_size_bytes;
|
pub->hdr->meta.frame_size_bytes = pub->frame_size_bytes;
|
||||||
|
|
||||||
/* Export event handle (legacy single) */
|
/* v0.4: legacy event fields в header не используются (cuStreamSynchronize
|
||||||
cudaError_t cerr = cudaIpcGetEventHandle(&pub->hdr->ipc_event_handle, pub->event);
|
* заменяет IPC events). Memzero выше — достаточно. */
|
||||||
if (cerr != cudaSuccess) {
|
/* Slot descriptors — mem_handle поле deprecated (передаётся через FDs),
|
||||||
CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle: %s", cudaGetErrorString(cerr));
|
* только seq atomic нужен. */
|
||||||
return CUFRAMES_ERR_CUDA;
|
|
||||||
}
|
|
||||||
/* v0.3 — export per-slot event handles */
|
|
||||||
for (int32_t i = 0; i < pub->ring_size_actual; i++) {
|
|
||||||
cerr = cudaIpcGetEventHandle(&pub->hdr->slot_event_handles[i],
|
|
||||||
pub->slot_events[i]);
|
|
||||||
if (cerr != cudaSuccess) {
|
|
||||||
CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle (slot %d): %s",
|
|
||||||
i, cudaGetErrorString(cerr));
|
|
||||||
return CUFRAMES_ERR_CUDA;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/* Fill slot descriptors */
|
|
||||||
for (int i = 0; i < pub->ring_size_actual; ++i) {
|
for (int i = 0; i < pub->ring_size_actual; ++i) {
|
||||||
pub->hdr->slots[i].mem_handle = pub->ipc_mem[i];
|
|
||||||
atomic_store_explicit(&pub->hdr->slots[i].seq, UINT64_MAX,
|
atomic_store_explicit(&pub->hdr->slots[i].seq, UINT64_MAX,
|
||||||
memory_order_release);
|
memory_order_release);
|
||||||
}
|
}
|
||||||
@@ -310,6 +342,7 @@ static int common_init(struct cuframes_publisher *pub,
|
|||||||
pub->next_seq = 0;
|
pub->next_seq = 0;
|
||||||
pub->current_slot = -1;
|
pub->current_slot = -1;
|
||||||
pub->has_acquired = 0;
|
pub->has_acquired = 0;
|
||||||
|
for (int i = 0; i < CUFRAMES_MAX_RING; i++) pub->vmm_fds[i] = -1;
|
||||||
pthread_mutex_init(&pub->state_mu, NULL);
|
pthread_mutex_init(&pub->state_mu, NULL);
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
}
|
}
|
||||||
@@ -325,7 +358,6 @@ int cuframes_publisher_create(const cuframes_publisher_config_t *cfg,
|
|||||||
common_init(pub, cfg);
|
common_init(pub, cfg);
|
||||||
|
|
||||||
if ((r = alloc_library_pool(pub)) != CUFRAMES_OK) goto fail;
|
if ((r = alloc_library_pool(pub)) != CUFRAMES_OK) goto fail;
|
||||||
if ((r = create_event_handle(pub)) != CUFRAMES_OK) goto fail;
|
|
||||||
if ((r = setup_shm(pub)) != CUFRAMES_OK) goto fail;
|
if ((r = setup_shm(pub)) != CUFRAMES_OK) goto fail;
|
||||||
if ((r = setup_socket(pub)) != CUFRAMES_OK) goto fail;
|
if ((r = setup_socket(pub)) != CUFRAMES_OK) goto fail;
|
||||||
|
|
||||||
@@ -337,7 +369,7 @@ int cuframes_publisher_create(const cuframes_publisher_config_t *cfg,
|
|||||||
}
|
}
|
||||||
pub->accept_thread_alive = 1;
|
pub->accept_thread_alive = 1;
|
||||||
|
|
||||||
CUFRAMES_LOG_INFO("publisher '%s' ready (ring=%d, %dx%d, fmt=%d, lib-owned)",
|
CUFRAMES_LOG_INFO("publisher '%s' ready (ring=%d, %dx%d, fmt=%d, lib-owned, v0.4 VMM)",
|
||||||
pub->key, pub->ring_size_actual,
|
pub->key, pub->ring_size_actual,
|
||||||
pub->cfg.width, pub->cfg.height, (int)pub->cfg.format);
|
pub->cfg.width, pub->cfg.height, (int)pub->cfg.format);
|
||||||
*out = pub;
|
*out = pub;
|
||||||
@@ -353,37 +385,12 @@ int cuframes_publisher_create_external(const cuframes_publisher_config_t *cfg,
|
|||||||
int32_t ptr_count,
|
int32_t ptr_count,
|
||||||
size_t frame_size,
|
size_t frame_size,
|
||||||
cuframes_publisher_t **out) {
|
cuframes_publisher_t **out) {
|
||||||
int r = validate_config(cfg);
|
/* v0.4: external ownership больше не поддерживается. VMM API требует
|
||||||
if (r != CUFRAMES_OK) return r;
|
* cuMemCreate-allocated memory; existing cudaMalloc-pointers нельзя
|
||||||
if (cfg->ownership != CUFRAMES_OWNERSHIP_EXTERNAL) return CUFRAMES_ERR_INVALID_ARG;
|
* export'нуть как POSIX FD. Use LIBRARY ownership. */
|
||||||
if (!cuda_ptrs || ptr_count < 1) return CUFRAMES_ERR_INVALID_ARG;
|
(void)cfg; (void)cuda_ptrs; (void)ptr_count; (void)frame_size; (void)out;
|
||||||
if (frame_size == 0) return CUFRAMES_ERR_INVALID_ARG;
|
CUFRAMES_LOG_ERROR("EXTERNAL ownership не поддерживается в v0.4 (VMM-only)");
|
||||||
|
return CUFRAMES_ERR_INVALID_ARG;
|
||||||
struct cuframes_publisher *pub = calloc(1, sizeof(*pub));
|
|
||||||
if (!pub) return CUFRAMES_ERR_OUT_OF_MEMORY;
|
|
||||||
common_init(pub, cfg);
|
|
||||||
|
|
||||||
if ((r = register_external_pool(pub, cuda_ptrs, ptr_count, frame_size)) != CUFRAMES_OK)
|
|
||||||
goto fail;
|
|
||||||
if ((r = create_event_handle(pub)) != CUFRAMES_OK) goto fail;
|
|
||||||
if ((r = setup_shm(pub)) != CUFRAMES_OK) goto fail;
|
|
||||||
if ((r = setup_socket(pub)) != CUFRAMES_OK) goto fail;
|
|
||||||
|
|
||||||
pub->stop_flag = 0;
|
|
||||||
if (pthread_create(&pub->accept_thread, NULL, accept_thread_main, pub) != 0) {
|
|
||||||
r = CUFRAMES_ERR_INTERNAL;
|
|
||||||
goto fail;
|
|
||||||
}
|
|
||||||
pub->accept_thread_alive = 1;
|
|
||||||
|
|
||||||
CUFRAMES_LOG_INFO("publisher '%s' ready (external pool=%d, %dx%d, fmt=%d)",
|
|
||||||
pub->key, ptr_count,
|
|
||||||
pub->cfg.width, pub->cfg.height, (int)pub->cfg.format);
|
|
||||||
*out = pub;
|
|
||||||
return CUFRAMES_OK;
|
|
||||||
fail:
|
|
||||||
cuframes_publisher_destroy(pub);
|
|
||||||
return r;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) {
|
int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) {
|
||||||
@@ -404,27 +411,24 @@ int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) {
|
|||||||
while (1) {
|
while (1) {
|
||||||
uint64_t ack = atomic_load_explicit(&pub->hdr->slots[slot].ack_bitmap,
|
uint64_t ack = atomic_load_explicit(&pub->hdr->slots[slot].ack_bitmap,
|
||||||
memory_order_acquire);
|
memory_order_acquire);
|
||||||
/* Если slot ещё не публикован (seq == UINT64_MAX) — пропустить ack check */
|
|
||||||
uint64_t cur_seq = atomic_load_explicit(&pub->hdr->slots[slot].seq,
|
uint64_t cur_seq = atomic_load_explicit(&pub->hdr->slots[slot].seq,
|
||||||
memory_order_acquire);
|
memory_order_acquire);
|
||||||
if (cur_seq == UINT64_MAX || (ack & bitmap) == bitmap) break;
|
if (cur_seq == UINT64_MAX || (ack & bitmap) == bitmap) break;
|
||||||
if (deadline && cuframes_now_ns() > deadline) {
|
if (deadline && cuframes_now_ns() > deadline) {
|
||||||
/* Mark slow subscriber dead и continue */
|
|
||||||
uint64_t missing = bitmap & ~ack;
|
uint64_t missing = bitmap & ~ack;
|
||||||
CUFRAMES_LOG_WARN("strict-wait timeout, slow subscribers bitmap=0x%lx",
|
CUFRAMES_LOG_WARN("strict-wait timeout, slow subscribers bitmap=0x%lx",
|
||||||
(unsigned long)missing);
|
(unsigned long)missing);
|
||||||
/* clear missing subscribers — TODO: send unsubscribe in v0.2 */
|
|
||||||
atomic_fetch_and_explicit(&pub->hdr->subscriber_bitmap,
|
atomic_fetch_and_explicit(&pub->hdr->subscriber_bitmap,
|
||||||
~missing, memory_order_release);
|
~missing, memory_order_release);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000}; /* 100µs */
|
struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000};
|
||||||
nanosleep(&ts, NULL);
|
nanosleep(&ts, NULL);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
*cuda_ptr_out = pub->cuda_ptrs[slot];
|
*cuda_ptr_out = (void *)(uintptr_t)pub->vmm_ptrs[slot];
|
||||||
pub->current_slot = slot;
|
pub->current_slot = slot;
|
||||||
pub->has_acquired = 1;
|
pub->has_acquired = 1;
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
@@ -432,21 +436,16 @@ int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) {
|
|||||||
|
|
||||||
static int do_publish(cuframes_publisher_t *pub, int32_t slot,
|
static int do_publish(cuframes_publisher_t *pub, int32_t slot,
|
||||||
void *stream, int64_t pts_ns) {
|
void *stream, int64_t pts_ns) {
|
||||||
/* v0.3 — record per-slot event для precise consumer sync. Closes TOCTOU
|
/* v0.4 — заменяет cudaEventRecord+IPC events на cuStreamSynchronize.
|
||||||
* race где legacy `pub->event` signals "latest publish", not slot-specific. */
|
* Producer ждёт что stream flush'нулся (~1ms на 5090), потом publishes
|
||||||
cudaError_t cerr = cudaEventRecord(pub->slot_events[slot], (cudaStream_t)stream);
|
* seq atomically. Consumer читает данные через DtoD memcpy без event
|
||||||
|
* wait — hardware coherence гарантирована на одном GPU. */
|
||||||
|
cudaError_t cerr = cudaStreamSynchronize((cudaStream_t)stream);
|
||||||
if (cerr != cudaSuccess) {
|
if (cerr != cudaSuccess) {
|
||||||
CUFRAMES_LOG_ERROR("cudaEventRecord (slot %d): %s",
|
CUFRAMES_LOG_ERROR("cudaStreamSynchronize (slot %d): %s",
|
||||||
slot, cudaGetErrorString(cerr));
|
slot, cudaGetErrorString(cerr));
|
||||||
return CUFRAMES_ERR_CUDA;
|
return CUFRAMES_ERR_CUDA;
|
||||||
}
|
}
|
||||||
/* Legacy event — keep recording для v0.2 consumer compat fallback */
|
|
||||||
cerr = cudaEventRecord(pub->event, (cudaStream_t)stream);
|
|
||||||
if (cerr != cudaSuccess) {
|
|
||||||
CUFRAMES_LOG_ERROR("cudaEventRecord (legacy): %s",
|
|
||||||
cudaGetErrorString(cerr));
|
|
||||||
return CUFRAMES_ERR_CUDA;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Reset ack bitmap для нового frame'а */
|
/* Reset ack bitmap для нового frame'а */
|
||||||
atomic_store_explicit(&pub->hdr->slots[slot].ack_bitmap, 0,
|
atomic_store_explicit(&pub->hdr->slots[slot].ack_bitmap, 0,
|
||||||
@@ -477,44 +476,8 @@ int cuframes_publisher_publish(cuframes_publisher_t *pub, void *stream, int64_t
|
|||||||
|
|
||||||
int cuframes_publisher_publish_external(cuframes_publisher_t *pub,
|
int cuframes_publisher_publish_external(cuframes_publisher_t *pub,
|
||||||
void *cuda_ptr, void *stream, int64_t pts_ns) {
|
void *cuda_ptr, void *stream, int64_t pts_ns) {
|
||||||
if (!pub || !cuda_ptr) return CUFRAMES_ERR_INVALID_ARG;
|
(void)pub; (void)cuda_ptr; (void)stream; (void)pts_ns;
|
||||||
if (pub->cfg.ownership != CUFRAMES_OWNERSHIP_EXTERNAL) return CUFRAMES_ERR_INVALID_ARG;
|
return CUFRAMES_ERR_INVALID_ARG; /* v0.4 — нет external mode */
|
||||||
|
|
||||||
int32_t slot = -1;
|
|
||||||
for (int i = 0; i < pub->external_count; ++i) {
|
|
||||||
if (pub->external_ptrs[i] == cuda_ptr) { slot = i; break; }
|
|
||||||
}
|
|
||||||
if (slot < 0) {
|
|
||||||
CUFRAMES_LOG_ERROR("external pointer %p not registered", cuda_ptr);
|
|
||||||
return CUFRAMES_ERR_INVALID_ARG;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* STRICT_WAIT — то же что в acquire, но per-publish */
|
|
||||||
if (pub->cfg.policy == CUFRAMES_POLICY_STRICT_WAIT) {
|
|
||||||
uint64_t bitmap = atomic_load_explicit(&pub->hdr->subscriber_bitmap,
|
|
||||||
memory_order_acquire);
|
|
||||||
if (bitmap != 0) {
|
|
||||||
int64_t deadline = pub->cfg.consumer_ack_timeout_ms > 0
|
|
||||||
? cuframes_now_ns() + (int64_t)pub->cfg.consumer_ack_timeout_ms * 1000000LL
|
|
||||||
: 0;
|
|
||||||
while (1) {
|
|
||||||
uint64_t ack = atomic_load_explicit(&pub->hdr->slots[slot].ack_bitmap,
|
|
||||||
memory_order_acquire);
|
|
||||||
uint64_t cur_seq = atomic_load_explicit(&pub->hdr->slots[slot].seq,
|
|
||||||
memory_order_acquire);
|
|
||||||
if (cur_seq == UINT64_MAX || (ack & bitmap) == bitmap) break;
|
|
||||||
if (deadline && cuframes_now_ns() > deadline) {
|
|
||||||
uint64_t missing = bitmap & ~ack;
|
|
||||||
atomic_fetch_and_explicit(&pub->hdr->subscriber_bitmap,
|
|
||||||
~missing, memory_order_release);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000};
|
|
||||||
nanosleep(&ts, NULL);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return do_publish(pub, slot, stream, pts_ns);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int cuframes_publisher_destroy(cuframes_publisher_t *pub) {
|
int cuframes_publisher_destroy(cuframes_publisher_t *pub) {
|
||||||
@@ -536,15 +499,9 @@ int cuframes_publisher_destroy(cuframes_publisher_t *pub) {
|
|||||||
pub->accept_thread_alive = 0;
|
pub->accept_thread_alive = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Free CUDA */
|
/* Free VMM */
|
||||||
if (pub->cfg.ownership == CUFRAMES_OWNERSHIP_LIBRARY) {
|
if (pub->has_vmm_pool) {
|
||||||
for (int i = 0; i < pub->ring_size_actual; ++i) {
|
free_vmm_pool(pub);
|
||||||
if (pub->cuda_ptrs[i]) cudaFree(pub->cuda_ptrs[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (pub->event) cudaEventDestroy(pub->event);
|
|
||||||
for (int32_t i = 0; i < pub->ring_size_actual; i++) {
|
|
||||||
if (pub->slot_events[i]) cudaEventDestroy(pub->slot_events[i]);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Packet ring cleanup (если активирован) */
|
/* Packet ring cleanup (если активирован) */
|
||||||
@@ -599,11 +556,7 @@ int cuframes_publisher_enable_packets(cuframes_publisher_t *pub,
|
|||||||
|
|
||||||
pub->has_pkt_ring = 1;
|
pub->has_pkt_ring = 1;
|
||||||
pub->max_packet_size = max_pkt;
|
pub->max_packet_size = max_pkt;
|
||||||
|
/* v0.4 frame header proto не bumped из-за packet ring — оба коэкзистируют. */
|
||||||
/* Bump proto_version в frames header чтобы v2-subscribers видели поддержку. */
|
|
||||||
if (pub->hdr) {
|
|
||||||
pub->hdr->proto_version = CUFRAMES_PROTOCOL_V2;
|
|
||||||
}
|
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -628,9 +581,6 @@ int cuframes_publisher_publish_packet(cuframes_publisher_t *pub,
|
|||||||
|
|
||||||
/* ─── Accept thread + handshake ──────────────────────────────────────── */
|
/* ─── Accept thread + handshake ──────────────────────────────────────── */
|
||||||
|
|
||||||
/* Per-subscriber lifecycle monitor — detects socket close (subscriber container
|
|
||||||
* exited / crashed) и освобождает bit + subscribers[] slot. Без этого каждый
|
|
||||||
* pipeline recreate leaks bit → bitmap overflows after 32 connections. */
|
|
||||||
struct sub_monitor_args {
|
struct sub_monitor_args {
|
||||||
struct cuframes_publisher *pub;
|
struct cuframes_publisher *pub;
|
||||||
int fd;
|
int fd;
|
||||||
@@ -640,23 +590,18 @@ struct sub_monitor_args {
|
|||||||
static void *subscriber_monitor_thread(void *arg) {
|
static void *subscriber_monitor_thread(void *arg) {
|
||||||
struct sub_monitor_args *m = (struct sub_monitor_args *)arg;
|
struct sub_monitor_args *m = (struct sub_monitor_args *)arg;
|
||||||
char buf[64];
|
char buf[64];
|
||||||
/* Blocking read — return 0 (EOF) когда other side close socket, или
|
|
||||||
* <0 on error. Любой control message (PING — TODO в будущем) just consumed. */
|
|
||||||
while (1) {
|
while (1) {
|
||||||
ssize_t n = recv(m->fd, buf, sizeof(buf), 0);
|
ssize_t n = recv(m->fd, buf, sizeof(buf), 0);
|
||||||
if (n <= 0) {
|
if (n <= 0) {
|
||||||
/* Subscriber dead — clear bit + slot state. */
|
|
||||||
atomic_fetch_and_explicit(&m->pub->hdr->subscriber_bitmap,
|
atomic_fetch_and_explicit(&m->pub->hdr->subscriber_bitmap,
|
||||||
~(1ULL << m->bit), memory_order_release);
|
~(1ULL << m->bit), memory_order_release);
|
||||||
atomic_store_explicit(&m->pub->hdr->subscribers[m->bit].state, 0,
|
atomic_store_explicit(&m->pub->hdr->subscribers[m->bit].state, 0,
|
||||||
memory_order_release);
|
memory_order_release);
|
||||||
close(m->fd);
|
close(m->fd);
|
||||||
CUFRAMES_LOG_INFO("subscriber bit=%u disconnected — freed",
|
CUFRAMES_LOG_INFO("subscriber bit=%u disconnected — freed", m->bit);
|
||||||
m->bit);
|
|
||||||
free(m);
|
free(m);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
/* future: parse control msgs (PING, UNSUBSCRIBE) here */
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -672,8 +617,6 @@ static void *accept_thread_main(void *arg) {
|
|||||||
CUFRAMES_LOG_WARN("accept: %s", strerror(errno));
|
CUFRAMES_LOG_WARN("accept: %s", strerror(errno));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
/* Handshake — на error close socket (no monitor spawned). На success
|
|
||||||
* monitor thread становится owner socket'a + cleanup'ит при disconnect. */
|
|
||||||
int r = handshake_subscriber(pub, client);
|
int r = handshake_subscriber(pub, client);
|
||||||
if (r != CUFRAMES_OK) {
|
if (r != CUFRAMES_OK) {
|
||||||
close(client);
|
close(client);
|
||||||
@@ -684,7 +627,6 @@ static void *accept_thread_main(void *arg) {
|
|||||||
|
|
||||||
static int allocate_subscriber_bit(struct cuframes_publisher *pub,
|
static int allocate_subscriber_bit(struct cuframes_publisher *pub,
|
||||||
const char *name, uint32_t *bit_out) {
|
const char *name, uint32_t *bit_out) {
|
||||||
/* Bit 0 reserved (sentinel). Bits 1..31. */
|
|
||||||
pthread_mutex_lock(&pub->state_mu);
|
pthread_mutex_lock(&pub->state_mu);
|
||||||
for (uint32_t bit = 1; bit < CUFRAMES_MAX_SUBSCRIBERS; ++bit) {
|
for (uint32_t bit = 1; bit < CUFRAMES_MAX_SUBSCRIBERS; ++bit) {
|
||||||
uint64_t state = atomic_load_explicit(&pub->hdr->subscribers[bit].state,
|
uint64_t state = atomic_load_explicit(&pub->hdr->subscribers[bit].state,
|
||||||
@@ -704,7 +646,6 @@ static int allocate_subscriber_bit(struct cuframes_publisher *pub,
|
|||||||
pthread_mutex_unlock(&pub->state_mu);
|
pthread_mutex_unlock(&pub->state_mu);
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
}
|
}
|
||||||
/* Check for name collision */
|
|
||||||
if (name && state >= 2 &&
|
if (name && state >= 2 &&
|
||||||
strncmp(pub->hdr->subscribers[bit].consumer_name, name,
|
strncmp(pub->hdr->subscribers[bit].consumer_name, name,
|
||||||
sizeof(pub->hdr->subscribers[bit].consumer_name)) == 0) {
|
sizeof(pub->hdr->subscribers[bit].consumer_name)) == 0) {
|
||||||
@@ -731,7 +672,6 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) {
|
|||||||
return CUFRAMES_ERR_PROTOCOL;
|
return CUFRAMES_ERR_PROTOCOL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Parse HELLO_REQ: proto_version + name_len + name + cuda_device + mode */
|
|
||||||
if (plen < sizeof(cuframes_msg_hello_req_t) + 20) return CUFRAMES_ERR_PROTOCOL;
|
if (plen < sizeof(cuframes_msg_hello_req_t) + 20) return CUFRAMES_ERR_PROTOCOL;
|
||||||
cuframes_msg_hello_req_t *hreq = (cuframes_msg_hello_req_t *)buf;
|
cuframes_msg_hello_req_t *hreq = (cuframes_msg_hello_req_t *)buf;
|
||||||
uint32_t want_proto = hreq->proto_version;
|
uint32_t want_proto = hreq->proto_version;
|
||||||
@@ -741,18 +681,18 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) {
|
|||||||
char name[32] = {0};
|
char name[32] = {0};
|
||||||
memcpy(name, buf + sizeof(*hreq), name_len);
|
memcpy(name, buf + sizeof(*hreq), name_len);
|
||||||
|
|
||||||
int proto_match = (want_proto == CUFRAMES_PROTOCOL_V1);
|
/* v0.4 принимает только V4 consumers. Старые v0.3 fail здесь cleanly. */
|
||||||
|
int proto_match = (want_proto == CUFRAMES_PROTOCOL_V4);
|
||||||
|
|
||||||
/* Send HELLO_RESP */
|
/* Send HELLO_RESP */
|
||||||
uint8_t resp_buf[CUFRAMES_MAX_MSG_PAYLOAD];
|
uint8_t resp_buf[CUFRAMES_MAX_MSG_PAYLOAD];
|
||||||
cuframes_msg_hello_resp_t *resp = (cuframes_msg_hello_resp_t *)resp_buf;
|
cuframes_msg_hello_resp_t *resp = (cuframes_msg_hello_resp_t *)resp_buf;
|
||||||
memset(resp, 0, sizeof(*resp));
|
memset(resp, 0, sizeof(*resp));
|
||||||
resp->result = proto_match ? CUFRAMES_OK : CUFRAMES_ERR_PROTOCOL;
|
resp->result = proto_match ? CUFRAMES_OK : CUFRAMES_ERR_PROTOCOL;
|
||||||
resp->proto_version_actual = CUFRAMES_PROTOCOL_V1;
|
resp->proto_version_actual = CUFRAMES_PROTOCOL_V4;
|
||||||
resp->ring_size = (uint32_t)pub->ring_size_actual;
|
resp->ring_size = (uint32_t)pub->ring_size_actual;
|
||||||
resp->ownership_mode = (uint32_t)pub->cfg.ownership;
|
resp->ownership_mode = (uint32_t)pub->cfg.ownership;
|
||||||
resp->meta = pub->hdr->meta;
|
resp->meta = pub->hdr->meta;
|
||||||
/* shm_path */
|
|
||||||
int slen = snprintf((char *)(resp_buf + sizeof(*resp)),
|
int slen = snprintf((char *)(resp_buf + sizeof(*resp)),
|
||||||
sizeof(resp_buf) - sizeof(*resp) - 12,
|
sizeof(resp_buf) - sizeof(*resp) - 12,
|
||||||
"%s", pub->shm_name);
|
"%s", pub->shm_name);
|
||||||
@@ -765,7 +705,11 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) {
|
|||||||
CUFRAMES_LOG_WARN("send HELLO_RESP: %s", cuframes_strerror(r));
|
CUFRAMES_LOG_WARN("send HELLO_RESP: %s", cuframes_strerror(r));
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
if (!proto_match) return CUFRAMES_ERR_PROTOCOL;
|
if (!proto_match) {
|
||||||
|
CUFRAMES_LOG_WARN("subscriber proto v%u rejected (want v%u)",
|
||||||
|
want_proto, CUFRAMES_PROTOCOL_V4);
|
||||||
|
return CUFRAMES_ERR_PROTOCOL;
|
||||||
|
}
|
||||||
|
|
||||||
/* recv SUBSCRIBE_REQ */
|
/* recv SUBSCRIBE_REQ */
|
||||||
plen = sizeof(buf);
|
plen = sizeof(buf);
|
||||||
@@ -773,11 +717,9 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) {
|
|||||||
if (r != CUFRAMES_OK) return r;
|
if (r != CUFRAMES_OK) return r;
|
||||||
if (mtype != CUFRAMES_MSG_SUBSCRIBE_REQ) return CUFRAMES_ERR_PROTOCOL;
|
if (mtype != CUFRAMES_MSG_SUBSCRIBE_REQ) return CUFRAMES_ERR_PROTOCOL;
|
||||||
|
|
||||||
/* Allocate subscriber bit */
|
|
||||||
uint32_t bit = 0;
|
uint32_t bit = 0;
|
||||||
int alloc_r = allocate_subscriber_bit(pub, name, &bit);
|
int alloc_r = allocate_subscriber_bit(pub, name, &bit);
|
||||||
|
|
||||||
/* Send SUBSCRIBE_RESP */
|
|
||||||
cuframes_msg_subscribe_resp_t sresp = {0};
|
cuframes_msg_subscribe_resp_t sresp = {0};
|
||||||
sresp.result = alloc_r;
|
sresp.result = alloc_r;
|
||||||
sresp.assigned_bit = bit;
|
sresp.assigned_bit = bit;
|
||||||
@@ -788,20 +730,33 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) {
|
|||||||
&sresp, sizeof(sresp));
|
&sresp, sizeof(sresp));
|
||||||
if (r != CUFRAMES_OK || alloc_r != CUFRAMES_OK) return r ? r : alloc_r;
|
if (r != CUFRAMES_OK || alloc_r != CUFRAMES_OK) return r ? r : alloc_r;
|
||||||
|
|
||||||
/* Activate subscriber slot */
|
/* v0.4 — отправить VMM_FDS с N posix FDs через SCM_RIGHTS */
|
||||||
|
cuframes_msg_vmm_fds_t vmm_payload = {0};
|
||||||
|
vmm_payload.slot_size_bytes = pub->vmm_slot_size;
|
||||||
|
vmm_payload.fd_count = (uint32_t)pub->ring_size_actual;
|
||||||
|
r = cuframes_internal_send_msg_with_fds(client_fd, CUFRAMES_MSG_VMM_FDS,
|
||||||
|
&vmm_payload, sizeof(vmm_payload),
|
||||||
|
pub->vmm_fds,
|
||||||
|
(uint32_t)pub->ring_size_actual);
|
||||||
|
if (r != CUFRAMES_OK) {
|
||||||
|
CUFRAMES_LOG_WARN("send VMM_FDS: %s", cuframes_strerror(r));
|
||||||
|
/* roll back bit allocation */
|
||||||
|
atomic_fetch_and_explicit(&pub->hdr->subscriber_bitmap,
|
||||||
|
~(1ULL << bit), memory_order_release);
|
||||||
|
atomic_store_explicit(&pub->hdr->subscribers[bit].state, 0,
|
||||||
|
memory_order_release);
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
|
||||||
atomic_store_explicit(&pub->hdr->subscribers[bit].state, 2,
|
atomic_store_explicit(&pub->hdr->subscribers[bit].state, 2,
|
||||||
memory_order_release);
|
memory_order_release);
|
||||||
|
|
||||||
CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u)", name, bit);
|
CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u, %d VMM FDs)",
|
||||||
|
name, bit, pub->ring_size_actual);
|
||||||
|
|
||||||
/* Spawn detached monitor thread — owns client_fd, frees bit on socket
|
/* Spawn monitor thread */
|
||||||
* close (subscriber container exit / crash). Без этого bitmap утекал
|
|
||||||
* каждый pipeline recreate. */
|
|
||||||
struct sub_monitor_args *m = malloc(sizeof(*m));
|
struct sub_monitor_args *m = malloc(sizeof(*m));
|
||||||
if (!m) {
|
if (!m) return CUFRAMES_OK;
|
||||||
/* OOM — fallback: leak fd, bit будет released только publisher_destroy */
|
|
||||||
return CUFRAMES_OK;
|
|
||||||
}
|
|
||||||
m->pub = pub;
|
m->pub = pub;
|
||||||
m->fd = client_fd;
|
m->fd = client_fd;
|
||||||
m->bit = bit;
|
m->bit = bit;
|
||||||
|
|||||||
@@ -3,7 +3,9 @@
|
|||||||
#include "internal.h"
|
#include "internal.h"
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <poll.h>
|
#include <poll.h>
|
||||||
|
#include <string.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
|
#include <sys/uio.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
/* Read exactly N bytes from socket, with poll-based timeout. */
|
/* Read exactly N bytes from socket, with poll-based timeout. */
|
||||||
@@ -97,3 +99,121 @@ int cuframes_internal_recv_msg(int fd, uint32_t *msg_type_out,
|
|||||||
if (payload_len_inout) *payload_len_inout = h.payload_length;
|
if (payload_len_inout) *payload_len_inout = h.payload_length;
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* v0.4 — send TLV msg + N FDs через SCM_RIGHTS. Один sendmsg(): header+payload
|
||||||
|
* в iovec, FDs в control. Header.payload_length описывает ТОЛЬКО payload bytes,
|
||||||
|
* FDs приходят out-of-band. */
|
||||||
|
int cuframes_internal_send_msg_with_fds(int sock_fd, uint32_t msg_type,
|
||||||
|
const void *payload, uint32_t payload_len,
|
||||||
|
const int *fds, uint32_t fd_count) {
|
||||||
|
if (payload_len > CUFRAMES_MAX_MSG_PAYLOAD) return CUFRAMES_ERR_INVALID_ARG;
|
||||||
|
if (fd_count > 0 && !fds) return CUFRAMES_ERR_INVALID_ARG;
|
||||||
|
|
||||||
|
cuframes_msg_header_t h = {.msg_type = msg_type, .payload_length = payload_len};
|
||||||
|
|
||||||
|
struct iovec iov[2];
|
||||||
|
iov[0].iov_base = &h; iov[0].iov_len = sizeof(h);
|
||||||
|
iov[1].iov_base = (void *)payload; iov[1].iov_len = payload_len;
|
||||||
|
|
||||||
|
struct msghdr msg = {0};
|
||||||
|
msg.msg_iov = iov;
|
||||||
|
msg.msg_iovlen = (payload_len > 0 && payload) ? 2 : 1;
|
||||||
|
|
||||||
|
char ctrl_buf[CMSG_SPACE(sizeof(int) * 64)] = {0};
|
||||||
|
if (fd_count > 0) {
|
||||||
|
if (fd_count > 64) return CUFRAMES_ERR_INVALID_ARG;
|
||||||
|
msg.msg_control = ctrl_buf;
|
||||||
|
msg.msg_controllen = CMSG_SPACE(sizeof(int) * fd_count);
|
||||||
|
struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
|
||||||
|
cmsg->cmsg_level = SOL_SOCKET;
|
||||||
|
cmsg->cmsg_type = SCM_RIGHTS;
|
||||||
|
cmsg->cmsg_len = CMSG_LEN(sizeof(int) * fd_count);
|
||||||
|
memcpy(CMSG_DATA(cmsg), fds, sizeof(int) * fd_count);
|
||||||
|
}
|
||||||
|
|
||||||
|
ssize_t n = sendmsg(sock_fd, &msg, MSG_NOSIGNAL);
|
||||||
|
if (n < 0) {
|
||||||
|
if (errno == EPIPE) return CUFRAMES_ERR_DISCONNECTED;
|
||||||
|
return CUFRAMES_ERR_IO;
|
||||||
|
}
|
||||||
|
/* Partial send rare для small payload — но обработаем gracefully */
|
||||||
|
size_t want = sizeof(h) + payload_len;
|
||||||
|
if ((size_t)n < want) {
|
||||||
|
return send_all(sock_fd, (uint8_t *)iov[0].iov_base + n,
|
||||||
|
want - (size_t)n);
|
||||||
|
}
|
||||||
|
return CUFRAMES_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
int cuframes_internal_recv_msg_with_fds(int sock_fd, uint32_t *msg_type_out,
|
||||||
|
void *payload, uint32_t *payload_len_inout,
|
||||||
|
int *fds_out, uint32_t *fd_count_inout,
|
||||||
|
int32_t timeout_ms) {
|
||||||
|
/* Poll первым делом — recvmsg блокирующий, иначе тайм-аут не сработает. */
|
||||||
|
if (timeout_ms >= 0) {
|
||||||
|
struct pollfd pfd = {.fd = sock_fd, .events = POLLIN};
|
||||||
|
int pr = poll(&pfd, 1, timeout_ms);
|
||||||
|
if (pr == 0) return CUFRAMES_ERR_TIMEOUT;
|
||||||
|
if (pr < 0) return CUFRAMES_ERR_IO;
|
||||||
|
}
|
||||||
|
|
||||||
|
cuframes_msg_header_t h;
|
||||||
|
struct iovec iov[2];
|
||||||
|
iov[0].iov_base = &h; iov[0].iov_len = sizeof(h);
|
||||||
|
iov[1].iov_base = payload; iov[1].iov_len = (payload && payload_len_inout) ? *payload_len_inout : 0;
|
||||||
|
|
||||||
|
uint32_t want_fds = (fd_count_inout && fds_out) ? *fd_count_inout : 0;
|
||||||
|
char ctrl_buf[CMSG_SPACE(sizeof(int) * 64)] = {0};
|
||||||
|
struct msghdr msg = {0};
|
||||||
|
msg.msg_iov = iov;
|
||||||
|
msg.msg_iovlen = (iov[1].iov_len > 0) ? 2 : 1;
|
||||||
|
msg.msg_control = ctrl_buf;
|
||||||
|
msg.msg_controllen = sizeof(ctrl_buf);
|
||||||
|
|
||||||
|
ssize_t n = recvmsg(sock_fd, &msg, 0);
|
||||||
|
if (n == 0) return CUFRAMES_ERR_DISCONNECTED;
|
||||||
|
if (n < 0) return CUFRAMES_ERR_IO;
|
||||||
|
if ((size_t)n < sizeof(h)) return CUFRAMES_ERR_PROTOCOL;
|
||||||
|
|
||||||
|
if (msg_type_out) *msg_type_out = h.msg_type;
|
||||||
|
if (h.payload_length > CUFRAMES_MAX_MSG_PAYLOAD) return CUFRAMES_ERR_PROTOCOL;
|
||||||
|
|
||||||
|
/* Если recvmsg вернул меньше payload_length — добираем через recv_all */
|
||||||
|
size_t got_payload = (size_t)n - sizeof(h);
|
||||||
|
if (h.payload_length > 0) {
|
||||||
|
if (!payload || !payload_len_inout || *payload_len_inout < h.payload_length) {
|
||||||
|
return CUFRAMES_ERR_INVALID_ARG;
|
||||||
|
}
|
||||||
|
if (got_payload < h.payload_length) {
|
||||||
|
int r = recv_all(sock_fd, (uint8_t *)payload + got_payload,
|
||||||
|
h.payload_length - got_payload, timeout_ms);
|
||||||
|
if (r != CUFRAMES_OK) return r;
|
||||||
|
}
|
||||||
|
*payload_len_inout = h.payload_length;
|
||||||
|
} else if (payload_len_inout) {
|
||||||
|
*payload_len_inout = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Parse SCM_RIGHTS FDs */
|
||||||
|
uint32_t got_fds = 0;
|
||||||
|
struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
|
||||||
|
for (; cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
|
||||||
|
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
|
||||||
|
size_t blob = cmsg->cmsg_len - CMSG_LEN(0);
|
||||||
|
uint32_t n_fds = (uint32_t)(blob / sizeof(int));
|
||||||
|
if (got_fds + n_fds > want_fds) {
|
||||||
|
/* Close excess FDs чтобы не утекли */
|
||||||
|
for (uint32_t i = 0; i < n_fds; i++) {
|
||||||
|
int f;
|
||||||
|
memcpy(&f, CMSG_DATA(cmsg) + i * sizeof(int), sizeof(int));
|
||||||
|
close(f);
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
memcpy(fds_out + got_fds, CMSG_DATA(cmsg), blob);
|
||||||
|
got_fds += n_fds;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (fd_count_inout) *fd_count_inout = got_fds;
|
||||||
|
return CUFRAMES_OK;
|
||||||
|
}
|
||||||
|
|||||||
@@ -0,0 +1,4 @@
|
|||||||
|
vmm_fd_pingpong/producer
|
||||||
|
vmm_fd_pingpong/consumer
|
||||||
|
smoke_v04/smoke_pub
|
||||||
|
smoke_v04/smoke_sub
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
CFLAGS = -O2 -Wall -I../../include -I/usr/local/cuda/include
|
||||||
|
LDFLAGS = -L../../build-v04/libcuframes -lcuframes -L/usr/local/cuda/lib64 -lcudart -lcuda -lpthread -lrt
|
||||||
|
|
||||||
|
all: smoke_pub smoke_sub
|
||||||
|
|
||||||
|
smoke_pub: smoke_pub.c
|
||||||
|
gcc $(CFLAGS) -o $@ $< $(LDFLAGS)
|
||||||
|
|
||||||
|
smoke_sub: smoke_sub.c
|
||||||
|
gcc $(CFLAGS) -o $@ $< $(LDFLAGS)
|
||||||
|
|
||||||
|
clean:
|
||||||
|
rm -f smoke_pub smoke_sub
|
||||||
@@ -0,0 +1,55 @@
|
|||||||
|
/* v0.4 smoke test publisher — NV12 1920x1080 ring 4, fill каждый slot
|
||||||
|
* с pattern (i % 256), publish, infinite loop. */
|
||||||
|
#include <cuframes/cuframes.h>
|
||||||
|
#include <cuda_runtime.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <time.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
const char *key = argc > 1 ? argv[1] : "smoke";
|
||||||
|
|
||||||
|
cuframes_publisher_config_t cfg = {0};
|
||||||
|
cfg.key = key;
|
||||||
|
cfg.width = 1920;
|
||||||
|
cfg.height = 1080;
|
||||||
|
cfg.format = CUFRAMES_FORMAT_NV12;
|
||||||
|
cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
|
||||||
|
cfg.ring_size = 4;
|
||||||
|
cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;
|
||||||
|
cfg.cuda_device = 0;
|
||||||
|
|
||||||
|
cuframes_publisher_t *pub = NULL;
|
||||||
|
int r = cuframes_publisher_create(&cfg, &pub);
|
||||||
|
if (r != CUFRAMES_OK) {
|
||||||
|
fprintf(stderr, "publisher create failed: %d (%s)\n", r, cuframes_strerror(r));
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
fprintf(stderr, "publisher 'cuframes-%s' ready (v0.4 VMM)\n", key);
|
||||||
|
|
||||||
|
cudaStream_t stream;
|
||||||
|
cudaStreamCreate(&stream);
|
||||||
|
|
||||||
|
int i = 0;
|
||||||
|
while (1) {
|
||||||
|
void *ptr = NULL;
|
||||||
|
r = cuframes_publisher_acquire(pub, &ptr);
|
||||||
|
if (r != CUFRAMES_OK) { fprintf(stderr, "acquire: %d\n", r); break; }
|
||||||
|
|
||||||
|
uint8_t pattern = (uint8_t)(i & 0xFF);
|
||||||
|
cudaMemsetAsync(ptr, pattern, 1920 * 1080 * 3 / 2, stream);
|
||||||
|
|
||||||
|
r = cuframes_publisher_publish(pub, stream,
|
||||||
|
(int64_t)cuframes_now_ns());
|
||||||
|
if (r != CUFRAMES_OK) { fprintf(stderr, "publish: %d\n", r); break; }
|
||||||
|
i++;
|
||||||
|
if (i % 50 == 0) fprintf(stderr, "published %d frames\n", i);
|
||||||
|
struct timespec ts = {.tv_sec = 0, .tv_nsec = 40000000}; /* 25 fps */
|
||||||
|
nanosleep(&ts, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
cudaStreamDestroy(stream);
|
||||||
|
cuframes_publisher_destroy(pub);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
@@ -0,0 +1,63 @@
|
|||||||
|
/* v0.4 smoke subscriber — connect, read 100 frames, verify pattern, exit 0/1. */
|
||||||
|
#include <cuframes/cuframes.h>
|
||||||
|
#include <cuda_runtime.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
const char *key = argc > 1 ? argv[1] : "smoke";
|
||||||
|
|
||||||
|
cuframes_subscriber_config_t cfg = {0};
|
||||||
|
cfg.key = key;
|
||||||
|
cfg.consumer_name = "smoke-sub";
|
||||||
|
cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
|
||||||
|
cfg.cuda_device = 0;
|
||||||
|
cfg.connect_timeout_ms = 10000;
|
||||||
|
|
||||||
|
cuframes_subscriber_t *sub = NULL;
|
||||||
|
int r = cuframes_subscriber_create(&cfg, &sub);
|
||||||
|
if (r != CUFRAMES_OK) {
|
||||||
|
fprintf(stderr, "subscriber create failed: %d (%s)\n", r, cuframes_strerror(r));
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
fprintf(stderr, "subscribed to '%s' (v0.4)\n", key);
|
||||||
|
|
||||||
|
cudaStream_t stream;
|
||||||
|
cudaStreamCreate(&stream);
|
||||||
|
size_t check_size = 1024; /* sample 1KB чтобы не тратить время */
|
||||||
|
uint8_t *host = malloc(check_size);
|
||||||
|
|
||||||
|
int frames = 0;
|
||||||
|
int good = 0;
|
||||||
|
while (frames < 100) {
|
||||||
|
cuframes_frame_t *f = NULL;
|
||||||
|
r = cuframes_subscriber_next(sub, stream, &f, 2000);
|
||||||
|
if (r != CUFRAMES_OK) {
|
||||||
|
fprintf(stderr, "next failed: %d (%s)\n", r, cuframes_strerror(r));
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
cudaMemcpyAsync(host, cuframes_frame_cuda_ptr(f), check_size,
|
||||||
|
cudaMemcpyDeviceToHost, stream);
|
||||||
|
cudaStreamSynchronize(stream);
|
||||||
|
uint8_t exp = host[0];
|
||||||
|
int mismatch = 0;
|
||||||
|
for (size_t i = 1; i < check_size; i++) {
|
||||||
|
if (host[i] != exp) { mismatch++; }
|
||||||
|
}
|
||||||
|
if (mismatch == 0) good++;
|
||||||
|
if (frames % 20 == 0) {
|
||||||
|
fprintf(stderr, "frame seq=%lu byte0=0x%02x mismatch=%d\n",
|
||||||
|
(unsigned long)cuframes_frame_seq(f), exp, mismatch);
|
||||||
|
}
|
||||||
|
cuframes_subscriber_release(sub, f);
|
||||||
|
frames++;
|
||||||
|
}
|
||||||
|
free(host);
|
||||||
|
cudaStreamDestroy(stream);
|
||||||
|
cuframes_subscriber_destroy(sub);
|
||||||
|
|
||||||
|
fprintf(stderr, "DONE: %d/%d frames OK\n", good, frames);
|
||||||
|
return (good == frames && frames > 0) ? 0 : 1;
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
CC = gcc
|
||||||
|
CFLAGS = -O2 -Wall -I/usr/local/cuda/include
|
||||||
|
LDFLAGS = -L/usr/local/cuda/lib64 -lcuda
|
||||||
|
|
||||||
|
all: producer consumer
|
||||||
|
|
||||||
|
producer: producer.c common.h
|
||||||
|
$(CC) $(CFLAGS) -o $@ producer.c $(LDFLAGS)
|
||||||
|
|
||||||
|
consumer: consumer.c common.h
|
||||||
|
$(CC) $(CFLAGS) -o $@ consumer.c $(LDFLAGS)
|
||||||
|
|
||||||
|
clean:
|
||||||
|
rm -f producer consumer
|
||||||
|
|
||||||
|
.PHONY: all clean
|
||||||
@@ -0,0 +1,69 @@
|
|||||||
|
# vmm_fd_pingpong — spike для cuframes v0.4
|
||||||
|
|
||||||
|
Проверка: можно ли заменить CUDA IPC mem handles на VMM (cuMemCreate)
|
||||||
|
+ POSIX FD export, чтобы убрать требование shared pid/ipc namespaces
|
||||||
|
между producer и consumer контейнерами.
|
||||||
|
|
||||||
|
## Результат: ✅ работает
|
||||||
|
|
||||||
|
Запуск 2 контейнеров без shared pid/ipc, только volume mount для
|
||||||
|
unix-сокета:
|
||||||
|
|
||||||
|
```
|
||||||
|
producer: granularity=2097152
|
||||||
|
producer: alloc size=16777216
|
||||||
|
producer: exported fd=37 for handle
|
||||||
|
producer: listening on /run/spike/pingpong.sock, awaiting consumer...
|
||||||
|
|
||||||
|
consumer: connected to producer
|
||||||
|
consumer: recv fd=38 size=16777216 magic=0xa7
|
||||||
|
consumer: imported handle OK
|
||||||
|
consumer: mapped + access OK
|
||||||
|
consumer: verify mismatch=0/1048576 → ACK=O
|
||||||
|
consumer: done (OK)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Ключевые наблюдения
|
||||||
|
|
||||||
|
- **Granularity на 5090 = 2 MB**. 1920×1080 NV12 (~3.1 MB) округлится до 4 MB.
|
||||||
|
16 slots × 4 камеры × +1 MB = +64 MB VRAM поверх текущих cuda IPC аллокаций.
|
||||||
|
- **FD передаётся через `sendmsg(SCM_RIGHTS)`** — kernel прокидывает реальный FD
|
||||||
|
в receiver namespace, переименовывая в свободный номер. Volume mount unix
|
||||||
|
socket'а — единственное требование (`/run/cuframes` уже монтируется как shared).
|
||||||
|
- **`cuMemImportFromShareableHandle`** принимает FD как `(void *)(uintptr_t)fd`.
|
||||||
|
- **Доступ на consumer side требует `cuMemSetAccess` с правильным `CUmemLocation`** —
|
||||||
|
device id из своего `cuDeviceGet`, не наследуется от producer.
|
||||||
|
|
||||||
|
## Замена events (упрощение этапа C)
|
||||||
|
|
||||||
|
CUDA events для IPC не имеют POSIX FD path. Внедрять external semaphores
|
||||||
|
(OPAQUE_FD) — отдельный API, другая sigal/wait семантика. **Вместо этого:**
|
||||||
|
producer вызывает `cuStreamSynchronize(stream)` ПЕРЕД `atomic_store(seq)` в
|
||||||
|
`do_publish`. Consumer тогда просто читает seq и копирует DtoD — без event wait.
|
||||||
|
|
||||||
|
Overhead: ~1 ms на publish × 25 fps = 2.5% CPU time producer'а. Memory
|
||||||
|
coherence гарантирована (один GPU, hardware ensures writes visible после
|
||||||
|
stream sync).
|
||||||
|
|
||||||
|
## Сборка
|
||||||
|
|
||||||
|
```bash
|
||||||
|
docker run --rm -v $PWD:/work -w /work nvidia/cuda:12.4.1-devel-ubuntu22.04 \
|
||||||
|
bash -c "apt-get install -y build-essential && make"
|
||||||
|
```
|
||||||
|
|
||||||
|
## Запуск теста
|
||||||
|
|
||||||
|
```bash
|
||||||
|
sudo mkdir -p /var/run/spike-pingpong && sudo chmod 777 /var/run/spike-pingpong
|
||||||
|
|
||||||
|
docker run -d --name spike-prod --runtime=nvidia --gpus all \
|
||||||
|
-v $PWD:/work -v /var/run/spike-pingpong:/run/spike \
|
||||||
|
nvidia/cuda:12.4.1-base-ubuntu22.04 /work/producer
|
||||||
|
|
||||||
|
docker run --rm --name spike-cons --runtime=nvidia --gpus all \
|
||||||
|
-v $PWD:/work -v /var/run/spike-pingpong:/run/spike \
|
||||||
|
nvidia/cuda:12.4.1-base-ubuntu22.04 /work/consumer
|
||||||
|
|
||||||
|
docker logs spike-prod && docker rm -f spike-prod
|
||||||
|
```
|
||||||
@@ -0,0 +1,20 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <cuda.h>
|
||||||
|
#include <stdint.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
|
||||||
|
#define POOL_SIZE (16 * 1024 * 1024)
|
||||||
|
#define MAGIC_BYTE 0xA7
|
||||||
|
#define SOCK_PATH "/run/spike/pingpong.sock"
|
||||||
|
|
||||||
|
#define CHECK(expr) do { \
|
||||||
|
CUresult _r = (expr); \
|
||||||
|
if (_r != CUDA_SUCCESS) { \
|
||||||
|
const char *_msg = NULL; \
|
||||||
|
cuGetErrorString(_r, &_msg); \
|
||||||
|
fprintf(stderr, "%s:%d %s -> %d (%s)\n", \
|
||||||
|
__FILE__, __LINE__, #expr, (int)_r, _msg ? _msg : "?"); \
|
||||||
|
exit(1); \
|
||||||
|
} \
|
||||||
|
} while (0)
|
||||||
@@ -0,0 +1,97 @@
|
|||||||
|
#include "common.h"
|
||||||
|
#include <errno.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <sys/un.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
static int recv_fd(int sock, int *out_fd, uint64_t *out_size, uint8_t *out_magic) {
|
||||||
|
struct msghdr msg = {0};
|
||||||
|
char ctrl[CMSG_SPACE(sizeof(int))];
|
||||||
|
struct iovec iov[2];
|
||||||
|
iov[0].iov_base = out_size; iov[0].iov_len = sizeof(*out_size);
|
||||||
|
iov[1].iov_base = out_magic; iov[1].iov_len = sizeof(*out_magic);
|
||||||
|
msg.msg_iov = iov; msg.msg_iovlen = 2;
|
||||||
|
msg.msg_control = ctrl; msg.msg_controllen = sizeof(ctrl);
|
||||||
|
ssize_t n = recvmsg(sock, &msg, 0);
|
||||||
|
if (n < 0) { perror("recvmsg"); return -1; }
|
||||||
|
struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
|
||||||
|
if (!cmsg || cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) {
|
||||||
|
fprintf(stderr, "no SCM_RIGHTS in msg\n");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
memcpy(out_fd, CMSG_DATA(cmsg), sizeof(int));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(void) {
|
||||||
|
CHECK(cuInit(0));
|
||||||
|
CUdevice dev;
|
||||||
|
CHECK(cuDeviceGet(&dev, 0));
|
||||||
|
CUcontext ctx;
|
||||||
|
CHECK(cuCtxCreate(&ctx, 0, dev));
|
||||||
|
|
||||||
|
/* Connect to producer */
|
||||||
|
int sock = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||||
|
if (sock < 0) { perror("socket"); return 1; }
|
||||||
|
struct sockaddr_un sa = {.sun_family = AF_UNIX};
|
||||||
|
strncpy(sa.sun_path, SOCK_PATH, sizeof(sa.sun_path) - 1);
|
||||||
|
|
||||||
|
for (int retry = 0; retry < 50; retry++) {
|
||||||
|
if (connect(sock, (struct sockaddr *)&sa, sizeof(sa)) == 0) break;
|
||||||
|
if (retry == 49) { perror("connect (final)"); return 1; }
|
||||||
|
usleep(100000);
|
||||||
|
}
|
||||||
|
fprintf(stderr, "consumer: connected to producer\n");
|
||||||
|
|
||||||
|
int fd = -1;
|
||||||
|
uint64_t size = 0;
|
||||||
|
uint8_t magic = 0;
|
||||||
|
if (recv_fd(sock, &fd, &size, &magic) < 0) return 1;
|
||||||
|
fprintf(stderr, "consumer: recv fd=%d size=%llu magic=0x%02x\n",
|
||||||
|
fd, (unsigned long long)size, magic);
|
||||||
|
|
||||||
|
CUmemGenericAllocationHandle mem;
|
||||||
|
CHECK(cuMemImportFromShareableHandle(&mem, (void *)(uintptr_t)fd,
|
||||||
|
CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR));
|
||||||
|
fprintf(stderr, "consumer: imported handle OK\n");
|
||||||
|
|
||||||
|
CUdeviceptr ptr;
|
||||||
|
CHECK(cuMemAddressReserve(&ptr, size, 0, 0, 0));
|
||||||
|
CHECK(cuMemMap(ptr, size, 0, mem, 0));
|
||||||
|
|
||||||
|
CUmemAccessDesc access = {0};
|
||||||
|
access.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
|
||||||
|
access.location.id = dev;
|
||||||
|
access.flags = CU_MEM_ACCESS_FLAGS_PROT_READ;
|
||||||
|
CHECK(cuMemSetAccess(ptr, size, &access, 1));
|
||||||
|
fprintf(stderr, "consumer: mapped + access OK\n");
|
||||||
|
|
||||||
|
/* Copy out 1MB чтобы убедиться что pattern там */
|
||||||
|
size_t check = size < (1 << 20) ? size : (1 << 20);
|
||||||
|
uint8_t *host = malloc(check);
|
||||||
|
CHECK(cuMemcpyDtoH(host, ptr, check));
|
||||||
|
CHECK(cuCtxSynchronize());
|
||||||
|
|
||||||
|
size_t mismatch = 0;
|
||||||
|
for (size_t i = 0; i < check; i++) {
|
||||||
|
if (host[i] != magic) mismatch++;
|
||||||
|
}
|
||||||
|
free(host);
|
||||||
|
|
||||||
|
char ack = (mismatch == 0) ? 'O' : 'X';
|
||||||
|
fprintf(stderr, "consumer: verify mismatch=%zu/%zu → ACK=%c\n",
|
||||||
|
mismatch, check, ack);
|
||||||
|
|
||||||
|
write(sock, &ack, 1);
|
||||||
|
close(sock);
|
||||||
|
close(fd);
|
||||||
|
|
||||||
|
CHECK(cuMemUnmap(ptr, size));
|
||||||
|
CHECK(cuMemAddressFree(ptr, size));
|
||||||
|
CHECK(cuMemRelease(mem));
|
||||||
|
CHECK(cuCtxDestroy(ctx));
|
||||||
|
|
||||||
|
fprintf(stderr, "consumer: done (%s)\n", ack == 'O' ? "OK" : "FAIL");
|
||||||
|
return ack == 'O' ? 0 : 1;
|
||||||
|
}
|
||||||
@@ -0,0 +1,103 @@
|
|||||||
|
#include "common.h"
|
||||||
|
#include <errno.h>
|
||||||
|
#include <string.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <sys/un.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
/* Send fd через SCM_RIGHTS вместе с (uint64_t size, uint8_t magic) payload. */
|
||||||
|
static int send_fd(int sock, int fd, uint64_t size, uint8_t magic) {
|
||||||
|
struct msghdr msg = {0};
|
||||||
|
char ctrl[CMSG_SPACE(sizeof(int))];
|
||||||
|
struct iovec iov[2];
|
||||||
|
iov[0].iov_base = &size; iov[0].iov_len = sizeof(size);
|
||||||
|
iov[1].iov_base = &magic; iov[1].iov_len = sizeof(magic);
|
||||||
|
msg.msg_iov = iov; msg.msg_iovlen = 2;
|
||||||
|
msg.msg_control = ctrl; msg.msg_controllen = sizeof(ctrl);
|
||||||
|
struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
|
||||||
|
cmsg->cmsg_level = SOL_SOCKET;
|
||||||
|
cmsg->cmsg_type = SCM_RIGHTS;
|
||||||
|
cmsg->cmsg_len = CMSG_LEN(sizeof(int));
|
||||||
|
memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));
|
||||||
|
ssize_t n = sendmsg(sock, &msg, 0);
|
||||||
|
if (n < 0) { perror("sendmsg"); return -1; }
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(void) {
|
||||||
|
CHECK(cuInit(0));
|
||||||
|
CUdevice dev;
|
||||||
|
CHECK(cuDeviceGet(&dev, 0));
|
||||||
|
CUcontext ctx;
|
||||||
|
CHECK(cuCtxCreate(&ctx, 0, dev));
|
||||||
|
|
||||||
|
CUmemAllocationProp prop = {0};
|
||||||
|
prop.type = CU_MEM_ALLOCATION_TYPE_PINNED;
|
||||||
|
prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
|
||||||
|
prop.location.id = dev;
|
||||||
|
prop.requestedHandleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR;
|
||||||
|
|
||||||
|
size_t granularity = 0;
|
||||||
|
CHECK(cuMemGetAllocationGranularity(&granularity, &prop,
|
||||||
|
CU_MEM_ALLOC_GRANULARITY_MINIMUM));
|
||||||
|
fprintf(stderr, "producer: granularity=%zu\n", granularity);
|
||||||
|
|
||||||
|
size_t size = ((POOL_SIZE + granularity - 1) / granularity) * granularity;
|
||||||
|
fprintf(stderr, "producer: alloc size=%zu\n", size);
|
||||||
|
|
||||||
|
CUmemGenericAllocationHandle mem;
|
||||||
|
CHECK(cuMemCreate(&mem, size, &prop, 0));
|
||||||
|
|
||||||
|
CUdeviceptr ptr;
|
||||||
|
CHECK(cuMemAddressReserve(&ptr, size, 0, 0, 0));
|
||||||
|
CHECK(cuMemMap(ptr, size, 0, mem, 0));
|
||||||
|
|
||||||
|
CUmemAccessDesc access = {0};
|
||||||
|
access.location = prop.location;
|
||||||
|
access.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
|
||||||
|
CHECK(cuMemSetAccess(ptr, size, &access, 1));
|
||||||
|
|
||||||
|
/* Fill with MAGIC pattern */
|
||||||
|
CHECK(cuMemsetD8(ptr, MAGIC_BYTE, size));
|
||||||
|
CHECK(cuCtxSynchronize());
|
||||||
|
|
||||||
|
int fd;
|
||||||
|
CHECK(cuMemExportToShareableHandle(&fd, mem,
|
||||||
|
CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR, 0));
|
||||||
|
fprintf(stderr, "producer: exported fd=%d for handle\n", fd);
|
||||||
|
|
||||||
|
/* Unix socket server */
|
||||||
|
unlink(SOCK_PATH);
|
||||||
|
int srv = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||||
|
if (srv < 0) { perror("socket"); return 1; }
|
||||||
|
struct sockaddr_un sa = {.sun_family = AF_UNIX};
|
||||||
|
strncpy(sa.sun_path, SOCK_PATH, sizeof(sa.sun_path) - 1);
|
||||||
|
if (bind(srv, (struct sockaddr *)&sa, sizeof(sa)) < 0) { perror("bind"); return 1; }
|
||||||
|
if (listen(srv, 1) < 0) { perror("listen"); return 1; }
|
||||||
|
|
||||||
|
fprintf(stderr, "producer: listening on %s, awaiting consumer...\n", SOCK_PATH);
|
||||||
|
int cli = accept(srv, NULL, NULL);
|
||||||
|
if (cli < 0) { perror("accept"); return 1; }
|
||||||
|
|
||||||
|
if (send_fd(cli, fd, (uint64_t)size, MAGIC_BYTE) < 0) return 1;
|
||||||
|
fprintf(stderr, "producer: sent fd + size=%zu + magic=0x%02x\n",
|
||||||
|
size, MAGIC_BYTE);
|
||||||
|
|
||||||
|
/* Wait for consumer ACK */
|
||||||
|
char ack;
|
||||||
|
if (read(cli, &ack, 1) != 1) { perror("read ack"); return 1; }
|
||||||
|
fprintf(stderr, "producer: got ACK=0x%02x\n", (unsigned char)ack);
|
||||||
|
|
||||||
|
close(cli);
|
||||||
|
close(srv);
|
||||||
|
unlink(SOCK_PATH);
|
||||||
|
close(fd);
|
||||||
|
|
||||||
|
CHECK(cuMemUnmap(ptr, size));
|
||||||
|
CHECK(cuMemAddressFree(ptr, size));
|
||||||
|
CHECK(cuMemRelease(mem));
|
||||||
|
CHECK(cuCtxDestroy(ctx));
|
||||||
|
|
||||||
|
fprintf(stderr, "producer: done\n");
|
||||||
|
return ack == 'O' ? 0 : 1;
|
||||||
|
}
|
||||||
@@ -231,21 +231,16 @@ int main(int argc, char **argv) {
|
|||||||
return 2;
|
return 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Pre-allocate cuframes pool (NV12 — что nvdec выдаёт) */
|
/* Pre-allocate cuframes pool (NV12 — что nvdec выдаёт).
|
||||||
|
* v0.4: publisher сам аллоцирует через cuMemCreate (VMM). Раньше tool
|
||||||
|
* передавал external pool, но v0.4 не может export'нуть cudaMalloc-pointers
|
||||||
|
* как POSIX FD — VMM API требует cuMemCreate-allocated memory. */
|
||||||
int32_t pitch_y = 0, pitch_uv = 0;
|
int32_t pitch_y = 0, pitch_uv = 0;
|
||||||
size_t frame_size = cuframes::calc_frame_size(CUFRAMES_FORMAT_NV12,
|
size_t frame_size = cuframes::calc_frame_size(CUFRAMES_FORMAT_NV12,
|
||||||
width, height,
|
width, height,
|
||||||
&pitch_y, &pitch_uv);
|
&pitch_y, &pitch_uv);
|
||||||
|
|
||||||
cudaSetDevice(a.cuda_device);
|
cudaSetDevice(a.cuda_device);
|
||||||
std::vector<void *> pool(a.ring_size, nullptr);
|
|
||||||
for (int i = 0; i < a.ring_size; ++i) {
|
|
||||||
cudaError_t cerr = cudaMalloc(&pool[i], frame_size);
|
|
||||||
if (cerr != cudaSuccess) {
|
|
||||||
std::cerr << "cudaMalloc pool[" << i << "]: " << cudaGetErrorString(cerr) << "\n";
|
|
||||||
return 2;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cuframes::PublisherOptions po;
|
cuframes::PublisherOptions po;
|
||||||
po.key = a.key;
|
po.key = a.key;
|
||||||
@@ -257,12 +252,12 @@ int main(int argc, char **argv) {
|
|||||||
: CUFRAMES_POLICY_DROP_OLDEST;
|
: CUFRAMES_POLICY_DROP_OLDEST;
|
||||||
po.consumer_ack_timeout_ms = a.ack_timeout_ms;
|
po.consumer_ack_timeout_ms = a.ack_timeout_ms;
|
||||||
po.cuda_device = a.cuda_device;
|
po.cuda_device = a.cuda_device;
|
||||||
po.ring_size = a.ring_size; /* для logging */
|
po.ring_size = a.ring_size;
|
||||||
|
|
||||||
cuframes::Publisher pub(po, pool.data(), a.ring_size, frame_size);
|
cuframes::Publisher pub(po); /* LIBRARY ownership — publisher owns VMM pool */
|
||||||
std::cerr << "[cuframes-src] publisher 'cuframes-" << a.key
|
std::cerr << "[cuframes-src] publisher 'cuframes-" << a.key
|
||||||
<< "' ready, ring=" << a.ring_size
|
<< "' ready (v0.4 VMM), ring=" << a.ring_size
|
||||||
<< " pool_size=" << frame_size << " bytes/frame\n";
|
<< " frame_size=" << frame_size << " bytes\n";
|
||||||
|
|
||||||
/* v0.2 — encoded packet ring (опционально). */
|
/* v0.2 — encoded packet ring (опционально). */
|
||||||
if (a.enable_packet_ring) {
|
if (a.enable_packet_ring) {
|
||||||
@@ -293,7 +288,6 @@ int main(int argc, char **argv) {
|
|||||||
AVFrame *frame = av_frame_alloc();
|
AVFrame *frame = av_frame_alloc();
|
||||||
if (!pkt || !frame) return 2;
|
if (!pkt || !frame) return 2;
|
||||||
|
|
||||||
int pool_idx = 0;
|
|
||||||
uint64_t frame_count = 0;
|
uint64_t frame_count = 0;
|
||||||
auto t_last_log = std::chrono::steady_clock::now();
|
auto t_last_log = std::chrono::steady_clock::now();
|
||||||
uint64_t last_log_count = 0;
|
uint64_t last_log_count = 0;
|
||||||
@@ -375,7 +369,15 @@ int main(int argc, char **argv) {
|
|||||||
int src_pitch_y = frame->linesize[0];
|
int src_pitch_y = frame->linesize[0];
|
||||||
int src_pitch_uv = frame->linesize[1];
|
int src_pitch_uv = frame->linesize[1];
|
||||||
|
|
||||||
void *dst = pool[pool_idx];
|
/* v0.4: acquire slot из publisher's VMM pool */
|
||||||
|
void *dst = nullptr;
|
||||||
|
try {
|
||||||
|
dst = pub.acquire();
|
||||||
|
} catch (const cuframes::Error &e) {
|
||||||
|
std::cerr << "acquire: " << e.what() << "\n";
|
||||||
|
av_frame_unref(frame);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
/* D2D 2D-copy Y plane */
|
/* D2D 2D-copy Y plane */
|
||||||
cudaError_t cerr = cudaMemcpy2DAsync(
|
cudaError_t cerr = cudaMemcpy2DAsync(
|
||||||
@@ -413,14 +415,13 @@ int main(int argc, char **argv) {
|
|||||||
|
|
||||||
int64_t pts_ns = cuframes::now_ns();
|
int64_t pts_ns = cuframes::now_ns();
|
||||||
try {
|
try {
|
||||||
pub.publish_external(dst, stream, pts_ns);
|
pub.publish(stream, pts_ns);
|
||||||
} catch (const cuframes::Error &e) {
|
} catch (const cuframes::Error &e) {
|
||||||
std::cerr << "publish_external: " << e.what() << "\n";
|
std::cerr << "publish: " << e.what() << "\n";
|
||||||
av_frame_unref(frame);
|
av_frame_unref(frame);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
pool_idx = (pool_idx + 1) % a.ring_size;
|
|
||||||
frame_count++;
|
frame_count++;
|
||||||
av_frame_unref(frame);
|
av_frame_unref(frame);
|
||||||
|
|
||||||
@@ -447,9 +448,7 @@ int main(int argc, char **argv) {
|
|||||||
av_buffer_unref(&hw_device);
|
av_buffer_unref(&hw_device);
|
||||||
|
|
||||||
cudaStreamDestroy(stream);
|
cudaStreamDestroy(stream);
|
||||||
/* Publisher destructor freed first; теперь освободим pool */
|
/* v0.4: publisher owns VMM pool — destructor освободит cuMemRelease etc. */
|
||||||
/* Note: publisher уже destroyed by RAII, IPC handles closed by subscribers */
|
|
||||||
for (auto p : pool) if (p) cudaFree(p);
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user