diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b7c2e2..9c39847 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,45 @@ Формат основан на [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), проект следует [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] — v0.2 work in progress + +См. PR [#4](https://git.goldix.org/gx/cuframes/pulls/4). + +### Added + +- **Encoded packet ring** — параллельный ring для H.264/H.265 NAL units + (отдельный SHM `/dev/shm/cuframes-<key>-packets`, variable-length byte + buffer + slot index, seqlock-style read для защиты от overrun). +- **Wire protocol v2** (`proto_version = 2` в SHM header). Backward-compat: + v2 publishers принимают v1 subscribers (frames-only). +- **Public C API** (`include/cuframes/cuframes.h`): + - `cuframes_publisher_enable_packets(opts)` — активирует ring + - `cuframes_publisher_set_codec_extradata(data, size)` — SPS/PPS + - `cuframes_publisher_publish_packet(data, size, pts, dts, flags)` + - `cuframes_subscriber_enable_packets()` + `_next_packet()` + accessors + - `cuframes_subscriber_get_codec_params(codec_id, extradata, size)` +- **`cuframes::Publisher`** (C++ RAII): `enable_packets`, `set_codec_extradata`, + `publish_packet` методы. +- **`cuframes-rtsp-source`**: новый CLI flag `--enable-packet-ring`. + Дублирует `AVPacket` в encoded ring до передачи декодеру. +- **FFmpeg demuxer `cuframes_packets://`** (отдельная ветка + [gx/ffmpeg-patched PR #1](https://git.goldix.org/gx/ffmpeg-patched/pulls/1)). + Companion к `cuframes://`. Use case: Frigate `record` role без + второго RTSP к камере. +- **4 новых error codes**: `PACKET_OVERSIZED`, `NO_PACKET_RING`, + `NO_CODEC_PARAMS`, `PACKET_OVERRUN`. +- **Stress test** `libcuframes/tests/test_packet_ring.c`: 2 scenarios — + normal flow (1 pub × 1 sub × 2000 packets, integrity check) + + slow consumer (must hit OVERRUN + library auto-resync на keyframe). 
+- **Protocol spec §10** в `docs/protocol.md` (397 строк): byte-exact + layout, seqlock semantics, late-subscriber GOP-aligned start. + +### Limitations (документировано) + +- Sub-stream selection отложено в v0.3 (`-substream-` naming). +- Audio packets — v0.3 (тот же ring layout, codec_id = audio). +- Codec change mid-stream — требует publisher destroy+recreate. + ## [0.1.0] — 2026-05-17 Первый функциональный release с production deployment. diff --git a/docs/integrations/frigate.md b/docs/integrations/frigate.md index 3ddd03c..d8c9a50 100644 --- a/docs/integrations/frigate.md +++ b/docs/integrations/frigate.md @@ -297,9 +297,68 @@ encoded packet path (v0.2). Не относится к cuframes — это нормальное поведение Frigate's go2rtc для TCP transport. TV/VLC обычно использует UDP — оно работает. +## v0.2: dual-input (detect + record через один RTSP) + +После cuframes v0.2 publisher активирует **encoded packet ring** параллельно +с decoded frames ring. Это даёт Frigate одновременно: + +- `cuframes://` — **decoded NV12** для `detect` role (как в v0.1) +- `cuframes_packets://` — **encoded H.264/H.265** для `record` role + (passthrough, без decode) + +→ **1 RTSP connection** к камере вместо 2-3 (Frigate сейчас открывает +отдельный stream для record). + +### Setup + +```bash +cuframes-rtsp-source \ + --rtsp rtsp://admin:pw@192.168.88.98/cam/realmonitor?channel=1 \ + --key cam-parking \ + --enable-packet-ring +``` + +Publisher держит **два** SHM: +- `/dev/shm/cuframes-cam-parking` (decoded NV12, v0.1) +- `/dev/shm/cuframes-cam-parking-packets` (encoded packets, v0.2) + +### Frigate config + +```yaml +cameras: + cam_parking: + ffmpeg: + inputs: + - path: cuframes://cam-parking + input_args: -f cuframes + roles: [detect] + - path: cuframes_packets://cam-parking + input_args: -f cuframes_packets + roles: [record] +``` + +### Requirements + +- Patched FFmpeg с обоими demuxer'ами: + [gx/ffmpeg-patched PR #1](https://git.goldix.org/gx/ffmpeg-patched/pulls/1). 
+- Frigate Dockerfile перекомпилирован с этим ffmpeg (см. секцию выше про + `cuframes-frigate:0.17` build). + +### Trade-offs + +| Метрика | v0.1 (frames only) | v0.2 (frames + packets) | +|---|---|---| +| RTSP к камере | 1 (publisher) | 1 (publisher) | +| Frigate-side RTSP | 1+ (record отдельно) | **0** — всё через cuframes | +| Camera RTSP streams | 2+ | **1** | +| Доп. VRAM | ring (~10 MB) | без изменений | +| Доп. host RAM | минимум | + 8 MB на packet ring | +| Доп. CPU | nominal | nominal (memcpy в shared ring) | + ## См. также - [filter/README.md](../../filter/README.md) — детали FFmpeg demuxer + patch - [docs/integration.md](../integration.md) — общий integration guide +- [docs/protocol.md §10](../protocol.md#10-v02-extension-encoded-packet-ring-proto_version2) — wire-protocol spec для packet ring - [BENCHMARKS.md](../../BENCHMARKS.md) — production-measured результаты -- [ROADMAP.md](../../ROADMAP.md) — v0.2 что улучшит для Frigate +- [ROADMAP.md](../../ROADMAP.md) — v0.3+ planned features diff --git a/libcuframes/tests/CMakeLists.txt b/libcuframes/tests/CMakeLists.txt index 11ecdc7..620d080 100644 --- a/libcuframes/tests/CMakeLists.txt +++ b/libcuframes/tests/CMakeLists.txt @@ -22,3 +22,11 @@ target_include_directories(test_stress PRIVATE ${CMAKE_SOURCE_DIR}/include) add_test(NAME stress_4consumer COMMAND test_stress) set_tests_properties(stress_4consumer PROPERTIES TIMEOUT 120) + +# v0.2 — packet ring tests (host-only, без CUDA в test-коде) +add_executable(test_packet_ring test_packet_ring.c) +target_link_libraries(test_packet_ring PRIVATE cuframes) +target_include_directories(test_packet_ring PRIVATE + ${CMAKE_SOURCE_DIR}/include) +add_test(NAME packet_ring_basic COMMAND test_packet_ring) +set_tests_properties(packet_ring_basic PROPERTIES TIMEOUT 120) diff --git a/libcuframes/tests/test_packet_ring.c b/libcuframes/tests/test_packet_ring.c new file mode 100644 index 0000000..1b97e11 --- /dev/null +++ b/libcuframes/tests/test_packet_ring.c @@ -0,0 +1,280 @@ 
+/* Stress test for the encoded packet ring (v0.2).
+ *
+ * Each scenario forks: the child is the subscriber, the parent the publisher.
+ *   1) Normal flow: 2000 packets of varied size, KEY flag every 30 packets
+ *      (imitating a GOP), fast consumer: no OVERRUN, a keyframe must be seen.
+ *   2) Slow subscriber: the publisher outpaces the reader, which must report
+ *      PACKET_OVERRUN and still receive packets.
+ *   3) Cleanup: after each scenario no SHM object is left in /dev/shm.
+ * Every received packet is checked for payload integrity (seq stamp + byte
+ * pattern) and for strictly increasing PTS.
+ * No CUDA dependencies in the test code (packets are host-side).
+ */
+#include <cuframes/cuframes.h>
+
+#include <errno.h>
+#include <inttypes.h>
+#include <signal.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/types.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+#define KEY            "test_pkt_ring"
+#define TOTAL_PACKETS  2000
+#define GOP_SIZE       30
+#define SMALL_PKT      4096
+#define LARGE_PKT      (256 * 1024)
+#define CODEC_ID_H264  27   /* AV_CODEC_ID_H264 */
+#define EXTRADATA_SIZE 16   /* size of the fake SPS/PPS blob */
+
+/* Both macros print a diagnostic to stderr and exit(2) when the check fails. */
+#define CHECK(call) do { int _r = (call); if (_r != 0) { \
+    fprintf(stderr, "FAIL %s:%d (rc=%d): %s\n", __FILE__, __LINE__, _r, \
+            cuframes_strerror(_r)); exit(2); } } while (0)
+
+#define EXPECT_TRUE(cond) do { if (!(cond)) { \
+    fprintf(stderr, "EXPECT_TRUE failed at %s:%d: %s\n", \
+            __FILE__, __LINE__, #cond); exit(2); } } while (0)
+
+/* Fill buf[0..size) with a test payload: the first 8 bytes hold seq in host byte
+ * order, byte i of the rest holds (seq + i) & 0xFF. Requires size >= sizeof(seq). */
+static void gen_payload(uint8_t *buf, size_t size, uint64_t seq) {
+    memcpy(buf, &seq, sizeof(seq));
+    for (size_t i = sizeof(seq); i < size; ++i) {
+        buf[i] = (uint8_t)((seq + i) & 0xFF);
+    }
+}
+
+/* Check that buf holds what gen_payload() writes for expected_seq. Returns 0 on
+ * match, -1 if size < 8, -2 on a seq stamp mismatch, -3 on a pattern mismatch. */
+static int verify_payload(const uint8_t *buf, size_t size, uint64_t expected_seq) {
+    uint64_t seq_in_buf;
+    if (size < sizeof(seq_in_buf)) return -1;
+    memcpy(&seq_in_buf, buf, sizeof(seq_in_buf));
+    if (seq_in_buf != expected_seq) return -2;
+    for (size_t i = sizeof(seq_in_buf); i < size; ++i) {
+        if (buf[i] != (uint8_t)((expected_seq + i) & 0xFF)) return -3;
+    }
+    return 0;
+}
+
+/* Create a 320x240 NV12 publisher for KEY, enable its packet ring (64 slots, 8 MiB
+ * data area) and set a fake extradata blob. Any failure exits the process. */
+static cuframes_publisher_t *make_publisher(void) {
+    cuframes_publisher_config_t cfg = {0};
+    cfg.key = KEY;
+    cfg.width = 320;
+    cfg.height = 240;
+    cfg.format = CUFRAMES_FORMAT_NV12;
+    cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
+    cfg.ring_size = 2;
+    cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;
+    cfg.cuda_device = 0;
+    cuframes_publisher_t *pub = NULL;
+    CHECK(cuframes_publisher_create(&cfg, &pub));
+
+    cuframes_packet_ring_options_t pkt_opts = {0};
+    pkt_opts.codec_id = CODEC_ID_H264;
+    pkt_opts.ring_slots = 64;
+    pkt_opts.data_size = 8 * 1024 * 1024;
+    pkt_opts.max_packet_size = LARGE_PKT * 2;
+    CHECK(cuframes_publisher_enable_packets(pub, &pkt_opts));
+
+    uint8_t extradata[EXTRADATA_SIZE];   /* fake SPS/PPS */
+    for (int i = 0; i < EXTRADATA_SIZE; ++i) extradata[i] = (uint8_t)(0xAA + i);
+    CHECK(cuframes_publisher_set_codec_extradata(pub, extradata, sizeof(extradata)));
+    return pub;
+}
+
+/* Subscriber side of a scenario (called in the forked child).
+ * Connects to KEY, checks the codec parameters, then reads packets until ~30 s
+ * pass, the publisher disconnects, an unexpected error occurs, or a read times
+ * out after >= TOTAL_PACKETS / 2 packets. read_delay_us > 0 sleeps after every
+ * packet (slow consumer). Exits with code 2 on corrupt payload or non-increasing
+ * PTS. Outputs: packets received, OVERRUN count, first KEY seq (-1 if none). */
+static int run_subscriber(int read_delay_us, int *out_received, int *out_overruns,
+                          int *out_first_key_seq) {
+    usleep(100 * 1000);   /* give the publisher time to create the SHM */
+
+    cuframes_subscriber_config_t cfg = {0};
+    cfg.key = KEY;
+    cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
+    cfg.cuda_device = 0;
+    cfg.connect_timeout_ms = 5000;
+    cuframes_subscriber_t *sub = NULL;
+    CHECK(cuframes_subscriber_create(&cfg, &sub));
+    CHECK(cuframes_subscriber_enable_packets(sub));
+
+    /* The codec parameters must match what make_publisher() configured. */
+    uint32_t codec_id = 0;
+    const void *extradata = NULL;
+    size_t extradata_sz = 0;
+    int r = cuframes_subscriber_get_codec_params(sub, &codec_id, &extradata, &extradata_sz);
+    EXPECT_TRUE(r == CUFRAMES_OK);
+    EXPECT_TRUE(codec_id == CODEC_ID_H264);
+    EXPECT_TRUE(extradata_sz == EXTRADATA_SIZE);
+
+    int received = 0;
+    int overruns = 0;
+    int first_key_seq = -1;
+    int64_t last_pts = -1;
+    int data_errors = 0, pts_errors = 0;
+
+    time_t start = time(NULL);
+    while (time(NULL) - start < 30) {
+        cuframes_packet_t *pkt = NULL;
+        int rc = cuframes_subscriber_next_packet(sub, &pkt, 500);
+        if (rc == CUFRAMES_ERR_TIMEOUT || rc == CUFRAMES_ERR_WOULD_BLOCK) {
+            if (received >= TOTAL_PACKETS / 2) break;   /* enough for the test */
+            continue;
+        }
+        if (rc == CUFRAMES_ERR_DISCONNECTED) break;
+        if (rc == CUFRAMES_ERR_PACKET_OVERRUN) {
+            overruns++;
+            continue;   /* the library is expected to resync on the next call */
+        }
+        if (rc != CUFRAMES_OK) {
+            fprintf(stderr, "next_packet rc=%d (%s)\n", rc, cuframes_strerror(rc));
+            break;
+        }
+
+        const uint8_t *data = (const uint8_t *)cuframes_packet_data(pkt);
+        size_t size = cuframes_packet_size(pkt);
+        int64_t pts = cuframes_packet_pts(pkt);
+        uint32_t flags = cuframes_packet_flags(pkt);
+        uint64_t seq = cuframes_packet_seq(pkt);
+
+        if (verify_payload(data, size, seq) != 0) data_errors++;
+        if ((flags & CUFRAMES_PKT_FLAG_KEY) && first_key_seq < 0) {
+            first_key_seq = (int)seq;
+        }
+        if (pts <= last_pts && last_pts >= 0) {
+            fprintf(stderr, "PTS не монотонно: %" PRId64 " <= %" PRId64 " (seq=%" PRIu64 ")\n",
+                    pts, last_pts, seq);
+            pts_errors++;   /* a PTS regression fails the test, not just logs */
+        }
+        last_pts = pts;
+        received++;
+
+        cuframes_subscriber_release_packet(sub, pkt);
+        if (read_delay_us > 0) usleep(read_delay_us);
+    }
+
+    EXPECT_TRUE(data_errors == 0);
+    EXPECT_TRUE(pts_errors == 0);
+    cuframes_subscriber_destroy(sub);
+
+    *out_received = received;
+    *out_overruns = overruns;
+    *out_first_key_seq = first_key_seq;
+    return 0;
+}
+
+/* Publisher side of a scenario: send total_packets packets inter_packet_us apart;
+ * every GOP_SIZE-th one is a LARGE_PKT-byte KEY packet. Publish errors are only logged. */
+static void publisher_loop(int total_packets, int inter_packet_us) {
+    cuframes_publisher_t *pub = make_publisher();
+
+    uint8_t *buf = malloc(LARGE_PKT);   /* one buffer sized for the largest packet */
+    EXPECT_TRUE(buf != NULL);
+
+    for (int i = 0; i < total_packets; ++i) {
+        int is_key = (i % GOP_SIZE == 0);
+        size_t size = is_key ? (size_t)LARGE_PKT : (size_t)(SMALL_PKT + (i % 8) * 1024);
+        gen_payload(buf, size, (uint64_t)i);
+
+        int64_t pts_ns = (int64_t)i * 33333333LL; /* ~30 fps */
+        uint32_t flags = is_key ? CUFRAMES_PKT_FLAG_KEY : 0;
+        int rc = cuframes_publisher_publish_packet(pub, buf, size,
+                                                   pts_ns, pts_ns, flags);
+        if (rc != CUFRAMES_OK) {
+            fprintf(stderr, "publish rc=%d size=%zu\n", rc, size);
+        }
+        if (inter_packet_us > 0) usleep(inter_packet_us);
+    }
+    free(buf);
+    cuframes_publisher_destroy(pub);
+}
+
+/* Return 1 (after logging) if an SHM object of KEY still exists in /dev/shm, else 0. */
+static int check_no_leaked_shm(void) {
+    static const char *const suffixes[] = { "", "-packets" };
+    int fail = 0;
+    for (size_t i = 0; i < sizeof(suffixes) / sizeof(suffixes[0]); ++i) {
+        char path[256];
+        snprintf(path, sizeof(path), "/dev/shm/cuframes-%s%s", KEY, suffixes[i]);
+        if (access(path, F_OK) == 0) {
+            fprintf(stderr, "LEAKED %s\n", path);
+            fail = 1;
+        }
+    }
+    return fail;
+}
+
+/* Scenario 1: a subscriber without read delay against a publisher sending one packet
+ * per millisecond; OVERRUN must not occur. Failures exit(2); returns 0 otherwise. */
+static int scenario_normal_flow(void) {
+    fprintf(stderr, "[scenario 1] normal flow — fast consumer\n");
+    pid_t pid = fork();
+    EXPECT_TRUE(pid >= 0);
+    if (pid == 0) {   /* child = subscriber */
+        int received = 0, overruns = 0, first_key = -1;
+        run_subscriber(0, &received, &overruns, &first_key);
+        fprintf(stderr, " consumer: received=%d overruns=%d first_key_seq=%d\n",
+                received, overruns, first_key);
+        EXPECT_TRUE(received >= TOTAL_PACKETS / 2);
+        EXPECT_TRUE(overruns == 0);
+        EXPECT_TRUE(first_key >= 0);
+        exit(0);
+    }
+
+    publisher_loop(TOTAL_PACKETS, 1000);   /* parent = publisher: 1 ms between packets */
+    int status = 0;
+    EXPECT_TRUE(waitpid(pid, &status, 0) == pid);   /* status is meaningful only then */
+    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
+    return 0;
+}
+
+/* Scenario 2: a subscriber sleeping 10 ms per packet against a publisher sending one
+ * packet per 5 ms; at least one OVERRUN must be reported. Returns 0 otherwise. */
+static int scenario_slow_consumer(void) {
+    fprintf(stderr, "[scenario 2] slow consumer — must hit OVERRUN + resync\n");
+    pid_t pid = fork();
+    EXPECT_TRUE(pid >= 0);
+    if (pid == 0) {   /* child = very slow subscriber */
+        int received = 0, overruns = 0, first_key = -1;
+        run_subscriber(10 * 1000, &received, &overruns, &first_key); /* 10ms */
+        fprintf(stderr, " consumer: received=%d overruns=%d first_key_seq=%d\n",
+                received, overruns, first_key);
+        EXPECT_TRUE(overruns > 0);    /* the publisher is faster than the reader */
+        EXPECT_TRUE(received > 10);   /* yet packets still get through */
+        exit(0);
+    }
+
+    publisher_loop(TOTAL_PACKETS, 5 * 1000);   /* parent = publisher: 200 packets/s */
+    int status = 0;
+    EXPECT_TRUE(waitpid(pid, &status, 0) == pid);   /* status is meaningful only then */
+    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
+    return 0;
+}
+
+/* Run both scenarios; after each one wait 200 ms and exit(2) if SHM was leaked. */
+int main(void) {
+    signal(SIGPIPE, SIG_IGN);
+
+    scenario_normal_flow();
+    usleep(200 * 1000);   /* settle time before looking at /dev/shm */
+    if (check_no_leaked_shm()) exit(2);
+
+    scenario_slow_consumer();
+    usleep(200 * 1000);
+    if (check_no_leaked_shm()) exit(2);
+
+    fprintf(stderr, "OK — all scenarios passed\n");
+    return 0;
+}