v0.3.1: per-subscriber monitor thread — fix bitmap leak
release / build runtime Docker image (push) Failing after 0s
release / build source tarball (push) Successful in 4s
build / cmake build (CUDA 12.4, Ubuntu 22.04) (push) Successful in 1m39s
build / ffmpeg filter patch (out-of-tree) (push) Successful in 1m32s
test-u4-runner / u4 runner smoke test (push) Has been cancelled
release / build runtime Docker image (push) Failing after 0s
release / build source tarball (push) Successful in 4s
build / cmake build (CUDA 12.4, Ubuntu 22.04) (push) Successful in 1m39s
build / ffmpeg filter patch (out-of-tree) (push) Successful in 1m32s
test-u4-runner / u4 runner smoke test (push) Has been cancelled
Bug: handshake_subscriber assigned a bit and activated the slot but did NOT track
client_fd. When a subscriber container exited, the socket was closed on the client side
but the producer never detected it → the bit stayed set forever → after 32 connections:
subscribe_create('cam-X'): too many subscribers (max 32).
Symptom in production: every pipeline recreate accumulated 1 stale subscriber.
After 4-5 recreate operations the publishers stopped accepting new pipelines →
"too many subscribers" crash loop.
Fix: after a successful handshake, spawn a detached pthread that monitors the socket
via blocking recv(). recv() returns 0 (EOF) when the other side closes — the
monitor clears the bit (subscriber_bitmap &= ~(1<<bit)), sets state[bit] = 0,
closes the fd, and exits.
Cost: 1 thread per active subscriber. Max 32 threads — a small
overhead. Threads are detached, so no join is needed.
Stress test: 5x pipeline recreate without a single "too many subscribers" error.
Before: 2-3 recreates → bitmap overflow.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -628,6 +628,38 @@ int cuframes_publisher_publish_packet(cuframes_publisher_t *pub,
|
|||||||
|
|
||||||
/* ─── Accept thread + handshake ──────────────────────────────────────── */
|
/* ─── Accept thread + handshake ──────────────────────────────────────── */
|
||||||
|
|
||||||
|
/*
 * Arguments handed to a per-subscriber lifecycle monitor thread.
 *
 * The monitor detects that a subscriber's socket has closed (subscriber
 * container exited or crashed) and releases the subscriber's bit together
 * with its subscribers[] slot. Without it, every pipeline recreate leaks a
 * bit and the bitmap overflows after 32 connections.
 */
struct sub_monitor_args {
    struct cuframes_publisher *pub; /* publisher whose bitmap/slot gets released */
    int fd;                         /* subscriber socket being watched */
    uint32_t bit;                   /* subscriber's bit number / subscribers[] index */
};
|
||||||
|
|
||||||
|
static void *subscriber_monitor_thread(void *arg) {
|
||||||
|
struct sub_monitor_args *m = (struct sub_monitor_args *)arg;
|
||||||
|
char buf[64];
|
||||||
|
/* Blocking read — return 0 (EOF) когда other side close socket, или
|
||||||
|
* <0 on error. Любой control message (PING — TODO в будущем) just consumed. */
|
||||||
|
while (1) {
|
||||||
|
ssize_t n = recv(m->fd, buf, sizeof(buf), 0);
|
||||||
|
if (n <= 0) {
|
||||||
|
/* Subscriber dead — clear bit + slot state. */
|
||||||
|
atomic_fetch_and_explicit(&m->pub->hdr->subscriber_bitmap,
|
||||||
|
~(1ULL << m->bit), memory_order_release);
|
||||||
|
atomic_store_explicit(&m->pub->hdr->subscribers[m->bit].state, 0,
|
||||||
|
memory_order_release);
|
||||||
|
close(m->fd);
|
||||||
|
CUFRAMES_LOG_INFO("subscriber bit=%u disconnected — freed",
|
||||||
|
m->bit);
|
||||||
|
free(m);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
/* future: parse control msgs (PING, UNSUBSCRIBE) here */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void *accept_thread_main(void *arg) {
|
static void *accept_thread_main(void *arg) {
|
||||||
struct cuframes_publisher *pub = (struct cuframes_publisher *)arg;
|
struct cuframes_publisher *pub = (struct cuframes_publisher *)arg;
|
||||||
while (!pub->stop_flag) {
|
while (!pub->stop_flag) {
|
||||||
@@ -640,14 +672,12 @@ static void *accept_thread_main(void *arg) {
|
|||||||
CUFRAMES_LOG_WARN("accept: %s", strerror(errno));
|
CUFRAMES_LOG_WARN("accept: %s", strerror(errno));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
/* Synchronous handshake — после ответа socket остаётся открытым для
|
/* Handshake — на error close socket (no monitor spawned). На success
|
||||||
* lifetime signals (SHUTDOWN, PING). Close на error. */
|
* monitor thread становится owner socket'a + cleanup'ит при disconnect. */
|
||||||
int r = handshake_subscriber(pub, client);
|
int r = handshake_subscriber(pub, client);
|
||||||
if (r != CUFRAMES_OK) {
|
if (r != CUFRAMES_OK) {
|
||||||
close(client);
|
close(client);
|
||||||
}
|
}
|
||||||
/* TODO v0.2: track client fds для broadcast SHUTDOWN. Сейчас clients
|
|
||||||
* сами detect socket EOF при publisher_destroy через shutdown(). */
|
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -764,7 +794,23 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) {
|
|||||||
|
|
||||||
CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u)", name, bit);
|
CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u)", name, bit);
|
||||||
|
|
||||||
/* TODO v0.2: spawn per-client thread для liveness/PING/UNSUBSCRIBE.
|
/* Spawn detached monitor thread — owns client_fd, frees bit on socket
|
||||||
* Сейчас socket остаётся открытым на heap'е до publisher_destroy. */
|
* close (subscriber container exit / crash). Без этого bitmap утекал
|
||||||
|
* каждый pipeline recreate. */
|
||||||
|
struct sub_monitor_args *m = malloc(sizeof(*m));
|
||||||
|
if (!m) {
|
||||||
|
/* OOM — fallback: leak fd, bit будет released только publisher_destroy */
|
||||||
|
return CUFRAMES_OK;
|
||||||
|
}
|
||||||
|
m->pub = pub;
|
||||||
|
m->fd = client_fd;
|
||||||
|
m->bit = bit;
|
||||||
|
pthread_t monitor_tid;
|
||||||
|
if (pthread_create(&monitor_tid, NULL, subscriber_monitor_thread, m) != 0) {
|
||||||
|
CUFRAMES_LOG_WARN("monitor pthread_create fail — bit %u may leak", bit);
|
||||||
|
free(m);
|
||||||
|
} else {
|
||||||
|
pthread_detach(monitor_tid);
|
||||||
|
}
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user