From 656e36e9b0de48119bfdf17e4d38fd4e5ed582e9 Mon Sep 17 00:00:00 2001 From: Evgeny Demchenko Date: Sun, 24 May 2026 08:00:41 +0100 Subject: [PATCH] =?UTF-8?q?v0.3.1:=20per-subscriber=20monitor=20thread=20?= =?UTF-8?q?=E2=80=94=20fix=20bitmap=20leak?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Bug: handshake_subscriber assigned bit + activated slot но НЕ tracked client_fd. Когда subscriber container exited, socket closed on client side но producer не detected → bit оставался set forever → после 32 connections subscribe_create('cam-X'): too many subscribers (max 32). Симптом в production: каждый pipeline recreate accumulated 1 stale subscriber. После 4-5 recreate операций publishers перестали accept new pipeline → "too many subscribers" crash loop. Fix: после успешного handshake spawn detached pthread monitoring socket via blocking recv(). recv() returns 0 (EOF) когда other side closes — monitor clears bit (subscriber_bitmap &= ~(1< --- libcuframes/src/producer.c | 58 ++++++++++++++++++++++++++++++++++---- 1 file changed, 52 insertions(+), 6 deletions(-) diff --git a/libcuframes/src/producer.c b/libcuframes/src/producer.c index a5fde59..be5e4eb 100644 --- a/libcuframes/src/producer.c +++ b/libcuframes/src/producer.c @@ -628,6 +628,38 @@ int cuframes_publisher_publish_packet(cuframes_publisher_t *pub, /* ─── Accept thread + handshake ──────────────────────────────────────── */ +/* Per-subscriber lifecycle monitor — detects socket close (subscriber container + * exited / crashed) and frees the bit and the subscribers[] slot. Without this every + * pipeline recreate leaks a bit → bitmap overflows after 32 connections. */ +struct sub_monitor_args { + struct cuframes_publisher *pub; + int fd; + uint32_t bit; +}; + +static void *subscriber_monitor_thread(void *arg) { + struct sub_monitor_args *m = (struct sub_monitor_args *)arg; + char buf[64]; + /* Blocking read — recv() returns 0 (EOF) when the other side closes the socket, or + * <0 on error.
Any control message (PING — future TODO) is just read and dropped. NOTE(review): any n < 0, including an interrupted call (EINTR), is treated as a disconnect below — confirm this is intended. */ + while (1) { + ssize_t n = recv(m->fd, buf, sizeof(buf), 0); + if (n <= 0) { + /* Subscriber dead — clear its bit in subscriber_bitmap and reset the slot state to 0. */ + atomic_fetch_and_explicit(&m->pub->hdr->subscriber_bitmap, + ~(1ULL << m->bit), memory_order_release); + atomic_store_explicit(&m->pub->hdr->subscribers[m->bit].state, 0, + memory_order_release); + close(m->fd); + CUFRAMES_LOG_INFO("subscriber bit=%u disconnected — freed", + m->bit); + free(m); + return NULL; + } + /* future: parse control msgs (PING, UNSUBSCRIBE) here */ + } +} + static void *accept_thread_main(void *arg) { struct cuframes_publisher *pub = (struct cuframes_publisher *)arg; while (!pub->stop_flag) { @@ -640,14 +672,12 @@ static void *accept_thread_main(void *arg) { CUFRAMES_LOG_WARN("accept: %s", strerror(errno)); continue; } - /* Synchronous handshake — после ответа socket остаётся открытым для - * lifetime signals (SHUTDOWN, PING). Close на error. */ + /* Handshake — on a non-OK result the socket is closed here. On success the monitor thread, if one was started, + * owns the socket and cleans up on disconnect. NOTE(review): CUFRAMES_OK is also returned when the monitor could not be started. */ int r = handshake_subscriber(pub, client); if (r != CUFRAMES_OK) { close(client); } - /* TODO v0.2: track client fds для broadcast SHUTDOWN. Сейчас clients - * сами detect socket EOF при publisher_destroy через shutdown(). */ } return NULL; } @@ -764,7 +794,23 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) { CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u)", name, bit); - /* TODO v0.2: spawn per-client thread для liveness/PING/UNSUBSCRIBE. - * Сейчас socket остаётся открытым на heap'е до publisher_destroy. */ + /* Spawn a detached monitor thread — it owns client_fd and frees the bit on socket + * close (subscriber container exit / crash). Without this the bitmap leaked a bit on + * every pipeline recreate. NOTE(review): if malloc or pthread_create fails below, CUFRAMES_OK is still returned and client_fd is never closed.
*/ + struct sub_monitor_args *m = malloc(sizeof(*m)); + if (!m) { + /* OOM — fallback: the fd is leaked; per the author the bit is released only by publisher_destroy (not visible in this patch — confirm) */ + return CUFRAMES_OK; + } + m->pub = pub; + m->fd = client_fd; + m->bit = bit; + pthread_t monitor_tid; + if (pthread_create(&monitor_tid, NULL, subscriber_monitor_thread, m) != 0) { + CUFRAMES_LOG_WARN("monitor pthread_create fail — bit %u may leak", bit); + free(m); + } else { + pthread_detach(monitor_tid); + } return CUFRAMES_OK; }