diff --git a/CMakeLists.txt b/CMakeLists.txt index 042f7b4..6c7ed1c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,7 +1,7 @@ cmake_minimum_required(VERSION 3.20) project(cuframes - VERSION 0.3.0 - DESCRIPTION "Zero-copy frame sharing via CUDA IPC" + VERSION 0.4.0 + DESCRIPTION "Zero-copy frame sharing via CUDA VMM + POSIX FD" LANGUAGES C CXX CUDA ) diff --git a/docker/Dockerfile.runtime b/docker/Dockerfile.runtime index f68e4ff..51c417a 100644 --- a/docker/Dockerfile.runtime +++ b/docker/Dockerfile.runtime @@ -16,7 +16,8 @@ # /usr/local/bin/cuframes-rtsp-source --rtsp ... --key ... # ─── Build stage ───────────────────────────────────────────────────────── -FROM nvidia/cuda:13.0.3-cudnn-devel-ubuntu24.04 AS build +# CUDA 12.4 — matching ffmpeg-vf-cuda-grid base + Frigate stable-tensorrt +FROM nvidia/cuda:12.4.1-devel-ubuntu22.04 AS build ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update && apt-get install -y --no-install-recommends \ @@ -36,11 +37,11 @@ RUN cmake -B build -S . -G Ninja \ && cmake --build build --parallel # ─── Runtime stage ──────────────────────────────────────────────────────── -FROM nvidia/cuda:13.0.3-cudnn-runtime-ubuntu24.04 AS runtime +FROM nvidia/cuda:12.4.1-runtime-ubuntu22.04 AS runtime ENV DEBIAN_FRONTEND=noninteractive RUN apt-get update && apt-get install -y --no-install-recommends \ - libavcodec60 libavformat60 libavutil58 \ + libavcodec58 libavformat58 libavutil56 \ ca-certificates \ && rm -rf /var/lib/apt/lists/* diff --git a/include/cuframes/cuframes.h b/include/cuframes/cuframes.h index 984b9a2..a98ede0 100644 --- a/include/cuframes/cuframes.h +++ b/include/cuframes/cuframes.h @@ -36,7 +36,7 @@ extern "C" { /* ─────────────────────────────────────────────────────────────────────── */ #define CUFRAMES_VERSION_MAJOR 0 -#define CUFRAMES_VERSION_MINOR 3 +#define CUFRAMES_VERSION_MINOR 4 #define CUFRAMES_VERSION_PATCH 0 /** @brief Runtime-версия библиотеки в формате "MAJOR.MINOR.PATCH". */ diff --git a/libcuframes/CMakeLists.txt b/libcuframes/CMakeLists.txt index 9115144..cda8fd6 100644 --- a/libcuframes/CMakeLists.txt +++ b/libcuframes/CMakeLists.txt @@ -34,6 +34,7 @@ foreach(target cuframes cuframes_static) target_link_libraries(${target} PUBLIC CUDA::cudart + CUDA::cuda_driver # v0.4 — cuMemCreate/cuMemMap/cuMemExportToShareableHandle Threads::Threads rt # для shm_open ) @@ -41,7 +42,7 @@ endforeach() # Set SOVERSION на shared lib для ABI tracking set_target_properties(cuframes PROPERTIES - VERSION 0.3.0 + VERSION 0.4.0 SOVERSION 0 ) diff --git a/libcuframes/src/consumer.c b/libcuframes/src/consumer.c index fce8170..4d3c6bb 100644 --- a/libcuframes/src/consumer.c +++ b/libcuframes/src/consumer.c @@ -1,4 +1,13 @@ -/* Subscriber implementation (sync). */ +/* Subscriber implementation (sync). + * + * v0.4 — VMM + POSIX FD. Принимает FDs через SCM_RIGHTS в handshake, + * импортирует через cuMemImportFromShareableHandle + cuMemMap. Не требует + * shared pid/ipc namespace с producer'ом. + * + * Sync: producer cuStreamSynchronize'ит свой stream перед atomic_store(seq). + * Consumer просто читает seq (acquire) и копирует данные через DtoD memcpy — + * никаких cudaEventWait не нужно (HW coherence на одном GPU). + */ #include "internal.h" #include @@ -21,14 +30,13 @@ struct cuframes_frame { int64_t pts_ns; uint32_t slot_idx; - void *subscriber; /* back-ref для release() */ + void *subscriber; }; -/* Opaque packet handle — single-packet pattern (как frame_obj). */ struct cuframes_packet { - uint8_t *data; /* heap buffer, allocated by subscriber на enable_packets */ - size_t capacity; /* size of allocation */ - size_t size; /* actual payload size */ + uint8_t *data; + size_t capacity; + size_t size; int64_t pts_ns; int64_t dts_ns; uint32_t flags; @@ -44,26 +52,31 @@ struct cuframes_subscriber { cuframes_shm_header_t *hdr; char shm_name[80]; - cudaEvent_t producer_event; /* legacy fallback (v0.2 proto) */ - cudaEvent_t slot_events[CUFRAMES_MAX_RING]; /* v0.3 — per-slot events */ - int has_slot_events; /* 1 if v0.3 events opened OK */ - void *mapped_ptrs[CUFRAMES_MAX_RING]; + /* v0.4 — VMM imported slots */ + CUmemGenericAllocationHandle vmm_handles[CUFRAMES_MAX_RING]; + CUdeviceptr vmm_ptrs[CUFRAMES_MAX_RING]; + size_t vmm_slot_size; + int imported_count; uint32_t assigned_bit; uint64_t last_seen_seq; - /* Frame pool — переиспользуем одну frame_t structure (single-thread API). */ struct cuframes_frame frame_obj; int frame_busy; - /* v0.2 — packet ring (optional, opened via enable_packets). */ int has_pkt_ring; cuframes_pkt_ring_t pkt_ring; - uint64_t last_packet_seq; /* UINT64_MAX = no packet read yet */ + uint64_t last_packet_seq; struct cuframes_packet packet_obj; int packet_busy; }; +static const char *cu_err_str(CUresult r) { + const char *s = NULL; + cuGetErrorString(r, &s); + return s ? s : "?"; +} + /* ─── Frame accessors ────────────────────────────────────────────────── */ void *cuframes_frame_cuda_ptr(const cuframes_frame_t *f) { return f ? f->cuda_ptr : NULL; } cuframes_format_t cuframes_frame_format(const cuframes_frame_t *f) { return f ? f->format : 0; } @@ -79,11 +92,13 @@ int64_t cuframes_frame_pts_ns(const cuframes_frame_t *f) { return f ? f->pts_ns /* ─── Subscriber create ──────────────────────────────────────────────── */ -static int do_handshake(struct cuframes_subscriber *sub, const char *name) { - /* Send HELLO_REQ */ +static int do_handshake(struct cuframes_subscriber *sub, const char *name, + int *fds_out, uint32_t *fd_count_inout, + uint64_t *slot_size_out) { + /* Send HELLO_REQ — proto v4 */ uint8_t buf[CUFRAMES_MAX_MSG_PAYLOAD]; cuframes_msg_hello_req_t *hreq = (cuframes_msg_hello_req_t *)buf; - hreq->proto_version = CUFRAMES_PROTOCOL_V1; + hreq->proto_version = CUFRAMES_PROTOCOL_V4; uint32_t nl = name ? (uint32_t)strlen(name) : 0; if (nl > 31) nl = 31; hreq->consumer_name_len = nl; @@ -100,7 +115,6 @@ static int do_handshake(struct cuframes_subscriber *sub, const char *name) { buf, plen); if (r != CUFRAMES_OK) return r; - /* Recv HELLO_RESP */ uint32_t rmt = 0, rpl = sizeof(buf); r = cuframes_internal_recv_msg(sub->sock_fd, &rmt, buf, &rpl, 5000); if (r != CUFRAMES_OK) return r; @@ -108,10 +122,15 @@ static int do_handshake(struct cuframes_subscriber *sub, const char *name) { cuframes_msg_hello_resp_t *hresp = (cuframes_msg_hello_resp_t *)buf; if (hresp->result != CUFRAMES_OK) return hresp->result; + if (hresp->proto_version_actual != CUFRAMES_PROTOCOL_V4) { + CUFRAMES_LOG_ERROR("publisher proto v%u — нужен v%u (v0.4)", + hresp->proto_version_actual, CUFRAMES_PROTOCOL_V4); + return CUFRAMES_ERR_PROTOCOL; + } /* Send SUBSCRIBE_REQ */ uint32_t srbuf[8]; - srbuf[0] = CUFRAMES_PROTOCOL_V1; + srbuf[0] = CUFRAMES_PROTOCOL_V4; memset(srbuf + 1, 0, 28); r = cuframes_internal_send_msg(sub->sock_fd, CUFRAMES_MSG_SUBSCRIBE_REQ, srbuf, sizeof(srbuf)); @@ -126,7 +145,29 @@ static int do_handshake(struct cuframes_subscriber *sub, const char *name) { if (sresp.result != CUFRAMES_OK) return sresp.result; sub->assigned_bit = sresp.assigned_bit; - sub->last_seen_seq = sresp.initial_seq; /* start от текущей точки */ + sub->last_seen_seq = sresp.initial_seq; + + /* Recv VMM_FDS */ + cuframes_msg_vmm_fds_t vmm_payload = {0}; + uint32_t vmm_plen = sizeof(vmm_payload); + rmt = 0; + r = cuframes_internal_recv_msg_with_fds(sub->sock_fd, &rmt, + &vmm_payload, &vmm_plen, + fds_out, fd_count_inout, 5000); + if (r != CUFRAMES_OK) { + CUFRAMES_LOG_ERROR("recv VMM_FDS: %s", cuframes_strerror(r)); + return r; + } + if (rmt != CUFRAMES_MSG_VMM_FDS) { + CUFRAMES_LOG_ERROR("expected VMM_FDS got 0x%x", rmt); + return CUFRAMES_ERR_PROTOCOL; + } + if (vmm_payload.fd_count != *fd_count_inout) { + CUFRAMES_LOG_ERROR("VMM_FDS: payload fd_count=%u, received %u", + vmm_payload.fd_count, *fd_count_inout); + return CUFRAMES_ERR_PROTOCOL; + } + *slot_size_out = vmm_payload.slot_size_bytes; return CUFRAMES_OK; } @@ -143,7 +184,6 @@ int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg, sub->sock_fd = -1; sub->shm_fd = -1; - /* Generate fallback name if NULL */ char name_buf[32]; const char *name = cfg->consumer_name; if (!name) { @@ -152,12 +192,10 @@ int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg, name = name_buf; } - /* Build paths */ char sock_path[128]; int r = cuframes_internal_socket_path(cfg->key, sock_path, sizeof(sock_path)); if (r != CUFRAMES_OK) { free(sub); return r; } - /* Connect with timeout retry */ int64_t deadline = cfg->connect_timeout_ms > 0 ? cuframes_now_ns() + (int64_t)cfg->connect_timeout_ms * 1000000LL : 0; @@ -172,87 +210,117 @@ int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg, sub->sock_fd = -1; if (cfg->connect_timeout_ms == 0) { r = CUFRAMES_ERR_NOT_FOUND; goto fail; } if (deadline && cuframes_now_ns() > deadline) { r = CUFRAMES_ERR_TIMEOUT; goto fail; } - struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000000}; /* 100ms */ + struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000000}; nanosleep(&ts, NULL); } - /* Handshake */ - r = do_handshake(sub, name); + /* Handshake (включая VMM_FDS) */ + int fds[CUFRAMES_MAX_RING]; + for (int i = 0; i < CUFRAMES_MAX_RING; i++) fds[i] = -1; + uint32_t fd_count = CUFRAMES_MAX_RING; + uint64_t slot_size = 0; + r = do_handshake(sub, name, fds, &fd_count, &slot_size); if (r != CUFRAMES_OK) goto fail; - /* Open SHM */ + /* Open SHM (для seq atomics + meta) */ r = cuframes_internal_shm_name(cfg->key, sub->shm_name, sizeof(sub->shm_name)); - if (r != CUFRAMES_OK) goto fail; + if (r != CUFRAMES_OK) goto fail_close_fds; sub->shm_fd = shm_open(sub->shm_name, O_RDWR, 0); if (sub->shm_fd < 0) { CUFRAMES_LOG_ERROR("shm_open %s: %s", sub->shm_name, strerror(errno)); - r = CUFRAMES_ERR_IO; goto fail; + r = CUFRAMES_ERR_IO; goto fail_close_fds; } sub->hdr = mmap(NULL, sizeof(cuframes_shm_header_t), PROT_READ | PROT_WRITE, MAP_SHARED, sub->shm_fd, 0); if (sub->hdr == MAP_FAILED) { sub->hdr = NULL; - r = CUFRAMES_ERR_IO; goto fail; + r = CUFRAMES_ERR_IO; goto fail_close_fds; + } + if (sub->hdr->magic != CUFRAMES_MAGIC) { + if (sub->hdr->magic == CUFRAMES_MAGIC_LEGACY) { + CUFRAMES_LOG_ERROR("publisher uses legacy v0.1-v0.3 SHM — нужен v0.4 publisher"); + } else { + CUFRAMES_LOG_ERROR("SHM magic mismatch: 0x%x", sub->hdr->magic); + } + r = CUFRAMES_ERR_PROTOCOL; goto fail_close_fds; + } + if (sub->hdr->proto_version != CUFRAMES_PROTOCOL_V4) { + CUFRAMES_LOG_ERROR("SHM proto v%u — нужен v%u", + sub->hdr->proto_version, CUFRAMES_PROTOCOL_V4); + r = CUFRAMES_ERR_PROTOCOL; goto fail_close_fds; } - if (sub->hdr->magic != CUFRAMES_MAGIC) { r = CUFRAMES_ERR_PROTOCOL; goto fail; } - /* CUDA setup */ + /* CUDA driver init + import VMM handles */ + CUresult cr = cuInit(0); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuInit: %s", cu_err_str(cr)); + r = CUFRAMES_ERR_CUDA; goto fail_close_fds; + } + /* Ensure a runtime context exists (cudaMemcpyAsync from this pool needs it) */ cudaError_t cerr = cudaSetDevice(sub->cfg.cuda_device); if (cerr != cudaSuccess) { CUFRAMES_LOG_ERROR("cudaSetDevice: %s", cudaGetErrorString(cerr)); - r = CUFRAMES_ERR_CUDA; goto fail; + r = CUFRAMES_ERR_CUDA; goto fail_close_fds; } - /* Open producer's event (legacy single — v0.2 compat fallback) */ - cerr = cudaIpcOpenEventHandle(&sub->producer_event, sub->hdr->ipc_event_handle); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaIpcOpenEventHandle: %s", cudaGetErrorString(cerr)); - r = CUFRAMES_ERR_CUDA; goto fail; - } + CUmemAccessDesc access = {0}; + access.location.type = CU_MEM_LOCATION_TYPE_DEVICE; + access.location.id = sub->cfg.cuda_device; + access.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE; - /* v0.3 — open per-slot events если protocol supports. */ - sub->has_slot_events = 0; - if (sub->hdr->proto_version >= CUFRAMES_PROTOCOL_V3) { - int ring_evt = (int)sub->hdr->ring_size; - if (ring_evt > CUFRAMES_MAX_RING) ring_evt = CUFRAMES_MAX_RING; - int evt_ok = 1; - for (int i = 0; i < ring_evt; i++) { - cerr = cudaIpcOpenEventHandle(&sub->slot_events[i], - sub->hdr->slot_event_handles[i]); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_WARN("cudaIpcOpenEventHandle slot %d: %s — " - "fallback к legacy single event", - i, cudaGetErrorString(cerr)); - for (int j = 0; j < i; j++) cudaEventDestroy(sub->slot_events[j]); - evt_ok = 0; - break; - } + sub->vmm_slot_size = (size_t)slot_size; + sub->imported_count = 0; + for (uint32_t i = 0; i < fd_count; ++i) { + cr = cuMemImportFromShareableHandle(&sub->vmm_handles[i], + (void *)(uintptr_t)fds[i], + CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuMemImportFromShareableHandle slot %u: %s", + i, cu_err_str(cr)); + r = CUFRAMES_ERR_CUDA; goto fail_unmap; } - if (evt_ok) { - sub->has_slot_events = 1; - CUFRAMES_LOG_INFO("subscribed с per-slot events (v0.3 proto)"); + /* После import можно закрыть FD — kernel держит reference через handle */ + close(fds[i]); + fds[i] = -1; + + cr = cuMemAddressReserve(&sub->vmm_ptrs[i], sub->vmm_slot_size, 0, 0, 0); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuMemAddressReserve slot %u: %s", + i, cu_err_str(cr)); + r = CUFRAMES_ERR_CUDA; goto fail_unmap; } + cr = cuMemMap(sub->vmm_ptrs[i], sub->vmm_slot_size, 0, + sub->vmm_handles[i], 0); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuMemMap slot %u: %s", i, cu_err_str(cr)); + r = CUFRAMES_ERR_CUDA; goto fail_unmap; + } + cr = cuMemSetAccess(sub->vmm_ptrs[i], sub->vmm_slot_size, &access, 1); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuMemSetAccess slot %u: %s", i, cu_err_str(cr)); + r = CUFRAMES_ERR_CUDA; goto fail_unmap; + } + sub->imported_count++; } - /* Open mem handles */ - int ring = (int)sub->hdr->ring_size; - if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING; - for (int i = 0; i < ring; ++i) { - cerr = cudaIpcOpenMemHandle(&sub->mapped_ptrs[i], - sub->hdr->slots[i].mem_handle, - cudaIpcMemLazyEnablePeerAccess); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaIpcOpenMemHandle slot %d: %s", - i, cudaGetErrorString(cerr)); - r = CUFRAMES_ERR_CUDA; goto fail; - } - } - - CUFRAMES_LOG_INFO("subscriber '%s' connected to '%s' (bit=%u, ring=%d)", - name, sub->key, sub->assigned_bit, ring); + CUFRAMES_LOG_INFO("subscriber '%s' connected to '%s' (bit=%u, ring=%u, v0.4 VMM)", + name, sub->key, sub->assigned_bit, fd_count); *out = sub; return CUFRAMES_OK; +fail_unmap: + /* Cleanup partial VMM */ + for (int i = 0; i < sub->imported_count; i++) { + if (sub->vmm_ptrs[i]) { + cuMemUnmap(sub->vmm_ptrs[i], sub->vmm_slot_size); + cuMemAddressFree(sub->vmm_ptrs[i], sub->vmm_slot_size); + } + if (sub->vmm_handles[i]) cuMemRelease(sub->vmm_handles[i]); + } +fail_close_fds: + for (int i = 0; i < CUFRAMES_MAX_RING; i++) { + if (fds[i] >= 0) close(fds[i]); + } fail: cuframes_subscriber_destroy(sub); return r; @@ -268,6 +336,7 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub, memory_order_acquire) != 0) { return CUFRAMES_ERR_DISCONNECTED; } + (void)consumer_stream; /* v0.4: producer уже StreamSync'нул, sync не нужен */ int64_t deadline = (timeout_ms > 0) ? cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL @@ -281,11 +350,9 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub, if (sub->cfg.mode == CUFRAMES_MODE_NEWEST_ONLY) { target_seq = gs; } else { - /* STRICT_ORDER */ if (sub->last_seen_seq == UINT64_MAX) { target_seq = gs; } else if (gs > sub->last_seen_seq + (uint64_t)sub->hdr->ring_size) { - /* Producer overran us. */ return CUFRAMES_ERR_DISCONNECTED; } else { target_seq = sub->last_seen_seq + 1; @@ -295,54 +362,22 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub, uint64_t slot_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq, memory_order_acquire); if (slot_seq != target_seq) { - /* Slot уже перезаписан producer'ом — пересчитать */ continue; } int64_t pts = atomic_load_explicit(&sub->hdr->slots[slot_idx].pts_ns, memory_order_acquire); - /* Cross-process sync: wait event on consumer's stream. - * v0.3: per-slot event точно соответствует slot[slot_idx] — - * no TOCTOU race possible. v0.2 fallback: single global event + - * post-sync verify (less precise, but still correct). */ - cudaEvent_t sync_event = sub->has_slot_events - ? sub->slot_events[slot_idx] - : sub->producer_event; - if (consumer_stream) { - cudaError_t cerr = cudaStreamWaitEvent((cudaStream_t)consumer_stream, - sync_event, 0); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_WARN("cudaStreamWaitEvent: %s", - cudaGetErrorString(cerr)); - return CUFRAMES_ERR_CUDA; - } - } else { - /* Synchronize globally — для cudaMemcpyDeviceToHost users */ - cudaError_t cerr = cudaEventSynchronize(sync_event); - if (cerr != cudaSuccess) return CUFRAMES_ERR_CUDA; - } - - /* TOCTOU защита — unconditional (v0.2 и v0.3 обa). v0.3 per-slot - * events НЕ guaranty ordering: cudaEventRecord overwrites previous - * state каждый publish. Если producer wrapped ring пока consumer - * ждал event sync, slot[slot_idx] уже содержит seq > target. - * Event signal от nового publish satisfies stale wait — consumer - * читает new content thinking it's old (lazy consumption). - * - * Симптом в long-running pipeline: 50k+ ring wraps накапливают drift, - * output stream duplicates 60-70% frames despite stable encoder fps. - * - * Proper v0.4 fix: per-slot+per-publish event handle (event pool). - * Сейчас — post-sync verify catches main race window. */ + /* v0.4: producer уже cuStreamSynchronize'нул перед atomic_store seq. + * Данные физически в GPU memory к моменту acquire fence. Post-sync + * verify оставляем — defending against ring wrap pока мы читали pts. */ uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq, memory_order_acquire); if (verify_seq != target_seq) { continue; } - /* Fill frame_out */ struct cuframes_frame *f = &sub->frame_obj; - f->cuda_ptr = sub->mapped_ptrs[slot_idx]; + f->cuda_ptr = (void *)(uintptr_t)sub->vmm_ptrs[slot_idx]; f->format = (cuframes_format_t)sub->hdr->meta.format; f->width = sub->hdr->meta.width; f->height = sub->hdr->meta.height; @@ -358,12 +393,9 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub, return CUFRAMES_OK; } - /* Не было frame'ов */ if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK; if (timeout_ms > 0 && cuframes_now_ns() > deadline) return CUFRAMES_ERR_TIMEOUT; - /* Poll-based wait (eventfd — v0.2). 50µs interval — компромисс - * latency vs CPU. */ struct timespec ts = {.tv_sec = 0, .tv_nsec = 50000}; nanosleep(&ts, NULL); @@ -379,7 +411,6 @@ int cuframes_subscriber_release(cuframes_subscriber_t *sub, if (!frame) return CUFRAMES_OK; if (!sub || frame->subscriber != sub) return CUFRAMES_ERR_INVALID_ARG; - /* ACK через bitmap */ if (sub->assigned_bit > 0 && sub->assigned_bit < 64) { atomic_fetch_or_explicit(&sub->hdr->slots[frame->slot_idx].ack_bitmap, 1ULL << sub->assigned_bit, @@ -398,7 +429,6 @@ int cuframes_subscriber_release(cuframes_subscriber_t *sub, int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) { if (!sub) return CUFRAMES_OK; - /* Clear subscriber bit */ if (sub->hdr && sub->assigned_bit > 0) { atomic_fetch_and_explicit(&sub->hdr->subscriber_bitmap, ~(1ULL << sub->assigned_bit), @@ -407,22 +437,15 @@ int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) { 0, memory_order_release); } - if (sub->producer_event) cudaEventDestroy(sub->producer_event); - if (sub->has_slot_events) { - int ring_evt = (int)sub->hdr->ring_size; - if (ring_evt > CUFRAMES_MAX_RING) ring_evt = CUFRAMES_MAX_RING; - for (int i = 0; i < ring_evt; i++) { - if (sub->slot_events[i]) cudaEventDestroy(sub->slot_events[i]); + /* VMM cleanup */ + for (int i = 0; i < sub->imported_count; i++) { + if (sub->vmm_ptrs[i]) { + cuMemUnmap(sub->vmm_ptrs[i], sub->vmm_slot_size); + cuMemAddressFree(sub->vmm_ptrs[i], sub->vmm_slot_size); } + if (sub->vmm_handles[i]) cuMemRelease(sub->vmm_handles[i]); } - int ring = sub->hdr ? (int)sub->hdr->ring_size : 0; - if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING; - for (int i = 0; i < ring; ++i) { - if (sub->mapped_ptrs[i]) cudaIpcCloseMemHandle(sub->mapped_ptrs[i]); - } - - /* Packet ring cleanup */ if (sub->has_pkt_ring) { cuframes_internal_pkt_ring_destroy(&sub->pkt_ring); } @@ -442,7 +465,6 @@ int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) { /* v0.2 — encoded packet ring API (см. docs/protocol.md §10) */ /* ─────────────────────────────────────────────────────────────────────── */ -/* Packet accessors */ const void *cuframes_packet_data(const cuframes_packet_t *p) { return p ? p->data : NULL; } size_t cuframes_packet_size(const cuframes_packet_t *p) { return p ? p->size : 0; } int64_t cuframes_packet_pts(const cuframes_packet_t *p) { return p ? p->pts_ns : 0; } @@ -452,7 +474,7 @@ uint64_t cuframes_packet_seq(const cuframes_packet_t *p) { return p ? p->se int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub) { if (!sub) return CUFRAMES_ERR_INVALID_ARG; - if (sub->has_pkt_ring) return CUFRAMES_OK; /* idempotent */ + if (sub->has_pkt_ring) return CUFRAMES_OK; char pkt_name[128]; int r = cuframes_internal_pkt_shm_name(sub->key, pkt_name, sizeof(pkt_name)); @@ -461,8 +483,6 @@ int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub) { r = cuframes_internal_pkt_ring_open(pkt_name, &sub->pkt_ring); if (r != CUFRAMES_OK) return r; - /* Allocate copy-buffer (max packet size). Используем data_size как - * conservative upper bound (publisher гарантирует data_size >= max_packet_size). */ size_t capacity = sub->pkt_ring.hdr->data_size; sub->packet_obj.data = (uint8_t *)malloc(capacity); if (!sub->packet_obj.data) { @@ -471,7 +491,6 @@ int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub) { } sub->packet_obj.capacity = capacity; - /* Start с last_keyframe_seq - 1 → первый read даст IDR (§10.14). */ uint64_t kf = atomic_load_explicit(&sub->pkt_ring.hdr->last_keyframe_seq, memory_order_acquire); sub->last_packet_seq = (kf == UINT64_MAX) ? UINT64_MAX : kf - 1; @@ -484,7 +503,7 @@ int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub, int32_t timeout_ms) { if (!sub || !pkt_out) return CUFRAMES_ERR_INVALID_ARG; if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING; - if (sub->packet_busy) return CUFRAMES_ERR_INVALID_ARG; /* previous packet not released */ + if (sub->packet_busy) return CUFRAMES_ERR_INVALID_ARG; int64_t deadline_ns = (timeout_ms > 0) ? cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL : 0; @@ -513,28 +532,25 @@ int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub, } if (r == CUFRAMES_ERR_PACKET_OVERRUN) { - /* Resync — установить last_seq = last_keyframe_seq - 1, повторить. */ uint64_t kf = atomic_load_explicit( &sub->pkt_ring.hdr->last_keyframe_seq, memory_order_acquire); if (kf != UINT64_MAX) { sub->last_packet_seq = kf - 1; } - /* Возвращаем OVERRUN наружу — caller знает что был discontinuity. */ *pkt_out = NULL; return CUFRAMES_ERR_PACKET_OVERRUN; } if (r != CUFRAMES_ERR_TIMEOUT) { *pkt_out = NULL; - return r; /* DISCONNECTED, INVALID_ARG, etc. */ + return r; } - /* TIMEOUT branch — poll/sleep */ if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK; if (timeout_ms > 0 && cuframes_now_ns() >= deadline_ns) { return CUFRAMES_ERR_TIMEOUT; } - struct timespec ts = {0, 1 * 1000 * 1000}; /* 1 ms poll interval */ + struct timespec ts = {0, 1 * 1000 * 1000}; nanosleep(&ts, NULL); } } @@ -542,7 +558,7 @@ int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub, int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub, cuframes_packet_t *pkt) { if (!sub) return CUFRAMES_ERR_INVALID_ARG; - if (!pkt) return CUFRAMES_OK; /* NULL-safe */ + if (!pkt) return CUFRAMES_OK; if (pkt != &sub->packet_obj) return CUFRAMES_ERR_INVALID_ARG; sub->packet_busy = 0; return CUFRAMES_OK; @@ -556,7 +572,6 @@ int cuframes_subscriber_get_codec_params(cuframes_subscriber_t *sub, if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING; cuframes_pkt_header_t *hdr = sub->pkt_ring.hdr; if (codec_id_out) *codec_id_out = hdr->codec_id; - /* Если extradata ещё не выставлен publisher'ом — size=0, pointer ok но empty. */ if (extradata_out) *extradata_out = hdr->codec_extradata; if (extradata_size_out) *extradata_size_out = hdr->codec_extradata_size; if (hdr->codec_extradata_size == 0) return CUFRAMES_ERR_NO_CODEC_PARAMS; diff --git a/libcuframes/src/internal.h b/libcuframes/src/internal.h index 3a97fbc..cce403a 100644 --- a/libcuframes/src/internal.h +++ b/libcuframes/src/internal.h @@ -8,6 +8,7 @@ #define CUFRAMES_INTERNAL_H #define _GNU_SOURCE +#include /* v0.4 — driver API: cuMemCreate/cuMemMap/cuMemExportToShareableHandle */ #include #include #include @@ -21,10 +22,12 @@ /* ─── Protocol constants ──────────────────────────────────────────────── */ -#define CUFRAMES_MAGIC 0xCC7C1DCCu +#define CUFRAMES_MAGIC 0xCC7C1DCEu /* v0.4 — bumped с 0xCC7C1DCC (full ABI break) */ +#define CUFRAMES_MAGIC_LEGACY 0xCC7C1DCCu /* v0.1—v0.3 magic; ловится consumer'ом как clean PROTOCOL error */ #define CUFRAMES_PROTOCOL_V1 1u #define CUFRAMES_PROTOCOL_V2 2u /* v0.2 — packet ring support */ -#define CUFRAMES_PROTOCOL_V3 3u /* v0.3 — per-slot CUDA events (no TOCTOU race) */ +#define CUFRAMES_PROTOCOL_V3 3u /* v0.3 — per-slot CUDA events (deprecated; не работает без pid share) */ +#define CUFRAMES_PROTOCOL_V4 4u /* v0.4 — VMM + POSIX FD: pid/ipc namespace share не требуется */ #define CUFRAMES_MAX_SUBSCRIBERS 32 #define CUFRAMES_MAX_RING 16 #define CUFRAMES_MAX_KEY_LEN 63 @@ -204,6 +207,10 @@ typedef struct cuframes_pkt_ring { #define CUFRAMES_MSG_PING 0xF0 #define CUFRAMES_MSG_PONG 0xF1 #define CUFRAMES_MSG_ERROR 0xFE +/* v0.4: после SUBSCRIBE_RESP publisher шлёт VMM_FDS с N posix FD handles в + * SCM_RIGHTS control. Payload: uint64_t slot_size + uint32_t fd_count + + * uint32_t reserved (для alignment). FDs приходят отдельным контрол-блоком. */ +#define CUFRAMES_MSG_VMM_FDS 0x05 #define CUFRAMES_MAX_MSG_PAYLOAD 4096 @@ -237,6 +244,14 @@ typedef struct __attribute__((packed)) cuframes_msg_subscribe_resp { uint8_t reserved[12]; } cuframes_msg_subscribe_resp_t; +/* v0.4: payload VMM_FDS message. Сами FDs идут в SCM_RIGHTS control-msg + * (см. cuframes_internal_send_msg_with_fds). */ +typedef struct __attribute__((packed)) cuframes_msg_vmm_fds { + uint64_t slot_size_bytes; /* физический размер одного slot после round-up к granularity */ + uint32_t fd_count; /* должно совпадать с ring_size */ + uint32_t reserved; +} cuframes_msg_vmm_fds_t; + /* ─── Logging (minimal — to stderr) ────────────────────────────────────── */ #define CUFRAMES_LOG_ERROR(fmt, ...) \ @@ -272,6 +287,15 @@ int cuframes_internal_recv_msg(int sock_fd, uint32_t *msg_type_out, void *payload, uint32_t *payload_len_inout, int32_t timeout_ms); +/* v0.4 — send/recv с FD-attached. Используется только для VMM_FDS message. */ +int cuframes_internal_send_msg_with_fds(int sock_fd, uint32_t msg_type, + const void *payload, uint32_t payload_len, + const int *fds, uint32_t fd_count); +int cuframes_internal_recv_msg_with_fds(int sock_fd, uint32_t *msg_type_out, + void *payload, uint32_t *payload_len_inout, + int *fds_out, uint32_t *fd_count_inout, + int32_t timeout_ms); + /* ─── Packet ring helpers (libcuframes/src/packet_ring.c) ─────────────── */ /* Publisher: create SHM + initialize header + slots. Stale recovery как у frames. */ diff --git a/libcuframes/src/producer.c b/libcuframes/src/producer.c index be5e4eb..4a5e925 100644 --- a/libcuframes/src/producer.c +++ b/libcuframes/src/producer.c @@ -1,4 +1,14 @@ -/* Publisher implementation (docs/protocol.md §1, §2, §3.2, §4.2, §5). */ +/* Publisher implementation (docs/protocol.md §1, §2, §3.2, §4.2, §5). + * + * v0.4 — VMM + POSIX FD. Заменяет cudaMalloc+cudaIpcGetMemHandle на + * cuMemCreate + cuMemExportToShareableHandle(POSIX_FILE_DESCRIPTOR). FDs + * передаются consumer'у через SCM_RIGHTS, не нужны shared pid/ipc namespace. + * + * Sync (вместо cudaEventRecord+cudaIpcEventHandle): cuStreamSynchronize в + * do_publish — producer ждёт ~ms что stream flush'нулся, потом publishes seq. + * Consumer читает данные через DtoD копию без event wait — HW coherence + * гарантирована на одном GPU. + */ #include "internal.h" #include @@ -20,11 +30,18 @@ struct cuframes_publisher { char socket_path[128]; char shm_name[80]; - /* CUDA */ - cudaEvent_t event; /* legacy single event (v0.2 compat) */ - cudaEvent_t slot_events[CUFRAMES_MAX_RING]; /* v0.3 — per-slot events */ - cudaIpcMemHandle_t ipc_mem[CUFRAMES_MAX_RING]; - void *cuda_ptrs[CUFRAMES_MAX_RING]; /* mapped pointers */ + /* v0.4 — VMM-allocated pool. Каждый slot: cuMemCreate → cuMemAddressReserve + * → cuMemMap → cuMemSetAccess. FD экспортируется один раз и передаётся всем + * subscribers через SCM_RIGHTS. */ + CUmemGenericAllocationHandle vmm_handles[CUFRAMES_MAX_RING]; + CUdeviceptr vmm_ptrs[CUFRAMES_MAX_RING]; + int vmm_fds[CUFRAMES_MAX_RING]; + size_t vmm_slot_size; /* rounded к granularity */ + int has_vmm_pool; + + /* CUDA stream sync — заменяет per-slot events. Producer перед каждым publish + * вызывает cuStreamSynchronize чтобы гарантировать что previous writes + * завершены (data visible для consumer'ов на любом GPU stream). */ size_t frame_size_bytes; int32_t ring_size_actual; @@ -33,10 +50,6 @@ struct cuframes_publisher { int32_t current_slot; /* индекс slot'а полученного через acquire() */ int has_acquired; - /* EXTERNAL ownership: map user pointer → ring index */ - void *external_ptrs[CUFRAMES_MAX_RING]; - int32_t external_count; - /* Subscriber-management thread */ pthread_t accept_thread; int accept_thread_alive; @@ -52,8 +65,16 @@ struct cuframes_publisher { /* Forward decls */ static void *accept_thread_main(void *arg); static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd); +static void free_vmm_pool(struct cuframes_publisher *pub); -/* ─── Internal: alloc/setup CUDA pool and SHM ─────────────────────────── */ +/* Helper: format CUresult error для CUFRAMES_LOG_ERROR */ +static const char *cu_err_str(CUresult r) { + const char *s = NULL; + cuGetErrorString(r, &s); + return s ? s : "?"; +} + +/* ─── Internal: alloc VMM pool + export POSIX FDs ─────────────────────── */ static int alloc_library_pool(struct cuframes_publisher *pub) { int r = cuframes_internal_calc_size(pub->cfg.format, @@ -62,7 +83,37 @@ static int alloc_library_pool(struct cuframes_publisher *pub) { if (r != CUFRAMES_OK) return r; pub->ring_size_actual = pub->cfg.ring_size; + for (int i = 0; i < CUFRAMES_MAX_RING; i++) pub->vmm_fds[i] = -1; + /* Initialize CUDA driver API context */ + CUresult cr = cuInit(0); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuInit: %s", cu_err_str(cr)); + return CUFRAMES_ERR_CUDA; + } + + /* Pick allocation prop: pinned device memory с POSIX FD handle */ + CUmemAllocationProp prop = {0}; + prop.type = CU_MEM_ALLOCATION_TYPE_PINNED; + prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE; + prop.location.id = pub->cfg.cuda_device; + prop.requestedHandleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR; + + /* Round slot size up to granularity */ + size_t granularity = 0; + cr = cuMemGetAllocationGranularity(&granularity, &prop, + CU_MEM_ALLOC_GRANULARITY_MINIMUM); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuMemGetAllocationGranularity: %s", cu_err_str(cr)); + return CUFRAMES_ERR_CUDA; + } + pub->vmm_slot_size = ((pub->frame_size_bytes + granularity - 1) / granularity) + * granularity; + CUFRAMES_LOG_INFO("VMM granularity=%zu frame=%zu slot=%zu", + granularity, pub->frame_size_bytes, pub->vmm_slot_size); + + /* Required: also need a runtime API context so that cudaMemcpyAsync from + * user works on this allocation. cudaSetDevice достаточно. */ cudaError_t cerr = cudaSetDevice(pub->cfg.cuda_device); if (cerr != cudaSuccess) { CUFRAMES_LOG_ERROR("cudaSetDevice(%d): %s", @@ -70,74 +121,68 @@ static int alloc_library_pool(struct cuframes_publisher *pub) { return CUFRAMES_ERR_CUDA; } + CUmemAccessDesc access = {0}; + access.location = prop.location; + access.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE; + for (int i = 0; i < pub->ring_size_actual; ++i) { - cerr = cudaMalloc(&pub->cuda_ptrs[i], pub->frame_size_bytes); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaMalloc slot %d: %s", - i, cudaGetErrorString(cerr)); + cr = cuMemCreate(&pub->vmm_handles[i], pub->vmm_slot_size, &prop, 0); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuMemCreate slot %d: %s", i, cu_err_str(cr)); + free_vmm_pool(pub); return CUFRAMES_ERR_CUDA; } - cerr = cudaIpcGetMemHandle(&pub->ipc_mem[i], pub->cuda_ptrs[i]); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaIpcGetMemHandle slot %d: %s", - i, cudaGetErrorString(cerr)); + cr = cuMemAddressReserve(&pub->vmm_ptrs[i], pub->vmm_slot_size, 0, 0, 0); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuMemAddressReserve slot %d: %s", i, cu_err_str(cr)); + free_vmm_pool(pub); + return CUFRAMES_ERR_CUDA; + } + cr = cuMemMap(pub->vmm_ptrs[i], pub->vmm_slot_size, 0, + pub->vmm_handles[i], 0); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuMemMap slot %d: %s", i, cu_err_str(cr)); + free_vmm_pool(pub); + return CUFRAMES_ERR_CUDA; + } + cr = cuMemSetAccess(pub->vmm_ptrs[i], pub->vmm_slot_size, &access, 1); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuMemSetAccess slot %d: %s", i, cu_err_str(cr)); + free_vmm_pool(pub); + return CUFRAMES_ERR_CUDA; + } + /* Export POSIX FD — будет shared с consumers через SCM_RIGHTS */ + cr = cuMemExportToShareableHandle((void *)&pub->vmm_fds[i], + pub->vmm_handles[i], + CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR, 0); + if (cr != CUDA_SUCCESS) { + CUFRAMES_LOG_ERROR("cuMemExportToShareableHandle slot %d: %s", + i, cu_err_str(cr)); + free_vmm_pool(pub); return CUFRAMES_ERR_CUDA; } } + pub->has_vmm_pool = 1; return CUFRAMES_OK; } -static int register_external_pool(struct cuframes_publisher *pub, - void *const *ptrs, int32_t count, - size_t frame_size) { - if (count < 1 || count > CUFRAMES_MAX_RING) return CUFRAMES_ERR_INVALID_ARG; - pub->frame_size_bytes = frame_size; - pub->ring_size_actual = count; - pub->external_count = count; - - cudaError_t cerr = cudaSetDevice(pub->cfg.cuda_device); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaSetDevice: %s", cudaGetErrorString(cerr)); - return CUFRAMES_ERR_CUDA; - } - for (int i = 0; i < count; ++i) { - if (!ptrs[i]) return CUFRAMES_ERR_INVALID_ARG; - pub->cuda_ptrs[i] = ptrs[i]; - pub->external_ptrs[i] = ptrs[i]; - cerr = cudaIpcGetMemHandle(&pub->ipc_mem[i], ptrs[i]); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaIpcGetMemHandle on external ptr %p: %s", - ptrs[i], cudaGetErrorString(cerr)); - return CUFRAMES_ERR_CUDA; +static void free_vmm_pool(struct cuframes_publisher *pub) { + for (int i = 0; i < CUFRAMES_MAX_RING; i++) { + if (pub->vmm_fds[i] >= 0) { + close(pub->vmm_fds[i]); + pub->vmm_fds[i] = -1; + } + if (pub->vmm_ptrs[i]) { + cuMemUnmap(pub->vmm_ptrs[i], pub->vmm_slot_size); + cuMemAddressFree(pub->vmm_ptrs[i], pub->vmm_slot_size); + pub->vmm_ptrs[i] = 0; + } + if (pub->vmm_handles[i]) { + cuMemRelease(pub->vmm_handles[i]); + pub->vmm_handles[i] = 0; } } - return CUFRAMES_OK; -} - -static int create_event_handle(struct cuframes_publisher *pub) { - /* Legacy single event — keep для v0.2 consumer compat fallback */ - cudaError_t cerr = cudaEventCreateWithFlags(&pub->event, - cudaEventDisableTiming | cudaEventInterprocess); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags (legacy): %s", - cudaGetErrorString(cerr)); - return CUFRAMES_ERR_CUDA; - } - /* v0.3 — per-slot events. Каждый publish записывает event на свой slot; - * consumer waits event[slot_idx] specifically — закрывает TOCTOU race - * (один global event может signal'ить для другого frame). */ - for (int32_t i = 0; i < pub->ring_size_actual; i++) { - cerr = cudaEventCreateWithFlags(&pub->slot_events[i], - cudaEventDisableTiming | cudaEventInterprocess); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags (slot %d): %s", - i, cudaGetErrorString(cerr)); - for (int32_t j = 0; j < i; j++) cudaEventDestroy(pub->slot_events[j]); - cudaEventDestroy(pub->event); - return CUFRAMES_ERR_CUDA; - } - } - return CUFRAMES_OK; + pub->has_vmm_pool = 0; } static int setup_shm(struct cuframes_publisher *pub) { @@ -155,7 +200,8 @@ static int setup_shm(struct cuframes_publisher *pub) { cuframes_shm_header_t tmp; ssize_t rb = read(existing, &tmp, sizeof(tmp)); close(existing); - if (rb == (ssize_t)sizeof(tmp) && tmp.magic == CUFRAMES_MAGIC) { + if (rb == (ssize_t)sizeof(tmp) && + (tmp.magic == CUFRAMES_MAGIC || tmp.magic == CUFRAMES_MAGIC_LEGACY)) { if (cuframes_internal_pid_alive((pid_t)tmp.producer_pid)) { CUFRAMES_LOG_ERROR("publisher with key=%s already running (pid %lu)", pub->key, (unsigned long)tmp.producer_pid); @@ -188,7 +234,7 @@ static int setup_shm(struct cuframes_publisher *pub) { memset(pub->hdr, 0, sizeof(cuframes_shm_header_t)); pub->hdr->magic = CUFRAMES_MAGIC; - pub->hdr->proto_version = CUFRAMES_PROTOCOL_V3; + pub->hdr->proto_version = CUFRAMES_PROTOCOL_V4; pub->hdr->lib_version_major = CUFRAMES_VERSION_MAJOR; pub->hdr->lib_version_minor = CUFRAMES_VERSION_MINOR; pub->hdr->lib_version_patch = CUFRAMES_VERSION_PATCH; @@ -208,25 +254,11 @@ static int setup_shm(struct cuframes_publisher *pub) { pub->hdr->meta.pitch_uv = puv; pub->hdr->meta.frame_size_bytes = pub->frame_size_bytes; - /* Export event handle (legacy single) */ - cudaError_t cerr = cudaIpcGetEventHandle(&pub->hdr->ipc_event_handle, pub->event); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle: %s", cudaGetErrorString(cerr)); - return CUFRAMES_ERR_CUDA; - } - /* v0.3 — export per-slot event handles */ - for (int32_t i = 0; i < pub->ring_size_actual; i++) { - cerr = cudaIpcGetEventHandle(&pub->hdr->slot_event_handles[i], - pub->slot_events[i]); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle (slot %d): %s", - i, cudaGetErrorString(cerr)); - return CUFRAMES_ERR_CUDA; - } - } - /* Fill slot descriptors */ + /* v0.4: legacy event fields в header не используются (cuStreamSynchronize + * заменяет IPC events). Memzero выше — достаточно. */ + /* Slot descriptors — mem_handle поле deprecated (передаётся через FDs), + * только seq atomic нужен. */ for (int i = 0; i < pub->ring_size_actual; ++i) { - pub->hdr->slots[i].mem_handle = pub->ipc_mem[i]; atomic_store_explicit(&pub->hdr->slots[i].seq, UINT64_MAX, memory_order_release); } @@ -310,6 +342,7 @@ static int common_init(struct cuframes_publisher *pub, pub->next_seq = 0; pub->current_slot = -1; pub->has_acquired = 0; + for (int i = 0; i < CUFRAMES_MAX_RING; i++) pub->vmm_fds[i] = -1; pthread_mutex_init(&pub->state_mu, NULL); return CUFRAMES_OK; } @@ -325,7 +358,6 @@ int cuframes_publisher_create(const cuframes_publisher_config_t *cfg, common_init(pub, cfg); if ((r = alloc_library_pool(pub)) != CUFRAMES_OK) goto fail; - if ((r = create_event_handle(pub)) != CUFRAMES_OK) goto fail; if ((r = setup_shm(pub)) != CUFRAMES_OK) goto fail; if ((r = setup_socket(pub)) != CUFRAMES_OK) goto fail; @@ -337,7 +369,7 @@ int cuframes_publisher_create(const cuframes_publisher_config_t *cfg, } pub->accept_thread_alive = 1; - CUFRAMES_LOG_INFO("publisher '%s' ready (ring=%d, %dx%d, fmt=%d, lib-owned)", + CUFRAMES_LOG_INFO("publisher '%s' ready (ring=%d, %dx%d, fmt=%d, lib-owned, v0.4 VMM)", pub->key, pub->ring_size_actual, pub->cfg.width, pub->cfg.height, (int)pub->cfg.format); *out = pub; @@ -353,37 +385,12 @@ int cuframes_publisher_create_external(const cuframes_publisher_config_t *cfg, int32_t ptr_count, size_t frame_size, cuframes_publisher_t **out) { - int r = validate_config(cfg); - if (r != CUFRAMES_OK) return r; - if (cfg->ownership != CUFRAMES_OWNERSHIP_EXTERNAL) return CUFRAMES_ERR_INVALID_ARG; - if (!cuda_ptrs || ptr_count < 1) return CUFRAMES_ERR_INVALID_ARG; - if (frame_size == 0) return CUFRAMES_ERR_INVALID_ARG; - - struct cuframes_publisher *pub = calloc(1, sizeof(*pub)); - if (!pub) return CUFRAMES_ERR_OUT_OF_MEMORY; - common_init(pub, cfg); - - if ((r = register_external_pool(pub, cuda_ptrs, ptr_count, frame_size)) != CUFRAMES_OK) - goto fail; - if ((r = create_event_handle(pub)) != CUFRAMES_OK) goto fail; - if ((r = setup_shm(pub)) != CUFRAMES_OK) goto fail; - if ((r = setup_socket(pub)) != CUFRAMES_OK) goto fail; - - pub->stop_flag = 0; - if (pthread_create(&pub->accept_thread, NULL, accept_thread_main, pub) != 0) { - r = CUFRAMES_ERR_INTERNAL; - goto fail; - } - pub->accept_thread_alive = 1; - - CUFRAMES_LOG_INFO("publisher '%s' ready (external pool=%d, %dx%d, fmt=%d)", - pub->key, ptr_count, - pub->cfg.width, pub->cfg.height, (int)pub->cfg.format); - *out = pub; - return CUFRAMES_OK; -fail: - cuframes_publisher_destroy(pub); - return r; + /* v0.4: external ownership больше не поддерживается. VMM API требует + * cuMemCreate-allocated memory; existing cudaMalloc-pointers нельзя + * export'нуть как POSIX FD. Use LIBRARY ownership. */ + (void)cfg; (void)cuda_ptrs; (void)ptr_count; (void)frame_size; (void)out; + CUFRAMES_LOG_ERROR("EXTERNAL ownership не поддерживается в v0.4 (VMM-only)"); + return CUFRAMES_ERR_INVALID_ARG; } int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) { @@ -404,27 +411,24 @@ int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) { while (1) { uint64_t ack = atomic_load_explicit(&pub->hdr->slots[slot].ack_bitmap, memory_order_acquire); - /* Если slot ещё не публикован (seq == UINT64_MAX) — пропустить ack check */ uint64_t cur_seq = atomic_load_explicit(&pub->hdr->slots[slot].seq, memory_order_acquire); if (cur_seq == UINT64_MAX || (ack & bitmap) == bitmap) break; if (deadline && cuframes_now_ns() > deadline) { - /* Mark slow subscriber dead и continue */ uint64_t missing = bitmap & ~ack; CUFRAMES_LOG_WARN("strict-wait timeout, slow subscribers bitmap=0x%lx", (unsigned long)missing); - /* clear missing subscribers — TODO: send unsubscribe in v0.2 */ atomic_fetch_and_explicit(&pub->hdr->subscriber_bitmap, ~missing, memory_order_release); break; } - struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000}; /* 100µs */ + struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000}; nanosleep(&ts, NULL); } } } - *cuda_ptr_out = pub->cuda_ptrs[slot]; + *cuda_ptr_out = (void *)(uintptr_t)pub->vmm_ptrs[slot]; pub->current_slot = slot; pub->has_acquired = 1; return CUFRAMES_OK; @@ -432,21 +436,16 @@ int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) { static int do_publish(cuframes_publisher_t *pub, int32_t slot, void *stream, int64_t pts_ns) { - /* v0.3 — record per-slot event для precise consumer sync. Closes TOCTOU - * race где legacy `pub->event` signals "latest publish", not slot-specific. */ - cudaError_t cerr = cudaEventRecord(pub->slot_events[slot], (cudaStream_t)stream); + /* v0.4 — заменяет cudaEventRecord+IPC events на cuStreamSynchronize. + * Producer ждёт что stream flush'нулся (~1ms на 5090), потом publishes + * seq atomically. Consumer читает данные через DtoD memcpy без event + * wait — hardware coherence гарантирована на одном GPU. */ + cudaError_t cerr = cudaStreamSynchronize((cudaStream_t)stream); if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaEventRecord (slot %d): %s", + CUFRAMES_LOG_ERROR("cudaStreamSynchronize (slot %d): %s", slot, cudaGetErrorString(cerr)); return CUFRAMES_ERR_CUDA; } - /* Legacy event — keep recording для v0.2 consumer compat fallback */ - cerr = cudaEventRecord(pub->event, (cudaStream_t)stream); - if (cerr != cudaSuccess) { - CUFRAMES_LOG_ERROR("cudaEventRecord (legacy): %s", - cudaGetErrorString(cerr)); - return CUFRAMES_ERR_CUDA; - } /* Reset ack bitmap для нового frame'а */ atomic_store_explicit(&pub->hdr->slots[slot].ack_bitmap, 0, @@ -477,44 +476,8 @@ int cuframes_publisher_publish(cuframes_publisher_t *pub, void *stream, int64_t int cuframes_publisher_publish_external(cuframes_publisher_t *pub, void *cuda_ptr, void *stream, int64_t pts_ns) { - if (!pub || !cuda_ptr) return CUFRAMES_ERR_INVALID_ARG; - if (pub->cfg.ownership != CUFRAMES_OWNERSHIP_EXTERNAL) return CUFRAMES_ERR_INVALID_ARG; - - int32_t slot = -1; - for (int i = 0; i < pub->external_count; ++i) { - if (pub->external_ptrs[i] == cuda_ptr) { slot = i; break; } - } - if (slot < 0) { - CUFRAMES_LOG_ERROR("external pointer %p not registered", cuda_ptr); - return CUFRAMES_ERR_INVALID_ARG; - } - - /* STRICT_WAIT — то же что в acquire, но per-publish */ - if (pub->cfg.policy == CUFRAMES_POLICY_STRICT_WAIT) { - uint64_t bitmap = atomic_load_explicit(&pub->hdr->subscriber_bitmap, - memory_order_acquire); - if (bitmap != 0) { - int64_t deadline = pub->cfg.consumer_ack_timeout_ms > 0 - ? cuframes_now_ns() + (int64_t)pub->cfg.consumer_ack_timeout_ms * 1000000LL - : 0; - while (1) { - uint64_t ack = atomic_load_explicit(&pub->hdr->slots[slot].ack_bitmap, - memory_order_acquire); - uint64_t cur_seq = atomic_load_explicit(&pub->hdr->slots[slot].seq, - memory_order_acquire); - if (cur_seq == UINT64_MAX || (ack & bitmap) == bitmap) break; - if (deadline && cuframes_now_ns() > deadline) { - uint64_t missing = bitmap & ~ack; - atomic_fetch_and_explicit(&pub->hdr->subscriber_bitmap, - ~missing, memory_order_release); - break; - } - struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000}; - nanosleep(&ts, NULL); - } - } - } - return do_publish(pub, slot, stream, pts_ns); + (void)pub; (void)cuda_ptr; (void)stream; (void)pts_ns; + return CUFRAMES_ERR_INVALID_ARG; /* v0.4 — нет external mode */ } int cuframes_publisher_destroy(cuframes_publisher_t *pub) { @@ -536,15 +499,9 @@ int cuframes_publisher_destroy(cuframes_publisher_t *pub) { pub->accept_thread_alive = 0; } - /* Free CUDA */ - if (pub->cfg.ownership == CUFRAMES_OWNERSHIP_LIBRARY) { - for (int i = 0; i < pub->ring_size_actual; ++i) { - if (pub->cuda_ptrs[i]) cudaFree(pub->cuda_ptrs[i]); - } - } - if (pub->event) cudaEventDestroy(pub->event); - for (int32_t i = 0; i < pub->ring_size_actual; i++) { - if (pub->slot_events[i]) cudaEventDestroy(pub->slot_events[i]); + /* Free VMM */ + if (pub->has_vmm_pool) { + free_vmm_pool(pub); } /* Packet ring cleanup (если активирован) */ @@ -599,11 +556,7 @@ int cuframes_publisher_enable_packets(cuframes_publisher_t *pub, pub->has_pkt_ring = 1; pub->max_packet_size = max_pkt; - - /* Bump proto_version в frames header чтобы v2-subscribers видели поддержку. */ - if (pub->hdr) { - pub->hdr->proto_version = CUFRAMES_PROTOCOL_V2; - } + /* v0.4 frame header proto не bumped из-за packet ring — оба коэкзистируют. */ return CUFRAMES_OK; } @@ -628,9 +581,6 @@ int cuframes_publisher_publish_packet(cuframes_publisher_t *pub, /* ─── Accept thread + handshake ──────────────────────────────────────── */ -/* Per-subscriber lifecycle monitor — detects socket close (subscriber container - * exited / crashed) и освобождает bit + subscribers[] slot. Без этого каждый - * pipeline recreate leaks bit → bitmap overflows after 32 connections. */ struct sub_monitor_args { struct cuframes_publisher *pub; int fd; @@ -640,23 +590,18 @@ struct sub_monitor_args { static void *subscriber_monitor_thread(void *arg) { struct sub_monitor_args *m = (struct sub_monitor_args *)arg; char buf[64]; - /* Blocking read — return 0 (EOF) когда other side close socket, или - * <0 on error. Любой control message (PING — TODO в будущем) just consumed. */ while (1) { ssize_t n = recv(m->fd, buf, sizeof(buf), 0); if (n <= 0) { - /* Subscriber dead — clear bit + slot state. */ atomic_fetch_and_explicit(&m->pub->hdr->subscriber_bitmap, ~(1ULL << m->bit), memory_order_release); atomic_store_explicit(&m->pub->hdr->subscribers[m->bit].state, 0, memory_order_release); close(m->fd); - CUFRAMES_LOG_INFO("subscriber bit=%u disconnected — freed", - m->bit); + CUFRAMES_LOG_INFO("subscriber bit=%u disconnected — freed", m->bit); free(m); return NULL; } - /* future: parse control msgs (PING, UNSUBSCRIBE) here */ } } @@ -672,8 +617,6 @@ static void *accept_thread_main(void *arg) { CUFRAMES_LOG_WARN("accept: %s", strerror(errno)); continue; } - /* Handshake — на error close socket (no monitor spawned). На success - * monitor thread становится owner socket'a + cleanup'ит при disconnect. */ int r = handshake_subscriber(pub, client); if (r != CUFRAMES_OK) { close(client); @@ -684,7 +627,6 @@ static void *accept_thread_main(void *arg) { static int allocate_subscriber_bit(struct cuframes_publisher *pub, const char *name, uint32_t *bit_out) { - /* Bit 0 reserved (sentinel). Bits 1..31. */ pthread_mutex_lock(&pub->state_mu); for (uint32_t bit = 1; bit < CUFRAMES_MAX_SUBSCRIBERS; ++bit) { uint64_t state = atomic_load_explicit(&pub->hdr->subscribers[bit].state, @@ -704,7 +646,6 @@ static int allocate_subscriber_bit(struct cuframes_publisher *pub, pthread_mutex_unlock(&pub->state_mu); return CUFRAMES_OK; } - /* Check for name collision */ if (name && state >= 2 && strncmp(pub->hdr->subscribers[bit].consumer_name, name, sizeof(pub->hdr->subscribers[bit].consumer_name)) == 0) { @@ -731,7 +672,6 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) { return CUFRAMES_ERR_PROTOCOL; } - /* Parse HELLO_REQ: proto_version + name_len + name + cuda_device + mode */ if (plen < sizeof(cuframes_msg_hello_req_t) + 20) return CUFRAMES_ERR_PROTOCOL; cuframes_msg_hello_req_t *hreq = (cuframes_msg_hello_req_t *)buf; uint32_t want_proto = hreq->proto_version; @@ -741,18 +681,18 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) { char name[32] = {0}; memcpy(name, buf + sizeof(*hreq), name_len); - int proto_match = (want_proto == CUFRAMES_PROTOCOL_V1); + /* v0.4 принимает только V4 consumers. Старые v0.3 fail здесь cleanly. */ + int proto_match = (want_proto == CUFRAMES_PROTOCOL_V4); /* Send HELLO_RESP */ uint8_t resp_buf[CUFRAMES_MAX_MSG_PAYLOAD]; cuframes_msg_hello_resp_t *resp = (cuframes_msg_hello_resp_t *)resp_buf; memset(resp, 0, sizeof(*resp)); resp->result = proto_match ? CUFRAMES_OK : CUFRAMES_ERR_PROTOCOL; - resp->proto_version_actual = CUFRAMES_PROTOCOL_V1; + resp->proto_version_actual = CUFRAMES_PROTOCOL_V4; resp->ring_size = (uint32_t)pub->ring_size_actual; resp->ownership_mode = (uint32_t)pub->cfg.ownership; resp->meta = pub->hdr->meta; - /* shm_path */ int slen = snprintf((char *)(resp_buf + sizeof(*resp)), sizeof(resp_buf) - sizeof(*resp) - 12, "%s", pub->shm_name); @@ -765,7 +705,11 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) { CUFRAMES_LOG_WARN("send HELLO_RESP: %s", cuframes_strerror(r)); return r; } - if (!proto_match) return CUFRAMES_ERR_PROTOCOL; + if (!proto_match) { + CUFRAMES_LOG_WARN("subscriber proto v%u rejected (want v%u)", + want_proto, CUFRAMES_PROTOCOL_V4); + return CUFRAMES_ERR_PROTOCOL; + } /* recv SUBSCRIBE_REQ */ plen = sizeof(buf); @@ -773,11 +717,9 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) { if (r != CUFRAMES_OK) return r; if (mtype != CUFRAMES_MSG_SUBSCRIBE_REQ) return CUFRAMES_ERR_PROTOCOL; - /* Allocate subscriber bit */ uint32_t bit = 0; int alloc_r = allocate_subscriber_bit(pub, name, &bit); - /* Send SUBSCRIBE_RESP */ cuframes_msg_subscribe_resp_t sresp = {0}; sresp.result = alloc_r; sresp.assigned_bit = bit; @@ -788,20 +730,33 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) { &sresp, sizeof(sresp)); if (r != CUFRAMES_OK || alloc_r != CUFRAMES_OK) return r ? r : alloc_r; - /* Activate subscriber slot */ + /* v0.4 — отправить VMM_FDS с N posix FDs через SCM_RIGHTS */ + cuframes_msg_vmm_fds_t vmm_payload = {0}; + vmm_payload.slot_size_bytes = pub->vmm_slot_size; + vmm_payload.fd_count = (uint32_t)pub->ring_size_actual; + r = cuframes_internal_send_msg_with_fds(client_fd, CUFRAMES_MSG_VMM_FDS, + &vmm_payload, sizeof(vmm_payload), + pub->vmm_fds, + (uint32_t)pub->ring_size_actual); + if (r != CUFRAMES_OK) { + CUFRAMES_LOG_WARN("send VMM_FDS: %s", cuframes_strerror(r)); + /* roll back bit allocation */ + atomic_fetch_and_explicit(&pub->hdr->subscriber_bitmap, + ~(1ULL << bit), memory_order_release); + atomic_store_explicit(&pub->hdr->subscribers[bit].state, 0, + memory_order_release); + return r; + } + atomic_store_explicit(&pub->hdr->subscribers[bit].state, 2, memory_order_release); - CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u)", name, bit); + CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u, %d VMM FDs)", + name, bit, pub->ring_size_actual); - /* Spawn detached monitor thread — owns client_fd, frees bit on socket - * close (subscriber container exit / crash). Без этого bitmap утекал - * каждый pipeline recreate. */ + /* Spawn monitor thread */ struct sub_monitor_args *m = malloc(sizeof(*m)); - if (!m) { - /* OOM — fallback: leak fd, bit будет released только publisher_destroy */ - return CUFRAMES_OK; - } + if (!m) return CUFRAMES_OK; m->pub = pub; m->fd = client_fd; m->bit = bit; diff --git a/libcuframes/src/protocol.c b/libcuframes/src/protocol.c index b2ce12a..e8eeb38 100644 --- a/libcuframes/src/protocol.c +++ b/libcuframes/src/protocol.c @@ -3,7 +3,9 @@ #include "internal.h" #include #include +#include #include +#include #include /* Read exactly N bytes from socket, with poll-based timeout. */ @@ -97,3 +99,121 @@ int cuframes_internal_recv_msg(int fd, uint32_t *msg_type_out, if (payload_len_inout) *payload_len_inout = h.payload_length; return CUFRAMES_OK; } + +/* v0.4 — send TLV msg + N FDs через SCM_RIGHTS. Один sendmsg(): header+payload + * в iovec, FDs в control. Header.payload_length описывает ТОЛЬКО payload bytes, + * FDs приходят out-of-band. */ +int cuframes_internal_send_msg_with_fds(int sock_fd, uint32_t msg_type, + const void *payload, uint32_t payload_len, + const int *fds, uint32_t fd_count) { + if (payload_len > CUFRAMES_MAX_MSG_PAYLOAD) return CUFRAMES_ERR_INVALID_ARG; + if (fd_count > 0 && !fds) return CUFRAMES_ERR_INVALID_ARG; + + cuframes_msg_header_t h = {.msg_type = msg_type, .payload_length = payload_len}; + + struct iovec iov[2]; + iov[0].iov_base = &h; iov[0].iov_len = sizeof(h); + iov[1].iov_base = (void *)payload; iov[1].iov_len = payload_len; + + struct msghdr msg = {0}; + msg.msg_iov = iov; + msg.msg_iovlen = (payload_len > 0 && payload) ? 2 : 1; + + char ctrl_buf[CMSG_SPACE(sizeof(int) * 64)] = {0}; + if (fd_count > 0) { + if (fd_count > 64) return CUFRAMES_ERR_INVALID_ARG; + msg.msg_control = ctrl_buf; + msg.msg_controllen = CMSG_SPACE(sizeof(int) * fd_count); + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int) * fd_count); + memcpy(CMSG_DATA(cmsg), fds, sizeof(int) * fd_count); + } + + ssize_t n = sendmsg(sock_fd, &msg, MSG_NOSIGNAL); + if (n < 0) { + if (errno == EPIPE) return CUFRAMES_ERR_DISCONNECTED; + return CUFRAMES_ERR_IO; + } + /* Partial send rare для small payload — но обработаем gracefully */ + size_t want = sizeof(h) + payload_len; + if ((size_t)n < want) { + return send_all(sock_fd, (uint8_t *)iov[0].iov_base + n, + want - (size_t)n); + } + return CUFRAMES_OK; +} + +int cuframes_internal_recv_msg_with_fds(int sock_fd, uint32_t *msg_type_out, + void *payload, uint32_t *payload_len_inout, + int *fds_out, uint32_t *fd_count_inout, + int32_t timeout_ms) { + /* Poll первым делом — recvmsg блокирующий, иначе тайм-аут не сработает. */ + if (timeout_ms >= 0) { + struct pollfd pfd = {.fd = sock_fd, .events = POLLIN}; + int pr = poll(&pfd, 1, timeout_ms); + if (pr == 0) return CUFRAMES_ERR_TIMEOUT; + if (pr < 0) return CUFRAMES_ERR_IO; + } + + cuframes_msg_header_t h; + struct iovec iov[2]; + iov[0].iov_base = &h; iov[0].iov_len = sizeof(h); + iov[1].iov_base = payload; iov[1].iov_len = (payload && payload_len_inout) ? *payload_len_inout : 0; + + uint32_t want_fds = (fd_count_inout && fds_out) ? *fd_count_inout : 0; + char ctrl_buf[CMSG_SPACE(sizeof(int) * 64)] = {0}; + struct msghdr msg = {0}; + msg.msg_iov = iov; + msg.msg_iovlen = (iov[1].iov_len > 0) ? 2 : 1; + msg.msg_control = ctrl_buf; + msg.msg_controllen = sizeof(ctrl_buf); + + ssize_t n = recvmsg(sock_fd, &msg, 0); + if (n == 0) return CUFRAMES_ERR_DISCONNECTED; + if (n < 0) return CUFRAMES_ERR_IO; + if ((size_t)n < sizeof(h)) return CUFRAMES_ERR_PROTOCOL; + + if (msg_type_out) *msg_type_out = h.msg_type; + if (h.payload_length > CUFRAMES_MAX_MSG_PAYLOAD) return CUFRAMES_ERR_PROTOCOL; + + /* Если recvmsg вернул меньше payload_length — добираем через recv_all */ + size_t got_payload = (size_t)n - sizeof(h); + if (h.payload_length > 0) { + if (!payload || !payload_len_inout || *payload_len_inout < h.payload_length) { + return CUFRAMES_ERR_INVALID_ARG; + } + if (got_payload < h.payload_length) { + int r = recv_all(sock_fd, (uint8_t *)payload + got_payload, + h.payload_length - got_payload, timeout_ms); + if (r != CUFRAMES_OK) return r; + } + *payload_len_inout = h.payload_length; + } else if (payload_len_inout) { + *payload_len_inout = 0; + } + + /* Parse SCM_RIGHTS FDs */ + uint32_t got_fds = 0; + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); + for (; cmsg; cmsg = CMSG_NXTHDR(&msg, cmsg)) { + if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) { + size_t blob = cmsg->cmsg_len - CMSG_LEN(0); + uint32_t n_fds = (uint32_t)(blob / sizeof(int)); + if (got_fds + n_fds > want_fds) { + /* Close excess FDs чтобы не утекли */ + for (uint32_t i = 0; i < n_fds; i++) { + int f; + memcpy(&f, CMSG_DATA(cmsg) + i * sizeof(int), sizeof(int)); + close(f); + } + continue; + } + memcpy(fds_out + got_fds, CMSG_DATA(cmsg), blob); + got_fds += n_fds; + } + } + if (fd_count_inout) *fd_count_inout = got_fds; + return CUFRAMES_OK; +} diff --git a/spike/.gitignore b/spike/.gitignore new file mode 100644 index 0000000..d70bb54 --- /dev/null +++ b/spike/.gitignore @@ -0,0 +1,4 @@ +vmm_fd_pingpong/producer +vmm_fd_pingpong/consumer +smoke_v04/smoke_pub +smoke_v04/smoke_sub diff --git a/spike/smoke_v04/Makefile b/spike/smoke_v04/Makefile new file mode 100644 index 0000000..121cd44 --- /dev/null +++ b/spike/smoke_v04/Makefile @@ -0,0 +1,13 @@ +CFLAGS = -O2 -Wall -I../../include -I/usr/local/cuda/include +LDFLAGS = -L../../build-v04/libcuframes -lcuframes -L/usr/local/cuda/lib64 -lcudart -lcuda -lpthread -lrt + +all: smoke_pub smoke_sub + +smoke_pub: smoke_pub.c + gcc $(CFLAGS) -o $@ $< $(LDFLAGS) + +smoke_sub: smoke_sub.c + gcc $(CFLAGS) -o $@ $< $(LDFLAGS) + +clean: + rm -f smoke_pub smoke_sub diff --git a/spike/smoke_v04/smoke_pub.c b/spike/smoke_v04/smoke_pub.c new file mode 100644 index 0000000..30f5341 --- /dev/null +++ b/spike/smoke_v04/smoke_pub.c @@ -0,0 +1,55 @@ +/* v0.4 smoke test publisher — NV12 1920x1080 ring 4, fill каждый slot + * с pattern (i % 256), publish, infinite loop. */ +#include +#include +#include +#include +#include +#include + +int main(int argc, char **argv) { + const char *key = argc > 1 ? argv[1] : "smoke"; + + cuframes_publisher_config_t cfg = {0}; + cfg.key = key; + cfg.width = 1920; + cfg.height = 1080; + cfg.format = CUFRAMES_FORMAT_NV12; + cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY; + cfg.ring_size = 4; + cfg.policy = CUFRAMES_POLICY_DROP_OLDEST; + cfg.cuda_device = 0; + + cuframes_publisher_t *pub = NULL; + int r = cuframes_publisher_create(&cfg, &pub); + if (r != CUFRAMES_OK) { + fprintf(stderr, "publisher create failed: %d (%s)\n", r, cuframes_strerror(r)); + return 1; + } + fprintf(stderr, "publisher 'cuframes-%s' ready (v0.4 VMM)\n", key); + + cudaStream_t stream; + cudaStreamCreate(&stream); + + int i = 0; + while (1) { + void *ptr = NULL; + r = cuframes_publisher_acquire(pub, &ptr); + if (r != CUFRAMES_OK) { fprintf(stderr, "acquire: %d\n", r); break; } + + uint8_t pattern = (uint8_t)(i & 0xFF); + cudaMemsetAsync(ptr, pattern, 1920 * 1080 * 3 / 2, stream); + + r = cuframes_publisher_publish(pub, stream, + (int64_t)cuframes_now_ns()); + if (r != CUFRAMES_OK) { fprintf(stderr, "publish: %d\n", r); break; } + i++; + if (i % 50 == 0) fprintf(stderr, "published %d frames\n", i); + struct timespec ts = {.tv_sec = 0, .tv_nsec = 40000000}; /* 25 fps */ + nanosleep(&ts, NULL); + } + + cudaStreamDestroy(stream); + cuframes_publisher_destroy(pub); + return 0; +} diff --git a/spike/smoke_v04/smoke_sub.c b/spike/smoke_v04/smoke_sub.c new file mode 100644 index 0000000..5528cee --- /dev/null +++ b/spike/smoke_v04/smoke_sub.c @@ -0,0 +1,63 @@ +/* v0.4 smoke subscriber — connect, read 100 frames, verify pattern, exit 0/1. */ +#include +#include +#include +#include +#include +#include + +int main(int argc, char **argv) { + const char *key = argc > 1 ? argv[1] : "smoke"; + + cuframes_subscriber_config_t cfg = {0}; + cfg.key = key; + cfg.consumer_name = "smoke-sub"; + cfg.mode = CUFRAMES_MODE_NEWEST_ONLY; + cfg.cuda_device = 0; + cfg.connect_timeout_ms = 10000; + + cuframes_subscriber_t *sub = NULL; + int r = cuframes_subscriber_create(&cfg, &sub); + if (r != CUFRAMES_OK) { + fprintf(stderr, "subscriber create failed: %d (%s)\n", r, cuframes_strerror(r)); + return 1; + } + fprintf(stderr, "subscribed to '%s' (v0.4)\n", key); + + cudaStream_t stream; + cudaStreamCreate(&stream); + size_t check_size = 1024; /* sample 1KB чтобы не тратить время */ + uint8_t *host = malloc(check_size); + + int frames = 0; + int good = 0; + while (frames < 100) { + cuframes_frame_t *f = NULL; + r = cuframes_subscriber_next(sub, stream, &f, 2000); + if (r != CUFRAMES_OK) { + fprintf(stderr, "next failed: %d (%s)\n", r, cuframes_strerror(r)); + break; + } + cudaMemcpyAsync(host, cuframes_frame_cuda_ptr(f), check_size, + cudaMemcpyDeviceToHost, stream); + cudaStreamSynchronize(stream); + uint8_t exp = host[0]; + int mismatch = 0; + for (size_t i = 1; i < check_size; i++) { + if (host[i] != exp) { mismatch++; } + } + if (mismatch == 0) good++; + if (frames % 20 == 0) { + fprintf(stderr, "frame seq=%lu byte0=0x%02x mismatch=%d\n", + (unsigned long)cuframes_frame_seq(f), exp, mismatch); + } + cuframes_subscriber_release(sub, f); + frames++; + } + free(host); + cudaStreamDestroy(stream); + cuframes_subscriber_destroy(sub); + + fprintf(stderr, "DONE: %d/%d frames OK\n", good, frames); + return (good == frames && frames > 0) ? 0 : 1; +} diff --git a/spike/vmm_fd_pingpong/Makefile b/spike/vmm_fd_pingpong/Makefile new file mode 100644 index 0000000..572ed8e --- /dev/null +++ b/spike/vmm_fd_pingpong/Makefile @@ -0,0 +1,16 @@ +CC = gcc +CFLAGS = -O2 -Wall -I/usr/local/cuda/include +LDFLAGS = -L/usr/local/cuda/lib64 -lcuda + +all: producer consumer + +producer: producer.c common.h + $(CC) $(CFLAGS) -o $@ producer.c $(LDFLAGS) + +consumer: consumer.c common.h + $(CC) $(CFLAGS) -o $@ consumer.c $(LDFLAGS) + +clean: + rm -f producer consumer + +.PHONY: all clean diff --git a/spike/vmm_fd_pingpong/README.md b/spike/vmm_fd_pingpong/README.md new file mode 100644 index 0000000..6bc7c75 --- /dev/null +++ b/spike/vmm_fd_pingpong/README.md @@ -0,0 +1,69 @@ +# vmm_fd_pingpong — spike для cuframes v0.4 + +Проверка: можно ли заменить CUDA IPC mem handles на VMM (cuMemCreate) ++ POSIX FD export, чтобы убрать требование shared pid/ipc namespaces +между producer и consumer контейнерами. + +## Результат: ✅ работает + +Запуск 2 контейнеров без shared pid/ipc, только volume mount для +unix-сокета: + +``` +producer: granularity=2097152 +producer: alloc size=16777216 +producer: exported fd=37 for handle +producer: listening on /run/spike/pingpong.sock, awaiting consumer... + +consumer: connected to producer +consumer: recv fd=38 size=16777216 magic=0xa7 +consumer: imported handle OK +consumer: mapped + access OK +consumer: verify mismatch=0/1048576 → ACK=O +consumer: done (OK) +``` + +## Ключевые наблюдения + +- **Granularity на 5090 = 2 MB**. 1920×1080 NV12 (~3.1 MB) округлится до 4 MB. + 16 slots × 4 камеры × +1 MB = +64 MB VRAM поверх текущих cuda IPC аллокаций. +- **FD передаётся через `sendmsg(SCM_RIGHTS)`** — kernel прокидывает реальный FD + в receiver namespace, переименовывая в свободный номер. Volume mount unix + socket'а — единственное требование (`/run/cuframes` уже монтируется как shared). +- **`cuMemImportFromShareableHandle`** принимает FD как `(void *)(uintptr_t)fd`. +- **Доступ на consumer side требует `cuMemSetAccess` с правильным `CUmemLocation`** — + device id из своего `cuDeviceGet`, не наследуется от producer. + +## Замена events (упрощение этапа C) + +CUDA events для IPC не имеют POSIX FD path. Внедрять external semaphores +(OPAQUE_FD) — отдельный API, другая sigal/wait семантика. **Вместо этого:** +producer вызывает `cuStreamSynchronize(stream)` ПЕРЕД `atomic_store(seq)` в +`do_publish`. Consumer тогда просто читает seq и копирует DtoD — без event wait. + +Overhead: ~1 ms на publish × 25 fps = 2.5% CPU time producer'а. Memory +coherence гарантирована (один GPU, hardware ensures writes visible после +stream sync). + +## Сборка + +```bash +docker run --rm -v $PWD:/work -w /work nvidia/cuda:12.4.1-devel-ubuntu22.04 \ + bash -c "apt-get install -y build-essential && make" +``` + +## Запуск теста + +```bash +sudo mkdir -p /var/run/spike-pingpong && sudo chmod 777 /var/run/spike-pingpong + +docker run -d --name spike-prod --runtime=nvidia --gpus all \ + -v $PWD:/work -v /var/run/spike-pingpong:/run/spike \ + nvidia/cuda:12.4.1-base-ubuntu22.04 /work/producer + +docker run --rm --name spike-cons --runtime=nvidia --gpus all \ + -v $PWD:/work -v /var/run/spike-pingpong:/run/spike \ + nvidia/cuda:12.4.1-base-ubuntu22.04 /work/consumer + +docker logs spike-prod && docker rm -f spike-prod +``` diff --git a/spike/vmm_fd_pingpong/common.h b/spike/vmm_fd_pingpong/common.h new file mode 100644 index 0000000..7482a9e --- /dev/null +++ b/spike/vmm_fd_pingpong/common.h @@ -0,0 +1,20 @@ +#pragma once +#include +#include +#include +#include + +#define POOL_SIZE (16 * 1024 * 1024) +#define MAGIC_BYTE 0xA7 +#define SOCK_PATH "/run/spike/pingpong.sock" + +#define CHECK(expr) do { \ + CUresult _r = (expr); \ + if (_r != CUDA_SUCCESS) { \ + const char *_msg = NULL; \ + cuGetErrorString(_r, &_msg); \ + fprintf(stderr, "%s:%d %s -> %d (%s)\n", \ + __FILE__, __LINE__, #expr, (int)_r, _msg ? _msg : "?"); \ + exit(1); \ + } \ +} while (0) diff --git a/spike/vmm_fd_pingpong/consumer.c b/spike/vmm_fd_pingpong/consumer.c new file mode 100644 index 0000000..e134584 --- /dev/null +++ b/spike/vmm_fd_pingpong/consumer.c @@ -0,0 +1,97 @@ +#include "common.h" +#include +#include +#include +#include +#include + +static int recv_fd(int sock, int *out_fd, uint64_t *out_size, uint8_t *out_magic) { + struct msghdr msg = {0}; + char ctrl[CMSG_SPACE(sizeof(int))]; + struct iovec iov[2]; + iov[0].iov_base = out_size; iov[0].iov_len = sizeof(*out_size); + iov[1].iov_base = out_magic; iov[1].iov_len = sizeof(*out_magic); + msg.msg_iov = iov; msg.msg_iovlen = 2; + msg.msg_control = ctrl; msg.msg_controllen = sizeof(ctrl); + ssize_t n = recvmsg(sock, &msg, 0); + if (n < 0) { perror("recvmsg"); return -1; } + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); + if (!cmsg || cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS) { + fprintf(stderr, "no SCM_RIGHTS in msg\n"); + return -1; + } + memcpy(out_fd, CMSG_DATA(cmsg), sizeof(int)); + return 0; +} + +int main(void) { + CHECK(cuInit(0)); + CUdevice dev; + CHECK(cuDeviceGet(&dev, 0)); + CUcontext ctx; + CHECK(cuCtxCreate(&ctx, 0, dev)); + + /* Connect to producer */ + int sock = socket(AF_UNIX, SOCK_STREAM, 0); + if (sock < 0) { perror("socket"); return 1; } + struct sockaddr_un sa = {.sun_family = AF_UNIX}; + strncpy(sa.sun_path, SOCK_PATH, sizeof(sa.sun_path) - 1); + + for (int retry = 0; retry < 50; retry++) { + if (connect(sock, (struct sockaddr *)&sa, sizeof(sa)) == 0) break; + if (retry == 49) { perror("connect (final)"); return 1; } + usleep(100000); + } + fprintf(stderr, "consumer: connected to producer\n"); + + int fd = -1; + uint64_t size = 0; + uint8_t magic = 0; + if (recv_fd(sock, &fd, &size, &magic) < 0) return 1; + fprintf(stderr, "consumer: recv fd=%d size=%llu magic=0x%02x\n", + fd, (unsigned long long)size, magic); + + CUmemGenericAllocationHandle mem; + CHECK(cuMemImportFromShareableHandle(&mem, (void *)(uintptr_t)fd, + CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR)); + fprintf(stderr, "consumer: imported handle OK\n"); + + CUdeviceptr ptr; + CHECK(cuMemAddressReserve(&ptr, size, 0, 0, 0)); + CHECK(cuMemMap(ptr, size, 0, mem, 0)); + + CUmemAccessDesc access = {0}; + access.location.type = CU_MEM_LOCATION_TYPE_DEVICE; + access.location.id = dev; + access.flags = CU_MEM_ACCESS_FLAGS_PROT_READ; + CHECK(cuMemSetAccess(ptr, size, &access, 1)); + fprintf(stderr, "consumer: mapped + access OK\n"); + + /* Copy out 1MB чтобы убедиться что pattern там */ + size_t check = size < (1 << 20) ? size : (1 << 20); + uint8_t *host = malloc(check); + CHECK(cuMemcpyDtoH(host, ptr, check)); + CHECK(cuCtxSynchronize()); + + size_t mismatch = 0; + for (size_t i = 0; i < check; i++) { + if (host[i] != magic) mismatch++; + } + free(host); + + char ack = (mismatch == 0) ? 'O' : 'X'; + fprintf(stderr, "consumer: verify mismatch=%zu/%zu → ACK=%c\n", + mismatch, check, ack); + + write(sock, &ack, 1); + close(sock); + close(fd); + + CHECK(cuMemUnmap(ptr, size)); + CHECK(cuMemAddressFree(ptr, size)); + CHECK(cuMemRelease(mem)); + CHECK(cuCtxDestroy(ctx)); + + fprintf(stderr, "consumer: done (%s)\n", ack == 'O' ? "OK" : "FAIL"); + return ack == 'O' ? 0 : 1; +} diff --git a/spike/vmm_fd_pingpong/producer.c b/spike/vmm_fd_pingpong/producer.c new file mode 100644 index 0000000..d5b013b --- /dev/null +++ b/spike/vmm_fd_pingpong/producer.c @@ -0,0 +1,103 @@ +#include "common.h" +#include +#include +#include +#include +#include + +/* Send fd через SCM_RIGHTS вместе с (uint64_t size, uint8_t magic) payload. */ +static int send_fd(int sock, int fd, uint64_t size, uint8_t magic) { + struct msghdr msg = {0}; + char ctrl[CMSG_SPACE(sizeof(int))]; + struct iovec iov[2]; + iov[0].iov_base = &size; iov[0].iov_len = sizeof(size); + iov[1].iov_base = &magic; iov[1].iov_len = sizeof(magic); + msg.msg_iov = iov; msg.msg_iovlen = 2; + msg.msg_control = ctrl; msg.msg_controllen = sizeof(ctrl); + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + memcpy(CMSG_DATA(cmsg), &fd, sizeof(int)); + ssize_t n = sendmsg(sock, &msg, 0); + if (n < 0) { perror("sendmsg"); return -1; } + return 0; +} + +int main(void) { + CHECK(cuInit(0)); + CUdevice dev; + CHECK(cuDeviceGet(&dev, 0)); + CUcontext ctx; + CHECK(cuCtxCreate(&ctx, 0, dev)); + + CUmemAllocationProp prop = {0}; + prop.type = CU_MEM_ALLOCATION_TYPE_PINNED; + prop.location.type = CU_MEM_LOCATION_TYPE_DEVICE; + prop.location.id = dev; + prop.requestedHandleTypes = CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR; + + size_t granularity = 0; + CHECK(cuMemGetAllocationGranularity(&granularity, &prop, + CU_MEM_ALLOC_GRANULARITY_MINIMUM)); + fprintf(stderr, "producer: granularity=%zu\n", granularity); + + size_t size = ((POOL_SIZE + granularity - 1) / granularity) * granularity; + fprintf(stderr, "producer: alloc size=%zu\n", size); + + CUmemGenericAllocationHandle mem; + CHECK(cuMemCreate(&mem, size, &prop, 0)); + + CUdeviceptr ptr; + CHECK(cuMemAddressReserve(&ptr, size, 0, 0, 0)); + CHECK(cuMemMap(ptr, size, 0, mem, 0)); + + CUmemAccessDesc access = {0}; + access.location = prop.location; + access.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE; + CHECK(cuMemSetAccess(ptr, size, &access, 1)); + + /* Fill with MAGIC pattern */ + CHECK(cuMemsetD8(ptr, MAGIC_BYTE, size)); + CHECK(cuCtxSynchronize()); + + int fd; + CHECK(cuMemExportToShareableHandle(&fd, mem, + CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR, 0)); + fprintf(stderr, "producer: exported fd=%d for handle\n", fd); + + /* Unix socket server */ + unlink(SOCK_PATH); + int srv = socket(AF_UNIX, SOCK_STREAM, 0); + if (srv < 0) { perror("socket"); return 1; } + struct sockaddr_un sa = {.sun_family = AF_UNIX}; + strncpy(sa.sun_path, SOCK_PATH, sizeof(sa.sun_path) - 1); + if (bind(srv, (struct sockaddr *)&sa, sizeof(sa)) < 0) { perror("bind"); return 1; } + if (listen(srv, 1) < 0) { perror("listen"); return 1; } + + fprintf(stderr, "producer: listening on %s, awaiting consumer...\n", SOCK_PATH); + int cli = accept(srv, NULL, NULL); + if (cli < 0) { perror("accept"); return 1; } + + if (send_fd(cli, fd, (uint64_t)size, MAGIC_BYTE) < 0) return 1; + fprintf(stderr, "producer: sent fd + size=%zu + magic=0x%02x\n", + size, MAGIC_BYTE); + + /* Wait for consumer ACK */ + char ack; + if (read(cli, &ack, 1) != 1) { perror("read ack"); return 1; } + fprintf(stderr, "producer: got ACK=0x%02x\n", (unsigned char)ack); + + close(cli); + close(srv); + unlink(SOCK_PATH); + close(fd); + + CHECK(cuMemUnmap(ptr, size)); + CHECK(cuMemAddressFree(ptr, size)); + CHECK(cuMemRelease(mem)); + CHECK(cuCtxDestroy(ctx)); + + fprintf(stderr, "producer: done\n"); + return ack == 'O' ? 0 : 1; +} diff --git a/tools/cuframes-rtsp-source/main.cpp b/tools/cuframes-rtsp-source/main.cpp index a5be25b..656353d 100644 --- a/tools/cuframes-rtsp-source/main.cpp +++ b/tools/cuframes-rtsp-source/main.cpp @@ -231,21 +231,16 @@ int main(int argc, char **argv) { return 2; } - /* Pre-allocate cuframes pool (NV12 — что nvdec выдаёт) */ + /* Pre-allocate cuframes pool (NV12 — что nvdec выдаёт). + * v0.4: publisher сам аллоцирует через cuMemCreate (VMM). Раньше tool + * передавал external pool, но v0.4 не может export'нуть cudaMalloc-pointers + * как POSIX FD — VMM API требует cuMemCreate-allocated memory. */ int32_t pitch_y = 0, pitch_uv = 0; size_t frame_size = cuframes::calc_frame_size(CUFRAMES_FORMAT_NV12, width, height, &pitch_y, &pitch_uv); cudaSetDevice(a.cuda_device); - std::vector pool(a.ring_size, nullptr); - for (int i = 0; i < a.ring_size; ++i) { - cudaError_t cerr = cudaMalloc(&pool[i], frame_size); - if (cerr != cudaSuccess) { - std::cerr << "cudaMalloc pool[" << i << "]: " << cudaGetErrorString(cerr) << "\n"; - return 2; - } - } cuframes::PublisherOptions po; po.key = a.key; @@ -257,12 +252,12 @@ int main(int argc, char **argv) { : CUFRAMES_POLICY_DROP_OLDEST; po.consumer_ack_timeout_ms = a.ack_timeout_ms; po.cuda_device = a.cuda_device; - po.ring_size = a.ring_size; /* для logging */ + po.ring_size = a.ring_size; - cuframes::Publisher pub(po, pool.data(), a.ring_size, frame_size); + cuframes::Publisher pub(po); /* LIBRARY ownership — publisher owns VMM pool */ std::cerr << "[cuframes-src] publisher 'cuframes-" << a.key - << "' ready, ring=" << a.ring_size - << " pool_size=" << frame_size << " bytes/frame\n"; + << "' ready (v0.4 VMM), ring=" << a.ring_size + << " frame_size=" << frame_size << " bytes\n"; /* v0.2 — encoded packet ring (опционально). */ if (a.enable_packet_ring) { @@ -293,7 +288,6 @@ int main(int argc, char **argv) { AVFrame *frame = av_frame_alloc(); if (!pkt || !frame) return 2; - int pool_idx = 0; uint64_t frame_count = 0; auto t_last_log = std::chrono::steady_clock::now(); uint64_t last_log_count = 0; @@ -375,7 +369,15 @@ int main(int argc, char **argv) { int src_pitch_y = frame->linesize[0]; int src_pitch_uv = frame->linesize[1]; - void *dst = pool[pool_idx]; + /* v0.4: acquire slot из publisher's VMM pool */ + void *dst = nullptr; + try { + dst = pub.acquire(); + } catch (const cuframes::Error &e) { + std::cerr << "acquire: " << e.what() << "\n"; + av_frame_unref(frame); + continue; + } /* D2D 2D-copy Y plane */ cudaError_t cerr = cudaMemcpy2DAsync( @@ -413,14 +415,13 @@ int main(int argc, char **argv) { int64_t pts_ns = cuframes::now_ns(); try { - pub.publish_external(dst, stream, pts_ns); + pub.publish(stream, pts_ns); } catch (const cuframes::Error &e) { - std::cerr << "publish_external: " << e.what() << "\n"; + std::cerr << "publish: " << e.what() << "\n"; av_frame_unref(frame); continue; } - pool_idx = (pool_idx + 1) % a.ring_size; frame_count++; av_frame_unref(frame); @@ -447,9 +448,7 @@ int main(int argc, char **argv) { av_buffer_unref(&hw_device); cudaStreamDestroy(stream); - /* Publisher destructor freed first; теперь освободим pool */ - /* Note: publisher уже destroyed by RAII, IPC handles closed by subscribers */ - for (auto p : pool) if (p) cudaFree(p); + /* v0.4: publisher owns VMM pool — destructor освободит cuMemRelease etc. */ return 0; }