/* * cuframes_packets input demuxer для FFmpeg 7.x. * * Принимает URL вида "cuframes_packets://" — подключается к * cuframes-publisher'у v0.2+ (с активированным packet ring) и выдаёт * encoded NAL units (H.264/H.265). В отличие от родственного `cuframes` * demuxer'а (NV12 raw video), этот выдаёт **encoded** stream — для * consumer'ов которые делают `-c:v copy` (record/mux without decode). * * Use case (Frigate): * detect role: cuframes://camA — decoded NV12, GPU shared * record role: cuframes_packets://camA — encoded H.264, без второго RTSP * * Лицензия: LGPL-2.1+ (соответствует libcuframes и FFmpeg LGPL builds) */ #include "libavformat/avformat.h" #include "libavformat/demux.h" #include "libavformat/internal.h" #include "libavutil/avstring.h" #include "libavutil/mem.h" #include "libavutil/opt.h" #include typedef struct CuframesPacketsDemuxerContext { const AVClass *class_; /* options */ int connect_timeout_ms; int read_timeout_ms; /* state */ cuframes_subscriber_t *sub; int64_t first_pts_ns; int got_first_pts; /* первый packet получаем в read_header чтобы узнать codec/extradata * валидно (publisher мог ещё не вызвать set_codec_extradata к моменту * subscribe — но к моменту первого keyframe — точно) */ int pending_first_packet; } CuframesPacketsDemuxerContext; #define OFFSET(x) offsetof(CuframesPacketsDemuxerContext, x) #define D AV_OPT_FLAG_DECODING_PARAM static const AVOption cuframes_packets_options[] = { { "connect_timeout", "wait for publisher (ms); -1 = forever", OFFSET(connect_timeout_ms), AV_OPT_TYPE_INT, { .i64 = 5000 }, -1, INT_MAX, D }, { "read_timeout", "wait for next packet (ms); -1 = forever", OFFSET(read_timeout_ms), AV_OPT_TYPE_INT, { .i64 = 10000 }, -1, INT_MAX, D }, { NULL } }; static const AVClass cuframes_packets_demuxer_class = { .class_name = "cuframes_packets demuxer", .item_name = av_default_item_name, .option = cuframes_packets_options, .version = LIBAVUTIL_VERSION_INT, }; static const char *parse_key(const char *url) { if (av_strstart(url, "cuframes_packets://", &url)) return url; if (av_strstart(url, "cuframes_packets:", &url)) return url; return url; } /* Map cuframes packet flags → AVPacket flags */ static int packet_flags_to_av(uint32_t cf_flags) { int f = 0; if (cf_flags & CUFRAMES_PKT_FLAG_KEY) f |= AV_PKT_FLAG_KEY; if (cf_flags & CUFRAMES_PKT_FLAG_CORRUPT) f |= AV_PKT_FLAG_CORRUPT; #ifdef AV_PKT_FLAG_DISCONTINUITY if (cf_flags & CUFRAMES_PKT_FLAG_DISCONTINUITY) f |= AV_PKT_FLAG_DISCONTINUITY; #endif return f; } static int cuframes_packets_read_header(AVFormatContext *s) { CuframesPacketsDemuxerContext *c = s->priv_data; const char *key = parse_key(s->url); if (!key || !*key) { av_log(s, AV_LOG_ERROR, "cuframes_packets: empty key in URL '%s'\n", s->url); return AVERROR(EINVAL); } /* Subscribe to frames ring first (наследие v1 socket handshake), затем * enable_packets для opening packet shm. */ cuframes_subscriber_config_t cfg = {0}; cfg.key = key; cfg.consumer_name = NULL; /* auto-generated */ cfg.mode = CUFRAMES_MODE_NEWEST_ONLY; cfg.cuda_device = 0; cfg.connect_timeout_ms = c->connect_timeout_ms; int rc = cuframes_subscriber_create(&cfg, &c->sub); if (rc != CUFRAMES_OK) { av_log(s, AV_LOG_ERROR, "cuframes_packets: subscriber_create('%s'): %s\n", key, cuframes_strerror(rc)); return AVERROR_EXTERNAL; } rc = cuframes_subscriber_enable_packets(c->sub); if (rc != CUFRAMES_OK) { av_log(s, AV_LOG_ERROR, "cuframes_packets: enable_packets('%s'): %s (publisher без packet ring?)\n", key, cuframes_strerror(rc)); return AVERROR_EXTERNAL; } /* Получаем codec params (extradata может быть пустым если publisher ещё не * установил — это OK, decoder соберёт SPS/PPS из in-band keyframes). */ uint32_t codec_id = 0; const void *extradata = NULL; size_t extradata_sz = 0; int param_rc = cuframes_subscriber_get_codec_params(c->sub, &codec_id, &extradata, &extradata_sz); if (param_rc != CUFRAMES_OK && param_rc != CUFRAMES_ERR_NO_CODEC_PARAMS) { av_log(s, AV_LOG_ERROR, "cuframes_packets: get_codec_params: %s\n", cuframes_strerror(param_rc)); return AVERROR_EXTERNAL; } if (codec_id == 0) { av_log(s, AV_LOG_WARNING, "cuframes_packets: publisher не указал codec_id, угадываем H.264\n"); codec_id = AV_CODEC_ID_H264; } AVStream *st = avformat_new_stream(s, NULL); if (!st) return AVERROR(ENOMEM); st->codecpar->codec_type = AVMEDIA_TYPE_VIDEO; st->codecpar->codec_id = (enum AVCodecID)codec_id; /* Copy extradata если есть (publisher успел вызвать set_codec_extradata) */ if (extradata_sz > 0 && extradata) { st->codecpar->extradata = av_mallocz(extradata_sz + AV_INPUT_BUFFER_PADDING_SIZE); if (!st->codecpar->extradata) return AVERROR(ENOMEM); memcpy(st->codecpar->extradata, extradata, extradata_sz); st->codecpar->extradata_size = (int)extradata_sz; av_log(s, AV_LOG_INFO, "cuframes_packets: connected to '%s' codec_id=%u extradata=%zu bytes\n", key, codec_id, extradata_sz); } else { av_log(s, AV_LOG_INFO, "cuframes_packets: connected to '%s' codec_id=%u (no extradata yet)\n", key, codec_id); } /* Timestamps: cuframes публикует pts в наносекундах. Используем µs timebase. */ avpriv_set_pts_info(st, 64, 1, 1000000); c->got_first_pts = 0; c->pending_first_packet = 1; return 0; } static int cuframes_packets_read_packet(AVFormatContext *s, AVPacket *pkt) { CuframesPacketsDemuxerContext *c = s->priv_data; cuframes_packet_t *cp = NULL; int rc = cuframes_subscriber_next_packet(c->sub, &cp, c->read_timeout_ms); if (rc == CUFRAMES_ERR_TIMEOUT || rc == CUFRAMES_ERR_WOULD_BLOCK) { return AVERROR(EAGAIN); } if (rc == CUFRAMES_ERR_DISCONNECTED) { return AVERROR_EOF; } if (rc == CUFRAMES_ERR_PACKET_OVERRUN) { /* Library уже resync'нула на last keyframe — на next call всё восстановится. * Signal upstream EAGAIN (FFmpeg-side ретрай) + warning. */ av_log(s, AV_LOG_WARNING, "cuframes_packets: overrun — slow consumer, resync to keyframe\n"); return AVERROR(EAGAIN); } if (rc != CUFRAMES_OK || !cp) { av_log(s, AV_LOG_ERROR, "cuframes_packets: next_packet: %s\n", cuframes_strerror(rc)); return AVERROR_EXTERNAL; } const void *data = cuframes_packet_data(cp); size_t size = cuframes_packet_size(cp); int64_t pts = cuframes_packet_pts(cp); int64_t dts = cuframes_packet_dts(cp); uint32_t fl = cuframes_packet_flags(cp); /* Anchor PTS на первом packet — downstream ожидает start from ~0 */ if (!c->got_first_pts) { c->first_pts_ns = pts; c->got_first_pts = 1; } int err = av_new_packet(pkt, (int)size); if (err < 0) { cuframes_subscriber_release_packet(c->sub, cp); return err; } memcpy(pkt->data, data, size); pkt->stream_index = 0; pkt->pts = (pts - c->first_pts_ns) / 1000; /* ns → µs */ pkt->dts = (dts != 0) ? (dts - c->first_pts_ns) / 1000 : pkt->pts; pkt->flags |= packet_flags_to_av(fl); cuframes_subscriber_release_packet(c->sub, cp); return 0; } static int cuframes_packets_read_close(AVFormatContext *s) { CuframesPacketsDemuxerContext *c = s->priv_data; if (c->sub) { cuframes_subscriber_destroy(c->sub); c->sub = NULL; } return 0; } static int cuframes_packets_probe(const AVProbeData *p) { /* URL-based protocol — probe не используется, см. AVFMT_NOFILE */ return 0; } const FFInputFormat ff_cuframes_packets_demuxer = { .p.name = "cuframes_packets", .p.long_name = NULL_IF_CONFIG_SMALL("cuframes encoded packet ring (H.264/H.265 passthrough)"), .p.flags = AVFMT_NOFILE, .p.priv_class = &cuframes_packets_demuxer_class, .priv_data_size = sizeof(CuframesPacketsDemuxerContext), .read_probe = cuframes_packets_probe, .read_header = cuframes_packets_read_header, .read_packet = cuframes_packets_read_packet, .read_close = cuframes_packets_read_close, };