test+docs: packet ring stress test + Frigate dual-input guide (v0.2 Step 6)
Тесты:
- libcuframes/tests/test_packet_ring.c — 2 scenarios:
1) normal flow: 1 pub × 1 sub × 2000 packets, varied sizes, GOP=30,
payload integrity check (seq в первых 8 байтах + pattern). PTS
monotonicity, first KEY seq, нет data errors.
2) slow consumer (10ms delay): publisher 200 fps, subscriber должен
detect OVERRUN, library resync на keyframe — verify received >10
даже на сильно медленном консьюмере.
- libcuframes/tests/CMakeLists.txt: add_test packet_ring_basic.
Docs:
- CHANGELOG.md: новая [Unreleased] секция с full v0.2 highlights и
явно declared limitations (sub-stream, audio, codec change → v0.3).
- docs/integrations/frigate.md: новая секция "v0.2: dual-input (detect +
record через один RTSP)" с config example, requirements, trade-offs.
Связано: #2, PR #4. Step 6 (final) перед снятием draft.
This commit is contained in:
@@ -5,6 +5,45 @@
|
||||
Формат основан на [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
|
||||
проект следует [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
|
||||
|
||||
## [Unreleased] — v0.2 work in progress
|
||||
|
||||
См. PR [#4](https://git.goldix.org/gx/cuframes/pulls/4).
|
||||
|
||||
### Added
|
||||
|
||||
- **Encoded packet ring** — параллельный ring для H.264/H.265 NAL units
|
||||
(отдельный SHM `/dev/shm/cuframes-<key>-packets`, variable-length byte
|
||||
buffer + slot index, seqlock-style read для защиты от overrun).
|
||||
- **Wire protocol v2** (`proto_version = 2` в SHM header). Backward-compat:
|
||||
v2 publishers принимают v1 subscribers (frames-only).
|
||||
- **Public C API** (`include/cuframes/cuframes.h`):
|
||||
- `cuframes_publisher_enable_packets(opts)` — активирует ring
|
||||
- `cuframes_publisher_set_codec_extradata(data, size)` — SPS/PPS
|
||||
- `cuframes_publisher_publish_packet(data, size, pts, dts, flags)`
|
||||
- `cuframes_subscriber_enable_packets()` + `_next_packet()` + accessors
|
||||
- `cuframes_subscriber_get_codec_params(codec_id, extradata, size)`
|
||||
- **`cuframes::Publisher`** (C++ RAII): `enable_packets`, `set_codec_extradata`,
|
||||
`publish_packet` методы.
|
||||
- **`cuframes-rtsp-source`**: новый CLI flag `--enable-packet-ring`.
|
||||
Дублирует `AVPacket` в encoded ring до передачи декодеру.
|
||||
- **FFmpeg demuxer `cuframes_packets://<key>`** (отдельная ветка
|
||||
[gx/ffmpeg-patched PR #1](https://git.goldix.org/gx/ffmpeg-patched/pulls/1)).
|
||||
Companion к `cuframes://`. Use case: Frigate `record` role без
|
||||
второго RTSP к камере.
|
||||
- **4 новых error codes**: `PACKET_OVERSIZED`, `NO_PACKET_RING`,
|
||||
`NO_CODEC_PARAMS`, `PACKET_OVERRUN`.
|
||||
- **Stress test** `libcuframes/tests/test_packet_ring.c`: 2 scenarios —
|
||||
normal flow (1 pub × 1 sub × 2000 packets, integrity check) +
|
||||
slow consumer (must hit OVERRUN + library auto-resync на keyframe).
|
||||
- **Protocol spec §10** в `docs/protocol.md` (397 строк): byte-exact
|
||||
layout, seqlock semantics, late-subscriber GOP-aligned start.
|
||||
|
||||
### Limitations (документировано)
|
||||
|
||||
- Sub-stream selection отложено в v0.3 (`<key>-substream-<N>` naming).
|
||||
- Audio packets — v0.3 (тот же ring layout, codec_id = audio).
|
||||
- Codec change mid-stream — требует publisher destroy+recreate.
|
||||
|
||||
## [0.1.0] — 2026-05-17
|
||||
|
||||
Первый функциональный release с production deployment.
|
||||
|
||||
@@ -297,9 +297,68 @@ encoded packet path (v0.2).
|
||||
Не относится к cuframes — это нормальное поведение Frigate's go2rtc для
|
||||
TCP transport. TV/VLC обычно использует UDP — оно работает.
|
||||
|
||||
## v0.2: dual-input (detect + record через один RTSP)
|
||||
|
||||
После cuframes v0.2 publisher активирует **encoded packet ring** параллельно
|
||||
с decoded frames ring. Это даёт Frigate одновременно:
|
||||
|
||||
- `cuframes://<key>` — **decoded NV12** для `detect` role (как в v0.1)
|
||||
- `cuframes_packets://<key>` — **encoded H.264/H.265** для `record` role
|
||||
(passthrough, без decode)
|
||||
|
||||
→ **1 RTSP connection** к камере вместо 2-3 (Frigate сейчас открывает
|
||||
отдельный stream для record).
|
||||
|
||||
### Setup
|
||||
|
||||
```bash
|
||||
cuframes-rtsp-source \
|
||||
--rtsp rtsp://admin:pw@192.168.88.98/cam/realmonitor?channel=1 \
|
||||
--key cam-parking \
|
||||
--enable-packet-ring
|
||||
```
|
||||
|
||||
Publisher держит **два** SHM:
|
||||
- `/dev/shm/cuframes-cam-parking` (decoded NV12, v0.1)
|
||||
- `/dev/shm/cuframes-cam-parking-packets` (encoded packets, v0.2)
|
||||
|
||||
### Frigate config
|
||||
|
||||
```yaml
|
||||
cameras:
|
||||
cam_parking:
|
||||
ffmpeg:
|
||||
inputs:
|
||||
- path: cuframes://cam-parking
|
||||
input_args: -f cuframes
|
||||
roles: [detect]
|
||||
- path: cuframes_packets://cam-parking
|
||||
input_args: -f cuframes_packets
|
||||
roles: [record]
|
||||
```
|
||||
|
||||
### Requirements
|
||||
|
||||
- Patched FFmpeg с обоими demuxer'ами:
|
||||
[gx/ffmpeg-patched PR #1](https://git.goldix.org/gx/ffmpeg-patched/pulls/1).
|
||||
- Frigate Dockerfile перекомпилирован с этим ffmpeg (см. секцию выше про
|
||||
`cuframes-frigate:0.17` build).
|
||||
|
||||
### Trade-offs
|
||||
|
||||
| Метрика | v0.1 (frames only) | v0.2 (frames + packets) |
|
||||
|---|---|---|
|
||||
| RTSP к камере | 1 (publisher) | 1 (publisher) |
|
||||
| Frigate-side RTSP | 1+ (record отдельно) | **0** — всё через cuframes |
|
||||
| Camera RTSP streams | 2+ | **1** |
|
||||
| Доп. VRAM | ring (~10 MB) | без изменений |
|
||||
| Доп. host RAM | минимум | + 8 MB на packet ring |
|
||||
| Доп. CPU | nominal | nominal (memcpy в shared ring) |
|
||||
|
||||
## См. также
|
||||
|
||||
- [filter/README.md](../../filter/README.md) — детали FFmpeg demuxer + patch
|
||||
- [docs/integration.md](../integration.md) — общий integration guide
|
||||
- [docs/protocol.md §10](../protocol.md#10-v02-extension-encoded-packet-ring-proto_version2) — wire-protocol spec для packet ring
|
||||
- [BENCHMARKS.md](../../BENCHMARKS.md) — production-measured результаты
|
||||
- [ROADMAP.md](../../ROADMAP.md) — v0.2 что улучшит для Frigate
|
||||
- [ROADMAP.md](../../ROADMAP.md) — v0.3+ planned features
|
||||
|
||||
@@ -22,3 +22,11 @@ target_include_directories(test_stress PRIVATE
|
||||
${CMAKE_SOURCE_DIR}/include)
|
||||
add_test(NAME stress_4consumer COMMAND test_stress)
|
||||
set_tests_properties(stress_4consumer PROPERTIES TIMEOUT 120)
|
||||
|
||||
# v0.2 — packet ring tests (host-only, без CUDA в test-коде)
|
||||
add_executable(test_packet_ring test_packet_ring.c)
|
||||
target_link_libraries(test_packet_ring PRIVATE cuframes)
|
||||
target_include_directories(test_packet_ring PRIVATE
|
||||
${CMAKE_SOURCE_DIR}/include)
|
||||
add_test(NAME packet_ring_basic COMMAND test_packet_ring)
|
||||
set_tests_properties(packet_ring_basic PROPERTIES TIMEOUT 120)
|
||||
|
||||
@@ -0,0 +1,280 @@
|
||||
/* Stress test для encoded packet ring (v0.2).
|
||||
*
|
||||
* Сценарии:
|
||||
* 1) Normal flow: 1 publisher × 1 subscriber × 2000 packets, varied sizes,
|
||||
* каждые 30 packets — KEY flag (имитация GOP). Subscriber проверяет:
|
||||
* - монотонные seq (без пропусков в этом тесте — fast consumer)
|
||||
* - data integrity через checksum (XOR fold)
|
||||
* - PTS/DTS monotonic, KEY flag доходит
|
||||
* 2) Slow subscriber: publisher шлёт быстрее чем subscriber читает →
|
||||
* должен случиться OVERRUN, library resync'нет на keyframe.
|
||||
* 3) Cleanup: после exit нет leaked SHM в /dev/shm.
|
||||
*
|
||||
* Без CUDA-зависимостей (packets host-side).
|
||||
*/
|
||||
#include <cuframes/cuframes.h>
|
||||
|
||||
#include <errno.h>
|
||||
#include <fcntl.h>
|
||||
#include <signal.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <string.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <sys/wait.h>
|
||||
#include <time.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#define KEY "test_pkt_ring"
|
||||
#define TOTAL_PACKETS 2000
|
||||
#define GOP_SIZE 30
|
||||
#define SMALL_PKT 4096
|
||||
#define LARGE_PKT (256 * 1024)
|
||||
|
||||
#define CHECK(call) do { int _r = (call); if (_r != 0) { \
|
||||
fprintf(stderr, "FAIL %s:%d (rc=%d): %s\n", __FILE__, __LINE__, _r, \
|
||||
cuframes_strerror(_r)); exit(2); } } while (0)
|
||||
|
||||
#define EXPECT_TRUE(cond) do { if (!(cond)) { \
|
||||
fprintf(stderr, "EXPECT_TRUE failed at %s:%d: %s\n", \
|
||||
__FILE__, __LINE__, #cond); exit(2); } } while (0)
|
||||
|
||||
/* Сгенерировать payload: первые 8 байт = seq (little-endian), остальное pattern. */
|
||||
static void gen_payload(uint8_t *buf, size_t size, uint64_t seq) {
|
||||
memcpy(buf, &seq, sizeof(seq));
|
||||
for (size_t i = sizeof(seq); i < size; ++i) {
|
||||
buf[i] = (uint8_t)((seq + i) & 0xFF);
|
||||
}
|
||||
}
|
||||
|
||||
/* Verify payload matches seq. Возвращает 0 если ok. */
|
||||
static int verify_payload(const uint8_t *buf, size_t size, uint64_t expected_seq) {
|
||||
uint64_t seq_in_buf;
|
||||
if (size < sizeof(seq_in_buf)) return -1;
|
||||
memcpy(&seq_in_buf, buf, sizeof(seq_in_buf));
|
||||
if (seq_in_buf != expected_seq) return -2;
|
||||
for (size_t i = sizeof(seq_in_buf); i < size; ++i) {
|
||||
if (buf[i] != (uint8_t)((expected_seq + i) & 0xFF)) return -3;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static cuframes_publisher_t *make_publisher(void) {
|
||||
cuframes_publisher_config_t cfg = {0};
|
||||
cfg.key = KEY;
|
||||
cfg.width = 320;
|
||||
cfg.height = 240;
|
||||
cfg.format = CUFRAMES_FORMAT_NV12;
|
||||
cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
|
||||
cfg.ring_size = 2;
|
||||
cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;
|
||||
cfg.cuda_device = 0;
|
||||
cuframes_publisher_t *pub = NULL;
|
||||
CHECK(cuframes_publisher_create(&cfg, &pub));
|
||||
|
||||
cuframes_packet_ring_options_t pkt_opts = {0};
|
||||
pkt_opts.codec_id = 27; /* AV_CODEC_ID_H264 */
|
||||
pkt_opts.ring_slots = 64;
|
||||
pkt_opts.data_size = 8 * 1024 * 1024;
|
||||
pkt_opts.max_packet_size = LARGE_PKT * 2;
|
||||
CHECK(cuframes_publisher_enable_packets(pub, &pkt_opts));
|
||||
|
||||
/* Fake SPS/PPS — 16 байт */
|
||||
uint8_t extradata[16];
|
||||
for (int i = 0; i < 16; ++i) extradata[i] = (uint8_t)(0xAA + i);
|
||||
CHECK(cuframes_publisher_set_codec_extradata(pub, extradata, sizeof(extradata)));
|
||||
return pub;
|
||||
}
|
||||
|
||||
/* Subscriber-процесс. read_delay_us позволяет имитировать slow consumer. */
|
||||
static int run_subscriber(int read_delay_us, int *out_received, int *out_overruns,
|
||||
int *out_first_key_seq) {
|
||||
/* Wait чтобы publisher успел создать SHM */
|
||||
usleep(100 * 1000);
|
||||
|
||||
cuframes_subscriber_config_t cfg = {0};
|
||||
cfg.key = KEY;
|
||||
cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
|
||||
cfg.cuda_device = 0;
|
||||
cfg.connect_timeout_ms = 5000;
|
||||
cuframes_subscriber_t *sub = NULL;
|
||||
CHECK(cuframes_subscriber_create(&cfg, &sub));
|
||||
|
||||
CHECK(cuframes_subscriber_enable_packets(sub));
|
||||
|
||||
/* Verify codec params */
|
||||
uint32_t codec_id = 0;
|
||||
const void *extradata = NULL;
|
||||
size_t extradata_sz = 0;
|
||||
int r = cuframes_subscriber_get_codec_params(sub, &codec_id, &extradata, &extradata_sz);
|
||||
EXPECT_TRUE(r == CUFRAMES_OK);
|
||||
EXPECT_TRUE(codec_id == 27);
|
||||
EXPECT_TRUE(extradata_sz == 16);
|
||||
|
||||
int received = 0;
|
||||
int overruns = 0;
|
||||
int first_key_seq = -1;
|
||||
int64_t last_pts = -1;
|
||||
int data_errors = 0;
|
||||
|
||||
/* Run на ~30s или до того как publisher закончит. */
|
||||
time_t start = time(NULL);
|
||||
while (time(NULL) - start < 30) {
|
||||
cuframes_packet_t *pkt = NULL;
|
||||
int rc = cuframes_subscriber_next_packet(sub, &pkt, 500);
|
||||
if (rc == CUFRAMES_ERR_TIMEOUT || rc == CUFRAMES_ERR_WOULD_BLOCK) {
|
||||
if (received >= TOTAL_PACKETS / 2) break; /* достаточно для теста */
|
||||
continue;
|
||||
}
|
||||
if (rc == CUFRAMES_ERR_DISCONNECTED) break;
|
||||
if (rc == CUFRAMES_ERR_PACKET_OVERRUN) {
|
||||
overruns++;
|
||||
continue; /* library resync'нет на next call */
|
||||
}
|
||||
if (rc != CUFRAMES_OK) {
|
||||
fprintf(stderr, "next_packet rc=%d (%s)\n", rc, cuframes_strerror(rc));
|
||||
break;
|
||||
}
|
||||
|
||||
const uint8_t *data = (const uint8_t *)cuframes_packet_data(pkt);
|
||||
size_t size = cuframes_packet_size(pkt);
|
||||
int64_t pts = cuframes_packet_pts(pkt);
|
||||
uint32_t flags = cuframes_packet_flags(pkt);
|
||||
uint64_t seq = cuframes_packet_seq(pkt);
|
||||
|
||||
if (verify_payload(data, size, seq) != 0) {
|
||||
data_errors++;
|
||||
}
|
||||
|
||||
if ((flags & CUFRAMES_PKT_FLAG_KEY) && first_key_seq < 0) {
|
||||
first_key_seq = (int)seq;
|
||||
}
|
||||
if (pts <= last_pts && last_pts >= 0) {
|
||||
fprintf(stderr, "PTS не монотонно: %ld <= %ld (seq=%lu)\n",
|
||||
pts, last_pts, seq);
|
||||
}
|
||||
last_pts = pts;
|
||||
received++;
|
||||
|
||||
cuframes_subscriber_release_packet(sub, pkt);
|
||||
|
||||
if (read_delay_us > 0) usleep(read_delay_us);
|
||||
}
|
||||
|
||||
EXPECT_TRUE(data_errors == 0);
|
||||
cuframes_subscriber_destroy(sub);
|
||||
|
||||
*out_received = received;
|
||||
*out_overruns = overruns;
|
||||
*out_first_key_seq = first_key_seq;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void publisher_loop(int total_packets, int inter_packet_us) {
|
||||
cuframes_publisher_t *pub = make_publisher();
|
||||
|
||||
/* Buffer pre-alloc — max size */
|
||||
uint8_t *buf = (uint8_t *)malloc(LARGE_PKT);
|
||||
EXPECT_TRUE(buf != NULL);
|
||||
|
||||
for (int i = 0; i < total_packets; ++i) {
|
||||
int is_key = (i % GOP_SIZE == 0);
|
||||
size_t size = is_key ? LARGE_PKT : SMALL_PKT + (i % 8) * 1024;
|
||||
gen_payload(buf, size, (uint64_t)i);
|
||||
|
||||
int64_t pts_ns = (int64_t)i * 33333333LL; /* ~30 fps */
|
||||
uint32_t flags = is_key ? CUFRAMES_PKT_FLAG_KEY : 0;
|
||||
int rc = cuframes_publisher_publish_packet(pub, buf, size,
|
||||
pts_ns, pts_ns, flags);
|
||||
if (rc != CUFRAMES_OK) {
|
||||
fprintf(stderr, "publish rc=%d size=%zu\n", rc, size);
|
||||
}
|
||||
if (inter_packet_us > 0) usleep(inter_packet_us);
|
||||
}
|
||||
free(buf);
|
||||
cuframes_publisher_destroy(pub);
|
||||
}
|
||||
|
||||
static int check_no_leaked_shm(void) {
|
||||
int fail = 0;
|
||||
char path[256];
|
||||
snprintf(path, sizeof(path), "/dev/shm/cuframes-%s", KEY);
|
||||
if (access(path, F_OK) == 0) {
|
||||
fprintf(stderr, "LEAKED %s\n", path);
|
||||
fail = 1;
|
||||
}
|
||||
snprintf(path, sizeof(path), "/dev/shm/cuframes-%s-packets", KEY);
|
||||
if (access(path, F_OK) == 0) {
|
||||
fprintf(stderr, "LEAKED %s\n", path);
|
||||
fail = 1;
|
||||
}
|
||||
return fail;
|
||||
}
|
||||
|
||||
static int scenario_normal_flow(void) {
|
||||
fprintf(stderr, "[scenario 1] normal flow — fast consumer\n");
|
||||
|
||||
pid_t pid = fork();
|
||||
EXPECT_TRUE(pid >= 0);
|
||||
if (pid == 0) {
|
||||
/* child = subscriber */
|
||||
int received = 0, overruns = 0, first_key = -1;
|
||||
run_subscriber(0, &received, &overruns, &first_key);
|
||||
fprintf(stderr, " consumer: received=%d overruns=%d first_key_seq=%d\n",
|
||||
received, overruns, first_key);
|
||||
EXPECT_TRUE(received >= TOTAL_PACKETS / 2);
|
||||
EXPECT_TRUE(overruns == 0);
|
||||
EXPECT_TRUE(first_key >= 0);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
/* parent = publisher (медленнее чем consumer) */
|
||||
publisher_loop(TOTAL_PACKETS, 1000); /* 1ms между packets = 1000 fps */
|
||||
int status = 0;
|
||||
waitpid(pid, &status, 0);
|
||||
EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int scenario_slow_consumer(void) {
|
||||
fprintf(stderr, "[scenario 2] slow consumer — must hit OVERRUN + resync\n");
|
||||
|
||||
pid_t pid = fork();
|
||||
EXPECT_TRUE(pid >= 0);
|
||||
if (pid == 0) {
|
||||
/* child = очень медленный subscriber */
|
||||
int received = 0, overruns = 0, first_key = -1;
|
||||
run_subscriber(10 * 1000, &received, &overruns, &first_key); /* 10ms */
|
||||
fprintf(stderr, " consumer: received=%d overruns=%d first_key_seq=%d\n",
|
||||
received, overruns, first_key);
|
||||
/* Должны быть overruns поскольку publisher faster */
|
||||
EXPECT_TRUE(overruns > 0);
|
||||
/* И всё-таки что-то получили (resync работает) */
|
||||
EXPECT_TRUE(received > 10);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
/* publisher fast — 200 fps */
|
||||
publisher_loop(TOTAL_PACKETS, 5 * 1000);
|
||||
int status = 0;
|
||||
waitpid(pid, &status, 0);
|
||||
EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int main(void) {
|
||||
signal(SIGPIPE, SIG_IGN);
|
||||
|
||||
scenario_normal_flow();
|
||||
/* Ensure clean inter-test state */
|
||||
usleep(200 * 1000);
|
||||
if (check_no_leaked_shm()) exit(2);
|
||||
|
||||
scenario_slow_consumer();
|
||||
usleep(200 * 1000);
|
||||
if (check_no_leaked_shm()) exit(2);
|
||||
|
||||
fprintf(stderr, "OK — all scenarios passed\n");
|
||||
return 0;
|
||||
}
|
||||
Reference in New Issue
Block a user