diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9376841..042f7b4 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -1,6 +1,6 @@
 cmake_minimum_required(VERSION 3.20)
 project(cuframes
-    VERSION 0.2.0
+    VERSION 0.3.0
     DESCRIPTION "Zero-copy frame sharing via CUDA IPC"
     LANGUAGES C CXX CUDA
 )
diff --git a/include/cuframes/cuframes.h b/include/cuframes/cuframes.h
index acc69ba..984b9a2 100644
--- a/include/cuframes/cuframes.h
+++ b/include/cuframes/cuframes.h
@@ -36,7 +36,7 @@ extern "C" {
 /* ─────────────────────────────────────────────────────────────────────── */
 
 #define CUFRAMES_VERSION_MAJOR 0
-#define CUFRAMES_VERSION_MINOR 2
+#define CUFRAMES_VERSION_MINOR 3
 #define CUFRAMES_VERSION_PATCH 0
 
 /** @brief Runtime-версия библиотеки в формате "MAJOR.MINOR.PATCH". */
diff --git a/libcuframes/CMakeLists.txt b/libcuframes/CMakeLists.txt
index 0730acf..9115144 100644
--- a/libcuframes/CMakeLists.txt
+++ b/libcuframes/CMakeLists.txt
@@ -41,7 +41,7 @@ endforeach()
 
 # Set SOVERSION на shared lib для ABI tracking
 set_target_properties(cuframes PROPERTIES
-    VERSION 0.2.0
+    VERSION 0.3.0
     SOVERSION 0
 )
 
diff --git a/libcuframes/src/consumer.c b/libcuframes/src/consumer.c
index 595cba4..5cc8b33 100644
--- a/libcuframes/src/consumer.c
+++ b/libcuframes/src/consumer.c
@@ -44,7 +44,9 @@ struct cuframes_subscriber {
     cuframes_shm_header_t *hdr;
     char shm_name[80];
 
-    cudaEvent_t producer_event;
+    cudaEvent_t producer_event;                  /* legacy fallback (v0.2 proto) */
+    cudaEvent_t slot_events[CUFRAMES_MAX_RING];  /* v0.3 — per-slot events */
+    int has_slot_events;                         /* 1 if v0.3 events opened OK */
     void *mapped_ptrs[CUFRAMES_MAX_RING];
 
     uint32_t assigned_bit;
@@ -201,13 +203,37 @@ int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg,
         r = CUFRAMES_ERR_CUDA; goto fail;
     }
 
-    /* Open producer's event */
+    /* Open producer's event (legacy single — v0.2 compat fallback) */
     cerr = cudaIpcOpenEventHandle(&sub->producer_event, sub->hdr->ipc_event_handle);
     if (cerr != cudaSuccess) {
         CUFRAMES_LOG_ERROR("cudaIpcOpenEventHandle: %s", cudaGetErrorString(cerr));
         r = CUFRAMES_ERR_CUDA; goto fail;
     }
 
+    /* v0.3 — open per-slot events if the producer's protocol supports them. */
+    sub->has_slot_events = 0;
+    if (sub->hdr->proto_version >= CUFRAMES_PROTOCOL_V3) {
+        int ring_evt = (int)sub->hdr->ring_size;
+        if (ring_evt > CUFRAMES_MAX_RING) ring_evt = CUFRAMES_MAX_RING;
+        int evt_ok = 1;
+        for (int i = 0; i < ring_evt; i++) {
+            cerr = cudaIpcOpenEventHandle(&sub->slot_events[i],
+                                          sub->hdr->slot_event_handles[i]);
+            if (cerr != cudaSuccess) {
+                CUFRAMES_LOG_WARN("cudaIpcOpenEventHandle slot %d: %s — "
+                                  "fallback к legacy single event",
+                                  i, cudaGetErrorString(cerr));
+                for (int j = 0; j < i; j++) cudaEventDestroy(sub->slot_events[j]);
+                evt_ok = 0;
+                break;
+            }
+        }
+        if (evt_ok) {
+            sub->has_slot_events = 1;
+            CUFRAMES_LOG_INFO("subscribed с per-slot events (v0.3 proto)");
+        }
+    }
+
     /* Open mem handles */
     int ring = (int)sub->hdr->ring_size;
     if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
@@ -275,10 +301,16 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
         int64_t pts = atomic_load_explicit(&sub->hdr->slots[slot_idx].pts_ns,
                                             memory_order_acquire);
 
-        /* Cross-process sync: wait event on consumer's stream */
+        /* Cross-process sync: wait event on consumer's stream.
+         * v0.3: the per-slot event tracks writes to slot[slot_idx] only, so an
+         * unrelated publish can no longer satisfy the wait. v0.2 fallback: single
+         * global event. Both paths still re-verify slot seq after the wait. */
+        cudaEvent_t sync_event = sub->has_slot_events
+                                     ? sub->slot_events[slot_idx]
+                                     : sub->producer_event;
         if (consumer_stream) {
             cudaError_t cerr = cudaStreamWaitEvent((cudaStream_t)consumer_stream,
-                                                    sub->producer_event, 0);
+                                                    sync_event, 0);
             if (cerr != cudaSuccess) {
                 CUFRAMES_LOG_WARN("cudaStreamWaitEvent: %s",
                                   cudaGetErrorString(cerr));
@@ -286,19 +318,19 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
             }
         } else {
             /* Synchronize globally — для cudaMemcpyDeviceToHost users */
-            cudaError_t cerr = cudaEventSynchronize(sub->producer_event);
+            cudaError_t cerr = cudaEventSynchronize(sync_event);
             if (cerr != cudaSuccess) return CUFRAMES_ERR_CUDA;
         }
 
-        /* TOCTOU защита: producer_event signal'ит для последнего published
-         * frame, не per-slot. Если producer wrapped ring пока мы ждали
-         * event sync, slot[slot_idx] уже содержит DIFFERENT seq.
-         * Re-verify slot_seq — если изменился, retry с новым target_seq. */
+        /* TOCTOU guard (both paths): an event proves that a write to the slot
+         * has completed, not that the slot still holds target_seq. If the
+         * producer lapped the ring while we waited, slot[slot_idx] now carries
+         * a different seq — re-verify and let the outer loop retry. */
         uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
                                                     memory_order_acquire);
         if (verify_seq != target_seq) {
             /* Slot overwritten во время event wait — outer loop пересчитает */
             continue;
         }
 
         /* Fill frame_out */
@@ -369,6 +401,14 @@ int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
     }
 
     if (sub->producer_event) cudaEventDestroy(sub->producer_event);
+    if (sub->has_slot_events) {
+        /* Guard hdr exactly like the mem-handle cleanup below does. */
+        int ring_evt = sub->hdr ? (int)sub->hdr->ring_size : 0;
+        if (ring_evt > CUFRAMES_MAX_RING) ring_evt = CUFRAMES_MAX_RING;
+        for (int i = 0; i < ring_evt; i++) {
+            if (sub->slot_events[i]) cudaEventDestroy(sub->slot_events[i]);
+        }
+    }
 
     int ring = sub->hdr ? (int)sub->hdr->ring_size : 0;
     if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
diff --git a/libcuframes/src/internal.h b/libcuframes/src/internal.h
index 3377357..3a97fbc 100644
--- a/libcuframes/src/internal.h
+++ b/libcuframes/src/internal.h
@@ -24,6 +24,7 @@
 #define CUFRAMES_MAGIC 0xCC7C1DCCu
 #define CUFRAMES_PROTOCOL_V1 1u
 #define CUFRAMES_PROTOCOL_V2 2u /* v0.2 — packet ring support */
+#define CUFRAMES_PROTOCOL_V3 3u /* v0.3 — per-slot CUDA events (slot-precise sync) */
 #define CUFRAMES_MAX_SUBSCRIBERS 32
 #define CUFRAMES_MAX_RING 16
 #define CUFRAMES_MAX_KEY_LEN 63
@@ -107,6 +108,11 @@ typedef struct __attribute__((packed)) cuframes_shm_header {
     /* offset 0x100 — variable-length tail */
     cuframes_shm_slot_t slots[CUFRAMES_MAX_RING]; /* 192 × 16 = 3072 */
     cuframes_shm_subscriber_t subscribers[CUFRAMES_MAX_SUBSCRIBERS]; /* 128 × 32 = 4096 */
+    /* v0.3 — per-slot CUDA event handles. Producer records an event per publish;
+     * consumer waits on event[slot_idx] specifically (not the global
+     * ipc_event_handle, which is re-recorded on every publish). Consumers must
+     * still re-check slot seq after the wait. 64 × 16 = 1024 bytes. */
+    cudaIpcEventHandle_t slot_event_handles[CUFRAMES_MAX_RING];
 } cuframes_shm_header_t;
 
 /* Layout sanity checks (docs/protocol.md §2 table) */
diff --git a/libcuframes/src/producer.c b/libcuframes/src/producer.c
index ab9a842..a5fde59 100644
--- a/libcuframes/src/producer.c
+++ b/libcuframes/src/producer.c
@@ -21,7 +21,8 @@ struct cuframes_publisher {
     char shm_name[80];
 
     /* CUDA */
-    cudaEvent_t event;
+    cudaEvent_t event;                           /* legacy single event (v0.2 compat) */
+    cudaEvent_t slot_events[CUFRAMES_MAX_RING];  /* v0.3 — per-slot events */
     cudaIpcMemHandle_t ipc_mem[CUFRAMES_MAX_RING];
     void *cuda_ptrs[CUFRAMES_MAX_RING]; /* mapped pointers */
     size_t frame_size_bytes;
@@ -114,13 +115,34 @@ static int register_external_pool(struct cuframes_publisher *pub,
 }
 
 static int create_event_handle(struct cuframes_publisher *pub) {
+    /* Legacy single event — kept so v0.2 consumers can still sync */
     cudaError_t cerr = cudaEventCreateWithFlags(&pub->event,
         cudaEventDisableTiming | cudaEventInterprocess);
     if (cerr != cudaSuccess) {
-        CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags: %s",
+        CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags (legacy): %s",
                            cudaGetErrorString(cerr));
         return CUFRAMES_ERR_CUDA;
     }
+    /* v0.3 — per-slot events. Each publish records the event that belongs to
+     * its slot, and the consumer waits on event[slot_idx] specifically, so a
+     * publish into a different slot cannot satisfy the wait. */
+    for (int32_t i = 0; i < pub->ring_size_actual; i++) {
+        cerr = cudaEventCreateWithFlags(&pub->slot_events[i],
+            cudaEventDisableTiming | cudaEventInterprocess);
+        if (cerr != cudaSuccess) {
+            CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags (slot %d): %s",
+                               i, cudaGetErrorString(cerr));
+            /* Roll back and clear the handles so that a later
+             * cuframes_publisher_destroy() cannot destroy them a second time. */
+            for (int32_t j = 0; j < i; j++) {
+                cudaEventDestroy(pub->slot_events[j]);
+                pub->slot_events[j] = NULL;
+            }
+            cudaEventDestroy(pub->event);
+            pub->event = NULL;
+            return CUFRAMES_ERR_CUDA;
+        }
+    }
     return CUFRAMES_OK;
 }
 
@@ -172,7 +194,7 @@ static int setup_shm(struct cuframes_publisher *pub) {
     memset(pub->hdr, 0, sizeof(cuframes_shm_header_t));
 
     pub->hdr->magic = CUFRAMES_MAGIC;
-    pub->hdr->proto_version = CUFRAMES_PROTOCOL_V1;
+    pub->hdr->proto_version = CUFRAMES_PROTOCOL_V3;
     pub->hdr->lib_version_major = CUFRAMES_VERSION_MAJOR;
     pub->hdr->lib_version_minor = CUFRAMES_VERSION_MINOR;
     pub->hdr->lib_version_patch = CUFRAMES_VERSION_PATCH;
@@ -192,13 +214,22 @@ static int setup_shm(struct cuframes_publisher *pub) {
     pub->hdr->meta.pitch_uv = puv;
     pub->hdr->meta.frame_size_bytes = pub->frame_size_bytes;
 
-    /* Export event handle */
+    /* Export event handle (legacy single) */
     cudaError_t cerr = cudaIpcGetEventHandle(&pub->hdr->ipc_event_handle, pub->event);
     if (cerr != cudaSuccess) {
         CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle: %s", cudaGetErrorString(cerr));
         return CUFRAMES_ERR_CUDA;
     }
-
+    /* v0.3 — export per-slot event handles */
+    for (int32_t i = 0; i < pub->ring_size_actual; i++) {
+        cerr = cudaIpcGetEventHandle(&pub->hdr->slot_event_handles[i],
+                                     pub->slot_events[i]);
+        if (cerr != cudaSuccess) {
+            CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle (slot %d): %s",
+                               i, cudaGetErrorString(cerr));
+            return CUFRAMES_ERR_CUDA;
+        }
+    }
     /* Fill slot descriptors */
     for (int i = 0; i < pub->ring_size_actual; ++i) {
         pub->hdr->slots[i].mem_handle = pub->ipc_mem[i];
@@ -407,10 +438,19 @@ int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) {
 
 static int do_publish(cuframes_publisher_t *pub, int32_t slot,
                        void *stream, int64_t pts_ns) {
-    /* Record event on producer's stream */
-    cudaError_t cerr = cudaEventRecord(pub->event, (cudaStream_t)stream);
+    /* v0.3 — record the per-slot event so consumers can wait on exactly this
+     * slot; the legacy `pub->event` only reflects the latest publish. */
+    cudaError_t cerr = cudaEventRecord(pub->slot_events[slot], (cudaStream_t)stream);
     if (cerr != cudaSuccess) {
-        CUFRAMES_LOG_ERROR("cudaEventRecord: %s", cudaGetErrorString(cerr));
+        CUFRAMES_LOG_ERROR("cudaEventRecord (slot %d): %s",
+                           slot, cudaGetErrorString(cerr));
+        return CUFRAMES_ERR_CUDA;
+    }
+    /* Legacy event — still recorded so v0.2 consumers keep working */
+    cerr = cudaEventRecord(pub->event, (cudaStream_t)stream);
+    if (cerr != cudaSuccess) {
+        CUFRAMES_LOG_ERROR("cudaEventRecord (legacy): %s",
+                           cudaGetErrorString(cerr));
         return CUFRAMES_ERR_CUDA;
     }
 
@@ -509,6 +549,9 @@ int cuframes_publisher_destroy(cuframes_publisher_t *pub) {
         }
     }
     if (pub->event) cudaEventDestroy(pub->event);
+    for (int32_t i = 0; i < pub->ring_size_actual; i++) {
+        if (pub->slot_events[i]) cudaEventDestroy(pub->slot_events[i]);
+    }
 
     /* Packet ring cleanup (если активирован) */
     if (pub->has_pkt_ring) {