Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| d646f5a4e4 | |||
| becfbebc78 |
@@ -322,15 +322,22 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
|
|||||||
if (cerr != cudaSuccess) return CUFRAMES_ERR_CUDA;
|
if (cerr != cudaSuccess) return CUFRAMES_ERR_CUDA;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* TOCTOU защита (v0.2 fallback only): legacy single event signals
|
/* TOCTOU защита — unconditional (v0.2 и v0.3 обa). v0.3 per-slot
|
||||||
* для последнего published frame. v0.3 per-slot events не нужны
|
* events НЕ guaranty ordering: cudaEventRecord overwrites previous
|
||||||
* этой проверки — event[slot] = strict slot ordering guarantee. */
|
* state каждый publish. Если producer wrapped ring пока consumer
|
||||||
if (!sub->has_slot_events) {
|
* ждал event sync, slot[slot_idx] уже содержит seq > target.
|
||||||
uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
|
* Event signal от nового publish satisfies stale wait — consumer
|
||||||
memory_order_acquire);
|
* читает new content thinking it's old (lazy consumption).
|
||||||
if (verify_seq != target_seq) {
|
*
|
||||||
continue;
|
* Симптом в long-running pipeline: 50k+ ring wraps накапливают drift,
|
||||||
}
|
* output stream duplicates 60-70% frames despite stable encoder fps.
|
||||||
|
*
|
||||||
|
* Proper v0.4 fix: per-slot+per-publish event handle (event pool).
|
||||||
|
* Сейчас — post-sync verify catches main race window. */
|
||||||
|
uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
|
||||||
|
memory_order_acquire);
|
||||||
|
if (verify_seq != target_seq) {
|
||||||
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Fill frame_out */
|
/* Fill frame_out */
|
||||||
|
|||||||
@@ -61,6 +61,8 @@ struct Args {
|
|||||||
bool realtime = false; // emulate -re у ffmpeg CLI: sleep по pts
|
bool realtime = false; // emulate -re у ffmpeg CLI: sleep по pts
|
||||||
bool loop = false; // loop input на eof (для file://)
|
bool loop = false; // loop input на eof (для file://)
|
||||||
bool enable_packet_ring = false; // v0.2 — публиковать encoded packets
|
bool enable_packet_ring = false; // v0.2 — публиковать encoded packets
|
||||||
|
std::string policy = "drop"; // "drop" = DROP_OLDEST, "wait" = STRICT_WAIT
|
||||||
|
int ack_timeout_ms = 200; // only used при policy=wait; <=0 = infinite (unsafe)
|
||||||
};
|
};
|
||||||
|
|
||||||
static void print_usage() {
|
static void print_usage() {
|
||||||
@@ -78,6 +80,14 @@ static void print_usage() {
|
|||||||
" --loop loop input на EOF (только для file://)\n"
|
" --loop loop input на EOF (только для file://)\n"
|
||||||
" --enable-packet-ring v0.2: дополнительно публиковать encoded packets\n"
|
" --enable-packet-ring v0.2: дополнительно публиковать encoded packets\n"
|
||||||
" (для consumer'ов с -c:v copy, Frigate record path)\n"
|
" (для consumer'ов с -c:v copy, Frigate record path)\n"
|
||||||
|
" --policy MODE drop (default) = DROP_OLDEST — producer wrap'ает ring\n"
|
||||||
|
" без ожидания consumer ack. Подходит для multi-consumer.\n"
|
||||||
|
" wait = STRICT_WAIT — producer ждёт ack от всех subscribers\n"
|
||||||
|
" перед overwrite. Безопаснее для frame integrity, но slow\n"
|
||||||
|
" consumer задерживает all (default ack-timeout 200ms).\n"
|
||||||
|
" --ack-timeout-ms N только при --policy wait. Max wait для ack (default 200).\n"
|
||||||
|
" <=0 = infinite — НЕ РЕКОМЕНДУЕТСЯ (dead consumer вешает\n"
|
||||||
|
" producer навсегда).\n"
|
||||||
" --verbose debug logs\n"
|
" --verbose debug logs\n"
|
||||||
" -h, --help this help\n";
|
" -h, --help this help\n";
|
||||||
}
|
}
|
||||||
@@ -96,11 +106,23 @@ static int parse_args(int argc, char **argv, Args &a) {
|
|||||||
else if (s == "--realtime") a.realtime = true;
|
else if (s == "--realtime") a.realtime = true;
|
||||||
else if (s == "--loop") a.loop = true;
|
else if (s == "--loop") a.loop = true;
|
||||||
else if (s == "--enable-packet-ring") a.enable_packet_ring = true;
|
else if (s == "--enable-packet-ring") a.enable_packet_ring = true;
|
||||||
|
else if (s == "--policy") a.policy = next();
|
||||||
|
else if (s == "--ack-timeout-ms") a.ack_timeout_ms = std::stoi(next());
|
||||||
else if (s == "--verbose") a.verbose = true;
|
else if (s == "--verbose") a.verbose = true;
|
||||||
else if (s == "-h" || s == "--help") { print_usage(); std::exit(0); }
|
else if (s == "-h" || s == "--help") { print_usage(); std::exit(0); }
|
||||||
else { std::cerr << "Unknown arg: " << s << "\n"; print_usage(); std::exit(1); }
|
else { std::cerr << "Unknown arg: " << s << "\n"; print_usage(); std::exit(1); }
|
||||||
}
|
}
|
||||||
if (a.rtsp_url.empty() || a.key.empty()) { print_usage(); return 1; }
|
if (a.rtsp_url.empty() || a.key.empty()) { print_usage(); return 1; }
|
||||||
|
if (a.policy != "drop" && a.policy != "wait") {
|
||||||
|
std::cerr << "Invalid --policy '" << a.policy << "' (use drop|wait)\n";
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
if (a.policy == "wait" && a.ack_timeout_ms <= 0) {
|
||||||
|
std::cerr << "WARNING: --policy wait + --ack-timeout-ms<=0 = infinite wait.\n"
|
||||||
|
<< " Dead consumer повесит producer навсегда. Forcing к 200ms.\n"
|
||||||
|
<< " Set явно --ack-timeout-ms 200 (или больше) чтобы убрать warning.\n";
|
||||||
|
a.ack_timeout_ms = 200;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -230,7 +252,10 @@ int main(int argc, char **argv) {
|
|||||||
po.width = width;
|
po.width = width;
|
||||||
po.height = height;
|
po.height = height;
|
||||||
po.format = CUFRAMES_FORMAT_NV12;
|
po.format = CUFRAMES_FORMAT_NV12;
|
||||||
po.policy = CUFRAMES_POLICY_DROP_OLDEST;
|
po.policy = (a.policy == "wait")
|
||||||
|
? CUFRAMES_POLICY_STRICT_WAIT
|
||||||
|
: CUFRAMES_POLICY_DROP_OLDEST;
|
||||||
|
po.consumer_ack_timeout_ms = a.ack_timeout_ms;
|
||||||
po.cuda_device = a.cuda_device;
|
po.cuda_device = a.cuda_device;
|
||||||
po.ring_size = a.ring_size; /* для logging */
|
po.ring_size = a.ring_size; /* для logging */
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user