Files
cuframes/tools/cuframes-rtsp-source/main.cpp
T
gx 4862247fe2
build / cmake build (CUDA 12.4, Ubuntu 22.04) (push) Successful in 1m46s
build / ffmpeg filter patch (out-of-tree) (push) Failing after 1m30s
v0.4: VMM + POSIX FD — namespace decoupling (no pid share required)
Заменяет cudaMalloc + cudaIpcGetMemHandle на cuMemCreate (VMM) +
cuMemExportToShareableHandle(POSIX_FILE_DESCRIPTOR). FDs передаются consumer'у
через sendmsg(SCM_RIGHTS) в handshake. Frigate (s6-overlay не даёт share PID)
и любой другой consumer работают БЕЗ pid namespace share — только volume mount
unix socket'a /run/cuframes и IPC share для /dev/shm header.

Sync: cudaEventRecord+IPC events → cuStreamSynchronize в do_publish.
Producer ждёт ~1 ms что stream flush'нулся, потом atomic_store(seq).
Consumer читает seq через memory_order_acquire и копирует DtoD без
event wait — HW coherence гарантирована на одном GPU.

ABI break (согласован с user'ом):
  - magic 0xCC7C1DCC → 0xCC7C1DCE (старые consumers fail cleanly)
  - protocol V3 → V4
  - libcuframes.so.0 SOVERSION остаётся, но .so.0.3.0 → .so.0.4.0
  - EXTERNAL ownership убран (VMM требует cuMemCreate-allocated memory,
    нельзя export'нуть произвольный cudaMalloc-pointer как POSIX FD)
  - cuframes-rtsp-source переведён на LIBRARY mode + один D2D memcpy
    в acquire'нутый slot (overhead малый — публишер всё равно делал такой
    D2D из FFmpeg hwframe pool в EXTERNAL pool раньше)

Размер: granularity 2 MB на 5090 → NV12 1920×1080 (~3.1 MB) округляется до
4 MB, +1 MB на slot × 16 × 4 камеры = +64 MB VRAM. Терпимо.

Packet ring (cuframes_packets://) НЕ затронут — отдельный SHM с своим
magic, работает как раньше.

PoC + smoke в spike/:
  - vmm_fd_pingpong/ — minimal cuMemCreate+FD round-trip
  - smoke_v04/ — full publisher+subscriber, 100/100 frames без pid share

Base image: Dockerfile.runtime → CUDA 12.4 (был 13.0). Matching prod
pipeline + Frigate base, иначе libcudart conflict при load.

Compose stack (localhost-infra repo) — параллельный commit:
  - убран pid: container:cuframes-pub-parking из subscribers
  - image теги: gx/cuframes:0.4, gx/cuda-grid-pipeline:phase8,
    gx/frigate:cuframes-v0.4

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-25 20:13:31 +01:00

455 lines
18 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/* cuframes-rtsp-source — standalone tool that reads an RTSP stream, decodes it
* on CUDA via libav*, and publishes the decoded frames through cuframes.
*
* Intended use: runs next to Frigate (for example in a docker-compose stack)
* and gives consumers (cctv-companion, AI scripts, etc.) zero-copy access
* to the decoded frames over cuframes IPC — without modifying Frigate.
*
* After Step 9 of v0.2 this tool is to be replaced by the FFmpeg filter
* `vf_cuda_ipc_export`, which plugs into the existing Frigate ffmpeg pipeline
* without an extra decode.
*
* Pipeline (v0.4 — the publisher owns the frame pool; the former EXTERNAL
* ownership mode and cuframes_publisher_publish_external are no longer used):
*
* RTSP ─► avformat_open ─► avcodec hwaccel cuda ─► AVFrame in CUDA hwframes
* │
* ▼
* D2D memcpy ─► slot returned by Publisher::acquire()
* │
* ▼
* Publisher::publish(stream, pts_ns)
*
* Build:
* g++ -std=c++17 main.cpp -o cuframes-rtsp-source \
* -lavformat -lavcodec -lavutil -lcuframes -lcudart
*
* Usage:
* cuframes-rtsp-source --rtsp rtsp://admin:pw@cam/stream --key cam1
*/
extern "C" {
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/hwcontext.h>
#include <libavutil/hwcontext_cuda.h>
#include <libavutil/opt.h>
#include <libavutil/pixdesc.h>
}
#include <cuframes/cuframes.hpp>
#include <cuda_runtime.h>
#include <atomic>
#include <chrono>
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <string>
#include <thread>
/* Shutdown request flag: written by the signal handler, polled by the main
 * read/decode loop. */
static std::atomic<int> g_stop{0};

/* Handler installed for SIGINT and SIGTERM; it only raises the stop flag. */
static void on_signal(int /*signo*/) {
    g_stop = 1;
}
/* Command-line configuration. Every field carries the default that applies
 * when the corresponding flag is not given. */
struct Args {
    std::string rtsp_url{};                 // --rtsp (required)
    std::string key{"cam"};                 // --key
    std::string consumer_role{"main"};      // informational only
    int cuda_device{0};                     // --cuda-device
    int ring_size{4};                       // --ring
    bool verbose{false};                    // --verbose
    bool realtime{false};                   // --realtime: pace by PTS, like `-re` in the ffmpeg CLI
    bool loop{false};                       // --loop: restart the input on EOF (for file://)
    bool enable_packet_ring{false};         // v0.2: also publish encoded packets
    std::string policy{"drop"};             // "drop" = DROP_OLDEST, "wait" = STRICT_WAIT
    int ack_timeout_ms{200};                // only used with policy=wait; <=0 = infinite (unsafe)
};
/* Writes the command-line help text to stderr. */
static void print_usage() {
    /* The whole help text is one literal so it is emitted in a single write. */
    static const char *const kUsageText =
        "Usage: cuframes-rtsp-source --rtsp URL --key NAME [opts]\n"
        "\n"
        "Required:\n"
        " --rtsp URL RTSP/HTTP URL (e.g. rtsp://admin:pw@cam/stream)\n"
        " --key NAME cuframes resource name (a-zA-Z0-9_-, ≤63)\n"
        "\n"
        "Optional:\n"
        " --cuda-device N CUDA device index (default 0)\n"
        " --ring N cuframes ring size (default 4, range 2..16)\n"
        " --realtime pace input по PTS (как ffmpeg -re; полезно для файла)\n"
        " --loop loop input на EOF (только для file://)\n"
        " --enable-packet-ring v0.2: дополнительно публиковать encoded packets\n"
        " (для consumer'ов с -c:v copy, Frigate record path)\n"
        " --policy MODE drop (default) = DROP_OLDEST — producer wrap'ает ring\n"
        " без ожидания consumer ack. Подходит для multi-consumer.\n"
        " wait = STRICT_WAIT — producer ждёт ack от всех subscribers\n"
        " перед overwrite. Безопаснее для frame integrity, но slow\n"
        " consumer задерживает all (default ack-timeout 200ms).\n"
        " --ack-timeout-ms N только при --policy wait. Max wait для ack (default 200).\n"
        " <=0 = infinite — НЕ РЕКОМЕНДУЕТСЯ (dead consumer вешает\n"
        " producer навсегда).\n"
        " --verbose debug logs\n"
        " -h, --help this help\n";
    std::cerr << kUsageText;
}
/* Parses `text` as a base-10 int. Returns false (leaving `out` untouched) when
 * the text is not a number, has trailing characters, or does not fit in int. */
static bool parse_int_arg(const std::string &text, int &out) {
    try {
        size_t consumed = 0;
        const int value = std::stoi(text, &consumed, 10);
        if (consumed != text.size()) return false;   /* e.g. "4abc" */
        out = value;
        return true;
    } catch (const std::invalid_argument &) {
        return false;
    } catch (const std::out_of_range &) {
        return false;
    }
}

/* Fills `a` from the command line.
 *
 * Returns 0 on success and 1 when the arguments are unusable (missing
 * --rtsp/--key, bad --policy, malformed or out-of-range numeric value).
 * A missing flag value, an unknown flag and -h/--help print the usage text
 * and terminate the process directly via std::exit.
 *
 * With --policy wait, a non-positive --ack-timeout-ms is replaced by 200 and
 * a warning is printed. */
static int parse_args(int argc, char **argv, Args &a) {
    for (int i = 1; i < argc; ++i) {
        const std::string s = argv[i];
        /* Consumes and returns the value following the current flag. */
        auto next = [&] {
            if (i + 1 >= argc) { print_usage(); std::exit(1); }
            return std::string(argv[++i]);
        };
        /* Same as next(), but the value must be a well-formed integer;
         * std::stoi alone would throw on bad input and abort the process. */
        auto next_int = [&](int &dst) {
            const std::string value = next();
            if (!parse_int_arg(value, dst)) {
                std::cerr << "Invalid integer '" << value << "' for " << s << "\n";
                return false;
            }
            return true;
        };
        if (s == "--rtsp") a.rtsp_url = next();
        else if (s == "--key") a.key = next();
        else if (s == "--cuda-device") { if (!next_int(a.cuda_device)) return 1; }
        else if (s == "--ring") { if (!next_int(a.ring_size)) return 1; }
        else if (s == "--realtime") a.realtime = true;
        else if (s == "--loop") a.loop = true;
        else if (s == "--enable-packet-ring") a.enable_packet_ring = true;
        else if (s == "--policy") a.policy = next();
        else if (s == "--ack-timeout-ms") { if (!next_int(a.ack_timeout_ms)) return 1; }
        else if (s == "--verbose") a.verbose = true;
        else if (s == "-h" || s == "--help") { print_usage(); std::exit(0); }
        else { std::cerr << "Unknown arg: " << s << "\n"; print_usage(); std::exit(1); }
    }
    if (a.rtsp_url.empty() || a.key.empty()) { print_usage(); return 1; }
    if (a.cuda_device < 0) {
        std::cerr << "Invalid --cuda-device " << a.cuda_device << " (must be >= 0)\n";
        return 1;
    }
    /* The usage text documents the ring size as 2..16; reject anything else
     * here instead of passing it on to the publisher. */
    if (a.ring_size < 2 || a.ring_size > 16) {
        std::cerr << "Invalid --ring " << a.ring_size << " (range 2..16)\n";
        return 1;
    }
    if (a.policy != "drop" && a.policy != "wait") {
        std::cerr << "Invalid --policy '" << a.policy << "' (use drop|wait)\n";
        return 1;
    }
    if (a.policy == "wait" && a.ack_timeout_ms <= 0) {
        std::cerr << "WARNING: --policy wait + --ack-timeout-ms<=0 = infinite wait.\n"
                  << " Dead consumer повесит producer навсегда. Forcing к 200ms.\n"
                  << " Set явно --ack-timeout-ms 200 (или больше) чтобы убрать warning.\n";
        a.ack_timeout_ms = 200;
    }
    return 0;
}
/* AVCodecContext::get_format callback: picks AV_PIX_FMT_CUDA when the decoder
 * offers it in the AV_PIX_FMT_NONE-terminated list, otherwise reports
 * AV_PIX_FMT_NONE. The codec context itself is not consulted. */
static enum AVPixelFormat get_hw_format(AVCodecContext *ctx,
                                        const enum AVPixelFormat *fmts) {
    (void)ctx;
    const enum AVPixelFormat *cursor = fmts;
    while (*cursor != AV_PIX_FMT_NONE) {
        if (*cursor == AV_PIX_FMT_CUDA) {
            return AV_PIX_FMT_CUDA;
        }
        ++cursor;
    }
    return AV_PIX_FMT_NONE;
}
/* HW device context для CUDA */
static int setup_hw_device(int cuda_device, AVBufferRef **out) {
AVBufferRef *hw_ctx = NULL;
char dev_str[16];
snprintf(dev_str, sizeof(dev_str), "%d", cuda_device);
int r = av_hwdevice_ctx_create(&hw_ctx, AV_HWDEVICE_TYPE_CUDA, dev_str, NULL, 0);
if (r < 0) {
char err[256];
av_strerror(r, err, sizeof(err));
std::cerr << "av_hwdevice_ctx_create: " << err << "\n";
return -1;
}
*out = hw_ctx;
return 0;
}
int main(int argc, char **argv) {
Args a;
if (parse_args(argc, argv, a) != 0) return 1;
signal(SIGINT, on_signal);
signal(SIGTERM, on_signal);
/* libav init */
avformat_network_init();
/* Open input */
AVFormatContext *fmt = nullptr;
AVDictionary *opts = nullptr;
av_dict_set(&opts, "rtsp_transport", "tcp", 0);
av_dict_set(&opts, "stimeout", "10000000", 0); /* 10s */
av_dict_set(&opts, "fflags", "+genpts+discardcorrupt", 0);
int r = avformat_open_input(&fmt, a.rtsp_url.c_str(), nullptr, &opts);
av_dict_free(&opts);
if (r < 0) {
char err[256];
av_strerror(r, err, sizeof(err));
std::cerr << "avformat_open_input '" << a.rtsp_url << "': " << err << "\n";
return 2;
}
if (avformat_find_stream_info(fmt, nullptr) < 0) {
std::cerr << "avformat_find_stream_info failed\n";
avformat_close_input(&fmt);
return 2;
}
/* Find video stream */
int video_idx = -1;
for (unsigned i = 0; i < fmt->nb_streams; ++i) {
if (fmt->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) {
video_idx = (int)i;
break;
}
}
if (video_idx < 0) {
std::cerr << "no video stream in input\n";
avformat_close_input(&fmt);
return 2;
}
AVStream *vstream = fmt->streams[video_idx];
int width = vstream->codecpar->width;
int height = vstream->codecpar->height;
enum AVCodecID codec_id = vstream->codecpar->codec_id;
const AVCodec *decoder = avcodec_find_decoder(codec_id);
if (!decoder) {
std::cerr << "no decoder for codec " << avcodec_get_name(codec_id) << "\n";
avformat_close_input(&fmt);
return 2;
}
std::cerr << "[cuframes-src] input " << a.rtsp_url << "" << width << "x" << height
<< " " << avcodec_get_name(codec_id) << "\n";
/* CUDA hwaccel setup */
AVBufferRef *hw_device = nullptr;
if (setup_hw_device(a.cuda_device, &hw_device) < 0) {
avformat_close_input(&fmt);
return 2;
}
AVCodecContext *ctx = avcodec_alloc_context3(decoder);
if (!ctx) { std::cerr << "alloc_context fail\n"; return 2; }
if (avcodec_parameters_to_context(ctx, vstream->codecpar) < 0) {
std::cerr << "parameters_to_context fail\n"; return 2;
}
ctx->hw_device_ctx = av_buffer_ref(hw_device);
ctx->get_format = get_hw_format;
r = avcodec_open2(ctx, decoder, nullptr);
if (r < 0) {
char err[256]; av_strerror(r, err, sizeof(err));
std::cerr << "avcodec_open2: " << err << "\n";
return 2;
}
/* Pre-allocate cuframes pool (NV12 — что nvdec выдаёт).
* v0.4: publisher сам аллоцирует через cuMemCreate (VMM). Раньше tool
* передавал external pool, но v0.4 не может export'нуть cudaMalloc-pointers
* как POSIX FD — VMM API требует cuMemCreate-allocated memory. */
int32_t pitch_y = 0, pitch_uv = 0;
size_t frame_size = cuframes::calc_frame_size(CUFRAMES_FORMAT_NV12,
width, height,
&pitch_y, &pitch_uv);
cudaSetDevice(a.cuda_device);
cuframes::PublisherOptions po;
po.key = a.key;
po.width = width;
po.height = height;
po.format = CUFRAMES_FORMAT_NV12;
po.policy = (a.policy == "wait")
? CUFRAMES_POLICY_STRICT_WAIT
: CUFRAMES_POLICY_DROP_OLDEST;
po.consumer_ack_timeout_ms = a.ack_timeout_ms;
po.cuda_device = a.cuda_device;
po.ring_size = a.ring_size;
cuframes::Publisher pub(po); /* LIBRARY ownership — publisher owns VMM pool */
std::cerr << "[cuframes-src] publisher 'cuframes-" << a.key
<< "' ready (v0.4 VMM), ring=" << a.ring_size
<< " frame_size=" << frame_size << " bytes\n";
/* v0.2 — encoded packet ring (опционально). */
if (a.enable_packet_ring) {
cuframes_packet_ring_options_t pkt_opts{};
pkt_opts.codec_id = (uint32_t)vstream->codecpar->codec_id;
/* остальные поля = 0 → library использует defaults (64 slots, 8MiB, 2MiB max) */
pub.enable_packets(&pkt_opts);
if (vstream->codecpar->extradata_size > 0 && vstream->codecpar->extradata) {
pub.set_codec_extradata(vstream->codecpar->extradata,
(size_t)vstream->codecpar->extradata_size);
std::cerr << "[cuframes-src] packet ring active, codec_id="
<< vstream->codecpar->codec_id
<< " extradata=" << vstream->codecpar->extradata_size
<< " bytes\n";
} else {
std::cerr << "[cuframes-src] packet ring active, codec_id="
<< vstream->codecpar->codec_id
<< " (no extradata in stream — will rely on in-band SPS/PPS)\n";
}
}
/* Stream для D2D copies */
cudaStream_t stream;
cudaStreamCreate(&stream);
AVPacket *pkt = av_packet_alloc();
AVFrame *frame = av_frame_alloc();
if (!pkt || !frame) return 2;
uint64_t frame_count = 0;
auto t_last_log = std::chrono::steady_clock::now();
uint64_t last_log_count = 0;
/* Realtime pacing: t0 = время первого frame; ждём до t0 + pts_in_real_time */
auto rt_t0 = std::chrono::steady_clock::time_point::min();
int64_t rt_pts0 = AV_NOPTS_VALUE;
AVRational stream_tb = vstream->time_base;
while (!g_stop.load()) {
r = av_read_frame(fmt, pkt);
if (r == AVERROR_EOF) {
if (a.loop) {
/* Seek в начало; offset PTS чтобы избежать non-monotonic */
av_seek_frame(fmt, video_idx, 0, AVSEEK_FLAG_BACKWARD);
avcodec_flush_buffers(ctx);
rt_t0 = std::chrono::steady_clock::time_point::min();
rt_pts0 = AV_NOPTS_VALUE;
av_packet_unref(pkt);
continue;
}
break;
}
if (r < 0) {
char err[256]; av_strerror(r, err, sizeof(err));
std::cerr << "[cuframes-src] av_read_frame: " << err << "\n";
av_packet_unref(pkt);
std::this_thread::sleep_for(std::chrono::seconds(1));
continue;
}
if (pkt->stream_index != video_idx) {
av_packet_unref(pkt);
continue;
}
/* v0.2 — публикуем encoded packet в packet ring ДО decoder. Это позволяет
* record-consumer'ам брать packet без второго RTSP-подключения к камере. */
if (a.enable_packet_ring) {
int64_t pkt_pts_ns = (pkt->pts != AV_NOPTS_VALUE)
? av_rescale_q(pkt->pts, stream_tb, AVRational{1, 1000000000})
: cuframes::now_ns();
int64_t pkt_dts_ns = (pkt->dts != AV_NOPTS_VALUE)
? av_rescale_q(pkt->dts, stream_tb, AVRational{1, 1000000000})
: pkt_pts_ns;
uint32_t pkt_flags = 0;
if (pkt->flags & AV_PKT_FLAG_KEY) pkt_flags |= CUFRAMES_PKT_FLAG_KEY;
if (pkt->flags & AV_PKT_FLAG_CORRUPT) pkt_flags |= CUFRAMES_PKT_FLAG_CORRUPT;
#ifdef AV_PKT_FLAG_DISCONTINUITY
if (pkt->flags & AV_PKT_FLAG_DISCONTINUITY) pkt_flags |= CUFRAMES_PKT_FLAG_DISCONTINUITY;
#endif
int prr = pub.publish_packet(pkt->data, (size_t)pkt->size,
pkt_pts_ns, pkt_dts_ns, pkt_flags);
if (prr != CUFRAMES_OK && a.verbose) {
std::cerr << "[cuframes-src] publish_packet rc=" << prr
<< " size=" << pkt->size << "\n";
}
}
r = avcodec_send_packet(ctx, pkt);
av_packet_unref(pkt);
if (r < 0) continue;
while (true) {
r = avcodec_receive_frame(ctx, frame);
if (r == AVERROR(EAGAIN) || r == AVERROR_EOF) break;
if (r < 0) {
std::cerr << "avcodec_receive_frame error\n";
break;
}
if (frame->format != AV_PIX_FMT_CUDA) {
av_frame_unref(frame);
continue;
}
/* frame->data[0] — указатель на CUDA Y plane; frame->data[1] — UV */
void *cuda_src_y = frame->data[0];
void *cuda_src_uv = frame->data[1];
int src_pitch_y = frame->linesize[0];
int src_pitch_uv = frame->linesize[1];
/* v0.4: acquire slot из publisher's VMM pool */
void *dst = nullptr;
try {
dst = pub.acquire();
} catch (const cuframes::Error &e) {
std::cerr << "acquire: " << e.what() << "\n";
av_frame_unref(frame);
continue;
}
/* D2D 2D-copy Y plane */
cudaError_t cerr = cudaMemcpy2DAsync(
dst, pitch_y, cuda_src_y, src_pitch_y, width, height,
cudaMemcpyDeviceToDevice, stream);
if (cerr != cudaSuccess) {
std::cerr << "memcpy Y: " << cudaGetErrorString(cerr) << "\n";
av_frame_unref(frame);
continue;
}
/* UV plane (height/2 строк) */
uint8_t *dst_uv = (uint8_t *)dst + (size_t)pitch_y * height;
cerr = cudaMemcpy2DAsync(
dst_uv, pitch_uv, cuda_src_uv, src_pitch_uv, width, height / 2,
cudaMemcpyDeviceToDevice, stream);
if (cerr != cudaSuccess) {
std::cerr << "memcpy UV: " << cudaGetErrorString(cerr) << "\n";
av_frame_unref(frame);
continue;
}
/* Realtime pacing — спим до момента когда wall-clock соответствует pts */
if (a.realtime && frame->pts != AV_NOPTS_VALUE) {
if (rt_pts0 == AV_NOPTS_VALUE) {
rt_pts0 = frame->pts;
rt_t0 = std::chrono::steady_clock::now();
}
int64_t rel_pts = frame->pts - rt_pts0;
double rel_sec = rel_pts * av_q2d(stream_tb);
auto target = rt_t0 + std::chrono::microseconds(
(int64_t)(rel_sec * 1e6));
std::this_thread::sleep_until(target);
}
int64_t pts_ns = cuframes::now_ns();
try {
pub.publish(stream, pts_ns);
} catch (const cuframes::Error &e) {
std::cerr << "publish: " << e.what() << "\n";
av_frame_unref(frame);
continue;
}
frame_count++;
av_frame_unref(frame);
if (a.verbose && (frame_count % 100 == 0)) {
auto now = std::chrono::steady_clock::now();
double dt = std::chrono::duration<double>(now - t_last_log).count();
if (dt >= 1.0) {
uint64_t delta = frame_count - last_log_count;
std::cerr << "[cuframes-src] frames=" << frame_count
<< " (" << (delta / dt) << " fps)\n";
t_last_log = now;
last_log_count = frame_count;
}
}
}
}
std::cerr << "[cuframes-src] shutdown, total frames=" << frame_count << "\n";
av_frame_free(&frame);
av_packet_free(&pkt);
avcodec_free_context(&ctx);
avformat_close_input(&fmt);
av_buffer_unref(&hw_device);
cudaStreamDestroy(stream);
/* v0.4: publisher owns VMM pool — destructor освободит cuMemRelease etc. */
return 0;
}