fca07bf669
Тесты:
- libcuframes/tests/test_packet_ring.c — 2 scenarios:
1) normal flow: 1 pub × 1 sub × 2000 packets, varied sizes, GOP=30,
payload integrity check (seq в первых 8 байтах + pattern). PTS
monotonicity, first KEY seq, нет data errors.
2) slow consumer (10ms delay): publisher 200 fps, subscriber должен
detect OVERRUN, library resync на keyframe — verify received >10
даже на сильно медленном консьюмере.
- libcuframes/tests/CMakeLists.txt: add_test packet_ring_basic.
Docs:
- CHANGELOG.md: новая [Unreleased] секция с full v0.2 highlights и
явно declared limitations (sub-stream, audio, codec change → v0.3).
- docs/integrations/frigate.md: новая секция "v0.2: dual-input (detect +
record через один RTSP)" с config example, requirements, trade-offs.
Связано: #2, PR #4. Step 6 (final) перед снятием draft.
281 lines
9.6 KiB
C
281 lines
9.6 KiB
C
/* Stress test для encoded packet ring (v0.2).
|
||
*
|
||
* Сценарии:
|
||
* 1) Normal flow: 1 publisher × 1 subscriber × 2000 packets, varied sizes,
|
||
* каждые 30 packets — KEY flag (имитация GOP). Subscriber проверяет:
|
||
* - монотонные seq (без пропусков в этом тесте — fast consumer)
|
||
* - data integrity через checksum (XOR fold)
|
||
* - PTS/DTS monotonic, KEY flag доходит
|
||
* 2) Slow subscriber: publisher шлёт быстрее чем subscriber читает →
|
||
* должен случиться OVERRUN, library resync'нет на keyframe.
|
||
* 3) Cleanup: после exit нет leaked SHM в /dev/shm.
|
||
*
|
||
* Без CUDA-зависимостей (packets host-side).
|
||
*/
|
||
#include <cuframes/cuframes.h>
|
||
|
||
#include <errno.h>
|
||
#include <fcntl.h>
|
||
#include <signal.h>
|
||
#include <stdio.h>
|
||
#include <stdlib.h>
|
||
#include <string.h>
|
||
#include <sys/stat.h>
|
||
#include <sys/types.h>
|
||
#include <sys/wait.h>
|
||
#include <time.h>
|
||
#include <unistd.h>
|
||
|
||
#define KEY "test_pkt_ring"
|
||
#define TOTAL_PACKETS 2000
|
||
#define GOP_SIZE 30
|
||
#define SMALL_PKT 4096
|
||
#define LARGE_PKT (256 * 1024)
|
||
|
||
#define CHECK(call) do { int _r = (call); if (_r != 0) { \
|
||
fprintf(stderr, "FAIL %s:%d (rc=%d): %s\n", __FILE__, __LINE__, _r, \
|
||
cuframes_strerror(_r)); exit(2); } } while (0)
|
||
|
||
#define EXPECT_TRUE(cond) do { if (!(cond)) { \
|
||
fprintf(stderr, "EXPECT_TRUE failed at %s:%d: %s\n", \
|
||
__FILE__, __LINE__, #cond); exit(2); } } while (0)
|
||
|
||
/* Сгенерировать payload: первые 8 байт = seq (little-endian), остальное pattern. */
|
||
static void gen_payload(uint8_t *buf, size_t size, uint64_t seq) {
|
||
memcpy(buf, &seq, sizeof(seq));
|
||
for (size_t i = sizeof(seq); i < size; ++i) {
|
||
buf[i] = (uint8_t)((seq + i) & 0xFF);
|
||
}
|
||
}
|
||
|
||
/* Verify payload matches seq. Возвращает 0 если ok. */
|
||
static int verify_payload(const uint8_t *buf, size_t size, uint64_t expected_seq) {
|
||
uint64_t seq_in_buf;
|
||
if (size < sizeof(seq_in_buf)) return -1;
|
||
memcpy(&seq_in_buf, buf, sizeof(seq_in_buf));
|
||
if (seq_in_buf != expected_seq) return -2;
|
||
for (size_t i = sizeof(seq_in_buf); i < size; ++i) {
|
||
if (buf[i] != (uint8_t)((expected_seq + i) & 0xFF)) return -3;
|
||
}
|
||
return 0;
|
||
}
|
||
|
||
static cuframes_publisher_t *make_publisher(void) {
|
||
cuframes_publisher_config_t cfg = {0};
|
||
cfg.key = KEY;
|
||
cfg.width = 320;
|
||
cfg.height = 240;
|
||
cfg.format = CUFRAMES_FORMAT_NV12;
|
||
cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
|
||
cfg.ring_size = 2;
|
||
cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;
|
||
cfg.cuda_device = 0;
|
||
cuframes_publisher_t *pub = NULL;
|
||
CHECK(cuframes_publisher_create(&cfg, &pub));
|
||
|
||
cuframes_packet_ring_options_t pkt_opts = {0};
|
||
pkt_opts.codec_id = 27; /* AV_CODEC_ID_H264 */
|
||
pkt_opts.ring_slots = 64;
|
||
pkt_opts.data_size = 8 * 1024 * 1024;
|
||
pkt_opts.max_packet_size = LARGE_PKT * 2;
|
||
CHECK(cuframes_publisher_enable_packets(pub, &pkt_opts));
|
||
|
||
/* Fake SPS/PPS — 16 байт */
|
||
uint8_t extradata[16];
|
||
for (int i = 0; i < 16; ++i) extradata[i] = (uint8_t)(0xAA + i);
|
||
CHECK(cuframes_publisher_set_codec_extradata(pub, extradata, sizeof(extradata)));
|
||
return pub;
|
||
}
|
||
|
||
/* Subscriber-процесс. read_delay_us позволяет имитировать slow consumer. */
|
||
static int run_subscriber(int read_delay_us, int *out_received, int *out_overruns,
|
||
int *out_first_key_seq) {
|
||
/* Wait чтобы publisher успел создать SHM */
|
||
usleep(100 * 1000);
|
||
|
||
cuframes_subscriber_config_t cfg = {0};
|
||
cfg.key = KEY;
|
||
cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
|
||
cfg.cuda_device = 0;
|
||
cfg.connect_timeout_ms = 5000;
|
||
cuframes_subscriber_t *sub = NULL;
|
||
CHECK(cuframes_subscriber_create(&cfg, &sub));
|
||
|
||
CHECK(cuframes_subscriber_enable_packets(sub));
|
||
|
||
/* Verify codec params */
|
||
uint32_t codec_id = 0;
|
||
const void *extradata = NULL;
|
||
size_t extradata_sz = 0;
|
||
int r = cuframes_subscriber_get_codec_params(sub, &codec_id, &extradata, &extradata_sz);
|
||
EXPECT_TRUE(r == CUFRAMES_OK);
|
||
EXPECT_TRUE(codec_id == 27);
|
||
EXPECT_TRUE(extradata_sz == 16);
|
||
|
||
int received = 0;
|
||
int overruns = 0;
|
||
int first_key_seq = -1;
|
||
int64_t last_pts = -1;
|
||
int data_errors = 0;
|
||
|
||
/* Run на ~30s или до того как publisher закончит. */
|
||
time_t start = time(NULL);
|
||
while (time(NULL) - start < 30) {
|
||
cuframes_packet_t *pkt = NULL;
|
||
int rc = cuframes_subscriber_next_packet(sub, &pkt, 500);
|
||
if (rc == CUFRAMES_ERR_TIMEOUT || rc == CUFRAMES_ERR_WOULD_BLOCK) {
|
||
if (received >= TOTAL_PACKETS / 2) break; /* достаточно для теста */
|
||
continue;
|
||
}
|
||
if (rc == CUFRAMES_ERR_DISCONNECTED) break;
|
||
if (rc == CUFRAMES_ERR_PACKET_OVERRUN) {
|
||
overruns++;
|
||
continue; /* library resync'нет на next call */
|
||
}
|
||
if (rc != CUFRAMES_OK) {
|
||
fprintf(stderr, "next_packet rc=%d (%s)\n", rc, cuframes_strerror(rc));
|
||
break;
|
||
}
|
||
|
||
const uint8_t *data = (const uint8_t *)cuframes_packet_data(pkt);
|
||
size_t size = cuframes_packet_size(pkt);
|
||
int64_t pts = cuframes_packet_pts(pkt);
|
||
uint32_t flags = cuframes_packet_flags(pkt);
|
||
uint64_t seq = cuframes_packet_seq(pkt);
|
||
|
||
if (verify_payload(data, size, seq) != 0) {
|
||
data_errors++;
|
||
}
|
||
|
||
if ((flags & CUFRAMES_PKT_FLAG_KEY) && first_key_seq < 0) {
|
||
first_key_seq = (int)seq;
|
||
}
|
||
if (pts <= last_pts && last_pts >= 0) {
|
||
fprintf(stderr, "PTS не монотонно: %ld <= %ld (seq=%lu)\n",
|
||
pts, last_pts, seq);
|
||
}
|
||
last_pts = pts;
|
||
received++;
|
||
|
||
cuframes_subscriber_release_packet(sub, pkt);
|
||
|
||
if (read_delay_us > 0) usleep(read_delay_us);
|
||
}
|
||
|
||
EXPECT_TRUE(data_errors == 0);
|
||
cuframes_subscriber_destroy(sub);
|
||
|
||
*out_received = received;
|
||
*out_overruns = overruns;
|
||
*out_first_key_seq = first_key_seq;
|
||
return 0;
|
||
}
|
||
|
||
static void publisher_loop(int total_packets, int inter_packet_us) {
|
||
cuframes_publisher_t *pub = make_publisher();
|
||
|
||
/* Buffer pre-alloc — max size */
|
||
uint8_t *buf = (uint8_t *)malloc(LARGE_PKT);
|
||
EXPECT_TRUE(buf != NULL);
|
||
|
||
for (int i = 0; i < total_packets; ++i) {
|
||
int is_key = (i % GOP_SIZE == 0);
|
||
size_t size = is_key ? LARGE_PKT : SMALL_PKT + (i % 8) * 1024;
|
||
gen_payload(buf, size, (uint64_t)i);
|
||
|
||
int64_t pts_ns = (int64_t)i * 33333333LL; /* ~30 fps */
|
||
uint32_t flags = is_key ? CUFRAMES_PKT_FLAG_KEY : 0;
|
||
int rc = cuframes_publisher_publish_packet(pub, buf, size,
|
||
pts_ns, pts_ns, flags);
|
||
if (rc != CUFRAMES_OK) {
|
||
fprintf(stderr, "publish rc=%d size=%zu\n", rc, size);
|
||
}
|
||
if (inter_packet_us > 0) usleep(inter_packet_us);
|
||
}
|
||
free(buf);
|
||
cuframes_publisher_destroy(pub);
|
||
}
|
||
|
||
static int check_no_leaked_shm(void) {
|
||
int fail = 0;
|
||
char path[256];
|
||
snprintf(path, sizeof(path), "/dev/shm/cuframes-%s", KEY);
|
||
if (access(path, F_OK) == 0) {
|
||
fprintf(stderr, "LEAKED %s\n", path);
|
||
fail = 1;
|
||
}
|
||
snprintf(path, sizeof(path), "/dev/shm/cuframes-%s-packets", KEY);
|
||
if (access(path, F_OK) == 0) {
|
||
fprintf(stderr, "LEAKED %s\n", path);
|
||
fail = 1;
|
||
}
|
||
return fail;
|
||
}
|
||
|
||
static int scenario_normal_flow(void) {
|
||
fprintf(stderr, "[scenario 1] normal flow — fast consumer\n");
|
||
|
||
pid_t pid = fork();
|
||
EXPECT_TRUE(pid >= 0);
|
||
if (pid == 0) {
|
||
/* child = subscriber */
|
||
int received = 0, overruns = 0, first_key = -1;
|
||
run_subscriber(0, &received, &overruns, &first_key);
|
||
fprintf(stderr, " consumer: received=%d overruns=%d first_key_seq=%d\n",
|
||
received, overruns, first_key);
|
||
EXPECT_TRUE(received >= TOTAL_PACKETS / 2);
|
||
EXPECT_TRUE(overruns == 0);
|
||
EXPECT_TRUE(first_key >= 0);
|
||
exit(0);
|
||
}
|
||
|
||
/* parent = publisher (медленнее чем consumer) */
|
||
publisher_loop(TOTAL_PACKETS, 1000); /* 1ms между packets = 1000 fps */
|
||
int status = 0;
|
||
waitpid(pid, &status, 0);
|
||
EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
|
||
return 0;
|
||
}
|
||
|
||
static int scenario_slow_consumer(void) {
|
||
fprintf(stderr, "[scenario 2] slow consumer — must hit OVERRUN + resync\n");
|
||
|
||
pid_t pid = fork();
|
||
EXPECT_TRUE(pid >= 0);
|
||
if (pid == 0) {
|
||
/* child = очень медленный subscriber */
|
||
int received = 0, overruns = 0, first_key = -1;
|
||
run_subscriber(10 * 1000, &received, &overruns, &first_key); /* 10ms */
|
||
fprintf(stderr, " consumer: received=%d overruns=%d first_key_seq=%d\n",
|
||
received, overruns, first_key);
|
||
/* Должны быть overruns поскольку publisher faster */
|
||
EXPECT_TRUE(overruns > 0);
|
||
/* И всё-таки что-то получили (resync работает) */
|
||
EXPECT_TRUE(received > 10);
|
||
exit(0);
|
||
}
|
||
|
||
/* publisher fast — 200 fps */
|
||
publisher_loop(TOTAL_PACKETS, 5 * 1000);
|
||
int status = 0;
|
||
waitpid(pid, &status, 0);
|
||
EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
|
||
return 0;
|
||
}
|
||
|
||
int main(void) {
|
||
signal(SIGPIPE, SIG_IGN);
|
||
|
||
scenario_normal_flow();
|
||
/* Ensure clean inter-test state */
|
||
usleep(200 * 1000);
|
||
if (check_no_leaked_shm()) exit(2);
|
||
|
||
scenario_slow_consumer();
|
||
usleep(200 * 1000);
|
||
if (check_no_leaked_shm()) exit(2);
|
||
|
||
fprintf(stderr, "OK — all scenarios passed\n");
|
||
return 0;
|
||
}
|