Files
gx a21812d3f6 tools+examples+test: end-to-end pipeline ready (Steps 9-10)
cuframes-rtsp-source — standalone bridge между RTSP/file и cuframes IPC.
Декодирует на CUDA (nvdec), копирует D2D в pre-allocated pool (EXTERNAL
ownership), публикует через cuframes. --realtime для pacing файлового
ввода, --loop для зацикливания. Альтернатива FFmpeg-фильтра до v0.2
(filter требует patch FFmpeg, конфликтует с Frigate's bundled build).

examples/sub_count — reference subscriber на raw C API: counts frames,
отслеживает gaps, выходит clean при disconnect/timeout/SIGINT.

test_stress (4 subscribers × 2000 frames @ 120fps) — PASS на RTX 5090.
0 torn frames у всех consumers (включая 2 slow с 5ms sleep).

Smoke-проверено: testsrc 25fps → cuframes-rtsp-source → cuframes IPC
→ sub_count (отдельный процесс) → 200/200 frames, 0 gaps, avg_fps=25.2.
2026-05-14 23:39:01 +01:00

133 lines
4.3 KiB
C++
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/* sub_count — minimal cuframes subscriber: connects, counts frames, and
 * exits on SIGINT/SIGTERM, on reaching max-frames, when the publisher
 * disconnects, or after three consecutive receive timeouts.
 *
 * Use case: smoke test for cuframes-rtsp-source or any other publisher.
 * Reference implementation of a consumer on the raw C API.
 *
 * Build: via CMake (see examples/CMakeLists.txt)
 * Run:   sub_count --key cam1 [--max-frames N] [--timeout-ms N] [--verbose]
 */
#include <cuframes/cuframes.h>
#include <cuda_runtime.h>
#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <string>
// Stop flag written by the signal handler and polled by the receive loop in
// main(); any non-zero value means "shut down".
static std::atomic<int> g_stop{0};

// Handler installed for SIGINT/SIGTERM: only raises the stop flag. The signal
// number itself is not used.
static void on_signal(int /*signum*/) {
    g_stop.store(1);
}
int main(int argc, char **argv) {
std::string key;
int timeout_ms = 2000;
int max_frames = -1;
bool verbose = false;
for (int i = 1; i < argc; ++i) {
std::string s = argv[i];
if (s == "--key" && i + 1 < argc) key = argv[++i];
else if (s == "--timeout-ms" && i + 1 < argc) timeout_ms = std::atoi(argv[++i]);
else if (s == "--max-frames" && i + 1 < argc) max_frames = std::atoi(argv[++i]);
else if (s == "--verbose") verbose = true;
else if (s == "-h" || s == "--help") {
std::cerr << "Usage: sub_count --key NAME [--timeout-ms N] "
"[--max-frames N] [--verbose]\n";
return 0;
} else {
std::cerr << "Unknown arg: " << s << "\n";
return 1;
}
}
if (key.empty()) {
std::cerr << "--key required\n";
return 1;
}
signal(SIGINT, on_signal);
signal(SIGTERM, on_signal);
cuframes_subscriber_config_t cfg{};
cfg.key = key.c_str();
cfg.consumer_name = "sub_count";
cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
cfg.connect_timeout_ms = 5000;
cuframes_subscriber_t *sub = nullptr;
int r = cuframes_subscriber_create(&cfg, &sub);
if (r != CUFRAMES_OK) {
std::cerr << "[sub_count] connect to '" << key << "': "
<< cuframes_strerror(r) << "\n";
return 2;
}
std::cerr << "[sub_count] connected to 'cuframes-" << key << "'\n";
cudaStream_t stream;
cudaStreamCreate(&stream);
uint64_t received = 0;
uint64_t last_seq = 0;
uint64_t gaps = 0;
int consecutive_timeouts = 0;
auto t0 = std::chrono::steady_clock::now();
while (!g_stop.load()) {
cuframes_frame_t *f = nullptr;
r = cuframes_subscriber_next(sub, stream, &f, timeout_ms);
if (r == CUFRAMES_ERR_DISCONNECTED) {
std::cerr << "[sub_count] publisher disconnected\n";
break;
}
if (r == CUFRAMES_ERR_TIMEOUT) {
consecutive_timeouts++;
std::cerr << "[sub_count] timeout (" << consecutive_timeouts << ")\n";
if (consecutive_timeouts >= 3) {
std::cerr << "[sub_count] " << consecutive_timeouts
<< " consecutive timeouts, giving up\n";
break;
}
continue;
}
if (r != CUFRAMES_OK) {
std::cerr << "[sub_count] next: " << cuframes_strerror(r) << "\n";
break;
}
consecutive_timeouts = 0;
cudaStreamSynchronize(stream); // ensure event-wait завершён
uint64_t seq = cuframes_frame_seq(f);
if (received > 0 && seq > last_seq + 1) gaps += (seq - last_seq - 1);
last_seq = seq;
received++;
if (verbose && (received % 50 == 0)) {
int32_t w = 0, h = 0;
cuframes_frame_size(f, &w, &h);
std::cerr << "[sub_count] seq=" << seq
<< " w=" << w << " h=" << h
<< " pts_ns=" << cuframes_frame_pts_ns(f) << "\n";
}
cuframes_subscriber_release(sub, f);
if (max_frames > 0 && (int64_t)received >= max_frames) break;
}
auto t1 = std::chrono::steady_clock::now();
double sec = std::chrono::duration<double>(t1 - t0).count();
std::cerr << "[sub_count] received=" << received
<< " gaps=" << gaps
<< " elapsed=" << sec << "s"
<< " avg_fps=" << (sec > 0 ? received / sec : 0) << "\n";
cudaStreamDestroy(stream);
cuframes_subscriber_destroy(sub);
return received > 0 ? 0 : 2;
}