feat(libcuframes): packet ring buffer implementation (v0.2 Step 2)
build / cmake build (CUDA 12.4, Ubuntu 22.04) (pull_request) Successful in 1m37s
build / ffmpeg filter patch (out-of-tree) (pull_request) Successful in 1m21s

Реализация encoded packet ring per docs/protocol.md §10.

Files:
- internal.h: cuframes_pkt_slot_t (64b packed), cuframes_pkt_header_t
  (0x1040 fixed header), cuframes_pkt_ring_t handle, constants for
  default sizing, packet flags, helper inline functions for slot/data
  pointer arithmetic.
- packet_ring.c (new, ~290 LOC): create/open/publish/read/destroy.
  Stale recovery симметрично frames SHM (pid liveness check). Seqlock
  pattern для subscriber защиты от overrun mid-read (post-check seq
  после copy). Wraparound memcpy helpers для variable-length data ring.
- utils.c: cuframes_internal_pkt_shm_name helper + strerror entries.
- cuframes.h: 4 новых error codes (PACKET_OVERSIZED, NO_PACKET_RING,
  NO_CODEC_PARAMS, PACKET_OVERRUN).
- CMakeLists.txt: src/packet_ring.c в sources.

API внутренний (cuframes_internal_pkt_ring_*) — publicly exposed
функции будут в Step 3 (cuframes.h API extension).

Связано: #2 (v0.2), PR #4 (draft).
This commit is contained in:
2026-05-19 16:11:42 +01:00
parent ad75aa9624
commit bd7fd95fef
5 changed files with 528 additions and 0 deletions
+1
View File
@@ -10,6 +10,7 @@ set(CUFRAMES_SOURCES
src/producer.c
src/consumer.c
src/consumer_async.c
src/packet_ring.c
)
add_library(cuframes SHARED ${CUFRAMES_SOURCES})
+129
View File
@@ -23,12 +23,28 @@
#define CUFRAMES_MAGIC 0xCC7C1DCCu
#define CUFRAMES_PROTOCOL_V1 1u
#define CUFRAMES_PROTOCOL_V2 2u /* v0.2 — packet ring support */
#define CUFRAMES_MAX_SUBSCRIBERS 32
#define CUFRAMES_MAX_RING 16
#define CUFRAMES_MAX_KEY_LEN 63
#define CUFRAMES_MAX_NAME_LEN 31
#define CUFRAMES_RUNTIME_DIR "/run/cuframes"
#define CUFRAMES_SHM_PREFIX "/cuframes-"
#define CUFRAMES_PKT_SHM_SUFFIX "-packets" /* /cuframes-<key>-packets */
/* Packet ring constants (см. docs/protocol.md §10) */
#define CUFRAMES_PKT_MAGIC 0xCC7C1DCDu /* frames magic + 1 */
#define CUFRAMES_PKT_EXTRADATA_MAX 4096u
#define CUFRAMES_PKT_DEFAULT_SLOTS 64u
#define CUFRAMES_PKT_DEFAULT_DATA_SIZE (8u * 1024u * 1024u) /* 8 MB */
#define CUFRAMES_PKT_DEFAULT_MAX_SIZE (2u * 1024u * 1024u) /* 2 MB */
#define CUFRAMES_PKT_MAX_SLOTS 1024u
/* Packet flags (см. docs/protocol.md §10.6) */
#define CUFRAMES_PKT_FLAG_KEY 0x01u
#define CUFRAMES_PKT_FLAG_CORRUPT 0x02u
#define CUFRAMES_PKT_FLAG_DISCONTINUITY 0x04u
#define CUFRAMES_PKT_FLAG_LAST_IN_AU 0x08u
/* ─── Shared memory layout (см. docs/protocol.md §2) ──────────────────── */
@@ -103,6 +119,73 @@ _Static_assert(offsetof(cuframes_shm_header_t, ipc_event_handle) == 0x0080, "eve
_Static_assert(offsetof(cuframes_shm_header_t, global_seq) == 0x00C0, "global_seq offset");
_Static_assert(offsetof(cuframes_shm_header_t, slots) == 0x0100, "slots offset");
/* ─── Packet ring shared memory layout (docs/protocol.md §10) ──────────── */
/* Packet slot entry — packed 64 байт */
typedef struct __attribute__((packed)) cuframes_pkt_slot {
_Atomic uint64_t seq; /* UINT64_MAX = invalid */
int64_t pts_ns;
int64_t dts_ns;
uint64_t data_offset; /* absolute byte cursor; % data_size = ring offset */
uint32_t data_size;
uint32_t flags;
uint8_t reserved[24];
} cuframes_pkt_slot_t;
_Static_assert(sizeof(cuframes_pkt_slot_t) == 64, "packet slot must be 64 bytes");
/* Packet ring header (fixed 0x1040 = 4160 bytes). Followed by slots[N] + data[]. */
typedef struct __attribute__((packed)) cuframes_pkt_header {
uint32_t magic; /* CUFRAMES_PKT_MAGIC */
uint32_t proto_version; /* 2 */
uint32_t ring_slots;
uint32_t data_size;
uint32_t codec_id; /* AV_CODEC_ID_H264 / HEVC / ... */
uint32_t codec_extradata_size; /* ≤ CUFRAMES_PKT_EXTRADATA_MAX */
uint64_t producer_pid;
_Atomic uint64_t global_seq;
_Atomic uint64_t last_keyframe_seq;
_Atomic uint64_t write_offset;
_Atomic uint64_t shutdown_flag;
uint8_t codec_extradata[CUFRAMES_PKT_EXTRADATA_MAX];
/* offset 0x1040 — slots[ring_slots], then data[data_size] */
} cuframes_pkt_header_t;
_Static_assert(offsetof(cuframes_pkt_header_t, magic) == 0x0000, "pkt magic offset");
_Static_assert(offsetof(cuframes_pkt_header_t, proto_version) == 0x0004, "pkt proto offset");
_Static_assert(offsetof(cuframes_pkt_header_t, producer_pid) == 0x0018, "pkt pid offset");
_Static_assert(offsetof(cuframes_pkt_header_t, global_seq) == 0x0020, "pkt global_seq offset");
_Static_assert(offsetof(cuframes_pkt_header_t, write_offset) == 0x0030, "pkt write_offset offset");
_Static_assert(offsetof(cuframes_pkt_header_t, codec_extradata) == 0x0040, "pkt extradata offset");
_Static_assert(sizeof(cuframes_pkt_header_t) == 0x1040, "pkt header must be 0x1040 bytes");
/* Computed SHM layout helper:
* total = sizeof(cuframes_pkt_header_t) + slots*sizeof(slot) + data_size
*/
static inline size_t cuframes_pkt_shm_size(uint32_t slots, uint32_t data_size) {
return sizeof(cuframes_pkt_header_t)
+ (size_t)slots * sizeof(cuframes_pkt_slot_t)
+ (size_t)data_size;
}
/* Pointers into mmap'ed pkt SHM (computed from header base) */
static inline cuframes_pkt_slot_t * cuframes_pkt_slots(cuframes_pkt_header_t *hdr) {
return (cuframes_pkt_slot_t *)((uint8_t *)hdr + sizeof(cuframes_pkt_header_t));
}
static inline uint8_t * cuframes_pkt_data(cuframes_pkt_header_t *hdr) {
return (uint8_t *)hdr + sizeof(cuframes_pkt_header_t)
+ (size_t)hdr->ring_slots * sizeof(cuframes_pkt_slot_t);
}
/* Opaque ring handle — содержит state и mapping для publisher или subscriber. */
typedef struct cuframes_pkt_ring {
int shm_fd;
void *shm_base;
size_t shm_size;
cuframes_pkt_header_t *hdr;
char shm_name[128]; /* /cuframes-<key>-packets */
int is_publisher;
} cuframes_pkt_ring_t;
/* ─── Socket protocol messages (docs/protocol.md §3) ───────────────────── */
#define CUFRAMES_MSG_HELLO_REQ 0x01
@@ -164,6 +247,8 @@ typedef struct __attribute__((packed)) cuframes_msg_subscribe_resp {
int cuframes_internal_socket_path(const char *key, char *out, size_t out_size);
/* Build /cuframes-<key> (for shm_open) */
int cuframes_internal_shm_name(const char *key, char *out, size_t out_size);
/* Build /cuframes-<key>-packets (for shm_open) */
int cuframes_internal_pkt_shm_name(const char *key, char *out, size_t out_size);
/* Validate key per protocol.md (alphanum/_/-, 1..63 chars) */
int cuframes_internal_validate_key(const char *key);
/* Calculate frame size + pitch для format/W/H */
@@ -181,4 +266,48 @@ int cuframes_internal_recv_msg(int sock_fd, uint32_t *msg_type_out,
void *payload, uint32_t *payload_len_inout,
int32_t timeout_ms);
/* ─── Packet ring helpers (libcuframes/src/packet_ring.c) ─────────────── */
/* Publisher: create SHM + initialize header + slots. Stale recovery как у frames. */
int cuframes_internal_pkt_ring_create(const char *key,
uint32_t slots,
uint32_t data_size,
uint32_t codec_id,
cuframes_pkt_ring_t *ring_out);
/* Publisher: set codec extradata (SPS/PPS). Must be called before first publish.
* Если size > CUFRAMES_PKT_EXTRADATA_MAX → ERR_INVALID_ARG. */
int cuframes_internal_pkt_ring_set_extradata(cuframes_pkt_ring_t *ring,
const void *extradata,
size_t size);
/* Publisher: publish single encoded packet. Slow consumer = overwrite oldest.
* Returns CUFRAMES_ERR_PACKET_OVERSIZED если size > data_size. */
int cuframes_internal_pkt_ring_publish(cuframes_pkt_ring_t *ring,
const void *data, size_t size,
int64_t pts_ns, int64_t dts_ns,
uint32_t flags);
/* Subscriber: open existing SHM by shm name (from HELLO_RESP packet_shm_path). */
int cuframes_internal_pkt_ring_open(const char *shm_name,
cuframes_pkt_ring_t *ring_out);
/* Subscriber: read next packet.
* *seq_inout — currently held seq (we read seq_inout+1); updated on success.
* out_buf must have ≥ max_packet_size bytes; out_size receives actual size.
* Returns:
* CUFRAMES_OK on success
* CUFRAMES_ERR_PACKET_OVERRUN если publisher уехал — caller resync on keyframe
* CUFRAMES_ERR_TIMEOUT если нет нового packet
* CUFRAMES_ERR_DISCONNECTED если publisher shutdown */
int cuframes_internal_pkt_ring_read(cuframes_pkt_ring_t *ring,
uint64_t *seq_inout,
void *out_buf, size_t out_buf_max,
size_t *out_size,
int64_t *out_pts, int64_t *out_dts,
uint32_t *out_flags);
/* Publisher OR Subscriber: cleanup mmap + close FD. Publisher additionally shm_unlink. */
void cuframes_internal_pkt_ring_destroy(cuframes_pkt_ring_t *ring);
#endif /* CUFRAMES_INTERNAL_H */
+380
View File
@@ -0,0 +1,380 @@
/* libcuframes/src/packet_ring.c
*
* Variable-length encoded packet ring buffer (docs/protocol.md §10).
*
* Использует POSIX shared memory (`/cuframes-<key>-packets`), packed
* structures с _Atomic полями, seqlock-style read для защиты от overrun
* mid-read.
*
* Этот модуль внутренний — exposed API будет в Step 3 (cuframes.h
* extension). Сейчас functions имеют prefix `cuframes_internal_pkt_ring_*`
* и используются из producer.c / consumer.c.
*/
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "internal.h"
/* ─── Internal helpers ────────────────────────────────────────────────── */
static void wraparound_memcpy(uint8_t *dst, const uint8_t *src, size_t n,
size_t buf_size, size_t offset) {
/* Запись n байт начиная с offset в buf размера buf_size, wraparound. */
size_t off = offset % buf_size;
size_t first = n;
if (first > buf_size - off) first = buf_size - off;
memcpy(dst + off, src, first);
if (first < n) {
memcpy(dst, src + first, n - first);
}
}
static void wraparound_memcpy_from(uint8_t *out, const uint8_t *buf,
size_t buf_size, size_t offset, size_t n) {
/* Чтение n байт из buf с wraparound от offset. */
size_t off = offset % buf_size;
size_t first = n;
if (first > buf_size - off) first = buf_size - off;
memcpy(out, buf + off, first);
if (first < n) {
memcpy(out + first, buf, n - first);
}
}
/* ─── Publisher API ───────────────────────────────────────────────────── */
int cuframes_internal_pkt_ring_create(const char *key,
uint32_t slots,
uint32_t data_size,
uint32_t codec_id,
cuframes_pkt_ring_t *ring_out) {
if (!ring_out) return CUFRAMES_ERR_INVALID_ARG;
if (slots == 0 || slots > CUFRAMES_PKT_MAX_SLOTS) return CUFRAMES_ERR_INVALID_ARG;
if (data_size == 0) return CUFRAMES_ERR_INVALID_ARG;
memset(ring_out, 0, sizeof(*ring_out));
ring_out->shm_fd = -1;
ring_out->is_publisher = 1;
int r = cuframes_internal_pkt_shm_name(key, ring_out->shm_name,
sizeof(ring_out->shm_name));
if (r != CUFRAMES_OK) return r;
/* Stale recovery (как в frames SHM) */
int fd = shm_open(ring_out->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644);
if (fd < 0) {
if (errno == EEXIST) {
int existing = shm_open(ring_out->shm_name, O_RDWR, 0);
if (existing >= 0) {
cuframes_pkt_header_t tmp;
ssize_t rb = read(existing, &tmp, sizeof(tmp));
close(existing);
if (rb == (ssize_t)sizeof(tmp) && tmp.magic == CUFRAMES_PKT_MAGIC) {
if (cuframes_internal_pid_alive((pid_t)tmp.producer_pid)) {
CUFRAMES_LOG_ERROR("packet ring %s: publisher pid %lu still alive",
ring_out->shm_name,
(unsigned long)tmp.producer_pid);
return CUFRAMES_ERR_ALREADY_EXISTS;
}
}
}
CUFRAMES_LOG_INFO("stale packet shm %s — unlinking", ring_out->shm_name);
shm_unlink(ring_out->shm_name);
fd = shm_open(ring_out->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644);
if (fd < 0) {
CUFRAMES_LOG_ERROR("packet shm_open after unlink: %s", strerror(errno));
return CUFRAMES_ERR_IO;
}
} else {
CUFRAMES_LOG_ERROR("packet shm_open: %s", strerror(errno));
return CUFRAMES_ERR_IO;
}
}
size_t total_size = cuframes_pkt_shm_size(slots, data_size);
if (ftruncate(fd, (off_t)total_size) < 0) {
CUFRAMES_LOG_ERROR("packet ftruncate(%zu): %s", total_size, strerror(errno));
close(fd);
shm_unlink(ring_out->shm_name);
return CUFRAMES_ERR_IO;
}
void *base = mmap(NULL, total_size, PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (base == MAP_FAILED) {
CUFRAMES_LOG_ERROR("packet mmap: %s", strerror(errno));
close(fd);
shm_unlink(ring_out->shm_name);
return CUFRAMES_ERR_IO;
}
ring_out->shm_fd = fd;
ring_out->shm_base = base;
ring_out->shm_size = total_size;
ring_out->hdr = (cuframes_pkt_header_t *)base;
/* Initialize header — нули + magic/version/sizes */
memset(ring_out->hdr, 0, sizeof(*ring_out->hdr));
ring_out->hdr->magic = CUFRAMES_PKT_MAGIC;
ring_out->hdr->proto_version = CUFRAMES_PROTOCOL_V2;
ring_out->hdr->ring_slots = slots;
ring_out->hdr->data_size = data_size;
ring_out->hdr->codec_id = codec_id;
ring_out->hdr->codec_extradata_size = 0;
ring_out->hdr->producer_pid = (uint64_t)getpid();
atomic_store_explicit(&ring_out->hdr->global_seq, UINT64_MAX,
memory_order_release);
atomic_store_explicit(&ring_out->hdr->last_keyframe_seq, UINT64_MAX,
memory_order_release);
atomic_store_explicit(&ring_out->hdr->write_offset, 0,
memory_order_release);
atomic_store_explicit(&ring_out->hdr->shutdown_flag, 0,
memory_order_release);
/* Initialize slots — invalid seq markers */
cuframes_pkt_slot_t *slots_arr = cuframes_pkt_slots(ring_out->hdr);
for (uint32_t i = 0; i < slots; ++i) {
atomic_store_explicit(&slots_arr[i].seq, UINT64_MAX,
memory_order_release);
}
/* Data section уже zeroed через ftruncate (POSIX guarantees) */
CUFRAMES_LOG_INFO("packet ring %s: slots=%u data_size=%u codec_id=%u (total=%zu bytes)",
ring_out->shm_name, slots, data_size, codec_id, total_size);
return CUFRAMES_OK;
}
int cuframes_internal_pkt_ring_set_extradata(cuframes_pkt_ring_t *ring,
const void *extradata,
size_t size) {
if (!ring || !ring->hdr) return CUFRAMES_ERR_INVALID_ARG;
if (!ring->is_publisher) return CUFRAMES_ERR_INVALID_ARG;
if (size > CUFRAMES_PKT_EXTRADATA_MAX) return CUFRAMES_ERR_INVALID_ARG;
if (size > 0 && !extradata) return CUFRAMES_ERR_INVALID_ARG;
/* Записываем сначала bytes, потом size (release-style — subscriber видит size>0 только когда extradata готов). */
if (size > 0) {
memcpy(ring->hdr->codec_extradata, extradata, size);
/* Memory barrier — extradata stores complete до size update. */
__atomic_thread_fence(__ATOMIC_RELEASE);
}
ring->hdr->codec_extradata_size = (uint32_t)size;
return CUFRAMES_OK;
}
int cuframes_internal_pkt_ring_publish(cuframes_pkt_ring_t *ring,
const void *data, size_t size,
int64_t pts_ns, int64_t dts_ns,
uint32_t flags) {
if (!ring || !ring->hdr) return CUFRAMES_ERR_INVALID_ARG;
if (!ring->is_publisher) return CUFRAMES_ERR_INVALID_ARG;
if (size == 0 || !data) return CUFRAMES_ERR_INVALID_ARG;
if (size > ring->hdr->data_size) return CUFRAMES_ERR_PACKET_OVERSIZED;
cuframes_pkt_header_t *hdr = ring->hdr;
/* Allocate next seq + cursor offset. Single-publisher — без CAS. */
uint64_t prev_seq = atomic_load_explicit(&hdr->global_seq,
memory_order_relaxed);
uint64_t new_seq = (prev_seq == UINT64_MAX) ? 0 : prev_seq + 1;
uint64_t write_off = atomic_load_explicit(&hdr->write_offset,
memory_order_relaxed);
/* Записать payload в data ring (wraparound aware) */
wraparound_memcpy(cuframes_pkt_data(hdr), data, size,
hdr->data_size, write_off);
/* Записать slot metadata. Slot index = seq % ring_slots. */
uint32_t slot_idx = (uint32_t)(new_seq % hdr->ring_slots);
cuframes_pkt_slot_t *slot = &cuframes_pkt_slots(hdr)[slot_idx];
slot->pts_ns = pts_ns;
slot->dts_ns = dts_ns;
slot->data_offset = write_off;
slot->data_size = (uint32_t)size;
slot->flags = flags;
/* RELEASE order — payload bytes + slot metadata готовы перед publish seq. */
atomic_store_explicit(&slot->seq, new_seq, memory_order_release);
/* Update global cursor + global_seq. */
atomic_store_explicit(&hdr->write_offset, write_off + size,
memory_order_release);
atomic_store_explicit(&hdr->global_seq, new_seq,
memory_order_release);
/* Keyframe — update last_keyframe_seq для late subscribers. */
if (flags & CUFRAMES_PKT_FLAG_KEY) {
atomic_store_explicit(&hdr->last_keyframe_seq, new_seq,
memory_order_release);
}
return CUFRAMES_OK;
}
/* ─── Subscriber API ──────────────────────────────────────────────────── */
int cuframes_internal_pkt_ring_open(const char *shm_name,
cuframes_pkt_ring_t *ring_out) {
if (!shm_name || !ring_out) return CUFRAMES_ERR_INVALID_ARG;
memset(ring_out, 0, sizeof(*ring_out));
ring_out->shm_fd = -1;
ring_out->is_publisher = 0;
strncpy(ring_out->shm_name, shm_name, sizeof(ring_out->shm_name) - 1);
int fd = shm_open(shm_name, O_RDONLY, 0);
if (fd < 0) {
if (errno == ENOENT) return CUFRAMES_ERR_NOT_FOUND;
CUFRAMES_LOG_ERROR("packet shm_open(%s) ro: %s", shm_name, strerror(errno));
return CUFRAMES_ERR_IO;
}
/* Прочитать header чтобы узнать total size */
cuframes_pkt_header_t header_peek;
ssize_t rb = read(fd, &header_peek, sizeof(header_peek));
if (rb != (ssize_t)sizeof(header_peek)) {
close(fd);
return CUFRAMES_ERR_IO;
}
if (header_peek.magic != CUFRAMES_PKT_MAGIC) {
CUFRAMES_LOG_ERROR("packet shm %s: bad magic 0x%08x", shm_name, header_peek.magic);
close(fd);
return CUFRAMES_ERR_PROTOCOL;
}
if (header_peek.proto_version != CUFRAMES_PROTOCOL_V2) {
CUFRAMES_LOG_ERROR("packet shm %s: proto_version=%u (expected %u)",
shm_name, header_peek.proto_version, CUFRAMES_PROTOCOL_V2);
close(fd);
return CUFRAMES_ERR_PROTOCOL;
}
size_t total = cuframes_pkt_shm_size(header_peek.ring_slots,
header_peek.data_size);
/* mmap полностью read-only */
void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0);
if (base == MAP_FAILED) {
CUFRAMES_LOG_ERROR("packet mmap ro: %s", strerror(errno));
close(fd);
return CUFRAMES_ERR_IO;
}
ring_out->shm_fd = fd;
ring_out->shm_base = base;
ring_out->shm_size = total;
ring_out->hdr = (cuframes_pkt_header_t *)base;
CUFRAMES_LOG_INFO("packet ring %s opened: slots=%u data_size=%u",
shm_name, header_peek.ring_slots, header_peek.data_size);
return CUFRAMES_OK;
}
int cuframes_internal_pkt_ring_read(cuframes_pkt_ring_t *ring,
uint64_t *seq_inout,
void *out_buf, size_t out_buf_max,
size_t *out_size,
int64_t *out_pts, int64_t *out_dts,
uint32_t *out_flags) {
if (!ring || !ring->hdr || !seq_inout || !out_buf || !out_size
|| !out_pts || !out_dts || !out_flags) {
return CUFRAMES_ERR_INVALID_ARG;
}
cuframes_pkt_header_t *hdr = ring->hdr;
/* Publisher shutdown? */
if (atomic_load_explicit(&hdr->shutdown_flag, memory_order_acquire) != 0) {
return CUFRAMES_ERR_DISCONNECTED;
}
/* Текущий published seq */
uint64_t cur = atomic_load_explicit(&hdr->global_seq, memory_order_acquire);
if (cur == UINT64_MAX) return CUFRAMES_ERR_TIMEOUT; /* нет published */
if (*seq_inout != UINT64_MAX && cur <= *seq_inout) {
return CUFRAMES_ERR_TIMEOUT;
}
/* Calculate the next seq we want (handle первый read с UINT64_MAX → start с 0) */
uint64_t want_seq = (*seq_inout == UINT64_MAX) ? 0 : (*seq_inout + 1);
/* Если want_seq < cur и slot уже перезаписан — попадаем в OVERRUN */
if (cur - want_seq >= hdr->ring_slots) {
/* Скорее всего slot уже rewritten. Подсказка caller'у — resync. */
return CUFRAMES_ERR_PACKET_OVERRUN;
}
uint32_t slot_idx = (uint32_t)(want_seq % hdr->ring_slots);
cuframes_pkt_slot_t *slot = &cuframes_pkt_slots(hdr)[slot_idx];
/* Seqlock-style read: load seq, prove not overwritten после copy. */
uint64_t s1 = atomic_load_explicit(&slot->seq, memory_order_acquire);
if (s1 != want_seq) {
/* Slot уже занят следующим packet'ом — overrun. */
return CUFRAMES_ERR_PACKET_OVERRUN;
}
/* Снять metadata (non-atomic — read OK поскольку post-check защищает) */
uint64_t data_off = slot->data_offset;
uint32_t data_sz = slot->data_size;
int64_t pts = slot->pts_ns;
int64_t dts = slot->dts_ns;
uint32_t flags = slot->flags;
if (data_sz > out_buf_max) {
return CUFRAMES_ERR_INVALID_ARG; /* caller's buf too small */
}
/* Copy payload */
wraparound_memcpy_from((uint8_t *)out_buf,
cuframes_pkt_data(hdr),
hdr->data_size, data_off, data_sz);
/* Post-check: slot->seq не изменился во время copy. */
uint64_t s2 = atomic_load_explicit(&slot->seq, memory_order_acquire);
if (s2 != want_seq) {
return CUFRAMES_ERR_PACKET_OVERRUN;
}
*out_size = data_sz;
*out_pts = pts;
*out_dts = dts;
*out_flags = flags;
*seq_inout = want_seq;
return CUFRAMES_OK;
}
/* ─── Cleanup ─────────────────────────────────────────────────────────── */
void cuframes_internal_pkt_ring_destroy(cuframes_pkt_ring_t *ring) {
if (!ring) return;
if (ring->is_publisher && ring->hdr) {
/* Сигнализируем consumer'ам shutdown */
atomic_store_explicit(&ring->hdr->shutdown_flag, 1,
memory_order_release);
}
if (ring->shm_base && ring->shm_size > 0) {
munmap(ring->shm_base, ring->shm_size);
}
if (ring->shm_fd >= 0) {
close(ring->shm_fd);
}
if (ring->is_publisher && ring->shm_name[0] != '\0') {
shm_unlink(ring->shm_name);
}
memset(ring, 0, sizeof(*ring));
ring->shm_fd = -1;
}
+13
View File
@@ -32,6 +32,10 @@ const char *cuframes_strerror(int err) {
case CUFRAMES_ERR_FORMAT: return "unsupported format or size mismatch";
case CUFRAMES_ERR_WOULD_BLOCK: return "would block";
case CUFRAMES_ERR_TOO_MANY: return "too many subscribers (max 32)";
case CUFRAMES_ERR_PACKET_OVERSIZED: return "packet exceeds max_packet_size";
case CUFRAMES_ERR_NO_PACKET_RING: return "publisher has no packet ring";
case CUFRAMES_ERR_NO_CODEC_PARAMS: return "codec extradata not set by publisher";
case CUFRAMES_ERR_PACKET_OVERRUN: return "packet ring overrun — resync on keyframe";
case CUFRAMES_ERR_INTERNAL: return "internal error (please report)";
default: return "unknown error";
}
@@ -83,6 +87,15 @@ int cuframes_internal_shm_name(const char *key, char *out, size_t out_size) {
return CUFRAMES_OK;
}
int cuframes_internal_pkt_shm_name(const char *key, char *out, size_t out_size) {
int r = cuframes_internal_validate_key(key);
if (r != CUFRAMES_OK) return r;
int n = snprintf(out, out_size, "%s%s%s",
CUFRAMES_SHM_PREFIX, key, CUFRAMES_PKT_SHM_SUFFIX);
if (n < 0 || (size_t)n >= out_size) return CUFRAMES_ERR_INVALID_ARG;
return CUFRAMES_OK;
}
int cuframes_internal_ensure_runtime_dir(void) {
if (mkdir(CUFRAMES_RUNTIME_DIR, 0755) == 0) return CUFRAMES_OK;
if (errno == EEXIST) return CUFRAMES_OK;