Merge pull request 'v0.2: encoded packet ring' (#4) from v0.2-encoded-packets into main
build / cmake build (CUDA 12.4, Ubuntu 22.04) (push) Successful in 10m0s
build / ffmpeg filter patch (out-of-tree) (push) Successful in 8m32s

This commit was merged in pull request #4.
This commit is contained in:
gx
2026-05-19 17:47:10 +01:00
15 changed files with 1773 additions and 7 deletions
+40 -4
View File
@@ -25,12 +25,30 @@ jobs:
# Ставим Node 20 из NodeSource repo.
- name: Bootstrap Node 20 + git (для actions/checkout)
run: |
set -e
export DEBIAN_FRONTEND=noninteractive
apt-get update
apt-get install -y --no-install-recommends curl git ca-certificates gnupg
curl -fsSL https://deb.nodesource.com/setup_20.x | bash -
# NodeSource setup может молча упасть на slow networks (особенно через VPN
# на u4-runner); retry + явная verification что Node >= 18 после install.
for i in 1 2 3; do
if curl -fsSL --retry 3 --retry-delay 5 --connect-timeout 30 \
https://deb.nodesource.com/setup_20.x | bash -; then
break
fi
echo "NodeSource setup attempt $i failed, retrying..."
sleep 10
done
apt-get install -y --no-install-recommends nodejs
node --version
NODE_VER=$(node --version)
echo "node: $NODE_VER"
# actions/checkout@v4 требует Node 20+ (ES2022 static blocks).
# Если NodeSource setup упал и установился Ubuntu's Node 12 — фейлим явно.
NODE_MAJOR=$(echo "$NODE_VER" | sed -E 's/^v([0-9]+).*/\1/')
if [ "$NODE_MAJOR" -lt 18 ]; then
echo "ERROR: Node $NODE_VER too old, NodeSource setup likely failed" >&2
exit 1
fi
- name: Install build deps
run: |
@@ -80,12 +98,30 @@ jobs:
steps:
- name: Bootstrap Node 20 + git (для actions/checkout)
run: |
set -e
export DEBIAN_FRONTEND=noninteractive
apt-get update
apt-get install -y --no-install-recommends curl git ca-certificates gnupg
curl -fsSL https://deb.nodesource.com/setup_20.x | bash -
# NodeSource setup может молча упасть на slow networks (особенно через VPN
# на u4-runner); retry + явная verification что Node >= 18 после install.
for i in 1 2 3; do
if curl -fsSL --retry 3 --retry-delay 5 --connect-timeout 30 \
https://deb.nodesource.com/setup_20.x | bash -; then
break
fi
echo "NodeSource setup attempt $i failed, retrying..."
sleep 10
done
apt-get install -y --no-install-recommends nodejs
node --version
NODE_VER=$(node --version)
echo "node: $NODE_VER"
# actions/checkout@v4 требует Node 20+ (ES2022 static blocks).
# Если NodeSource setup упал и установился Ubuntu's Node 12 — фейлим явно.
NODE_MAJOR=$(echo "$NODE_VER" | sed -E 's/^v([0-9]+).*/\1/')
if [ "$NODE_MAJOR" -lt 18 ]; then
echo "ERROR: Node $NODE_VER too old, NodeSource setup likely failed" >&2
exit 1
fi
- name: Install build deps
run: |
+39
View File
@@ -5,6 +5,45 @@
Формат основан на [Keep a Changelog](https://keepachangelog.com/en/1.1.0/),
проект следует [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## [Unreleased] — v0.2 work in progress
См. PR [#4](https://git.goldix.org/gx/cuframes/pulls/4).
### Added
- **Encoded packet ring** — параллельный ring для H.264/H.265 NAL units
(отдельный SHM `/dev/shm/cuframes-<key>-packets`, variable-length byte
buffer + slot index, seqlock-style read для защиты от overrun).
- **Wire protocol v2** (`proto_version = 2` в SHM header). Backward-compat:
v2 publishers принимают v1 subscribers (frames-only).
- **Public C API** (`include/cuframes/cuframes.h`):
- `cuframes_publisher_enable_packets(opts)` — активирует ring
- `cuframes_publisher_set_codec_extradata(data, size)` — SPS/PPS
- `cuframes_publisher_publish_packet(data, size, pts, dts, flags)`
- `cuframes_subscriber_enable_packets()` + `_next_packet()` + accessors
- `cuframes_subscriber_get_codec_params(codec_id, extradata, size)`
- **`cuframes::Publisher`** (C++ RAII): `enable_packets`, `set_codec_extradata`,
`publish_packet` методы.
- **`cuframes-rtsp-source`**: новый CLI flag `--enable-packet-ring`.
Дублирует `AVPacket` в encoded ring до передачи декодеру.
- **FFmpeg demuxer `cuframes_packets://<key>`** (отдельная ветка
[gx/ffmpeg-patched PR #1](https://git.goldix.org/gx/ffmpeg-patched/pulls/1)).
Companion к `cuframes://`. Use case: Frigate `record` role без
второго RTSP к камере.
- **4 новых error codes**: `PACKET_OVERSIZED`, `NO_PACKET_RING`,
`NO_CODEC_PARAMS`, `PACKET_OVERRUN`.
- **Stress test** `libcuframes/tests/test_packet_ring.c`: 2 scenarios —
normal flow (1 pub × 1 sub × 2000 packets, integrity check) +
slow consumer (must hit OVERRUN + library auto-resync на keyframe).
- **Protocol spec §10** в `docs/protocol.md` (397 строк): byte-exact
layout, seqlock semantics, late-subscriber GOP-aligned start.
### Limitations (документировано)
- Sub-stream selection отложено в v0.3 (`<key>-substream-<N>` naming).
- Audio packets — v0.3 (тот же ring layout, codec_id = audio).
- Codec change mid-stream — требует publisher destroy+recreate.
## [0.1.0] — 2026-05-17
Первый функциональный release с production deployment.
+60 -1
View File
@@ -297,9 +297,68 @@ encoded packet path (v0.2).
Не относится к cuframes — это нормальное поведение Frigate's go2rtc для
TCP transport. TV/VLC обычно использует UDP — оно работает.
## v0.2: dual-input (detect + record через один RTSP)
После cuframes v0.2 publisher активирует **encoded packet ring** параллельно
с decoded frames ring. Это даёт Frigate одновременно:
- `cuframes://<key>` — **decoded NV12** для `detect` role (как в v0.1)
- `cuframes_packets://<key>` — **encoded H.264/H.265** для `record` role
(passthrough, без decode)
**1 RTSP connection** к камере вместо 2-3 (Frigate сейчас открывает
отдельный stream для record).
### Setup
```bash
cuframes-rtsp-source \
--rtsp rtsp://admin:pw@192.168.88.98/cam/realmonitor?channel=1 \
--key cam-parking \
--enable-packet-ring
```
Publisher держит **два** SHM:
- `/dev/shm/cuframes-cam-parking` (decoded NV12, v0.1)
- `/dev/shm/cuframes-cam-parking-packets` (encoded packets, v0.2)
### Frigate config
```yaml
cameras:
cam_parking:
ffmpeg:
inputs:
- path: cuframes://cam-parking
input_args: -f cuframes
roles: [detect]
- path: cuframes_packets://cam-parking
input_args: -f cuframes_packets
roles: [record]
```
### Requirements
- Patched FFmpeg с обоими demuxer'ами:
[gx/ffmpeg-patched PR #1](https://git.goldix.org/gx/ffmpeg-patched/pulls/1).
- Frigate Dockerfile перекомпилирован с этим ffmpeg (см. секцию выше про
`cuframes-frigate:0.17` build).
### Trade-offs
| Метрика | v0.1 (frames only) | v0.2 (frames + packets) |
|---|---|---|
| RTSP к камере | 1 (publisher) | 1 (publisher) |
| Frigate-side RTSP | 1+ (record отдельно) | **0** — всё через cuframes |
| Camera RTSP streams | 2+ | **1** |
| Доп. VRAM | ring (~10 MB) | без изменений |
| Доп. host RAM | минимум | + 8 MB на packet ring |
| Доп. CPU | nominal | nominal (memcpy в shared ring) |
## См. также
- [filter/README.md](../../filter/README.md) — детали FFmpeg demuxer + patch
- [docs/integration.md](../integration.md) — общий integration guide
- [docs/protocol.md §10](../protocol.md#10-v02-extension-encoded-packet-ring-proto_version2) — wire-protocol spec для packet ring
- [BENCHMARKS.md](../../BENCHMARKS.md) — production-measured результаты
- [ROADMAP.md](../../ROADMAP.md) — v0.2 что улучшит для Frigate
- [ROADMAP.md](../../ROADMAP.md) — v0.3+ planned features
+397
View File
@@ -423,3 +423,400 @@ TEST(Handshake, HelloRespMismatchProto) {
`libcuframes/src/protocol.c` (Phase 1, Step 2) — единственная reference.
Любая другая реализация (Python ctypes, Rust bindings, FFmpeg plugin)
должна **conformance-tested** против этого документа.
## 10. v0.2 extension: encoded packet ring (proto_version=2)
**Статус:** реализовано в v0.2 (PR #4; см. также issue #2).
Параллельно с decoded-frames ring (§2) publisher может опционально
поддерживать **encoded packet ring** — публикует raw H.264/H.265 NAL units
**до** decoder, для consumer'ов которые делают `-c:v copy` (recording, mux).
### 10.1 Совместимость с v1
- v2 publisher принимает **v1-subscribers** — они получают только frames
ring (как v0.1), packet ring им не показывается.
- v1 publisher отвергает v2-subscribers с `wants_packets=true`
(HELLO_RESP error PROTOCOL).
- v1 layout (§2) **не меняется** для frames ring — packet ring это отдельный SHM.
Publisher version bumping:
- `proto_version` = 2 в SHM header и в HELLO_RESP когда packet ring active.
- Если publisher v2 не активирует packet ring (`enable_packet_ring=false`) —
`proto_version` остаётся 1 (полная v1 compat).
### 10.2 Дополнительные ресурсы
| Resource | Path | Назначение | Когда |
|---|---|---|---|
| Packet shared memory | `/dev/shm/cuframes-<key>-packets` | Packet ring header + slots + byte buffer | если publisher активировал packet ring |
Cleanup — симметрично §1: `shm_unlink` при destroy(); orphaned автоматически
если nobody mmap'ит.
### 10.3 Packet ring layout
Размер пакетного SHM: `sizeof(packet_ring_header_t) + N×PSE + DATA_SIZE`,
где:
- N = `packet_ring_slots`, default 64 (configurable)
- PSE = `sizeof(packet_slot_entry_t)` = 64 байт (см. §10.5)
- DATA_SIZE = `packet_data_size`, default 8 MB (configurable)
#### Byte layout
```
Offset Size Field Comments
─────────────────────── ────── ────────────────────────── ─────────────────────────────
0x0000 4 magic (LE u32) 0xCC7C1DCD (frames magic + 1)
0x0004 4 proto_version (LE u32) 2
0x0008 4 ring_slots (LE u32) N (1..1024)
0x000C 4 data_size (LE u32) bytes for packet data ring
0x0010 4 codec_id (LE u32) AV_CODEC_ID_* enum (см. §10.4)
0x0014 4 codec_extradata_size (LE u32) ≤ 4096
0x0018 8 producer_pid (LE u64)
0x0020 8 global_seq (LE u64, atomic) монотонная по packets
0x0028 8 last_keyframe_seq (LE u64, atomic) для late subscribers
0x0030 8 write_offset (LE u64, atomic) текущий cursor в data ring
0x0038 8 shutdown_flag (LE u64, atomic)
0x0040 4096 codec_extradata SPS/PPS/VPS bytes (см. §10.4)
0x1040 N×64 slots[N] packet_slot_entry_t (см. §10.5)
0x1040+N×64 DATA_SIZE data[] wraparound byte buffer
```
Все atomic fields — C11 `_Atomic` (release/acquire semantics для seq updates).
### 10.4 Codec extradata
H.264 — SPS + PPS, конкатенированные в **Annex B** формате
(start codes `00 00 00 01`). H.265 — VPS + SPS + PPS.
`codec_id` соответствует FFmpeg `AV_CODEC_ID_H264`, `AV_CODEC_ID_HEVC`,
`AV_CODEC_ID_AV1` (future). Subscriber пишет этот extradata в
`AVCodecContext.extradata` своего decoder'а (если он его создаёт)
или в `AVStream.codecpar->extradata` для muxer'ов.
Extradata устанавливается publisher'ом **один раз** при первом keyframe
(или из RTSP SDP до первого packet). После — fixed на lifetime publisher'а
(codec change mid-stream → publisher destroy+recreate с новым `<key>`).
### 10.5 Packet slot entry (64 байта)
```
Offset Size Field Comments
0x00 8 seq (LE u64, atomic) published seq; UINT64_MAX = invalid
0x08 8 pts_ns (LE i64)
0x10 8 dts_ns (LE i64) для B-frames pipelines
0x18 8 data_offset (LE u64) offset в `data[]` секции SHM
0x20 4 data_size (LE u32) size of payload bytes
0x24 4 flags (LE u32) §10.6
0x28 24 reserved 0
```
`data_offset` может быть **больше** `data_size` секции SHM — semantics
"absolute byte cursor", фактический byte index = `data_offset % data_size`.
Subscriber может detect wrap (если payload crosses end → split read).
### 10.6 Packet flags
```
Bit Name Comments
0 KEY keyframe (IDR for H.264, или CRA/IDR для HEVC).
Critical для late subscribers — must wait IDR.
1 CORRUPT publisher detect'нул что packet damaged
(RTP loss и т.п.) — subscriber может skip
2 DISCONTINUITY был gap перед этим packet
(publisher reconnect к камере)
3 LAST_IN_AU last NAL в access unit (полный frame)
— для muxer'ов которые ждут полный frame
4-31 reserved 0
```
Mapping в `AVPacket.flags`:
- bit 0 (KEY) → `AV_PKT_FLAG_KEY`
- bit 1 (CORRUPT) → `AV_PKT_FLAG_CORRUPT`
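- bit 2 (DISCONTINUITY) и bit 3 (LAST_IN_AU) — прямого аналога в `AVPacket.flags` нет: в FFmpeg значение 0x04 — это `AV_PKT_FLAG_DISCARD`, а 0x08 — `AV_PKT_FLAG_TRUSTED`. Demuxer не должен копировать эти биты в `AVPacket.flags` как есть; discontinuity обрабатывается на стороне demuxer'а.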
### 10.7 Atomic publish (publisher-side)
```c
// Pseudo-C (упрощено, без error handling)
uint64_t seq = atomic_load(&hdr->global_seq, RELAXED) + 1;
uint64_t off = atomic_load(&hdr->write_offset, RELAXED);
// 1. Найти free slot (overwrite oldest)
size_t slot_idx = seq % hdr->ring_slots;
packet_slot_entry_t *slot = &slots[slot_idx];
// 2. Записать payload bytes (wraparound, может потребовать 2 memcpy)
size_t off_in_ring = off % hdr->data_size;
size_t first_chunk = min(size, hdr->data_size - off_in_ring);
memcpy(data + off_in_ring, payload, first_chunk);
if (first_chunk < size)
memcpy(data, payload + first_chunk, size - first_chunk);
// 3. RELEASE: записать metadata в slot
slot->pts_ns = pts;
slot->dts_ns = dts;
slot->data_offset = off;
slot->data_size = size;
slot->flags = flags;
atomic_store(&slot->seq, seq, RELEASE);
// 4. Update global cursor + global_seq
atomic_store(&hdr->write_offset, off + size, RELEASE);
atomic_store(&hdr->global_seq, seq, RELEASE);
// 5. If KEY → update last_keyframe_seq
if (flags & PKT_FLAG_KEY)
atomic_store(&hdr->last_keyframe_seq, seq, RELEASE);
```
### 10.8 Atomic read (subscriber-side)
```c
// Pseudo-C
uint64_t cur = atomic_load(&hdr->global_seq, ACQUIRE);
if (cur <= my_last_seq) return TIMEOUT; // ничего нового
uint64_t want_seq = my_last_seq + 1;
size_t slot_idx = want_seq % hdr->ring_slots;
packet_slot_entry_t *slot = &slots[slot_idx];
uint64_t slot_seq = atomic_load(&slot->seq, ACQUIRE);
if (slot_seq != want_seq) {
// overrun — slow subscriber. Re-anchor:
want_seq = atomic_load(&hdr->last_keyframe_seq, ACQUIRE);
slot_idx = want_seq % hdr->ring_slots;
slot = &slots[slot_idx];
return DROPPED; // signal user через flags = DISCONTINUITY
}
// Copy payload (wraparound aware)
uint64_t off = slot->data_offset % hdr->data_size;
uint32_t size = slot->data_size;
uint32_t first_chunk = min(size, hdr->data_size - off);
memcpy(out_buf, data + off, first_chunk);
if (first_chunk < size)
memcpy(out_buf + first_chunk, data, size - first_chunk);
// Re-check slot->seq не изменился (защита от overrun mid-read)
if (atomic_load(&slot->seq, ACQUIRE) != want_seq) {
return DROPPED; // publisher overwrote во время copy
}
my_last_seq = want_seq;
return OK;
```
Защита от overrun mid-read через **post-check `slot->seq`** — простой
вариант seqlock. Если publisher успел overwrite между metadata-read и
data-copy — subscriber detect и retry.
### 10.9 Socket protocol extensions
#### HELLO_REQ — добавляются flags в reserved field
v1 layout (§3.3):
```
[4 bytes] proto_version
[4 bytes] consumer_name_len
[N bytes] consumer_name
[4 bytes] cuda_device
[4 bytes] mode
[12 bytes] reserved (must be 0) ← v0.2 использует первые 4 байта
```
v0.2 интерпретирует первые 4 байта `reserved` как `subscribe_flags`:
| Bit | Name | Comments |
|---|---|---|
| 0 | `WANTS_FRAMES` | подписаться на decoded frames ring (default ON в v1 — implicit) |
| 1 | `WANTS_PACKETS` | подписаться на encoded packet ring |
| 2-31 | reserved | 0 |
Если v1-subscriber оставляет reserved=0 — publisher v2 интерпретирует это
как `WANTS_FRAMES=true, WANTS_PACKETS=false` (v1 backward-compat).
#### HELLO_RESP — добавляются packet-ring fields
v1 layout (§3.4) расширяется в reserved секции:
```
[4 bytes] result
[4 bytes] proto_version_actual ← теперь может быть 1 или 2
[4 bytes] ring_size ← frames ring
[4 bytes] ownership_mode
[64 bytes] frame_meta
[4 bytes] shm_path_len ← frames SHM
[N bytes] shm_path
[12 bytes] reserved ← v0.2 интерпретирует
```
v0.2 reserved layout (если `proto_version_actual == 2` И publisher
поддерживает packets):
```
[4 bytes] packet_shm_path_len (LE u32) 0 = packets disabled at publisher
[N bytes] packet_shm_path (UTF-8) — относительно /dev/shm/, например "cuframes-camA-packets"
[4 bytes] codec_id (LE u32) AV_CODEC_ID_*
[8 bytes] initial_packet_seq (LE u64) last_keyframe_seq на момент handshake
(subscriber должен start с этого seq)
```
Если subscriber запросил `WANTS_PACKETS=1`, но publisher не имеет packet ring, —
`result = ERR_NOT_AVAILABLE`.
### 10.10 Subscriber state machine extension
Подключение к **обоим** rings (или одному из):
```
┌──────────┐
│ HELLO_OK │ proto_version_actual=2, packet_shm_path_len>0
└────┬─────┘
┌────────────────────────────────┐
│ Open frames SHM (если WANTS_FRAMES) │ → standard v1 flow
└────────────────────────────────┘
┌────────────────────────────────┐
│ Open packet SHM (если WANTS_PACKETS) │
│ - mmap /dev/shm/cuframes-<key>-packets │
│ - check magic, proto_version │
│ - set my_last_packet_seq = initial_packet_seq - 1 │
│ (так что первый next_packet вернёт IDR) │
└────────────────────────────────┘
┌─────────┐
│ READY │ — frames или packets или оба доступны
└─────────┘
```
### 10.11 Threading в subscriber
Frames ring и packet ring имеют **разные** `global_seq` counters.
Subscriber имеет **отдельные** `my_last_seq` для каждого. Может
poll'ить обе независимо (или через два threads).
Producer's `cudaEventRecord` (frames sync) не релевантен для packets —
encoded data на CPU, без CUDA sync.
### 10.12 Конфигурируемость packet ring
Publisher API extension (§10.13) принимает параметры:
```c
typedef struct {
uint32_t packet_ring_slots; // default 64
uint32_t packet_data_size; // default 8 MB (8388608)
uint32_t max_packet_size; // default 2 MB — sanity guard для oversized
// packets (publisher rejects with error)
uint32_t codec_id; // AV_CODEC_ID_H264 / HEVC / ...
} cuframes_packet_ring_options_t;
```
### 10.13 API extension (для cuframes.h)
```c
/* Create publisher с активным packet ring. NULL для opts → packet ring disabled. */
int cuframes_publisher_create_ex(
const cuframes_publisher_options_t *frames_opts,
const cuframes_packet_ring_options_t *packet_opts, /* NULL = no packet ring */
cuframes_publisher_t **pub_out
);
/* Set codec extradata (SPS/PPS) — должен быть called до первого publish_packet. */
int cuframes_publisher_set_codec_extradata(
cuframes_publisher_t *pub,
const void *extradata,
size_t size
);
/* Публикация packet. Slow consumer = overwrite oldest. */
int cuframes_publisher_publish_packet(
cuframes_publisher_t *pub,
const void *data,
size_t size,
int64_t pts_ns,
int64_t dts_ns,
uint32_t flags /* CUFRAMES_PKT_FLAG_KEY | _CORRUPT | _DISCONTINUITY | _LAST_IN_AU */
);
/* Subscriber-side: подписаться с opt-in для packets. */
typedef struct {
/* ... existing v1 fields ... */
uint32_t subscribe_flags; /* WANTS_FRAMES, WANTS_PACKETS bits */
} cuframes_subscriber_options_v2_t;
int cuframes_subscriber_create_v2(
const cuframes_subscriber_options_v2_t *opts,
cuframes_subscriber_t **sub_out
);
/* Чтение packet. Opaque handle — каллер вызывает release_packet после. */
typedef struct cuframes_packet cuframes_packet_t;
int cuframes_subscriber_next_packet(
cuframes_subscriber_t *sub,
cuframes_packet_t **pkt_out,
int32_t timeout_ms
);
const void * cuframes_packet_data(const cuframes_packet_t *p);
size_t cuframes_packet_size(const cuframes_packet_t *p);
int64_t cuframes_packet_pts(const cuframes_packet_t *p);
int64_t cuframes_packet_dts(const cuframes_packet_t *p);
uint32_t cuframes_packet_flags(const cuframes_packet_t *p);
int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub, cuframes_packet_t *p);
/* Codec params для subscriber (extracted из shared header). */
int cuframes_subscriber_get_codec_params(
cuframes_subscriber_t *sub,
uint32_t *codec_id_out,
const void **extradata_out,
size_t *extradata_size_out
);
```
`cuframes_packet_t` opaque — фактически указатель в local-mapped data (на
heap subscriber'а — copy при `next_packet`, освобождение при `release`).
Subscriber **не** держит ссылки на shared ring data между `next_packet` и
`release_packet` — это избавляет от reader-locks.
### 10.14 Late subscriber → keyframe-aligned start
При HELLO_RESP publisher отвечает `initial_packet_seq = last_keyframe_seq`.
Subscriber устанавливает `my_last_seq = initial_packet_seq - 1`, так что
первый `next_packet` вернёт keyframe (decoder может start без glitches).
**Risk:** если в момент handshake **last_keyframe_seq уже выехал из
ring** (slow start subscriber, GOP > ring_slots packets) — subscriber
detect overrun в первом read и переходит на следующий keyframe.
В implementation `publisher_publish_packet` для оптимизации может маркировать
slot перед IDR как **persistent** (флаг в reserved), но **v0.2 keep simple** —
просто требуем что `packet_ring_slots × avg_packet_size > GOP_size_in_bytes`
для нормальной работы. Sizing guide см. в [docs/integration.md](integration.md).
### 10.15 Error codes (новые)
| Code | Name | Когда |
|---|---|---|
| -20 | `CUFRAMES_ERR_PACKET_OVERSIZED` | publish_packet с size > max_packet_size |
| -21 | `CUFRAMES_ERR_NO_PACKET_RING` | subscriber запросил packets, publisher без packet ring |
| -22 | `CUFRAMES_ERR_NO_CODEC_PARAMS` | get_codec_params вызван до set_codec_extradata publisher'ом |
| -23 | `CUFRAMES_ERR_PACKET_OVERRUN` | subscriber slow — packet seq уехал, надо resync на keyframe |
### 10.16 Open для v0.3+
- **Sub-stream selection** — publisher может публиковать несколько
packet rings (для multi-resolution streams). Сейчас один key = один stream.
v0.3 → `<key>-substream-<N>` naming?
- **Codec change mid-stream** — текущий design требует publisher restart.
Future: invalidate codec_extradata + bump generation field.
- **Audio streams** — аналогично в packet ring, но codec_id = audio (AAC,
Opus). v0.3.
+141
View File
@@ -65,6 +65,11 @@ typedef enum cuframes_error {
несовпадение размеров frame'а */
CUFRAMES_ERR_WOULD_BLOCK = -11, /**< non-blocking call — no data yet */
CUFRAMES_ERR_TOO_MANY = -12, /**< превышен MAX_SUBSCRIBERS (32) */
/* v0.2 — packet ring (см. docs/protocol.md §10.15) */
CUFRAMES_ERR_PACKET_OVERSIZED = -20, /**< publish_packet size > max_packet_size */
CUFRAMES_ERR_NO_PACKET_RING = -21, /**< subscriber запросил packets, у publisher'а нет ring'а */
CUFRAMES_ERR_NO_CODEC_PARAMS = -22, /**< extradata ещё не set publisher'ом */
CUFRAMES_ERR_PACKET_OVERRUN = -23, /**< slow subscriber, packet seq уехал — resync на keyframe */
CUFRAMES_ERR_INTERNAL = -100, /**< bug в библиотеке — repro и reportить */
} cuframes_error_t;
@@ -366,6 +371,142 @@ int cuframes_async_subscriber_create(const cuframes_subscriber_config_t *cfg,
*/
int cuframes_async_subscriber_destroy(cuframes_async_subscriber_t *sub);
/* ─────────────────────────────────────────────────────────────────────── */
/* Encoded packet ring API (v0.2 — см. docs/protocol.md §10) */
/* ─────────────────────────────────────────────────────────────────────── */
/**
 * Packet flags carried in each packet ring slot.
 *
 * NOTE(review): only KEY (0x01) and CORRUPT (0x02) share their values with
 * FFmpeg's AV_PKT_FLAG_KEY / AV_PKT_FLAG_CORRUPT. FFmpeg uses 0x04 for
 * AV_PKT_FLAG_DISCARD and 0x08 for AV_PKT_FLAG_TRUSTED, so DISCONTINUITY and
 * LAST_IN_AU must not be copied into AVPacket.flags verbatim — confirm
 * against libavcodec/packet.h before passing flags through.
 */
#define CUFRAMES_PKT_FLAG_KEY 0x01u /**< IDR / keyframe */
#define CUFRAMES_PKT_FLAG_CORRUPT 0x02u /**< RTP loss / damage */
#define CUFRAMES_PKT_FLAG_DISCONTINUITY 0x04u /**< gap before this packet */
#define CUFRAMES_PKT_FLAG_LAST_IN_AU 0x08u /**< last NAL in the access unit */
/** Sizing and codec options passed to cuframes_publisher_enable_packets(). */
typedef struct cuframes_packet_ring_options {
/** Number of slots in the ring index. Default 64 (≈ 2 sec @ 30fps + GOP). */
uint32_t ring_slots;
/** Size of the ring's data section in bytes. Default 8 MiB. */
uint32_t data_size;
/** Sanity guard — the publisher rejects any packet larger than this. Default 2 MiB. */
uint32_t max_packet_size;
/** FFmpeg AV_CODEC_ID_* (H.264 = 27, HEVC = 173). */
uint32_t codec_id;
uint64_t _reserved[4];
} cuframes_packet_ring_options_t;
/**
 * @brief Activate the encoded packet ring on an existing publisher.
 *
 * Creates an additional SHM `/dev/shm/cuframes-<key>-packets`. After this
 * call the publisher sends packets through `cuframes_publisher_publish_packet`.
 *
 * Must be called **before** the first `publish_packet`, and preferably before
 * subscribers start connecting (otherwise they see a publisher without a
 * packet ring and receive no packets).
 *
 * @param pub  publisher handle
 * @param opts NULL = default sizing (64 slots, 8MiB data, 2MiB max). codec_id=0 = unknown.
 * @return CUFRAMES_ERR_ALREADY_EXISTS if the ring is already activated
 */
int cuframes_publisher_enable_packets(cuframes_publisher_t *pub,
const cuframes_packet_ring_options_t *opts);
/**
 * @brief Set the codec extradata (SPS/PPS/VPS) for the packet ring.
 *
 * Subscribers (the FFmpeg demuxer) read the extradata from the shared header
 * and place it into AVCodecContext.extradata. Must be called before
 * subscribers want to decode.
 *
 * @param size ≤ 4096 bytes (CUFRAMES_PKT_EXTRADATA_MAX)
 */
int cuframes_publisher_set_codec_extradata(cuframes_publisher_t *pub,
const void *extradata, size_t size);
/**
 * @brief Publish an encoded packet (H.264/H.265 NAL units, Annex B).
 *
 * Slow consumer = overwrite oldest. A late subscriber resyncs on the last
 * keyframe (see docs/protocol.md §10.14).
 *
 * @param flags CUFRAMES_PKT_FLAG_* (at minimum KEY on an IDR — this is critical!)
 * @return CUFRAMES_ERR_NO_PACKET_RING if enable_packets was not called
 * @return CUFRAMES_ERR_PACKET_OVERSIZED if size > max_packet_size
 */
int cuframes_publisher_publish_packet(cuframes_publisher_t *pub,
const void *data, size_t size,
int64_t pts_ns, int64_t dts_ns,
uint32_t flags);
/* ── Subscriber-side packet API ───────────────────────────────────────── */
/** Opaque packet handle. Released through release_packet. */
typedef struct cuframes_packet cuframes_packet_t;
/** @brief Pointer to the encoded NAL bytes. Valid until release_packet. */
const void *cuframes_packet_data(const cuframes_packet_t *p);
/** @brief Payload size in bytes. */
size_t cuframes_packet_size(const cuframes_packet_t *p);
/** @brief Presentation timestamp (nanoseconds). */
int64_t cuframes_packet_pts(const cuframes_packet_t *p);
/** @brief Decode timestamp (for B-frame pipelines). */
int64_t cuframes_packet_dts(const cuframes_packet_t *p);
/** @brief CUFRAMES_PKT_FLAG_* bits. */
uint32_t cuframes_packet_flags(const cuframes_packet_t *p);
/** @brief Sequence number assigned by the publisher. */
uint64_t cuframes_packet_seq(const cuframes_packet_t *p);
/**
 * @brief Activate packet ring reading on a subscriber.
 *
 * Opens the SHM `/dev/shm/cuframes-<key>-packets` (the same `key` as in the
 * config). After that, packets can be read with
 * `cuframes_subscriber_next_packet`.
 *
 * A subscriber may have the frames ring and the packets ring at the same
 * time (or only one of them).
 *
 * @return CUFRAMES_ERR_NOT_FOUND if the publisher has no packet ring
 */
int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub);
/**
 * @brief Get the next packet.
 *
 * A late subscriber (first call) starts from the publisher's
 * last_keyframe_seq — the decoder receives a valid stream without glitches.
 *
 * The returned packet MUST be released with
 * cuframes_subscriber_release_packet(); calling next_packet again while the
 * previous packet is still held returns CUFRAMES_ERR_INVALID_ARG.
 *
 * @param timeout_ms <0 = block, 0 = non-blocking (WOULD_BLOCK), >0 = wait up to this many milliseconds
 * @return CUFRAMES_ERR_PACKET_OVERRUN — the subscriber fell behind; the library re-anchors on the last keyframe, so the next call continues from there
 * @return CUFRAMES_ERR_DISCONNECTED — publisher shutdown
 * @return CUFRAMES_ERR_TIMEOUT — timeout_ms > 0 elapsed with no new packet
 * @return CUFRAMES_ERR_NO_PACKET_RING — enable_packets was not called on this subscriber
 * @return CUFRAMES_ERR_INVALID_ARG — NULL argument, or previous packet not released
 */
int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub,
cuframes_packet_t **pkt_out,
int32_t timeout_ms);
/** @brief Release a packet handle. NULL-safe. */
int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub,
cuframes_packet_t *pkt);
/**
 * @brief Get the publisher's codec parameters.
 *
 * `*extradata_out` points into a library-owned buffer that stays valid while
 * the subscriber is alive. The caller must copy the data to keep it past the
 * subscriber's lifetime.
 *
 * @return CUFRAMES_ERR_NO_CODEC_PARAMS if the publisher has not yet called
 * set_codec_extradata
 */
int cuframes_subscriber_get_codec_params(cuframes_subscriber_t *sub,
uint32_t *codec_id_out,
const void **extradata_out,
size_t *extradata_size_out);
/* ─────────────────────────────────────────────────────────────────────── */
/* Утилиты */
/* ─────────────────────────────────────────────────────────────────────── */
+17
View File
@@ -148,6 +148,23 @@ public:
"Publisher::publish_external");
}
/* v0.2 — encoded packet ring */

/* Activates the packet ring on the wrapped publisher. The C result code is
 * handed to check() together with the call-site label. */
void enable_packets(const cuframes_packet_ring_options_t *opts = nullptr) {
    const int rc = cuframes_publisher_enable_packets(pub_, opts);
    check(rc, "Publisher::enable_packets");
}
/* Forwards the codec extradata blob to the C API. The C result code is
 * handed to check() together with the call-site label. */
void set_codec_extradata(const void *data, size_t size) {
    const int rc = cuframes_publisher_set_codec_extradata(pub_, data, size);
    check(rc, "Publisher::set_codec_extradata");
}
/* Publishes one encoded packet. Never throws: the raw C status
 * (CUFRAMES_OK or a negative error code) is returned so the caller decides
 * how to react. */
int publish_packet(const void *data, size_t size,
                   int64_t pts_ns, int64_t dts_ns, uint32_t flags) noexcept {
    const int status =
        cuframes_publisher_publish_packet(pub_, data, size, pts_ns, dts_ns, flags);
    return status;
}
cuframes_publisher_t *raw() noexcept { return pub_; }
private:
+1
View File
@@ -10,6 +10,7 @@ set(CUFRAMES_SOURCES
src/producer.c
src/consumer.c
src/consumer_async.c
src/packet_ring.c
)
add_library(cuframes SHARED ${CUFRAMES_SOURCES})
+154 -2
View File
@@ -6,6 +6,7 @@
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <time.h>
#include <unistd.h>
/* Opaque frame — выдаётся subscriber'у на next() */
@@ -23,6 +24,17 @@ struct cuframes_frame {
void *subscriber; /* back-ref для release() */
};
/* Opaque packet handle — single-packet pattern (like frame_obj): the
 * subscriber embeds one instance and hands out its address from next_packet. */
struct cuframes_packet {
uint8_t *data; /* heap buffer, allocated by the subscriber in enable_packets */
size_t capacity; /* size of allocation */
size_t size; /* actual payload size */
int64_t pts_ns;
int64_t dts_ns;
uint32_t flags;
uint64_t seq;
};
struct cuframes_subscriber {
cuframes_subscriber_config_t cfg;
char key[CUFRAMES_MAX_KEY_LEN + 1];
@@ -38,10 +50,16 @@ struct cuframes_subscriber {
uint32_t assigned_bit;
uint64_t last_seen_seq;
/* Frame pool — переиспользуем одну frame_t structure (single-thread API).
* Опционально расширим до lock-free pool в v0.2 если нужен multi-frame. */
/* Frame pool — переиспользуем одну frame_t structure (single-thread API). */
struct cuframes_frame frame_obj;
int frame_busy;
/* v0.2 — packet ring (optional, opened via enable_packets). */
int has_pkt_ring;
cuframes_pkt_ring_t pkt_ring;
uint64_t last_packet_seq; /* UINT64_MAX = no packet read yet */
struct cuframes_packet packet_obj;
int packet_busy;
};
/* ─── Frame accessors ────────────────────────────────────────────────── */
@@ -347,9 +365,143 @@ int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
if (sub->mapped_ptrs[i]) cudaIpcCloseMemHandle(sub->mapped_ptrs[i]);
}
/* Packet ring cleanup */
if (sub->has_pkt_ring) {
cuframes_internal_pkt_ring_destroy(&sub->pkt_ring);
}
if (sub->packet_obj.data) {
free(sub->packet_obj.data);
sub->packet_obj.data = NULL;
}
if (sub->hdr) munmap(sub->hdr, sizeof(cuframes_shm_header_t));
if (sub->shm_fd >= 0) close(sub->shm_fd);
if (sub->sock_fd >= 0) close(sub->sock_fd);
free(sub);
return CUFRAMES_OK;
}
/* ─────────────────────────────────────────────────────────────────────── */
/* v0.2 — encoded packet ring API (см. docs/protocol.md §10) */
/* ─────────────────────────────────────────────────────────────────────── */
/* Packet accessors. Each one tolerates a NULL handle and then yields a
 * NULL / zero default instead of dereferencing it. */
const void *cuframes_packet_data(const cuframes_packet_t *p) {
    if (p == NULL) return NULL;
    return p->data;
}

size_t cuframes_packet_size(const cuframes_packet_t *p) {
    if (p == NULL) return 0;
    return p->size;
}

int64_t cuframes_packet_pts(const cuframes_packet_t *p) {
    if (p == NULL) return 0;
    return p->pts_ns;
}

int64_t cuframes_packet_dts(const cuframes_packet_t *p) {
    if (p == NULL) return 0;
    return p->dts_ns;
}

uint32_t cuframes_packet_flags(const cuframes_packet_t *p) {
    if (p == NULL) return 0;
    return p->flags;
}

uint64_t cuframes_packet_seq(const cuframes_packet_t *p) {
    if (p == NULL) return 0;
    return p->seq;
}
/*
 * Attach this subscriber to the publisher's encoded packet ring.
 *
 * Resolves the packet SHM name from the subscriber key, opens the ring,
 * allocates the single copy buffer used by next_packet, and positions the
 * read cursor just before the last published keyframe.
 *
 * Returns CUFRAMES_OK on success (also when the ring is already attached),
 * CUFRAMES_ERR_INVALID_ARG for a NULL subscriber,
 * CUFRAMES_ERR_OUT_OF_MEMORY if the copy buffer cannot be allocated, or
 * whatever error the name/open helpers report.
 */
int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub) {
    if (sub == NULL) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    if (sub->has_pkt_ring) {
        return CUFRAMES_OK; /* idempotent */
    }

    char shm_name[128];
    int status = cuframes_internal_pkt_shm_name(sub->key, shm_name, sizeof(shm_name));
    if (status != CUFRAMES_OK) {
        return status;
    }

    status = cuframes_internal_pkt_ring_open(shm_name, &sub->pkt_ring);
    if (status != CUFRAMES_OK) {
        return status;
    }

    /* Copy buffer sized to the whole data section: a conservative upper bound
     * on any single packet (the original author notes the publisher keeps
     * data_size >= max_packet_size; not verifiable from this file). */
    const size_t buf_bytes = sub->pkt_ring.hdr->data_size;
    sub->packet_obj.data = (uint8_t *)malloc(buf_bytes);
    if (sub->packet_obj.data == NULL) {
        cuframes_internal_pkt_ring_destroy(&sub->pkt_ring);
        return CUFRAMES_ERR_OUT_OF_MEMORY;
    }
    sub->packet_obj.capacity = buf_bytes;

    /* Start at last_keyframe_seq - 1 so the first read yields the keyframe
     * (docs/protocol.md §10.14). UINT64_MAX is kept as-is. */
    const uint64_t keyframe_seq = atomic_load_explicit(
        &sub->pkt_ring.hdr->last_keyframe_seq, memory_order_acquire);
    if (keyframe_seq == UINT64_MAX) {
        sub->last_packet_seq = UINT64_MAX;
    } else {
        sub->last_packet_seq = keyframe_seq - 1;
    }

    sub->has_pkt_ring = 1;
    return CUFRAMES_OK;
}
/* Fetch the next encoded packet from the packet ring.
 *
 * timeout_ms > 0  — wait up to that many milliseconds;
 * timeout_ms == 0 — non-blocking;
 * timeout_ms < 0  — wait until a packet, an overrun or an error shows up.
 *
 * On CUFRAMES_OK, *pkt_out points at the subscriber-owned packet object; it
 * must be returned with cuframes_subscriber_release_packet() before the next
 * call. Once the precondition checks have passed, *pkt_out is NULL on every
 * non-OK return, so a caller can never pick up a stale pointer.
 *
 * Returns:
 *   CUFRAMES_OK                  packet delivered
 *   CUFRAMES_ERR_INVALID_ARG     NULL argument, or previous packet not released
 *   CUFRAMES_ERR_NO_PACKET_RING  enable_packets() was not called successfully
 *   CUFRAMES_ERR_PACKET_OVERRUN  publisher lapped us; cursor was moved to the
 *                                last keyframe (if any) — call again
 *   CUFRAMES_ERR_WOULD_BLOCK     timeout_ms == 0 and nothing is available
 *   CUFRAMES_ERR_TIMEOUT         deadline expired
 *   other                        passed through from the ring reader
 */
int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub,
                                    cuframes_packet_t **pkt_out,
                                    int32_t timeout_ms) {
    if (!sub || !pkt_out) return CUFRAMES_ERR_INVALID_ARG;
    if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING;
    if (sub->packet_busy) return CUFRAMES_ERR_INVALID_ARG; /* previous packet not released */

    /* From here on every failure path reports "no packet" explicitly. */
    *pkt_out = NULL;

    const int64_t poll_interval_ns = 1 * 1000 * 1000; /* 1 ms */
    int64_t deadline_ns = (timeout_ms > 0) ?
        cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL : 0;

    for (;;) {
        size_t size = 0;
        int64_t pts = 0, dts = 0;
        uint32_t flags = 0;
        uint64_t seq_attempt = sub->last_packet_seq;

        int r = cuframes_internal_pkt_ring_read(&sub->pkt_ring,
                                                &seq_attempt,
                                                sub->packet_obj.data,
                                                sub->packet_obj.capacity,
                                                &size, &pts, &dts, &flags);
        if (r == CUFRAMES_OK) {
            sub->last_packet_seq = seq_attempt;
            sub->packet_obj.size = size;
            sub->packet_obj.pts_ns = pts;
            sub->packet_obj.dts_ns = dts;
            sub->packet_obj.flags = flags;
            sub->packet_obj.seq = seq_attempt;
            sub->packet_busy = 1;
            *pkt_out = &sub->packet_obj;
            return CUFRAMES_OK;
        }

        if (r == CUFRAMES_ERR_PACKET_OVERRUN) {
            /* Resync: move the cursor to just before the last keyframe so the
             * next call restarts decoding from it. */
            uint64_t kf = atomic_load_explicit(
                &sub->pkt_ring.hdr->last_keyframe_seq, memory_order_acquire);
            if (kf != UINT64_MAX) {
                sub->last_packet_seq = kf - 1;
            }
            /* Surface the overrun so the caller knows about the discontinuity. */
            return CUFRAMES_ERR_PACKET_OVERRUN;
        }

        if (r != CUFRAMES_ERR_TIMEOUT) {
            return r; /* DISCONNECTED, INVALID_ARG, etc. */
        }

        /* Nothing new yet — decide whether to give up or poll again. */
        if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK;

        int64_t nap_ns = poll_interval_ns;
        if (timeout_ms > 0) {
            int64_t left_ns = deadline_ns - cuframes_now_ns();
            if (left_ns <= 0) return CUFRAMES_ERR_TIMEOUT;
            /* Never sleep past the deadline. */
            if (left_ns < nap_ns) nap_ns = left_ns;
        }

        struct timespec ts = {0, (long)nap_ns};
        nanosleep(&ts, NULL);
    }
}
/* Hand a packet obtained from cuframes_subscriber_next_packet() back to the
 * subscriber so the next packet can be fetched.
 *
 * A NULL packet is accepted and is a no-op. Any pointer other than the
 * subscriber's own packet object is rejected with CUFRAMES_ERR_INVALID_ARG,
 * as is a NULL subscriber. */
int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub,
                                       cuframes_packet_t *pkt) {
    if (sub == NULL) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    if (pkt == NULL) {
        return CUFRAMES_OK; /* NULL-safe */
    }

    const int is_ours = (pkt == &sub->packet_obj);
    if (!is_ours) {
        return CUFRAMES_ERR_INVALID_ARG;
    }

    sub->packet_busy = 0;
    return CUFRAMES_OK;
}
/* Report the codec id and codec extradata published in the packet ring header.
 *
 * Any of the three output pointers may be NULL. *extradata_out points directly
 * into the mapped ring header (no copy is made).
 *
 * Returns:
 *   CUFRAMES_OK                  extradata is present
 *   CUFRAMES_ERR_NO_CODEC_PARAMS extradata size is still 0 (outputs are filled
 *                                anyway: size 0, pointer to the empty area)
 *   CUFRAMES_ERR_INVALID_ARG     sub is NULL
 *   CUFRAMES_ERR_NO_PACKET_RING  packets were not enabled on this subscriber
 */
int cuframes_subscriber_get_codec_params(cuframes_subscriber_t *sub,
                                         uint32_t *codec_id_out,
                                         const void **extradata_out,
                                         size_t *extradata_size_out) {
    if (!sub) return CUFRAMES_ERR_INVALID_ARG;
    if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING;

    cuframes_pkt_header_t *hdr = sub->pkt_ring.hdr;

    /* Take one snapshot of the size: the publisher may set it concurrently,
     * and the value handed to the caller must agree with the return code. */
    const uint32_t extradata_size = hdr->codec_extradata_size;

    /* The publisher writes the extradata bytes, issues a release fence and
     * only then stores the size. This acquire fence keeps reads of the bytes
     * (by us or by the caller) from being hoisted above the size read. */
    atomic_thread_fence(memory_order_acquire);

    if (codec_id_out) *codec_id_out = hdr->codec_id;
    if (extradata_out) *extradata_out = hdr->codec_extradata;
    if (extradata_size_out) *extradata_size_out = extradata_size;

    if (extradata_size == 0) return CUFRAMES_ERR_NO_CODEC_PARAMS;
    return CUFRAMES_OK;
}
+129
View File
@@ -23,12 +23,28 @@
/* Protocol-wide constants: magic of the frames SHM header and the protocol
 * versions a header may advertise. */
#define CUFRAMES_MAGIC 0xCC7C1DCCu
#define CUFRAMES_PROTOCOL_V1 1u
#define CUFRAMES_PROTOCOL_V2 2u /* v0.2 — packet ring support */
#define CUFRAMES_MAX_SUBSCRIBERS 32
#define CUFRAMES_MAX_RING 16
#define CUFRAMES_MAX_KEY_LEN 63
#define CUFRAMES_MAX_NAME_LEN 31
#define CUFRAMES_RUNTIME_DIR "/run/cuframes"
#define CUFRAMES_SHM_PREFIX "/cuframes-"
#define CUFRAMES_PKT_SHM_SUFFIX "-packets" /* /cuframes-<key>-packets */
/* Packet ring constants (see docs/protocol.md §10) */
#define CUFRAMES_PKT_MAGIC 0xCC7C1DCDu /* frames magic + 1 */
#define CUFRAMES_PKT_EXTRADATA_MAX 4096u
#define CUFRAMES_PKT_DEFAULT_SLOTS 64u
#define CUFRAMES_PKT_DEFAULT_DATA_SIZE (8u * 1024u * 1024u) /* 8 MB */
#define CUFRAMES_PKT_DEFAULT_MAX_SIZE (2u * 1024u * 1024u) /* 2 MB */
#define CUFRAMES_PKT_MAX_SLOTS 1024u
/* Packet flags (see docs/protocol.md §10.6) — bit values, may be OR-ed together */
#define CUFRAMES_PKT_FLAG_KEY 0x01u
#define CUFRAMES_PKT_FLAG_CORRUPT 0x02u
#define CUFRAMES_PKT_FLAG_DISCONTINUITY 0x04u
#define CUFRAMES_PKT_FLAG_LAST_IN_AU 0x08u
/* ─── Shared memory layout (см. docs/protocol.md §2) ──────────────────── */
@@ -103,6 +119,73 @@ _Static_assert(offsetof(cuframes_shm_header_t, ipc_event_handle) == 0x0080, "eve
_Static_assert(offsetof(cuframes_shm_header_t, global_seq) == 0x00C0, "global_seq offset");
_Static_assert(offsetof(cuframes_shm_header_t, slots) == 0x0100, "slots offset");
/* ─── Packet ring shared memory layout (docs/protocol.md §10) ──────────── */
/* Packet slot entry — packed, exactly 64 bytes (enforced by the
 * _Static_assert below). A slot carries the metadata of one packet; the
 * payload bytes themselves live in the data area and are located through
 * data_offset / data_size. */
typedef struct __attribute__((packed)) cuframes_pkt_slot {
_Atomic uint64_t seq; /* sequence number of the packet held by this slot; UINT64_MAX = invalid */
int64_t pts_ns; /* presentation timestamp (nanoseconds, per the _ns suffix) */
int64_t dts_ns; /* decode timestamp (nanoseconds, per the _ns suffix) */
uint64_t data_offset; /* absolute byte cursor; % data_size = ring offset */
uint32_t data_size; /* payload length in bytes */
uint32_t flags; /* CUFRAMES_PKT_FLAG_* bits */
uint8_t reserved[24]; /* pads the slot to 64 bytes */
} cuframes_pkt_slot_t;
_Static_assert(sizeof(cuframes_pkt_slot_t) == 64, "packet slot must be 64 bytes");
/* Packet ring header (fixed 0x1040 = 4160 bytes). Followed by slots[N] + data[].
 * Field offsets are part of the shared-memory protocol and are pinned by the
 * _Static_asserts below; do not reorder or resize fields. */
typedef struct __attribute__((packed)) cuframes_pkt_header {
uint32_t magic; /* CUFRAMES_PKT_MAGIC */
uint32_t proto_version; /* 2 */
uint32_t ring_slots; /* number of slot entries that follow the header */
uint32_t data_size; /* size of the payload data area in bytes */
uint32_t codec_id; /* AV_CODEC_ID_H264 / HEVC / ... */
uint32_t codec_extradata_size; /* ≤ CUFRAMES_PKT_EXTRADATA_MAX */
uint64_t producer_pid; /* pid of the publishing process (used for stale-SHM detection) */
_Atomic uint64_t global_seq; /* seq of the newest published packet; UINT64_MAX = none yet */
_Atomic uint64_t last_keyframe_seq; /* seq of the newest KEY packet; UINT64_MAX = none yet */
_Atomic uint64_t write_offset; /* absolute (never wrapped) byte cursor into the data area */
_Atomic uint64_t shutdown_flag; /* non-zero once the publisher is shutting down */
uint8_t codec_extradata[CUFRAMES_PKT_EXTRADATA_MAX];
/* offset 0x1040 — slots[ring_slots], then data[data_size] */
} cuframes_pkt_header_t;
_Static_assert(offsetof(cuframes_pkt_header_t, magic) == 0x0000, "pkt magic offset");
_Static_assert(offsetof(cuframes_pkt_header_t, proto_version) == 0x0004, "pkt proto offset");
_Static_assert(offsetof(cuframes_pkt_header_t, producer_pid) == 0x0018, "pkt pid offset");
_Static_assert(offsetof(cuframes_pkt_header_t, global_seq) == 0x0020, "pkt global_seq offset");
_Static_assert(offsetof(cuframes_pkt_header_t, write_offset) == 0x0030, "pkt write_offset offset");
_Static_assert(offsetof(cuframes_pkt_header_t, codec_extradata) == 0x0040, "pkt extradata offset");
_Static_assert(sizeof(cuframes_pkt_header_t) == 0x1040, "pkt header must be 0x1040 bytes");
/* Total size in bytes of a packet-ring SHM object:
 *   fixed header + `slots` slot entries + `data_size` payload bytes.
 * Every term is widened to size_t before it is added. */
static inline size_t cuframes_pkt_shm_size(uint32_t slots, uint32_t data_size) {
    size_t total = sizeof(cuframes_pkt_header_t);
    total += sizeof(cuframes_pkt_slot_t) * (size_t)slots;
    total += (size_t)data_size;
    return total;
}
/* Pointers into the mmap'ed packet SHM, computed from the header base. */

/* First slot entry: the slot array starts right after the fixed header. */
static inline cuframes_pkt_slot_t * cuframes_pkt_slots(cuframes_pkt_header_t *hdr) {
    uint8_t *base = (uint8_t *)hdr;
    return (cuframes_pkt_slot_t *)(base + sizeof(cuframes_pkt_header_t));
}
/* Start of the payload data area: it follows the header and the
 * hdr->ring_slots slot entries. */
static inline uint8_t * cuframes_pkt_data(cuframes_pkt_header_t *hdr) {
    const size_t slot_bytes = (size_t)hdr->ring_slots * sizeof(cuframes_pkt_slot_t);
    uint8_t *base = (uint8_t *)hdr;
    return base + sizeof(cuframes_pkt_header_t) + slot_bytes;
}
/* Opaque ring handle — holds the state and the mapping for either a publisher
 * or a subscriber. */
typedef struct cuframes_pkt_ring {
int shm_fd; /* descriptor of the SHM object; -1 when not open */
void *shm_base; /* base address of the mapping */
size_t shm_size; /* length of the mapping in bytes */
cuframes_pkt_header_t *hdr; /* header view of shm_base */
char shm_name[128]; /* /cuframes-<key>-packets */
int is_publisher; /* 1 = created by the publisher (destroy also unlinks), 0 = subscriber */
} cuframes_pkt_ring_t;
/* ─── Socket protocol messages (docs/protocol.md §3) ───────────────────── */
#define CUFRAMES_MSG_HELLO_REQ 0x01
@@ -164,6 +247,8 @@ typedef struct __attribute__((packed)) cuframes_msg_subscribe_resp {
int cuframes_internal_socket_path(const char *key, char *out, size_t out_size);
/* Build /cuframes-<key> (for shm_open) */
int cuframes_internal_shm_name(const char *key, char *out, size_t out_size);
/* Build /cuframes-<key>-packets (for shm_open) */
int cuframes_internal_pkt_shm_name(const char *key, char *out, size_t out_size);
/* Validate key per protocol.md (alphanum/_/-, 1..63 chars) */
int cuframes_internal_validate_key(const char *key);
/* Calculate frame size + pitch для format/W/H */
@@ -181,4 +266,48 @@ int cuframes_internal_recv_msg(int sock_fd, uint32_t *msg_type_out,
void *payload, uint32_t *payload_len_inout,
int32_t timeout_ms);
/* ─── Packet ring helpers (libcuframes/src/packet_ring.c) ─────────────── */
/* Publisher: create SHM + initialize header + slots. Stale-segment recovery
 * works the same way as for the frames SHM. */
int cuframes_internal_pkt_ring_create(const char *key,
uint32_t slots,
uint32_t data_size,
uint32_t codec_id,
cuframes_pkt_ring_t *ring_out);
/* Publisher: set codec extradata (SPS/PPS). Must be called before first publish.
 * If size > CUFRAMES_PKT_EXTRADATA_MAX → ERR_INVALID_ARG. */
int cuframes_internal_pkt_ring_set_extradata(cuframes_pkt_ring_t *ring,
const void *extradata,
size_t size);
/* Publisher: publish single encoded packet. Slow consumer = overwrite oldest.
 * Returns CUFRAMES_ERR_PACKET_OVERSIZED if size > data_size. */
int cuframes_internal_pkt_ring_publish(cuframes_pkt_ring_t *ring,
const void *data, size_t size,
int64_t pts_ns, int64_t dts_ns,
uint32_t flags);
/* Subscriber: open existing SHM by shm name (from HELLO_RESP packet_shm_path). */
int cuframes_internal_pkt_ring_open(const char *shm_name,
cuframes_pkt_ring_t *ring_out);
/* Subscriber: read next packet.
 * *seq_inout — currently held seq (we read seq_inout+1); updated on success.
 * out_buf must have ≥ max_packet_size bytes; out_size receives actual size.
 * Returns:
 * CUFRAMES_OK on success
 * CUFRAMES_ERR_PACKET_OVERRUN if the publisher has moved on — caller resyncs on a keyframe
 * CUFRAMES_ERR_TIMEOUT if there is no new packet
 * CUFRAMES_ERR_DISCONNECTED if the publisher has shut down */
int cuframes_internal_pkt_ring_read(cuframes_pkt_ring_t *ring,
uint64_t *seq_inout,
void *out_buf, size_t out_buf_max,
size_t *out_size,
int64_t *out_pts, int64_t *out_dts,
uint32_t *out_flags);
/* Publisher OR Subscriber: cleanup mmap + close FD. Publisher additionally shm_unlink. */
void cuframes_internal_pkt_ring_destroy(cuframes_pkt_ring_t *ring);
#endif /* CUFRAMES_INTERNAL_H */
+380
View File
@@ -0,0 +1,380 @@
/* libcuframes/src/packet_ring.c
*
* Variable-length encoded packet ring buffer (docs/protocol.md §10).
*
* Использует POSIX shared memory (`/cuframes-<key>-packets`), packed
* structures с _Atomic полями, seqlock-style read для защиты от overrun
* mid-read.
*
* Этот модуль внутренний — exposed API будет в Step 3 (cuframes.h
* extension). Сейчас functions имеют prefix `cuframes_internal_pkt_ring_*`
* и используются из producer.c / consumer.c.
*/
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "internal.h"
/* ─── Internal helpers ────────────────────────────────────────────────── */
/* Copy n bytes from the linear buffer `src` into the circular buffer `dst`
 * of `buf_size` bytes, starting at position (offset % buf_size) and wrapping
 * to the beginning of `dst` when the end is reached.
 * The caller guarantees buf_size != 0 and n <= buf_size. */
static void wraparound_memcpy(uint8_t *dst, const uint8_t *src, size_t n,
                              size_t buf_size, size_t offset) {
    const size_t start = offset % buf_size;
    const size_t room_to_end = buf_size - start;
    const size_t head = (n <= room_to_end) ? n : room_to_end;

    memcpy(dst + start, src, head);
    if (head == n) {
        return; /* fitted without wrapping */
    }
    /* Remainder continues from the start of the ring. */
    memcpy(dst, src + head, n - head);
}
/* Copy n bytes out of the circular buffer `buf` of `buf_size` bytes into the
 * linear buffer `out`, starting at position (offset % buf_size) and wrapping
 * to the beginning of `buf` when the end is reached.
 * The caller guarantees buf_size != 0 and n <= buf_size. */
static void wraparound_memcpy_from(uint8_t *out, const uint8_t *buf,
                                   size_t buf_size, size_t offset, size_t n) {
    const size_t start = offset % buf_size;
    const size_t room_to_end = buf_size - start;
    const size_t head = (n <= room_to_end) ? n : room_to_end;

    memcpy(out, buf + start, head);
    if (head == n) {
        return; /* no wrap needed */
    }
    /* The rest of the payload sits at the start of the ring. */
    memcpy(out + head, buf, n - head);
}
/* ─── Publisher API ───────────────────────────────────────────────────── */

/* Create and initialize the packet-ring SHM object for `key`.
 *
 * If an object with the same name already exists, it is inspected: when its
 * header carries the packet magic and its producer pid is still alive the
 * call fails with CUFRAMES_ERR_ALREADY_EXISTS; otherwise the object is treated
 * as stale, unlinked and re-created.
 *
 * On success *ring_out describes the mapping and is marked as publisher-owned.
 * On ANY failure *ring_out is reset to the empty state (shm_fd = -1, empty
 * name, is_publisher = 0), so that passing it to
 * cuframes_internal_pkt_ring_destroy() afterwards cannot unlink a segment
 * that belongs to another, live publisher.
 *
 * Returns CUFRAMES_OK, CUFRAMES_ERR_INVALID_ARG (bad arguments / key),
 * CUFRAMES_ERR_ALREADY_EXISTS or CUFRAMES_ERR_IO.
 */
int cuframes_internal_pkt_ring_create(const char *key,
                                      uint32_t slots,
                                      uint32_t data_size,
                                      uint32_t codec_id,
                                      cuframes_pkt_ring_t *ring_out) {
    if (!ring_out) return CUFRAMES_ERR_INVALID_ARG;
    if (slots == 0 || slots > CUFRAMES_PKT_MAX_SLOTS) return CUFRAMES_ERR_INVALID_ARG;
    if (data_size == 0) return CUFRAMES_ERR_INVALID_ARG;

    int fd = -1;
    int created = 0;       /* 1 once this call owns the SHM name */
    size_t total_size = 0;
    void *base = NULL;
    int r;

    memset(ring_out, 0, sizeof(*ring_out));
    ring_out->shm_fd = -1;
    ring_out->is_publisher = 1;

    r = cuframes_internal_pkt_shm_name(key, ring_out->shm_name,
                                       sizeof(ring_out->shm_name));
    if (r != CUFRAMES_OK) goto fail;

    /* Stale recovery (same approach as for the frames SHM). */
    fd = shm_open(ring_out->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644);
    if (fd < 0) {
        if (errno != EEXIST) {
            CUFRAMES_LOG_ERROR("packet shm_open: %s", strerror(errno));
            r = CUFRAMES_ERR_IO;
            goto fail;
        }

        int existing = shm_open(ring_out->shm_name, O_RDWR, 0);
        if (existing >= 0) {
            cuframes_pkt_header_t tmp;
            ssize_t rb = read(existing, &tmp, sizeof(tmp));
            close(existing);
            if (rb == (ssize_t)sizeof(tmp) && tmp.magic == CUFRAMES_PKT_MAGIC
                && cuframes_internal_pid_alive((pid_t)tmp.producer_pid)) {
                CUFRAMES_LOG_ERROR("packet ring %s: publisher pid %lu still alive",
                                   ring_out->shm_name,
                                   (unsigned long)tmp.producer_pid);
                r = CUFRAMES_ERR_ALREADY_EXISTS;
                goto fail; /* `created` is 0: the live segment is left alone */
            }
        }

        CUFRAMES_LOG_INFO("stale packet shm %s — unlinking", ring_out->shm_name);
        shm_unlink(ring_out->shm_name);
        fd = shm_open(ring_out->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644);
        if (fd < 0) {
            CUFRAMES_LOG_ERROR("packet shm_open after unlink: %s", strerror(errno));
            r = CUFRAMES_ERR_IO;
            goto fail;
        }
    }
    created = 1;

    total_size = cuframes_pkt_shm_size(slots, data_size);
    if (ftruncate(fd, (off_t)total_size) < 0) {
        CUFRAMES_LOG_ERROR("packet ftruncate(%zu): %s", total_size, strerror(errno));
        r = CUFRAMES_ERR_IO;
        goto fail;
    }

    base = mmap(NULL, total_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) {
        CUFRAMES_LOG_ERROR("packet mmap: %s", strerror(errno));
        r = CUFRAMES_ERR_IO;
        goto fail;
    }

    ring_out->shm_fd = fd;
    ring_out->shm_base = base;
    ring_out->shm_size = total_size;
    ring_out->hdr = (cuframes_pkt_header_t *)base;

    /* Initialize the header: zero everything, then versions/sizes. The magic
     * is deliberately written LAST (see below). */
    memset(ring_out->hdr, 0, sizeof(*ring_out->hdr));
    ring_out->hdr->proto_version = CUFRAMES_PROTOCOL_V2;
    ring_out->hdr->ring_slots = slots;
    ring_out->hdr->data_size = data_size;
    ring_out->hdr->codec_id = codec_id;
    ring_out->hdr->codec_extradata_size = 0;
    ring_out->hdr->producer_pid = (uint64_t)getpid();
    atomic_store_explicit(&ring_out->hdr->global_seq, UINT64_MAX,
                          memory_order_release);
    atomic_store_explicit(&ring_out->hdr->last_keyframe_seq, UINT64_MAX,
                          memory_order_release);
    atomic_store_explicit(&ring_out->hdr->write_offset, 0,
                          memory_order_release);
    atomic_store_explicit(&ring_out->hdr->shutdown_flag, 0,
                          memory_order_release);

    /* Initialize slots with the "invalid" seq marker. */
    cuframes_pkt_slot_t *slots_arr = cuframes_pkt_slots(ring_out->hdr);
    for (uint32_t i = 0; i < slots; ++i) {
        atomic_store_explicit(&slots_arr[i].seq, UINT64_MAX,
                              memory_order_release);
    }
    /* The data section is already zeroed by ftruncate (POSIX guarantees it). */

    /* Publish the magic only after every other field is in place: a
     * subscriber that opens the object concurrently validates the magic
     * first, so it must not see a valid magic next to ring_slots == 0. */
    atomic_thread_fence(memory_order_release);
    ring_out->hdr->magic = CUFRAMES_PKT_MAGIC;

    CUFRAMES_LOG_INFO("packet ring %s: slots=%u data_size=%u codec_id=%u (total=%zu bytes)",
                      ring_out->shm_name, slots, data_size, codec_id, total_size);
    return CUFRAMES_OK;

fail:
    if (fd >= 0) close(fd);
    /* Remove the name only if this call created the object. */
    if (created) shm_unlink(ring_out->shm_name);
    /* Leave the handle empty so a later destroy() is a harmless no-op. */
    memset(ring_out, 0, sizeof(*ring_out));
    ring_out->shm_fd = -1;
    return r;
}
/* Store codec extradata in the ring header (publisher side only).
 *
 * The bytes are written first and the size afterwards, separated by a release
 * fence, so a subscriber that observes size > 0 finds the extradata ready.
 * A size of 0 just resets the stored size.
 *
 * Returns CUFRAMES_OK, or CUFRAMES_ERR_INVALID_ARG when the handle is
 * unusable / not a publisher handle, size exceeds
 * CUFRAMES_PKT_EXTRADATA_MAX, or size > 0 with a NULL extradata pointer. */
int cuframes_internal_pkt_ring_set_extradata(cuframes_pkt_ring_t *ring,
                                             const void *extradata,
                                             size_t size) {
    if (ring == NULL || ring->hdr == NULL || !ring->is_publisher) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    if (size > CUFRAMES_PKT_EXTRADATA_MAX) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    if (size != 0 && extradata == NULL) {
        return CUFRAMES_ERR_INVALID_ARG;
    }

    cuframes_pkt_header_t *header = ring->hdr;
    if (size != 0) {
        memcpy(header->codec_extradata, extradata, size);
        /* Barrier: the extradata stores complete before the size update. */
        __atomic_thread_fence(__ATOMIC_RELEASE);
    }
    header->codec_extradata_size = (uint32_t)size;
    return CUFRAMES_OK;
}
/* Publish one encoded packet into the ring (single publisher, no CAS).
 *
 * The packet gets the next sequence number, its payload is copied into the
 * circular data area at the current write cursor, and the slot
 * (seq % ring_slots) is filled with its metadata. Older packets are
 * overwritten without waiting for consumers.
 *
 * Write protocol (seqlock-style, per slot):
 *   1. mark the slot invalid (seq = UINT64_MAX) and advance write_offset to
 *      reserve the payload bytes;
 *   2. release fence;
 *   3. write payload and slot metadata;
 *   4. store the new seq with release semantics;
 *   5. update global_seq and, for KEY packets, last_keyframe_seq.
 * Step 1 guarantees that a reader which validated the slot's previous seq
 * cannot pass its re-check while the metadata is half rewritten, and lets a
 * reader that compares write_offset against a slot's data_offset notice that
 * payload bytes are being overwritten.
 *
 * Returns CUFRAMES_OK, CUFRAMES_ERR_INVALID_ARG (bad handle, not a publisher,
 * NULL data or size 0) or CUFRAMES_ERR_PACKET_OVERSIZED (size > data_size).
 */
int cuframes_internal_pkt_ring_publish(cuframes_pkt_ring_t *ring,
                                       const void *data, size_t size,
                                       int64_t pts_ns, int64_t dts_ns,
                                       uint32_t flags) {
    if (!ring || !ring->hdr) return CUFRAMES_ERR_INVALID_ARG;
    if (!ring->is_publisher) return CUFRAMES_ERR_INVALID_ARG;
    if (size == 0 || !data) return CUFRAMES_ERR_INVALID_ARG;
    if (size > ring->hdr->data_size) return CUFRAMES_ERR_PACKET_OVERSIZED;

    cuframes_pkt_header_t *hdr = ring->hdr;

    /* Allocate the next seq + cursor offset. Single publisher — no CAS. */
    uint64_t prev_seq = atomic_load_explicit(&hdr->global_seq,
                                             memory_order_relaxed);
    uint64_t new_seq = (prev_seq == UINT64_MAX) ? 0 : prev_seq + 1;
    uint64_t write_off = atomic_load_explicit(&hdr->write_offset,
                                              memory_order_relaxed);

    /* Slot index = seq % ring_slots. */
    uint32_t slot_idx = (uint32_t)(new_seq % hdr->ring_slots);
    cuframes_pkt_slot_t *slot = &cuframes_pkt_slots(hdr)[slot_idx];

    /* Step 1: invalidate the slot and reserve the payload range BEFORE any
     * byte of the old packet is touched. */
    atomic_store_explicit(&slot->seq, UINT64_MAX, memory_order_relaxed);
    atomic_store_explicit(&hdr->write_offset, write_off + size,
                          memory_order_relaxed);
    /* Step 2: keep the two stores above ordered before the plain stores below. */
    atomic_thread_fence(memory_order_release);

    /* Step 3: payload into the data ring (wraparound aware), then metadata. */
    wraparound_memcpy(cuframes_pkt_data(hdr), data, size,
                      hdr->data_size, write_off);
    slot->pts_ns = pts_ns;
    slot->dts_ns = dts_ns;
    slot->data_offset = write_off;
    slot->data_size = (uint32_t)size;
    slot->flags = flags;

    /* Step 4: RELEASE — payload bytes + slot metadata are ready before the
     * seq becomes visible. */
    atomic_store_explicit(&slot->seq, new_seq, memory_order_release);

    /* Step 5: announce the packet. */
    atomic_store_explicit(&hdr->global_seq, new_seq,
                          memory_order_release);

    /* Keyframe — update last_keyframe_seq for late subscribers. */
    if (flags & CUFRAMES_PKT_FLAG_KEY) {
        atomic_store_explicit(&hdr->last_keyframe_seq, new_seq,
                              memory_order_release);
    }
    return CUFRAMES_OK;
}
/* ─── Subscriber API ──────────────────────────────────────────────────── */

/* Open an existing packet-ring SHM object read-only and map it.
 *
 * The header is read through the descriptor first to learn the ring geometry.
 * It is rejected unless magic and protocol version match, ring_slots is in
 * 1..CUFRAMES_PKT_MAX_SLOTS, data_size is non-zero, and the object is at
 * least as large as the layout derived from those fields (mapping beyond the
 * end of the object would fault on first access).
 *
 * Returns CUFRAMES_OK, CUFRAMES_ERR_INVALID_ARG, CUFRAMES_ERR_NOT_FOUND (no
 * such object), CUFRAMES_ERR_PROTOCOL (header rejected) or CUFRAMES_ERR_IO.
 * On failure ring_out holds no descriptor and no mapping.
 */
int cuframes_internal_pkt_ring_open(const char *shm_name,
                                    cuframes_pkt_ring_t *ring_out) {
    if (!shm_name || !ring_out) return CUFRAMES_ERR_INVALID_ARG;

    memset(ring_out, 0, sizeof(*ring_out));
    ring_out->shm_fd = -1;
    ring_out->is_publisher = 0;
    /* The memset above keeps the copy NUL-terminated. */
    strncpy(ring_out->shm_name, shm_name, sizeof(ring_out->shm_name) - 1);

    int fd = shm_open(shm_name, O_RDONLY, 0);
    if (fd < 0) {
        if (errno == ENOENT) return CUFRAMES_ERR_NOT_FOUND;
        CUFRAMES_LOG_ERROR("packet shm_open(%s) ro: %s", shm_name, strerror(errno));
        return CUFRAMES_ERR_IO;
    }

    /* Read the header to learn the total size. */
    cuframes_pkt_header_t header_peek;
    ssize_t rb = read(fd, &header_peek, sizeof(header_peek));
    if (rb != (ssize_t)sizeof(header_peek)) {
        close(fd);
        return CUFRAMES_ERR_IO;
    }
    if (header_peek.magic != CUFRAMES_PKT_MAGIC) {
        CUFRAMES_LOG_ERROR("packet shm %s: bad magic 0x%08x", shm_name, header_peek.magic);
        close(fd);
        return CUFRAMES_ERR_PROTOCOL;
    }
    if (header_peek.proto_version != CUFRAMES_PROTOCOL_V2) {
        CUFRAMES_LOG_ERROR("packet shm %s: proto_version=%u (expected %u)",
                           shm_name, header_peek.proto_version, CUFRAMES_PROTOCOL_V2);
        close(fd);
        return CUFRAMES_ERR_PROTOCOL;
    }
    /* ring_slots is later used as a modulus and data_size as a ring length;
     * zero in either would be fatal for the reader. */
    if (header_peek.ring_slots == 0
        || header_peek.ring_slots > CUFRAMES_PKT_MAX_SLOTS
        || header_peek.data_size == 0) {
        CUFRAMES_LOG_ERROR("packet shm %s: invalid geometry slots=%u data_size=%u",
                           shm_name, header_peek.ring_slots, header_peek.data_size);
        close(fd);
        return CUFRAMES_ERR_PROTOCOL;
    }

    size_t total = cuframes_pkt_shm_size(header_peek.ring_slots,
                                         header_peek.data_size);

    /* The object must really be that large before we map it. */
    struct stat st;
    if (fstat(fd, &st) < 0) {
        CUFRAMES_LOG_ERROR("packet fstat(%s): %s", shm_name, strerror(errno));
        close(fd);
        return CUFRAMES_ERR_IO;
    }
    if (st.st_size < 0 || (uint64_t)st.st_size < (uint64_t)total) {
        CUFRAMES_LOG_ERROR("packet shm %s: object too small (%lld < %zu bytes)",
                           shm_name, (long long)st.st_size, total);
        close(fd);
        return CUFRAMES_ERR_PROTOCOL;
    }

    /* Map everything read-only. */
    void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
    if (base == MAP_FAILED) {
        CUFRAMES_LOG_ERROR("packet mmap ro: %s", strerror(errno));
        close(fd);
        return CUFRAMES_ERR_IO;
    }

    ring_out->shm_fd = fd;
    ring_out->shm_base = base;
    ring_out->shm_size = total;
    ring_out->hdr = (cuframes_pkt_header_t *)base;

    CUFRAMES_LOG_INFO("packet ring %s opened: slots=%u data_size=%u",
                      shm_name, header_peek.ring_slots, header_peek.data_size);
    return CUFRAMES_OK;
}
/* Read the packet that follows *seq_inout (seq 0 when *seq_inout is
 * UINT64_MAX) into out_buf.
 *
 * The read is seqlock-style: the slot's seq is checked before the copy and
 * re-checked after it. In addition, write_offset is compared against the
 * slot's data_offset before and after the copy: the data area is a ring of
 * its own, so a payload can be overwritten by newer packets while its slot is
 * still intact. That comparison covers every overwrite the publisher has
 * already accounted for in write_offset.
 *
 * On success the outputs and *seq_inout are updated. On any failure the
 * outputs and *seq_inout are left untouched (out_buf contents are then
 * unspecified).
 *
 * Returns:
 *   CUFRAMES_OK                  packet copied
 *   CUFRAMES_ERR_TIMEOUT         nothing newer than *seq_inout is published
 *   CUFRAMES_ERR_PACKET_OVERRUN  the wanted packet was (or is being) overwritten
 *   CUFRAMES_ERR_DISCONNECTED    publisher raised shutdown_flag
 *   CUFRAMES_ERR_PROTOCOL        header/slot geometry is nonsensical
 *   CUFRAMES_ERR_INVALID_ARG     NULL argument, or out_buf_max too small
 */
int cuframes_internal_pkt_ring_read(cuframes_pkt_ring_t *ring,
                                    uint64_t *seq_inout,
                                    void *out_buf, size_t out_buf_max,
                                    size_t *out_size,
                                    int64_t *out_pts, int64_t *out_dts,
                                    uint32_t *out_flags) {
    if (!ring || !ring->hdr || !seq_inout || !out_buf || !out_size
        || !out_pts || !out_dts || !out_flags) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    cuframes_pkt_header_t *hdr = ring->hdr;

    /* Publisher shutdown? */
    if (atomic_load_explicit(&hdr->shutdown_flag, memory_order_acquire) != 0) {
        return CUFRAMES_ERR_DISCONNECTED;
    }

    /* Geometry is used as a modulus / ring length below — never divide by 0. */
    const uint32_t ring_slots = hdr->ring_slots;
    const uint32_t ring_bytes = hdr->data_size;
    if (ring_slots == 0 || ring_bytes == 0) {
        return CUFRAMES_ERR_PROTOCOL;
    }

    /* Currently published seq. */
    uint64_t cur = atomic_load_explicit(&hdr->global_seq, memory_order_acquire);
    if (cur == UINT64_MAX) return CUFRAMES_ERR_TIMEOUT; /* nothing published */
    if (*seq_inout != UINT64_MAX && cur <= *seq_inout) {
        return CUFRAMES_ERR_TIMEOUT;
    }

    /* Next seq we want (a first read with UINT64_MAX starts at 0). */
    uint64_t want_seq = (*seq_inout == UINT64_MAX) ? 0 : (*seq_inout + 1);

    /* Publisher is a full ring (or more) ahead — the slot has been reused. */
    if (cur - want_seq >= ring_slots) {
        return CUFRAMES_ERR_PACKET_OVERRUN;
    }

    uint32_t slot_idx = (uint32_t)(want_seq % ring_slots);
    cuframes_pkt_slot_t *slot = &cuframes_pkt_slots(hdr)[slot_idx];

    /* Seqlock pre-check. */
    uint64_t s1 = atomic_load_explicit(&slot->seq, memory_order_acquire);
    if (s1 != want_seq) {
        /* Slot already holds (or is being rewritten for) another packet. */
        return CUFRAMES_ERR_PACKET_OVERRUN;
    }

    /* Snapshot metadata (non-atomic; validated by the post-check). */
    uint64_t data_off = slot->data_offset;
    uint32_t data_sz = slot->data_size;
    int64_t pts = slot->pts_ns;
    int64_t dts = slot->dts_ns;
    uint32_t flags = slot->flags;

    /* A payload can never exceed the data ring; a larger value would make the
     * wraparound copy read out of bounds. */
    if (data_sz > ring_bytes) {
        return CUFRAMES_ERR_PROTOCOL;
    }
    if (data_sz > out_buf_max) {
        return CUFRAMES_ERR_INVALID_ARG; /* caller's buf too small */
    }

    /* Data-ring pre-check: once the write cursor is more than one ring
     * length past the payload start, the payload bytes have been reused.
     * (Unsigned wrap on a torn data_off also lands here.) */
    uint64_t wr = atomic_load_explicit(&hdr->write_offset, memory_order_acquire);
    if (wr - data_off > ring_bytes) {
        return CUFRAMES_ERR_PACKET_OVERRUN;
    }

    /* Copy payload. */
    wraparound_memcpy_from((uint8_t *)out_buf,
                           cuframes_pkt_data(hdr),
                           ring_bytes, data_off, data_sz);

    /* Post-check. The acquire fence keeps the payload/metadata reads above
     * from being reordered after the validation loads below. */
    atomic_thread_fence(memory_order_acquire);
    uint64_t s2 = atomic_load_explicit(&slot->seq, memory_order_acquire);
    uint64_t wr2 = atomic_load_explicit(&hdr->write_offset, memory_order_acquire);
    if (s2 != want_seq || wr2 - data_off > ring_bytes) {
        return CUFRAMES_ERR_PACKET_OVERRUN;
    }

    *out_size = data_sz;
    *out_pts = pts;
    *out_dts = dts;
    *out_flags = flags;
    *seq_inout = want_seq;
    return CUFRAMES_OK;
}
/* ─── Cleanup ─────────────────────────────────────────────────────────── */

/* Release everything a ring handle holds: unmap, close the descriptor and,
 * for a publisher handle, raise shutdown_flag beforehand and unlink the SHM
 * name afterwards. The handle is left zeroed with shm_fd = -1. NULL is a
 * no-op. */
void cuframes_internal_pkt_ring_destroy(cuframes_pkt_ring_t *ring) {
    if (ring == NULL) {
        return;
    }

    const int owner = ring->is_publisher;

    if (owner && ring->hdr != NULL) {
        /* Tell consumers that the publisher is going away. */
        atomic_store_explicit(&ring->hdr->shutdown_flag, 1,
                              memory_order_release);
    }

    if (ring->shm_base != NULL && ring->shm_size > 0) {
        munmap(ring->shm_base, ring->shm_size);
    }

    if (ring->shm_fd >= 0) {
        close(ring->shm_fd);
    }

    if (owner && ring->shm_name[0] != '\0') {
        shm_unlink(ring->shm_name);
    }

    memset(ring, 0, sizeof(*ring));
    ring->shm_fd = -1;
}
+66
View File
@@ -41,6 +41,11 @@ struct cuframes_publisher {
int accept_thread_alive;
int stop_flag;
pthread_mutex_t state_mu; /* protects subscriber connections */
/* v0.2 — encoded packet ring (optional). has_pkt_ring=1 → enabled. */
int has_pkt_ring;
uint32_t max_packet_size;
cuframes_pkt_ring_t pkt_ring;
};
/* Forward decls */
@@ -505,6 +510,12 @@ int cuframes_publisher_destroy(cuframes_publisher_t *pub) {
}
if (pub->event) cudaEventDestroy(pub->event);
/* Packet ring cleanup (если активирован) */
if (pub->has_pkt_ring) {
cuframes_internal_pkt_ring_destroy(&pub->pkt_ring);
pub->has_pkt_ring = 0;
}
/* Unlink resources */
if (pub->hdr) {
munmap(pub->hdr, sizeof(cuframes_shm_header_t));
@@ -523,6 +534,61 @@ int cuframes_publisher_destroy(cuframes_publisher_t *pub) {
return CUFRAMES_OK;
}
/* ─────────────────────────────────────────────────────────────────────── */
/* v0.2 — encoded packet ring API (see docs/protocol.md §10)               */
/* ─────────────────────────────────────────────────────────────────────── */

/* Create the encoded-packet ring for this publisher.
 *
 * `opts` may be NULL; NULL or zero-valued fields fall back to the
 * CUFRAMES_PKT_DEFAULT_* constants (codec_id falls back to 0).
 *
 * Returns CUFRAMES_OK, CUFRAMES_ERR_INVALID_ARG (NULL publisher or
 * max_packet_size > data_size), CUFRAMES_ERR_ALREADY_EXISTS when the ring is
 * already enabled, or the error reported by ring creation. */
int cuframes_publisher_enable_packets(cuframes_publisher_t *pub,
                                      const cuframes_packet_ring_options_t *opts) {
    if (pub == NULL) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    if (pub->has_pkt_ring) {
        return CUFRAMES_ERR_ALREADY_EXISTS;
    }

    /* Start from the defaults and override with every non-zero option. */
    uint32_t slots = CUFRAMES_PKT_DEFAULT_SLOTS;
    uint32_t data_size = CUFRAMES_PKT_DEFAULT_DATA_SIZE;
    uint32_t max_pkt = CUFRAMES_PKT_DEFAULT_MAX_SIZE;
    uint32_t codec_id = 0;
    if (opts != NULL) {
        if (opts->ring_slots) slots = opts->ring_slots;
        if (opts->data_size) data_size = opts->data_size;
        if (opts->max_packet_size) max_pkt = opts->max_packet_size;
        codec_id = opts->codec_id;
    }

    if (max_pkt > data_size) {
        CUFRAMES_LOG_ERROR("max_packet_size (%u) > data_size (%u)", max_pkt, data_size);
        return CUFRAMES_ERR_INVALID_ARG;
    }

    const int rc = cuframes_internal_pkt_ring_create(pub->key, slots, data_size,
                                                     codec_id, &pub->pkt_ring);
    if (rc != CUFRAMES_OK) {
        return rc;
    }

    pub->has_pkt_ring = 1;
    pub->max_packet_size = max_pkt;

    /* Bump proto_version in the frames header so v2 subscribers can see that
     * the packet ring is supported. */
    if (pub->hdr != NULL) {
        pub->hdr->proto_version = CUFRAMES_PROTOCOL_V2;
    }
    return CUFRAMES_OK;
}
/* Publish codec extradata through the packet ring header.
 * Returns CUFRAMES_ERR_INVALID_ARG for a NULL publisher,
 * CUFRAMES_ERR_NO_PACKET_RING when the ring is not enabled, otherwise the
 * result of the ring-level setter. */
int cuframes_publisher_set_codec_extradata(cuframes_publisher_t *pub,
                                           const void *extradata, size_t size) {
    if (pub == NULL) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    if (!pub->has_pkt_ring) {
        return CUFRAMES_ERR_NO_PACKET_RING;
    }

    cuframes_pkt_ring_t *ring = &pub->pkt_ring;
    return cuframes_internal_pkt_ring_set_extradata(ring, extradata, size);
}
/* Publish one encoded packet through the publisher's packet ring.
 * Returns CUFRAMES_ERR_INVALID_ARG for a NULL publisher,
 * CUFRAMES_ERR_NO_PACKET_RING when the ring is not enabled,
 * CUFRAMES_ERR_PACKET_OVERSIZED when size exceeds the configured
 * max_packet_size, otherwise the result of the ring-level publish. */
int cuframes_publisher_publish_packet(cuframes_publisher_t *pub,
                                      const void *data, size_t size,
                                      int64_t pts_ns, int64_t dts_ns,
                                      uint32_t flags) {
    if (pub == NULL) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    if (!pub->has_pkt_ring) {
        return CUFRAMES_ERR_NO_PACKET_RING;
    }

    const size_t limit = pub->max_packet_size;
    if (size > limit) {
        return CUFRAMES_ERR_PACKET_OVERSIZED;
    }

    cuframes_pkt_ring_t *ring = &pub->pkt_ring;
    return cuframes_internal_pkt_ring_publish(ring, data, size,
                                              pts_ns, dts_ns, flags);
}
/* ─── Accept thread + handshake ──────────────────────────────────────── */
static void *accept_thread_main(void *arg) {
+13
View File
@@ -32,6 +32,10 @@ const char *cuframes_strerror(int err) {
case CUFRAMES_ERR_FORMAT: return "unsupported format or size mismatch";
case CUFRAMES_ERR_WOULD_BLOCK: return "would block";
case CUFRAMES_ERR_TOO_MANY: return "too many subscribers (max 32)";
case CUFRAMES_ERR_PACKET_OVERSIZED: return "packet exceeds max_packet_size";
case CUFRAMES_ERR_NO_PACKET_RING: return "publisher has no packet ring";
case CUFRAMES_ERR_NO_CODEC_PARAMS: return "codec extradata not set by publisher";
case CUFRAMES_ERR_PACKET_OVERRUN: return "packet ring overrun — resync on keyframe";
case CUFRAMES_ERR_INTERNAL: return "internal error (please report)";
default: return "unknown error";
}
@@ -83,6 +87,15 @@ int cuframes_internal_shm_name(const char *key, char *out, size_t out_size) {
return CUFRAMES_OK;
}
/* Build the packet-ring SHM name "<prefix><key><suffix>"
 * (i.e. /cuframes-<key>-packets) into `out`.
 * Returns the key-validation error for a bad key, CUFRAMES_ERR_INVALID_ARG
 * when formatting fails or the name does not fit into out_size bytes
 * (terminator included), CUFRAMES_OK otherwise. */
int cuframes_internal_pkt_shm_name(const char *key, char *out, size_t out_size) {
    const int key_rc = cuframes_internal_validate_key(key);
    if (key_rc != CUFRAMES_OK) {
        return key_rc;
    }

    const int written = snprintf(out, out_size, "%s%s%s",
                                 CUFRAMES_SHM_PREFIX, key, CUFRAMES_PKT_SHM_SUFFIX);
    const int fits = (written >= 0) && ((size_t)written < out_size);
    return fits ? CUFRAMES_OK : CUFRAMES_ERR_INVALID_ARG;
}
int cuframes_internal_ensure_runtime_dir(void) {
if (mkdir(CUFRAMES_RUNTIME_DIR, 0755) == 0) return CUFRAMES_OK;
if (errno == EEXIST) return CUFRAMES_OK;
+8
View File
@@ -22,3 +22,11 @@ target_include_directories(test_stress PRIVATE
${CMAKE_SOURCE_DIR}/include)
add_test(NAME stress_4consumer COMMAND test_stress)
set_tests_properties(stress_4consumer PROPERTIES TIMEOUT 120)
# v0.2 — packet ring tests (host-only, no CUDA in the test code itself)
# Registered with CTest as `packet_ring_basic`, with a 120 s timeout.
add_executable(test_packet_ring test_packet_ring.c)
target_link_libraries(test_packet_ring PRIVATE cuframes)
target_include_directories(test_packet_ring PRIVATE
${CMAKE_SOURCE_DIR}/include)
add_test(NAME packet_ring_basic COMMAND test_packet_ring)
set_tests_properties(packet_ring_basic PROPERTIES TIMEOUT 120)
+280
View File
@@ -0,0 +1,280 @@
/* Stress test для encoded packet ring (v0.2).
*
* Сценарии:
* 1) Normal flow: 1 publisher × 1 subscriber × 2000 packets, varied sizes,
* каждые 30 packets — KEY flag (имитация GOP). Subscriber проверяет:
* - монотонные seq (без пропусков в этом тесте — fast consumer)
* - data integrity через checksum (XOR fold)
* - PTS/DTS monotonic, KEY flag доходит
* 2) Slow subscriber: publisher шлёт быстрее чем subscriber читает →
* должен случиться OVERRUN, library resync'нет на keyframe.
* 3) Cleanup: после exit нет leaked SHM в /dev/shm.
*
* Без CUDA-зависимостей (packets host-side).
*/
#include <cuframes/cuframes.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>
#define KEY "test_pkt_ring"
#define TOTAL_PACKETS 2000
#define GOP_SIZE 30
#define SMALL_PKT 4096
#define LARGE_PKT (256 * 1024)
/* CHECK(call): evaluate `call` once; if it returns non-zero, print the
 * location, the code and its cuframes_strerror() text, then exit(2). */
#define CHECK(call) do { int _r = (call); if (_r != 0) { \
fprintf(stderr, "FAIL %s:%d (rc=%d): %s\n", __FILE__, __LINE__, _r, \
cuframes_strerror(_r)); exit(2); } } while (0)
/* EXPECT_TRUE(cond): if `cond` is false, print the location and the
 * stringified condition, then exit(2). */
#define EXPECT_TRUE(cond) do { if (!(cond)) { \
fprintf(stderr, "EXPECT_TRUE failed at %s:%d: %s\n", \
__FILE__, __LINE__, #cond); exit(2); } } while (0)
/* Generate a test payload of `size` bytes for packet `seq`:
 *   - the first 8 bytes hold `seq` in host byte order (copied with memcpy);
 *   - every following byte i holds (seq + i) & 0xFF.
 * When size < 8 only the first `size` bytes of `seq` are written, so the
 * function never writes past buf[size - 1]. */
static void gen_payload(uint8_t *buf, size_t size, uint64_t seq) {
    size_t stamp_len = sizeof(seq);
    if (stamp_len > size) {
        stamp_len = size; /* short buffer: truncate the seq stamp */
    }
    if (stamp_len > 0) {
        memcpy(buf, &seq, stamp_len);
    }
    for (size_t i = sizeof(seq); i < size; ++i) {
        buf[i] = (uint8_t)((seq + i) & 0xFF);
    }
}
/* Check that a payload matches what the generator produces for `expected_seq`.
 * Returns 0 when it matches, -1 when the payload is shorter than the 8-byte
 * seq stamp, -2 when the stamped seq differs, -3 when a pattern byte differs. */
static int verify_payload(const uint8_t *buf, size_t size, uint64_t expected_seq) {
    uint64_t stamped;
    if (size < sizeof(stamped)) {
        return -1;
    }

    memcpy(&stamped, buf, sizeof(stamped));
    if (stamped != expected_seq) {
        return -2;
    }

    size_t pos = sizeof(stamped);
    while (pos < size) {
        const uint8_t want = (uint8_t)((expected_seq + pos) & 0xFF);
        if (buf[pos] != want) {
            return -3;
        }
        ++pos;
    }
    return 0;
}
static cuframes_publisher_t *make_publisher(void) {
cuframes_publisher_config_t cfg = {0};
cfg.key = KEY;
cfg.width = 320;
cfg.height = 240;
cfg.format = CUFRAMES_FORMAT_NV12;
cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
cfg.ring_size = 2;
cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;
cfg.cuda_device = 0;
cuframes_publisher_t *pub = NULL;
CHECK(cuframes_publisher_create(&cfg, &pub));
cuframes_packet_ring_options_t pkt_opts = {0};
pkt_opts.codec_id = 27; /* AV_CODEC_ID_H264 */
pkt_opts.ring_slots = 64;
pkt_opts.data_size = 8 * 1024 * 1024;
pkt_opts.max_packet_size = LARGE_PKT * 2;
CHECK(cuframes_publisher_enable_packets(pub, &pkt_opts));
/* Fake SPS/PPS — 16 байт */
uint8_t extradata[16];
for (int i = 0; i < 16; ++i) extradata[i] = (uint8_t)(0xAA + i);
CHECK(cuframes_publisher_set_codec_extradata(pub, extradata, sizeof(extradata)));
return pub;
}
/* Subscriber side of a scenario.
 *
 * Connects to KEY, enables the packet ring, verifies the codec parameters and
 * then reads packets for up to ~30 s, or until the publisher disconnects, or
 * until a read times out after at least TOTAL_PACKETS / 2 packets arrived.
 * Every packet's payload is verified against its seq; any mismatch fails the
 * test at the end.
 *
 * read_delay_us   > 0 sleeps that long after each packet (simulates a slow
 *                 consumer).
 * out_received    number of packets delivered.
 * out_overruns    number of CUFRAMES_ERR_PACKET_OVERRUN results seen.
 * out_first_key_seq  seq of the first KEY packet seen, or -1.
 * Always returns 0 (failures exit the process). */
static int run_subscriber(int read_delay_us, int *out_received, int *out_overruns,
                          int *out_first_key_seq) {
    /* Give the publisher time to create the SHM. */
    usleep(100 * 1000);

    cuframes_subscriber_config_t cfg = {0};
    cfg.key = KEY;
    cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
    cfg.cuda_device = 0;
    cfg.connect_timeout_ms = 5000;

    cuframes_subscriber_t *sub = NULL;
    CHECK(cuframes_subscriber_create(&cfg, &sub));
    CHECK(cuframes_subscriber_enable_packets(sub));

    /* Verify codec params. */
    uint32_t codec_id = 0;
    const void *extradata = NULL;
    size_t extradata_sz = 0;
    int r = cuframes_subscriber_get_codec_params(sub, &codec_id, &extradata, &extradata_sz);
    EXPECT_TRUE(r == CUFRAMES_OK);
    EXPECT_TRUE(codec_id == 27);
    EXPECT_TRUE(extradata_sz == 16);

    int received = 0;
    int overruns = 0;
    int first_key_seq = -1;
    int64_t last_pts = -1;
    int data_errors = 0;

    /* Run for ~30 s or until the publisher is done. */
    time_t start = time(NULL);
    while (time(NULL) - start < 30) {
        cuframes_packet_t *pkt = NULL;
        int rc = cuframes_subscriber_next_packet(sub, &pkt, 500);
        if (rc == CUFRAMES_ERR_TIMEOUT || rc == CUFRAMES_ERR_WOULD_BLOCK) {
            if (received >= TOTAL_PACKETS / 2) break; /* enough for the test */
            continue;
        }
        if (rc == CUFRAMES_ERR_DISCONNECTED) break;
        if (rc == CUFRAMES_ERR_PACKET_OVERRUN) {
            overruns++;
            continue; /* the library resyncs on the next call */
        }
        if (rc != CUFRAMES_OK) {
            fprintf(stderr, "next_packet rc=%d (%s)\n", rc, cuframes_strerror(rc));
            break;
        }

        const uint8_t *data = (const uint8_t *)cuframes_packet_data(pkt);
        size_t size = cuframes_packet_size(pkt);
        int64_t pts = cuframes_packet_pts(pkt);
        uint32_t flags = cuframes_packet_flags(pkt);
        uint64_t seq = cuframes_packet_seq(pkt);

        if (verify_payload(data, size, seq) != 0) {
            data_errors++;
        }
        if ((flags & CUFRAMES_PKT_FLAG_KEY) && first_key_seq < 0) {
            first_key_seq = (int)seq;
        }
        if (pts <= last_pts && last_pts >= 0) {
            /* Explicit casts: int64_t/uint64_t are not `long` on every
             * platform, so %ld/%lu would be a format mismatch there. */
            fprintf(stderr, "PTS не монотонно: %lld <= %lld (seq=%llu)\n",
                    (long long)pts, (long long)last_pts,
                    (unsigned long long)seq);
        }
        last_pts = pts;
        received++;

        cuframes_subscriber_release_packet(sub, pkt);
        if (read_delay_us > 0) usleep(read_delay_us);
    }

    EXPECT_TRUE(data_errors == 0);
    cuframes_subscriber_destroy(sub);

    *out_received = received;
    *out_overruns = overruns;
    *out_first_key_seq = first_key_seq;
    return 0;
}
/* Publisher side of a scenario: create the publisher, publish
 * `total_packets` packets and tear it down.
 * Every GOP_SIZE-th packet is a LARGE_PKT-sized KEY packet; the others are
 * SMALL_PKT plus 0..7 KiB. PTS and DTS are both i * 33333333 ns. Publish
 * failures are only logged. inter_packet_us > 0 sleeps between packets. */
static void publisher_loop(int total_packets, int inter_packet_us) {
    cuframes_publisher_t *pub = make_publisher();

    /* One scratch buffer, large enough for the biggest packet. */
    uint8_t *buf = (uint8_t *)malloc(LARGE_PKT);
    EXPECT_TRUE(buf != NULL);

    int i = 0;
    while (i < total_packets) {
        const int is_key = (i % GOP_SIZE == 0);

        size_t size;
        if (is_key) {
            size = LARGE_PKT;
        } else {
            size = SMALL_PKT + (i % 8) * 1024;
        }
        gen_payload(buf, size, (uint64_t)i);

        const int64_t pts_ns = (int64_t)i * 33333333LL; /* ~30 fps */
        uint32_t flags = 0;
        if (is_key) {
            flags = CUFRAMES_PKT_FLAG_KEY;
        }

        const int rc = cuframes_publisher_publish_packet(pub, buf, size,
                                                         pts_ns, pts_ns, flags);
        if (rc != CUFRAMES_OK) {
            fprintf(stderr, "publish rc=%d size=%zu\n", rc, size);
        }

        if (inter_packet_us > 0) {
            usleep(inter_packet_us);
        }
        ++i;
    }

    free(buf);
    cuframes_publisher_destroy(pub);
}
/* Returns 1 and reports the path on stderr if `shm_path` exists, else 0. */
static int shm_entry_leaked(const char *shm_path) {
    if (access(shm_path, F_OK) != 0) {
        return 0;
    }
    fprintf(stderr, "LEAKED %s\n", shm_path);
    return 1;
}

/*
 * Checks that neither of the two /dev/shm entries derived from KEY
 * ("cuframes-<KEY>" and "cuframes-<KEY>-packets") exists.
 *
 * Returns 0 when both are absent, 1 when at least one is still present.
 * Each entry that is found is reported on stderr.
 */
static int check_no_leaked_shm(void) {
    char frames_path[256];
    char packets_path[256];

    snprintf(frames_path, sizeof(frames_path), "/dev/shm/cuframes-%s", KEY);
    snprintf(packets_path, sizeof(packets_path), "/dev/shm/cuframes-%s-packets", KEY);

    /* Both entries are always probed so every leftover gets reported. */
    int frames_leaked = shm_entry_leaked(frames_path);
    int packets_leaked = shm_entry_leaked(packets_path);

    return (frames_leaked || packets_leaked) ? 1 : 0;
}
/*
 * Scenario 1: normal flow with a fast consumer.
 *
 * Forks a child that runs the subscriber with no read delay while the parent
 * publishes TOTAL_PACKETS packets with a 1 ms gap. The child asserts that it
 * received at least half of the packets, saw no overruns and observed a key
 * packet, then exits with status 0. The parent asserts that the child was
 * reaped and exited cleanly.
 *
 * Returns 0 when the end of the scenario is reached.
 */
static int scenario_normal_flow(void) {
    fprintf(stderr, "[scenario 1] normal flow — fast consumer\n");
    pid_t pid = fork();
    EXPECT_TRUE(pid >= 0);
    if (pid == 0) {
        /* child = subscriber */
        int received = 0, overruns = 0, first_key = -1;
        run_subscriber(0, &received, &overruns, &first_key);
        fprintf(stderr, "  consumer: received=%d overruns=%d first_key_seq=%d\n",
                received, overruns, first_key);
        EXPECT_TRUE(received >= TOTAL_PACKETS / 2);
        EXPECT_TRUE(overruns == 0);
        EXPECT_TRUE(first_key >= 0);
        exit(0);
    }
    /* parent = publisher (slower than the consumer) */
    publisher_loop(TOTAL_PACKETS, 1000);  /* 1 ms between packets = 1000 fps */

    /* A failed waitpid() leaves status at 0, which would read as a clean
     * exit below; make sure the child was actually reaped first. */
    int status = 0;
    pid_t reaped = waitpid(pid, &status, 0);
    EXPECT_TRUE(reaped == pid);
    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
    return 0;
}
/*
 * Scenario 2: slow consumer that must hit an overrun and resynchronise.
 *
 * Forks a child that runs the subscriber with a 10 ms delay after every
 * packet while the parent publishes TOTAL_PACKETS packets with a 5 ms gap.
 * The child asserts that at least one overrun was reported and that more
 * than 10 packets were still received, then exits with status 0. The parent
 * asserts that the child was reaped and exited cleanly.
 *
 * Returns 0 when the end of the scenario is reached.
 */
static int scenario_slow_consumer(void) {
    fprintf(stderr, "[scenario 2] slow consumer — must hit OVERRUN + resync\n");
    pid_t pid = fork();
    EXPECT_TRUE(pid >= 0);
    if (pid == 0) {
        /* child = very slow subscriber */
        int received = 0, overruns = 0, first_key = -1;
        run_subscriber(10 * 1000, &received, &overruns, &first_key);  /* 10 ms */
        fprintf(stderr, "  consumer: received=%d overruns=%d first_key_seq=%d\n",
                received, overruns, first_key);
        /* Overruns are expected because the publisher is faster. */
        EXPECT_TRUE(overruns > 0);
        /* Something must still have been received (resync works). */
        EXPECT_TRUE(received > 10);
        exit(0);
    }
    /* fast publisher — 200 fps */
    publisher_loop(TOTAL_PACKETS, 5 * 1000);

    /* A failed waitpid() leaves status at 0, which would read as a clean
     * exit below; make sure the child was actually reaped first. */
    int status = 0;
    pid_t reaped = waitpid(pid, &status, 0);
    EXPECT_TRUE(reaped == pid);
    EXPECT_TRUE(WIFEXITED(status) && WEXITSTATUS(status) == 0);
    return 0;
}
/*
 * Test entry point: runs both scenarios in order and, after each one, checks
 * that no cuframes entries were left behind in /dev/shm.
 *
 * Exit status: 0 when everything passed, 1 when a scenario returned non-zero,
 * 2 when a leftover shared-memory entry was detected.
 */
int main(void) {
    signal(SIGPIPE, SIG_IGN);

    /* Propagate the scenario result instead of discarding it, so that the
     * final "OK" line is only printed when both scenarios reported success. */
    if (scenario_normal_flow() != 0) {
        return 1;
    }
    /* Ensure clean inter-test state */
    usleep(200 * 1000);
    if (check_no_leaked_shm()) exit(2);

    if (scenario_slow_consumer() != 0) {
        return 1;
    }
    usleep(200 * 1000);
    if (check_no_leaked_shm()) exit(2);

    fprintf(stderr, "OK — all scenarios passed\n");
    return 0;
}
+48
View File
@@ -60,6 +60,7 @@ struct Args {
bool verbose = false;
bool realtime = false; // emulate -re у ffmpeg CLI: sleep по pts
bool loop = false; // loop input на eof (для file://)
bool enable_packet_ring = false; // v0.2 — публиковать encoded packets
};
static void print_usage() {
@@ -75,6 +76,8 @@ static void print_usage() {
" --ring N cuframes ring size (default 4, range 2..16)\n"
" --realtime pace input по PTS (как ffmpeg -re; полезно для файла)\n"
" --loop loop input на EOF (только для file://)\n"
" --enable-packet-ring v0.2: дополнительно публиковать encoded packets\n"
" (для consumer'ов с -c:v copy, Frigate record path)\n"
" --verbose debug logs\n"
" -h, --help this help\n";
}
@@ -92,6 +95,7 @@ static int parse_args(int argc, char **argv, Args &a) {
else if (s == "--ring") a.ring_size = std::stoi(next());
else if (s == "--realtime") a.realtime = true;
else if (s == "--loop") a.loop = true;
else if (s == "--enable-packet-ring") a.enable_packet_ring = true;
else if (s == "--verbose") a.verbose = true;
else if (s == "-h" || s == "--help") { print_usage(); std::exit(0); }
else { std::cerr << "Unknown arg: " << s << "\n"; print_usage(); std::exit(1); }
@@ -235,6 +239,27 @@ int main(int argc, char **argv) {
<< "' ready, ring=" << a.ring_size
<< " pool_size=" << frame_size << " bytes/frame\n";
/* v0.2 — encoded packet ring (опционально). */
if (a.enable_packet_ring) {
cuframes_packet_ring_options_t pkt_opts{};
pkt_opts.codec_id = (uint32_t)vstream->codecpar->codec_id;
/* остальные поля = 0 → library использует defaults (64 slots, 8MiB, 2MiB max) */
pub.enable_packets(&pkt_opts);
if (vstream->codecpar->extradata_size > 0 && vstream->codecpar->extradata) {
pub.set_codec_extradata(vstream->codecpar->extradata,
(size_t)vstream->codecpar->extradata_size);
std::cerr << "[cuframes-src] packet ring active, codec_id="
<< vstream->codecpar->codec_id
<< " extradata=" << vstream->codecpar->extradata_size
<< " bytes\n";
} else {
std::cerr << "[cuframes-src] packet ring active, codec_id="
<< vstream->codecpar->codec_id
<< " (no extradata in stream — will rely on in-band SPS/PPS)\n";
}
}
/* Stream для D2D copies */
cudaStream_t stream;
cudaStreamCreate(&stream);
@@ -279,6 +304,29 @@ int main(int argc, char **argv) {
continue;
}
/* v0.2 — публикуем encoded packet в packet ring ДО decoder. Это позволяет
* record-consumer'ам брать packet без второго RTSP-подключения к камере. */
if (a.enable_packet_ring) {
int64_t pkt_pts_ns = (pkt->pts != AV_NOPTS_VALUE)
? av_rescale_q(pkt->pts, stream_tb, AVRational{1, 1000000000})
: cuframes::now_ns();
int64_t pkt_dts_ns = (pkt->dts != AV_NOPTS_VALUE)
? av_rescale_q(pkt->dts, stream_tb, AVRational{1, 1000000000})
: pkt_pts_ns;
uint32_t pkt_flags = 0;
if (pkt->flags & AV_PKT_FLAG_KEY) pkt_flags |= CUFRAMES_PKT_FLAG_KEY;
if (pkt->flags & AV_PKT_FLAG_CORRUPT) pkt_flags |= CUFRAMES_PKT_FLAG_CORRUPT;
#ifdef AV_PKT_FLAG_DISCONTINUITY
if (pkt->flags & AV_PKT_FLAG_DISCONTINUITY) pkt_flags |= CUFRAMES_PKT_FLAG_DISCONTINUITY;
#endif
int prr = pub.publish_packet(pkt->data, (size_t)pkt->size,
pkt_pts_ns, pkt_dts_ns, pkt_flags);
if (prr != CUFRAMES_OK && a.verbose) {
std::cerr << "[cuframes-src] publish_packet rc=" << prr
<< " size=" << pkt->size << "\n";
}
}
r = avcodec_send_packet(ctx, pkt);
av_packet_unref(pkt);
if (r < 0) continue;