diff --git a/.gitea/workflows/build.yml b/.gitea/workflows/build.yml index cfabd1a..e868875 100644 --- a/.gitea/workflows/build.yml +++ b/.gitea/workflows/build.yml @@ -25,12 +25,30 @@ jobs: # Ставим Node 20 из NodeSource repo. - name: Bootstrap Node 20 + git (для actions/checkout) run: | + set -e export DEBIAN_FRONTEND=noninteractive apt-get update apt-get install -y --no-install-recommends curl git ca-certificates gnupg - curl -fsSL https://deb.nodesource.com/setup_20.x | bash - + # NodeSource setup может молча упасть на slow networks (особенно через VPN + # на u4-runner); retry + явная verification что Node >= 18 после install. + for i in 1 2 3; do + if curl -fsSL --retry 3 --retry-delay 5 --connect-timeout 30 \ + https://deb.nodesource.com/setup_20.x | bash -; then + break + fi + echo "NodeSource setup attempt $i failed, retrying..." + sleep 10 + done apt-get install -y --no-install-recommends nodejs - node --version + NODE_VER=$(node --version) + echo "node: $NODE_VER" + # actions/checkout@v4 требует Node 20+ (ES2022 static blocks). + # Если NodeSource setup упал и установился Ubuntu's Node 12 — фейлим явно. + NODE_MAJOR=$(echo "$NODE_VER" | sed -E 's/^v([0-9]+).*/\1/') + if [ "$NODE_MAJOR" -lt 18 ]; then + echo "ERROR: Node $NODE_VER too old, NodeSource setup likely failed" >&2 + exit 1 + fi - name: Install build deps run: | @@ -80,12 +98,30 @@ jobs: steps: - name: Bootstrap Node 20 + git (для actions/checkout) run: | + set -e export DEBIAN_FRONTEND=noninteractive apt-get update apt-get install -y --no-install-recommends curl git ca-certificates gnupg - curl -fsSL https://deb.nodesource.com/setup_20.x | bash - + # NodeSource setup может молча упасть на slow networks (особенно через VPN + # на u4-runner); retry + явная verification что Node >= 18 после install. + for i in 1 2 3; do + if curl -fsSL --retry 3 --retry-delay 5 --connect-timeout 30 \ + https://deb.nodesource.com/setup_20.x | bash -; then + break + fi + echo "NodeSource setup attempt $i failed, retrying..." + sleep 10 + done apt-get install -y --no-install-recommends nodejs - node --version + NODE_VER=$(node --version) + echo "node: $NODE_VER" + # actions/checkout@v4 требует Node 20+ (ES2022 static blocks). + # Если NodeSource setup упал и установился Ubuntu's Node 12 — фейлим явно. + NODE_MAJOR=$(echo "$NODE_VER" | sed -E 's/^v([0-9]+).*/\1/') + if [ "$NODE_MAJOR" -lt 18 ]; then + echo "ERROR: Node $NODE_VER too old, NodeSource setup likely failed" >&2 + exit 1 + fi - name: Install build deps run: | diff --git a/CHANGELOG.md b/CHANGELOG.md index 2b7c2e2..9c39847 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,45 @@ Формат основан на [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), проект следует [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [Unreleased] — v0.2 work in progress + +См. PR [#4](https://git.goldix.org/gx/cuframes/pulls/4). + +### Added + +- **Encoded packet ring** — параллельный ring для H.264/H.265 NAL units + (отдельный SHM `/dev/shm/cuframes--packets`, variable-length byte + buffer + slot index, seqlock-style read для защиты от overrun). +- **Wire protocol v2** (`proto_version = 2` в SHM header). Backward-compat: + v2 publishers принимают v1 subscribers (frames-only). +- **Public C API** (`include/cuframes/cuframes.h`): + - `cuframes_publisher_enable_packets(opts)` — активирует ring + - `cuframes_publisher_set_codec_extradata(data, size)` — SPS/PPS + - `cuframes_publisher_publish_packet(data, size, pts, dts, flags)` + - `cuframes_subscriber_enable_packets()` + `_next_packet()` + accessors + - `cuframes_subscriber_get_codec_params(codec_id, extradata, size)` +- **`cuframes::Publisher`** (C++ RAII): `enable_packets`, `set_codec_extradata`, + `publish_packet` методы. +- **`cuframes-rtsp-source`**: новый CLI flag `--enable-packet-ring`. + Дублирует `AVPacket` в encoded ring до передачи декодеру. +- **FFmpeg demuxer `cuframes_packets://`** (отдельная ветка + [gx/ffmpeg-patched PR #1](https://git.goldix.org/gx/ffmpeg-patched/pulls/1)). + Companion к `cuframes://`. Use case: Frigate `record` role без + второго RTSP к камере. +- **4 новых error codes**: `PACKET_OVERSIZED`, `NO_PACKET_RING`, + `NO_CODEC_PARAMS`, `PACKET_OVERRUN`. +- **Stress test** `libcuframes/tests/test_packet_ring.c`: 2 scenarios — + normal flow (1 pub × 1 sub × 2000 packets, integrity check) + + slow consumer (must hit OVERRUN + library auto-resync на keyframe). +- **Protocol spec §10** в `docs/protocol.md` (397 строк): byte-exact + layout, seqlock semantics, late-subscriber GOP-aligned start. + +### Limitations (документировано) + +- Sub-stream selection отложено в v0.3 (`-substream-` naming). +- Audio packets — v0.3 (тот же ring layout, codec_id = audio). +- Codec change mid-stream — требует publisher destroy+recreate. + ## [0.1.0] — 2026-05-17 Первый функциональный release с production deployment. diff --git a/docs/integrations/frigate.md b/docs/integrations/frigate.md index 3ddd03c..d8c9a50 100644 --- a/docs/integrations/frigate.md +++ b/docs/integrations/frigate.md @@ -297,9 +297,68 @@ encoded packet path (v0.2). Не относится к cuframes — это нормальное поведение Frigate's go2rtc для TCP transport. TV/VLC обычно использует UDP — оно работает. +## v0.2: dual-input (detect + record через один RTSP) + +После cuframes v0.2 publisher активирует **encoded packet ring** параллельно +с decoded frames ring. Это даёт Frigate одновременно: + +- `cuframes://` — **decoded NV12** для `detect` role (как в v0.1) +- `cuframes_packets://` — **encoded H.264/H.265** для `record` role + (passthrough, без decode) + +→ **1 RTSP connection** к камере вместо 2-3 (Frigate сейчас открывает +отдельный stream для record). + +### Setup + +```bash +cuframes-rtsp-source \ + --rtsp rtsp://admin:pw@192.168.88.98/cam/realmonitor?channel=1 \ + --key cam-parking \ + --enable-packet-ring +``` + +Publisher держит **два** SHM: +- `/dev/shm/cuframes-cam-parking` (decoded NV12, v0.1) +- `/dev/shm/cuframes-cam-parking-packets` (encoded packets, v0.2) + +### Frigate config + +```yaml +cameras: + cam_parking: + ffmpeg: + inputs: + - path: cuframes://cam-parking + input_args: -f cuframes + roles: [detect] + - path: cuframes_packets://cam-parking + input_args: -f cuframes_packets + roles: [record] +``` + +### Requirements + +- Patched FFmpeg с обоими demuxer'ами: + [gx/ffmpeg-patched PR #1](https://git.goldix.org/gx/ffmpeg-patched/pulls/1). +- Frigate Dockerfile перекомпилирован с этим ffmpeg (см. секцию выше про + `cuframes-frigate:0.17` build). + +### Trade-offs + +| Метрика | v0.1 (frames only) | v0.2 (frames + packets) | +|---|---|---| +| RTSP к камере | 1 (publisher) | 1 (publisher) | +| Frigate-side RTSP | 1+ (record отдельно) | **0** — всё через cuframes | +| Camera RTSP streams | 2+ | **1** | +| Доп. VRAM | ring (~10 MB) | без изменений | +| Доп. host RAM | минимум | + 8 MB на packet ring | +| Доп. CPU | nominal | nominal (memcpy в shared ring) | + ## См. также - [filter/README.md](../../filter/README.md) — детали FFmpeg demuxer + patch - [docs/integration.md](../integration.md) — общий integration guide +- [docs/protocol.md §10](../protocol.md#10-v02-extension-encoded-packet-ring-proto_version2) — wire-protocol spec для packet ring - [BENCHMARKS.md](../../BENCHMARKS.md) — production-measured результаты -- [ROADMAP.md](../../ROADMAP.md) — v0.2 что улучшит для Frigate +- [ROADMAP.md](../../ROADMAP.md) — v0.3+ planned features diff --git a/docs/protocol.md b/docs/protocol.md index 8b30783..a344a00 100644 --- a/docs/protocol.md +++ b/docs/protocol.md @@ -423,3 +423,400 @@ TEST(Handshake, HelloRespMismatchProto) { `libcuframes/src/protocol.c` (Phase 1, Step 2) — единственная reference. Любая другая реализация (Python ctypes, Rust bindings, FFmpeg plugin) должна **conformance-tested** против этого документа. + +## 10. v0.2 extension: encoded packet ring (proto_version=2) + +**Статус:** design draft, ещё не реализовано (см. issue #2). + +Параллельно с decoded-frames ring (§2) publisher может опционально +поддерживать **encoded packet ring** — публикует raw H.264/H.265 NAL units +**до** decoder, для consumer'ов которые делают `-c:v copy` (recording, mux). + +### 10.1 Совместимость с v1 + +- v2 publisher принимает **v1-subscribers** — они получают только frames + ring (как v0.1), packet ring им не показывается. +- v1 publisher отвергает v2-subscribers с `wants_packets=true` + (HELLO_RESP error PROTOCOL). +- v1 layout (§2) **не меняется** для frames ring — packet ring это отдельный SHM. + +Publisher version bumping: +- `proto_version` = 2 в SHM header и в HELLO_RESP когда packet ring active. +- Если publisher v2 не активирует packet ring (`enable_packet_ring=false`) + — `proto_version` остаётся 1 (полная v1 compat). + +### 10.2 Дополнительные ресурсы + +| Resource | Path | Назначение | Когда | +|---|---|---|---| +| Packet shared memory | `/dev/shm/cuframes--packets` | Packet ring header + slots + byte buffer | если publisher активировал packet ring | + +Cleanup — симметрично §1: `shm_unlink` при destroy(); orphaned автоматически +если nobody mmap'ит. + +### 10.3 Packet ring layout + +Размер пакетного SHM: `sizeof(packet_ring_header_t) + N×PSE + DATA_SIZE`, +где: +- N = `packet_ring_slots`, default 64 (configurable) +- PSE = `sizeof(packet_slot_entry_t)` = 64 байт (см. §10.5) +- DATA_SIZE = `packet_data_size`, default 8 MB (configurable) + +#### Byte layout + +``` +Offset Size Field Comments +─────────────────────── ────── ────────────────────────── ───────────────────────────── +0x0000 4 magic (LE u32) 0xCC7C1DCD (frames magic + 1) +0x0004 4 proto_version (LE u32) 2 +0x0008 4 ring_slots (LE u32) N (1..1024) +0x000C 4 data_size (LE u32) bytes for packet data ring +0x0010 4 codec_id (LE u32) AV_CODEC_ID_* enum (см. §10.4) +0x0014 4 codec_extradata_size (LE u32) ≤ 4096 +0x0018 8 producer_pid (LE u64) +0x0020 8 global_seq (LE u64, atomic) монотонная по packets +0x0028 8 last_keyframe_seq (LE u64, atomic) для late subscribers +0x0030 8 write_offset (LE u64, atomic) текущий cursor в data ring +0x0038 8 shutdown_flag (LE u64, atomic) +0x0040 4096 codec_extradata SPS/PPS/VPS bytes (см. §10.4) +0x1040 N×64 slots[N] packet_slot_entry_t (см. §10.5) +0x1040+N×64 DATA_SIZE data[] wraparound byte buffer +``` + +Все atomic fields — C11 `_Atomic` (release/acquire semantics для seq updates). + +### 10.4 Codec extradata + +H.264 — SPS + PPS, конкатенированные в **Annex B** формате +(start codes `00 00 00 01`). H.265 — VPS + SPS + PPS. + +`codec_id` соответствует FFmpeg `AV_CODEC_ID_H264`, `AV_CODEC_ID_HEVC`, +`AV_CODEC_ID_AV1` (future). Subscriber пишет этот extradata в +`AVCodecContext.extradata` своего decoder'а (если он его создаёт) +или в `AVStream.codecpar->extradata` для muxer'ов. + +Extradata устанавливается publisher'ом **один раз** при первом keyframe +(или из RTSP SDP до первого packet). После — fixed на lifetime publisher'а +(codec change mid-stream → publisher destroy+recreate с новым ``). + +### 10.5 Packet slot entry (64 байта) + +``` +Offset Size Field Comments +0x00 8 seq (LE u64, atomic) published seq; UINT64_MAX = invalid +0x08 8 pts_ns (LE i64) +0x10 8 dts_ns (LE i64) для B-frames pipelines +0x18 8 data_offset (LE u64) offset в `data[]` секции SHM +0x20 4 data_size (LE u32) size of payload bytes +0x24 4 flags (LE u32) §10.6 +0x28 24 reserved 0 +``` + +`data_offset` может быть **больше** `data_size` секции SHM — semantics +"absolute byte cursor", фактический byte index = `data_offset % data_size`. +Subscriber может detect wrap (если payload crosses end → split read). + +### 10.6 Packet flags + +``` +Bit Name Comments +0 KEY keyframe (IDR for H.264, или CRA/IDR для HEVC). + Critical для late subscribers — must wait IDR. +1 CORRUPT publisher detect'нул что packet damaged + (RTP loss и т.п.) — subscriber может skip +2 DISCONTINUITY был gap перед этим packet + (publisher reconnect к камере) +3 LAST_IN_AU last NAL в access unit (полный frame) + — для muxer'ов которые ждут полный frame +4-31 reserved 0 +``` + +Mapping в `AVPacket.flags`: +- bit 0 (KEY) → `AV_PKT_FLAG_KEY` +- bit 1 (CORRUPT) → `AV_PKT_FLAG_CORRUPT` +- bit 2 (DISCONTINUITY) → `AV_PKT_FLAG_DISCONTINUITY` (FFmpeg 5+) + +### 10.7 Atomic publish (publisher-side) + +```c +// Pseudo-C (упрощено, без error handling) +uint64_t seq = atomic_load(&hdr->global_seq, RELAXED) + 1; +uint64_t off = atomic_load(&hdr->write_offset, RELAXED); + +// 1. Найти free slot (overwrite oldest) +size_t slot_idx = seq % hdr->ring_slots; +packet_slot_entry_t *slot = &slots[slot_idx]; + +// 2. Записать payload bytes (wraparound, может потребовать 2 memcpy) +size_t off_in_ring = off % hdr->data_size; +size_t first_chunk = min(size, hdr->data_size - off_in_ring); +memcpy(data + off_in_ring, payload, first_chunk); +if (first_chunk < size) + memcpy(data, payload + first_chunk, size - first_chunk); + +// 3. RELEASE: записать metadata в slot +slot->pts_ns = pts; +slot->dts_ns = dts; +slot->data_offset = off; +slot->data_size = size; +slot->flags = flags; +atomic_store(&slot->seq, seq, RELEASE); + +// 4. Update global cursor + global_seq +atomic_store(&hdr->write_offset, off + size, RELEASE); +atomic_store(&hdr->global_seq, seq, RELEASE); + +// 5. If KEY → update last_keyframe_seq +if (flags & PKT_FLAG_KEY) + atomic_store(&hdr->last_keyframe_seq, seq, RELEASE); +``` + +### 10.8 Atomic read (subscriber-side) + +```c +// Pseudo-C +uint64_t cur = atomic_load(&hdr->global_seq, ACQUIRE); +if (cur <= my_last_seq) return TIMEOUT; // ничего нового + +uint64_t want_seq = my_last_seq + 1; +size_t slot_idx = want_seq % hdr->ring_slots; +packet_slot_entry_t *slot = &slots[slot_idx]; + +uint64_t slot_seq = atomic_load(&slot->seq, ACQUIRE); +if (slot_seq != want_seq) { + // overrun — slow subscriber. Re-anchor: + want_seq = atomic_load(&hdr->last_keyframe_seq, ACQUIRE); + slot_idx = want_seq % hdr->ring_slots; + slot = &slots[slot_idx]; + return DROPPED; // signal user через flags = DISCONTINUITY +} + +// Copy payload (wraparound aware) +uint64_t off = slot->data_offset % hdr->data_size; +uint32_t size = slot->data_size; +uint32_t first_chunk = min(size, hdr->data_size - off); +memcpy(out_buf, data + off, first_chunk); +if (first_chunk < size) + memcpy(out_buf + first_chunk, data, size - first_chunk); + +// Re-check slot->seq не изменился (защита от overrun mid-read) +if (atomic_load(&slot->seq, ACQUIRE) != want_seq) { + return DROPPED; // publisher overwrote во время copy +} + +my_last_seq = want_seq; +return OK; +``` + +Защита от overrun mid-read через **post-check `slot->seq`** — простая +вариант seqlock. Если publisher успел overwrite между metadata-read и +data-copy — subscriber detect и retry. + +### 10.9 Socket protocol extensions + +#### HELLO_REQ — добавляются flags в reserved field + +v1 layout (§3.3): +``` +[4 bytes] proto_version +[4 bytes] consumer_name_len +[N bytes] consumer_name +[4 bytes] cuda_device +[4 bytes] mode +[12 bytes] reserved (must be 0) ← v0.2 использует первые 4 байта +``` + +v0.2 интерпретирует первые 4 байта `reserved` как `subscribe_flags`: + +| Bit | Name | Comments | +|---|---|---| +| 0 | `WANTS_FRAMES` | подписаться на decoded frames ring (default ON в v1 — implicit) | +| 1 | `WANTS_PACKETS` | подписаться на encoded packet ring | +| 2-31 | reserved | 0 | + +Если v1-subscriber оставляет reserved=0 — publisher v2 интерпретирует это +как `WANTS_FRAMES=true, WANTS_PACKETS=false` (v1 backward-compat). + +#### HELLO_RESP — добавляются packet-ring fields + +v1 layout (§3.4) расширяется в reserved секции: + +``` +[4 bytes] result +[4 bytes] proto_version_actual ← теперь может быть 1 или 2 +[4 bytes] ring_size ← frames ring +[4 bytes] ownership_mode +[64 bytes] frame_meta +[4 bytes] shm_path_len ← frames SHM +[N bytes] shm_path +[12 bytes] reserved ← v0.2 интерпретирует +``` + +v0.2 reserved layout (если `proto_version_actual == 2` И publisher +поддерживает packets): +``` +[4 bytes] packet_shm_path_len (LE u32) 0 = packets disabled at publisher +[N bytes] packet_shm_path (UTF-8) — относительно /dev/shm/, например "cuframes-camA-packets" +[4 bytes] codec_id (LE u32) AV_CODEC_ID_* +[4 bytes] initial_packet_seq (LE u64) last_keyframe_seq на момент handshake + (subscriber должен start с этого seq) +``` + +Если subscriber запросил `WANTS_PACKETS=1` но publisher не имеет packet ring +— `result = ERR_NOT_AVAILABLE`. + +### 10.10 Subscriber state machine extension + +Подключение к **обоим** rings (или одному из): + +``` + ┌──────────┐ + │ HELLO_OK │ proto_version_actual=2, packet_shm_path_len>0 + └────┬─────┘ + │ + ▼ + ┌────────────────────────────────┐ + │ Open frames SHM (если WANTS_FRAMES) │ → standard v1 flow + └────────────────────────────────┘ + │ + ▼ + ┌────────────────────────────────┐ + │ Open packet SHM (если WANTS_PACKETS) │ + │ - mmap /dev/shm/cuframes--packets │ + │ - check magic, proto_version │ + │ - set my_last_packet_seq = initial_packet_seq - 1 │ + │ (так что первый next_packet вернёт IDR) │ + └────────────────────────────────┘ + │ + ▼ + ┌─────────┐ + │ READY │ — frames или packets или оба доступны + └─────────┘ +``` + +### 10.11 Threading в subscriber + +Frames ring и packet ring имеют **разные** `global_seq` counters. +Subscriber имеет **отдельные** `my_last_seq` для каждого. Может +poll'ить обе независимо (или через два threads). + +Producer's `cudaEventRecord` (frames sync) не релевантен для packets — +encoded data на CPU, без CUDA sync. + +### 10.12 Конфигурируемость packet ring + +Publisher API extension (§10.13) принимает параметры: + +```c +typedef struct { + uint32_t packet_ring_slots; // default 64 + uint32_t packet_data_size; // default 8 MB (8388608) + uint32_t max_packet_size; // default 2 MB — sanity guard для оversized + // packets (publisher rejects with error) + uint32_t codec_id; // AV_CODEC_ID_H264 / HEVC / ... +} cuframes_packet_ring_options_t; +``` + +### 10.13 API extension (для cuframes.h) + +```c +/* Сreate publisher с активным packet ring. NULL для opts → packet ring disabled. */ +int cuframes_publisher_create_ex( + const cuframes_publisher_options_t *frames_opts, + const cuframes_packet_ring_options_t *packet_opts, /* NULL = no packet ring */ + cuframes_publisher_t **pub_out +); + +/* Set codec extradata (SPS/PPS) — должен быть called до первого publish_packet. */ +int cuframes_publisher_set_codec_extradata( + cuframes_publisher_t *pub, + const void *extradata, + size_t size +); + +/* Публикация packet. Slow consumer = overwrite oldest. */ +int cuframes_publisher_publish_packet( + cuframes_publisher_t *pub, + const void *data, + size_t size, + int64_t pts_ns, + int64_t dts_ns, + uint32_t flags /* CUFRAMES_PKT_FLAG_KEY | _CORRUPT | _DISCONTINUITY | _LAST_IN_AU */ +); + +/* Subscriber-side: подписаться с opt-in для packets. */ +typedef struct { + /* ... existing v1 fields ... */ + uint32_t subscribe_flags; /* WANTS_FRAMES, WANTS_PACKETS bits */ +} cuframes_subscriber_options_v2_t; + +int cuframes_subscriber_create_v2( + const cuframes_subscriber_options_v2_t *opts, + cuframes_subscriber_t **sub_out +); + +/* Чтение packet. Opaque handle — каллер вызывает release_packet после. */ +typedef struct cuframes_packet cuframes_packet_t; + +int cuframes_subscriber_next_packet( + cuframes_subscriber_t *sub, + cuframes_packet_t **pkt_out, + int32_t timeout_ms +); + +const void * cuframes_packet_data(const cuframes_packet_t *p); +size_t cuframes_packet_size(const cuframes_packet_t *p); +int64_t cuframes_packet_pts(const cuframes_packet_t *p); +int64_t cuframes_packet_dts(const cuframes_packet_t *p); +uint32_t cuframes_packet_flags(const cuframes_packet_t *p); + +int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub, cuframes_packet_t *p); + +/* Codec params для subscriber (extracted из shared header). */ +int cuframes_subscriber_get_codec_params( + cuframes_subscriber_t *sub, + uint32_t *codec_id_out, + const void **extradata_out, + size_t *extradata_size_out +); +``` + +`cuframes_packet_t` opaque — фактически указатель в local-mapped data (на +heap subscriber'а — copy при `next_packet`, освобождение при `release`). +Subscriber **не** держит ссылки на shared ring data между `next_packet` и +`release_packet` — это избавляет от reader-locks. + +### 10.14 Late subscriber → keyframe-aligned start + +При SUBSCRIBE_RESP publisher отвечает `initial_packet_seq = last_keyframe_seq`. + +Subscriber устанавливает `my_last_seq = initial_packet_seq - 1`, так что +первый `next_packet` вернёт keyframe (decoder может start без glitches). + +**Risk:** если в момент handshake **last_keyframe_seq уже выехал из +ring** (slow start subscriber, GOP > ring_slots packets) — subscriber +detect overrun в первом read и переходит на следующий keyframe. + +В implementation `publisher_publish_packet` для оптимизации может маркировать +slot перед IDR как **persistent** (флаг в reserved), но **v0.2 keep simple** — +просто требуем что `packet_ring_slots × avg_packet_size > GOP_size_in_bytes` +для нормальной работы. Sizing guide см. в [docs/integration.md](integration.md). + +### 10.15 Error codes (новые) + +| Code | Name | Когда | +|---|---|---| +| -20 | `CUFRAMES_ERR_PACKET_OVERSIZED` | publish_packet с size > max_packet_size | +| -21 | `CUFRAMES_ERR_NO_PACKET_RING` | subscriber запросил packets, publisher без packet ring | +| -22 | `CUFRAMES_ERR_NO_CODEC_PARAMS` | get_codec_params вызван до set_codec_extradata publisher'ом | +| -23 | `CUFRAMES_ERR_PACKET_OVERRUN` | subscriber slow — packet seq уехал, надо resync на keyframe | + +### 10.16 Open для v0.3+ + +- **Sub-stream selection** — publisher может публиковать несколько + packet rings (для multi-resolution streams). Сейчас один key = один stream. + v0.3 → `-substream-` naming? +- **Codec change mid-stream** — текущий design требует publisher restart. + Future: invalidate codec_extradata + bump generation field. +- **Audio streams** — analogichno в packet ring, но codec_id = audio (AAC, + Opus). v0.3. diff --git a/include/cuframes/cuframes.h b/include/cuframes/cuframes.h index 22e8d6f..775910f 100644 --- a/include/cuframes/cuframes.h +++ b/include/cuframes/cuframes.h @@ -65,6 +65,11 @@ typedef enum cuframes_error { несовпадение размеров frame'а */ CUFRAMES_ERR_WOULD_BLOCK = -11, /**< non-blocking call — no data yet */ CUFRAMES_ERR_TOO_MANY = -12, /**< превышен MAX_SUBSCRIBERS (32) */ + /* v0.2 — packet ring (см. docs/protocol.md §10.15) */ + CUFRAMES_ERR_PACKET_OVERSIZED = -20, /**< publish_packet size > max_packet_size */ + CUFRAMES_ERR_NO_PACKET_RING = -21, /**< subscriber запросил packets, у publisher'а нет ring'а */ + CUFRAMES_ERR_NO_CODEC_PARAMS = -22, /**< extradata ещё не set publisher'ом */ + CUFRAMES_ERR_PACKET_OVERRUN = -23, /**< slow subscriber, packet seq уехал — resync на keyframe */ CUFRAMES_ERR_INTERNAL = -100, /**< bug в библиотеке — repro и reportить */ } cuframes_error_t; @@ -366,6 +371,142 @@ int cuframes_async_subscriber_create(const cuframes_subscriber_config_t *cfg, */ int cuframes_async_subscriber_destroy(cuframes_async_subscriber_t *sub); +/* ─────────────────────────────────────────────────────────────────────── */ +/* Encoded packet ring API (v0.2 — см. docs/protocol.md §10) */ +/* ─────────────────────────────────────────────────────────────────────── */ + +/** Packet flags — биты соответствуют AV_PKT_FLAG_* у FFmpeg. */ +#define CUFRAMES_PKT_FLAG_KEY 0x01u /**< IDR / keyframe */ +#define CUFRAMES_PKT_FLAG_CORRUPT 0x02u /**< RTP loss / damage */ +#define CUFRAMES_PKT_FLAG_DISCONTINUITY 0x04u /**< gap before this packet */ +#define CUFRAMES_PKT_FLAG_LAST_IN_AU 0x08u /**< последний NAL в access unit */ + +typedef struct cuframes_packet_ring_options { + /** Слотов в индексе ring'а. Default 64 (≈ 2 sec @ 30fps + GOP). */ + uint32_t ring_slots; + /** Размер data section ring'а в байтах. Default 8 MiB. */ + uint32_t data_size; + /** Sanity guard — publisher отклонит packet > этого. Default 2 MiB. */ + uint32_t max_packet_size; + /** FFmpeg AV_CODEC_ID_* (H.264 = 27, HEVC = 173). */ + uint32_t codec_id; + uint64_t _reserved[4]; +} cuframes_packet_ring_options_t; + +/** + * @brief Активировать encoded packet ring на существующем publisher'е. + * + * Создаёт дополнительный SHM `/dev/shm/cuframes--packets`. После + * этого call'а publisher шлёт packets через `cuframes_publisher_publish_packet`. + * + * Должно быть вызвано **до** первого `publish_packet` и желательно до того + * как subscribers начнут подключаться (иначе они увидят publisher без packet + * ring и не получат packets). + * + * @param pub + * @param opts NULL = default sizing (64 slots, 8MiB data, 2MiB max). codec_id=0 = unknown. + * @return CUFRAMES_ERR_ALREADY_EXISTS если ring уже активирован + */ +int cuframes_publisher_enable_packets(cuframes_publisher_t *pub, + const cuframes_packet_ring_options_t *opts); + +/** + * @brief Установить codec extradata (SPS/PPS/VPS) для packet ring. + * + * Subscribers (FFmpeg demuxer) читают extradata из shared header и подставляют + * в AVCodecContext.extradata. Должно быть вызвано до того как subscribers + * захотят decode. + * + * @param size ≤ 4096 байт (CUFRAMES_PKT_EXTRADATA_MAX) + */ +int cuframes_publisher_set_codec_extradata(cuframes_publisher_t *pub, + const void *extradata, size_t size); + +/** + * @brief Опубликовать encoded packet (H.264/H.265 NAL units, Annex B). + * + * Slow consumer = overwrite oldest. Late subscriber resync'нется на last + * keyframe (см. docs/protocol.md §10.14). + * + * @param flags CUFRAMES_PKT_FLAG_* (минимум KEY на IDR — критично!) + * @return CUFRAMES_ERR_NO_PACKET_RING если не вызывали enable_packets + * @return CUFRAMES_ERR_PACKET_OVERSIZED если size > max_packet_size + */ +int cuframes_publisher_publish_packet(cuframes_publisher_t *pub, + const void *data, size_t size, + int64_t pts_ns, int64_t dts_ns, + uint32_t flags); + +/* ── Subscriber-side packet API ───────────────────────────────────────── */ + +/** Opaque packet handle. Освобождается через release_packet. */ +typedef struct cuframes_packet cuframes_packet_t; + +/** @brief Pointer на encoded NAL bytes. Valid до release_packet. */ +const void *cuframes_packet_data(const cuframes_packet_t *p); + +/** @brief Размер payload в байтах. */ +size_t cuframes_packet_size(const cuframes_packet_t *p); + +/** @brief Presentation timestamp (наносекунды). */ +int64_t cuframes_packet_pts(const cuframes_packet_t *p); + +/** @brief Decode timestamp (для B-frames pipelines). */ +int64_t cuframes_packet_dts(const cuframes_packet_t *p); + +/** @brief Биты CUFRAMES_PKT_FLAG_*. */ +uint32_t cuframes_packet_flags(const cuframes_packet_t *p); + +/** @brief Sequence number у publisher'а. */ +uint64_t cuframes_packet_seq(const cuframes_packet_t *p); + +/** + * @brief Активировать чтение packet ring на subscriber'е. + * + * Открывает SHM `/dev/shm/cuframes--packets` (тот же `key` что в config). + * После этого можно читать через `cuframes_subscriber_next_packet`. + * + * Subscriber может одновременно иметь frames ring и packets ring (или один из). + * + * @return CUFRAMES_ERR_NOT_FOUND если publisher не имеет packet ring + */ +int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub); + +/** + * @brief Получить следующий packet. + * + * Late subscriber (первый вызов) начинает с last_keyframe_seq publisher'а — + * decoder receive'нет valid stream без glitches. + * + * Полученный packet ОБЯЗАТЕЛЬНО освободить через + * cuframes_subscriber_release_packet(). + * + * @param timeout_ms <0 = блокироваться, 0 = non-blocking (WOULD_BLOCK), >0 = с таймаутом + * @return CUFRAMES_ERR_PACKET_OVERRUN — subscriber отстал, resync на keyframe (library сделает автоматически на next call) + * @return CUFRAMES_ERR_DISCONNECTED — publisher shutdown + */ +int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub, + cuframes_packet_t **pkt_out, + int32_t timeout_ms); + +/** @brief Освободить packet handle. NULL-safe. */ +int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub, + cuframes_packet_t *pkt); + +/** + * @brief Получить codec parameters publisher'а. + * + * `*extradata_out` — pointer в библиотечный buffer, valid пока subscriber жив. + * Caller должен скопировать данные если хочет hold past subscriber lifetime. + * + * @return CUFRAMES_ERR_NO_CODEC_PARAMS если publisher ещё не вызвал + * set_codec_extradata + */ +int cuframes_subscriber_get_codec_params(cuframes_subscriber_t *sub, + uint32_t *codec_id_out, + const void **extradata_out, + size_t *extradata_size_out); + /* ─────────────────────────────────────────────────────────────────────── */ /* Утилиты */ /* ─────────────────────────────────────────────────────────────────────── */ diff --git a/include/cuframes/cuframes.hpp b/include/cuframes/cuframes.hpp index 846688e..af30b8c 100644 --- a/include/cuframes/cuframes.hpp +++ b/include/cuframes/cuframes.hpp @@ -148,6 +148,23 @@ public: "Publisher::publish_external"); } + /* v0.2 — encoded packet ring */ + void enable_packets(const cuframes_packet_ring_options_t *opts = nullptr) { + check(cuframes_publisher_enable_packets(pub_, opts), + "Publisher::enable_packets"); + } + + void set_codec_extradata(const void *data, size_t size) { + check(cuframes_publisher_set_codec_extradata(pub_, data, size), + "Publisher::set_codec_extradata"); + } + + /* Returns CUFRAMES_OK / negative error code (без throw — caller решает). */ + int publish_packet(const void *data, size_t size, + int64_t pts_ns, int64_t dts_ns, uint32_t flags) noexcept { + return cuframes_publisher_publish_packet(pub_, data, size, pts_ns, dts_ns, flags); + } + cuframes_publisher_t *raw() noexcept { return pub_; } private: diff --git a/libcuframes/CMakeLists.txt b/libcuframes/CMakeLists.txt index 5d166ff..a914178 100644 --- a/libcuframes/CMakeLists.txt +++ b/libcuframes/CMakeLists.txt @@ -10,6 +10,7 @@ set(CUFRAMES_SOURCES src/producer.c src/consumer.c src/consumer_async.c + src/packet_ring.c ) add_library(cuframes SHARED ${CUFRAMES_SOURCES}) diff --git a/libcuframes/src/consumer.c b/libcuframes/src/consumer.c index c6f4392..02f46c1 100644 --- a/libcuframes/src/consumer.c +++ b/libcuframes/src/consumer.c @@ -6,6 +6,7 @@ #include #include #include +#include #include /* Opaque frame — выдаётся subscriber'у на next() */ @@ -23,6 +24,17 @@ struct cuframes_frame { void *subscriber; /* back-ref для release() */ }; +/* Opaque packet handle — single-packet pattern (как frame_obj). */ +struct cuframes_packet { + uint8_t *data; /* heap buffer, allocated by subscriber на enable_packets */ + size_t capacity; /* size of allocation */ + size_t size; /* actual payload size */ + int64_t pts_ns; + int64_t dts_ns; + uint32_t flags; + uint64_t seq; +}; + struct cuframes_subscriber { cuframes_subscriber_config_t cfg; char key[CUFRAMES_MAX_KEY_LEN + 1]; @@ -38,10 +50,16 @@ struct cuframes_subscriber { uint32_t assigned_bit; uint64_t last_seen_seq; - /* Frame pool — переиспользуем одну frame_t structure (single-thread API). - * Опционально расширим до lock-free pool в v0.2 если нужен multi-frame. */ + /* Frame pool — переиспользуем одну frame_t structure (single-thread API). */ struct cuframes_frame frame_obj; int frame_busy; + + /* v0.2 — packet ring (optional, opened via enable_packets). */ + int has_pkt_ring; + cuframes_pkt_ring_t pkt_ring; + uint64_t last_packet_seq; /* UINT64_MAX = no packet read yet */ + struct cuframes_packet packet_obj; + int packet_busy; }; /* ─── Frame accessors ────────────────────────────────────────────────── */ @@ -347,9 +365,143 @@ int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) { if (sub->mapped_ptrs[i]) cudaIpcCloseMemHandle(sub->mapped_ptrs[i]); } + /* Packet ring cleanup */ + if (sub->has_pkt_ring) { + cuframes_internal_pkt_ring_destroy(&sub->pkt_ring); + } + if (sub->packet_obj.data) { + free(sub->packet_obj.data); + sub->packet_obj.data = NULL; + } + if (sub->hdr) munmap(sub->hdr, sizeof(cuframes_shm_header_t)); if (sub->shm_fd >= 0) close(sub->shm_fd); if (sub->sock_fd >= 0) close(sub->sock_fd); free(sub); return CUFRAMES_OK; } + +/* ─────────────────────────────────────────────────────────────────────── */ +/* v0.2 — encoded packet ring API (см. docs/protocol.md §10) */ +/* ─────────────────────────────────────────────────────────────────────── */ + +/* Packet accessors */ +const void *cuframes_packet_data(const cuframes_packet_t *p) { return p ? p->data : NULL; } +size_t cuframes_packet_size(const cuframes_packet_t *p) { return p ? p->size : 0; } +int64_t cuframes_packet_pts(const cuframes_packet_t *p) { return p ? p->pts_ns : 0; } +int64_t cuframes_packet_dts(const cuframes_packet_t *p) { return p ? p->dts_ns : 0; } +uint32_t cuframes_packet_flags(const cuframes_packet_t *p) { return p ? p->flags : 0; } +uint64_t cuframes_packet_seq(const cuframes_packet_t *p) { return p ? p->seq : 0; } + +int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub) { + if (!sub) return CUFRAMES_ERR_INVALID_ARG; + if (sub->has_pkt_ring) return CUFRAMES_OK; /* idempotent */ + + char pkt_name[128]; + int r = cuframes_internal_pkt_shm_name(sub->key, pkt_name, sizeof(pkt_name)); + if (r != CUFRAMES_OK) return r; + + r = cuframes_internal_pkt_ring_open(pkt_name, &sub->pkt_ring); + if (r != CUFRAMES_OK) return r; + + /* Allocate copy-buffer (max packet size). Используем data_size как + * conservative upper bound (publisher гарантирует data_size >= max_packet_size). */ + size_t capacity = sub->pkt_ring.hdr->data_size; + sub->packet_obj.data = (uint8_t *)malloc(capacity); + if (!sub->packet_obj.data) { + cuframes_internal_pkt_ring_destroy(&sub->pkt_ring); + return CUFRAMES_ERR_OUT_OF_MEMORY; + } + sub->packet_obj.capacity = capacity; + + /* Start с last_keyframe_seq - 1 → первый read даст IDR (§10.14). */ + uint64_t kf = atomic_load_explicit(&sub->pkt_ring.hdr->last_keyframe_seq, + memory_order_acquire); + sub->last_packet_seq = (kf == UINT64_MAX) ? UINT64_MAX : kf - 1; + sub->has_pkt_ring = 1; + return CUFRAMES_OK; +} + +int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub, + cuframes_packet_t **pkt_out, + int32_t timeout_ms) { + if (!sub || !pkt_out) return CUFRAMES_ERR_INVALID_ARG; + if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING; + if (sub->packet_busy) return CUFRAMES_ERR_INVALID_ARG; /* previous packet not released */ + + int64_t deadline_ns = (timeout_ms > 0) ? + cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL : 0; + + for (;;) { + size_t size = 0; + int64_t pts = 0, dts = 0; + uint32_t flags = 0; + uint64_t seq_attempt = sub->last_packet_seq; + + int r = cuframes_internal_pkt_ring_read(&sub->pkt_ring, + &seq_attempt, + sub->packet_obj.data, + sub->packet_obj.capacity, + &size, &pts, &dts, &flags); + if (r == CUFRAMES_OK) { + sub->last_packet_seq = seq_attempt; + sub->packet_obj.size = size; + sub->packet_obj.pts_ns = pts; + sub->packet_obj.dts_ns = dts; + sub->packet_obj.flags = flags; + sub->packet_obj.seq = seq_attempt; + sub->packet_busy = 1; + *pkt_out = &sub->packet_obj; + return CUFRAMES_OK; + } + + if (r == CUFRAMES_ERR_PACKET_OVERRUN) { + /* Resync — установить last_seq = last_keyframe_seq - 1, повторить. */ + uint64_t kf = atomic_load_explicit( + &sub->pkt_ring.hdr->last_keyframe_seq, memory_order_acquire); + if (kf != UINT64_MAX) { + sub->last_packet_seq = kf - 1; + } + /* Возвращаем OVERRUN наружу — caller знает что был discontinuity. */ + *pkt_out = NULL; + return CUFRAMES_ERR_PACKET_OVERRUN; + } + + if (r != CUFRAMES_ERR_TIMEOUT) { + *pkt_out = NULL; + return r; /* DISCONNECTED, INVALID_ARG, etc. */ + } + + /* TIMEOUT branch — poll/sleep */ + if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK; + if (timeout_ms > 0 && cuframes_now_ns() >= deadline_ns) { + return CUFRAMES_ERR_TIMEOUT; + } + struct timespec ts = {0, 1 * 1000 * 1000}; /* 1 ms poll interval */ + nanosleep(&ts, NULL); + } +} + +int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub, + cuframes_packet_t *pkt) { + if (!sub) return CUFRAMES_ERR_INVALID_ARG; + if (!pkt) return CUFRAMES_OK; /* NULL-safe */ + if (pkt != &sub->packet_obj) return CUFRAMES_ERR_INVALID_ARG; + sub->packet_busy = 0; + return CUFRAMES_OK; +} + +int cuframes_subscriber_get_codec_params(cuframes_subscriber_t *sub, + uint32_t *codec_id_out, + const void **extradata_out, + size_t *extradata_size_out) { + if (!sub) return CUFRAMES_ERR_INVALID_ARG; + if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING; + cuframes_pkt_header_t *hdr = sub->pkt_ring.hdr; + if (codec_id_out) *codec_id_out = hdr->codec_id; + /* Если extradata ещё не выставлен publisher'ом — size=0, pointer ok но empty. */ + if (extradata_out) *extradata_out = hdr->codec_extradata; + if (extradata_size_out) *extradata_size_out = hdr->codec_extradata_size; + if (hdr->codec_extradata_size == 0) return CUFRAMES_ERR_NO_CODEC_PARAMS; + return CUFRAMES_OK; +} diff --git a/libcuframes/src/internal.h b/libcuframes/src/internal.h index 62b9090..3377357 100644 --- a/libcuframes/src/internal.h +++ b/libcuframes/src/internal.h @@ -23,12 +23,28 @@ #define CUFRAMES_MAGIC 0xCC7C1DCCu #define CUFRAMES_PROTOCOL_V1 1u +#define CUFRAMES_PROTOCOL_V2 2u /* v0.2 — packet ring support */ #define CUFRAMES_MAX_SUBSCRIBERS 32 #define CUFRAMES_MAX_RING 16 #define CUFRAMES_MAX_KEY_LEN 63 #define CUFRAMES_MAX_NAME_LEN 31 #define CUFRAMES_RUNTIME_DIR "/run/cuframes" #define CUFRAMES_SHM_PREFIX "/cuframes-" +#define CUFRAMES_PKT_SHM_SUFFIX "-packets" /* /cuframes--packets */ + +/* Packet ring constants (см. docs/protocol.md §10) */ +#define CUFRAMES_PKT_MAGIC 0xCC7C1DCDu /* frames magic + 1 */ +#define CUFRAMES_PKT_EXTRADATA_MAX 4096u +#define CUFRAMES_PKT_DEFAULT_SLOTS 64u +#define CUFRAMES_PKT_DEFAULT_DATA_SIZE (8u * 1024u * 1024u) /* 8 MB */ +#define CUFRAMES_PKT_DEFAULT_MAX_SIZE (2u * 1024u * 1024u) /* 2 MB */ +#define CUFRAMES_PKT_MAX_SLOTS 1024u + +/* Packet flags (см. docs/protocol.md §10.6) */ +#define CUFRAMES_PKT_FLAG_KEY 0x01u +#define CUFRAMES_PKT_FLAG_CORRUPT 0x02u +#define CUFRAMES_PKT_FLAG_DISCONTINUITY 0x04u +#define CUFRAMES_PKT_FLAG_LAST_IN_AU 0x08u /* ─── Shared memory layout (см. docs/protocol.md §2) ──────────────────── */ @@ -103,6 +119,73 @@ _Static_assert(offsetof(cuframes_shm_header_t, ipc_event_handle) == 0x0080, "eve _Static_assert(offsetof(cuframes_shm_header_t, global_seq) == 0x00C0, "global_seq offset"); _Static_assert(offsetof(cuframes_shm_header_t, slots) == 0x0100, "slots offset"); +/* ─── Packet ring shared memory layout (docs/protocol.md §10) ──────────── */ + +/* Packet slot entry — packed 64 байт */ +typedef struct __attribute__((packed)) cuframes_pkt_slot { + _Atomic uint64_t seq; /* UINT64_MAX = invalid */ + int64_t pts_ns; + int64_t dts_ns; + uint64_t data_offset; /* absolute byte cursor; % data_size = ring offset */ + uint32_t data_size; + uint32_t flags; + uint8_t reserved[24]; +} cuframes_pkt_slot_t; +_Static_assert(sizeof(cuframes_pkt_slot_t) == 64, "packet slot must be 64 bytes"); + +/* Packet ring header (fixed 0x1040 = 4160 bytes). Followed by slots[N] + data[]. */ +typedef struct __attribute__((packed)) cuframes_pkt_header { + uint32_t magic; /* CUFRAMES_PKT_MAGIC */ + uint32_t proto_version; /* 2 */ + uint32_t ring_slots; + uint32_t data_size; + uint32_t codec_id; /* AV_CODEC_ID_H264 / HEVC / ... */ + uint32_t codec_extradata_size; /* ≤ CUFRAMES_PKT_EXTRADATA_MAX */ + uint64_t producer_pid; + _Atomic uint64_t global_seq; + _Atomic uint64_t last_keyframe_seq; + _Atomic uint64_t write_offset; + _Atomic uint64_t shutdown_flag; + uint8_t codec_extradata[CUFRAMES_PKT_EXTRADATA_MAX]; + /* offset 0x1040 — slots[ring_slots], then data[data_size] */ +} cuframes_pkt_header_t; + +_Static_assert(offsetof(cuframes_pkt_header_t, magic) == 0x0000, "pkt magic offset"); +_Static_assert(offsetof(cuframes_pkt_header_t, proto_version) == 0x0004, "pkt proto offset"); +_Static_assert(offsetof(cuframes_pkt_header_t, producer_pid) == 0x0018, "pkt pid offset"); +_Static_assert(offsetof(cuframes_pkt_header_t, global_seq) == 0x0020, "pkt global_seq offset"); +_Static_assert(offsetof(cuframes_pkt_header_t, write_offset) == 0x0030, "pkt write_offset offset"); +_Static_assert(offsetof(cuframes_pkt_header_t, codec_extradata) == 0x0040, "pkt extradata offset"); +_Static_assert(sizeof(cuframes_pkt_header_t) == 0x1040, "pkt header must be 0x1040 bytes"); + +/* Computed SHM layout helper: + * total = sizeof(cuframes_pkt_header_t) + slots*sizeof(slot) + data_size + */ +static inline size_t cuframes_pkt_shm_size(uint32_t slots, uint32_t data_size) { + return sizeof(cuframes_pkt_header_t) + + (size_t)slots * sizeof(cuframes_pkt_slot_t) + + (size_t)data_size; +} + +/* Pointers into mmap'ed pkt SHM (computed from header base) */ +static inline cuframes_pkt_slot_t * cuframes_pkt_slots(cuframes_pkt_header_t *hdr) { + return (cuframes_pkt_slot_t *)((uint8_t *)hdr + sizeof(cuframes_pkt_header_t)); +} +static inline uint8_t * cuframes_pkt_data(cuframes_pkt_header_t *hdr) { + return (uint8_t *)hdr + sizeof(cuframes_pkt_header_t) + + (size_t)hdr->ring_slots * sizeof(cuframes_pkt_slot_t); +} + +/* Opaque ring handle — содержит state и mapping для publisher или subscriber. */ +typedef struct cuframes_pkt_ring { + int shm_fd; + void *shm_base; + size_t shm_size; + cuframes_pkt_header_t *hdr; + char shm_name[128]; /* /cuframes--packets */ + int is_publisher; +} cuframes_pkt_ring_t; + /* ─── Socket protocol messages (docs/protocol.md §3) ───────────────────── */ #define CUFRAMES_MSG_HELLO_REQ 0x01 @@ -164,6 +247,8 @@ typedef struct __attribute__((packed)) cuframes_msg_subscribe_resp { int cuframes_internal_socket_path(const char *key, char *out, size_t out_size); /* Build /cuframes- (for shm_open) */ int cuframes_internal_shm_name(const char *key, char *out, size_t out_size); +/* Build /cuframes--packets (for shm_open) */ +int cuframes_internal_pkt_shm_name(const char *key, char *out, size_t out_size); /* Validate key per protocol.md (alphanum/_/-, 1..63 chars) */ int cuframes_internal_validate_key(const char *key); /* Calculate frame size + pitch для format/W/H */ @@ -181,4 +266,48 @@ int cuframes_internal_recv_msg(int sock_fd, uint32_t *msg_type_out, void *payload, uint32_t *payload_len_inout, int32_t timeout_ms); +/* ─── Packet ring helpers (libcuframes/src/packet_ring.c) ─────────────── */ + +/* Publisher: create SHM + initialize header + slots. Stale recovery как у frames. */ +int cuframes_internal_pkt_ring_create(const char *key, + uint32_t slots, + uint32_t data_size, + uint32_t codec_id, + cuframes_pkt_ring_t *ring_out); + +/* Publisher: set codec extradata (SPS/PPS). Must be called before first publish. + * Если size > CUFRAMES_PKT_EXTRADATA_MAX → ERR_INVALID_ARG. */ +int cuframes_internal_pkt_ring_set_extradata(cuframes_pkt_ring_t *ring, + const void *extradata, + size_t size); + +/* Publisher: publish single encoded packet. Slow consumer = overwrite oldest. + * Returns CUFRAMES_ERR_PACKET_OVERSIZED если size > data_size. */ +int cuframes_internal_pkt_ring_publish(cuframes_pkt_ring_t *ring, + const void *data, size_t size, + int64_t pts_ns, int64_t dts_ns, + uint32_t flags); + +/* Subscriber: open existing SHM by shm name (from HELLO_RESP packet_shm_path). */ +int cuframes_internal_pkt_ring_open(const char *shm_name, + cuframes_pkt_ring_t *ring_out); + +/* Subscriber: read next packet. + * *seq_inout — currently held seq (we read seq_inout+1); updated on success. + * out_buf must have ≥ max_packet_size bytes; out_size receives actual size. + * Returns: + * CUFRAMES_OK on success + * CUFRAMES_ERR_PACKET_OVERRUN если publisher уехал — caller resync on keyframe + * CUFRAMES_ERR_TIMEOUT если нет нового packet + * CUFRAMES_ERR_DISCONNECTED если publisher shutdown */ +int cuframes_internal_pkt_ring_read(cuframes_pkt_ring_t *ring, + uint64_t *seq_inout, + void *out_buf, size_t out_buf_max, + size_t *out_size, + int64_t *out_pts, int64_t *out_dts, + uint32_t *out_flags); + +/* Publisher OR Subscriber: cleanup mmap + close FD. Publisher additionally shm_unlink. */ +void cuframes_internal_pkt_ring_destroy(cuframes_pkt_ring_t *ring); + #endif /* CUFRAMES_INTERNAL_H */ diff --git a/libcuframes/src/packet_ring.c b/libcuframes/src/packet_ring.c new file mode 100644 index 0000000..387f9a5 --- /dev/null +++ b/libcuframes/src/packet_ring.c @@ -0,0 +1,380 @@ +/* libcuframes/src/packet_ring.c + * + * Variable-length encoded packet ring buffer (docs/protocol.md §10). + * + * Использует POSIX shared memory (`/cuframes--packets`), packed + * structures с _Atomic полями, seqlock-style read для защиты от overrun + * mid-read. + * + * Этот модуль внутренний — exposed API будет в Step 3 (cuframes.h + * extension). Сейчас functions имеют prefix `cuframes_internal_pkt_ring_*` + * и используются из producer.c / consumer.c. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "internal.h" + +/* ─── Internal helpers ────────────────────────────────────────────────── */ + +static void wraparound_memcpy(uint8_t *dst, const uint8_t *src, size_t n, + size_t buf_size, size_t offset) { + /* Запись n байт начиная с offset в buf размера buf_size, wraparound. */ + size_t off = offset % buf_size; + size_t first = n; + if (first > buf_size - off) first = buf_size - off; + memcpy(dst + off, src, first); + if (first < n) { + memcpy(dst, src + first, n - first); + } +} + +static void wraparound_memcpy_from(uint8_t *out, const uint8_t *buf, + size_t buf_size, size_t offset, size_t n) { + /* Чтение n байт из buf с wraparound от offset. */ + size_t off = offset % buf_size; + size_t first = n; + if (first > buf_size - off) first = buf_size - off; + memcpy(out, buf + off, first); + if (first < n) { + memcpy(out + first, buf, n - first); + } +} + +/* ─── Publisher API ───────────────────────────────────────────────────── */ + +int cuframes_internal_pkt_ring_create(const char *key, + uint32_t slots, + uint32_t data_size, + uint32_t codec_id, + cuframes_pkt_ring_t *ring_out) { + if (!ring_out) return CUFRAMES_ERR_INVALID_ARG; + if (slots == 0 || slots > CUFRAMES_PKT_MAX_SLOTS) return CUFRAMES_ERR_INVALID_ARG; + if (data_size == 0) return CUFRAMES_ERR_INVALID_ARG; + + memset(ring_out, 0, sizeof(*ring_out)); + ring_out->shm_fd = -1; + ring_out->is_publisher = 1; + + int r = cuframes_internal_pkt_shm_name(key, ring_out->shm_name, + sizeof(ring_out->shm_name)); + if (r != CUFRAMES_OK) return r; + + /* Stale recovery (как в frames SHM) */ + int fd = shm_open(ring_out->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644); + if (fd < 0) { + if (errno == EEXIST) { + int existing = shm_open(ring_out->shm_name, O_RDWR, 0); + if (existing >= 0) { + cuframes_pkt_header_t tmp; + ssize_t rb = read(existing, &tmp, sizeof(tmp)); + close(existing); + if (rb == (ssize_t)sizeof(tmp) && tmp.magic == CUFRAMES_PKT_MAGIC) { + if (cuframes_internal_pid_alive((pid_t)tmp.producer_pid)) { + CUFRAMES_LOG_ERROR("packet ring %s: publisher pid %lu still alive", + ring_out->shm_name, + (unsigned long)tmp.producer_pid); + return CUFRAMES_ERR_ALREADY_EXISTS; + } + } + } + CUFRAMES_LOG_INFO("stale packet shm %s — unlinking", ring_out->shm_name); + shm_unlink(ring_out->shm_name); + fd = shm_open(ring_out->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644); + if (fd < 0) { + CUFRAMES_LOG_ERROR("packet shm_open after unlink: %s", strerror(errno)); + return CUFRAMES_ERR_IO; + } + } else { + CUFRAMES_LOG_ERROR("packet shm_open: %s", strerror(errno)); + return CUFRAMES_ERR_IO; + } + } + + size_t total_size = cuframes_pkt_shm_size(slots, data_size); + if (ftruncate(fd, (off_t)total_size) < 0) { + CUFRAMES_LOG_ERROR("packet ftruncate(%zu): %s", total_size, strerror(errno)); + close(fd); + shm_unlink(ring_out->shm_name); + return CUFRAMES_ERR_IO; + } + + void *base = mmap(NULL, total_size, PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { + CUFRAMES_LOG_ERROR("packet mmap: %s", strerror(errno)); + close(fd); + shm_unlink(ring_out->shm_name); + return CUFRAMES_ERR_IO; + } + + ring_out->shm_fd = fd; + ring_out->shm_base = base; + ring_out->shm_size = total_size; + ring_out->hdr = (cuframes_pkt_header_t *)base; + + /* Initialize header — нули + magic/version/sizes */ + memset(ring_out->hdr, 0, sizeof(*ring_out->hdr)); + ring_out->hdr->magic = CUFRAMES_PKT_MAGIC; + ring_out->hdr->proto_version = CUFRAMES_PROTOCOL_V2; + ring_out->hdr->ring_slots = slots; + ring_out->hdr->data_size = data_size; + ring_out->hdr->codec_id = codec_id; + ring_out->hdr->codec_extradata_size = 0; + ring_out->hdr->producer_pid = (uint64_t)getpid(); + atomic_store_explicit(&ring_out->hdr->global_seq, UINT64_MAX, + memory_order_release); + atomic_store_explicit(&ring_out->hdr->last_keyframe_seq, UINT64_MAX, + memory_order_release); + atomic_store_explicit(&ring_out->hdr->write_offset, 0, + memory_order_release); + atomic_store_explicit(&ring_out->hdr->shutdown_flag, 0, + memory_order_release); + + /* Initialize slots — invalid seq markers */ + cuframes_pkt_slot_t *slots_arr = cuframes_pkt_slots(ring_out->hdr); + for (uint32_t i = 0; i < slots; ++i) { + atomic_store_explicit(&slots_arr[i].seq, UINT64_MAX, + memory_order_release); + } + + /* Data section уже zeroed через ftruncate (POSIX guarantees) */ + + CUFRAMES_LOG_INFO("packet ring %s: slots=%u data_size=%u codec_id=%u (total=%zu bytes)", + ring_out->shm_name, slots, data_size, codec_id, total_size); + return CUFRAMES_OK; +} + +int cuframes_internal_pkt_ring_set_extradata(cuframes_pkt_ring_t *ring, + const void *extradata, + size_t size) { + if (!ring || !ring->hdr) return CUFRAMES_ERR_INVALID_ARG; + if (!ring->is_publisher) return CUFRAMES_ERR_INVALID_ARG; + if (size > CUFRAMES_PKT_EXTRADATA_MAX) return CUFRAMES_ERR_INVALID_ARG; + if (size > 0 && !extradata) return CUFRAMES_ERR_INVALID_ARG; + + /* Записываем сначала bytes, потом size (release-style — subscriber видит size>0 только когда extradata готов). */ + if (size > 0) { + memcpy(ring->hdr->codec_extradata, extradata, size); + /* Memory barrier — extradata stores complete до size update. */ + __atomic_thread_fence(__ATOMIC_RELEASE); + } + ring->hdr->codec_extradata_size = (uint32_t)size; + return CUFRAMES_OK; +} + +int cuframes_internal_pkt_ring_publish(cuframes_pkt_ring_t *ring, + const void *data, size_t size, + int64_t pts_ns, int64_t dts_ns, + uint32_t flags) { + if (!ring || !ring->hdr) return CUFRAMES_ERR_INVALID_ARG; + if (!ring->is_publisher) return CUFRAMES_ERR_INVALID_ARG; + if (size == 0 || !data) return CUFRAMES_ERR_INVALID_ARG; + if (size > ring->hdr->data_size) return CUFRAMES_ERR_PACKET_OVERSIZED; + + cuframes_pkt_header_t *hdr = ring->hdr; + + /* Allocate next seq + cursor offset. Single-publisher — без CAS. */ + uint64_t prev_seq = atomic_load_explicit(&hdr->global_seq, + memory_order_relaxed); + uint64_t new_seq = (prev_seq == UINT64_MAX) ? 0 : prev_seq + 1; + + uint64_t write_off = atomic_load_explicit(&hdr->write_offset, + memory_order_relaxed); + + /* Записать payload в data ring (wraparound aware) */ + wraparound_memcpy(cuframes_pkt_data(hdr), data, size, + hdr->data_size, write_off); + + /* Записать slot metadata. Slot index = seq % ring_slots. */ + uint32_t slot_idx = (uint32_t)(new_seq % hdr->ring_slots); + cuframes_pkt_slot_t *slot = &cuframes_pkt_slots(hdr)[slot_idx]; + + slot->pts_ns = pts_ns; + slot->dts_ns = dts_ns; + slot->data_offset = write_off; + slot->data_size = (uint32_t)size; + slot->flags = flags; + + /* RELEASE order — payload bytes + slot metadata готовы перед publish seq. */ + atomic_store_explicit(&slot->seq, new_seq, memory_order_release); + + /* Update global cursor + global_seq. */ + atomic_store_explicit(&hdr->write_offset, write_off + size, + memory_order_release); + atomic_store_explicit(&hdr->global_seq, new_seq, + memory_order_release); + + /* Keyframe — update last_keyframe_seq для late subscribers. */ + if (flags & CUFRAMES_PKT_FLAG_KEY) { + atomic_store_explicit(&hdr->last_keyframe_seq, new_seq, + memory_order_release); + } + + return CUFRAMES_OK; +} + +/* ─── Subscriber API ──────────────────────────────────────────────────── */ + +int cuframes_internal_pkt_ring_open(const char *shm_name, + cuframes_pkt_ring_t *ring_out) { + if (!shm_name || !ring_out) return CUFRAMES_ERR_INVALID_ARG; + + memset(ring_out, 0, sizeof(*ring_out)); + ring_out->shm_fd = -1; + ring_out->is_publisher = 0; + strncpy(ring_out->shm_name, shm_name, sizeof(ring_out->shm_name) - 1); + + int fd = shm_open(shm_name, O_RDONLY, 0); + if (fd < 0) { + if (errno == ENOENT) return CUFRAMES_ERR_NOT_FOUND; + CUFRAMES_LOG_ERROR("packet shm_open(%s) ro: %s", shm_name, strerror(errno)); + return CUFRAMES_ERR_IO; + } + + /* Прочитать header чтобы узнать total size */ + cuframes_pkt_header_t header_peek; + ssize_t rb = read(fd, &header_peek, sizeof(header_peek)); + if (rb != (ssize_t)sizeof(header_peek)) { + close(fd); + return CUFRAMES_ERR_IO; + } + if (header_peek.magic != CUFRAMES_PKT_MAGIC) { + CUFRAMES_LOG_ERROR("packet shm %s: bad magic 0x%08x", shm_name, header_peek.magic); + close(fd); + return CUFRAMES_ERR_PROTOCOL; + } + if (header_peek.proto_version != CUFRAMES_PROTOCOL_V2) { + CUFRAMES_LOG_ERROR("packet shm %s: proto_version=%u (expected %u)", + shm_name, header_peek.proto_version, CUFRAMES_PROTOCOL_V2); + close(fd); + return CUFRAMES_ERR_PROTOCOL; + } + + size_t total = cuframes_pkt_shm_size(header_peek.ring_slots, + header_peek.data_size); + + /* mmap полностью read-only */ + void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { + CUFRAMES_LOG_ERROR("packet mmap ro: %s", strerror(errno)); + close(fd); + return CUFRAMES_ERR_IO; + } + + ring_out->shm_fd = fd; + ring_out->shm_base = base; + ring_out->shm_size = total; + ring_out->hdr = (cuframes_pkt_header_t *)base; + + CUFRAMES_LOG_INFO("packet ring %s opened: slots=%u data_size=%u", + shm_name, header_peek.ring_slots, header_peek.data_size); + return CUFRAMES_OK; +} + +int cuframes_internal_pkt_ring_read(cuframes_pkt_ring_t *ring, + uint64_t *seq_inout, + void *out_buf, size_t out_buf_max, + size_t *out_size, + int64_t *out_pts, int64_t *out_dts, + uint32_t *out_flags) { + if (!ring || !ring->hdr || !seq_inout || !out_buf || !out_size + || !out_pts || !out_dts || !out_flags) { + return CUFRAMES_ERR_INVALID_ARG; + } + + cuframes_pkt_header_t *hdr = ring->hdr; + + /* Publisher shutdown? */ + if (atomic_load_explicit(&hdr->shutdown_flag, memory_order_acquire) != 0) { + return CUFRAMES_ERR_DISCONNECTED; + } + + /* Текущий published seq */ + uint64_t cur = atomic_load_explicit(&hdr->global_seq, memory_order_acquire); + if (cur == UINT64_MAX) return CUFRAMES_ERR_TIMEOUT; /* нет published */ + if (*seq_inout != UINT64_MAX && cur <= *seq_inout) { + return CUFRAMES_ERR_TIMEOUT; + } + + /* Calculate the next seq we want (handle первый read с UINT64_MAX → start с 0) */ + uint64_t want_seq = (*seq_inout == UINT64_MAX) ? 0 : (*seq_inout + 1); + + /* Если want_seq < cur и slot уже перезаписан — попадаем в OVERRUN */ + if (cur - want_seq >= hdr->ring_slots) { + /* Скорее всего slot уже rewritten. Подсказка caller'у — resync. */ + return CUFRAMES_ERR_PACKET_OVERRUN; + } + + uint32_t slot_idx = (uint32_t)(want_seq % hdr->ring_slots); + cuframes_pkt_slot_t *slot = &cuframes_pkt_slots(hdr)[slot_idx]; + + /* Seqlock-style read: load seq, prove not overwritten после copy. */ + uint64_t s1 = atomic_load_explicit(&slot->seq, memory_order_acquire); + if (s1 != want_seq) { + /* Slot уже занят следующим packet'ом — overrun. */ + return CUFRAMES_ERR_PACKET_OVERRUN; + } + + /* Снять metadata (non-atomic — read OK поскольку post-check защищает) */ + uint64_t data_off = slot->data_offset; + uint32_t data_sz = slot->data_size; + int64_t pts = slot->pts_ns; + int64_t dts = slot->dts_ns; + uint32_t flags = slot->flags; + + if (data_sz > out_buf_max) { + return CUFRAMES_ERR_INVALID_ARG; /* caller's buf too small */ + } + + /* Copy payload */ + wraparound_memcpy_from((uint8_t *)out_buf, + cuframes_pkt_data(hdr), + hdr->data_size, data_off, data_sz); + + /* Post-check: slot->seq не изменился во время copy. */ + uint64_t s2 = atomic_load_explicit(&slot->seq, memory_order_acquire); + if (s2 != want_seq) { + return CUFRAMES_ERR_PACKET_OVERRUN; + } + + *out_size = data_sz; + *out_pts = pts; + *out_dts = dts; + *out_flags = flags; + *seq_inout = want_seq; + return CUFRAMES_OK; +} + +/* ─── Cleanup ─────────────────────────────────────────────────────────── */ + +void cuframes_internal_pkt_ring_destroy(cuframes_pkt_ring_t *ring) { + if (!ring) return; + + if (ring->is_publisher && ring->hdr) { + /* Сигнализируем consumer'ам shutdown */ + atomic_store_explicit(&ring->hdr->shutdown_flag, 1, + memory_order_release); + } + + if (ring->shm_base && ring->shm_size > 0) { + munmap(ring->shm_base, ring->shm_size); + } + if (ring->shm_fd >= 0) { + close(ring->shm_fd); + } + if (ring->is_publisher && ring->shm_name[0] != '\0') { + shm_unlink(ring->shm_name); + } + + memset(ring, 0, sizeof(*ring)); + ring->shm_fd = -1; +} diff --git a/libcuframes/src/producer.c b/libcuframes/src/producer.c index 9011f17..ab9a842 100644 --- a/libcuframes/src/producer.c +++ b/libcuframes/src/producer.c @@ -41,6 +41,11 @@ struct cuframes_publisher { int accept_thread_alive; int stop_flag; pthread_mutex_t state_mu; /* protects subscriber connections */ + + /* v0.2 — encoded packet ring (optional). is_pkt_ring=1 → активирован. */ + int has_pkt_ring; + uint32_t max_packet_size; + cuframes_pkt_ring_t pkt_ring; }; /* Forward decls */ @@ -505,6 +510,12 @@ int cuframes_publisher_destroy(cuframes_publisher_t *pub) { } if (pub->event) cudaEventDestroy(pub->event); + /* Packet ring cleanup (если активирован) */ + if (pub->has_pkt_ring) { + cuframes_internal_pkt_ring_destroy(&pub->pkt_ring); + pub->has_pkt_ring = 0; + } + /* Unlink resources */ if (pub->hdr) { munmap(pub->hdr, sizeof(cuframes_shm_header_t)); @@ -523,6 +534,61 @@ int cuframes_publisher_destroy(cuframes_publisher_t *pub) { return CUFRAMES_OK; } +/* ─────────────────────────────────────────────────────────────────────── */ +/* v0.2 — encoded packet ring API (см. docs/protocol.md §10) */ +/* ─────────────────────────────────────────────────────────────────────── */ + +int cuframes_publisher_enable_packets(cuframes_publisher_t *pub, + const cuframes_packet_ring_options_t *opts) { + if (!pub) return CUFRAMES_ERR_INVALID_ARG; + if (pub->has_pkt_ring) return CUFRAMES_ERR_ALREADY_EXISTS; + + uint32_t slots = opts && opts->ring_slots ? opts->ring_slots + : CUFRAMES_PKT_DEFAULT_SLOTS; + uint32_t data_size = opts && opts->data_size ? opts->data_size + : CUFRAMES_PKT_DEFAULT_DATA_SIZE; + uint32_t max_pkt = opts && opts->max_packet_size ? opts->max_packet_size + : CUFRAMES_PKT_DEFAULT_MAX_SIZE; + uint32_t codec_id = opts ? opts->codec_id : 0; + + if (max_pkt > data_size) { + CUFRAMES_LOG_ERROR("max_packet_size (%u) > data_size (%u)", max_pkt, data_size); + return CUFRAMES_ERR_INVALID_ARG; + } + + int r = cuframes_internal_pkt_ring_create(pub->key, slots, data_size, + codec_id, &pub->pkt_ring); + if (r != CUFRAMES_OK) return r; + + pub->has_pkt_ring = 1; + pub->max_packet_size = max_pkt; + + /* Bump proto_version в frames header чтобы v2-subscribers видели поддержку. */ + if (pub->hdr) { + pub->hdr->proto_version = CUFRAMES_PROTOCOL_V2; + } + return CUFRAMES_OK; +} + +int cuframes_publisher_set_codec_extradata(cuframes_publisher_t *pub, + const void *extradata, size_t size) { + if (!pub) return CUFRAMES_ERR_INVALID_ARG; + if (!pub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING; + return cuframes_internal_pkt_ring_set_extradata(&pub->pkt_ring, + extradata, size); +} + +int cuframes_publisher_publish_packet(cuframes_publisher_t *pub, + const void *data, size_t size, + int64_t pts_ns, int64_t dts_ns, + uint32_t flags) { + if (!pub) return CUFRAMES_ERR_INVALID_ARG; + if (!pub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING; + if (size > pub->max_packet_size) return CUFRAMES_ERR_PACKET_OVERSIZED; + return cuframes_internal_pkt_ring_publish(&pub->pkt_ring, data, size, + pts_ns, dts_ns, flags); +} + /* ─── Accept thread + handshake ──────────────────────────────────────── */ static void *accept_thread_main(void *arg) { diff --git a/libcuframes/src/utils.c b/libcuframes/src/utils.c index aaf576d..3c4b6f1 100644 --- a/libcuframes/src/utils.c +++ b/libcuframes/src/utils.c @@ -32,6 +32,10 @@ const char *cuframes_strerror(int err) { case CUFRAMES_ERR_FORMAT: return "unsupported format or size mismatch"; case CUFRAMES_ERR_WOULD_BLOCK: return "would block"; case CUFRAMES_ERR_TOO_MANY: return "too many subscribers (max 32)"; + case CUFRAMES_ERR_PACKET_OVERSIZED: return "packet exceeds max_packet_size"; + case CUFRAMES_ERR_NO_PACKET_RING: return "publisher has no packet ring"; + case CUFRAMES_ERR_NO_CODEC_PARAMS: return "codec extradata not set by publisher"; + case CUFRAMES_ERR_PACKET_OVERRUN: return "packet ring overrun — resync on keyframe"; case CUFRAMES_ERR_INTERNAL: return "internal error (please report)"; default: return "unknown error"; } @@ -83,6 +87,15 @@ int cuframes_internal_shm_name(const char *key, char *out, size_t out_size) { return CUFRAMES_OK; } +int cuframes_internal_pkt_shm_name(const char *key, char *out, size_t out_size) { + int r = cuframes_internal_validate_key(key); + if (r != CUFRAMES_OK) return r; + int n = snprintf(out, out_size, "%s%s%s", + CUFRAMES_SHM_PREFIX, key, CUFRAMES_PKT_SHM_SUFFIX); + if (n < 0 || (size_t)n >= out_size) return CUFRAMES_ERR_INVALID_ARG; + return CUFRAMES_OK; +} + int cuframes_internal_ensure_runtime_dir(void) { if (mkdir(CUFRAMES_RUNTIME_DIR, 0755) == 0) return CUFRAMES_OK; if (errno == EEXIST) return CUFRAMES_OK; diff --git a/libcuframes/tests/CMakeLists.txt b/libcuframes/tests/CMakeLists.txt index 11ecdc7..620d080 100644 --- a/libcuframes/tests/CMakeLists.txt +++ b/libcuframes/tests/CMakeLists.txt @@ -22,3 +22,11 @@ target_include_directories(test_stress PRIVATE ${CMAKE_SOURCE_DIR}/include) add_test(NAME stress_4consumer COMMAND test_stress) set_tests_properties(stress_4consumer PROPERTIES TIMEOUT 120) + +# v0.2 — packet ring tests (host-only, без CUDA в test-коде) +add_executable(test_packet_ring test_packet_ring.c) +target_link_libraries(test_packet_ring PRIVATE cuframes) +target_include_directories(test_packet_ring PRIVATE + ${CMAKE_SOURCE_DIR}/include) +add_test(NAME packet_ring_basic COMMAND test_packet_ring) +set_tests_properties(packet_ring_basic PROPERTIES TIMEOUT 120) diff --git a/libcuframes/tests/test_packet_ring.c b/libcuframes/tests/test_packet_ring.c new file mode 100644 index 0000000..1b97e11 --- /dev/null +++ b/libcuframes/tests/test_packet_ring.c @@ -0,0 +1,280 @@ +/* Stress test для encoded packet ring (v0.2). + * + * Сценарии: + * 1) Normal flow: 1 publisher × 1 subscriber × 2000 packets, varied sizes, + * каждые 30 packets — KEY flag (имитация GOP). Subscriber проверяет: + * - монотонные seq (без пропусков в этом тесте — fast consumer) + * - data integrity через checksum (XOR fold) + * - PTS/DTS monotonic, KEY flag доходит + * 2) Slow subscriber: publisher шлёт быстрее чем subscriber читает → + * должен случиться OVERRUN, library resync'нет на keyframe. + * 3) Cleanup: после exit нет leaked SHM в /dev/shm. + * + * Без CUDA-зависимостей (packets host-side). + */ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define KEY "test_pkt_ring" +#define TOTAL_PACKETS 2000 +#define GOP_SIZE 30 +#define SMALL_PKT 4096 +#define LARGE_PKT (256 * 1024) + +#define CHECK(call) do { int _r = (call); if (_r != 0) { \ + fprintf(stderr, "FAIL %s:%d (rc=%d): %s\n", __FILE__, __LINE__, _r, \ + cuframes_strerror(_r)); exit(2); } } while (0) + +#define EXPECT_TRUE(cond) do { if (!(cond)) { \ + fprintf(stderr, "EXPECT_TRUE failed at %s:%d: %s\n", \ + __FILE__, __LINE__, #cond); exit(2); } } while (0) + +/* Сгенерировать payload: первые 8 байт = seq (little-endian), остальное pattern. */ +static void gen_payload(uint8_t *buf, size_t size, uint64_t seq) { + memcpy(buf, &seq, sizeof(seq)); + for (size_t i = sizeof(seq); i < size; ++i) { + buf[i] = (uint8_t)((seq + i) & 0xFF); + } +} + +/* Verify payload matches seq. Возвращает 0 если ok. */ +static int verify_payload(const uint8_t *buf, size_t size, uint64_t expected_seq) { + uint64_t seq_in_buf; + if (size < sizeof(seq_in_buf)) return -1; + memcpy(&seq_in_buf, buf, sizeof(seq_in_buf)); + if (seq_in_buf != expected_seq) return -2; + for (size_t i = sizeof(seq_in_buf); i < size; ++i) { + if (buf[i] != (uint8_t)((expected_seq + i) & 0xFF)) return -3; + } + return 0; +} + +static cuframes_publisher_t *make_publisher(void) { + cuframes_publisher_config_t cfg = {0}; + cfg.key = KEY; + cfg.width = 320; + cfg.height = 240; + cfg.format = CUFRAMES_FORMAT_NV12; + cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY; + cfg.ring_size = 2; + cfg.policy = CUFRAMES_POLICY_DROP_OLDEST; + cfg.cuda_device = 0; + cuframes_publisher_t *pub = NULL; + CHECK(cuframes_publisher_create(&cfg, &pub)); + + cuframes_packet_ring_options_t pkt_opts = {0}; + pkt_opts.codec_id = 27; /* AV_CODEC_ID_H264 */ + pkt_opts.ring_slots = 64; + pkt_opts.data_size = 8 * 1024 * 1024; + pkt_opts.max_packet_size = LARGE_PKT * 2; + CHECK(cuframes_publisher_enable_packets(pub, &pkt_opts)); + + /* Fake SPS/PPS — 16 байт */ + uint8_t extradata[16]; + for (int i = 0; i < 16; ++i) extradata[i] = (uint8_t)(0xAA + i); + CHECK(cuframes_publisher_set_codec_extradata(pub, extradata, sizeof(extradata))); + return pub; +} + +/* Subscriber-процесс. read_delay_us позволяет имитировать slow consumer. */ +static int run_subscriber(int read_delay_us, int *out_received, int *out_overruns, + int *out_first_key_seq) { + /* Wait чтобы publisher успел создать SHM */ + usleep(100 * 1000); + + cuframes_subscriber_config_t cfg = {0}; + cfg.key = KEY; + cfg.mode = CUFRAMES_MODE_NEWEST_ONLY; + cfg.cuda_device = 0; + cfg.connect_timeout_ms = 5000; + cuframes_subscriber_t *sub = NULL; + CHECK(cuframes_subscriber_create(&cfg, &sub)); + + CHECK(cuframes_subscriber_enable_packets(sub)); + + /* Verify codec params */ + uint32_t codec_id = 0; + const void *extradata = NULL; + size_t extradata_sz = 0; + int r = cuframes_subscriber_get_codec_params(sub, &codec_id, &extradata, &extradata_sz); + EXPECT_TRUE(r == CUFRAMES_OK); + EXPECT_TRUE(codec_id == 27); + EXPECT_TRUE(extradata_sz == 16); + + int received = 0; + int overruns = 0; + int first_key_seq = -1; + int64_t last_pts = -1; + int data_errors = 0; + + /* Run на ~30s или до того как publisher закончит. */ + time_t start = time(NULL); + while (time(NULL) - start < 30) { + cuframes_packet_t *pkt = NULL; + int rc = cuframes_subscriber_next_packet(sub, &pkt, 500); + if (rc == CUFRAMES_ERR_TIMEOUT || rc == CUFRAMES_ERR_WOULD_BLOCK) { + if (received >= TOTAL_PACKETS / 2) break; /* достаточно для теста */ + continue; + } + if (rc == CUFRAMES_ERR_DISCONNECTED) break; + if (rc == CUFRAMES_ERR_PACKET_OVERRUN) { + overruns++; + continue; /* library resync'нет на next call */ + } + if (rc != CUFRAMES_OK) { + fprintf(stderr, "next_packet rc=%d (%s)\n", rc, cuframes_strerror(rc)); + break; + } + + const uint8_t *data = (const uint8_t *)cuframes_packet_data(pkt); + size_t size = cuframes_packet_size(pkt); + int64_t pts = cuframes_packet_pts(pkt); + uint32_t flags = cuframes_packet_flags(pkt); + uint64_t seq = cuframes_packet_seq(pkt); + + if (verify_payload(data, size, seq) != 0) { + data_errors++; + } + + if ((flags & CUFRAMES_PKT_FLAG_KEY) && first_key_seq < 0) { + first_key_seq = (int)seq; + } + if (pts <= last_pts && last_pts >= 0) { + fprintf(stderr, "PTS не монотонно: %ld <= %ld (seq=%lu)\n", + pts, last_pts, seq); + } + last_pts = pts; + received++; + + cuframes_subscriber_release_packet(sub, pkt); + + if (read_delay_us > 0) usleep(read_delay_us); + } + + EXPECT_TRUE(data_errors == 0); + cuframes_subscriber_destroy(sub); + + *out_received = received; + *out_overruns = overruns; + *out_first_key_seq = first_key_seq; + return 0; +} + +static void publisher_loop(int total_packets, int inter_packet_us) { + cuframes_publisher_t *pub = make_publisher(); + + /* Buffer pre-alloc — max size */ + uint8_t *buf = (uint8_t *)malloc(LARGE_PKT); + EXPECT_TRUE(buf != NULL); + + for (int i = 0; i < total_packets; ++i) { + int is_key = (i % GOP_SIZE == 0); + size_t size = is_key ? LARGE_PKT : SMALL_PKT + (i % 8) * 1024; + gen_payload(buf, size, (uint64_t)i); + + int64_t pts_ns = (int64_t)i * 33333333LL; /* ~30 fps */ + uint32_t flags = is_key ? CUFRAMES_PKT_FLAG_KEY : 0; + int rc = cuframes_publisher_publish_packet(pub, buf, size, + pts_ns, pts_ns, flags); + if (rc != CUFRAMES_OK) { + fprintf(stderr, "publish rc=%d size=%zu\n", rc, size); + } + if (inter_packet_us > 0) usleep(inter_packet_us); + } + free(buf); + cuframes_publisher_destroy(pub); +} + +static int check_no_leaked_shm(void) { + int fail = 0; + char path[256]; + snprintf(path, sizeof(path), "/dev/shm/cuframes-%s", KEY); + if (access(path, F_OK) == 0) { + fprintf(stderr, "LEAKED %s\n", path); + fail = 1; + } + snprintf(path, sizeof(path), "/dev/shm/cuframes-%s-packets", KEY); + if (access(path, F_OK) == 0) { + fprintf(stderr, "LEAKED %s\n", path); + fail = 1; + } + return fail; +} + +static int scenario_normal_flow(void) { + fprintf(stderr, "[scenario 1] normal flow — fast consumer\n"); + + pid_t pid = fork(); + EXPECT_TRUE(pid >= 0); + if (pid == 0) { + /* child = subscriber */ + int received = 0, overruns = 0, first_key = -1; + run_subscriber(0, &received, &overruns, &first_key); + fprintf(stderr, " consumer: received=%d overruns=%d first_key_seq=%d\n", + received, overruns, first_key); + EXPECT_TRUE(received >= TOTAL_PACKETS / 2); + EXPECT_TRUE(overruns == 0); + EXPECT_TRUE(first_key >= 0); + exit(0); + } + + /* parent = publisher (медленнее чем consumer) */ + publisher_loop(TOTAL_PACKETS, 1000); /* 1ms между packets = 1000 fps */ + int status = 0; + waitpid(pid, &status, 0); + EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0); + return 0; +} + +static int scenario_slow_consumer(void) { + fprintf(stderr, "[scenario 2] slow consumer — must hit OVERRUN + resync\n"); + + pid_t pid = fork(); + EXPECT_TRUE(pid >= 0); + if (pid == 0) { + /* child = очень медленный subscriber */ + int received = 0, overruns = 0, first_key = -1; + run_subscriber(10 * 1000, &received, &overruns, &first_key); /* 10ms */ + fprintf(stderr, " consumer: received=%d overruns=%d first_key_seq=%d\n", + received, overruns, first_key); + /* Должны быть overruns поскольку publisher faster */ + EXPECT_TRUE(overruns > 0); + /* И всё-таки что-то получили (resync работает) */ + EXPECT_TRUE(received > 10); + exit(0); + } + + /* publisher fast — 200 fps */ + publisher_loop(TOTAL_PACKETS, 5 * 1000); + int status = 0; + waitpid(pid, &status, 0); + EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0); + return 0; +} + +int main(void) { + signal(SIGPIPE, SIG_IGN); + + scenario_normal_flow(); + /* Ensure clean inter-test state */ + usleep(200 * 1000); + if (check_no_leaked_shm()) exit(2); + + scenario_slow_consumer(); + usleep(200 * 1000); + if (check_no_leaked_shm()) exit(2); + + fprintf(stderr, "OK — all scenarios passed\n"); + return 0; +} diff --git a/tools/cuframes-rtsp-source/main.cpp b/tools/cuframes-rtsp-source/main.cpp index 5eb2ce6..6a67d02 100644 --- a/tools/cuframes-rtsp-source/main.cpp +++ b/tools/cuframes-rtsp-source/main.cpp @@ -60,6 +60,7 @@ struct Args { bool verbose = false; bool realtime = false; // emulate -re у ffmpeg CLI: sleep по pts bool loop = false; // loop input на eof (для file://) + bool enable_packet_ring = false; // v0.2 — публиковать encoded packets }; static void print_usage() { @@ -75,6 +76,8 @@ static void print_usage() { " --ring N cuframes ring size (default 4, range 2..16)\n" " --realtime pace input по PTS (как ffmpeg -re; полезно для файла)\n" " --loop loop input на EOF (только для file://)\n" + " --enable-packet-ring v0.2: дополнительно публиковать encoded packets\n" + " (для consumer'ов с -c:v copy, Frigate record path)\n" " --verbose debug logs\n" " -h, --help this help\n"; } @@ -92,6 +95,7 @@ static int parse_args(int argc, char **argv, Args &a) { else if (s == "--ring") a.ring_size = std::stoi(next()); else if (s == "--realtime") a.realtime = true; else if (s == "--loop") a.loop = true; + else if (s == "--enable-packet-ring") a.enable_packet_ring = true; else if (s == "--verbose") a.verbose = true; else if (s == "-h" || s == "--help") { print_usage(); std::exit(0); } else { std::cerr << "Unknown arg: " << s << "\n"; print_usage(); std::exit(1); } @@ -235,6 +239,27 @@ int main(int argc, char **argv) { << "' ready, ring=" << a.ring_size << " pool_size=" << frame_size << " bytes/frame\n"; + /* v0.2 — encoded packet ring (опционально). */ + if (a.enable_packet_ring) { + cuframes_packet_ring_options_t pkt_opts{}; + pkt_opts.codec_id = (uint32_t)vstream->codecpar->codec_id; + /* остальные поля = 0 → library использует defaults (64 slots, 8MiB, 2MiB max) */ + pub.enable_packets(&pkt_opts); + + if (vstream->codecpar->extradata_size > 0 && vstream->codecpar->extradata) { + pub.set_codec_extradata(vstream->codecpar->extradata, + (size_t)vstream->codecpar->extradata_size); + std::cerr << "[cuframes-src] packet ring active, codec_id=" + << vstream->codecpar->codec_id + << " extradata=" << vstream->codecpar->extradata_size + << " bytes\n"; + } else { + std::cerr << "[cuframes-src] packet ring active, codec_id=" + << vstream->codecpar->codec_id + << " (no extradata in stream — will rely on in-band SPS/PPS)\n"; + } + } + /* Stream для D2D copies */ cudaStream_t stream; cudaStreamCreate(&stream); @@ -279,6 +304,29 @@ int main(int argc, char **argv) { continue; } + /* v0.2 — публикуем encoded packet в packet ring ДО decoder. Это позволяет + * record-consumer'ам брать packet без второго RTSP-подключения к камере. */ + if (a.enable_packet_ring) { + int64_t pkt_pts_ns = (pkt->pts != AV_NOPTS_VALUE) + ? av_rescale_q(pkt->pts, stream_tb, AVRational{1, 1000000000}) + : cuframes::now_ns(); + int64_t pkt_dts_ns = (pkt->dts != AV_NOPTS_VALUE) + ? av_rescale_q(pkt->dts, stream_tb, AVRational{1, 1000000000}) + : pkt_pts_ns; + uint32_t pkt_flags = 0; + if (pkt->flags & AV_PKT_FLAG_KEY) pkt_flags |= CUFRAMES_PKT_FLAG_KEY; + if (pkt->flags & AV_PKT_FLAG_CORRUPT) pkt_flags |= CUFRAMES_PKT_FLAG_CORRUPT; +#ifdef AV_PKT_FLAG_DISCONTINUITY + if (pkt->flags & AV_PKT_FLAG_DISCONTINUITY) pkt_flags |= CUFRAMES_PKT_FLAG_DISCONTINUITY; +#endif + int prr = pub.publish_packet(pkt->data, (size_t)pkt->size, + pkt_pts_ns, pkt_dts_ns, pkt_flags); + if (prr != CUFRAMES_OK && a.verbose) { + std::cerr << "[cuframes-src] publish_packet rc=" << prr + << " size=" << pkt->size << "\n"; + } + } + r = avcodec_send_packet(ctx, pkt); av_packet_unref(pkt); if (r < 0) continue;