Files
gx fca07bf669
build / cmake build (CUDA 12.4, Ubuntu 22.04) (pull_request) Failing after 3m43s
build / ffmpeg filter patch (out-of-tree) (pull_request) Has been skipped
test+docs: packet ring stress test + Frigate dual-input guide (v0.2 Step 6)
Тесты:
- libcuframes/tests/test_packet_ring.c — 2 scenarios:
  1) normal flow: 1 pub × 1 sub × 2000 packets, varied sizes, GOP=30,
     payload integrity check (seq в первых 8 байтах + pattern). PTS
     monotonicity, first KEY seq, нет data errors.
  2) slow consumer (10ms delay): publisher 200 fps, subscriber должен
     detect OVERRUN, library resync на keyframe — verify received >10
     даже на сильно медленном консьюмере.
- libcuframes/tests/CMakeLists.txt: add_test packet_ring_basic.

Docs:
- CHANGELOG.md: новая [Unreleased] секция с full v0.2 highlights и
  явно declared limitations (sub-stream, audio, codec change → v0.3).
- docs/integrations/frigate.md: новая секция "v0.2: dual-input (detect +
  record через один RTSP)" с config example, requirements, trade-offs.

Связано: #2, PR #4. Step 6 (final) перед снятием draft.
2026-05-19 17:08:17 +01:00

281 lines
9.6 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/* Stress test for the encoded packet ring (v0.2).
 *
 * Scenarios:
 * 1) Normal flow: 1 publisher × 1 subscriber × 2000 packets of varied sizes;
 *    every 30th packet carries the KEY flag (imitating a GOP). The subscriber
 *    checks:
 *    - seq: no gaps are expected in this test (fast consumer, zero OVERRUNs)
 *    - data integrity: the seq stored in the first 8 payload bytes matches the
 *      packet seq and the remaining bytes match the generated pattern
 *    - PTS is increasing (DTS is published equal to PTS) and a KEY-flagged
 *      packet arrives
 * 2) Slow subscriber: the publisher sends faster than the subscriber reads, so
 *    an OVERRUN must occur and the library resyncs on a keyframe.
 * 3) Cleanup: after each scenario no leaked SHM is left in /dev/shm.
 *
 * No CUDA dependencies (packets are host-side).
 */
#include <cuframes/cuframes.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
/* Key passed to both the publisher and the subscriber configuration. */
#define KEY "test_pkt_ring"
#define TOTAL_PACKETS 2000     /* packets published per scenario */
#define GOP_SIZE 30            /* every GOP_SIZE-th packet carries the KEY flag */
#define SMALL_PKT 4096         /* base size of a non-key packet, in bytes */
#define LARGE_PKT (256 * 1024) /* size of a key packet, in bytes */

/* Evaluate a cuframes call once; on a non-zero result print the location and
 * the library's error text to stderr, then terminate with exit code 2. */
#define CHECK(call)                                                         \
    do {                                                                    \
        int check_rc_ = (call);                                             \
        if (check_rc_ != 0) {                                               \
            fprintf(stderr, "FAIL %s:%d (rc=%d): %s\n", __FILE__, __LINE__, \
                    check_rc_, cuframes_strerror(check_rc_));               \
            exit(2);                                                        \
        }                                                                   \
    } while (0)

/* Terminate with exit code 2 (after printing the failed expression and its
 * location to stderr) when cond evaluates to false. */
#define EXPECT_TRUE(cond)                                        \
    do {                                                         \
        if (!(cond)) {                                           \
            fprintf(stderr, "EXPECT_TRUE failed at %s:%d: %s\n", \
                    __FILE__, __LINE__, #cond);                  \
            exit(2);                                             \
        }                                                        \
    } while (0)
/* Fill buf[0..size) with a self-describing test payload.
 *
 * The leading sizeof(seq) bytes hold seq exactly as stored in memory (host
 * byte order, copied with memcpy); every following byte at index i holds
 * (seq + i) & 0xFF. If size is smaller than sizeof(seq), only the first
 * `size` bytes of seq are written, so the function never writes past the
 * caller's buffer. A NULL buffer or a zero size is a no-op.
 */
static void gen_payload(uint8_t *buf, size_t size, uint64_t seq) {
    if (buf == NULL || size == 0) {
        return;
    }

    /* Clamp the header so a short buffer is not overrun. */
    size_t header_len = sizeof(seq) < size ? sizeof(seq) : size;
    memcpy(buf, &seq, header_len);

    for (size_t i = header_len; i < size; ++i) {
        buf[i] = (uint8_t)((seq + i) & 0xFF);
    }
}
/* Check that buf[0..size) is the payload generated for expected_seq.
 *
 * Returns 0 on a match, -1 when size is too small to contain the seq header,
 * -2 when the embedded seq differs from expected_seq, and -3 when any byte
 * after the header differs from (expected_seq + index) & 0xFF.
 */
static int verify_payload(const uint8_t *buf, size_t size, uint64_t expected_seq) {
    uint64_t embedded = 0;
    const size_t header_len = sizeof(embedded);

    if (size < header_len) {
        return -1;
    }

    memcpy(&embedded, buf, header_len);
    if (embedded != expected_seq) {
        return -2;
    }

    size_t pos = header_len;
    while (pos < size) {
        const uint8_t want = (uint8_t)((expected_seq + pos) & 0xFF);
        if (buf[pos] != want) {
            return -3;
        }
        ++pos;
    }
    return 0;
}
static cuframes_publisher_t *make_publisher(void) {
cuframes_publisher_config_t cfg = {0};
cfg.key = KEY;
cfg.width = 320;
cfg.height = 240;
cfg.format = CUFRAMES_FORMAT_NV12;
cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
cfg.ring_size = 2;
cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;
cfg.cuda_device = 0;
cuframes_publisher_t *pub = NULL;
CHECK(cuframes_publisher_create(&cfg, &pub));
cuframes_packet_ring_options_t pkt_opts = {0};
pkt_opts.codec_id = 27; /* AV_CODEC_ID_H264 */
pkt_opts.ring_slots = 64;
pkt_opts.data_size = 8 * 1024 * 1024;
pkt_opts.max_packet_size = LARGE_PKT * 2;
CHECK(cuframes_publisher_enable_packets(pub, &pkt_opts));
/* Fake SPS/PPS — 16 байт */
uint8_t extradata[16];
for (int i = 0; i < 16; ++i) extradata[i] = (uint8_t)(0xAA + i);
CHECK(cuframes_publisher_set_codec_extradata(pub, extradata, sizeof(extradata)));
return pub;
}
/* Subscriber side of a scenario (called in the forked child).
 *
 * Connects to KEY, enables packet delivery, checks the codec parameters
 * (codec_id 27, 16 bytes of extradata) and then reads packets for up to
 * about 30 seconds. Each packet's payload is validated with verify_payload()
 * against the packet's seq, and its PTS must be strictly greater than the
 * previous packet's PTS.
 *
 * read_delay_us      when > 0, sleep this many microseconds after each packet
 *                    to imitate a slow consumer
 * out_received       receives the number of packets read successfully
 * out_overruns       receives the number of CUFRAMES_ERR_PACKET_OVERRUN results
 * out_first_key_seq  receives the seq of the first packet carrying
 *                    CUFRAMES_PKT_FLAG_KEY, or -1 if none was seen
 *
 * Returns 0. A failed CHECK/EXPECT_TRUE (including any payload or PTS error)
 * terminates the process with exit code 2.
 */
static int run_subscriber(int read_delay_us, int *out_received, int *out_overruns,
                          int *out_first_key_seq) {
    /* Give the publisher time to create the SHM before connecting. */
    usleep(100 * 1000);

    cuframes_subscriber_config_t cfg = {0};
    cfg.key = KEY;
    cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
    cfg.cuda_device = 0;
    cfg.connect_timeout_ms = 5000;
    cuframes_subscriber_t *sub = NULL;
    CHECK(cuframes_subscriber_create(&cfg, &sub));
    CHECK(cuframes_subscriber_enable_packets(sub));

    /* Verify codec params against the values make_publisher() configures. */
    uint32_t codec_id = 0;
    const void *extradata = NULL;
    size_t extradata_sz = 0;
    int r = cuframes_subscriber_get_codec_params(sub, &codec_id, &extradata, &extradata_sz);
    EXPECT_TRUE(r == CUFRAMES_OK);
    EXPECT_TRUE(codec_id == 27);
    EXPECT_TRUE(extradata_sz == 16);

    int received = 0;
    int overruns = 0;
    int first_key_seq = -1;
    int64_t last_pts = -1;
    int data_errors = 0;
    int pts_errors = 0;

    /* Run for ~30 s or until the publisher is done. */
    time_t start = time(NULL);
    while (time(NULL) - start < 30) {
        cuframes_packet_t *pkt = NULL;
        int rc = cuframes_subscriber_next_packet(sub, &pkt, 500);
        if (rc == CUFRAMES_ERR_TIMEOUT || rc == CUFRAMES_ERR_WOULD_BLOCK) {
            /* Nothing new: stop once enough packets were seen for the test. */
            if (received >= TOTAL_PACKETS / 2) break;
            continue;
        }
        if (rc == CUFRAMES_ERR_DISCONNECTED) break;
        if (rc == CUFRAMES_ERR_PACKET_OVERRUN) {
            overruns++;
            continue; /* the library is expected to resync on the next call */
        }
        if (rc != CUFRAMES_OK) {
            /* Any other error is logged and ends the read loop. */
            fprintf(stderr, "next_packet rc=%d (%s)\n", rc, cuframes_strerror(rc));
            break;
        }

        const uint8_t *data = (const uint8_t *)cuframes_packet_data(pkt);
        size_t size = cuframes_packet_size(pkt);
        int64_t pts = cuframes_packet_pts(pkt);
        uint32_t flags = cuframes_packet_flags(pkt);
        uint64_t seq = cuframes_packet_seq(pkt);

        if (verify_payload(data, size, seq) != 0) {
            data_errors++;
        }
        if ((flags & CUFRAMES_PKT_FLAG_KEY) && first_key_seq < 0) {
            first_key_seq = (int)seq;
        }
        if (last_pts >= 0 && pts <= last_pts) {
            /* Casts keep the format specifiers correct on every ABI. */
            fprintf(stderr, "PTS не монотонно: %lld <= %lld (seq=%llu)\n",
                    (long long)pts, (long long)last_pts, (unsigned long long)seq);
            pts_errors++;
        }
        last_pts = pts;
        received++;

        cuframes_subscriber_release_packet(sub, pkt);
        if (read_delay_us > 0) usleep(read_delay_us);
    }

    /* Tear down first so a failed expectation below does not skip cleanup. */
    cuframes_subscriber_destroy(sub);
    EXPECT_TRUE(data_errors == 0);
    EXPECT_TRUE(pts_errors == 0);

    *out_received = received;
    *out_overruns = overruns;
    *out_first_key_seq = first_key_seq;
    return 0;
}
/* Publisher side of a scenario.
 *
 * Creates a publisher via make_publisher(), publishes total_packets synthetic
 * packets and destroys the publisher. Every GOP_SIZE-th packet is LARGE_PKT
 * bytes and carries CUFRAMES_PKT_FLAG_KEY; the others are
 * SMALL_PKT + (index % 8) KiB with no flags. PTS and DTS are both
 * index * 33333333 (about 30 fps in nanoseconds). A failed publish is only
 * reported on stderr. When inter_packet_us > 0 the loop sleeps that many
 * microseconds after each packet.
 */
static void publisher_loop(int total_packets, int inter_packet_us) {
    cuframes_publisher_t *publisher = make_publisher();

    /* One scratch buffer sized for the largest packet, reused every round. */
    uint8_t *scratch = malloc(LARGE_PKT);
    EXPECT_TRUE(scratch != NULL);

    int idx = 0;
    while (idx < total_packets) {
        size_t len;
        uint32_t pkt_flags;
        if (idx % GOP_SIZE == 0) {
            len = LARGE_PKT;
            pkt_flags = CUFRAMES_PKT_FLAG_KEY;
        } else {
            len = SMALL_PKT + (idx % 8) * 1024;
            pkt_flags = 0;
        }

        gen_payload(scratch, len, (uint64_t)idx);

        const int64_t stamp_ns = (int64_t)idx * 33333333LL; /* ~30 fps */
        const int status = cuframes_publisher_publish_packet(publisher, scratch, len,
                                                             stamp_ns, stamp_ns, pkt_flags);
        if (status != CUFRAMES_OK) {
            fprintf(stderr, "publish rc=%d size=%zu\n", status, len);
        }

        if (inter_packet_us > 0) {
            usleep(inter_packet_us);
        }
        ++idx;
    }

    free(scratch);
    cuframes_publisher_destroy(publisher);
}
/* Report path on stderr and return 1 if it exists; return 0 otherwise. */
static int report_leaked_path(const char *path) {
    if (access(path, F_OK) != 0) {
        return 0;
    }
    fprintf(stderr, "LEAKED %s\n", path);
    return 1;
}

/* Probe /dev/shm for the two entries derived from KEY (the base name and the
 * "-packets" name). Returns 1 if either still exists, 0 if both are gone.
 */
static int check_no_leaked_shm(void) {
    char base_path[256];
    char packets_path[256];

    snprintf(base_path, sizeof(base_path), "/dev/shm/cuframes-%s", KEY);
    snprintf(packets_path, sizeof(packets_path), "/dev/shm/cuframes-%s-packets", KEY);

    /* Probe both unconditionally so every leaked entry gets reported. */
    int leaked = report_leaked_path(base_path);
    leaked |= report_leaked_path(packets_path);
    return leaked;
}
/* Scenario 1: fast consumer.
 *
 * Forks; the child runs the subscriber with no read delay and requires at
 * least TOTAL_PACKETS / 2 packets, zero overruns and at least one KEY packet.
 * The parent publishes TOTAL_PACKETS packets with a 1 ms gap, then reaps the
 * child and requires that it exited normally with status 0.
 *
 * Returns 0; any failed expectation terminates the process with exit code 2.
 */
static int scenario_normal_flow(void) {
    fprintf(stderr, "[scenario 1] normal flow — fast consumer\n");
    pid_t pid = fork();
    EXPECT_TRUE(pid >= 0);
    if (pid == 0) {
        /* child = subscriber */
        int received = 0, overruns = 0, first_key = -1;
        run_subscriber(0, &received, &overruns, &first_key);
        fprintf(stderr, "  consumer: received=%d overruns=%d first_key_seq=%d\n",
                received, overruns, first_key);
        EXPECT_TRUE(received >= TOTAL_PACKETS / 2);
        EXPECT_TRUE(overruns == 0);
        EXPECT_TRUE(first_key >= 0);
        exit(0);
    }

    /* parent = publisher (slower than the consumer) */
    publisher_loop(TOTAL_PACKETS, 1000); /* 1 ms between packets = 1000 fps */

    /* Reap the child. status starts at 0, which decodes as "exited with 0",
     * so a failed waitpid must be detected explicitly or the scenario would
     * pass without the child's result ever being read. */
    int status = 0;
    pid_t reaped;
    do {
        reaped = waitpid(pid, &status, 0);
    } while (reaped < 0 && errno == EINTR);
    EXPECT_TRUE(reaped == pid);
    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
    return 0;
}
/* Scenario 2: slow consumer.
 *
 * Forks; the child runs the subscriber with a 10 ms delay after each packet
 * and requires at least one overrun while still receiving more than 10
 * packets. The parent publishes TOTAL_PACKETS packets with a 5 ms gap, then
 * reaps the child and requires that it exited normally with status 0.
 *
 * Returns 0; any failed expectation terminates the process with exit code 2.
 */
static int scenario_slow_consumer(void) {
    fprintf(stderr, "[scenario 2] slow consumer — must hit OVERRUN + resync\n");
    pid_t pid = fork();
    EXPECT_TRUE(pid >= 0);
    if (pid == 0) {
        /* child = very slow subscriber */
        int received = 0, overruns = 0, first_key = -1;
        run_subscriber(10 * 1000, &received, &overruns, &first_key); /* 10 ms */
        fprintf(stderr, "  consumer: received=%d overruns=%d first_key_seq=%d\n",
                received, overruns, first_key);
        /* Overruns are required because the publisher is faster. */
        EXPECT_TRUE(overruns > 0);
        /* Packets must still arrive, i.e. reading continues after an overrun. */
        EXPECT_TRUE(received > 10);
        exit(0);
    }

    /* publisher fast: 200 fps */
    publisher_loop(TOTAL_PACKETS, 5 * 1000);

    /* Reap the child. status starts at 0, which decodes as "exited with 0",
     * so a failed waitpid must be detected explicitly or the scenario would
     * pass without the child's result ever being read. */
    int status = 0;
    pid_t reaped;
    do {
        reaped = waitpid(pid, &status, 0);
    } while (reaped < 0 && errno == EINTR);
    EXPECT_TRUE(reaped == pid);
    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
    return 0;
}
int main(void) {
signal(SIGPIPE, SIG_IGN);
scenario_normal_flow();
/* Ensure clean inter-test state */
usleep(200 * 1000);
if (check_no_leaked_shm()) exit(2);
scenario_slow_consumer();
usleep(200 * 1000);
if (check_no_leaked_shm()) exit(2);
fprintf(stderr, "OK — all scenarios passed\n");
return 0;
}