tools+examples+test: end-to-end pipeline ready (Steps 9-10)

cuframes-rtsp-source — standalone bridge между RTSP/file и cuframes IPC.
Декодирует на CUDA (nvdec), копирует D2D в pre-allocated pool (EXTERNAL
ownership), публикует через cuframes. --realtime для pacing файлового
ввода, --loop для зацикливания. Альтернатива FFmpeg-фильтра до v0.2
(filter требует patch FFmpeg, конфликтует с Frigate's bundled build).

examples/sub_count — reference subscriber на raw C API: counts frames,
trackit gaps, выходит clean при disconnect/timeout/SIGINT.

test_stress (4 subscribers × 2000 frames @ 120fps) — PASS на RTX 5090.
0 torn frames у всех consumers (включая 2 slow с 5ms sleep).

Smoke-проверено: testsrc 25fps → cuframes-rtsp-source → cuframes IPC
→ sub_count (отдельный процесс) → 200/200 frames, 0 gaps, avg_fps=25.2.
This commit is contained in:
2026-05-14 23:39:01 +01:00
parent 2530057507
commit a21812d3f6
9 changed files with 727 additions and 2 deletions
+7
View File
@@ -15,3 +15,10 @@ target_include_directories(test_multi PRIVATE
${CMAKE_SOURCE_DIR}/include)
add_test(NAME multi_consumer COMMAND test_multi)
set_tests_properties(multi_consumer PROPERTIES TIMEOUT 60)
add_executable(test_stress test_stress.cu)
target_link_libraries(test_stress PRIVATE cuframes CUDA::cudart)
target_include_directories(test_stress PRIVATE
${CMAKE_SOURCE_DIR}/include)
add_test(NAME stress_4consumer COMMAND test_stress)
set_tests_properties(stress_4consumer PROPERTIES TIMEOUT 120)
+169
View File
@@ -0,0 +1,169 @@
/* Stress test: 1 publisher × 4 consumers, 2000 frames, intermediate-rate (~120 fps),
* проверка zero-loss в NEWEST_ONLY mode не требуется (это политика DROP_OLDEST по
* spec); проверяем что:
* 1) ВСЕ subscribers получают frames continuously без torn-detection failures
* 2) Producer не deadlock'ит при slow consumer
* 3) После teardown — нет leaked файлов в /dev/shm
* 4) После teardown — process exit clean без segfault
*/
#include <cuframes/cuframes.h>
#include <cuda_runtime.h>
#include <sys/wait.h>
#include <sys/stat.h>
#include <unistd.h>
#include <atomic>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <thread>
#define CHECK(call) do { int _r = (call); if (_r != 0) { \
fprintf(stderr, "FAIL %s:%d: %s\n", __FILE__, __LINE__, cuframes_strerror(_r)); std::exit(2); } } while(0)
#define CHECK_CUDA(call) do { cudaError_t _e = (call); if (_e != cudaSuccess) { \
fprintf(stderr, "CUDA FAIL %s:%d: %s\n", __FILE__, __LINE__, cudaGetErrorString(_e)); std::exit(2); } } while(0)
static const char *KEY = "test_stress";
static const int W = 1280, H = 720;
static const int N = 2000;
static const int NUM_CONSUMERS = 4;
__host__ __device__ inline uint8_t pat(uint64_t seq, int row) {
return static_cast<uint8_t>((seq * 31u + row * 7u) & 0xFF);
}
__global__ void fill_y(uint8_t *y, int w, int h, int py, uint64_t seq) {
int x = blockIdx.x * blockDim.x + threadIdx.x;
int r = blockIdx.y * blockDim.y + threadIdx.y;
if (x < w && r < h) y[r * py + x] = pat(seq, r);
}
__global__ void verify_y(const uint8_t *y, int w, int h, int py, uint64_t seq, int *bad) {
int x = blockIdx.x * blockDim.x + threadIdx.x;
int r = blockIdx.y * blockDim.y + threadIdx.y;
if (x < w && r < h) if (y[r * py + x] != pat(seq, r)) atomicAdd(bad, 1);
}
int run_consumer(const char *name, int slow_ms) {
cuframes_subscriber_config_t cfg = {};
cfg.key = KEY;
cfg.consumer_name = name;
cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
cfg.connect_timeout_ms = 5000;
cuframes_subscriber_t *sub = NULL;
CHECK(cuframes_subscriber_create(&cfg, &sub));
cudaStream_t s;
CHECK_CUDA(cudaStreamCreate(&s));
int *d_bad;
CHECK_CUDA(cudaMalloc(&d_bad, sizeof(int)));
dim3 b(32, 8);
dim3 g((W + b.x - 1) / b.x, (H + b.y - 1) / b.y);
int recv = 0, torn = 0;
while (1) {
cuframes_frame_t *f = NULL;
int r = cuframes_subscriber_next(sub, s, &f, 3000);
if (r == CUFRAMES_ERR_TIMEOUT || r == CUFRAMES_ERR_DISCONNECTED) break;
if (r != 0) { fprintf(stderr, "[%s] next: %s\n", name, cuframes_strerror(r)); std::exit(2); }
CHECK_CUDA(cudaMemsetAsync(d_bad, 0, sizeof(int), s));
verify_y<<<g, b, 0, s>>>((const uint8_t *)cuframes_frame_cuda_ptr(f),
W, H, cuframes_frame_pitch_y(f),
cuframes_frame_seq(f), d_bad);
int bad = 0;
CHECK_CUDA(cudaMemcpyAsync(&bad, d_bad, sizeof(int), cudaMemcpyDeviceToHost, s));
CHECK_CUDA(cudaStreamSynchronize(s));
if (bad > 0) torn++;
recv++;
CHECK(cuframes_subscriber_release(sub, f));
if (slow_ms > 0) std::this_thread::sleep_for(std::chrono::milliseconds(slow_ms));
}
fprintf(stderr, "[%s] received=%d torn=%d\n", name, recv, torn);
cudaFree(d_bad);
cudaStreamDestroy(s);
cuframes_subscriber_destroy(sub);
return (torn == 0 && recv >= 10) ? 0 : 1;
}
int run_producer() {
cuframes_publisher_config_t cfg = {};
cfg.key = KEY;
cfg.width = W;
cfg.height = H;
cfg.format = CUFRAMES_FORMAT_NV12;
cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
cfg.ring_size = 8;
cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;
cuframes_publisher_t *pub = NULL;
CHECK(cuframes_publisher_create(&cfg, &pub));
int32_t pitch_y = 0;
CHECK(cuframes_calc_frame_size(CUFRAMES_FORMAT_NV12, W, H, NULL, &pitch_y, NULL));
cudaStream_t s;
CHECK_CUDA(cudaStreamCreate(&s));
dim3 b(32, 8);
dim3 g((W + b.x - 1) / b.x, (H + b.y - 1) / b.y);
std::this_thread::sleep_for(std::chrono::milliseconds(800));
/* ~120 fps */
auto iv = std::chrono::nanoseconds(1000000000LL / 120);
auto t = std::chrono::steady_clock::now();
for (int i = 0; i < N; ++i) {
void *p = NULL;
CHECK(cuframes_publisher_acquire(pub, &p));
fill_y<<<g, b, 0, s>>>((uint8_t *)p, W, H, pitch_y, (uint64_t)i);
CHECK(cuframes_publisher_publish(pub, s, cuframes_now_ns()));
t += iv;
std::this_thread::sleep_until(t);
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
cuframes_publisher_destroy(pub);
cudaStreamDestroy(s);
return 0;
}
int main() {
char shm[80]; snprintf(shm, 80, "/dev/shm/cuframes-%s", KEY); unlink(shm);
char sock[128]; snprintf(sock, 128, "/run/cuframes/%s.sock", KEY); unlink(sock);
pid_t pids[NUM_CONSUMERS];
/* Mix: 2 быстрых, 2 slow consumers (5ms sleep — ~200fps cap, медленнее publisher'а) */
int slow[NUM_CONSUMERS] = {0, 0, 5, 5};
char names[NUM_CONSUMERS][16];
for (int i = 0; i < NUM_CONSUMERS; ++i) {
snprintf(names[i], 16, "c%d", i + 1);
pids[i] = fork();
if (pids[i] == 0) return run_consumer(names[i], slow[i]);
}
int prod_r = run_producer();
int fail = (prod_r != 0);
for (int i = 0; i < NUM_CONSUMERS; ++i) {
int st = 0;
waitpid(pids[i], &st, 0);
if (!WIFEXITED(st) || WEXITSTATUS(st) != 0) {
fprintf(stderr, "consumer %s failed (status=%d)\n", names[i], st);
fail = 1;
}
}
/* Check teardown clean */
struct stat st;
if (stat(shm, &st) == 0) {
fprintf(stderr, "WARN: %s остался после teardown (но это OK — IPC объект)\n", shm);
}
if (stat(sock, &st) == 0) {
fprintf(stderr, "WARN: %s остался после teardown\n", sock);
}
if (fail) { fprintf(stderr, "test_stress FAIL\n"); return 1; }
fprintf(stderr, "test_stress PASS (1×pub × %d×sub × %d frames)\n", NUM_CONSUMERS, N);
return 0;
}