spike(phase0): minimal CUDA IPC ping-pong producer/consumer
PoC для validation концепта перед инвестированием в Phase 1. Структура: - tools/spike/common.h — типы SharedHeader / SlotDescriptor / NV12 meta - tools/spike/pingpong_producer.cu — аллоцирует CUDA pool, экспортирует IPC handles в /dev/shm/cuframes-spike-<key>, имитирует publish frames с monotonic pattern - tools/spike/pingpong_consumer.cu — открывает handles, читает frames, verify содержимого (no torn frames), измеряет latency, печатает summary - tools/spike/CMakeLists.txt — sm_75/86/89/90/120 для RTX 5090 - tools/spike/bench.sh — basic / multi-consumer / stress scenarios - tools/spike/README.md — what / how / acceptance Намеренные упрощения PoC (не идём в Phase 1 пока без validation): - 2-slot ring (Phase 1 будет N) - POSIX shared memory + atomic seq (без Unix socket handshake) - cudaStreamSynchronize sync (Phase 0 spike проверит будет ли достаточно; альтернатива cudaIpcEventHandle_t — отложена) - NV12 hardcoded (других форматов в Phase 1) - Drop-oldest backpressure (без ACK protocol) Acceptance Phase 0: - p99 latency на RTX 5090 для FullHD < 5 ms - throughput ≥ 1 GB/s - multi-consumer (3) с сопоставимой latency - cross-container работает - 1-hour stress без VRAM/RAM leak Если acceptance fail → дизайн пересмотр (sync через CUDA IPC events).
This commit is contained in:
@@ -0,0 +1,27 @@
|
||||
# Phase 0 spike — минимальный CMake для двух binaries.
|
||||
# Цель: max простота, без зависимостей кроме CUDA toolkit.
|
||||
|
||||
cmake_minimum_required(VERSION 3.20)
|
||||
project(cuframes_spike LANGUAGES CXX CUDA)
|
||||
|
||||
set(CMAKE_CXX_STANDARD 17)
|
||||
set(CMAKE_CXX_STANDARD_REQUIRED ON)
|
||||
set(CMAKE_CUDA_STANDARD 17)
|
||||
|
||||
# Compute capability autodetect, fallback на 5.0+ (старые GPU) до 12.0
|
||||
# (Blackwell). RTX 5090 = sm_120 — наш target.
|
||||
if(NOT DEFINED CMAKE_CUDA_ARCHITECTURES)
|
||||
set(CMAKE_CUDA_ARCHITECTURES "75;86;89;90;120")
|
||||
endif()
|
||||
|
||||
add_executable(pingpong_producer pingpong_producer.cu)
|
||||
add_executable(pingpong_consumer pingpong_consumer.cu)
|
||||
|
||||
# Compile flags — debug-friendly для PoC stage
|
||||
foreach(target pingpong_producer pingpong_consumer)
|
||||
target_compile_options(${target} PRIVATE
|
||||
$<$<COMPILE_LANGUAGE:CXX>:-Wall -Wextra -O2 -g>
|
||||
$<$<COMPILE_LANGUAGE:CUDA>:-O2 -g -lineinfo>
|
||||
)
|
||||
target_link_libraries(${target} PRIVATE cuda)
|
||||
endforeach()
|
||||
@@ -0,0 +1,94 @@
|
||||
# Phase 0 — CUDA IPC ping-pong spike
|
||||
|
||||
PoC для измерения latency / throughput / multi-consumer behavior **без**
|
||||
FFmpeg / Frigate / сложности. Чистый CUDA IPC между двумя процессами.
|
||||
|
||||
## Что проверяет
|
||||
|
||||
1. **End-to-end latency**: producer → write CUDA frame → consumer reads → ACK.
|
||||
2. **Throughput**: сколько FullHD-frames/sec можно протолкнуть.
|
||||
3. **Multi-consumer**: 1 producer × N consumers (zero-copy для каждого).
|
||||
4. **Cross-container**: producer и consumer в разных Docker-контейнерах.
|
||||
5. **Crash recovery**: kill consumer / producer → reconnect.
|
||||
6. **Memory leak**: 24-hour run без VRAM-роста.
|
||||
|
||||
## Структура
|
||||
|
||||
```
|
||||
tools/spike/
|
||||
├── README.md (этот файл)
|
||||
├── CMakeLists.txt стандартный CMake target
|
||||
├── common.h shared structs/constants
|
||||
├── pingpong_producer.cu producer-процесс
|
||||
├── pingpong_consumer.cu consumer-процесс
|
||||
└── bench.sh скрипт-обвязка с измерениями
|
||||
```
|
||||
|
||||
## Сборка (внутри dev-контейнера)
|
||||
|
||||
```bash
|
||||
docker compose -f docker/docker-compose.dev.yml exec dev bash
|
||||
# Внутри:
|
||||
cd /workspace
|
||||
cmake -B build -S tools/spike -G Ninja
|
||||
cmake --build build
|
||||
```
|
||||
|
||||
Артефакты: `build/pingpong_producer`, `build/pingpong_consumer`.
|
||||
|
||||
## Запуск (single-host single-container, базовый случай)
|
||||
|
||||
Терминал 1 (producer — пишет FullHD-frames пока не нажмёшь Ctrl-C):
|
||||
```bash
|
||||
docker compose -f docker/docker-compose.dev.yml exec dev \
|
||||
./build/pingpong_producer --key cam_test --width 1920 --height 1080 --fps 30
|
||||
```
|
||||
|
||||
Терминал 2 (consumer — читает и логирует latency):
|
||||
```bash
|
||||
docker compose -f docker/docker-compose.dev.yml exec dev \
|
||||
./build/pingpong_consumer --key cam_test --count 1000
|
||||
```
|
||||
|
||||
После 1000 кадров consumer печатает summary:
|
||||
|
||||
```
|
||||
=== cuframes spike summary ===
|
||||
frames received: 1000
|
||||
duration: 33.34 s
|
||||
fps: 30.00
|
||||
latency mean: 0.42 ms
|
||||
latency p50: 0.39 ms
|
||||
latency p99: 0.91 ms
|
||||
latency max: 3.12 ms
|
||||
producer→consumer: zero-copy ✓
|
||||
```
|
||||
|
||||
## Запуск (cross-container — Phase 0 critical test)
|
||||
|
||||
См. `bench.sh` — поднимает producer в одном контейнере, consumer в другом,
|
||||
оба используют `ipc: shareable` namespace.
|
||||
|
||||
## Acceptance criteria для Phase 0
|
||||
|
||||
- [ ] Базовый ping-pong работает (1×1)
|
||||
- [ ] Multi-consumer (3 consumers, latency сопоставимая)
|
||||
- [ ] Cross-container (producer container A → consumer container B)
|
||||
- [ ] 1-hour stress без VRAM/RAM leak
|
||||
- [ ] Замерено: p99 latency на RTX 5090 для FullHD-frame
|
||||
- [ ] Документ `docs/benchmarks-phase0.md` с numbers
|
||||
|
||||
Если latency p99 > 5 ms или throughput < 1 GB/s → **остановиться** и
|
||||
переосмыслить дизайн (возможно CUDA IPC sync через `cudaIpcEventHandle_t`
|
||||
вместо `cudaStreamSynchronize`).
|
||||
|
||||
## Что **не** делает spike
|
||||
|
||||
- Не использует FFmpeg (это Phase 2)
|
||||
- Не имеет ring buffer (это Phase 1; spike использует двойной буфер)
|
||||
- Не использует Unix sockets для handshake (handle передаётся через
|
||||
файл `/tmp/cuframes-spike-<key>.handle`)
|
||||
- Не делает proper backpressure (drop-oldest hardcoded)
|
||||
|
||||
Spike нарочно **минималистичен** — единственная цель замерить numbers
|
||||
и провалидировать concept перед инвестированием в Phase 1.
|
||||
Executable
+64
@@ -0,0 +1,64 @@
|
||||
#!/bin/bash
|
||||
# Phase 0 spike benchmark script. Запускать **внутри** dev-контейнера.
|
||||
#
|
||||
# Usage: ./tools/spike/bench.sh [SCENARIO]
|
||||
# Scenarios: basic | multi-consumer | cross-container | stress
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
cd "$(dirname "$0")/../.." # → корень репо
|
||||
|
||||
if [ ! -x build/pingpong_producer ]; then
|
||||
echo "==> build first (cmake -B build -S tools/spike -G Ninja && cmake --build build)"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
SCENARIO="${1:-basic}"
|
||||
|
||||
case "$SCENARIO" in
|
||||
basic)
|
||||
echo "=== Scenario: basic (1 producer × 1 consumer, FullHD@30fps, 1000 frames) ==="
|
||||
./build/pingpong_producer --key cam_test --fps 30 &
|
||||
PROD_PID=$!
|
||||
sleep 1
|
||||
./build/pingpong_consumer --key cam_test --count 1000
|
||||
kill $PROD_PID 2>/dev/null || true
|
||||
wait $PROD_PID 2>/dev/null || true
|
||||
;;
|
||||
|
||||
multi-consumer)
|
||||
echo "=== Scenario: 1 producer × 3 consumers ==="
|
||||
./build/pingpong_producer --key cam_test --fps 30 &
|
||||
PROD_PID=$!
|
||||
sleep 1
|
||||
./build/pingpong_consumer --key cam_test --count 500 > /tmp/c1.log 2>&1 &
|
||||
./build/pingpong_consumer --key cam_test --count 500 > /tmp/c2.log 2>&1 &
|
||||
./build/pingpong_consumer --key cam_test --count 500
|
||||
wait
|
||||
kill $PROD_PID 2>/dev/null || true
|
||||
echo "--- consumer 1 summary ---"; tail -20 /tmp/c1.log
|
||||
echo "--- consumer 2 summary ---"; tail -20 /tmp/c2.log
|
||||
;;
|
||||
|
||||
stress)
|
||||
echo "=== Scenario: 1-hour stress (FullHD@30fps, single consumer) ==="
|
||||
./build/pingpong_producer --key cam_test --fps 30 --duration 3600 &
|
||||
PROD_PID=$!
|
||||
sleep 1
|
||||
# Read forever (consumer завершится когда producer перестанет писать)
|
||||
./build/pingpong_consumer --key cam_test --count 108000 -v
|
||||
wait $PROD_PID 2>/dev/null || true
|
||||
;;
|
||||
|
||||
cross-container)
|
||||
echo "=== Scenario: cross-container ==="
|
||||
echo "TODO: запустить второй контейнер с ipc:container:cuframes-dev"
|
||||
echo " это в Phase 0 manual — задокументировать процесс в docs/"
|
||||
;;
|
||||
|
||||
*)
|
||||
echo "Unknown scenario: $SCENARIO"
|
||||
echo "Available: basic | multi-consumer | stress | cross-container"
|
||||
exit 1
|
||||
;;
|
||||
esac
|
||||
@@ -0,0 +1,68 @@
|
||||
// Phase 0 spike — общие типы / константы для producer и consumer.
|
||||
//
|
||||
// NB: реальный protocol будет в libcuframes (Phase 1). Здесь — минимум
|
||||
// для PoC measurement.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <cuda_runtime.h>
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <cstring>
|
||||
|
||||
namespace cuframes_spike {
|
||||
|
||||
// Slot — 1 frame в shared VRAM. Producer пишет, consumer читает.
|
||||
// Используем double-buffering (2 slot'а) — пока consumer читает slot A,
|
||||
// producer пишет в slot B. Поможет уменьшить contention в PoC.
|
||||
constexpr int RING_SIZE = 2;
|
||||
|
||||
// Format hardcoded для PoC: NV12 (как выдаёт NVDEC по умолчанию).
|
||||
// Y plane + UV plane (subsampled 2x2).
|
||||
struct FrameMeta {
|
||||
int32_t width;
|
||||
int32_t height;
|
||||
int32_t pitch_y; // строка Y, выровнен на 256 для CUDA
|
||||
int32_t pitch_uv; // строка UV
|
||||
int32_t format; // 0 = NV12
|
||||
};
|
||||
|
||||
// Descriptor одного slot'а. Хранится в /tmp/cuframes-spike-<key>.handle
|
||||
// — это POSIX shared memory.
|
||||
struct SlotDescriptor {
|
||||
cudaIpcMemHandle_t handle; // 64 байта от NVIDIA для IPC
|
||||
uint64_t producer_seq; // обновляется атомарно при publish
|
||||
uint64_t consumer_ack_count; // сколько consumers подтвердили
|
||||
int64_t pts_ns; // timestamp когда producer публикнул
|
||||
};
|
||||
|
||||
// Header shared memory file.
|
||||
struct SharedHeader {
|
||||
uint32_t magic; // 0xCC7C1DCC ("CUFRAMES C0")
|
||||
uint32_t version;
|
||||
FrameMeta meta;
|
||||
SlotDescriptor slots[RING_SIZE];
|
||||
uint64_t global_seq; // монотонная последовательность producer
|
||||
};
|
||||
|
||||
constexpr uint32_t CUFRAMES_SPIKE_MAGIC = 0xCC7C1DCCu;
|
||||
constexpr uint32_t CUFRAMES_SPIKE_VERSION = 1;
|
||||
|
||||
// Helper: error check + abort.
|
||||
#define CHECK_CUDA(call) do { \
|
||||
cudaError_t _err = (call); \
|
||||
if (_err != cudaSuccess) { \
|
||||
fprintf(stderr, "CUDA error at %s:%d: %s\n", \
|
||||
__FILE__, __LINE__, cudaGetErrorString(_err)); \
|
||||
std::exit(1); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
// Получить timestamp в наносекундах (для latency-замеров).
|
||||
static inline int64_t now_ns() {
|
||||
timespec ts;
|
||||
clock_gettime(CLOCK_MONOTONIC, &ts);
|
||||
return static_cast<int64_t>(ts.tv_sec) * 1000000000LL + ts.tv_nsec;
|
||||
}
|
||||
|
||||
} // namespace cuframes_spike
|
||||
@@ -0,0 +1,191 @@
|
||||
// Phase 0 spike consumer.
|
||||
//
|
||||
// Open IPC handle от producer, читает frames, проверяет монотонность,
|
||||
// измеряет latency и печатает summary.
|
||||
//
|
||||
// Run:
|
||||
// ./pingpong_consumer --key cam_test --count 1000
|
||||
|
||||
#include "common.h"
|
||||
|
||||
#include <cuda_runtime.h>
|
||||
#include <fcntl.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
#include <algorithm>
|
||||
#include <chrono>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
|
||||
using namespace cuframes_spike;
|
||||
|
||||
struct Args {
|
||||
std::string key = "cam_test";
|
||||
int count = 1000; // сколько frames прочитать перед exit
|
||||
int verbose = 0;
|
||||
};
|
||||
|
||||
static Args parse_args(int argc, char** argv) {
|
||||
Args a;
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
std::string arg = argv[i];
|
||||
auto next = [&] { return std::string(argv[++i]); };
|
||||
if (arg == "--key") a.key = next();
|
||||
else if (arg == "--count") a.count = std::stoi(next());
|
||||
else if (arg == "--verbose" || arg == "-v") a.verbose = 1;
|
||||
else if (arg == "--help" || arg == "-h") {
|
||||
std::cout
|
||||
<< "Usage: pingpong_consumer [--key NAME] [--count N] [-v]\n";
|
||||
std::exit(0);
|
||||
}
|
||||
}
|
||||
return a;
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
Args args = parse_args(argc, argv);
|
||||
std::cout << "[consumer] key=" << args.key
|
||||
<< " count=" << args.count << "\n";
|
||||
|
||||
int dev = 0;
|
||||
CHECK_CUDA(cudaSetDevice(dev));
|
||||
|
||||
// Открыть shm от producer
|
||||
std::string shm_path = "/dev/shm/cuframes-spike-" + args.key;
|
||||
int fd = -1;
|
||||
for (int retry = 0; retry < 50; ++retry) {
|
||||
fd = open(shm_path.c_str(), O_RDWR);
|
||||
if (fd >= 0) break;
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
}
|
||||
if (fd < 0) {
|
||||
std::cerr << "[consumer] producer not running (no " << shm_path << ")\n";
|
||||
return 1;
|
||||
}
|
||||
auto* shared = static_cast<SharedHeader*>(
|
||||
mmap(nullptr, sizeof(SharedHeader), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
|
||||
if (shared == MAP_FAILED) {
|
||||
perror("[consumer] mmap");
|
||||
return 1;
|
||||
}
|
||||
close(fd);
|
||||
|
||||
// Verify magic + version
|
||||
if (shared->magic != CUFRAMES_SPIKE_MAGIC) {
|
||||
std::cerr << "[consumer] bad magic 0x" << std::hex << shared->magic << "\n";
|
||||
return 1;
|
||||
}
|
||||
if (shared->version != CUFRAMES_SPIKE_VERSION) {
|
||||
std::cerr << "[consumer] version mismatch " << shared->version << "\n";
|
||||
return 1;
|
||||
}
|
||||
std::cout << "[consumer] connected, frame " << shared->meta.width
|
||||
<< "x" << shared->meta.height << "\n";
|
||||
|
||||
// Open IPC handles → получить CUDA pointer'ы
|
||||
void* slot_ptrs[RING_SIZE];
|
||||
for (int i = 0; i < RING_SIZE; ++i) {
|
||||
CHECK_CUDA(cudaIpcOpenMemHandle(&slot_ptrs[i],
|
||||
shared->slots[i].handle,
|
||||
cudaIpcMemLazyEnablePeerAccess));
|
||||
}
|
||||
std::cout << "[consumer] mapped " << RING_SIZE << " slots into VRAM\n";
|
||||
|
||||
const int pitch_y = shared->meta.pitch_y;
|
||||
const int height = shared->meta.height;
|
||||
const size_t y_bytes = static_cast<size_t>(pitch_y) * height;
|
||||
|
||||
// Host-side buffer для одной строки Y (для verify монотонности)
|
||||
std::vector<uint8_t> host_row(pitch_y);
|
||||
|
||||
// Latency histogram
|
||||
std::vector<int64_t> latencies_ns;
|
||||
latencies_ns.reserve(args.count);
|
||||
|
||||
uint64_t last_seen = UINT64_MAX;
|
||||
auto t_start = std::chrono::steady_clock::now();
|
||||
int frames_received = 0;
|
||||
int torn_frames = 0;
|
||||
int skipped_frames = 0;
|
||||
|
||||
while (frames_received < args.count) {
|
||||
// Poll global_seq на изменение
|
||||
uint64_t seq = __atomic_load_n(&shared->global_seq, __ATOMIC_ACQUIRE);
|
||||
if (seq == last_seen || (last_seen == UINT64_MAX && seq == 0 &&
|
||||
shared->slots[0].producer_seq == 0)) {
|
||||
// нет нового; busy-wait минимально
|
||||
std::this_thread::sleep_for(std::chrono::microseconds(100));
|
||||
continue;
|
||||
}
|
||||
if (last_seen != UINT64_MAX && seq > last_seen + 1) {
|
||||
skipped_frames += (seq - last_seen - 1);
|
||||
}
|
||||
last_seen = seq;
|
||||
|
||||
const int slot_idx = seq % RING_SIZE;
|
||||
const int64_t pts_ns = __atomic_load_n(&shared->slots[slot_idx].pts_ns,
|
||||
__ATOMIC_ACQUIRE);
|
||||
const int64_t recv_ns = now_ns();
|
||||
const int64_t latency_ns = recv_ns - pts_ns;
|
||||
|
||||
// Verify содержимое — Y-plane все байты должны быть (seq % 256)
|
||||
// (читаем первую строку — 1920 байт хватит для проверки)
|
||||
CHECK_CUDA(cudaMemcpy(host_row.data(), slot_ptrs[slot_idx],
|
||||
pitch_y, cudaMemcpyDeviceToHost));
|
||||
uint8_t expected = static_cast<uint8_t>(seq % 256);
|
||||
bool ok = true;
|
||||
for (int i = 0; i < pitch_y; ++i) {
|
||||
if (host_row[i] != expected) { ok = false; break; }
|
||||
}
|
||||
if (!ok) torn_frames++;
|
||||
|
||||
latencies_ns.push_back(latency_ns);
|
||||
frames_received++;
|
||||
|
||||
if (args.verbose && frames_received % 100 == 0) {
|
||||
std::cout << "[consumer] received " << frames_received
|
||||
<< " seq=" << seq
|
||||
<< " latency=" << latency_ns / 1000 << "us\n";
|
||||
}
|
||||
}
|
||||
|
||||
auto t_end = std::chrono::steady_clock::now();
|
||||
double duration_sec =
|
||||
std::chrono::duration<double>(t_end - t_start).count();
|
||||
|
||||
// Cleanup
|
||||
for (int i = 0; i < RING_SIZE; ++i) {
|
||||
CHECK_CUDA(cudaIpcCloseMemHandle(slot_ptrs[i]));
|
||||
}
|
||||
munmap(shared, sizeof(SharedHeader));
|
||||
|
||||
// Summary
|
||||
std::sort(latencies_ns.begin(), latencies_ns.end());
|
||||
auto pct = [&](double p) -> int64_t {
|
||||
return latencies_ns[static_cast<size_t>(latencies_ns.size() * p)];
|
||||
};
|
||||
int64_t sum = 0;
|
||||
for (auto v : latencies_ns) sum += v;
|
||||
int64_t mean = sum / latencies_ns.size();
|
||||
|
||||
std::cout << "\n=== cuframes spike summary ===\n";
|
||||
std::cout << "frames received: " << frames_received << "\n";
|
||||
std::cout << "duration: " << duration_sec << " s\n";
|
||||
std::cout << "effective fps: " << frames_received / duration_sec << "\n";
|
||||
std::cout << "frames skipped (producer ahead): " << skipped_frames << "\n";
|
||||
std::cout << "torn frames (data corrupt): " << torn_frames << "\n";
|
||||
std::cout << "\nlatency producer→consumer (microseconds):\n";
|
||||
std::cout << " mean: " << mean / 1000 << " us\n";
|
||||
std::cout << " p50: " << pct(0.50) / 1000 << " us\n";
|
||||
std::cout << " p95: " << pct(0.95) / 1000 << " us\n";
|
||||
std::cout << " p99: " << pct(0.99) / 1000 << " us\n";
|
||||
std::cout << " max: " << latencies_ns.back() / 1000 << " us\n";
|
||||
std::cout << "\nzero-copy: " << (torn_frames == 0 ? "✓" : "✗ (torn frames!)") << "\n";
|
||||
|
||||
return torn_frames == 0 ? 0 : 1;
|
||||
}
|
||||
@@ -0,0 +1,180 @@
|
||||
// Phase 0 spike producer.
|
||||
//
|
||||
// Аллоцирует CUDA-память (NV12 FullHD), экспортирует IPC-handle в
|
||||
// /tmp/cuframes-spike-<key>.shm, имитирует decoded frames заполняя их
|
||||
// pattern'ом (значение растёт с каждым frame'ом — consumer проверяет
|
||||
// что значения монотонные = нет re-ordering / torn frames).
|
||||
//
|
||||
// Run:
|
||||
// ./pingpong_producer --key cam_test --width 1920 --height 1080 --fps 30
|
||||
|
||||
#include "common.h"
|
||||
|
||||
#include <cuda_runtime.h>
|
||||
#include <fcntl.h>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
#include <atomic>
|
||||
#include <chrono>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <iostream>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
|
||||
using namespace cuframes_spike;
|
||||
|
||||
// CUDA kernel — заполняет Y-plane значением `value`.
|
||||
// Это минимум чтобы touched memory; реальный producer получает frame от NVDEC.
|
||||
__global__ void fill_y_plane(uint8_t* y, int width, int height, int pitch_y,
|
||||
uint8_t value) {
|
||||
int x = blockIdx.x * blockDim.x + threadIdx.x;
|
||||
int y_coord = blockIdx.y * blockDim.y + threadIdx.y;
|
||||
if (x < width && y_coord < height) {
|
||||
y[y_coord * pitch_y + x] = value;
|
||||
}
|
||||
}
|
||||
|
||||
struct Args {
|
||||
std::string key = "cam_test";
|
||||
int width = 1920;
|
||||
int height = 1080;
|
||||
int fps = 30;
|
||||
int duration_sec = 0; // 0 = forever
|
||||
};
|
||||
|
||||
static Args parse_args(int argc, char** argv) {
|
||||
Args a;
|
||||
for (int i = 1; i < argc; ++i) {
|
||||
std::string arg = argv[i];
|
||||
auto next = [&] { return std::string(argv[++i]); };
|
||||
if (arg == "--key") a.key = next();
|
||||
else if (arg == "--width") a.width = std::stoi(next());
|
||||
else if (arg == "--height") a.height = std::stoi(next());
|
||||
else if (arg == "--fps") a.fps = std::stoi(next());
|
||||
else if (arg == "--duration") a.duration_sec = std::stoi(next());
|
||||
else if (arg == "--help" || arg == "-h") {
|
||||
std::cout
|
||||
<< "Usage: pingpong_producer [--key NAME] [--width W] [--height H] "
|
||||
"[--fps N] [--duration SEC]\n";
|
||||
std::exit(0);
|
||||
}
|
||||
}
|
||||
return a;
|
||||
}
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
Args args = parse_args(argc, argv);
|
||||
std::cout << "[producer] key=" << args.key
|
||||
<< " " << args.width << "x" << args.height
|
||||
<< " @ " << args.fps << " fps\n";
|
||||
|
||||
// CUDA init
|
||||
int dev = 0;
|
||||
CHECK_CUDA(cudaSetDevice(dev));
|
||||
|
||||
// NV12: Y plane (W×H bytes) + UV plane (W × H/2 bytes), aligned по pitch.
|
||||
// Для простоты — без CUDA alignment, используем width прямо как pitch.
|
||||
const int pitch_y = args.width;
|
||||
const int pitch_uv = args.width;
|
||||
const size_t y_bytes = static_cast<size_t>(pitch_y) * args.height;
|
||||
const size_t uv_bytes = static_cast<size_t>(pitch_uv) * (args.height / 2);
|
||||
const size_t frame_bytes = y_bytes + uv_bytes;
|
||||
|
||||
std::cout << "[producer] frame_bytes=" << frame_bytes
|
||||
<< " (Y=" << y_bytes << " UV=" << uv_bytes << ")\n";
|
||||
|
||||
// Аллоцируем RING_SIZE буферов в VRAM (один cudaMalloc на весь пул;
|
||||
// отдельные cudaIpcGetMemHandle для каждого).
|
||||
void* slot_ptrs[RING_SIZE];
|
||||
cudaIpcMemHandle_t slot_handles[RING_SIZE];
|
||||
for (int i = 0; i < RING_SIZE; ++i) {
|
||||
CHECK_CUDA(cudaMalloc(&slot_ptrs[i], frame_bytes));
|
||||
CHECK_CUDA(cudaIpcGetMemHandle(&slot_handles[i], slot_ptrs[i]));
|
||||
}
|
||||
std::cout << "[producer] allocated " << RING_SIZE << " slots × "
|
||||
<< frame_bytes << " bytes\n";
|
||||
|
||||
// POSIX shared memory для descriptor'ов
|
||||
std::string shm_path = "/dev/shm/cuframes-spike-" + args.key;
|
||||
int fd = open(shm_path.c_str(), O_CREAT | O_RDWR, 0666);
|
||||
if (fd < 0) {
|
||||
perror("[producer] open shm");
|
||||
return 1;
|
||||
}
|
||||
if (ftruncate(fd, sizeof(SharedHeader)) < 0) {
|
||||
perror("[producer] ftruncate shm");
|
||||
return 1;
|
||||
}
|
||||
auto* shared = static_cast<SharedHeader*>(
|
||||
mmap(nullptr, sizeof(SharedHeader), PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0));
|
||||
if (shared == MAP_FAILED) {
|
||||
perror("[producer] mmap shm");
|
||||
return 1;
|
||||
}
|
||||
|
||||
// Init shared header
|
||||
std::memset(shared, 0, sizeof(SharedHeader));
|
||||
shared->magic = CUFRAMES_SPIKE_MAGIC;
|
||||
shared->version = CUFRAMES_SPIKE_VERSION;
|
||||
shared->meta = {args.width, args.height, pitch_y, pitch_uv, 0};
|
||||
for (int i = 0; i < RING_SIZE; ++i) {
|
||||
shared->slots[i].handle = slot_handles[i];
|
||||
shared->slots[i].producer_seq = 0;
|
||||
}
|
||||
// memory fence перед публикацией magic'а (но __atomic_thread_fence нужен)
|
||||
__atomic_thread_fence(__ATOMIC_RELEASE);
|
||||
std::cout << "[producer] shm ready at " << shm_path << "\n";
|
||||
|
||||
cudaStream_t stream;
|
||||
CHECK_CUDA(cudaStreamCreate(&stream));
|
||||
|
||||
const auto frame_interval = std::chrono::nanoseconds(1'000'000'000LL / args.fps);
|
||||
auto next_frame = std::chrono::steady_clock::now();
|
||||
|
||||
dim3 block(16, 16);
|
||||
dim3 grid((args.width + 15) / 16, (args.height + 15) / 16);
|
||||
|
||||
uint64_t seq = 0;
|
||||
const int64_t end_ns = args.duration_sec > 0
|
||||
? now_ns() + args.duration_sec * 1'000'000'000LL
|
||||
: 0;
|
||||
|
||||
while (true) {
|
||||
if (end_ns && now_ns() > end_ns) break;
|
||||
|
||||
const int slot_idx = seq % RING_SIZE;
|
||||
|
||||
// Fill Y-plane значением (seq % 256) — consumer проверит монотонность
|
||||
fill_y_plane<<<grid, block, 0, stream>>>(
|
||||
static_cast<uint8_t*>(slot_ptrs[slot_idx]),
|
||||
args.width, args.height, pitch_y,
|
||||
static_cast<uint8_t>(seq % 256));
|
||||
|
||||
CHECK_CUDA(cudaStreamSynchronize(stream));
|
||||
|
||||
// Publish (атомарно обновляем seq на slot'е)
|
||||
__atomic_store_n(&shared->slots[slot_idx].producer_seq, seq, __ATOMIC_RELEASE);
|
||||
__atomic_store_n(&shared->slots[slot_idx].pts_ns, now_ns(), __ATOMIC_RELEASE);
|
||||
__atomic_store_n(&shared->global_seq, seq, __ATOMIC_RELEASE);
|
||||
|
||||
if (seq % 100 == 0) {
|
||||
std::cout << "[producer] seq=" << seq << " slot=" << slot_idx << "\n";
|
||||
}
|
||||
|
||||
seq++;
|
||||
next_frame += frame_interval;
|
||||
std::this_thread::sleep_until(next_frame);
|
||||
}
|
||||
|
||||
std::cout << "[producer] published " << seq << " frames, cleanup\n";
|
||||
|
||||
for (int i = 0; i < RING_SIZE; ++i) {
|
||||
CHECK_CUDA(cudaFree(slot_ptrs[i]));
|
||||
}
|
||||
munmap(shared, sizeof(SharedHeader));
|
||||
close(fd);
|
||||
unlink(shm_path.c_str());
|
||||
return 0;
|
||||
}
|
||||
Reference in New Issue
Block a user