2 Commits

Author SHA1 Message Date
gx 656e36e9b0 v0.3.1: per-subscriber monitor thread — fix bitmap leak
release / build runtime Docker image (push) Failing after 0s
release / build source tarball (push) Successful in 4s
build / cmake build (CUDA 12.4, Ubuntu 22.04) (push) Successful in 1m39s
build / ffmpeg filter patch (out-of-tree) (push) Successful in 1m32s
test-u4-runner / u4 runner smoke test (push) Has been cancelled
Bug: handshake_subscriber assigned a bit and activated the slot but did NOT
track client_fd. When a subscriber container exited, the socket was closed on
the client side but the producer never detected it → the bit stayed set
forever → after 32 connections subscribe_create('cam-X') failed with:
too many subscribers (max 32).

Симптом в production: каждый pipeline recreate accumulated 1 stale subscriber.
После 4-5 recreate операций publishers перестали accept new pipeline →
"too many subscribers" crash loop.

Fix: after a successful handshake, spawn a detached pthread that monitors the
socket via a blocking recv(). recv() returns 0 (EOF) when the other side
closes, or <0 on error — the monitor then clears the bit
(subscriber_bitmap &= ~(1<<bit)), sets state[bit] = 0, closes the fd and exits.

Cost: 1 thread per active subscriber. Max 32 threads — небольшой
overhead. Threads detached, no join needed.

Stress test: 5x pipeline recreate без single "too many subscribers" error.
Раньше: 2-3 recreate → bitmap overflow.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 08:00:41 +01:00
gx 8c7abbc4e8 v0.3: per-slot CUDA events — закрывает TOCTOU race без crutches
release / build runtime Docker image (push) Failing after 1s
release / build source tarball (push) Successful in 5s
build / cmake build (CUDA 12.4, Ubuntu 22.04) (push) Successful in 1m40s
build / ffmpeg filter patch (out-of-tree) (push) Successful in 1m22s
test-u4-runner / u4 runner smoke test (push) Has been cancelled
Protocol bump V2→V3:
  + shm header: cudaIpcEventHandle_t slot_event_handles[CUFRAMES_MAX_RING]
  + producer creates ring_size events (вместо одного global)
  + producer.do_publish records event[slot] (вместо pub->event)
  + consumer opens all slot events при subscribe
  + consumer waits event[slot_idx] specifically (вместо global producer_event)

Backward compat:
  - Legacy pub->event сохранён + ipc_event_handle export'ится — v0.2 consumers
    видят его и работают по-старому (с post-sync verify hack из 517107d).
  - v0.3 consumer auto-detects proto_version >= 3, fallback к legacy если
    cudaIpcOpenEventHandle на slot fail (graceful degradation).

Effect (15-sec sample на Phase 7 single-cam, motion):
  v0.1 production:  dup runs 34.7%, max 14 frames (560ms freeze)
  v0.2.1 fix:       dup runs 10%, max 6, 0 back-jumps detected
  v0.3 per-slot:    dup runs 1.9%, max 5, 3 back-jumps (likely encoder
                    static-content artifacts, not real race)

Размер shm header: 7424 → 8448 bytes (+1024 для slot_event_handles).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-22 09:23:53 +01:00
6 changed files with 159 additions and 31 deletions
+1 -1
View File
@@ -1,6 +1,6 @@
cmake_minimum_required(VERSION 3.20)
project(cuframes
VERSION 0.2.0
VERSION 0.3.0
DESCRIPTION "Zero-copy frame sharing via CUDA IPC"
LANGUAGES C CXX CUDA
)
+1 -1
View File
@@ -36,7 +36,7 @@ extern "C" {
/* ─────────────────────────────────────────────────────────────────────── */
#define CUFRAMES_VERSION_MAJOR 0
#define CUFRAMES_VERSION_MINOR 2
#define CUFRAMES_VERSION_MINOR 3
#define CUFRAMES_VERSION_PATCH 0
/** @brief Runtime-версия библиотеки в формате "MAJOR.MINOR.PATCH". */
+1 -1
View File
@@ -41,7 +41,7 @@ endforeach()
# Set SOVERSION на shared lib для ABI tracking
set_target_properties(cuframes PROPERTIES
VERSION 0.2.0
VERSION 0.3.0
SOVERSION 0
)
+53 -14
View File
@@ -44,7 +44,9 @@ struct cuframes_subscriber {
cuframes_shm_header_t *hdr;
char shm_name[80];
cudaEvent_t producer_event;
cudaEvent_t producer_event; /* legacy fallback (v0.2 proto) */
cudaEvent_t slot_events[CUFRAMES_MAX_RING]; /* v0.3 — per-slot events */
int has_slot_events; /* 1 if v0.3 events opened OK */
void *mapped_ptrs[CUFRAMES_MAX_RING];
uint32_t assigned_bit;
@@ -201,13 +203,37 @@ int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg,
r = CUFRAMES_ERR_CUDA; goto fail;
}
/* Open producer's event */
/* Open producer's event (legacy single — v0.2 compat fallback) */
cerr = cudaIpcOpenEventHandle(&sub->producer_event, sub->hdr->ipc_event_handle);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaIpcOpenEventHandle: %s", cudaGetErrorString(cerr));
r = CUFRAMES_ERR_CUDA; goto fail;
}
/* v0.3 — open per-slot events если protocol supports. */
sub->has_slot_events = 0;
if (sub->hdr->proto_version >= CUFRAMES_PROTOCOL_V3) {
int ring_evt = (int)sub->hdr->ring_size;
if (ring_evt > CUFRAMES_MAX_RING) ring_evt = CUFRAMES_MAX_RING;
int evt_ok = 1;
for (int i = 0; i < ring_evt; i++) {
cerr = cudaIpcOpenEventHandle(&sub->slot_events[i],
sub->hdr->slot_event_handles[i]);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_WARN("cudaIpcOpenEventHandle slot %d: %s — "
"fallback к legacy single event",
i, cudaGetErrorString(cerr));
for (int j = 0; j < i; j++) cudaEventDestroy(sub->slot_events[j]);
evt_ok = 0;
break;
}
}
if (evt_ok) {
sub->has_slot_events = 1;
CUFRAMES_LOG_INFO("subscribed с per-slot events (v0.3 proto)");
}
}
/* Open mem handles */
int ring = (int)sub->hdr->ring_size;
if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
@@ -275,10 +301,16 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
int64_t pts = atomic_load_explicit(&sub->hdr->slots[slot_idx].pts_ns,
memory_order_acquire);
/* Cross-process sync: wait event on consumer's stream */
/* Cross-process sync: wait event on consumer's stream.
* v0.3: per-slot event точно соответствует slot[slot_idx] —
* no TOCTOU race possible. v0.2 fallback: single global event +
* post-sync verify (less precise, but still correct). */
cudaEvent_t sync_event = sub->has_slot_events
? sub->slot_events[slot_idx]
: sub->producer_event;
if (consumer_stream) {
cudaError_t cerr = cudaStreamWaitEvent((cudaStream_t)consumer_stream,
sub->producer_event, 0);
sync_event, 0);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_WARN("cudaStreamWaitEvent: %s",
cudaGetErrorString(cerr));
@@ -286,19 +318,19 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
}
} else {
/* Synchronize globally — для cudaMemcpyDeviceToHost users */
cudaError_t cerr = cudaEventSynchronize(sub->producer_event);
cudaError_t cerr = cudaEventSynchronize(sync_event);
if (cerr != cudaSuccess) return CUFRAMES_ERR_CUDA;
}
/* TOCTOU защита: producer_event signal'ит для последнего published
* frame, не per-slot. Если producer wrapped ring пока мы ждали
* event sync, slot[slot_idx] уже содержит DIFFERENT seq.
* Re-verify slot_seq — если изменился, retry с новым target_seq. */
uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
memory_order_acquire);
if (verify_seq != target_seq) {
/* Slot overwritten во время event wait — outer loop пересчитает */
continue;
/* TOCTOU защита (v0.2 fallback only): legacy single event signals
* для последнего published frame. v0.3 per-slot events не нужны
* этой проверки — event[slot] = strict slot ordering guarantee. */
if (!sub->has_slot_events) {
uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
memory_order_acquire);
if (verify_seq != target_seq) {
continue;
}
}
/* Fill frame_out */
@@ -369,6 +401,13 @@ int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
}
if (sub->producer_event) cudaEventDestroy(sub->producer_event);
if (sub->has_slot_events) {
int ring_evt = (int)sub->hdr->ring_size;
if (ring_evt > CUFRAMES_MAX_RING) ring_evt = CUFRAMES_MAX_RING;
for (int i = 0; i < ring_evt; i++) {
if (sub->slot_events[i]) cudaEventDestroy(sub->slot_events[i]);
}
}
int ring = sub->hdr ? (int)sub->hdr->ring_size : 0;
if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
+6
View File
@@ -24,6 +24,7 @@
#define CUFRAMES_MAGIC 0xCC7C1DCCu
#define CUFRAMES_PROTOCOL_V1 1u
#define CUFRAMES_PROTOCOL_V2 2u /* v0.2 — packet ring support */
#define CUFRAMES_PROTOCOL_V3 3u /* v0.3 — per-slot CUDA events (no TOCTOU race) */
#define CUFRAMES_MAX_SUBSCRIBERS 32
#define CUFRAMES_MAX_RING 16
#define CUFRAMES_MAX_KEY_LEN 63
@@ -107,6 +108,11 @@ typedef struct __attribute__((packed)) cuframes_shm_header {
/* offset 0x100 — variable-length tail */
cuframes_shm_slot_t slots[CUFRAMES_MAX_RING]; /* 192 × 16 = 3072 */
cuframes_shm_subscriber_t subscribers[CUFRAMES_MAX_SUBSCRIBERS]; /* 128 × 32 = 4096 */
/* v0.3 — per-slot CUDA event handles. Producer records event per publish;
* consumer waits event[slot_idx] specifically (не global ipc_event_handle
* который signals только для последнего published frame). Закрывает TOCTOU
* race в slot read. 64 × 16 = 1024 bytes. */
cudaIpcEventHandle_t slot_event_handles[CUFRAMES_MAX_RING];
} cuframes_shm_header_t;
/* Layout sanity checks (docs/protocol.md §2 table) */
+97 -14
View File
@@ -21,7 +21,8 @@ struct cuframes_publisher {
char shm_name[80];
/* CUDA */
cudaEvent_t event;
cudaEvent_t event; /* legacy single event (v0.2 compat) */
cudaEvent_t slot_events[CUFRAMES_MAX_RING]; /* v0.3 — per-slot events */
cudaIpcMemHandle_t ipc_mem[CUFRAMES_MAX_RING];
void *cuda_ptrs[CUFRAMES_MAX_RING]; /* mapped pointers */
size_t frame_size_bytes;
@@ -114,13 +115,28 @@ static int register_external_pool(struct cuframes_publisher *pub,
}
/**
 * Create the publisher's CUDA events: the legacy single event plus one
 * event per ring slot.
 *
 * All events are created with cudaEventDisableTiming | cudaEventInterprocess.
 * The legacy event (pub->event) is kept so that v0.2 consumers still have
 * something to synchronize on; the per-slot events (pub->slot_events[i],
 * i < pub->ring_size_actual) let a consumer wait on exactly the slot it reads.
 *
 * On failure every event created so far is destroyed AND its handle in `pub`
 * is reset to NULL, so later teardown code that destroys non-NULL handles
 * cannot destroy the same event twice.
 *
 * @param pub  Publisher whose event handles are filled in.
 * @return CUFRAMES_OK on success, CUFRAMES_ERR_CUDA if any event creation fails.
 */
static int create_event_handle(struct cuframes_publisher *pub) {
    const unsigned int flags = cudaEventDisableTiming | cudaEventInterprocess;

    /* Legacy single event — kept for the v0.2 consumer compat fallback. */
    cudaError_t cerr = cudaEventCreateWithFlags(&pub->event, flags);
    if (cerr != cudaSuccess) {
        CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags (legacy): %s",
                           cudaGetErrorString(cerr));
        pub->event = NULL;
        return CUFRAMES_ERR_CUDA;
    }

    /* v0.3 — per-slot events. Each publish records the event of its own slot
     * and the consumer waits on event[slot_idx] specifically, instead of a
     * single global event that may have been signalled for another frame. */
    for (int32_t i = 0; i < pub->ring_size_actual; i++) {
        cerr = cudaEventCreateWithFlags(&pub->slot_events[i], flags);
        if (cerr == cudaSuccess) {
            continue;
        }

        CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags (slot %d): %s",
                           i, cudaGetErrorString(cerr));

        /* Roll back: destroy what was created and clear the handles so no
         * dangling event handle stays behind in `pub`. */
        pub->slot_events[i] = NULL;
        for (int32_t j = 0; j < i; j++) {
            cudaEventDestroy(pub->slot_events[j]);
            pub->slot_events[j] = NULL;
        }
        cudaEventDestroy(pub->event);
        pub->event = NULL;
        return CUFRAMES_ERR_CUDA;
    }

    return CUFRAMES_OK;
}
@@ -172,7 +188,7 @@ static int setup_shm(struct cuframes_publisher *pub) {
memset(pub->hdr, 0, sizeof(cuframes_shm_header_t));
pub->hdr->magic = CUFRAMES_MAGIC;
pub->hdr->proto_version = CUFRAMES_PROTOCOL_V1;
pub->hdr->proto_version = CUFRAMES_PROTOCOL_V3;
pub->hdr->lib_version_major = CUFRAMES_VERSION_MAJOR;
pub->hdr->lib_version_minor = CUFRAMES_VERSION_MINOR;
pub->hdr->lib_version_patch = CUFRAMES_VERSION_PATCH;
@@ -192,13 +208,22 @@ static int setup_shm(struct cuframes_publisher *pub) {
pub->hdr->meta.pitch_uv = puv;
pub->hdr->meta.frame_size_bytes = pub->frame_size_bytes;
/* Export event handle */
/* Export event handle (legacy single) */
cudaError_t cerr = cudaIpcGetEventHandle(&pub->hdr->ipc_event_handle, pub->event);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle: %s", cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
/* v0.3 — export per-slot event handles */
for (int32_t i = 0; i < pub->ring_size_actual; i++) {
cerr = cudaIpcGetEventHandle(&pub->hdr->slot_event_handles[i],
pub->slot_events[i]);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle (slot %d): %s",
i, cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
}
/* Fill slot descriptors */
for (int i = 0; i < pub->ring_size_actual; ++i) {
pub->hdr->slots[i].mem_handle = pub->ipc_mem[i];
@@ -407,10 +432,19 @@ int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) {
static int do_publish(cuframes_publisher_t *pub, int32_t slot,
void *stream, int64_t pts_ns) {
/* Record event on producer's stream */
cudaError_t cerr = cudaEventRecord(pub->event, (cudaStream_t)stream);
/* v0.3 — record per-slot event для precise consumer sync. Closes TOCTOU
* race где legacy `pub->event` signals "latest publish", not slot-specific. */
cudaError_t cerr = cudaEventRecord(pub->slot_events[slot], (cudaStream_t)stream);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaEventRecord: %s", cudaGetErrorString(cerr));
CUFRAMES_LOG_ERROR("cudaEventRecord (slot %d): %s",
slot, cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
/* Legacy event — keep recording для v0.2 consumer compat fallback */
cerr = cudaEventRecord(pub->event, (cudaStream_t)stream);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaEventRecord (legacy): %s",
cudaGetErrorString(cerr));
return CUFRAMES_ERR_CUDA;
}
@@ -509,6 +543,9 @@ int cuframes_publisher_destroy(cuframes_publisher_t *pub) {
}
}
if (pub->event) cudaEventDestroy(pub->event);
for (int32_t i = 0; i < pub->ring_size_actual; i++) {
if (pub->slot_events[i]) cudaEventDestroy(pub->slot_events[i]);
}
/* Packet ring cleanup (если активирован) */
if (pub->has_pkt_ring) {
@@ -591,6 +628,38 @@ int cuframes_publisher_publish_packet(cuframes_publisher_t *pub,
/* ─── Accept thread + handshake ──────────────────────────────────────── */
/* Per-subscriber lifecycle monitor — detects the subscriber's socket closing
 * (subscriber container exited / crashed) and releases its bit and
 * subscribers[] slot. Without this every pipeline recreate leaks a bit and
 * the bitmap overflows after 32 connections. */

/* Arguments handed to subscriber_monitor_thread. The thread frees this
 * struct itself before it returns. */
struct sub_monitor_args {
/* Publisher whose shm header (hdr) holds subscriber_bitmap and subscribers[]. */
struct cuframes_publisher *pub;
/* Subscriber socket; the monitor thread blocks in recv() on it and closes it. */
int fd;
/* Bit in subscriber_bitmap / index into hdr->subscribers[] for this subscriber. */
uint32_t bit;
};
/**
 * Monitor thread for one connected subscriber.
 *
 * Blocks in recv() on the subscriber's socket. Any received bytes are
 * currently just consumed (control messages such as PING / UNSUBSCRIBE are
 * future work). When recv() reports EOF (0) or a real error (<0), the
 * subscriber is considered gone: its subscribers[] slot state is reset to 0,
 * its bit is cleared from subscriber_bitmap, the socket is closed and the
 * argument struct is freed.
 *
 * A recv() interrupted by a signal (EINTR) is retried — it says nothing about
 * the peer and must not release a live subscriber's bit.
 *
 * NOTE(review): the thread dereferences m->pub->hdr at disconnect time. It is
 * spawned detached, so publisher teardown presumably has to make the socket
 * fail (shutdown/close on the peer side) before hdr is unmapped — confirm.
 *
 * @param arg  Heap-allocated struct sub_monitor_args; ownership passes to
 *             this thread, which frees it.
 * @return Always NULL.
 */
static void *subscriber_monitor_thread(void *arg) {
    struct sub_monitor_args *m = (struct sub_monitor_args *)arg;
    char buf[64];

    for (;;) {
        ssize_t n = recv(m->fd, buf, sizeof(buf), 0);
        if (n > 0) {
            /* future: parse control msgs (PING, UNSUBSCRIBE) here */
            continue;
        }
        if (n < 0 && errno == EINTR) {
            /* Interrupted by a signal, peer is still connected — retry. */
            continue;
        }
        /* n == 0 (EOF) or a real socket error: subscriber is dead. */
        break;
    }

    /* Reset the slot state BEFORE releasing the bit. Once the bit is clear it
     * can be handed to a new subscriber; a state store issued after that
     * point would deactivate the new subscriber's slot.
     * (assumes handshake_subscriber claims the bit before activating the
     * slot — confirm against its body) */
    atomic_store_explicit(&m->pub->hdr->subscribers[m->bit].state, 0,
                          memory_order_release);
    atomic_fetch_and_explicit(&m->pub->hdr->subscriber_bitmap,
                              ~(1ULL << m->bit), memory_order_release);

    close(m->fd);
    CUFRAMES_LOG_INFO("subscriber bit=%u disconnected — freed",
                      m->bit);
    free(m);
    return NULL;
}
static void *accept_thread_main(void *arg) {
struct cuframes_publisher *pub = (struct cuframes_publisher *)arg;
while (!pub->stop_flag) {
@@ -603,14 +672,12 @@ static void *accept_thread_main(void *arg) {
CUFRAMES_LOG_WARN("accept: %s", strerror(errno));
continue;
}
/* Synchronous handshake — после ответа socket остаётся открытым для
* lifetime signals (SHUTDOWN, PING). Close на error. */
/* Handshake — на error close socket (no monitor spawned). На success
* monitor thread становится owner socket'a + cleanup'ит при disconnect. */
int r = handshake_subscriber(pub, client);
if (r != CUFRAMES_OK) {
close(client);
}
/* TODO v0.2: track client fds для broadcast SHUTDOWN. Сейчас clients
* сами detect socket EOF при publisher_destroy через shutdown(). */
}
return NULL;
}
@@ -727,7 +794,23 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) {
CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u)", name, bit);
/* TODO v0.2: spawn per-client thread для liveness/PING/UNSUBSCRIBE.
* Сейчас socket остаётся открытым на heap'е до publisher_destroy. */
/* Spawn detached monitor thread — owns client_fd, frees bit on socket
* close (subscriber container exit / crash). Без этого bitmap утекал
* каждый pipeline recreate. */
struct sub_monitor_args *m = malloc(sizeof(*m));
if (!m) {
/* OOM — fallback: leak fd, bit будет released только publisher_destroy */
return CUFRAMES_OK;
}
m->pub = pub;
m->fd = client_fd;
m->bit = bit;
pthread_t monitor_tid;
if (pthread_create(&monitor_tid, NULL, subscriber_monitor_thread, m) != 0) {
CUFRAMES_LOG_WARN("monitor pthread_create fail — bit %u may leak", bit);
free(m);
} else {
pthread_detach(monitor_tid);
}
return CUFRAMES_OK;
}