diff --git a/.gitignore b/.gitignore index 22e5c4d..cfe808f 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # Build artefacts build/ +build-*/ out/ *.o *.a diff --git a/CMakeLists.txt b/CMakeLists.txt index 6e1c5c7..d38d917 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -20,13 +20,22 @@ endif() option(BUILD_TESTING "Build tests" ON) option(BUILD_EXAMPLES "Build examples" ON) -option(BUILD_FFMPEG_FILTER "Build FFmpeg vf_cuda_ipc_export filter" OFF) -option(BUILD_PYTHON_BINDINGS "Build Python bindings" OFF) +option(BUILD_FFMPEG_FILTER "Build FFmpeg vf_cuda_ipc_export filter (out-of-tree, требует patch FFmpeg)" OFF) +option(BUILD_PYTHON_BINDINGS "Build Python bindings (Phase 3+)" OFF) +option(BUILD_TOOLS "Build standalone tools (cuframes-rtsp-source)" ON) enable_testing() add_subdirectory(libcuframes) +if(BUILD_EXAMPLES) + add_subdirectory(examples) +endif() + if(BUILD_FFMPEG_FILTER) add_subdirectory(filter) endif() + +if(BUILD_TOOLS) + add_subdirectory(tools/cuframes-rtsp-source) +endif() diff --git a/examples/CMakeLists.txt b/examples/CMakeLists.txt new file mode 100644 index 0000000..85cf166 --- /dev/null +++ b/examples/CMakeLists.txt @@ -0,0 +1,2 @@ +find_package(CUDAToolkit REQUIRED) +add_subdirectory(sub_count) diff --git a/examples/sub_count/CMakeLists.txt b/examples/sub_count/CMakeLists.txt new file mode 100644 index 0000000..7cb3675 --- /dev/null +++ b/examples/sub_count/CMakeLists.txt @@ -0,0 +1,3 @@ +add_executable(sub_count main.cpp) +target_compile_features(sub_count PRIVATE cxx_std_17) +target_link_libraries(sub_count PRIVATE cuframes CUDA::cudart) diff --git a/examples/sub_count/main.cpp b/examples/sub_count/main.cpp new file mode 100644 index 0000000..b0f63f0 --- /dev/null +++ b/examples/sub_count/main.cpp @@ -0,0 +1,132 @@ +/* sub_count — minimal cuframes subscriber: подключается, считает frames, + * выходит по SIGINT, по достижении max-frames или при disconnect publisher. + * + * Use case: smoke-test для cuframes-rtsp-source / любого publisher'а. + * Reference-имплементация consumer'а на raw C API. + * + * Build: через CMake (см. examples/CMakeLists.txt) + * Run: sub_count --key cam1 [--max-frames N] [--timeout-ms N] [--verbose] + */ + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +static std::atomic g_stop{0}; +static void on_signal(int) { g_stop.store(1); } + +int main(int argc, char **argv) { + std::string key; + int timeout_ms = 2000; + int max_frames = -1; + bool verbose = false; + + for (int i = 1; i < argc; ++i) { + std::string s = argv[i]; + if (s == "--key" && i + 1 < argc) key = argv[++i]; + else if (s == "--timeout-ms" && i + 1 < argc) timeout_ms = std::atoi(argv[++i]); + else if (s == "--max-frames" && i + 1 < argc) max_frames = std::atoi(argv[++i]); + else if (s == "--verbose") verbose = true; + else if (s == "-h" || s == "--help") { + std::cerr << "Usage: sub_count --key NAME [--timeout-ms N] " + "[--max-frames N] [--verbose]\n"; + return 0; + } else { + std::cerr << "Unknown arg: " << s << "\n"; + return 1; + } + } + if (key.empty()) { + std::cerr << "--key required\n"; + return 1; + } + + signal(SIGINT, on_signal); + signal(SIGTERM, on_signal); + + cuframes_subscriber_config_t cfg{}; + cfg.key = key.c_str(); + cfg.consumer_name = "sub_count"; + cfg.mode = CUFRAMES_MODE_NEWEST_ONLY; + cfg.connect_timeout_ms = 5000; + + cuframes_subscriber_t *sub = nullptr; + int r = cuframes_subscriber_create(&cfg, &sub); + if (r != CUFRAMES_OK) { + std::cerr << "[sub_count] connect to '" << key << "': " + << cuframes_strerror(r) << "\n"; + return 2; + } + std::cerr << "[sub_count] connected to 'cuframes-" << key << "'\n"; + + cudaStream_t stream; + cudaStreamCreate(&stream); + + uint64_t received = 0; + uint64_t last_seq = 0; + uint64_t gaps = 0; + int consecutive_timeouts = 0; + auto t0 = std::chrono::steady_clock::now(); + + while (!g_stop.load()) { + cuframes_frame_t *f = nullptr; + r = cuframes_subscriber_next(sub, stream, &f, timeout_ms); + if (r == CUFRAMES_ERR_DISCONNECTED) { + std::cerr << "[sub_count] publisher disconnected\n"; + break; + } + if (r == CUFRAMES_ERR_TIMEOUT) { + consecutive_timeouts++; + std::cerr << "[sub_count] timeout (" << consecutive_timeouts << ")\n"; + if (consecutive_timeouts >= 3) { + std::cerr << "[sub_count] " << consecutive_timeouts + << " consecutive timeouts, giving up\n"; + break; + } + continue; + } + if (r != CUFRAMES_OK) { + std::cerr << "[sub_count] next: " << cuframes_strerror(r) << "\n"; + break; + } + consecutive_timeouts = 0; + + cudaStreamSynchronize(stream); // ensure event-wait завершён + + uint64_t seq = cuframes_frame_seq(f); + if (received > 0 && seq > last_seq + 1) gaps += (seq - last_seq - 1); + last_seq = seq; + received++; + + if (verbose && (received % 50 == 0)) { + int32_t w = 0, h = 0; + cuframes_frame_size(f, &w, &h); + std::cerr << "[sub_count] seq=" << seq + << " w=" << w << " h=" << h + << " pts_ns=" << cuframes_frame_pts_ns(f) << "\n"; + } + + cuframes_subscriber_release(sub, f); + + if (max_frames > 0 && (int64_t)received >= max_frames) break; + } + + auto t1 = std::chrono::steady_clock::now(); + double sec = std::chrono::duration(t1 - t0).count(); + std::cerr << "[sub_count] received=" << received + << " gaps=" << gaps + << " elapsed=" << sec << "s" + << " avg_fps=" << (sec > 0 ? received / sec : 0) << "\n"; + + cudaStreamDestroy(stream); + cuframes_subscriber_destroy(sub); + return received > 0 ? 0 : 2; +} diff --git a/libcuframes/tests/CMakeLists.txt b/libcuframes/tests/CMakeLists.txt index de24573..11ecdc7 100644 --- a/libcuframes/tests/CMakeLists.txt +++ b/libcuframes/tests/CMakeLists.txt @@ -15,3 +15,10 @@ target_include_directories(test_multi PRIVATE ${CMAKE_SOURCE_DIR}/include) add_test(NAME multi_consumer COMMAND test_multi) set_tests_properties(multi_consumer PROPERTIES TIMEOUT 60) + +add_executable(test_stress test_stress.cu) +target_link_libraries(test_stress PRIVATE cuframes CUDA::cudart) +target_include_directories(test_stress PRIVATE + ${CMAKE_SOURCE_DIR}/include) +add_test(NAME stress_4consumer COMMAND test_stress) +set_tests_properties(stress_4consumer PROPERTIES TIMEOUT 120) diff --git a/libcuframes/tests/test_stress.cu b/libcuframes/tests/test_stress.cu new file mode 100644 index 0000000..7c62150 --- /dev/null +++ b/libcuframes/tests/test_stress.cu @@ -0,0 +1,169 @@ +/* Stress test: 1 publisher × 4 consumers, 2000 frames, intermediate-rate (~120 fps), + * проверка zero-loss в NEWEST_ONLY mode не требуется (это политика DROP_OLDEST по + * spec); проверяем что: + * 1) ВСЕ subscribers получают frames continuously без torn-detection failures + * 2) Producer не deadlock'ит при slow consumer + * 3) После teardown — нет leaked файлов в /dev/shm + * 4) После teardown — process exit clean без segfault + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define CHECK(call) do { int _r = (call); if (_r != 0) { \ + fprintf(stderr, "FAIL %s:%d: %s\n", __FILE__, __LINE__, cuframes_strerror(_r)); std::exit(2); } } while(0) +#define CHECK_CUDA(call) do { cudaError_t _e = (call); if (_e != cudaSuccess) { \ + fprintf(stderr, "CUDA FAIL %s:%d: %s\n", __FILE__, __LINE__, cudaGetErrorString(_e)); std::exit(2); } } while(0) + +static const char *KEY = "test_stress"; +static const int W = 1280, H = 720; +static const int N = 2000; +static const int NUM_CONSUMERS = 4; + +__host__ __device__ inline uint8_t pat(uint64_t seq, int row) { + return static_cast((seq * 31u + row * 7u) & 0xFF); +} +__global__ void fill_y(uint8_t *y, int w, int h, int py, uint64_t seq) { + int x = blockIdx.x * blockDim.x + threadIdx.x; + int r = blockIdx.y * blockDim.y + threadIdx.y; + if (x < w && r < h) y[r * py + x] = pat(seq, r); +} +__global__ void verify_y(const uint8_t *y, int w, int h, int py, uint64_t seq, int *bad) { + int x = blockIdx.x * blockDim.x + threadIdx.x; + int r = blockIdx.y * blockDim.y + threadIdx.y; + if (x < w && r < h) if (y[r * py + x] != pat(seq, r)) atomicAdd(bad, 1); +} + +int run_consumer(const char *name, int slow_ms) { + cuframes_subscriber_config_t cfg = {}; + cfg.key = KEY; + cfg.consumer_name = name; + cfg.mode = CUFRAMES_MODE_NEWEST_ONLY; + cfg.connect_timeout_ms = 5000; + + cuframes_subscriber_t *sub = NULL; + CHECK(cuframes_subscriber_create(&cfg, &sub)); + + cudaStream_t s; + CHECK_CUDA(cudaStreamCreate(&s)); + int *d_bad; + CHECK_CUDA(cudaMalloc(&d_bad, sizeof(int))); + + dim3 b(32, 8); + dim3 g((W + b.x - 1) / b.x, (H + b.y - 1) / b.y); + + int recv = 0, torn = 0; + while (1) { + cuframes_frame_t *f = NULL; + int r = cuframes_subscriber_next(sub, s, &f, 3000); + if (r == CUFRAMES_ERR_TIMEOUT || r == CUFRAMES_ERR_DISCONNECTED) break; + if (r != 0) { fprintf(stderr, "[%s] next: %s\n", name, cuframes_strerror(r)); std::exit(2); } + + CHECK_CUDA(cudaMemsetAsync(d_bad, 0, sizeof(int), s)); + verify_y<<>>((const uint8_t *)cuframes_frame_cuda_ptr(f), + W, H, cuframes_frame_pitch_y(f), + cuframes_frame_seq(f), d_bad); + int bad = 0; + CHECK_CUDA(cudaMemcpyAsync(&bad, d_bad, sizeof(int), cudaMemcpyDeviceToHost, s)); + CHECK_CUDA(cudaStreamSynchronize(s)); + if (bad > 0) torn++; + recv++; + CHECK(cuframes_subscriber_release(sub, f)); + + if (slow_ms > 0) std::this_thread::sleep_for(std::chrono::milliseconds(slow_ms)); + } + + fprintf(stderr, "[%s] received=%d torn=%d\n", name, recv, torn); + + cudaFree(d_bad); + cudaStreamDestroy(s); + cuframes_subscriber_destroy(sub); + return (torn == 0 && recv >= 10) ? 0 : 1; +} + +int run_producer() { + cuframes_publisher_config_t cfg = {}; + cfg.key = KEY; + cfg.width = W; + cfg.height = H; + cfg.format = CUFRAMES_FORMAT_NV12; + cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY; + cfg.ring_size = 8; + cfg.policy = CUFRAMES_POLICY_DROP_OLDEST; + + cuframes_publisher_t *pub = NULL; + CHECK(cuframes_publisher_create(&cfg, &pub)); + int32_t pitch_y = 0; + CHECK(cuframes_calc_frame_size(CUFRAMES_FORMAT_NV12, W, H, NULL, &pitch_y, NULL)); + cudaStream_t s; + CHECK_CUDA(cudaStreamCreate(&s)); + + dim3 b(32, 8); + dim3 g((W + b.x - 1) / b.x, (H + b.y - 1) / b.y); + + std::this_thread::sleep_for(std::chrono::milliseconds(800)); + + /* ~120 fps */ + auto iv = std::chrono::nanoseconds(1000000000LL / 120); + auto t = std::chrono::steady_clock::now(); + for (int i = 0; i < N; ++i) { + void *p = NULL; + CHECK(cuframes_publisher_acquire(pub, &p)); + fill_y<<>>((uint8_t *)p, W, H, pitch_y, (uint64_t)i); + CHECK(cuframes_publisher_publish(pub, s, cuframes_now_ns())); + t += iv; + std::this_thread::sleep_until(t); + } + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + cuframes_publisher_destroy(pub); + cudaStreamDestroy(s); + return 0; +} + +int main() { + char shm[80]; snprintf(shm, 80, "/dev/shm/cuframes-%s", KEY); unlink(shm); + char sock[128]; snprintf(sock, 128, "/run/cuframes/%s.sock", KEY); unlink(sock); + + pid_t pids[NUM_CONSUMERS]; + /* Mix: 2 быстрых, 2 slow consumers (5ms sleep — ~200fps cap, медленнее publisher'а) */ + int slow[NUM_CONSUMERS] = {0, 0, 5, 5}; + char names[NUM_CONSUMERS][16]; + for (int i = 0; i < NUM_CONSUMERS; ++i) { + snprintf(names[i], 16, "c%d", i + 1); + pids[i] = fork(); + if (pids[i] == 0) return run_consumer(names[i], slow[i]); + } + + int prod_r = run_producer(); + int fail = (prod_r != 0); + for (int i = 0; i < NUM_CONSUMERS; ++i) { + int st = 0; + waitpid(pids[i], &st, 0); + if (!WIFEXITED(st) || WEXITSTATUS(st) != 0) { + fprintf(stderr, "consumer %s failed (status=%d)\n", names[i], st); + fail = 1; + } + } + + /* Check teardown clean */ + struct stat st; + if (stat(shm, &st) == 0) { + fprintf(stderr, "WARN: %s остался после teardown (но это OK — IPC объект)\n", shm); + } + if (stat(sock, &st) == 0) { + fprintf(stderr, "WARN: %s остался после teardown\n", sock); + } + + if (fail) { fprintf(stderr, "test_stress FAIL\n"); return 1; } + fprintf(stderr, "test_stress PASS (1×pub × %d×sub × %d frames)\n", NUM_CONSUMERS, N); + return 0; +} diff --git a/tools/cuframes-rtsp-source/CMakeLists.txt b/tools/cuframes-rtsp-source/CMakeLists.txt new file mode 100644 index 0000000..0ddab3f --- /dev/null +++ b/tools/cuframes-rtsp-source/CMakeLists.txt @@ -0,0 +1,20 @@ +cmake_minimum_required(VERSION 3.20) +project(cuframes_rtsp_source LANGUAGES CXX) + +find_package(PkgConfig REQUIRED) +pkg_check_modules(FFMPEG REQUIRED IMPORTED_TARGET + libavformat + libavcodec + libavutil +) +find_package(CUDAToolkit REQUIRED) + +add_executable(cuframes-rtsp-source main.cpp) +target_compile_features(cuframes-rtsp-source PRIVATE cxx_std_17) +target_compile_options(cuframes-rtsp-source PRIVATE -Wall -Wextra) +target_link_libraries(cuframes-rtsp-source + PRIVATE + cuframes + PkgConfig::FFMPEG + CUDA::cudart +) diff --git a/tools/cuframes-rtsp-source/main.cpp b/tools/cuframes-rtsp-source/main.cpp new file mode 100644 index 0000000..5eb2ce6 --- /dev/null +++ b/tools/cuframes-rtsp-source/main.cpp @@ -0,0 +1,382 @@ +/* cuframes-rtsp-source — standalone tool который читает RTSP-stream, decode'ит + * на CUDA через libav*, и публикует декодированные frames через cuframes. + * + * Целевое использование: запускается рядом с Frigate (например, в docker-compose + * stack), даёт consumer'ам (cctv-companion, AI scripts, etc.) zero-copy доступ + * к декодированным кадрам через cuframes IPC — без модификации Frigate. + * + * После Step 9 v0.2 — заменится на FFmpeg filter `vf_cuda_ipc_export` который + * встраивается в существующий Frigate ffmpeg-pipeline без extra decode. + * + * Pipeline: + * + * RTSP ─► avformat_open ─► avcodec hwaccel cuda ─► AVFrame в CUDA hwframes + * │ + * ▼ + * D2D memcpy ◄─ pre-allocated cuframes pool (EXTERNAL ownership) + * │ + * ▼ + * cuframes_publisher_publish_external + * + * Build: + * g++ -std=c++17 main.cpp -o cuframes-rtsp-source \ + * -lavformat -lavcodec -lavutil -lcuframes -lcudart + * + * Usage: + * cuframes-rtsp-source --rtsp rtsp://admin:pw@cam/stream --key cam1 + */ + +extern "C" { +#include +#include +#include +#include +#include +#include +} + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +static std::atomic g_stop{0}; +static void on_signal(int) { g_stop.store(1); } + +struct Args { + std::string rtsp_url; + std::string key = "cam"; + std::string consumer_role = "main"; // informative + int cuda_device = 0; + int ring_size = 4; + bool verbose = false; + bool realtime = false; // emulate -re у ffmpeg CLI: sleep по pts + bool loop = false; // loop input на eof (для file://) +}; + +static void print_usage() { + std::cerr << + "Usage: cuframes-rtsp-source --rtsp URL --key NAME [opts]\n" + "\n" + "Required:\n" + " --rtsp URL RTSP/HTTP URL (e.g. rtsp://admin:pw@cam/stream)\n" + " --key NAME cuframes resource name (a-zA-Z0-9_-, ≤63)\n" + "\n" + "Optional:\n" + " --cuda-device N CUDA device index (default 0)\n" + " --ring N cuframes ring size (default 4, range 2..16)\n" + " --realtime pace input по PTS (как ffmpeg -re; полезно для файла)\n" + " --loop loop input на EOF (только для file://)\n" + " --verbose debug logs\n" + " -h, --help this help\n"; +} + +static int parse_args(int argc, char **argv, Args &a) { + for (int i = 1; i < argc; ++i) { + std::string s = argv[i]; + auto next = [&] { + if (i + 1 >= argc) { print_usage(); std::exit(1); } + return std::string(argv[++i]); + }; + if (s == "--rtsp") a.rtsp_url = next(); + else if (s == "--key") a.key = next(); + else if (s == "--cuda-device") a.cuda_device = std::stoi(next()); + else if (s == "--ring") a.ring_size = std::stoi(next()); + else if (s == "--realtime") a.realtime = true; + else if (s == "--loop") a.loop = true; + else if (s == "--verbose") a.verbose = true; + else if (s == "-h" || s == "--help") { print_usage(); std::exit(0); } + else { std::cerr << "Unknown arg: " << s << "\n"; print_usage(); std::exit(1); } + } + if (a.rtsp_url.empty() || a.key.empty()) { print_usage(); return 1; } + return 0; +} + +/* Get HW frame pixel format callback — выбирает CUDA pixel format */ +static enum AVPixelFormat get_hw_format(AVCodecContext *ctx, + const enum AVPixelFormat *fmts) { + (void)ctx; + for (const enum AVPixelFormat *p = fmts; *p != AV_PIX_FMT_NONE; ++p) { + if (*p == AV_PIX_FMT_CUDA) return *p; + } + return AV_PIX_FMT_NONE; +} + +/* HW device context для CUDA */ +static int setup_hw_device(int cuda_device, AVBufferRef **out) { + AVBufferRef *hw_ctx = NULL; + char dev_str[16]; + snprintf(dev_str, sizeof(dev_str), "%d", cuda_device); + int r = av_hwdevice_ctx_create(&hw_ctx, AV_HWDEVICE_TYPE_CUDA, dev_str, NULL, 0); + if (r < 0) { + char err[256]; + av_strerror(r, err, sizeof(err)); + std::cerr << "av_hwdevice_ctx_create: " << err << "\n"; + return -1; + } + *out = hw_ctx; + return 0; +} + +int main(int argc, char **argv) { + Args a; + if (parse_args(argc, argv, a) != 0) return 1; + + signal(SIGINT, on_signal); + signal(SIGTERM, on_signal); + + /* libav init */ + avformat_network_init(); + + /* Open input */ + AVFormatContext *fmt = nullptr; + AVDictionary *opts = nullptr; + av_dict_set(&opts, "rtsp_transport", "tcp", 0); + av_dict_set(&opts, "stimeout", "10000000", 0); /* 10s */ + av_dict_set(&opts, "fflags", "+genpts+discardcorrupt", 0); + + int r = avformat_open_input(&fmt, a.rtsp_url.c_str(), nullptr, &opts); + av_dict_free(&opts); + if (r < 0) { + char err[256]; + av_strerror(r, err, sizeof(err)); + std::cerr << "avformat_open_input '" << a.rtsp_url << "': " << err << "\n"; + return 2; + } + if (avformat_find_stream_info(fmt, nullptr) < 0) { + std::cerr << "avformat_find_stream_info failed\n"; + avformat_close_input(&fmt); + return 2; + } + + /* Find video stream */ + int video_idx = -1; + for (unsigned i = 0; i < fmt->nb_streams; ++i) { + if (fmt->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_VIDEO) { + video_idx = (int)i; + break; + } + } + if (video_idx < 0) { + std::cerr << "no video stream in input\n"; + avformat_close_input(&fmt); + return 2; + } + AVStream *vstream = fmt->streams[video_idx]; + int width = vstream->codecpar->width; + int height = vstream->codecpar->height; + enum AVCodecID codec_id = vstream->codecpar->codec_id; + const AVCodec *decoder = avcodec_find_decoder(codec_id); + if (!decoder) { + std::cerr << "no decoder for codec " << avcodec_get_name(codec_id) << "\n"; + avformat_close_input(&fmt); + return 2; + } + std::cerr << "[cuframes-src] input " << a.rtsp_url << " — " << width << "x" << height + << " " << avcodec_get_name(codec_id) << "\n"; + + /* CUDA hwaccel setup */ + AVBufferRef *hw_device = nullptr; + if (setup_hw_device(a.cuda_device, &hw_device) < 0) { + avformat_close_input(&fmt); + return 2; + } + + AVCodecContext *ctx = avcodec_alloc_context3(decoder); + if (!ctx) { std::cerr << "alloc_context fail\n"; return 2; } + if (avcodec_parameters_to_context(ctx, vstream->codecpar) < 0) { + std::cerr << "parameters_to_context fail\n"; return 2; + } + ctx->hw_device_ctx = av_buffer_ref(hw_device); + ctx->get_format = get_hw_format; + + r = avcodec_open2(ctx, decoder, nullptr); + if (r < 0) { + char err[256]; av_strerror(r, err, sizeof(err)); + std::cerr << "avcodec_open2: " << err << "\n"; + return 2; + } + + /* Pre-allocate cuframes pool (NV12 — что nvdec выдаёт) */ + int32_t pitch_y = 0, pitch_uv = 0; + size_t frame_size = cuframes::calc_frame_size(CUFRAMES_FORMAT_NV12, + width, height, + &pitch_y, &pitch_uv); + + cudaSetDevice(a.cuda_device); + std::vector pool(a.ring_size, nullptr); + for (int i = 0; i < a.ring_size; ++i) { + cudaError_t cerr = cudaMalloc(&pool[i], frame_size); + if (cerr != cudaSuccess) { + std::cerr << "cudaMalloc pool[" << i << "]: " << cudaGetErrorString(cerr) << "\n"; + return 2; + } + } + + cuframes::PublisherOptions po; + po.key = a.key; + po.width = width; + po.height = height; + po.format = CUFRAMES_FORMAT_NV12; + po.policy = CUFRAMES_POLICY_DROP_OLDEST; + po.cuda_device = a.cuda_device; + po.ring_size = a.ring_size; /* для logging */ + + cuframes::Publisher pub(po, pool.data(), a.ring_size, frame_size); + std::cerr << "[cuframes-src] publisher 'cuframes-" << a.key + << "' ready, ring=" << a.ring_size + << " pool_size=" << frame_size << " bytes/frame\n"; + + /* Stream для D2D copies */ + cudaStream_t stream; + cudaStreamCreate(&stream); + + AVPacket *pkt = av_packet_alloc(); + AVFrame *frame = av_frame_alloc(); + if (!pkt || !frame) return 2; + + int pool_idx = 0; + uint64_t frame_count = 0; + auto t_last_log = std::chrono::steady_clock::now(); + uint64_t last_log_count = 0; + + /* Realtime pacing: t0 = время первого frame; ждём до t0 + pts_in_real_time */ + auto rt_t0 = std::chrono::steady_clock::time_point::min(); + int64_t rt_pts0 = AV_NOPTS_VALUE; + AVRational stream_tb = vstream->time_base; + + while (!g_stop.load()) { + r = av_read_frame(fmt, pkt); + if (r == AVERROR_EOF) { + if (a.loop) { + /* Seek в начало; offset PTS чтобы избежать non-monotonic */ + av_seek_frame(fmt, video_idx, 0, AVSEEK_FLAG_BACKWARD); + avcodec_flush_buffers(ctx); + rt_t0 = std::chrono::steady_clock::time_point::min(); + rt_pts0 = AV_NOPTS_VALUE; + av_packet_unref(pkt); + continue; + } + break; + } + if (r < 0) { + char err[256]; av_strerror(r, err, sizeof(err)); + std::cerr << "[cuframes-src] av_read_frame: " << err << "\n"; + av_packet_unref(pkt); + std::this_thread::sleep_for(std::chrono::seconds(1)); + continue; + } + if (pkt->stream_index != video_idx) { + av_packet_unref(pkt); + continue; + } + + r = avcodec_send_packet(ctx, pkt); + av_packet_unref(pkt); + if (r < 0) continue; + + while (true) { + r = avcodec_receive_frame(ctx, frame); + if (r == AVERROR(EAGAIN) || r == AVERROR_EOF) break; + if (r < 0) { + std::cerr << "avcodec_receive_frame error\n"; + break; + } + + if (frame->format != AV_PIX_FMT_CUDA) { + av_frame_unref(frame); + continue; + } + + /* frame->data[0] — указатель на CUDA Y plane; frame->data[1] — UV */ + void *cuda_src_y = frame->data[0]; + void *cuda_src_uv = frame->data[1]; + int src_pitch_y = frame->linesize[0]; + int src_pitch_uv = frame->linesize[1]; + + void *dst = pool[pool_idx]; + + /* D2D 2D-copy Y plane */ + cudaError_t cerr = cudaMemcpy2DAsync( + dst, pitch_y, cuda_src_y, src_pitch_y, width, height, + cudaMemcpyDeviceToDevice, stream); + if (cerr != cudaSuccess) { + std::cerr << "memcpy Y: " << cudaGetErrorString(cerr) << "\n"; + av_frame_unref(frame); + continue; + } + + /* UV plane (height/2 строк) */ + uint8_t *dst_uv = (uint8_t *)dst + (size_t)pitch_y * height; + cerr = cudaMemcpy2DAsync( + dst_uv, pitch_uv, cuda_src_uv, src_pitch_uv, width, height / 2, + cudaMemcpyDeviceToDevice, stream); + if (cerr != cudaSuccess) { + std::cerr << "memcpy UV: " << cudaGetErrorString(cerr) << "\n"; + av_frame_unref(frame); + continue; + } + + /* Realtime pacing — спим до момента когда wall-clock соответствует pts */ + if (a.realtime && frame->pts != AV_NOPTS_VALUE) { + if (rt_pts0 == AV_NOPTS_VALUE) { + rt_pts0 = frame->pts; + rt_t0 = std::chrono::steady_clock::now(); + } + int64_t rel_pts = frame->pts - rt_pts0; + double rel_sec = rel_pts * av_q2d(stream_tb); + auto target = rt_t0 + std::chrono::microseconds( + (int64_t)(rel_sec * 1e6)); + std::this_thread::sleep_until(target); + } + + int64_t pts_ns = cuframes::now_ns(); + try { + pub.publish_external(dst, stream, pts_ns); + } catch (const cuframes::Error &e) { + std::cerr << "publish_external: " << e.what() << "\n"; + av_frame_unref(frame); + continue; + } + + pool_idx = (pool_idx + 1) % a.ring_size; + frame_count++; + av_frame_unref(frame); + + if (a.verbose && (frame_count % 100 == 0)) { + auto now = std::chrono::steady_clock::now(); + double dt = std::chrono::duration(now - t_last_log).count(); + if (dt >= 1.0) { + uint64_t delta = frame_count - last_log_count; + std::cerr << "[cuframes-src] frames=" << frame_count + << " (" << (delta / dt) << " fps)\n"; + t_last_log = now; + last_log_count = frame_count; + } + } + } + } + + std::cerr << "[cuframes-src] shutdown, total frames=" << frame_count << "\n"; + + av_frame_free(&frame); + av_packet_free(&pkt); + avcodec_free_context(&ctx); + avformat_close_input(&fmt); + av_buffer_unref(&hw_device); + + cudaStreamDestroy(stream); + /* Publisher destructor freed first; теперь освободим pool */ + /* Note: publisher уже destroyed by RAII, IPC handles closed by subscribers */ + for (auto p : pool) if (p) cudaFree(p); + + return 0; +}