From bd7fd95fef625552e51ea12a0bc56715fcc27b02 Mon Sep 17 00:00:00 2001 From: Evgeny Demchenko Date: Tue, 19 May 2026 16:11:42 +0100 Subject: [PATCH] feat(libcuframes): packet ring buffer implementation (v0.2 Step 2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Реализация encoded packet ring per docs/protocol.md §10. Files: - internal.h: cuframes_pkt_slot_t (64b packed), cuframes_pkt_header_t (0x1040 fixed header), cuframes_pkt_ring_t handle, constants for default sizing, packet flags, helper inline functions for slot/data pointer arithmetic. - packet_ring.c (new, ~290 LOC): create/open/publish/read/destroy. Stale recovery симметрично frames SHM (pid liveness check). Seqlock pattern для subscriber защиты от overrun mid-read (post-check seq после copy). Wraparound memcpy helpers для variable-length data ring. - utils.c: cuframes_internal_pkt_shm_name helper + strerror entries. - cuframes.h: 4 новых error codes (PACKET_OVERSIZED, NO_PACKET_RING, NO_CODEC_PARAMS, PACKET_OVERRUN). - CMakeLists.txt: src/packet_ring.c в sources. API внутренний (cuframes_internal_pkt_ring_*) — publicly exposed функции будут в Step 3 (cuframes.h API extension). Связано: #2 (v0.2), PR #4 (draft). --- include/cuframes/cuframes.h | 5 + libcuframes/CMakeLists.txt | 1 + libcuframes/src/internal.h | 129 ++++++++++++ libcuframes/src/packet_ring.c | 380 ++++++++++++++++++++++++++++++++++ libcuframes/src/utils.c | 13 ++ 5 files changed, 528 insertions(+) create mode 100644 libcuframes/src/packet_ring.c diff --git a/include/cuframes/cuframes.h b/include/cuframes/cuframes.h index 22e8d6f..e23bf23 100644 --- a/include/cuframes/cuframes.h +++ b/include/cuframes/cuframes.h @@ -65,6 +65,11 @@ typedef enum cuframes_error { несовпадение размеров frame'а */ CUFRAMES_ERR_WOULD_BLOCK = -11, /**< non-blocking call — no data yet */ CUFRAMES_ERR_TOO_MANY = -12, /**< превышен MAX_SUBSCRIBERS (32) */ + /* v0.2 — packet ring (см. docs/protocol.md §10.15) */ + CUFRAMES_ERR_PACKET_OVERSIZED = -20, /**< publish_packet size > max_packet_size */ + CUFRAMES_ERR_NO_PACKET_RING = -21, /**< subscriber запросил packets, у publisher'а нет ring'а */ + CUFRAMES_ERR_NO_CODEC_PARAMS = -22, /**< extradata ещё не set publisher'ом */ + CUFRAMES_ERR_PACKET_OVERRUN = -23, /**< slow subscriber, packet seq уехал — resync на keyframe */ CUFRAMES_ERR_INTERNAL = -100, /**< bug в библиотеке — repro и reportить */ } cuframes_error_t; diff --git a/libcuframes/CMakeLists.txt b/libcuframes/CMakeLists.txt index 5d166ff..a914178 100644 --- a/libcuframes/CMakeLists.txt +++ b/libcuframes/CMakeLists.txt @@ -10,6 +10,7 @@ set(CUFRAMES_SOURCES src/producer.c src/consumer.c src/consumer_async.c + src/packet_ring.c ) add_library(cuframes SHARED ${CUFRAMES_SOURCES}) diff --git a/libcuframes/src/internal.h b/libcuframes/src/internal.h index 62b9090..3377357 100644 --- a/libcuframes/src/internal.h +++ b/libcuframes/src/internal.h @@ -23,12 +23,28 @@ #define CUFRAMES_MAGIC 0xCC7C1DCCu #define CUFRAMES_PROTOCOL_V1 1u +#define CUFRAMES_PROTOCOL_V2 2u /* v0.2 — packet ring support */ #define CUFRAMES_MAX_SUBSCRIBERS 32 #define CUFRAMES_MAX_RING 16 #define CUFRAMES_MAX_KEY_LEN 63 #define CUFRAMES_MAX_NAME_LEN 31 #define CUFRAMES_RUNTIME_DIR "/run/cuframes" #define CUFRAMES_SHM_PREFIX "/cuframes-" +#define CUFRAMES_PKT_SHM_SUFFIX "-packets" /* /cuframes--packets */ + +/* Packet ring constants (см. docs/protocol.md §10) */ +#define CUFRAMES_PKT_MAGIC 0xCC7C1DCDu /* frames magic + 1 */ +#define CUFRAMES_PKT_EXTRADATA_MAX 4096u +#define CUFRAMES_PKT_DEFAULT_SLOTS 64u +#define CUFRAMES_PKT_DEFAULT_DATA_SIZE (8u * 1024u * 1024u) /* 8 MB */ +#define CUFRAMES_PKT_DEFAULT_MAX_SIZE (2u * 1024u * 1024u) /* 2 MB */ +#define CUFRAMES_PKT_MAX_SLOTS 1024u + +/* Packet flags (см. docs/protocol.md §10.6) */ +#define CUFRAMES_PKT_FLAG_KEY 0x01u +#define CUFRAMES_PKT_FLAG_CORRUPT 0x02u +#define CUFRAMES_PKT_FLAG_DISCONTINUITY 0x04u +#define CUFRAMES_PKT_FLAG_LAST_IN_AU 0x08u /* ─── Shared memory layout (см. docs/protocol.md §2) ──────────────────── */ @@ -103,6 +119,73 @@ _Static_assert(offsetof(cuframes_shm_header_t, ipc_event_handle) == 0x0080, "eve _Static_assert(offsetof(cuframes_shm_header_t, global_seq) == 0x00C0, "global_seq offset"); _Static_assert(offsetof(cuframes_shm_header_t, slots) == 0x0100, "slots offset"); +/* ─── Packet ring shared memory layout (docs/protocol.md §10) ──────────── */ + +/* Packet slot entry — packed 64 байт */ +typedef struct __attribute__((packed)) cuframes_pkt_slot { + _Atomic uint64_t seq; /* UINT64_MAX = invalid */ + int64_t pts_ns; + int64_t dts_ns; + uint64_t data_offset; /* absolute byte cursor; % data_size = ring offset */ + uint32_t data_size; + uint32_t flags; + uint8_t reserved[24]; +} cuframes_pkt_slot_t; +_Static_assert(sizeof(cuframes_pkt_slot_t) == 64, "packet slot must be 64 bytes"); + +/* Packet ring header (fixed 0x1040 = 4160 bytes). Followed by slots[N] + data[]. */ +typedef struct __attribute__((packed)) cuframes_pkt_header { + uint32_t magic; /* CUFRAMES_PKT_MAGIC */ + uint32_t proto_version; /* 2 */ + uint32_t ring_slots; + uint32_t data_size; + uint32_t codec_id; /* AV_CODEC_ID_H264 / HEVC / ... */ + uint32_t codec_extradata_size; /* ≤ CUFRAMES_PKT_EXTRADATA_MAX */ + uint64_t producer_pid; + _Atomic uint64_t global_seq; + _Atomic uint64_t last_keyframe_seq; + _Atomic uint64_t write_offset; + _Atomic uint64_t shutdown_flag; + uint8_t codec_extradata[CUFRAMES_PKT_EXTRADATA_MAX]; + /* offset 0x1040 — slots[ring_slots], then data[data_size] */ +} cuframes_pkt_header_t; + +_Static_assert(offsetof(cuframes_pkt_header_t, magic) == 0x0000, "pkt magic offset"); +_Static_assert(offsetof(cuframes_pkt_header_t, proto_version) == 0x0004, "pkt proto offset"); +_Static_assert(offsetof(cuframes_pkt_header_t, producer_pid) == 0x0018, "pkt pid offset"); +_Static_assert(offsetof(cuframes_pkt_header_t, global_seq) == 0x0020, "pkt global_seq offset"); +_Static_assert(offsetof(cuframes_pkt_header_t, write_offset) == 0x0030, "pkt write_offset offset"); +_Static_assert(offsetof(cuframes_pkt_header_t, codec_extradata) == 0x0040, "pkt extradata offset"); +_Static_assert(sizeof(cuframes_pkt_header_t) == 0x1040, "pkt header must be 0x1040 bytes"); + +/* Computed SHM layout helper: + * total = sizeof(cuframes_pkt_header_t) + slots*sizeof(slot) + data_size + */ +static inline size_t cuframes_pkt_shm_size(uint32_t slots, uint32_t data_size) { + return sizeof(cuframes_pkt_header_t) + + (size_t)slots * sizeof(cuframes_pkt_slot_t) + + (size_t)data_size; +} + +/* Pointers into mmap'ed pkt SHM (computed from header base) */ +static inline cuframes_pkt_slot_t * cuframes_pkt_slots(cuframes_pkt_header_t *hdr) { + return (cuframes_pkt_slot_t *)((uint8_t *)hdr + sizeof(cuframes_pkt_header_t)); +} +static inline uint8_t * cuframes_pkt_data(cuframes_pkt_header_t *hdr) { + return (uint8_t *)hdr + sizeof(cuframes_pkt_header_t) + + (size_t)hdr->ring_slots * sizeof(cuframes_pkt_slot_t); +} + +/* Opaque ring handle — содержит state и mapping для publisher или subscriber. */ +typedef struct cuframes_pkt_ring { + int shm_fd; + void *shm_base; + size_t shm_size; + cuframes_pkt_header_t *hdr; + char shm_name[128]; /* /cuframes--packets */ + int is_publisher; +} cuframes_pkt_ring_t; + /* ─── Socket protocol messages (docs/protocol.md §3) ───────────────────── */ #define CUFRAMES_MSG_HELLO_REQ 0x01 @@ -164,6 +247,8 @@ typedef struct __attribute__((packed)) cuframes_msg_subscribe_resp { int cuframes_internal_socket_path(const char *key, char *out, size_t out_size); /* Build /cuframes- (for shm_open) */ int cuframes_internal_shm_name(const char *key, char *out, size_t out_size); +/* Build /cuframes--packets (for shm_open) */ +int cuframes_internal_pkt_shm_name(const char *key, char *out, size_t out_size); /* Validate key per protocol.md (alphanum/_/-, 1..63 chars) */ int cuframes_internal_validate_key(const char *key); /* Calculate frame size + pitch для format/W/H */ @@ -181,4 +266,48 @@ int cuframes_internal_recv_msg(int sock_fd, uint32_t *msg_type_out, void *payload, uint32_t *payload_len_inout, int32_t timeout_ms); +/* ─── Packet ring helpers (libcuframes/src/packet_ring.c) ─────────────── */ + +/* Publisher: create SHM + initialize header + slots. Stale recovery как у frames. */ +int cuframes_internal_pkt_ring_create(const char *key, + uint32_t slots, + uint32_t data_size, + uint32_t codec_id, + cuframes_pkt_ring_t *ring_out); + +/* Publisher: set codec extradata (SPS/PPS). Must be called before first publish. + * Если size > CUFRAMES_PKT_EXTRADATA_MAX → ERR_INVALID_ARG. */ +int cuframes_internal_pkt_ring_set_extradata(cuframes_pkt_ring_t *ring, + const void *extradata, + size_t size); + +/* Publisher: publish single encoded packet. Slow consumer = overwrite oldest. + * Returns CUFRAMES_ERR_PACKET_OVERSIZED если size > data_size. */ +int cuframes_internal_pkt_ring_publish(cuframes_pkt_ring_t *ring, + const void *data, size_t size, + int64_t pts_ns, int64_t dts_ns, + uint32_t flags); + +/* Subscriber: open existing SHM by shm name (from HELLO_RESP packet_shm_path). */ +int cuframes_internal_pkt_ring_open(const char *shm_name, + cuframes_pkt_ring_t *ring_out); + +/* Subscriber: read next packet. + * *seq_inout — currently held seq (we read seq_inout+1); updated on success. + * out_buf must have ≥ max_packet_size bytes; out_size receives actual size. + * Returns: + * CUFRAMES_OK on success + * CUFRAMES_ERR_PACKET_OVERRUN если publisher уехал — caller resync on keyframe + * CUFRAMES_ERR_TIMEOUT если нет нового packet + * CUFRAMES_ERR_DISCONNECTED если publisher shutdown */ +int cuframes_internal_pkt_ring_read(cuframes_pkt_ring_t *ring, + uint64_t *seq_inout, + void *out_buf, size_t out_buf_max, + size_t *out_size, + int64_t *out_pts, int64_t *out_dts, + uint32_t *out_flags); + +/* Publisher OR Subscriber: cleanup mmap + close FD. Publisher additionally shm_unlink. */ +void cuframes_internal_pkt_ring_destroy(cuframes_pkt_ring_t *ring); + #endif /* CUFRAMES_INTERNAL_H */ diff --git a/libcuframes/src/packet_ring.c b/libcuframes/src/packet_ring.c new file mode 100644 index 0000000..387f9a5 --- /dev/null +++ b/libcuframes/src/packet_ring.c @@ -0,0 +1,380 @@ +/* libcuframes/src/packet_ring.c + * + * Variable-length encoded packet ring buffer (docs/protocol.md §10). + * + * Использует POSIX shared memory (`/cuframes--packets`), packed + * structures с _Atomic полями, seqlock-style read для защиты от overrun + * mid-read. + * + * Этот модуль внутренний — exposed API будет в Step 3 (cuframes.h + * extension). Сейчас functions имеют prefix `cuframes_internal_pkt_ring_*` + * и используются из producer.c / consumer.c. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "internal.h" + +/* ─── Internal helpers ────────────────────────────────────────────────── */ + +static void wraparound_memcpy(uint8_t *dst, const uint8_t *src, size_t n, + size_t buf_size, size_t offset) { + /* Запись n байт начиная с offset в buf размера buf_size, wraparound. */ + size_t off = offset % buf_size; + size_t first = n; + if (first > buf_size - off) first = buf_size - off; + memcpy(dst + off, src, first); + if (first < n) { + memcpy(dst, src + first, n - first); + } +} + +static void wraparound_memcpy_from(uint8_t *out, const uint8_t *buf, + size_t buf_size, size_t offset, size_t n) { + /* Чтение n байт из buf с wraparound от offset. */ + size_t off = offset % buf_size; + size_t first = n; + if (first > buf_size - off) first = buf_size - off; + memcpy(out, buf + off, first); + if (first < n) { + memcpy(out + first, buf, n - first); + } +} + +/* ─── Publisher API ───────────────────────────────────────────────────── */ + +int cuframes_internal_pkt_ring_create(const char *key, + uint32_t slots, + uint32_t data_size, + uint32_t codec_id, + cuframes_pkt_ring_t *ring_out) { + if (!ring_out) return CUFRAMES_ERR_INVALID_ARG; + if (slots == 0 || slots > CUFRAMES_PKT_MAX_SLOTS) return CUFRAMES_ERR_INVALID_ARG; + if (data_size == 0) return CUFRAMES_ERR_INVALID_ARG; + + memset(ring_out, 0, sizeof(*ring_out)); + ring_out->shm_fd = -1; + ring_out->is_publisher = 1; + + int r = cuframes_internal_pkt_shm_name(key, ring_out->shm_name, + sizeof(ring_out->shm_name)); + if (r != CUFRAMES_OK) return r; + + /* Stale recovery (как в frames SHM) */ + int fd = shm_open(ring_out->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644); + if (fd < 0) { + if (errno == EEXIST) { + int existing = shm_open(ring_out->shm_name, O_RDWR, 0); + if (existing >= 0) { + cuframes_pkt_header_t tmp; + ssize_t rb = read(existing, &tmp, sizeof(tmp)); + close(existing); + if (rb == (ssize_t)sizeof(tmp) && tmp.magic == CUFRAMES_PKT_MAGIC) { + if (cuframes_internal_pid_alive((pid_t)tmp.producer_pid)) { + CUFRAMES_LOG_ERROR("packet ring %s: publisher pid %lu still alive", + ring_out->shm_name, + (unsigned long)tmp.producer_pid); + return CUFRAMES_ERR_ALREADY_EXISTS; + } + } + } + CUFRAMES_LOG_INFO("stale packet shm %s — unlinking", ring_out->shm_name); + shm_unlink(ring_out->shm_name); + fd = shm_open(ring_out->shm_name, O_CREAT | O_EXCL | O_RDWR, 0644); + if (fd < 0) { + CUFRAMES_LOG_ERROR("packet shm_open after unlink: %s", strerror(errno)); + return CUFRAMES_ERR_IO; + } + } else { + CUFRAMES_LOG_ERROR("packet shm_open: %s", strerror(errno)); + return CUFRAMES_ERR_IO; + } + } + + size_t total_size = cuframes_pkt_shm_size(slots, data_size); + if (ftruncate(fd, (off_t)total_size) < 0) { + CUFRAMES_LOG_ERROR("packet ftruncate(%zu): %s", total_size, strerror(errno)); + close(fd); + shm_unlink(ring_out->shm_name); + return CUFRAMES_ERR_IO; + } + + void *base = mmap(NULL, total_size, PROT_READ | PROT_WRITE, + MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { + CUFRAMES_LOG_ERROR("packet mmap: %s", strerror(errno)); + close(fd); + shm_unlink(ring_out->shm_name); + return CUFRAMES_ERR_IO; + } + + ring_out->shm_fd = fd; + ring_out->shm_base = base; + ring_out->shm_size = total_size; + ring_out->hdr = (cuframes_pkt_header_t *)base; + + /* Initialize header — нули + magic/version/sizes */ + memset(ring_out->hdr, 0, sizeof(*ring_out->hdr)); + ring_out->hdr->magic = CUFRAMES_PKT_MAGIC; + ring_out->hdr->proto_version = CUFRAMES_PROTOCOL_V2; + ring_out->hdr->ring_slots = slots; + ring_out->hdr->data_size = data_size; + ring_out->hdr->codec_id = codec_id; + ring_out->hdr->codec_extradata_size = 0; + ring_out->hdr->producer_pid = (uint64_t)getpid(); + atomic_store_explicit(&ring_out->hdr->global_seq, UINT64_MAX, + memory_order_release); + atomic_store_explicit(&ring_out->hdr->last_keyframe_seq, UINT64_MAX, + memory_order_release); + atomic_store_explicit(&ring_out->hdr->write_offset, 0, + memory_order_release); + atomic_store_explicit(&ring_out->hdr->shutdown_flag, 0, + memory_order_release); + + /* Initialize slots — invalid seq markers */ + cuframes_pkt_slot_t *slots_arr = cuframes_pkt_slots(ring_out->hdr); + for (uint32_t i = 0; i < slots; ++i) { + atomic_store_explicit(&slots_arr[i].seq, UINT64_MAX, + memory_order_release); + } + + /* Data section уже zeroed через ftruncate (POSIX guarantees) */ + + CUFRAMES_LOG_INFO("packet ring %s: slots=%u data_size=%u codec_id=%u (total=%zu bytes)", + ring_out->shm_name, slots, data_size, codec_id, total_size); + return CUFRAMES_OK; +} + +int cuframes_internal_pkt_ring_set_extradata(cuframes_pkt_ring_t *ring, + const void *extradata, + size_t size) { + if (!ring || !ring->hdr) return CUFRAMES_ERR_INVALID_ARG; + if (!ring->is_publisher) return CUFRAMES_ERR_INVALID_ARG; + if (size > CUFRAMES_PKT_EXTRADATA_MAX) return CUFRAMES_ERR_INVALID_ARG; + if (size > 0 && !extradata) return CUFRAMES_ERR_INVALID_ARG; + + /* Записываем сначала bytes, потом size (release-style — subscriber видит size>0 только когда extradata готов). */ + if (size > 0) { + memcpy(ring->hdr->codec_extradata, extradata, size); + /* Memory barrier — extradata stores complete до size update. */ + __atomic_thread_fence(__ATOMIC_RELEASE); + } + ring->hdr->codec_extradata_size = (uint32_t)size; + return CUFRAMES_OK; +} + +int cuframes_internal_pkt_ring_publish(cuframes_pkt_ring_t *ring, + const void *data, size_t size, + int64_t pts_ns, int64_t dts_ns, + uint32_t flags) { + if (!ring || !ring->hdr) return CUFRAMES_ERR_INVALID_ARG; + if (!ring->is_publisher) return CUFRAMES_ERR_INVALID_ARG; + if (size == 0 || !data) return CUFRAMES_ERR_INVALID_ARG; + if (size > ring->hdr->data_size) return CUFRAMES_ERR_PACKET_OVERSIZED; + + cuframes_pkt_header_t *hdr = ring->hdr; + + /* Allocate next seq + cursor offset. Single-publisher — без CAS. */ + uint64_t prev_seq = atomic_load_explicit(&hdr->global_seq, + memory_order_relaxed); + uint64_t new_seq = (prev_seq == UINT64_MAX) ? 0 : prev_seq + 1; + + uint64_t write_off = atomic_load_explicit(&hdr->write_offset, + memory_order_relaxed); + + /* Записать payload в data ring (wraparound aware) */ + wraparound_memcpy(cuframes_pkt_data(hdr), data, size, + hdr->data_size, write_off); + + /* Записать slot metadata. Slot index = seq % ring_slots. */ + uint32_t slot_idx = (uint32_t)(new_seq % hdr->ring_slots); + cuframes_pkt_slot_t *slot = &cuframes_pkt_slots(hdr)[slot_idx]; + + slot->pts_ns = pts_ns; + slot->dts_ns = dts_ns; + slot->data_offset = write_off; + slot->data_size = (uint32_t)size; + slot->flags = flags; + + /* RELEASE order — payload bytes + slot metadata готовы перед publish seq. */ + atomic_store_explicit(&slot->seq, new_seq, memory_order_release); + + /* Update global cursor + global_seq. */ + atomic_store_explicit(&hdr->write_offset, write_off + size, + memory_order_release); + atomic_store_explicit(&hdr->global_seq, new_seq, + memory_order_release); + + /* Keyframe — update last_keyframe_seq для late subscribers. */ + if (flags & CUFRAMES_PKT_FLAG_KEY) { + atomic_store_explicit(&hdr->last_keyframe_seq, new_seq, + memory_order_release); + } + + return CUFRAMES_OK; +} + +/* ─── Subscriber API ──────────────────────────────────────────────────── */ + +int cuframes_internal_pkt_ring_open(const char *shm_name, + cuframes_pkt_ring_t *ring_out) { + if (!shm_name || !ring_out) return CUFRAMES_ERR_INVALID_ARG; + + memset(ring_out, 0, sizeof(*ring_out)); + ring_out->shm_fd = -1; + ring_out->is_publisher = 0; + strncpy(ring_out->shm_name, shm_name, sizeof(ring_out->shm_name) - 1); + + int fd = shm_open(shm_name, O_RDONLY, 0); + if (fd < 0) { + if (errno == ENOENT) return CUFRAMES_ERR_NOT_FOUND; + CUFRAMES_LOG_ERROR("packet shm_open(%s) ro: %s", shm_name, strerror(errno)); + return CUFRAMES_ERR_IO; + } + + /* Прочитать header чтобы узнать total size */ + cuframes_pkt_header_t header_peek; + ssize_t rb = read(fd, &header_peek, sizeof(header_peek)); + if (rb != (ssize_t)sizeof(header_peek)) { + close(fd); + return CUFRAMES_ERR_IO; + } + if (header_peek.magic != CUFRAMES_PKT_MAGIC) { + CUFRAMES_LOG_ERROR("packet shm %s: bad magic 0x%08x", shm_name, header_peek.magic); + close(fd); + return CUFRAMES_ERR_PROTOCOL; + } + if (header_peek.proto_version != CUFRAMES_PROTOCOL_V2) { + CUFRAMES_LOG_ERROR("packet shm %s: proto_version=%u (expected %u)", + shm_name, header_peek.proto_version, CUFRAMES_PROTOCOL_V2); + close(fd); + return CUFRAMES_ERR_PROTOCOL; + } + + size_t total = cuframes_pkt_shm_size(header_peek.ring_slots, + header_peek.data_size); + + /* mmap полностью read-only */ + void *base = mmap(NULL, total, PROT_READ, MAP_SHARED, fd, 0); + if (base == MAP_FAILED) { + CUFRAMES_LOG_ERROR("packet mmap ro: %s", strerror(errno)); + close(fd); + return CUFRAMES_ERR_IO; + } + + ring_out->shm_fd = fd; + ring_out->shm_base = base; + ring_out->shm_size = total; + ring_out->hdr = (cuframes_pkt_header_t *)base; + + CUFRAMES_LOG_INFO("packet ring %s opened: slots=%u data_size=%u", + shm_name, header_peek.ring_slots, header_peek.data_size); + return CUFRAMES_OK; +} + +int cuframes_internal_pkt_ring_read(cuframes_pkt_ring_t *ring, + uint64_t *seq_inout, + void *out_buf, size_t out_buf_max, + size_t *out_size, + int64_t *out_pts, int64_t *out_dts, + uint32_t *out_flags) { + if (!ring || !ring->hdr || !seq_inout || !out_buf || !out_size + || !out_pts || !out_dts || !out_flags) { + return CUFRAMES_ERR_INVALID_ARG; + } + + cuframes_pkt_header_t *hdr = ring->hdr; + + /* Publisher shutdown? */ + if (atomic_load_explicit(&hdr->shutdown_flag, memory_order_acquire) != 0) { + return CUFRAMES_ERR_DISCONNECTED; + } + + /* Текущий published seq */ + uint64_t cur = atomic_load_explicit(&hdr->global_seq, memory_order_acquire); + if (cur == UINT64_MAX) return CUFRAMES_ERR_TIMEOUT; /* нет published */ + if (*seq_inout != UINT64_MAX && cur <= *seq_inout) { + return CUFRAMES_ERR_TIMEOUT; + } + + /* Calculate the next seq we want (handle первый read с UINT64_MAX → start с 0) */ + uint64_t want_seq = (*seq_inout == UINT64_MAX) ? 0 : (*seq_inout + 1); + + /* Если want_seq < cur и slot уже перезаписан — попадаем в OVERRUN */ + if (cur - want_seq >= hdr->ring_slots) { + /* Скорее всего slot уже rewritten. Подсказка caller'у — resync. */ + return CUFRAMES_ERR_PACKET_OVERRUN; + } + + uint32_t slot_idx = (uint32_t)(want_seq % hdr->ring_slots); + cuframes_pkt_slot_t *slot = &cuframes_pkt_slots(hdr)[slot_idx]; + + /* Seqlock-style read: load seq, prove not overwritten после copy. */ + uint64_t s1 = atomic_load_explicit(&slot->seq, memory_order_acquire); + if (s1 != want_seq) { + /* Slot уже занят следующим packet'ом — overrun. */ + return CUFRAMES_ERR_PACKET_OVERRUN; + } + + /* Снять metadata (non-atomic — read OK поскольку post-check защищает) */ + uint64_t data_off = slot->data_offset; + uint32_t data_sz = slot->data_size; + int64_t pts = slot->pts_ns; + int64_t dts = slot->dts_ns; + uint32_t flags = slot->flags; + + if (data_sz > out_buf_max) { + return CUFRAMES_ERR_INVALID_ARG; /* caller's buf too small */ + } + + /* Copy payload */ + wraparound_memcpy_from((uint8_t *)out_buf, + cuframes_pkt_data(hdr), + hdr->data_size, data_off, data_sz); + + /* Post-check: slot->seq не изменился во время copy. */ + uint64_t s2 = atomic_load_explicit(&slot->seq, memory_order_acquire); + if (s2 != want_seq) { + return CUFRAMES_ERR_PACKET_OVERRUN; + } + + *out_size = data_sz; + *out_pts = pts; + *out_dts = dts; + *out_flags = flags; + *seq_inout = want_seq; + return CUFRAMES_OK; +} + +/* ─── Cleanup ─────────────────────────────────────────────────────────── */ + +void cuframes_internal_pkt_ring_destroy(cuframes_pkt_ring_t *ring) { + if (!ring) return; + + if (ring->is_publisher && ring->hdr) { + /* Сигнализируем consumer'ам shutdown */ + atomic_store_explicit(&ring->hdr->shutdown_flag, 1, + memory_order_release); + } + + if (ring->shm_base && ring->shm_size > 0) { + munmap(ring->shm_base, ring->shm_size); + } + if (ring->shm_fd >= 0) { + close(ring->shm_fd); + } + if (ring->is_publisher && ring->shm_name[0] != '\0') { + shm_unlink(ring->shm_name); + } + + memset(ring, 0, sizeof(*ring)); + ring->shm_fd = -1; +} diff --git a/libcuframes/src/utils.c b/libcuframes/src/utils.c index aaf576d..3c4b6f1 100644 --- a/libcuframes/src/utils.c +++ b/libcuframes/src/utils.c @@ -32,6 +32,10 @@ const char *cuframes_strerror(int err) { case CUFRAMES_ERR_FORMAT: return "unsupported format or size mismatch"; case CUFRAMES_ERR_WOULD_BLOCK: return "would block"; case CUFRAMES_ERR_TOO_MANY: return "too many subscribers (max 32)"; + case CUFRAMES_ERR_PACKET_OVERSIZED: return "packet exceeds max_packet_size"; + case CUFRAMES_ERR_NO_PACKET_RING: return "publisher has no packet ring"; + case CUFRAMES_ERR_NO_CODEC_PARAMS: return "codec extradata not set by publisher"; + case CUFRAMES_ERR_PACKET_OVERRUN: return "packet ring overrun — resync on keyframe"; case CUFRAMES_ERR_INTERNAL: return "internal error (please report)"; default: return "unknown error"; } @@ -83,6 +87,15 @@ int cuframes_internal_shm_name(const char *key, char *out, size_t out_size) { return CUFRAMES_OK; } +int cuframes_internal_pkt_shm_name(const char *key, char *out, size_t out_size) { + int r = cuframes_internal_validate_key(key); + if (r != CUFRAMES_OK) return r; + int n = snprintf(out, out_size, "%s%s%s", + CUFRAMES_SHM_PREFIX, key, CUFRAMES_PKT_SHM_SUFFIX); + if (n < 0 || (size_t)n >= out_size) return CUFRAMES_ERR_INVALID_ARG; + return CUFRAMES_OK; +} + int cuframes_internal_ensure_runtime_dir(void) { if (mkdir(CUFRAMES_RUNTIME_DIR, 0755) == 0) return CUFRAMES_OK; if (errno == EEXIST) return CUFRAMES_OK;