feat(api): public C API для packet ring (v0.2 Step 3)
build / cmake build (CUDA 12.4, Ubuntu 22.04) (pull_request) Successful in 1m36s
build / ffmpeg filter patch (out-of-tree) (pull_request) Successful in 1m24s

Публичные функции в include/cuframes/cuframes.h:
- cuframes_publisher_enable_packets(opts)  — активирует ring на
  существующем publisher'е; default sizing (64 slots, 8MiB data, 2MiB max).
- cuframes_publisher_set_codec_extradata(data, size) — SPS/PPS bytes.
- cuframes_publisher_publish_packet(data, size, pts, dts, flags)
- cuframes_subscriber_enable_packets()  — открывает packet shm у subscriber'а.
- cuframes_subscriber_next_packet(pkt_out, timeout_ms) с поллингом 1ms.
- cuframes_packet_data/size/pts/dts/flags/seq accessors.
- cuframes_subscriber_release_packet()
- cuframes_subscriber_get_codec_params()

Internal:
- producer.c: расширена struct cuframes_publisher (has_pkt_ring,
  max_packet_size, pkt_ring); cleanup в destroy(); enable_packets()
  bump'ит proto_version=2 в frames header.
- consumer.c: расширена struct cuframes_subscriber (has_pkt_ring,
  pkt_ring, last_packet_seq, packet_obj); single-packet pattern (как
  frame_obj — busy flag, переиспользование buffer). enable_packets()
  стартует с last_keyframe_seq-1 для late subscriber resync. На
  PACKET_OVERRUN автоматически resync на last_keyframe и возвращает
  ERR наружу для signalling discontinuity.

Связано: #2, PR #4.
This commit is contained in:
2026-05-19 16:27:05 +01:00
parent bd7fd95fef
commit 4cb0321a6f
3 changed files with 356 additions and 2 deletions
+136
View File
@@ -371,6 +371,142 @@ int cuframes_async_subscriber_create(const cuframes_subscriber_config_t *cfg,
*/
int cuframes_async_subscriber_destroy(cuframes_async_subscriber_t *sub);
/* ─────────────────────────────────────────────────────────────────────── */
/* Encoded packet ring API (v0.2 — см. docs/protocol.md §10) */
/* ─────────────────────────────────────────────────────────────────────── */
/** Packet flags — биты соответствуют AV_PKT_FLAG_* у FFmpeg. */
#define CUFRAMES_PKT_FLAG_KEY 0x01u /**< IDR / keyframe */
#define CUFRAMES_PKT_FLAG_CORRUPT 0x02u /**< RTP loss / damage */
#define CUFRAMES_PKT_FLAG_DISCONTINUITY 0x04u /**< gap before this packet */
#define CUFRAMES_PKT_FLAG_LAST_IN_AU 0x08u /**< последний NAL в access unit */
typedef struct cuframes_packet_ring_options {
/** Слотов в индексе ring'а. Default 64 (≈ 2 sec @ 30fps + GOP). */
uint32_t ring_slots;
/** Размер data section ring'а в байтах. Default 8 MiB. */
uint32_t data_size;
/** Sanity guard — publisher отклонит packet > этого. Default 2 MiB. */
uint32_t max_packet_size;
/** FFmpeg AV_CODEC_ID_* (H.264 = 27, HEVC = 173). */
uint32_t codec_id;
uint64_t _reserved[4];
} cuframes_packet_ring_options_t;
/**
* @brief Активировать encoded packet ring на существующем publisher'е.
*
* Создаёт дополнительный SHM `/dev/shm/cuframes-<key>-packets`. После
* этого call'а publisher шлёт packets через `cuframes_publisher_publish_packet`.
*
* Должно быть вызвано **до** первого `publish_packet` и желательно до того
* как subscribers начнут подключаться (иначе они увидят publisher без packet
* ring и не получат packets).
*
* @param pub
* @param opts NULL = default sizing (64 slots, 8MiB data, 2MiB max). codec_id=0 = unknown.
* @return CUFRAMES_ERR_ALREADY_EXISTS если ring уже активирован
*/
int cuframes_publisher_enable_packets(cuframes_publisher_t *pub,
const cuframes_packet_ring_options_t *opts);
/**
* @brief Установить codec extradata (SPS/PPS/VPS) для packet ring.
*
* Subscribers (FFmpeg demuxer) читают extradata из shared header и подставляют
* в AVCodecContext.extradata. Должно быть вызвано до того как subscribers
* захотят decode.
*
* @param size ≤ 4096 байт (CUFRAMES_PKT_EXTRADATA_MAX)
*/
int cuframes_publisher_set_codec_extradata(cuframes_publisher_t *pub,
const void *extradata, size_t size);
/**
* @brief Опубликовать encoded packet (H.264/H.265 NAL units, Annex B).
*
* Slow consumer = overwrite oldest. Late subscriber resync'нется на last
* keyframe (см. docs/protocol.md §10.14).
*
* @param flags CUFRAMES_PKT_FLAG_* (минимум KEY на IDR — критично!)
* @return CUFRAMES_ERR_NO_PACKET_RING если не вызывали enable_packets
* @return CUFRAMES_ERR_PACKET_OVERSIZED если size > max_packet_size
*/
int cuframes_publisher_publish_packet(cuframes_publisher_t *pub,
const void *data, size_t size,
int64_t pts_ns, int64_t dts_ns,
uint32_t flags);
/* ── Subscriber-side packet API ───────────────────────────────────────── */
/** Opaque packet handle. Освобождается через release_packet. */
typedef struct cuframes_packet cuframes_packet_t;
/** @brief Pointer на encoded NAL bytes. Valid до release_packet. */
const void *cuframes_packet_data(const cuframes_packet_t *p);
/** @brief Размер payload в байтах. */
size_t cuframes_packet_size(const cuframes_packet_t *p);
/** @brief Presentation timestamp (наносекунды). */
int64_t cuframes_packet_pts(const cuframes_packet_t *p);
/** @brief Decode timestamp (для B-frames pipelines). */
int64_t cuframes_packet_dts(const cuframes_packet_t *p);
/** @brief Биты CUFRAMES_PKT_FLAG_*. */
uint32_t cuframes_packet_flags(const cuframes_packet_t *p);
/** @brief Sequence number у publisher'а. */
uint64_t cuframes_packet_seq(const cuframes_packet_t *p);
/**
* @brief Активировать чтение packet ring на subscriber'е.
*
* Открывает SHM `/dev/shm/cuframes-<key>-packets` (тот же `key` что в config).
* После этого можно читать через `cuframes_subscriber_next_packet`.
*
* Subscriber может одновременно иметь frames ring и packets ring (или один из).
*
* @return CUFRAMES_ERR_NOT_FOUND если publisher не имеет packet ring
*/
int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub);
/**
* @brief Получить следующий packet.
*
* Late subscriber (первый вызов) начинает с last_keyframe_seq publisher'а
* decoder receive'нет valid stream без glitches.
*
* Полученный packet ОБЯЗАТЕЛЬНО освободить через
* cuframes_subscriber_release_packet().
*
* @param timeout_ms <0 = блокироваться, 0 = non-blocking (WOULD_BLOCK), >0 = с таймаутом
* @return CUFRAMES_ERR_PACKET_OVERRUN — subscriber отстал, resync на keyframe (library сделает автоматически на next call)
* @return CUFRAMES_ERR_DISCONNECTED — publisher shutdown
*/
int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub,
cuframes_packet_t **pkt_out,
int32_t timeout_ms);
/** @brief Освободить packet handle. NULL-safe. */
int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub,
cuframes_packet_t *pkt);
/**
* @brief Получить codec parameters publisher'а.
*
* `*extradata_out` — pointer в библиотечный buffer, valid пока subscriber жив.
* Caller должен скопировать данные если хочет hold past subscriber lifetime.
*
* @return CUFRAMES_ERR_NO_CODEC_PARAMS если publisher ещё не вызвал
* set_codec_extradata
*/
int cuframes_subscriber_get_codec_params(cuframes_subscriber_t *sub,
uint32_t *codec_id_out,
const void **extradata_out,
size_t *extradata_size_out);
/* ─────────────────────────────────────────────────────────────────────── */
/* Утилиты */
/* ─────────────────────────────────────────────────────────────────────── */
+154 -2
View File
@@ -6,6 +6,7 @@
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <time.h>
#include <unistd.h>
/* Opaque frame — выдаётся subscriber'у на next() */
@@ -23,6 +24,17 @@ struct cuframes_frame {
void *subscriber; /* back-ref для release() */
};
/* Opaque packet handle — single-packet pattern (как frame_obj). */
struct cuframes_packet {
uint8_t *data; /* heap buffer, allocated by subscriber на enable_packets */
size_t capacity; /* size of allocation */
size_t size; /* actual payload size */
int64_t pts_ns;
int64_t dts_ns;
uint32_t flags;
uint64_t seq;
};
struct cuframes_subscriber {
cuframes_subscriber_config_t cfg;
char key[CUFRAMES_MAX_KEY_LEN + 1];
@@ -38,10 +50,16 @@ struct cuframes_subscriber {
uint32_t assigned_bit;
uint64_t last_seen_seq;
/* Frame pool — переиспользуем одну frame_t structure (single-thread API).
* Опционально расширим до lock-free pool в v0.2 если нужен multi-frame. */
/* Frame pool — переиспользуем одну frame_t structure (single-thread API). */
struct cuframes_frame frame_obj;
int frame_busy;
/* v0.2 — packet ring (optional, opened via enable_packets). */
int has_pkt_ring;
cuframes_pkt_ring_t pkt_ring;
uint64_t last_packet_seq; /* UINT64_MAX = no packet read yet */
struct cuframes_packet packet_obj;
int packet_busy;
};
/* ─── Frame accessors ────────────────────────────────────────────────── */
@@ -347,9 +365,143 @@ int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
if (sub->mapped_ptrs[i]) cudaIpcCloseMemHandle(sub->mapped_ptrs[i]);
}
/* Packet ring cleanup */
if (sub->has_pkt_ring) {
cuframes_internal_pkt_ring_destroy(&sub->pkt_ring);
}
if (sub->packet_obj.data) {
free(sub->packet_obj.data);
sub->packet_obj.data = NULL;
}
if (sub->hdr) munmap(sub->hdr, sizeof(cuframes_shm_header_t));
if (sub->shm_fd >= 0) close(sub->shm_fd);
if (sub->sock_fd >= 0) close(sub->sock_fd);
free(sub);
return CUFRAMES_OK;
}
/* ─────────────────────────────────────────────────────────────────────── */
/* v0.2 — encoded packet ring API (см. docs/protocol.md §10) */
/* ─────────────────────────────────────────────────────────────────────── */
/* Packet accessors */
const void *cuframes_packet_data(const cuframes_packet_t *p) { return p ? p->data : NULL; }
size_t cuframes_packet_size(const cuframes_packet_t *p) { return p ? p->size : 0; }
int64_t cuframes_packet_pts(const cuframes_packet_t *p) { return p ? p->pts_ns : 0; }
int64_t cuframes_packet_dts(const cuframes_packet_t *p) { return p ? p->dts_ns : 0; }
uint32_t cuframes_packet_flags(const cuframes_packet_t *p) { return p ? p->flags : 0; }
uint64_t cuframes_packet_seq(const cuframes_packet_t *p) { return p ? p->seq : 0; }
int cuframes_subscriber_enable_packets(cuframes_subscriber_t *sub) {
if (!sub) return CUFRAMES_ERR_INVALID_ARG;
if (sub->has_pkt_ring) return CUFRAMES_OK; /* idempotent */
char pkt_name[128];
int r = cuframes_internal_pkt_shm_name(sub->key, pkt_name, sizeof(pkt_name));
if (r != CUFRAMES_OK) return r;
r = cuframes_internal_pkt_ring_open(pkt_name, &sub->pkt_ring);
if (r != CUFRAMES_OK) return r;
/* Allocate copy-buffer (max packet size). Используем data_size как
* conservative upper bound (publisher гарантирует data_size >= max_packet_size). */
size_t capacity = sub->pkt_ring.hdr->data_size;
sub->packet_obj.data = (uint8_t *)malloc(capacity);
if (!sub->packet_obj.data) {
cuframes_internal_pkt_ring_destroy(&sub->pkt_ring);
return CUFRAMES_ERR_OUT_OF_MEMORY;
}
sub->packet_obj.capacity = capacity;
/* Start с last_keyframe_seq - 1 → первый read даст IDR (§10.14). */
uint64_t kf = atomic_load_explicit(&sub->pkt_ring.hdr->last_keyframe_seq,
memory_order_acquire);
sub->last_packet_seq = (kf == UINT64_MAX) ? UINT64_MAX : kf - 1;
sub->has_pkt_ring = 1;
return CUFRAMES_OK;
}
int cuframes_subscriber_next_packet(cuframes_subscriber_t *sub,
cuframes_packet_t **pkt_out,
int32_t timeout_ms) {
if (!sub || !pkt_out) return CUFRAMES_ERR_INVALID_ARG;
if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING;
if (sub->packet_busy) return CUFRAMES_ERR_INVALID_ARG; /* previous packet not released */
int64_t deadline_ns = (timeout_ms > 0) ?
cuframes_now_ns() + (int64_t)timeout_ms * 1000000LL : 0;
for (;;) {
size_t size = 0;
int64_t pts = 0, dts = 0;
uint32_t flags = 0;
uint64_t seq_attempt = sub->last_packet_seq;
int r = cuframes_internal_pkt_ring_read(&sub->pkt_ring,
&seq_attempt,
sub->packet_obj.data,
sub->packet_obj.capacity,
&size, &pts, &dts, &flags);
if (r == CUFRAMES_OK) {
sub->last_packet_seq = seq_attempt;
sub->packet_obj.size = size;
sub->packet_obj.pts_ns = pts;
sub->packet_obj.dts_ns = dts;
sub->packet_obj.flags = flags;
sub->packet_obj.seq = seq_attempt;
sub->packet_busy = 1;
*pkt_out = &sub->packet_obj;
return CUFRAMES_OK;
}
if (r == CUFRAMES_ERR_PACKET_OVERRUN) {
/* Resync — установить last_seq = last_keyframe_seq - 1, повторить. */
uint64_t kf = atomic_load_explicit(
&sub->pkt_ring.hdr->last_keyframe_seq, memory_order_acquire);
if (kf != UINT64_MAX) {
sub->last_packet_seq = kf - 1;
}
/* Возвращаем OVERRUN наружу — caller знает что был discontinuity. */
*pkt_out = NULL;
return CUFRAMES_ERR_PACKET_OVERRUN;
}
if (r != CUFRAMES_ERR_TIMEOUT) {
*pkt_out = NULL;
return r; /* DISCONNECTED, INVALID_ARG, etc. */
}
/* TIMEOUT branch — poll/sleep */
if (timeout_ms == 0) return CUFRAMES_ERR_WOULD_BLOCK;
if (timeout_ms > 0 && cuframes_now_ns() >= deadline_ns) {
return CUFRAMES_ERR_TIMEOUT;
}
struct timespec ts = {0, 1 * 1000 * 1000}; /* 1 ms poll interval */
nanosleep(&ts, NULL);
}
}
int cuframes_subscriber_release_packet(cuframes_subscriber_t *sub,
cuframes_packet_t *pkt) {
if (!sub) return CUFRAMES_ERR_INVALID_ARG;
if (!pkt) return CUFRAMES_OK; /* NULL-safe */
if (pkt != &sub->packet_obj) return CUFRAMES_ERR_INVALID_ARG;
sub->packet_busy = 0;
return CUFRAMES_OK;
}
int cuframes_subscriber_get_codec_params(cuframes_subscriber_t *sub,
uint32_t *codec_id_out,
const void **extradata_out,
size_t *extradata_size_out) {
if (!sub) return CUFRAMES_ERR_INVALID_ARG;
if (!sub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING;
cuframes_pkt_header_t *hdr = sub->pkt_ring.hdr;
if (codec_id_out) *codec_id_out = hdr->codec_id;
/* Если extradata ещё не выставлен publisher'ом — size=0, pointer ok но empty. */
if (extradata_out) *extradata_out = hdr->codec_extradata;
if (extradata_size_out) *extradata_size_out = hdr->codec_extradata_size;
if (hdr->codec_extradata_size == 0) return CUFRAMES_ERR_NO_CODEC_PARAMS;
return CUFRAMES_OK;
}
+66
View File
@@ -41,6 +41,11 @@ struct cuframes_publisher {
int accept_thread_alive;
int stop_flag;
pthread_mutex_t state_mu; /* protects subscriber connections */
/* v0.2 — encoded packet ring (optional). is_pkt_ring=1 → активирован. */
int has_pkt_ring;
uint32_t max_packet_size;
cuframes_pkt_ring_t pkt_ring;
};
/* Forward decls */
@@ -505,6 +510,12 @@ int cuframes_publisher_destroy(cuframes_publisher_t *pub) {
}
if (pub->event) cudaEventDestroy(pub->event);
/* Packet ring cleanup (если активирован) */
if (pub->has_pkt_ring) {
cuframes_internal_pkt_ring_destroy(&pub->pkt_ring);
pub->has_pkt_ring = 0;
}
/* Unlink resources */
if (pub->hdr) {
munmap(pub->hdr, sizeof(cuframes_shm_header_t));
@@ -523,6 +534,61 @@ int cuframes_publisher_destroy(cuframes_publisher_t *pub) {
return CUFRAMES_OK;
}
/* ─────────────────────────────────────────────────────────────────────── */
/* v0.2 — encoded packet ring API (см. docs/protocol.md §10) */
/* ─────────────────────────────────────────────────────────────────────── */
int cuframes_publisher_enable_packets(cuframes_publisher_t *pub,
const cuframes_packet_ring_options_t *opts) {
if (!pub) return CUFRAMES_ERR_INVALID_ARG;
if (pub->has_pkt_ring) return CUFRAMES_ERR_ALREADY_EXISTS;
uint32_t slots = opts && opts->ring_slots ? opts->ring_slots
: CUFRAMES_PKT_DEFAULT_SLOTS;
uint32_t data_size = opts && opts->data_size ? opts->data_size
: CUFRAMES_PKT_DEFAULT_DATA_SIZE;
uint32_t max_pkt = opts && opts->max_packet_size ? opts->max_packet_size
: CUFRAMES_PKT_DEFAULT_MAX_SIZE;
uint32_t codec_id = opts ? opts->codec_id : 0;
if (max_pkt > data_size) {
CUFRAMES_LOG_ERROR("max_packet_size (%u) > data_size (%u)", max_pkt, data_size);
return CUFRAMES_ERR_INVALID_ARG;
}
int r = cuframes_internal_pkt_ring_create(pub->key, slots, data_size,
codec_id, &pub->pkt_ring);
if (r != CUFRAMES_OK) return r;
pub->has_pkt_ring = 1;
pub->max_packet_size = max_pkt;
/* Bump proto_version в frames header чтобы v2-subscribers видели поддержку. */
if (pub->hdr) {
pub->hdr->proto_version = CUFRAMES_PROTOCOL_V2;
}
return CUFRAMES_OK;
}
int cuframes_publisher_set_codec_extradata(cuframes_publisher_t *pub,
const void *extradata, size_t size) {
if (!pub) return CUFRAMES_ERR_INVALID_ARG;
if (!pub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING;
return cuframes_internal_pkt_ring_set_extradata(&pub->pkt_ring,
extradata, size);
}
int cuframes_publisher_publish_packet(cuframes_publisher_t *pub,
const void *data, size_t size,
int64_t pts_ns, int64_t dts_ns,
uint32_t flags) {
if (!pub) return CUFRAMES_ERR_INVALID_ARG;
if (!pub->has_pkt_ring) return CUFRAMES_ERR_NO_PACKET_RING;
if (size > pub->max_packet_size) return CUFRAMES_ERR_PACKET_OVERSIZED;
return cuframes_internal_pkt_ring_publish(&pub->pkt_ring, data, size,
pts_ns, dts_ns, flags);
}
/* ─── Accept thread + handshake ──────────────────────────────────────── */
static void *accept_thread_main(void *arg) {