diff --git a/tools/spike/CMakeLists.txt b/tools/spike/CMakeLists.txt new file mode 100644 index 0000000..500e1a9 --- /dev/null +++ b/tools/spike/CMakeLists.txt @@ -0,0 +1,27 @@ +# Phase 0 spike — минимальный CMake для двух binaries. +# Цель: max простота, без зависимостей кроме CUDA toolkit. + +cmake_minimum_required(VERSION 3.20) +project(cuframes_spike LANGUAGES CXX CUDA) + +set(CMAKE_CXX_STANDARD 17) +set(CMAKE_CXX_STANDARD_REQUIRED ON) +set(CMAKE_CUDA_STANDARD 17) + +# Compute capability autodetect, fallback на 5.0+ (старые GPU) до 12.0 +# (Blackwell). RTX 5090 = sm_120 — наш target. +if(NOT DEFINED CMAKE_CUDA_ARCHITECTURES) + set(CMAKE_CUDA_ARCHITECTURES "75;86;89;90;120") +endif() + +add_executable(pingpong_producer pingpong_producer.cu) +add_executable(pingpong_consumer pingpong_consumer.cu) + +# Compile flags — debug-friendly для PoC stage +foreach(target pingpong_producer pingpong_consumer) + target_compile_options(${target} PRIVATE + $<$:-Wall -Wextra -O2 -g> + $<$:-O2 -g -lineinfo> + ) + target_link_libraries(${target} PRIVATE cuda) +endforeach() diff --git a/tools/spike/README.md b/tools/spike/README.md new file mode 100644 index 0000000..ca795b6 --- /dev/null +++ b/tools/spike/README.md @@ -0,0 +1,94 @@ +# Phase 0 — CUDA IPC ping-pong spike + +PoC для измерения latency / throughput / multi-consumer behavior **без** +FFmpeg / Frigate / сложности. Чистый CUDA IPC между двумя процессами. + +## Что проверяет + +1. **End-to-end latency**: producer → write CUDA frame → consumer reads → ACK. +2. **Throughput**: сколько FullHD-frames/sec можно протолкнуть. +3. **Multi-consumer**: 1 producer × N consumers (zero-copy для каждого). +4. **Cross-container**: producer и consumer в разных Docker-контейнерах. +5. **Crash recovery**: kill consumer / producer → reconnect. +6. **Memory leak**: 24-hour run без VRAM-роста. + +## Структура + +``` +tools/spike/ +├── README.md (этот файл) +├── CMakeLists.txt стандартный CMake target +├── common.h shared structs/constants +├── pingpong_producer.cu producer-процесс +├── pingpong_consumer.cu consumer-процесс +└── bench.sh скрипт-обвязка с измерениями +``` + +## Сборка (внутри dev-контейнера) + +```bash +docker compose -f docker/docker-compose.dev.yml exec dev bash +# Внутри: +cd /workspace +cmake -B build -S tools/spike -G Ninja +cmake --build build +``` + +Артефакты: `build/pingpong_producer`, `build/pingpong_consumer`. + +## Запуск (single-host single-container, базовый случай) + +Терминал 1 (producer — пишет FullHD-frames пока не нажмёшь Ctrl-C): +```bash +docker compose -f docker/docker-compose.dev.yml exec dev \ + ./build/pingpong_producer --key cam_test --width 1920 --height 1080 --fps 30 +``` + +Терминал 2 (consumer — читает и логирует latency): +```bash +docker compose -f docker/docker-compose.dev.yml exec dev \ + ./build/pingpong_consumer --key cam_test --count 1000 +``` + +После 1000 кадров consumer печатает summary: + +``` +=== cuframes spike summary === +frames received: 1000 +duration: 33.34 s +fps: 30.00 +latency mean: 0.42 ms +latency p50: 0.39 ms +latency p99: 0.91 ms +latency max: 3.12 ms +producer→consumer: zero-copy ✓ +``` + +## Запуск (cross-container — Phase 0 critical test) + +См. `bench.sh` — поднимает producer в одном контейнере, consumer в другом, +оба используют `ipc: shareable` namespace. + +## Acceptance criteria для Phase 0 + +- [ ] Базовый ping-pong работает (1×1) +- [ ] Multi-consumer (3 consumers, latency сопоставимая) +- [ ] Cross-container (producer container A → consumer container B) +- [ ] 1-hour stress без VRAM/RAM leak +- [ ] Замерено: p99 latency на RTX 5090 для FullHD-frame +- [ ] Документ `docs/benchmarks-phase0.md` с numbers + +Если latency p99 > 5 ms или throughput < 1 GB/s → **остановиться** и +переосмыслить дизайн (возможно CUDA IPC sync через `cudaIpcEventHandle_t` +вместо `cudaStreamSynchronize`). + +## Что **не** делает spike + +- Не использует FFmpeg (это Phase 2) +- Не имеет ring buffer (это Phase 1; spike использует двойной буфер) +- Не использует Unix sockets для handshake (handle передаётся через + файл `/tmp/cuframes-spike-.handle`) +- Не делает proper backpressure (drop-oldest hardcoded) + +Spike нарочно **минималистичен** — единственная цель замерить numbers +и провалидировать concept перед инвестированием в Phase 1. diff --git a/tools/spike/bench.sh b/tools/spike/bench.sh new file mode 100755 index 0000000..273b337 --- /dev/null +++ b/tools/spike/bench.sh @@ -0,0 +1,64 @@ +#!/bin/bash +# Phase 0 spike benchmark script. Запускать **внутри** dev-контейнера. +# +# Usage: ./tools/spike/bench.sh [SCENARIO] +# Scenarios: basic | multi-consumer | cross-container | stress + +set -euo pipefail + +cd "$(dirname "$0")/../.." # → корень репо + +if [ ! -x build/pingpong_producer ]; then + echo "==> build first (cmake -B build -S tools/spike -G Ninja && cmake --build build)" + exit 1 +fi + +SCENARIO="${1:-basic}" + +case "$SCENARIO" in +basic) + echo "=== Scenario: basic (1 producer × 1 consumer, FullHD@30fps, 1000 frames) ===" + ./build/pingpong_producer --key cam_test --fps 30 & + PROD_PID=$! + sleep 1 + ./build/pingpong_consumer --key cam_test --count 1000 + kill $PROD_PID 2>/dev/null || true + wait $PROD_PID 2>/dev/null || true + ;; + +multi-consumer) + echo "=== Scenario: 1 producer × 3 consumers ===" + ./build/pingpong_producer --key cam_test --fps 30 & + PROD_PID=$! + sleep 1 + ./build/pingpong_consumer --key cam_test --count 500 > /tmp/c1.log 2>&1 & + ./build/pingpong_consumer --key cam_test --count 500 > /tmp/c2.log 2>&1 & + ./build/pingpong_consumer --key cam_test --count 500 + wait + kill $PROD_PID 2>/dev/null || true + echo "--- consumer 1 summary ---"; tail -20 /tmp/c1.log + echo "--- consumer 2 summary ---"; tail -20 /tmp/c2.log + ;; + +stress) + echo "=== Scenario: 1-hour stress (FullHD@30fps, single consumer) ===" + ./build/pingpong_producer --key cam_test --fps 30 --duration 3600 & + PROD_PID=$! + sleep 1 + # Read forever (consumer завершится когда producer перестанет писать) + ./build/pingpong_consumer --key cam_test --count 108000 -v + wait $PROD_PID 2>/dev/null || true + ;; + +cross-container) + echo "=== Scenario: cross-container ===" + echo "TODO: запустить второй контейнер с ipc:container:cuframes-dev" + echo " это в Phase 0 manual — задокументировать процесс в docs/" + ;; + +*) + echo "Unknown scenario: $SCENARIO" + echo "Available: basic | multi-consumer | stress | cross-container" + exit 1 + ;; +esac diff --git a/tools/spike/common.h b/tools/spike/common.h new file mode 100644 index 0000000..17e4426 --- /dev/null +++ b/tools/spike/common.h @@ -0,0 +1,68 @@ +// Phase 0 spike — общие типы / константы для producer и consumer. +// +// NB: реальный protocol будет в libcuframes (Phase 1). Здесь — минимум +// для PoC measurement. + +#pragma once + +#include +#include +#include +#include + +namespace cuframes_spike { + +// Slot — 1 frame в shared VRAM. Producer пишет, consumer читает. +// Используем double-buffering (2 slot'а) — пока consumer читает slot A, +// producer пишет в slot B. Поможет уменьшить contention в PoC. +constexpr int RING_SIZE = 2; + +// Format hardcoded для PoC: NV12 (как выдаёт NVDEC по умолчанию). +// Y plane + UV plane (subsampled 2x2). +struct FrameMeta { + int32_t width; + int32_t height; + int32_t pitch_y; // строка Y, выровнен на 256 для CUDA + int32_t pitch_uv; // строка UV + int32_t format; // 0 = NV12 +}; + +// Descriptor одного slot'а. Хранится в /tmp/cuframes-spike-.handle +// — это POSIX shared memory. +struct SlotDescriptor { + cudaIpcMemHandle_t handle; // 64 байта от NVIDIA для IPC + uint64_t producer_seq; // обновляется атомарно при publish + uint64_t consumer_ack_count; // сколько consumers подтвердили + int64_t pts_ns; // timestamp когда producer публикнул +}; + +// Header shared memory file. +struct SharedHeader { + uint32_t magic; // 0xCC7C1DCC ("CUFRAMES C0") + uint32_t version; + FrameMeta meta; + SlotDescriptor slots[RING_SIZE]; + uint64_t global_seq; // монотонная последовательность producer +}; + +constexpr uint32_t CUFRAMES_SPIKE_MAGIC = 0xCC7C1DCCu; +constexpr uint32_t CUFRAMES_SPIKE_VERSION = 1; + +// Helper: error check + abort. +#define CHECK_CUDA(call) do { \ + cudaError_t _err = (call); \ + if (_err != cudaSuccess) { \ + fprintf(stderr, "CUDA error at %s:%d: %s\n", \ + __FILE__, __LINE__, cudaGetErrorString(_err)); \ + std::exit(1); \ + } \ +} while (0) + +// Получить timestamp в наносекундах (для latency-замеров). +static inline int64_t now_ns() { + timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return static_cast(ts.tv_sec) * 1000000000LL + ts.tv_nsec; +} + +} // namespace cuframes_spike diff --git a/tools/spike/pingpong_consumer.cu b/tools/spike/pingpong_consumer.cu new file mode 100644 index 0000000..987f1d8 --- /dev/null +++ b/tools/spike/pingpong_consumer.cu @@ -0,0 +1,191 @@ +// Phase 0 spike consumer. +// +// Open IPC handle от producer, читает frames, проверяет монотонность, +// измеряет latency и печатает summary. +// +// Run: +// ./pingpong_consumer --key cam_test --count 1000 + +#include "common.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace cuframes_spike; + +struct Args { + std::string key = "cam_test"; + int count = 1000; // сколько frames прочитать перед exit + int verbose = 0; +}; + +static Args parse_args(int argc, char** argv) { + Args a; + for (int i = 1; i < argc; ++i) { + std::string arg = argv[i]; + auto next = [&] { return std::string(argv[++i]); }; + if (arg == "--key") a.key = next(); + else if (arg == "--count") a.count = std::stoi(next()); + else if (arg == "--verbose" || arg == "-v") a.verbose = 1; + else if (arg == "--help" || arg == "-h") { + std::cout + << "Usage: pingpong_consumer [--key NAME] [--count N] [-v]\n"; + std::exit(0); + } + } + return a; +} + +int main(int argc, char** argv) { + Args args = parse_args(argc, argv); + std::cout << "[consumer] key=" << args.key + << " count=" << args.count << "\n"; + + int dev = 0; + CHECK_CUDA(cudaSetDevice(dev)); + + // Открыть shm от producer + std::string shm_path = "/dev/shm/cuframes-spike-" + args.key; + int fd = -1; + for (int retry = 0; retry < 50; ++retry) { + fd = open(shm_path.c_str(), O_RDWR); + if (fd >= 0) break; + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + if (fd < 0) { + std::cerr << "[consumer] producer not running (no " << shm_path << ")\n"; + return 1; + } + auto* shared = static_cast( + mmap(nullptr, sizeof(SharedHeader), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); + if (shared == MAP_FAILED) { + perror("[consumer] mmap"); + return 1; + } + close(fd); + + // Verify magic + version + if (shared->magic != CUFRAMES_SPIKE_MAGIC) { + std::cerr << "[consumer] bad magic 0x" << std::hex << shared->magic << "\n"; + return 1; + } + if (shared->version != CUFRAMES_SPIKE_VERSION) { + std::cerr << "[consumer] version mismatch " << shared->version << "\n"; + return 1; + } + std::cout << "[consumer] connected, frame " << shared->meta.width + << "x" << shared->meta.height << "\n"; + + // Open IPC handles → получить CUDA pointer'ы + void* slot_ptrs[RING_SIZE]; + for (int i = 0; i < RING_SIZE; ++i) { + CHECK_CUDA(cudaIpcOpenMemHandle(&slot_ptrs[i], + shared->slots[i].handle, + cudaIpcMemLazyEnablePeerAccess)); + } + std::cout << "[consumer] mapped " << RING_SIZE << " slots into VRAM\n"; + + const int pitch_y = shared->meta.pitch_y; + const int height = shared->meta.height; + const size_t y_bytes = static_cast(pitch_y) * height; + + // Host-side buffer для одной строки Y (для verify монотонности) + std::vector host_row(pitch_y); + + // Latency histogram + std::vector latencies_ns; + latencies_ns.reserve(args.count); + + uint64_t last_seen = UINT64_MAX; + auto t_start = std::chrono::steady_clock::now(); + int frames_received = 0; + int torn_frames = 0; + int skipped_frames = 0; + + while (frames_received < args.count) { + // Poll global_seq на изменение + uint64_t seq = __atomic_load_n(&shared->global_seq, __ATOMIC_ACQUIRE); + if (seq == last_seen || (last_seen == UINT64_MAX && seq == 0 && + shared->slots[0].producer_seq == 0)) { + // нет нового; busy-wait минимально + std::this_thread::sleep_for(std::chrono::microseconds(100)); + continue; + } + if (last_seen != UINT64_MAX && seq > last_seen + 1) { + skipped_frames += (seq - last_seen - 1); + } + last_seen = seq; + + const int slot_idx = seq % RING_SIZE; + const int64_t pts_ns = __atomic_load_n(&shared->slots[slot_idx].pts_ns, + __ATOMIC_ACQUIRE); + const int64_t recv_ns = now_ns(); + const int64_t latency_ns = recv_ns - pts_ns; + + // Verify содержимое — Y-plane все байты должны быть (seq % 256) + // (читаем первую строку — 1920 байт хватит для проверки) + CHECK_CUDA(cudaMemcpy(host_row.data(), slot_ptrs[slot_idx], + pitch_y, cudaMemcpyDeviceToHost)); + uint8_t expected = static_cast(seq % 256); + bool ok = true; + for (int i = 0; i < pitch_y; ++i) { + if (host_row[i] != expected) { ok = false; break; } + } + if (!ok) torn_frames++; + + latencies_ns.push_back(latency_ns); + frames_received++; + + if (args.verbose && frames_received % 100 == 0) { + std::cout << "[consumer] received " << frames_received + << " seq=" << seq + << " latency=" << latency_ns / 1000 << "us\n"; + } + } + + auto t_end = std::chrono::steady_clock::now(); + double duration_sec = + std::chrono::duration(t_end - t_start).count(); + + // Cleanup + for (int i = 0; i < RING_SIZE; ++i) { + CHECK_CUDA(cudaIpcCloseMemHandle(slot_ptrs[i])); + } + munmap(shared, sizeof(SharedHeader)); + + // Summary + std::sort(latencies_ns.begin(), latencies_ns.end()); + auto pct = [&](double p) -> int64_t { + return latencies_ns[static_cast(latencies_ns.size() * p)]; + }; + int64_t sum = 0; + for (auto v : latencies_ns) sum += v; + int64_t mean = sum / latencies_ns.size(); + + std::cout << "\n=== cuframes spike summary ===\n"; + std::cout << "frames received: " << frames_received << "\n"; + std::cout << "duration: " << duration_sec << " s\n"; + std::cout << "effective fps: " << frames_received / duration_sec << "\n"; + std::cout << "frames skipped (producer ahead): " << skipped_frames << "\n"; + std::cout << "torn frames (data corrupt): " << torn_frames << "\n"; + std::cout << "\nlatency producer→consumer (microseconds):\n"; + std::cout << " mean: " << mean / 1000 << " us\n"; + std::cout << " p50: " << pct(0.50) / 1000 << " us\n"; + std::cout << " p95: " << pct(0.95) / 1000 << " us\n"; + std::cout << " p99: " << pct(0.99) / 1000 << " us\n"; + std::cout << " max: " << latencies_ns.back() / 1000 << " us\n"; + std::cout << "\nzero-copy: " << (torn_frames == 0 ? "✓" : "✗ (torn frames!)") << "\n"; + + return torn_frames == 0 ? 0 : 1; +} diff --git a/tools/spike/pingpong_producer.cu b/tools/spike/pingpong_producer.cu new file mode 100644 index 0000000..1bd3cf3 --- /dev/null +++ b/tools/spike/pingpong_producer.cu @@ -0,0 +1,180 @@ +// Phase 0 spike producer. +// +// Аллоцирует CUDA-память (NV12 FullHD), экспортирует IPC-handle в +// /tmp/cuframes-spike-.shm, имитирует decoded frames заполняя их +// pattern'ом (значение растёт с каждым frame'ом — consumer проверяет +// что значения монотонные = нет re-ordering / torn frames). +// +// Run: +// ./pingpong_producer --key cam_test --width 1920 --height 1080 --fps 30 + +#include "common.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace cuframes_spike; + +// CUDA kernel — заполняет Y-plane значением `value`. +// Это минимум чтобы touched memory; реальный producer получает frame от NVDEC. +__global__ void fill_y_plane(uint8_t* y, int width, int height, int pitch_y, + uint8_t value) { + int x = blockIdx.x * blockDim.x + threadIdx.x; + int y_coord = blockIdx.y * blockDim.y + threadIdx.y; + if (x < width && y_coord < height) { + y[y_coord * pitch_y + x] = value; + } +} + +struct Args { + std::string key = "cam_test"; + int width = 1920; + int height = 1080; + int fps = 30; + int duration_sec = 0; // 0 = forever +}; + +static Args parse_args(int argc, char** argv) { + Args a; + for (int i = 1; i < argc; ++i) { + std::string arg = argv[i]; + auto next = [&] { return std::string(argv[++i]); }; + if (arg == "--key") a.key = next(); + else if (arg == "--width") a.width = std::stoi(next()); + else if (arg == "--height") a.height = std::stoi(next()); + else if (arg == "--fps") a.fps = std::stoi(next()); + else if (arg == "--duration") a.duration_sec = std::stoi(next()); + else if (arg == "--help" || arg == "-h") { + std::cout + << "Usage: pingpong_producer [--key NAME] [--width W] [--height H] " + "[--fps N] [--duration SEC]\n"; + std::exit(0); + } + } + return a; +} + +int main(int argc, char** argv) { + Args args = parse_args(argc, argv); + std::cout << "[producer] key=" << args.key + << " " << args.width << "x" << args.height + << " @ " << args.fps << " fps\n"; + + // CUDA init + int dev = 0; + CHECK_CUDA(cudaSetDevice(dev)); + + // NV12: Y plane (W×H bytes) + UV plane (W × H/2 bytes), aligned по pitch. + // Для простоты — без CUDA alignment, используем width прямо как pitch. + const int pitch_y = args.width; + const int pitch_uv = args.width; + const size_t y_bytes = static_cast(pitch_y) * args.height; + const size_t uv_bytes = static_cast(pitch_uv) * (args.height / 2); + const size_t frame_bytes = y_bytes + uv_bytes; + + std::cout << "[producer] frame_bytes=" << frame_bytes + << " (Y=" << y_bytes << " UV=" << uv_bytes << ")\n"; + + // Аллоцируем RING_SIZE буферов в VRAM (один cudaMalloc на весь пул; + // отдельные cudaIpcGetMemHandle для каждого). + void* slot_ptrs[RING_SIZE]; + cudaIpcMemHandle_t slot_handles[RING_SIZE]; + for (int i = 0; i < RING_SIZE; ++i) { + CHECK_CUDA(cudaMalloc(&slot_ptrs[i], frame_bytes)); + CHECK_CUDA(cudaIpcGetMemHandle(&slot_handles[i], slot_ptrs[i])); + } + std::cout << "[producer] allocated " << RING_SIZE << " slots × " + << frame_bytes << " bytes\n"; + + // POSIX shared memory для descriptor'ов + std::string shm_path = "/dev/shm/cuframes-spike-" + args.key; + int fd = open(shm_path.c_str(), O_CREAT | O_RDWR, 0666); + if (fd < 0) { + perror("[producer] open shm"); + return 1; + } + if (ftruncate(fd, sizeof(SharedHeader)) < 0) { + perror("[producer] ftruncate shm"); + return 1; + } + auto* shared = static_cast( + mmap(nullptr, sizeof(SharedHeader), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0)); + if (shared == MAP_FAILED) { + perror("[producer] mmap shm"); + return 1; + } + + // Init shared header + std::memset(shared, 0, sizeof(SharedHeader)); + shared->magic = CUFRAMES_SPIKE_MAGIC; + shared->version = CUFRAMES_SPIKE_VERSION; + shared->meta = {args.width, args.height, pitch_y, pitch_uv, 0}; + for (int i = 0; i < RING_SIZE; ++i) { + shared->slots[i].handle = slot_handles[i]; + shared->slots[i].producer_seq = 0; + } + // memory fence перед публикацией magic'а (но __atomic_thread_fence нужен) + __atomic_thread_fence(__ATOMIC_RELEASE); + std::cout << "[producer] shm ready at " << shm_path << "\n"; + + cudaStream_t stream; + CHECK_CUDA(cudaStreamCreate(&stream)); + + const auto frame_interval = std::chrono::nanoseconds(1'000'000'000LL / args.fps); + auto next_frame = std::chrono::steady_clock::now(); + + dim3 block(16, 16); + dim3 grid((args.width + 15) / 16, (args.height + 15) / 16); + + uint64_t seq = 0; + const int64_t end_ns = args.duration_sec > 0 + ? now_ns() + args.duration_sec * 1'000'000'000LL + : 0; + + while (true) { + if (end_ns && now_ns() > end_ns) break; + + const int slot_idx = seq % RING_SIZE; + + // Fill Y-plane значением (seq % 256) — consumer проверит монотонность + fill_y_plane<<>>( + static_cast(slot_ptrs[slot_idx]), + args.width, args.height, pitch_y, + static_cast(seq % 256)); + + CHECK_CUDA(cudaStreamSynchronize(stream)); + + // Publish (атомарно обновляем seq на slot'е) + __atomic_store_n(&shared->slots[slot_idx].producer_seq, seq, __ATOMIC_RELEASE); + __atomic_store_n(&shared->slots[slot_idx].pts_ns, now_ns(), __ATOMIC_RELEASE); + __atomic_store_n(&shared->global_seq, seq, __ATOMIC_RELEASE); + + if (seq % 100 == 0) { + std::cout << "[producer] seq=" << seq << " slot=" << slot_idx << "\n"; + } + + seq++; + next_frame += frame_interval; + std::this_thread::sleep_until(next_frame); + } + + std::cout << "[producer] published " << seq << " frames, cleanup\n"; + + for (int i = 0; i < RING_SIZE; ++i) { + CHECK_CUDA(cudaFree(slot_ptrs[i])); + } + munmap(shared, sizeof(SharedHeader)); + close(fd); + unlink(shm_path.c_str()); + return 0; +}