3 Commits

Author SHA1 Message Date
gx d646f5a4e4 v0.3.3: consumer post-sync verify даже для v0.3 per-slot events
release / build runtime Docker image (push) Failing after 0s
release / build source tarball (push) Successful in 4s
build / cmake build (CUDA 12.4, Ubuntu 22.04) (push) Successful in 1m41s
build / ffmpeg filter patch (out-of-tree) (push) Successful in 1m29s
test-u4-runner / u4 runner smoke test (push) Has been cancelled
Bug: cudaEventRecord(event[slot]) overwrites previous state каждый publish.
Когда producer wraps ring (~640ms при ring=16), event[slot] re-recorded для
new content. Consumer's pending cudaStreamWaitEvent satisfied новым signal —
consumer reads slot[slot_idx] thinking it's target_seq, реально получает
seq+ring_size content (stale-by-1-wrap drift).

После 50k+ wraps в long-running pipeline (9h uptime) drift накапливается:
output stream имеет 60-70% duplicate frames (vs 10% сразу после restart).

Симптом: TV picture freezes на 1-2 sec периодически. Encoder fps=25 stable
(content duplicates same PTS-advance), но motion choppy на 8-9 fps real.

Fix: unconditional post-sync verify (atomic re-read slot.seq после event wait).
Если producer wrap occurred — slot.seq != target_seq → continue к новому
target_seq. Cheap (one atomic load), correctness > perf.

Verified: после deploy с fresh pipeline, 18-sec sample = 4% duplicates
(vs 8.4% при том же setup но без fix).

Proper v0.4 fix: per-slot+per-publish event pool с unique handle per cycle.
Текущий v0.3.3 — sufficient mitigation для current production scale.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 20:27:00 +01:00
gx becfbebc78 cuframes-rtsp-source: + --policy + --ack-timeout-ms CLI flags
release / build runtime Docker image (push) Failing after 0s
release / build source tarball (push) Successful in 2s
build / cmake build (CUDA 12.4, Ubuntu 22.04) (push) Successful in 1m39s
build / ffmpeg filter patch (out-of-tree) (push) Successful in 1m25s
test-u4-runner / u4 runner smoke test (push) Has been cancelled
Opt-in для STRICT_WAIT policy (default остаётся DROP_OLDEST).

Use case STRICT_WAIT:
  Frame integrity критичен (e.g. recording, frame-accurate analytics).
  Producer ждёт ack от всех subscribers перед wrap ring → no torn frames.
  Trade-off: slow consumer задерживает all (default 200ms timeout затем
  subscriber dropped from bitmap).

Use case DROP_OLDEST (default):
  Low-latency real-time display (TV grid). Producer wraps freely; v0.3
  per-slot CUDA events закрывают race без waiting.

Validation: policy=wait + ack-timeout-ms<=0 = infinite hold dead consumer —
warning + force к 200ms safe default.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 08:47:14 +01:00
gx 656e36e9b0 v0.3.1: per-subscriber monitor thread — fix bitmap leak
release / build runtime Docker image (push) Failing after 0s
release / build source tarball (push) Successful in 4s
build / cmake build (CUDA 12.4, Ubuntu 22.04) (push) Successful in 1m39s
build / ffmpeg filter patch (out-of-tree) (push) Successful in 1m32s
test-u4-runner / u4 runner smoke test (push) Has been cancelled
Bug: handshake_subscriber assigned bit + activated slot но НЕ tracked
client_fd. Когда subscriber container exited, socket closed on client side
но producer не detected → bit оставался set forever → после 32 connections
subscribe_create('cam-X'): too many subscribers (max 32).

Симптом в production: каждый pipeline recreate accumulated 1 stale subscriber.
После 4-5 recreate операций publishers перестали accept new pipeline →
"too many subscribers" crash loop.

Fix: после успешного handshake spawn detached pthread monitoring socket
via blocking recv(). recv() returns 0 (EOF) когда other side closes —
monitor clears bit (subscriber_bitmap &= ~(1<<bit)) + state[bit] = 0,
closes fd, exits.

Cost: 1 thread per active subscriber. Max 32 threads — небольшой
overhead. Threads detached, no join needed.

Stress test: 5x pipeline recreate без single "too many subscribers" error.
Раньше: 2-3 recreate → bitmap overflow.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 08:00:41 +01:00
3 changed files with 94 additions and 16 deletions
+16 -9
View File
@@ -322,15 +322,22 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
if (cerr != cudaSuccess) return CUFRAMES_ERR_CUDA;
}
/* TOCTOU защита (v0.2 fallback only): legacy single event signals
* для последнего published frame. v0.3 per-slot events не нужны
* этой проверки — event[slot] = strict slot ordering guarantee. */
if (!sub->has_slot_events) {
uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
memory_order_acquire);
if (verify_seq != target_seq) {
continue;
}
/* TOCTOU защита — unconditional (v0.2 и v0.3 обa). v0.3 per-slot
* events НЕ guaranty ordering: cudaEventRecord overwrites previous
* state каждый publish. Если producer wrapped ring пока consumer
* ждал event sync, slot[slot_idx] уже содержит seq > target.
* Event signal от nового publish satisfies stale wait — consumer
* читает new content thinking it's old (lazy consumption).
*
* Симптом в long-running pipeline: 50k+ ring wraps накапливают drift,
* output stream duplicates 60-70% frames despite stable encoder fps.
*
* Proper v0.4 fix: per-slot+per-publish event handle (event pool).
* Сейчас — post-sync verify catches main race window. */
uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
memory_order_acquire);
if (verify_seq != target_seq) {
continue;
}
/* Fill frame_out */
+52 -6
View File
@@ -628,6 +628,38 @@ int cuframes_publisher_publish_packet(cuframes_publisher_t *pub,
/* ─── Accept thread + handshake ──────────────────────────────────────── */
/* Per-subscriber lifecycle monitor — detects socket close (subscriber container
* exited / crashed) и освобождает bit + subscribers[] slot. Без этого каждый
* pipeline recreate leaks bit → bitmap overflows after 32 connections. */
struct sub_monitor_args {
struct cuframes_publisher *pub;
int fd;
uint32_t bit;
};
static void *subscriber_monitor_thread(void *arg) {
struct sub_monitor_args *m = (struct sub_monitor_args *)arg;
char buf[64];
/* Blocking read — return 0 (EOF) когда other side close socket, или
* <0 on error. Любой control message (PING — TODO в будущем) just consumed. */
while (1) {
ssize_t n = recv(m->fd, buf, sizeof(buf), 0);
if (n <= 0) {
/* Subscriber dead — clear bit + slot state. */
atomic_fetch_and_explicit(&m->pub->hdr->subscriber_bitmap,
~(1ULL << m->bit), memory_order_release);
atomic_store_explicit(&m->pub->hdr->subscribers[m->bit].state, 0,
memory_order_release);
close(m->fd);
CUFRAMES_LOG_INFO("subscriber bit=%u disconnected — freed",
m->bit);
free(m);
return NULL;
}
/* future: parse control msgs (PING, UNSUBSCRIBE) here */
}
}
static void *accept_thread_main(void *arg) {
struct cuframes_publisher *pub = (struct cuframes_publisher *)arg;
while (!pub->stop_flag) {
@@ -640,14 +672,12 @@ static void *accept_thread_main(void *arg) {
CUFRAMES_LOG_WARN("accept: %s", strerror(errno));
continue;
}
/* Synchronous handshake — после ответа socket остаётся открытым для
* lifetime signals (SHUTDOWN, PING). Close на error. */
/* Handshake — на error close socket (no monitor spawned). На success
* monitor thread становится owner socket'a + cleanup'ит при disconnect. */
int r = handshake_subscriber(pub, client);
if (r != CUFRAMES_OK) {
close(client);
}
/* TODO v0.2: track client fds для broadcast SHUTDOWN. Сейчас clients
* сами detect socket EOF при publisher_destroy через shutdown(). */
}
return NULL;
}
@@ -764,7 +794,23 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) {
CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u)", name, bit);
/* TODO v0.2: spawn per-client thread для liveness/PING/UNSUBSCRIBE.
* Сейчас socket остаётся открытым на heap'е до publisher_destroy. */
/* Spawn detached monitor thread — owns client_fd, frees bit on socket
* close (subscriber container exit / crash). Без этого bitmap утекал
* каждый pipeline recreate. */
struct sub_monitor_args *m = malloc(sizeof(*m));
if (!m) {
/* OOM — fallback: leak fd, bit будет released только publisher_destroy */
return CUFRAMES_OK;
}
m->pub = pub;
m->fd = client_fd;
m->bit = bit;
pthread_t monitor_tid;
if (pthread_create(&monitor_tid, NULL, subscriber_monitor_thread, m) != 0) {
CUFRAMES_LOG_WARN("monitor pthread_create fail — bit %u may leak", bit);
free(m);
} else {
pthread_detach(monitor_tid);
}
return CUFRAMES_OK;
}
+26 -1
View File
@@ -61,6 +61,8 @@ struct Args {
bool realtime = false; // emulate -re у ffmpeg CLI: sleep по pts
bool loop = false; // loop input на eof (для file://)
bool enable_packet_ring = false; // v0.2 — публиковать encoded packets
std::string policy = "drop"; // "drop" = DROP_OLDEST, "wait" = STRICT_WAIT
int ack_timeout_ms = 200; // only used при policy=wait; <=0 = infinite (unsafe)
};
static void print_usage() {
@@ -78,6 +80,14 @@ static void print_usage() {
" --loop loop input на EOF (только для file://)\n"
" --enable-packet-ring v0.2: дополнительно публиковать encoded packets\n"
" (для consumer'ов с -c:v copy, Frigate record path)\n"
" --policy MODE drop (default) = DROP_OLDEST — producer wrap'ает ring\n"
" без ожидания consumer ack. Подходит для multi-consumer.\n"
" wait = STRICT_WAIT — producer ждёт ack от всех subscribers\n"
" перед overwrite. Безопаснее для frame integrity, но slow\n"
" consumer задерживает all (default ack-timeout 200ms).\n"
" --ack-timeout-ms N только при --policy wait. Max wait для ack (default 200).\n"
" <=0 = infinite — НЕ РЕКОМЕНДУЕТСЯ (dead consumer вешает\n"
" producer навсегда).\n"
" --verbose debug logs\n"
" -h, --help this help\n";
}
@@ -96,11 +106,23 @@ static int parse_args(int argc, char **argv, Args &a) {
else if (s == "--realtime") a.realtime = true;
else if (s == "--loop") a.loop = true;
else if (s == "--enable-packet-ring") a.enable_packet_ring = true;
else if (s == "--policy") a.policy = next();
else if (s == "--ack-timeout-ms") a.ack_timeout_ms = std::stoi(next());
else if (s == "--verbose") a.verbose = true;
else if (s == "-h" || s == "--help") { print_usage(); std::exit(0); }
else { std::cerr << "Unknown arg: " << s << "\n"; print_usage(); std::exit(1); }
}
if (a.rtsp_url.empty() || a.key.empty()) { print_usage(); return 1; }
if (a.policy != "drop" && a.policy != "wait") {
std::cerr << "Invalid --policy '" << a.policy << "' (use drop|wait)\n";
return 1;
}
if (a.policy == "wait" && a.ack_timeout_ms <= 0) {
std::cerr << "WARNING: --policy wait + --ack-timeout-ms<=0 = infinite wait.\n"
<< " Dead consumer повесит producer навсегда. Forcing к 200ms.\n"
<< " Set явно --ack-timeout-ms 200 (или больше) чтобы убрать warning.\n";
a.ack_timeout_ms = 200;
}
return 0;
}
@@ -230,7 +252,10 @@ int main(int argc, char **argv) {
po.width = width;
po.height = height;
po.format = CUFRAMES_FORMAT_NV12;
po.policy = CUFRAMES_POLICY_DROP_OLDEST;
po.policy = (a.policy == "wait")
? CUFRAMES_POLICY_STRICT_WAIT
: CUFRAMES_POLICY_DROP_OLDEST;
po.consumer_ack_timeout_ms = a.ack_timeout_ms;
po.cuda_device = a.cuda_device;
po.ring_size = a.ring_size; /* для logging */