Files
cuframes/tools/cuframes-rtsp-source/main.cpp
T
gx becfbebc78
release / build runtime Docker image (push) Failing after 0s
release / build source tarball (push) Successful in 2s
build / cmake build (CUDA 12.4, Ubuntu 22.04) (push) Successful in 1m39s
build / ffmpeg filter patch (out-of-tree) (push) Successful in 1m25s
test-u4-runner / u4 runner smoke test (push) Has been cancelled
cuframes-rtsp-source: + --policy + --ack-timeout-ms CLI flags
Opt-in для STRICT_WAIT policy (default остаётся DROP_OLDEST).

Use case STRICT_WAIT:
  Frame integrity критичен (e.g. recording, frame-accurate analytics).
  Producer ждёт ack от всех subscribers перед wrap ring → no torn frames.
  Trade-off: slow consumer задерживает all (default 200ms timeout затем
  subscriber dropped from bitmap).

Use case DROP_OLDEST (default):
  Low-latency real-time display (TV grid). Producer wraps freely; v0.3
  per-slot CUDA events закрывают race без waiting.

Validation: policy=wait + ack-timeout-ms<=0 = infinite hold dead consumer —
warning + force к 200ms safe default.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-24 08:47:14 +01:00

456 lines
18 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/* cuframes-rtsp-source — standalone tool который читает RTSP-stream, decode'ит
* на CUDA через libav*, и публикует декодированные frames через cuframes.
*
* Целевое использование: запускается рядом с Frigate (например, в docker-compose
* stack), даёт consumer'ам (cctv-companion, AI scripts, etc.) zero-copy доступ
* к декодированным кадрам через cuframes IPC — без модификации Frigate.
*
* После Step 9 v0.2 — заменится на FFmpeg filter `vf_cuda_ipc_export` который
* встраивается в существующий Frigate ffmpeg-pipeline без extra decode.
*
* Pipeline:
*
*   RTSP ─► avformat_open ─► avcodec hwaccel cuda ─► AVFrame в CUDA hwframes
*                                                       │
*                                                       ▼
*                                                  D2D memcpy ◄─ pre-allocated cuframes pool (EXTERNAL ownership)
*                                                       │
*                                                       ▼
*                                      cuframes_publisher_publish_external
*
* Build:
* g++ -std=c++17 main.cpp -o cuframes-rtsp-source \
* -lavformat -lavcodec -lavutil -lcuframes -lcudart
*
* Usage:
* cuframes-rtsp-source --rtsp rtsp://admin:pw@cam/stream --key cam1
*/
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/hwcontext.h>
#include <libavutil/hwcontext_cuda.h>
#include <libavutil/opt.h>
#include <libavutil/pixdesc.h>
}
#include <cuframes/cuframes.hpp>
#include <cuda_runtime.h>
#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <exception>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
/* Process-wide stop request: 0 while running, 1 once a shutdown signal arrived. */
static std::atomic<int> g_stop{0};

/* Signal handler: records the stop request; the signal number is not used. */
static void on_signal(int /*signo*/) {
    g_stop = 1;
}
/* Command-line configuration; every member carries the default used when its
 * flag is absent. */
struct Args {
    std::string rtsp_url;                 // input URL (required, empty until parsed)
    std::string key{"cam"};               // cuframes resource name
    std::string consumer_role{"main"};    // informative only
    int cuda_device{0};                   // CUDA device index
    int ring_size{4};                     // number of frame slots in the ring
    bool verbose{false};                  // enable debug logging
    bool realtime{false};                 // emulate ffmpeg CLI -re: sleep according to pts
    bool loop{false};                     // restart input on EOF (for file://)
    bool enable_packet_ring{false};       // v0.2: also publish encoded packets
    std::string policy{"drop"};           // "drop" = DROP_OLDEST, "wait" = STRICT_WAIT
    int ack_timeout_ms{200};              // only used with policy=wait; <=0 = infinite (unsafe)
};
/* Writes the command-line help text to stderr. */
static void print_usage() {
    /* The whole help screen lives in one constant so it is emitted in a single write. */
    static const char kUsageText[] =
        "Usage: cuframes-rtsp-source --rtsp URL --key NAME [opts]\n"
        "\n"
        "Required:\n"
        " --rtsp URL RTSP/HTTP URL (e.g. rtsp://admin:pw@cam/stream)\n"
        " --key NAME cuframes resource name (a-zA-Z0-9_-, ≤63)\n"
        "\n"
        "Optional:\n"
        " --cuda-device N CUDA device index (default 0)\n"
        " --ring N cuframes ring size (default 4, range 2..16)\n"
        " --realtime pace input по PTS (как ffmpeg -re; полезно для файла)\n"
        " --loop loop input на EOF (только для file://)\n"
        " --enable-packet-ring v0.2: дополнительно публиковать encoded packets\n"
        " (для consumer'ов с -c:v copy, Frigate record path)\n"
        " --policy MODE drop (default) = DROP_OLDEST — producer wrap'ает ring\n"
        " без ожидания consumer ack. Подходит для multi-consumer.\n"
        " wait = STRICT_WAIT — producer ждёт ack от всех subscribers\n"
        " перед overwrite. Безопаснее для frame integrity, но slow\n"
        " consumer задерживает all (default ack-timeout 200ms).\n"
        " --ack-timeout-ms N только при --policy wait. Max wait для ack (default 200).\n"
        " <=0 = infinite — НЕ РЕКОМЕНДУЕТСЯ (dead consumer вешает\n"
        " producer навсегда).\n"
        " --verbose debug logs\n"
        " -h, --help this help\n";
    std::cerr << kUsageText;
}
/* Parses the command line into `a`.
 *
 * Returns 0 when the arguments are usable and 1 when validation fails
 * (missing --rtsp/--key, out-of-range --cuda-device/--ring, unknown --policy).
 * Some conditions end the process directly instead of returning:
 *   - -h/--help prints the usage text and exits with 0;
 *   - an unknown flag, a flag without its value, or a non-integer value for a
 *     numeric flag prints a diagnostic and exits with 1.
 *
 * With --policy wait and --ack-timeout-ms <= 0 the timeout is forced to 200
 * and a warning is printed.
 */
static int parse_args(int argc, char **argv, Args &a) {
    for (int i = 1; i < argc; ++i) {
        const std::string s = argv[i];

        /* Consumes the value that follows the current flag; a missing value is
         * a usage error. */
        auto next = [&] {
            if (i + 1 >= argc) { print_usage(); std::exit(1); }
            return std::string(argv[++i]);
        };

        /* Same as next(), but the value must be a complete base-10 int.
         * std::stoi throws on garbage/overflow and silently accepts trailing
         * junk ("12abc"), so both cases are turned into a clean error exit. */
        auto next_int = [&]() -> int {
            const std::string text = next();
            try {
                std::size_t used = 0;
                const int value = std::stoi(text, &used);
                if (used == text.size()) return value;
            } catch (const std::exception &) {
                /* handled by the shared error path below */
            }
            std::cerr << "Invalid integer value '" << text << "' for " << s << "\n";
            std::exit(1);
        };

        if (s == "--rtsp") a.rtsp_url = next();
        else if (s == "--key") a.key = next();
        else if (s == "--cuda-device") a.cuda_device = next_int();
        else if (s == "--ring") a.ring_size = next_int();
        else if (s == "--realtime") a.realtime = true;
        else if (s == "--loop") a.loop = true;
        else if (s == "--enable-packet-ring") a.enable_packet_ring = true;
        else if (s == "--policy") a.policy = next();
        else if (s == "--ack-timeout-ms") a.ack_timeout_ms = next_int();
        else if (s == "--verbose") a.verbose = true;
        else if (s == "-h" || s == "--help") { print_usage(); std::exit(0); }
        else { std::cerr << "Unknown arg: " << s << "\n"; print_usage(); std::exit(1); }
    }

    if (a.rtsp_url.empty() || a.key.empty()) { print_usage(); return 1; }

    if (a.cuda_device < 0) {
        std::cerr << "Invalid --cuda-device " << a.cuda_device << " (must be >= 0)\n";
        return 1;
    }
    /* The ring size later becomes a container length and a modulo divisor, so
     * the range documented in the usage text is enforced here. */
    if (a.ring_size < 2 || a.ring_size > 16) {
        std::cerr << "Invalid --ring " << a.ring_size << " (allowed range 2..16)\n";
        return 1;
    }
    if (a.policy != "drop" && a.policy != "wait") {
        std::cerr << "Invalid --policy '" << a.policy << "' (use drop|wait)\n";
        return 1;
    }
    if (a.policy == "wait" && a.ack_timeout_ms <= 0) {
        std::cerr << "WARNING: --policy wait + --ack-timeout-ms<=0 = infinite wait.\n"
                  << " Dead consumer повесит producer навсегда. Forcing к 200ms.\n"
                  << " Set явно --ack-timeout-ms 200 (или больше) чтобы убрать warning.\n";
        a.ack_timeout_ms = 200;
    }
    return 0;
}
/* get_format callback: scans the AV_PIX_FMT_NONE-terminated list offered by
 * the decoder and picks AV_PIX_FMT_CUDA; returns AV_PIX_FMT_NONE when the
 * list does not contain it. The codec context argument is unused. */
static enum AVPixelFormat get_hw_format(AVCodecContext *ctx,
                                        const enum AVPixelFormat *fmts) {
    (void)ctx;
    const enum AVPixelFormat *candidate = fmts;
    while (*candidate != AV_PIX_FMT_NONE) {
        if (*candidate == AV_PIX_FMT_CUDA) {
            return AV_PIX_FMT_CUDA;
        }
        ++candidate;
    }
    return AV_PIX_FMT_NONE;
}
/* HW device context для CUDA */
static int setup_hw_device(int cuda_device, AVBufferRef **out) {
AVBufferRef *hw_ctx = NULL;
char dev_str[16];
snprintf(dev_str, sizeof(dev_str), "%d", cuda_device);
int r = av_hwdevice_ctx_create(&hw_ctx, AV_HWDEVICE_TYPE_CUDA, dev_str, NULL, 0);
if (r < 0) {
char err[256];
av_strerror(r, err, sizeof(err));
std::cerr << "av_hwdevice_ctx_create: " << err << "\n";
return -1;
}
*out = hw_ctx;
return 0;
}
int main(int argc, char **argv) {
Args a;
if (parse_args(argc, argv, a) != 0) return 1;
signal(SIGINT, on_signal);
signal(SIGTERM, on_signal);
/* libav init */
avformat_network_init();
/* Open input */
AVFormatContext *fmt = nullptr;
AVDictionary *opts = nullptr;
av_dict_set(&opts, "rtsp_transport", "tcp", 0);
av_dict_set(&opts, "stimeout", "10000000", 0); /* 10s */
av_dict_set(&opts, "fflags", "+genpts+discardcorrupt", 0);
int r = avformat_open_input(&fmt, a.rtsp_url.c_str(), nullptr, &opts);
av_dict_free(&opts);
if (r < 0) {
char err[256];
av_strerror(r, err, sizeof(err));
std::cerr << "avformat_open_input '" << a.rtsp_url << "': " << err << "\n";
return 2;
}
if (avformat_find_stream_info(fmt, nullptr) < 0) {
std::cerr << "avformat_find_stream_info failed\n";
avformat_close_input(&fmt);
return 2;
}
/* Find video stream */
int video_idx = -1;
for (unsigned i = 0; i < fmt->nb_streams; ++i) {
if (fmt->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
video_idx = (int)i;
break;
}
}
if (video_idx < 0) {
std::cerr << "no video stream in input\n";
avformat_close_input(&fmt);
return 2;
}
AVStream *vstream = fmt->streams[video_idx];
int width = vstream->codecpar->width;
int height = vstream->codecpar->height;
enum AVCodecID codec_id = vstream->codecpar->codec_id;
const AVCodec *decoder = avcodec_find_decoder(codec_id);
if (!decoder) {
std::cerr << "no decoder for codec " << avcodec_get_name(codec_id) << "\n";
avformat_close_input(&fmt);
return 2;
}
std::cerr << "[cuframes-src] input " << a.rtsp_url << "" << width << "x" << height
<< " " << avcodec_get_name(codec_id) << "\n";
/* CUDA hwaccel setup */
AVBufferRef *hw_device = nullptr;
if (setup_hw_device(a.cuda_device, &hw_device) < 0) {
avformat_close_input(&fmt);
return 2;
}
AVCodecContext *ctx = avcodec_alloc_context3(decoder);
if (!ctx) { std::cerr << "alloc_context fail\n"; return 2; }
if (avcodec_parameters_to_context(ctx, vstream->codecpar) < 0) {
std::cerr << "parameters_to_context fail\n"; return 2;
}
ctx->hw_device_ctx = av_buffer_ref(hw_device);
ctx->get_format = get_hw_format;
r = avcodec_open2(ctx, decoder, nullptr);
if (r < 0) {
char err[256]; av_strerror(r, err, sizeof(err));
std::cerr << "avcodec_open2: " << err << "\n";
return 2;
}
/* Pre-allocate cuframes pool (NV12 — что nvdec выдаёт) */
int32_t pitch_y = 0, pitch_uv = 0;
size_t frame_size = cuframes::calc_frame_size(CUFRAMES_FORMAT_NV12,
width, height,
&pitch_y, &pitch_uv);
cudaSetDevice(a.cuda_device);
std::vector<void *> pool(a.ring_size, nullptr);
for (int i = 0; i < a.ring_size; ++i) {
cudaError_t cerr = cudaMalloc(&pool[i], frame_size);
if (cerr != cudaSuccess) {
std::cerr << "cudaMalloc pool[" << i << "]: " << cudaGetErrorString(cerr) << "\n";
return 2;
}
}
cuframes::PublisherOptions po;
po.key = a.key;
po.width = width;
po.height = height;
po.format = CUFRAMES_FORMAT_NV12;
po.policy = (a.policy == "wait")
? CUFRAMES_POLICY_STRICT_WAIT
: CUFRAMES_POLICY_DROP_OLDEST;
po.consumer_ack_timeout_ms = a.ack_timeout_ms;
po.cuda_device = a.cuda_device;
po.ring_size = a.ring_size; /* для logging */
cuframes::Publisher pub(po, pool.data(), a.ring_size, frame_size);
std::cerr << "[cuframes-src] publisher 'cuframes-" << a.key
<< "' ready, ring=" << a.ring_size
<< " pool_size=" << frame_size << " bytes/frame\n";
/* v0.2 — encoded packet ring (опционально). */
if (a.enable_packet_ring) {
cuframes_packet_ring_options_t pkt_opts{};
pkt_opts.codec_id = (uint32_t)vstream->codecpar->codec_id;
/* остальные поля = 0 → library использует defaults (64 slots, 8MiB, 2MiB max) */
pub.enable_packets(&pkt_opts);
if (vstream->codecpar->extradata_size > 0 && vstream->codecpar->extradata) {
pub.set_codec_extradata(vstream->codecpar->extradata,
(size_t)vstream->codecpar->extradata_size);
std::cerr << "[cuframes-src] packet ring active, codec_id="
<< vstream->codecpar->codec_id
<< " extradata=" << vstream->codecpar->extradata_size
<< " bytes\n";
} else {
std::cerr << "[cuframes-src] packet ring active, codec_id="
<< vstream->codecpar->codec_id
<< " (no extradata in stream — will rely on in-band SPS/PPS)\n";
}
}
/* Stream для D2D copies */
cudaStream_t stream;
cudaStreamCreate(&stream);
AVPacket *pkt = av_packet_alloc();
AVFrame *frame = av_frame_alloc();
if (!pkt || !frame) return 2;
int pool_idx = 0;
uint64_t frame_count = 0;
auto t_last_log = std::chrono::steady_clock::now();
uint64_t last_log_count = 0;
/* Realtime pacing: t0 = время первого frame; ждём до t0 + pts_in_real_time */
auto rt_t0 = std::chrono::steady_clock::time_point::min();
int64_t rt_pts0 = AV_NOPTS_VALUE;
AVRational stream_tb = vstream->time_base;
while (!g_stop.load()) {
r = av_read_frame(fmt, pkt);
if (r == AVERROR_EOF) {
if (a.loop) {
/* Seek в начало; offset PTS чтобы избежать non-monotonic */
av_seek_frame(fmt, video_idx, 0, AVSEEK_FLAG_BACKWARD);
avcodec_flush_buffers(ctx);
rt_t0 = std::chrono::steady_clock::time_point::min();
rt_pts0 = AV_NOPTS_VALUE;
av_packet_unref(pkt);
continue;
}
break;
}
if (r < 0) {
char err[256]; av_strerror(r, err, sizeof(err));
std::cerr << "[cuframes-src] av_read_frame: " << err << "\n";
av_packet_unref(pkt);
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
if (pkt->stream_index != video_idx) {
av_packet_unref(pkt);
continue;
}
/* v0.2 — публикуем encoded packet в packet ring ДО decoder. Это позволяет
* record-consumer'ам брать packet без второго RTSP-подключения к камере. */
if (a.enable_packet_ring) {
int64_t pkt_pts_ns = (pkt->pts != AV_NOPTS_VALUE)
? av_rescale_q(pkt->pts, stream_tb, AVRational{1, 1000000000})
: cuframes::now_ns();
int64_t pkt_dts_ns = (pkt->dts != AV_NOPTS_VALUE)
? av_rescale_q(pkt->dts, stream_tb, AVRational{1, 1000000000})
: pkt_pts_ns;
uint32_t pkt_flags = 0;
if (pkt->flags & AV_PKT_FLAG_KEY) pkt_flags |= CUFRAMES_PKT_FLAG_KEY;
if (pkt->flags & AV_PKT_FLAG_CORRUPT) pkt_flags |= CUFRAMES_PKT_FLAG_CORRUPT;
#ifdef AV_PKT_FLAG_DISCONTINUITY
if (pkt->flags & AV_PKT_FLAG_DISCONTINUITY) pkt_flags |= CUFRAMES_PKT_FLAG_DISCONTINUITY;
#endif
int prr = pub.publish_packet(pkt->data, (size_t)pkt->size,
pkt_pts_ns, pkt_dts_ns, pkt_flags);
if (prr != CUFRAMES_OK && a.verbose) {
std::cerr << "[cuframes-src] publish_packet rc=" << prr
<< " size=" << pkt->size << "\n";
}
}
r = avcodec_send_packet(ctx, pkt);
av_packet_unref(pkt);
if (r < 0) continue;
while (true) {
r = avcodec_receive_frame(ctx, frame);
if (r == AVERROR(EAGAIN) || r == AVERROR_EOF) break;
if (r < 0) {
std::cerr << "avcodec_receive_frame error\n";
break;
}
if (frame->format != AV_PIX_FMT_CUDA) {
av_frame_unref(frame);
continue;
}
/* frame->data[0] — указатель на CUDA Y plane; frame->data[1] — UV */
void *cuda_src_y = frame->data[0];
void *cuda_src_uv = frame->data[1];
int src_pitch_y = frame->linesize[0];
int src_pitch_uv = frame->linesize[1];
void *dst = pool[pool_idx];
/* D2D 2D-copy Y plane */
cudaError_t cerr = cudaMemcpy2DAsync(
dst, pitch_y, cuda_src_y, src_pitch_y, width, height,
cudaMemcpyDeviceToDevice, stream);
if (cerr != cudaSuccess) {
std::cerr << "memcpy Y: " << cudaGetErrorString(cerr) << "\n";
av_frame_unref(frame);
continue;
}
/* UV plane (height/2 строк) */
uint8_t *dst_uv = (uint8_t *)dst + (size_t)pitch_y * height;
cerr = cudaMemcpy2DAsync(
dst_uv, pitch_uv, cuda_src_uv, src_pitch_uv, width, height / 2,
cudaMemcpyDeviceToDevice, stream);
if (cerr != cudaSuccess) {
std::cerr << "memcpy UV: " << cudaGetErrorString(cerr) << "\n";
av_frame_unref(frame);
continue;
}
/* Realtime pacing — спим до момента когда wall-clock соответствует pts */
if (a.realtime && frame->pts != AV_NOPTS_VALUE) {
if (rt_pts0 == AV_NOPTS_VALUE) {
rt_pts0 = frame->pts;
rt_t0 = std::chrono::steady_clock::now();
}
int64_t rel_pts = frame->pts - rt_pts0;
double rel_sec = rel_pts * av_q2d(stream_tb);
auto target = rt_t0 + std::chrono::microseconds(
(int64_t)(rel_sec * 1e6));
std::this_thread::sleep_until(target);
}
int64_t pts_ns = cuframes::now_ns();
try {
pub.publish_external(dst, stream, pts_ns);
} catch (const cuframes::Error &e) {
std::cerr << "publish_external: " << e.what() << "\n";
av_frame_unref(frame);
continue;
}
pool_idx = (pool_idx + 1) % a.ring_size;
frame_count++;
av_frame_unref(frame);
if (a.verbose && (frame_count % 100 == 0)) {
auto now = std::chrono::steady_clock::now();
double dt = std::chrono::duration<double>(now - t_last_log).count();
if (dt >= 1.0) {
uint64_t delta = frame_count - last_log_count;
std::cerr << "[cuframes-src] frames=" << frame_count
<< " (" << (delta / dt) << " fps)\n";
t_last_log = now;
last_log_count = frame_count;
}
}
}
}
std::cerr << "[cuframes-src] shutdown, total frames=" << frame_count << "\n";
av_frame_free(&frame);
av_packet_free(&pkt);
avcodec_free_context(&ctx);
avformat_close_input(&fmt);
av_buffer_unref(&hw_device);
cudaStreamDestroy(stream);
/* Publisher destructor freed first; теперь освободим pool */
/* Note: publisher уже destroyed by RAII, IPC handles closed by subscribers */
for (auto p : pool) if (p) cudaFree(p);
return 0;
}