/* cuframes-rtsp-source — standalone tool который читает RTSP-stream, decode'ит * на CUDA через libav*, и публикует декодированные frames через cuframes. * * Целевое использование: запускается рядом с Frigate (например, в docker-compose * stack), даёт consumer'ам (cctv-companion, AI scripts, etc.) zero-copy доступ * к декодированным кадрам через cuframes IPC — без модификации Frigate. * * После Step 9 v0.2 — заменится на FFmpeg filter `vf_cuda_ipc_export` который * встраивается в существующий Frigate ffmpeg-pipeline без extra decode. * * Pipeline: * * RTSP ─► avformat_open ─► avcodec hwaccel cuda ─► AVFrame в CUDA hwframes * │ * ▼ * D2D memcpy ◄─ pre-allocated cuframes pool (EXTERNAL ownership) * │ * ▼ * cuframes_publisher_publish_external * * Build: * g++ -std=c++17 main.cpp -o cuframes-rtsp-source \ * -lavformat -lavcodec -lavutil -lcuframes -lcudart * * Usage: * cuframes-rtsp-source --rtsp rtsp://admin:pw@cam/stream --key cam1 */ extern "C" { #include #include #include #include #include #include } #include #include #include #include #include #include #include #include #include #include #include static std::atomic g_stop{0}; static void on_signal(int) { g_stop.store(1); } struct Args { std::string rtsp_url; std::string key = "cam"; std::string consumer_role = "main"; // informative int cuda_device = 0; int ring_size = 4; bool verbose = false; bool realtime = false; // emulate -re у ffmpeg CLI: sleep по pts bool loop = false; // loop input на eof (для file://) bool enable_packet_ring = false; // v0.2 — публиковать encoded packets std::string policy = "drop"; // "drop" = DROP_OLDEST, "wait" = STRICT_WAIT int ack_timeout_ms = 200; // only used при policy=wait; <=0 = infinite (unsafe) }; static void print_usage() { std::cerr << "Usage: cuframes-rtsp-source --rtsp URL --key NAME [opts]\n" "\n" "Required:\n" " --rtsp URL RTSP/HTTP URL (e.g. rtsp://admin:pw@cam/stream)\n" " --key NAME cuframes resource name (a-zA-Z0-9_-, ≤63)\n" "\n" "Optional:\n" " --cuda-device N CUDA device index (default 0)\n" " --ring N cuframes ring size (default 4, range 2..16)\n" " --realtime pace input по PTS (как ffmpeg -re; полезно для файла)\n" " --loop loop input на EOF (только для file://)\n" " --enable-packet-ring v0.2: дополнительно публиковать encoded packets\n" " (для consumer'ов с -c:v copy, Frigate record path)\n" " --policy MODE drop (default) = DROP_OLDEST — producer wrap'ает ring\n" " без ожидания consumer ack. Подходит для multi-consumer.\n" " wait = STRICT_WAIT — producer ждёт ack от всех subscribers\n" " перед overwrite. Безопаснее для frame integrity, но slow\n" " consumer задерживает all (default ack-timeout 200ms).\n" " --ack-timeout-ms N только при --policy wait. Max wait для ack (default 200).\n" " <=0 = infinite — НЕ РЕКОМЕНДУЕТСЯ (dead consumer вешает\n" " producer навсегда).\n" " --verbose debug logs\n" " -h, --help this help\n"; } static int parse_args(int argc, char **argv, Args &a) { for (int i = 1; i < argc; ++i) { std::string s = argv[i]; auto next = [&] { if (i + 1 >= argc) { print_usage(); std::exit(1); } return std::string(argv[++i]); }; if (s == "--rtsp") a.rtsp_url = next(); else if (s == "--key") a.key = next(); else if (s == "--cuda-device") a.cuda_device = std::stoi(next()); else if (s == "--ring") a.ring_size = std::stoi(next()); else if (s == "--realtime") a.realtime = true; else if (s == "--loop") a.loop = true; else if (s == "--enable-packet-ring") a.enable_packet_ring = true; else if (s == "--policy") a.policy = next(); else if (s == "--ack-timeout-ms") a.ack_timeout_ms = std::stoi(next()); else if (s == "--verbose") a.verbose = true; else if (s == "-h" || s == "--help") { print_usage(); std::exit(0); } else { std::cerr << "Unknown arg: " << s << "\n"; print_usage(); std::exit(1); } } if (a.rtsp_url.empty() || a.key.empty()) { print_usage(); return 1; } if (a.policy != "drop" && a.policy != "wait") { std::cerr << "Invalid --policy '" << a.policy << "' (use drop|wait)\n"; return 1; } if (a.policy == "wait" && a.ack_timeout_ms <= 0) { std::cerr << "WARNING: --policy wait + --ack-timeout-ms<=0 = infinite wait.\n" << " Dead consumer повесит producer навсегда. Forcing к 200ms.\n" << " Set явно --ack-timeout-ms 200 (или больше) чтобы убрать warning.\n"; a.ack_timeout_ms = 200; } return 0; } /* Get HW frame pixel format callback — выбирает CUDA pixel format */ static enum AVPixelFormat get_hw_format(AVCodecContext *ctx, const enum AVPixelFormat *fmts) { (void)ctx; for (const enum AVPixelFormat *p = fmts; *p != AV_PIX_FMT_NONE; ++p) { if (*p == AV_PIX_FMT_CUDA) return *p; } return AV_PIX_FMT_NONE; } /* HW device context для CUDA */ static int setup_hw_device(int cuda_device, AVBufferRef **out) { AVBufferRef *hw_ctx = NULL; char dev_str[16]; snprintf(dev_str, sizeof(dev_str), "%d", cuda_device); int r = av_hwdevice_ctx_create(&hw_ctx, AV_HWDEVICE_TYPE_CUDA, dev_str, NULL, 0); if (r < 0) { char err[256]; av_strerror(r, err, sizeof(err)); std::cerr << "av_hwdevice_ctx_create: " << err << "\n"; return -1; } *out = hw_ctx; return 0; } int main(int argc, char **argv) { Args a; if (parse_args(argc, argv, a) != 0) return 1; signal(SIGINT, on_signal); signal(SIGTERM, on_signal); /* libav init */ avformat_network_init(); /* Open input */ AVFormatContext *fmt = nullptr; AVDictionary *opts = nullptr; av_dict_set(&opts, "rtsp_transport", "tcp", 0); av_dict_set(&opts, "stimeout", "10000000", 0); /* 10s */ av_dict_set(&opts, "fflags", "+genpts+discardcorrupt", 0); int r = avformat_open_input(&fmt, a.rtsp_url.c_str(), nullptr, &opts); av_dict_free(&opts); if (r < 0) { char err[256]; av_strerror(r, err, sizeof(err)); std::cerr << "avformat_open_input '" << a.rtsp_url << "': " << err << "\n"; return 2; } if (avformat_find_stream_info(fmt, nullptr) < 0) { std::cerr << "avformat_find_stream_info failed\n"; avformat_close_input(&fmt); return 2; } /* Find video stream */ int video_idx = -1; for (unsigned i = 0; i < fmt->nb_streams; ++i) { if (fmt->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { video_idx = (int)i; break; } } if (video_idx < 0) { std::cerr << "no video stream in input\n"; avformat_close_input(&fmt); return 2; } AVStream *vstream = fmt->streams[video_idx]; int width = vstream->codecpar->width; int height = vstream->codecpar->height; enum AVCodecID codec_id = vstream->codecpar->codec_id; const AVCodec *decoder = avcodec_find_decoder(codec_id); if (!decoder) { std::cerr << "no decoder for codec " << avcodec_get_name(codec_id) << "\n"; avformat_close_input(&fmt); return 2; } std::cerr << "[cuframes-src] input " << a.rtsp_url << " — " << width << "x" << height << " " << avcodec_get_name(codec_id) << "\n"; /* CUDA hwaccel setup */ AVBufferRef *hw_device = nullptr; if (setup_hw_device(a.cuda_device, &hw_device) < 0) { avformat_close_input(&fmt); return 2; } AVCodecContext *ctx = avcodec_alloc_context3(decoder); if (!ctx) { std::cerr << "alloc_context fail\n"; return 2; } if (avcodec_parameters_to_context(ctx, vstream->codecpar) < 0) { std::cerr << "parameters_to_context fail\n"; return 2; } ctx->hw_device_ctx = av_buffer_ref(hw_device); ctx->get_format = get_hw_format; r = avcodec_open2(ctx, decoder, nullptr); if (r < 0) { char err[256]; av_strerror(r, err, sizeof(err)); std::cerr << "avcodec_open2: " << err << "\n"; return 2; } /* Pre-allocate cuframes pool (NV12 — что nvdec выдаёт) */ int32_t pitch_y = 0, pitch_uv = 0; size_t frame_size = cuframes::calc_frame_size(CUFRAMES_FORMAT_NV12, width, height, &pitch_y, &pitch_uv); cudaSetDevice(a.cuda_device); std::vector pool(a.ring_size, nullptr); for (int i = 0; i < a.ring_size; ++i) { cudaError_t cerr = cudaMalloc(&pool[i], frame_size); if (cerr != cudaSuccess) { std::cerr << "cudaMalloc pool[" << i << "]: " << cudaGetErrorString(cerr) << "\n"; return 2; } } cuframes::PublisherOptions po; po.key = a.key; po.width = width; po.height = height; po.format = CUFRAMES_FORMAT_NV12; po.policy = (a.policy == "wait") ? CUFRAMES_POLICY_STRICT_WAIT : CUFRAMES_POLICY_DROP_OLDEST; po.consumer_ack_timeout_ms = a.ack_timeout_ms; po.cuda_device = a.cuda_device; po.ring_size = a.ring_size; /* для logging */ cuframes::Publisher pub(po, pool.data(), a.ring_size, frame_size); std::cerr << "[cuframes-src] publisher 'cuframes-" << a.key << "' ready, ring=" << a.ring_size << " pool_size=" << frame_size << " bytes/frame\n"; /* v0.2 — encoded packet ring (опционально). */ if (a.enable_packet_ring) { cuframes_packet_ring_options_t pkt_opts{}; pkt_opts.codec_id = (uint32_t)vstream->codecpar->codec_id; /* остальные поля = 0 → library использует defaults (64 slots, 8MiB, 2MiB max) */ pub.enable_packets(&pkt_opts); if (vstream->codecpar->extradata_size > 0 && vstream->codecpar->extradata) { pub.set_codec_extradata(vstream->codecpar->extradata, (size_t)vstream->codecpar->extradata_size); std::cerr << "[cuframes-src] packet ring active, codec_id=" << vstream->codecpar->codec_id << " extradata=" << vstream->codecpar->extradata_size << " bytes\n"; } else { std::cerr << "[cuframes-src] packet ring active, codec_id=" << vstream->codecpar->codec_id << " (no extradata in stream — will rely on in-band SPS/PPS)\n"; } } /* Stream для D2D copies */ cudaStream_t stream; cudaStreamCreate(&stream); AVPacket *pkt = av_packet_alloc(); AVFrame *frame = av_frame_alloc(); if (!pkt || !frame) return 2; int pool_idx = 0; uint64_t frame_count = 0; auto t_last_log = std::chrono::steady_clock::now(); uint64_t last_log_count = 0; /* Realtime pacing: t0 = время первого frame; ждём до t0 + pts_in_real_time */ auto rt_t0 = std::chrono::steady_clock::time_point::min(); int64_t rt_pts0 = AV_NOPTS_VALUE; AVRational stream_tb = vstream->time_base; while (!g_stop.load()) { r = av_read_frame(fmt, pkt); if (r == AVERROR_EOF) { if (a.loop) { /* Seek в начало; offset PTS чтобы избежать non-monotonic */ av_seek_frame(fmt, video_idx, 0, AVSEEK_FLAG_BACKWARD); avcodec_flush_buffers(ctx); rt_t0 = std::chrono::steady_clock::time_point::min(); rt_pts0 = AV_NOPTS_VALUE; av_packet_unref(pkt); continue; } break; } if (r < 0) { char err[256]; av_strerror(r, err, sizeof(err)); std::cerr << "[cuframes-src] av_read_frame: " << err << "\n"; av_packet_unref(pkt); std::this_thread::sleep_for(std::chrono::seconds(1)); continue; } if (pkt->stream_index != video_idx) { av_packet_unref(pkt); continue; } /* v0.2 — публикуем encoded packet в packet ring ДО decoder. Это позволяет * record-consumer'ам брать packet без второго RTSP-подключения к камере. */ if (a.enable_packet_ring) { int64_t pkt_pts_ns = (pkt->pts != AV_NOPTS_VALUE) ? av_rescale_q(pkt->pts, stream_tb, AVRational{1, 1000000000}) : cuframes::now_ns(); int64_t pkt_dts_ns = (pkt->dts != AV_NOPTS_VALUE) ? av_rescale_q(pkt->dts, stream_tb, AVRational{1, 1000000000}) : pkt_pts_ns; uint32_t pkt_flags = 0; if (pkt->flags & AV_PKT_FLAG_KEY) pkt_flags |= CUFRAMES_PKT_FLAG_KEY; if (pkt->flags & AV_PKT_FLAG_CORRUPT) pkt_flags |= CUFRAMES_PKT_FLAG_CORRUPT; #ifdef AV_PKT_FLAG_DISCONTINUITY if (pkt->flags & AV_PKT_FLAG_DISCONTINUITY) pkt_flags |= CUFRAMES_PKT_FLAG_DISCONTINUITY; #endif int prr = pub.publish_packet(pkt->data, (size_t)pkt->size, pkt_pts_ns, pkt_dts_ns, pkt_flags); if (prr != CUFRAMES_OK && a.verbose) { std::cerr << "[cuframes-src] publish_packet rc=" << prr << " size=" << pkt->size << "\n"; } } r = avcodec_send_packet(ctx, pkt); av_packet_unref(pkt); if (r < 0) continue; while (true) { r = avcodec_receive_frame(ctx, frame); if (r == AVERROR(EAGAIN) || r == AVERROR_EOF) break; if (r < 0) { std::cerr << "avcodec_receive_frame error\n"; break; } if (frame->format != AV_PIX_FMT_CUDA) { av_frame_unref(frame); continue; } /* frame->data[0] — указатель на CUDA Y plane; frame->data[1] — UV */ void *cuda_src_y = frame->data[0]; void *cuda_src_uv = frame->data[1]; int src_pitch_y = frame->linesize[0]; int src_pitch_uv = frame->linesize[1]; void *dst = pool[pool_idx]; /* D2D 2D-copy Y plane */ cudaError_t cerr = cudaMemcpy2DAsync( dst, pitch_y, cuda_src_y, src_pitch_y, width, height, cudaMemcpyDeviceToDevice, stream); if (cerr != cudaSuccess) { std::cerr << "memcpy Y: " << cudaGetErrorString(cerr) << "\n"; av_frame_unref(frame); continue; } /* UV plane (height/2 строк) */ uint8_t *dst_uv = (uint8_t *)dst + (size_t)pitch_y * height; cerr = cudaMemcpy2DAsync( dst_uv, pitch_uv, cuda_src_uv, src_pitch_uv, width, height / 2, cudaMemcpyDeviceToDevice, stream); if (cerr != cudaSuccess) { std::cerr << "memcpy UV: " << cudaGetErrorString(cerr) << "\n"; av_frame_unref(frame); continue; } /* Realtime pacing — спим до момента когда wall-clock соответствует pts */ if (a.realtime && frame->pts != AV_NOPTS_VALUE) { if (rt_pts0 == AV_NOPTS_VALUE) { rt_pts0 = frame->pts; rt_t0 = std::chrono::steady_clock::now(); } int64_t rel_pts = frame->pts - rt_pts0; double rel_sec = rel_pts * av_q2d(stream_tb); auto target = rt_t0 + std::chrono::microseconds( (int64_t)(rel_sec * 1e6)); std::this_thread::sleep_until(target); } int64_t pts_ns = cuframes::now_ns(); try { pub.publish_external(dst, stream, pts_ns); } catch (const cuframes::Error &e) { std::cerr << "publish_external: " << e.what() << "\n"; av_frame_unref(frame); continue; } pool_idx = (pool_idx + 1) % a.ring_size; frame_count++; av_frame_unref(frame); if (a.verbose && (frame_count % 100 == 0)) { auto now = std::chrono::steady_clock::now(); double dt = std::chrono::duration(now - t_last_log).count(); if (dt >= 1.0) { uint64_t delta = frame_count - last_log_count; std::cerr << "[cuframes-src] frames=" << frame_count << " (" << (delta / dt) << " fps)\n"; t_last_log = now; last_log_count = frame_count; } } } } std::cerr << "[cuframes-src] shutdown, total frames=" << frame_count << "\n"; av_frame_free(&frame); av_packet_free(&pkt); avcodec_free_context(&ctx); avformat_close_input(&fmt); av_buffer_unref(&hw_device); cudaStreamDestroy(stream); /* Publisher destructor freed first; теперь освободим pool */ /* Note: publisher уже destroyed by RAII, IPC handles closed by subscribers */ for (auto p : pool) if (p) cudaFree(p); return 0; }