Files
cuframes/libcuframes/src/consumer.c
T
gx 4862247fe2
build / cmake build (CUDA 12.4, Ubuntu 22.04) (push) Successful in 1m46s
build / ffmpeg filter patch (out-of-tree) (push) Failing after 1m30s
v0.4: VMM + POSIX FD — namespace decoupling (no pid share required)
Заменяет cudaMalloc + cudaIpcGetMemHandle на cuMemCreate (VMM) +
cuMemExportToShareableHandle(POSIX_FILE_DESCRIPTOR). FDs передаются consumer'у
через sendmsg(SCM_RIGHTS) в handshake. Frigate (s6-overlay не даёт share PID)
и любой другой consumer работают БЕЗ pid namespace share — только volume mount
unix socket'a /run/cuframes и IPC share для /dev/shm header.

Sync: cudaEventRecord+IPC events → cuStreamSynchronize в do_publish.
Producer ждёт ~1 ms что stream flush'нулся, потом atomic_store(seq).
Consumer читает seq через memory_order_acquire и копирует DtoD без
event wait — HW coherence гарантирована на одном GPU.

ABI break (согласован с user'ом):
  - magic 0xCC7C1DCC → 0xCC7C1DCE (старые consumers fail cleanly)
  - protocol V3 → V4
  - libcuframes.so.0 SOVERSION остаётся, но .so.0.3.0 → .so.0.4.0
  - EXTERNAL ownership убран (VMM требует cuMemCreate-allocated memory,
    нельзя export'нуть произвольный cudaMalloc-pointer как POSIX FD)
  - cuframes-rtsp-source переведён на LIBRARY mode + один D2D memcpy
    в acquire'нутый slot (overhead малый — публишер всё равно делал такой
    D2D из FFmpeg hwframe pool в EXTERNAL pool раньше)

Размер: granularity 2 MB на 5090 → NV12 1920×1080 (~3.1 MB) округляется до
4 MB, +1 MB на slot × 16 × 4 камеры = +64 MB VRAM. Терпимо.

Packet ring (cuframes_packets://) НЕ затронут — отдельный SHM с своим
magic, работает как раньше.

PoC + smoke в spike/:
  - vmm_fd_pingpong/ — minimal cuMemCreate+FD round-trip
  - smoke_v04/ — full publisher+subscriber, 100/100 frames без pid share

Base image: Dockerfile.runtime → CUDA 12.4 (был 13.0). Matching prod
pipeline + Frigate base, иначе libcudart conflict при load.

Compose stack (localhost-infra repo) — параллельный commit:
  - убран pid: container:cuframes-pub-parking из subscribers
  - image теги: gx/cuframes:0.4, gx/cuda-grid-pipeline:phase8,
    gx/frigate:cuframes-v0.4

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 20:13:31 +01:00

580 lines
23 KiB
C
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/* Subscriber implementation (sync).
 *
 * v0.4 — VMM + POSIX FD. Receives the slot FDs via SCM_RIGHTS during the
 * handshake and imports them with cuMemImportFromShareableHandle + cuMemMap.
 * Does not require a shared pid/ipc namespace with the producer.
 *
 * Sync: the producer calls cuStreamSynchronize on its stream before
 * atomic_store(seq). The consumer simply reads seq (acquire) and copies the
 * data with a DtoD memcpy — no cudaEventWait is needed (HW coherence on a
 * single GPU).
 */
#include "internal.h"
#include <errno.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <time.h>
#include <unistd.h>
/* Opaque frame handed to the subscriber by cuframes_subscriber_next().
 * One instance is embedded in struct cuframes_subscriber (frame_obj) and is
 * refilled on every successful next() call. */
struct cuframes_frame {
/* Device address of the mapped ring slot for this frame; set to NULL on release. */
void *cuda_ptr;
/* Pixel format, copied from the shared header's meta block. */
cuframes_format_t format;
/* Frame dimensions, copied from the shared header's meta block. */
int32_t width;
int32_t height;
/* Plane pitches, copied from the shared header's meta block. */
int32_t pitch_y;
int32_t pitch_uv;
/* Sequence number of the frame this object currently describes. */
uint64_t seq;
/* Timestamp read from the slot's pts_ns field. */
int64_t pts_ns;
/* Ring slot index (seq modulo ring size); used to ack the slot on release. */
uint32_t slot_idx;
/* Owning subscriber; release() rejects frames whose owner does not match. */
void *subscriber;
};
/* Encoded packet handed out by cuframes_subscriber_next_packet().
 * One instance is embedded in struct cuframes_subscriber (packet_obj). */
struct cuframes_packet {
/* Heap buffer the packet payload is copied into; allocated by enable_packets(). */
uint8_t *data;
/* Allocated size of data (taken from the packet ring header's data_size). */
size_t capacity;
/* Number of valid payload bytes for the current packet. */
size_t size;
/* Presentation / decode timestamps reported by the packet ring read. */
int64_t pts_ns;
int64_t dts_ns;
/* Packet flags reported by the packet ring read. */
uint32_t flags;
/* Sequence number of the current packet. */
uint64_t seq;
};
/* Per-connection subscriber state; allocated by cuframes_subscriber_create()
 * and freed by cuframes_subscriber_destroy(). */
struct cuframes_subscriber {
/* Copy of the caller's configuration. */
cuframes_subscriber_config_t cfg;
/* NUL-terminated copy of cfg.key. */
char key[CUFRAMES_MAX_KEY_LEN + 1];
/* Unix socket connected to the publisher, or -1. */
int sock_fd;
/* Descriptor returned by shm_open for the shared header, or -1. */
int shm_fd;
/* mmap'ed shared header (sequence atomics + frame meta), or NULL. */
cuframes_shm_header_t *hdr;
/* SHM object name derived from the key. */
char shm_name[80];
/* v0.4 — VMM imported slots */
/* Allocation handles obtained from the received file descriptors, one per slot. */
CUmemGenericAllocationHandle vmm_handles[CUFRAMES_MAX_RING];
/* Device addresses at which the corresponding handles are mapped. */
CUdeviceptr vmm_ptrs[CUFRAMES_MAX_RING];
/* Size in bytes of every slot mapping (slot_size_bytes from the VMM_FDS message). */
size_t vmm_slot_size;
/* Number of leading entries of vmm_handles/vmm_ptrs that are fully imported. */
int imported_count;
/* Bit index assigned by the publisher in SUBSCRIBE_RESP; used for the ack and subscriber bitmaps. */
uint32_t assigned_bit;
/* Sequence number of the last frame delivered (starts at initial_seq from SUBSCRIBE_RESP). */
uint64_t last_seen_seq;
/* The single frame object handed out by next(), and whether it is currently held. */
struct cuframes_frame frame_obj;
int frame_busy;
/* Non-zero once enable_packets() has opened the packet ring. */
int has_pkt_ring;
cuframes_pkt_ring_t pkt_ring;
/* Sequence cursor passed to the packet ring read. */
uint64_t last_packet_seq;
/* The single packet object handed out by next_packet(), and whether it is currently held. */
struct cuframes_packet packet_obj;
int packet_busy;
};
/* Map a CUDA driver result code to a printable string.
 * Falls back to "?" when the driver does not provide a description. */
static const char *cu_err_str(CUresult r) {
    const char *text = NULL;

    cuGetErrorString(r, &text);
    if (text == NULL) {
        return "?";
    }
    return text;
}
/* ─── Frame accessors ────────────────────────────────────────────────── */
/* Device pointer of the frame's slot; NULL when f is NULL. */
void *cuframes_frame_cuda_ptr(const cuframes_frame_t *f) {
    if (f == NULL) {
        return NULL;
    }
    return f->cuda_ptr;
}
/* Pixel format of the frame; 0 when f is NULL. */
cuframes_format_t cuframes_frame_format(const cuframes_frame_t *f) {
    if (f == NULL) {
        return 0;
    }
    return f->format;
}
/* Store the frame's width and height through w and h.
 * Either output pointer may be NULL; nothing is written when f is NULL. */
void cuframes_frame_size(const cuframes_frame_t *f, int32_t *w, int32_t *h) {
    if (f != NULL) {
        if (w != NULL) {
            *w = f->width;
        }
        if (h != NULL) {
            *h = f->height;
        }
    }
}
/* Luma plane pitch of the frame; 0 when f is NULL. */
int32_t cuframes_frame_pitch_y(const cuframes_frame_t *f) {
    if (f == NULL) {
        return 0;
    }
    return f->pitch_y;
}
/* Chroma plane pitch of the frame; 0 when f is NULL. */
int32_t cuframes_frame_pitch_uv(const cuframes_frame_t *f) {
    if (f == NULL) {
        return 0;
    }
    return f->pitch_uv;
}
/* Sequence number of the frame; 0 when f is NULL. */
uint64_t cuframes_frame_seq(const cuframes_frame_t *f) {
    if (f == NULL) {
        return 0;
    }
    return f->seq;
}
/* Timestamp of the frame (pts_ns); 0 when f is NULL. */
int64_t cuframes_frame_pts_ns(const cuframes_frame_t *f) {
    if (f == NULL) {
        return 0;
    }
    return f->pts_ns;
}
/* ─── Subscriber create ──────────────────────────────────────────────── */
static int do_handshake(struct cuframes_subscriber *sub, const char *name,
int *fds_out, uint32_t *fd_count_inout,
uint64_t *slot_size_out) {
/* Send HELLO_REQ — proto v4 */
uint8_t buf[CUFRAMES_MAX_MSG_PAYLOAD];
cuframes_msg_hello_req_t *hreq = (cuframes_msg_hello_req_t *)buf;
hreq->proto_version = CUFRAMES_PROTOCOL_V4;
uint32_t nl = name ? (uint32_t)strlen(name) : 0;
if (nl > 31) nl = 31;
hreq->consumer_name_len = nl;
if (nl > 0) memcpy(buf + sizeof(*hreq), name, nl);
uint8_t *tail = buf + sizeof(*hreq) + nl;
int32_t cuda_dev = sub->cfg.cuda_device;
uint32_t mode = (uint32_t)sub->cfg.mode;
memcpy(tail, &cuda_dev, 4);
memcpy(tail + 4, &mode, 4);
memset(tail + 8, 0, 12);
uint32_t plen = (uint32_t)(sizeof(*hreq) + nl + 20);
int r = cuframes_internal_send_msg(sub->sock_fd, CUFRAMES_MSG_HELLO_REQ,
buf, plen);
if (r != CUFRAMES_OK) return r;
uint32_t rmt = 0, rpl = sizeof(buf);
r = cuframes_internal_recv_msg(sub->sock_fd, &rmt, buf, &rpl, 5000);
if (r != CUFRAMES_OK) return r;
if (rmt != CUFRAMES_MSG_HELLO_RESP) return CUFRAMES_ERR_PROTOCOL;
cuframes_msg_hello_resp_t *hresp = (cuframes_msg_hello_resp_t *)buf;
if (hresp->result != CUFRAMES_OK) return hresp->result;
if (hresp->proto_version_actual != CUFRAMES_PROTOCOL_V4) {
CUFRAMES_LOG_ERROR("publisher proto v%u — нужен v%u (v0.4)",
hresp->proto_version_actual, CUFRAMES_PROTOCOL_V4);
return CUFRAMES_ERR_PROTOCOL;
}
/* Send SUBSCRIBE_REQ */
uint32_t srbuf[8];
srbuf[0] = CUFRAMES_PROTOCOL_V4;
memset(srbuf + 1, 0, 28);
r = cuframes_internal_send_msg(sub->sock_fd, CUFRAMES_MSG_SUBSCRIBE_REQ,
srbuf, sizeof(srbuf));
if (r != CUFRAMES_OK) return r;
/* Recv SUBSCRIBE_RESP */
cuframes_msg_subscribe_resp_t sresp;
rmt = 0; rpl = sizeof(sresp);
r = cuframes_internal_recv_msg(sub->sock_fd, &rmt, &sresp, &rpl, 5000);
if (r != CUFRAMES_OK) return r;
if (rmt != CUFRAMES_MSG_SUBSCRIBE_RESP) return CUFRAMES_ERR_PROTOCOL;
if (sresp.result != CUFRAMES_OK) return sresp.result;
sub->assigned_bit = sresp.assigned_bit;
sub->last_seen_seq = sresp.initial_seq;
/* Recv VMM_FDS */
cuframes_msg_vmm_fds_t vmm_payload = {0};
uint32_t vmm_plen = sizeof(vmm_payload);
rmt = 0;
r = cuframes_internal_recv_msg_with_fds(sub->sock_fd, &rmt,
&vmm_payload, &vmm_plen,
fds_out, fd_count_inout, 5000);
if (r != CUFRAMES_OK) {
CUFRAMES_LOG_ERROR("recv VMM_FDS: %s", cuframes_strerror(r));
return r;
}
if (rmt != CUFRAMES_MSG_VMM_FDS) {
CUFRAMES_LOG_ERROR("expected VMM_FDS got 0x%x", rmt);
return CUFRAMES_ERR_PROTOCOL;
}
if (vmm_payload.fd_count != *fd_count_inout) {
CUFRAMES_LOG_ERROR("VMM_FDS: payload fd_count=%u, received %u",
vmm_payload.fd_count, *fd_count_inout);
return CUFRAMES_ERR_PROTOCOL;
}
*slot_size_out = vmm_payload.slot_size_bytes;
return CUFRAMES_OK;
}
int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg,
cuframes_subscriber_t **out) {
if (!cfg || !cfg->key || !out) return CUFRAMES_ERR_INVALID_ARG;
if (cuframes_internal_validate_key(cfg->key) != CUFRAMES_OK)
return CUFRAMES_ERR_INVALID_ARG;
struct cuframes_subscriber *sub = calloc(1, sizeof(*sub));
if (!sub) return CUFRAMES_ERR_OUT_OF_MEMORY;
sub->cfg = *cfg;
strncpy(sub->key, cfg->key, sizeof(sub->key) - 1);
sub->sock_fd = -1;
sub->shm_fd = -1;
char name_buf[32];
const char *name = cfg->consumer_name;
if (!name) {
snprintf(name_buf, sizeof(name_buf), "sub-%d-%lx",
(int)getpid(), (unsigned long)cuframes_now_ns() & 0xFFFFu);
name = name_buf;
}
char sock_path[128];
int r = cuframes_internal_socket_path(cfg->key, sock_path, sizeof(sock_path));
if (r != CUFRAMES_OK) { free(sub); return r; }
int64_t deadline = cfg->connect_timeout_ms > 0
? cuframes_now_ns() + (int64_t)cfg->connect_timeout_ms * 1000000LL
: 0;
while (1) {
sub->sock_fd = socket(AF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0);
if (sub->sock_fd < 0) { r = CUFRAMES_ERR_IO; goto fail; }
struct sockaddr_un sa = {.sun_family = AF_UNIX};
strncpy(sa.sun_path, sock_path, sizeof(sa.sun_path) - 1);
if (connect(sub->sock_fd, (struct sockaddr *)&sa, sizeof(sa)) == 0) break;
close(sub->sock_fd);
sub->sock_fd = -1;
if (cfg->connect_timeout_ms == 0) { r = CUFRAMES_ERR_NOT_FOUND; goto fail; }
if (deadline && cuframes_now_ns() > deadline) { r = CUFRAMES_ERR_TIMEOUT; goto fail; }
struct timespec ts = {.tv_sec = 0, .tv_nsec = 100000000};
nanosleep(&ts, NULL);
}
/* Handshake (включая VMM_FDS) */
int fds[CUFRAMES_MAX_RING];
for (int i = 0; i < CUFRAMES_MAX_RING; i++) fds[i] = -1;
uint32_t fd_count = CUFRAMES_MAX_RING;
uint64_t slot_size = 0;
r = do_handshake(sub, name, fds, &fd_count, &slot_size);
if (r != CUFRAMES_OK) goto fail;
/* Open SHM (для seq atomics + meta) */
r = cuframes_internal_shm_name(cfg->key, sub->shm_name, sizeof(sub->shm_name));
if (r != CUFRAMES_OK) goto fail_close_fds;
sub->shm_fd = shm_open(sub->shm_name, O_RDWR, 0);
if (sub->shm_fd < 0) {
CUFRAMES_LOG_ERROR("shm_open %s: %s", sub->shm_name, strerror(errno));
r = CUFRAMES_ERR_IO; goto fail_close_fds;
}
sub->hdr = mmap(NULL, sizeof(cuframes_shm_header_t),
PROT_READ | PROT_WRITE, MAP_SHARED, sub->shm_fd, 0);
if (sub->hdr == MAP_FAILED) {
sub->hdr = NULL;
r = CUFRAMES_ERR_IO; goto fail_close_fds;
}
if (sub->hdr->magic != CUFRAMES_MAGIC) {
if (sub->hdr->magic == CUFRAMES_MAGIC_LEGACY) {
CUFRAMES_LOG_ERROR("publisher uses legacy v0.1-v0.3 SHM — нужен v0.4 publisher");
} else {
CUFRAMES_LOG_ERROR("SHM magic mismatch: 0x%x", sub->hdr->magic);
}
r = CUFRAMES_ERR_PROTOCOL; goto fail_close_fds;
}
if (sub->hdr->proto_version != CUFRAMES_PROTOCOL_V4) {
CUFRAMES_LOG_ERROR("SHM proto v%u — нужен v%u",
sub->hdr->proto_version, CUFRAMES_PROTOCOL_V4);
r = CUFRAMES_ERR_PROTOCOL; goto fail_close_fds;
}
/* CUDA driver init + import VMM handles */
CUresult cr = cuInit(0);
if (cr != CUDA_SUCCESS) {
CUFRAMES_LOG_ERROR("cuInit: %s", cu_err_str(cr));
r = CUFRAMES_ERR_CUDA; goto fail_close_fds;
}
/* Ensure a runtime context exists (cudaMemcpyAsync from this pool needs it) */
cudaError_t cerr = cudaSetDevice(sub->cfg.cuda_device);
if (cerr != cudaSuccess) {
CUFRAMES_LOG_ERROR("cudaSetDevice: %s", cudaGetErrorString(cerr));
r = CUFRAMES_ERR_CUDA; goto fail_close_fds;
}
CUmemAccessDesc access = {0};
access.location.type = CU_MEM_LOCATION_TYPE_DEVICE;
access.location.id = sub->cfg.cuda_device;
access.flags = CU_MEM_ACCESS_FLAGS_PROT_READWRITE;
sub->vmm_slot_size = (size_t)slot_size;
sub->imported_count = 0;
for (uint32_t i = 0; i < fd_count; ++i) {
cr = cuMemImportFromShareableHandle(&sub->vmm_handles[i],
(void *)(uintptr_t)fds[i],
CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR);
if (cr != CUDA_SUCCESS) {
CUFRAMES_LOG_ERROR("cuMemImportFromShareableHandle slot %u: %s",
i, cu_err_str(cr));
r = CUFRAMES_ERR_CUDA; goto fail_unmap;
}
/* После import можно закрыть FD — kernel держит reference через handle */
close(fds[i]);
fds[i] = -1;
cr = cuMemAddressReserve(&sub->vmm_ptrs[i], sub->vmm_slot_size, 0, 0, 0);
if (cr != CUDA_SUCCESS) {
CUFRAMES_LOG_ERROR("cuMemAddressReserve slot %u: %s",
i, cu_err_str(cr));
r = CUFRAMES_ERR_CUDA; goto fail_unmap;
}
cr = cuMemMap(sub->vmm_ptrs[i], sub->vmm_slot_size, 0,
sub->vmm_handles[i], 0);
if (cr != CUDA_SUCCESS) {
CUFRAMES_LOG_ERROR("cuMemMap slot %u: %s", i, cu_err_str(cr));
r = CUFRAMES_ERR_CUDA; goto fail_unmap;
}
cr = cuMemSetAccess(sub->vmm_ptrs[i], sub->vmm_slot_size, &access, 1);
if (cr != CUDA_SUCCESS) {
CUFRAMES_LOG_ERROR("cuMemSetAccess slot %u: %s", i, cu_err_str(cr));
r = CUFRAMES_ERR_CUDA; goto fail_unmap;
}
sub->imported_count++;
}
CUFRAMES_LOG_INFO("subscriber '%s' connected to '%s' (bit=%u, ring=%u, v0.4 VMM)",
name, sub->key, sub->assigned_bit, fd_count);
*out = sub;
return CUFRAMES_OK;
fail_unmap:
/* Cleanup partial VMM */
for (int i = 0; i < sub->imported_count; i++) {
if (sub->vmm_ptrs[i]) {
cuMemUnmap(sub->vmm_ptrs[i], sub->vmm_slot_size);
cuMemAddressFree(sub->vmm_ptrs[i], sub->vmm_slot_size);
}
if (sub->vmm_handles[i]) cuMemRelease(sub->vmm_handles[i]);
}
fail_close_fds:
for (int i = 0; i < CUFRAMES_MAX_RING; i++) {
if (fds[i] >= 0) close(fds[i]);
}
fail:
cuframes_subscriber_destroy(sub);
return r;
}
/*
 * Wait for the next frame and hand it out through *frame_out.
 *
 *   sub              subscriber created by cuframes_subscriber_create().
 *   consumer_stream  unused in v0.4 (the producer has already synchronised
 *                    its stream before publishing the sequence number).
 *   frame_out        receives a pointer to the subscriber's single frame
 *                    object; it stays held until cuframes_subscriber_release().
 *   timeout_ms       0 = do not wait, >0 = wait at most this many
 *                    milliseconds, <0 = wait indefinitely.
 *
 * In CUFRAMES_MODE_NEWEST_ONLY the most recent published frame is taken;
 * otherwise frames are delivered in sequence order.
 *
 * Returns CUFRAMES_OK, CUFRAMES_ERR_INVALID_ARG (bad arguments or the previous
 * frame is still held), CUFRAMES_ERR_DISCONNECTED (publisher shut down, or the
 * in-order reader fell more than one ring behind), CUFRAMES_ERR_PROTOCOL (the
 * shared header's ring size is zero or exceeds the imported slots),
 * CUFRAMES_ERR_WOULD_BLOCK (timeout_ms == 0 and no frame ready) or
 * CUFRAMES_ERR_TIMEOUT.
 */
int cuframes_subscriber_next(cuframes_subscriber_t *sub,
                             void *consumer_stream,
                             cuframes_frame_t **frame_out,
                             int32_t timeout_ms) {
    if (!sub || !frame_out) return CUFRAMES_ERR_INVALID_ARG;
    if (sub->frame_busy) return CUFRAMES_ERR_INVALID_ARG;
    if (atomic_load_explicit(&sub->hdr->shutdown_flag,
                             memory_order_acquire) != 0) {
        return CUFRAMES_ERR_DISCONNECTED;
    }
    (void)consumer_stream; /* v0.4: producer already synchronised its stream, no sync needed */

    /* Slots are addressed as seq % ring: a zero ring would divide by zero and
     * a ring larger than the imported slots would index unmapped entries. */
    const uint64_t ring = (uint64_t)sub->hdr->ring_size;
    if (ring == 0 || ring > (uint64_t)sub->imported_count) {
        return CUFRAMES_ERR_PROTOCOL;
    }

    int64_t deadline = (timeout_ms > 0)
        ? cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL
        : 0;

    while (1) {
        uint64_t gs = atomic_load_explicit(&sub->hdr->global_seq,
                                           memory_order_acquire);
        if (gs != UINT64_MAX && gs != sub->last_seen_seq) {
            uint64_t target_seq;
            if (sub->cfg.mode == CUFRAMES_MODE_NEWEST_ONLY) {
                target_seq = gs;
            } else if (sub->last_seen_seq == UINT64_MAX) {
                /* Nothing delivered yet: start from the newest frame. */
                target_seq = gs;
            } else if (gs > sub->last_seen_seq + ring) {
                /* The producer lapped us; the next in-order frame is gone. */
                return CUFRAMES_ERR_DISCONNECTED;
            } else {
                target_seq = sub->last_seen_seq + 1;
            }

            uint32_t slot_idx = (uint32_t)(target_seq % ring);
            uint64_t slot_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
                                                     memory_order_acquire);
            if (slot_seq == target_seq) {
                int64_t pts = atomic_load_explicit(&sub->hdr->slots[slot_idx].pts_ns,
                                                   memory_order_acquire);
                /* v0.4: the producer called cuStreamSynchronize before storing
                 * seq, so the data is in GPU memory by the time the acquire
                 * load observes it. The second seq read is kept to guard
                 * against the ring wrapping while pts was being read. */
                uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
                                                           memory_order_acquire);
                if (verify_seq == target_seq) {
                    struct cuframes_frame *f = &sub->frame_obj;
                    f->cuda_ptr = (void *)(uintptr_t)sub->vmm_ptrs[slot_idx];
                    f->format = (cuframes_format_t)sub->hdr->meta.format;
                    f->width = sub->hdr->meta.width;
                    f->height = sub->hdr->meta.height;
                    f->pitch_y = sub->hdr->meta.pitch_y;
                    f->pitch_uv = sub->hdr->meta.pitch_uv;
                    f->seq = target_seq;
                    f->pts_ns = pts;
                    f->slot_idx = slot_idx;
                    f->subscriber = sub;
                    sub->frame_busy = 1;
                    sub->last_seen_seq = target_seq;
                    *frame_out = f;
                    return CUFRAMES_OK;
                }
            }
            /* Slot does not (or no longer) hold the frame we want. Fall
             * through to the wait logic instead of retrying immediately, so
             * the timeout and shutdown checks still apply and a slot that
             * never matches cannot turn this call into an endless busy loop. */
        }

        if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK;
        if (timeout_ms > 0 && cuframes_now_ns() > deadline) return CUFRAMES_ERR_TIMEOUT;
        struct timespec ts = {.tv_sec = 0, .tv_nsec = 50000}; /* 50 µs poll interval */
        nanosleep(&ts, NULL);
        if (atomic_load_explicit(&sub->hdr->shutdown_flag,
                                 memory_order_acquire) != 0) {
            return CUFRAMES_ERR_DISCONNECTED;
        }
    }
}
/*
 * Give back a frame obtained from cuframes_subscriber_next().
 *
 * A NULL frame is a no-op. A frame that does not belong to `sub` (or a NULL
 * sub) yields CUFRAMES_ERR_INVALID_ARG. When the subscriber's assigned bit is
 * in the range 1..63, the slot is acknowledged in its ack bitmap and the
 * subscriber's last_seen_seq / last_ack_ns entries in the shared header are
 * updated. The frame object is then detached and may be reused by next().
 */
int cuframes_subscriber_release(cuframes_subscriber_t *sub,
                                cuframes_frame_t *frame) {
    if (frame == NULL) {
        return CUFRAMES_OK;
    }
    if (sub == NULL || frame->subscriber != sub) {
        return CUFRAMES_ERR_INVALID_ARG;
    }

    const uint32_t bit = sub->assigned_bit;
    if (bit >= 1 && bit <= 63) {
        cuframes_shm_header_t *shm = sub->hdr;

        atomic_fetch_or_explicit(&shm->slots[frame->slot_idx].ack_bitmap,
                                 1ULL << bit,
                                 memory_order_release);
        atomic_store_explicit(&shm->subscribers[bit].last_seen_seq,
                              frame->seq, memory_order_release);
        atomic_store_explicit(&shm->subscribers[bit].last_ack_ns,
                              cuframes_now_ns(), memory_order_release);
    }

    /* Detach the frame so a stale pointer cannot be released twice. */
    frame->subscriber = NULL;
    frame->cuda_ptr = NULL;
    sub->frame_busy = 0;
    return CUFRAMES_OK;
}
/*
 * Tear down a subscriber and free it. A NULL sub is a no-op.
 *
 * Clears the subscriber's bit in the shared header (when the header is mapped
 * and the assigned bit is in the range 1..63), unmaps and releases every
 * imported VMM slot, closes the packet ring if it was enabled, frees the
 * packet buffer, unmaps the shared header and closes the SHM and socket
 * descriptors. Also used by cuframes_subscriber_create() to clean up a
 * partially constructed subscriber.
 *
 * Always returns CUFRAMES_OK.
 */
int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
    if (!sub) return CUFRAMES_OK;

    /* Same bounds as cuframes_subscriber_release(): shifting a 64-bit value
     * by 64 or more is undefined, and the bit also indexes subscribers[]. */
    if (sub->hdr && sub->assigned_bit > 0 && sub->assigned_bit < 64) {
        atomic_fetch_and_explicit(&sub->hdr->subscriber_bitmap,
                                  ~(1ULL << sub->assigned_bit),
                                  memory_order_release);
        atomic_store_explicit(&sub->hdr->subscribers[sub->assigned_bit].state,
                              0, memory_order_release);
    }

    /* VMM cleanup */
    for (int i = 0; i < sub->imported_count; i++) {
        if (sub->vmm_ptrs[i]) {
            cuMemUnmap(sub->vmm_ptrs[i], sub->vmm_slot_size);
            cuMemAddressFree(sub->vmm_ptrs[i], sub->vmm_slot_size);
        }
        if (sub->vmm_handles[i]) cuMemRelease(sub->vmm_handles[i]);
    }

    if (sub->has_pkt_ring) {
        cuframes_internal_pkt_ring_destroy(&sub->pkt_ring);
    }
    free(sub->packet_obj.data); /* free(NULL) is a no-op */
    sub->packet_obj.data = NULL;

    if (sub->hdr) munmap(sub->hdr, sizeof(cuframes_shm_header_t));
    if (sub->shm_fd >= 0) close(sub->shm_fd);
    if (sub->sock_fd >= 0) close(sub->sock_fd);
    free(sub);
    return CUFRAMES_OK;
}
/* ─────────────────────────────────────────────────────────────────────── */
/* v0.2 — encoded packet ring API (see docs/protocol.md §10) */
/* ─────────────────────────────────────────────────────────────────────── */
/* Pointer to the packet payload buffer; NULL when p is NULL. */
const void *cuframes_packet_data(const cuframes_packet_t *p) {
    if (p == NULL) {
        return NULL;
    }
    return p->data;
}
/* Number of valid payload bytes in the packet; 0 when p is NULL. */
size_t cuframes_packet_size(const cuframes_packet_t *p) {
    if (p == NULL) {
        return 0;
    }
    return p->size;
}
/* Presentation timestamp of the packet (pts_ns); 0 when p is NULL. */
int64_t cuframes_packet_pts(const cuframes_packet_t *p) {
    if (p == NULL) {
        return 0;
    }
    return p->pts_ns;
}
/* Decode timestamp of the packet (dts_ns); 0 when p is NULL. */
int64_t cuframes_packet_dts(const cuframes_packet_t *p) {
    if (p == NULL) {
        return 0;
    }
    return p->dts_ns;
}
/* Flags of the packet; 0 when p is NULL. */
uint32_t cuframes_packet_flags(const cuframes_packet_t *p) {
    if (p == NULL) {
        return 0;
    }
    return p->flags;
}
/* Sequence number of the packet; 0 when p is NULL. */
uint64_t cuframes_packet_seq(const cuframes_packet_t *p) {
    if (p == NULL) {
        return 0;
    }
    return p->seq;
}
/*
 * Open the encoded-packet ring that belongs to the subscriber's key and
 * allocate the buffer packets are copied into.
 *
 * Calling it again after a successful call is a no-op. The read cursor is
 * positioned just before the ring's last keyframe sequence, or left at
 * UINT64_MAX when the ring reports no keyframe yet.
 *
 * Returns CUFRAMES_OK, CUFRAMES_ERR_INVALID_ARG for a NULL sub,
 * CUFRAMES_ERR_OUT_OF_MEMORY, or an error from the name/open helpers.
 */
int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub) {
    if (sub == NULL) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    if (sub->has_pkt_ring) {
        return CUFRAMES_OK;
    }

    char ring_name[128];
    int rc = cuframes_internal_pkt_shm_name(sub->key, ring_name, sizeof(ring_name));
    if (rc != CUFRAMES_OK) {
        return rc;
    }
    rc = cuframes_internal_pkt_ring_open(ring_name, &sub->pkt_ring);
    if (rc != CUFRAMES_OK) {
        return rc;
    }

    /* The copy buffer is sized from the ring header's data_size. */
    const size_t buf_len = sub->pkt_ring.hdr->data_size;
    uint8_t *buffer = malloc(buf_len);
    sub->packet_obj.data = buffer;
    if (buffer == NULL) {
        cuframes_internal_pkt_ring_destroy(&sub->pkt_ring);
        return CUFRAMES_ERR_OUT_OF_MEMORY;
    }
    sub->packet_obj.capacity = buf_len;

    const uint64_t keyframe_seq = atomic_load_explicit(
        &sub->pkt_ring.hdr->last_keyframe_seq, memory_order_acquire);
    if (keyframe_seq == UINT64_MAX) {
        sub->last_packet_seq = UINT64_MAX;
    } else {
        sub->last_packet_seq = keyframe_seq - 1;
    }

    sub->has_pkt_ring = 1;
    return CUFRAMES_OK;
}
/*
 * Fetch the next encoded packet from the packet ring.
 *
 *   sub         subscriber with the packet ring enabled.
 *   pkt_out     receives the subscriber's single packet object on success;
 *               set to NULL on overrun or on an unexpected read error.
 *   timeout_ms  0 = do not wait, >0 = wait at most this many milliseconds,
 *               <0 = wait indefinitely.
 *
 * A CUFRAMES_ERR_TIMEOUT result from the ring read means "nothing available
 * yet" and leads to a 1 ms sleep and a retry. On CUFRAMES_ERR_PACKET_OVERRUN
 * the cursor is moved to just before the last keyframe (when one is known)
 * and the overrun is reported to the caller.
 *
 * Returns CUFRAMES_OK, CUFRAMES_ERR_INVALID_ARG (bad arguments or the previous
 * packet is still held), CUFRAMES_ERR_NO_PACKET_RING,
 * CUFRAMES_ERR_PACKET_OVERRUN, CUFRAMES_ERR_WOULD_BLOCK, CUFRAMES_ERR_TIMEOUT,
 * or another error from the ring read.
 */
int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub,
                                    cuframes_packet_t **pkt_out,
                                    int32_t timeout_ms) {
    if (sub == NULL || pkt_out == NULL) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    if (!sub->has_pkt_ring) {
        return CUFRAMES_ERR_NO_PACKET_RING;
    }
    if (sub->packet_busy) {
        return CUFRAMES_ERR_INVALID_ARG;
    }

    int64_t deadline_ns = 0;
    if (timeout_ms > 0) {
        deadline_ns = cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL;
    }

    while (1) {
        size_t payload_len = 0;
        int64_t pts_ns = 0;
        int64_t dts_ns = 0;
        uint32_t pkt_flags = 0;
        uint64_t cursor = sub->last_packet_seq;

        const int rc = cuframes_internal_pkt_ring_read(&sub->pkt_ring,
                                                       &cursor,
                                                       sub->packet_obj.data,
                                                       sub->packet_obj.capacity,
                                                       &payload_len, &pts_ns,
                                                       &dts_ns, &pkt_flags);
        if (rc == CUFRAMES_OK) {
            struct cuframes_packet *pkt = &sub->packet_obj;

            sub->last_packet_seq = cursor;
            pkt->size = payload_len;
            pkt->pts_ns = pts_ns;
            pkt->dts_ns = dts_ns;
            pkt->flags = pkt_flags;
            pkt->seq = cursor;
            sub->packet_busy = 1;
            *pkt_out = pkt;
            return CUFRAMES_OK;
        }

        if (rc == CUFRAMES_ERR_PACKET_OVERRUN) {
            /* Resume from the most recent keyframe, if the ring has one. */
            const uint64_t keyframe_seq = atomic_load_explicit(
                &sub->pkt_ring.hdr->last_keyframe_seq, memory_order_acquire);
            if (keyframe_seq != UINT64_MAX) {
                sub->last_packet_seq = keyframe_seq - 1;
            }
            *pkt_out = NULL;
            return CUFRAMES_ERR_PACKET_OVERRUN;
        }

        if (rc != CUFRAMES_ERR_TIMEOUT) {
            *pkt_out = NULL;
            return rc;
        }

        /* Nothing to read yet: honour the caller's waiting policy. */
        if (timeout_ms == 0) {
            return CUFRAMES_ERR_WOULD_BLOCK;
        }
        if (timeout_ms > 0 && cuframes_now_ns() >= deadline_ns) {
            return CUFRAMES_ERR_TIMEOUT;
        }

        struct timespec pause = {.tv_sec = 0, .tv_nsec = 1000000};
        nanosleep(&pause, NULL);
    }
}
/*
 * Give back a packet obtained from cuframes_subscriber_next_packet().
 *
 * Returns CUFRAMES_ERR_INVALID_ARG for a NULL sub or a packet that is not this
 * subscriber's packet object; a NULL pkt with a valid sub is a no-op that
 * returns CUFRAMES_OK.
 */
int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub,
                                       cuframes_packet_t *pkt) {
    if (sub == NULL) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    if (pkt != NULL) {
        if (pkt != &sub->packet_obj) {
            return CUFRAMES_ERR_INVALID_ARG;
        }
        sub->packet_busy = 0;
    }
    return CUFRAMES_OK;
}
/*
 * Report the codec parameters stored in the packet ring header.
 *
 * Each output pointer is optional. The outputs are written even when the
 * extradata size is zero, in which case CUFRAMES_ERR_NO_CODEC_PARAMS is
 * returned. *extradata_out points into the ring header, not at a copy.
 *
 * Returns CUFRAMES_OK, CUFRAMES_ERR_INVALID_ARG for a NULL sub,
 * CUFRAMES_ERR_NO_PACKET_RING when the packet ring is not enabled, or
 * CUFRAMES_ERR_NO_CODEC_PARAMS.
 */
int cuframes_subscriber_get_codec_params(cuframes_subscriber_t *sub,
                                         uint32_t *codec_id_out,
                                         const void **extradata_out,
                                         size_t *extradata_size_out) {
    if (sub == NULL) {
        return CUFRAMES_ERR_INVALID_ARG;
    }
    if (!sub->has_pkt_ring) {
        return CUFRAMES_ERR_NO_PACKET_RING;
    }

    cuframes_pkt_header_t *ring_hdr = sub->pkt_ring.hdr;

    if (codec_id_out != NULL) {
        *codec_id_out = ring_hdr->codec_id;
    }
    if (extradata_out != NULL) {
        *extradata_out = ring_hdr->codec_extradata;
    }
    if (extradata_size_out != NULL) {
        *extradata_size_out = ring_hdr->codec_extradata_size;
    }

    return (ring_hdr->codec_extradata_size == 0) ? CUFRAMES_ERR_NO_CODEC_PARAMS
                                                 : CUFRAMES_OK;
}