Files
cuframes/tools/spike-v2/consumer.cu
T
gx ad543054fc spike-v2: validate sync semantics (R1/R2 architectural review)
Architectural review (2026-05-15) указал что cudaStreamSynchronize-only на
producer-side не достаточен для cross-process visibility — NVIDIA Programming
Guide §3.2.8 требует cudaIpcEventHandle_t. Phase 0 PoC v1 не проверял этот
случай из-за cudaMemcpy который имеет implicit barriers.

spike-v2 воспроизводит правильный сценарий: consumer запускает verify_kernel
на ОТДЕЛЬНОМ stream'е (real-world use case — PyTorch / OpenCV CUDA), pattern
включает row-based component для отлова partial-frame torn.

Запуск 4 scenarios × 1500/600 frames:
  A-fhd60 (stream sync, FHD@60):  0 torn, p99=267µs, max=14.7ms
  B-fhd60 (event  sync, FHD@60):  0 torn, p99=344µs, max=5.2ms
  A-4k30  (stream sync, 4K@30):   0 torn, p99=606µs, max=4.4ms
  B-4k30  (event  sync, 4K@30):   0 torn, p99=437µs, max=3.7ms

Все 4 показали 0 torn frames. R1 на single-host single-GPU фактически
не воспроизводится — но NVIDIA contractually не гарантирует это.

Decision: events as default (R1/R2 resolved). Architecture.md §6.6 закрыт.
Tradeoff: mean latency +20µs, max latency в 3× ниже (predictable tail) +
future-proof для multi-GPU.

Также Dockerfile.dev — апдейт CUDA до 13.0.3 (12.4 не существует с devel-ubuntu24.04).

Связано с PR review: R1, R2, R3 (R3, R4 — в следующих коммитах).
2026-05-14 23:00:13 +01:00

216 lines
8.2 KiB
Plaintext
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// spike-v2 consumer.
//
// КЛЮЧЕВОЕ отличие от v1: verify работает через CUDA kernel на ОТДЕЛЬНОМ
// (consumer'ском) stream'е, а не через cudaMemcpy. Это правильно эмулирует
// real-world consumer (PyTorch, OpenCV CUDA) и проверяет sync semantics.
//
// Scenario A (--sync=stream): должны быть torn frames на high-fps
// Scenario B (--sync=event): torn frames должны быть 0
#include "common.h"
#include <fcntl.h>
#include <sys/mman.h>
#include <unistd.h>
#include <algorithm>
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
using namespace cuframes_spike_v2;
// Verify kernel: проверяет что каждая строка имеет pattern(seq, row).
// Записывает кол-во несовпадающих байтов в out_bad_count.
__global__ void verify_pattern(const uint8_t* y, int width, int height,
int pitch_y, uint64_t expected_seq,
int* out_bad_count) {
int x = blockIdx.x * blockDim.x + threadIdx.x;
int row = blockIdx.y * blockDim.y + threadIdx.y;
if (x < width && row < height) {
uint8_t expect = pattern_value(expected_seq, row);
uint8_t actual = y[row * pitch_y + x];
if (actual != expect) {
atomicAdd(out_bad_count, 1);
}
}
}
struct Args {
std::string key = "A";
int count = 500;
};
static Args parse_args(int argc, char** argv) {
Args a;
for (int i = 1; i < argc; ++i) {
std::string arg = argv[i];
auto next = [&] { return std::string(argv[++i]); };
if (arg == "--key") a.key = next();
else if (arg == "--count") a.count = std::stoi(next());
}
return a;
}
int main(int argc, char** argv) {
Args args = parse_args(argc, argv);
std::cout << "[consumer] key=" << args.key << " count=" << args.count << "\n";
CHECK_CUDA(cudaSetDevice(0));
// Open SHM
std::string shm_path = "/dev/shm/cuframes-v2-" + args.key;
int fd = -1;
for (int retry = 0; retry < 50; ++retry) {
fd = open(shm_path.c_str(), O_RDWR);
if (fd >= 0) break;
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
if (fd < 0) {
std::cerr << "[consumer] no producer at " << shm_path << "\n";
return 1;
}
auto* shared = static_cast<SharedHeader*>(
mmap(nullptr, sizeof(SharedHeader), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0));
if (shared == MAP_FAILED) { perror("[consumer] mmap"); return 1; }
close(fd);
if (shared->magic != MAGIC || shared->version != VERSION) {
std::cerr << "[consumer] protocol mismatch magic=" << std::hex
<< shared->magic << " ver=" << shared->version << "\n";
return 1;
}
bool use_events = shared->use_events != 0;
std::cout << "[consumer] connected, " << shared->meta.width
<< "x" << shared->meta.height
<< " sync=" << (use_events ? "event" : "stream") << "\n";
// Map producer's memory
void* slot_ptrs[RING_SIZE];
for (int i = 0; i < RING_SIZE; ++i) {
CHECK_CUDA(cudaIpcOpenMemHandle(&slot_ptrs[i],
shared->slots[i].mem_handle,
cudaIpcMemLazyEnablePeerAccess));
}
// Map producer's event если sync=event
cudaEvent_t producer_event = nullptr;
if (use_events) {
CHECK_CUDA(cudaIpcOpenEventHandle(&producer_event,
shared->event_handle));
std::cout << "[consumer] opened producer's cuda event\n";
}
// Consumer's own stream — это и есть ключевой момент.
cudaStream_t consumer_stream;
CHECK_CUDA(cudaStreamCreate(&consumer_stream));
// Counter в device memory для verify_pattern kernel
int* d_bad_count;
CHECK_CUDA(cudaMalloc(&d_bad_count, sizeof(int)));
const int width = shared->meta.width;
const int height = shared->meta.height;
const int pitch_y = shared->meta.pitch_y;
dim3 block(32, 8);
dim3 grid((width + block.x - 1) / block.x,
(height + block.y - 1) / block.y);
std::vector<int64_t> latencies_ns;
latencies_ns.reserve(args.count);
uint64_t last_seen = UINT64_MAX;
int frames_received = 0;
int torn_frames = 0;
int skipped_frames = 0;
auto t_start = std::chrono::steady_clock::now();
while (frames_received < args.count) {
uint64_t seq = __atomic_load_n(&shared->global_seq, __ATOMIC_ACQUIRE);
if (seq == last_seen ||
(last_seen == UINT64_MAX && seq == 0 &&
__atomic_load_n(&shared->slots[0].producer_seq, __ATOMIC_ACQUIRE) == 0)) {
std::this_thread::sleep_for(std::chrono::microseconds(50));
continue;
}
if (last_seen != UINT64_MAX && seq > last_seen + 1) {
skipped_frames += (seq - last_seen - 1);
}
last_seen = seq;
const int slot_idx = seq % RING_SIZE;
const int64_t pts_ns = __atomic_load_n(&shared->slots[slot_idx].pts_ns,
__ATOMIC_ACQUIRE);
// *** КЛЮЧЕВОЕ ***
// Если используем events — wait на producer's event перед kernel'ом.
// Если stream-only sync — НЕ ждём (consumer не знает producer's stream).
if (use_events) {
CHECK_CUDA(cudaStreamWaitEvent(consumer_stream, producer_event, 0));
}
// Reset bad_count + verify kernel на consumer_stream
CHECK_CUDA(cudaMemsetAsync(d_bad_count, 0, sizeof(int), consumer_stream));
verify_pattern<<<grid, block, 0, consumer_stream>>>(
static_cast<const uint8_t*>(slot_ptrs[slot_idx]),
width, height, pitch_y, seq, d_bad_count);
// Получить result (это синхронизирует consumer_stream)
int bad_count = 0;
CHECK_CUDA(cudaMemcpyAsync(&bad_count, d_bad_count, sizeof(int),
cudaMemcpyDeviceToHost, consumer_stream));
CHECK_CUDA(cudaStreamSynchronize(consumer_stream));
const int64_t recv_ns = now_ns();
const int64_t latency_ns = recv_ns - pts_ns;
if (bad_count > 0) {
torn_frames++;
// Записать в SHM чтобы producer тоже видел
__atomic_fetch_add(&shared->torn_frame_count, 1, __ATOMIC_RELEASE);
}
latencies_ns.push_back(latency_ns);
frames_received++;
}
auto t_end = std::chrono::steady_clock::now();
double duration_sec = std::chrono::duration<double>(t_end - t_start).count();
for (int i = 0; i < RING_SIZE; ++i) {
CHECK_CUDA(cudaIpcCloseMemHandle(slot_ptrs[i]));
}
if (producer_event) cudaEventDestroy(producer_event);
cudaFree(d_bad_count);
cudaStreamDestroy(consumer_stream);
munmap(shared, sizeof(SharedHeader));
std::sort(latencies_ns.begin(), latencies_ns.end());
auto pct = [&](double p) -> int64_t {
return latencies_ns[static_cast<size_t>(latencies_ns.size() * p)];
};
int64_t sum = 0;
for (auto v : latencies_ns) sum += v;
std::cout << "\n=== cuframes spike-v2 summary ===\n";
std::cout << "scenario: " << (use_events ? "B (event sync)" : "A (stream sync)") << "\n";
std::cout << "frames received: " << frames_received << "\n";
std::cout << "duration: " << duration_sec << " s\n";
std::cout << "effective fps: " << frames_received / duration_sec << "\n";
std::cout << "skipped (caught up): " << skipped_frames << "\n";
std::cout << "TORN FRAMES: " << torn_frames
<< " ← " << (torn_frames == 0 ? "✓ clean" : "✗ DATA RACE!") << "\n";
std::cout << "\nlatency consumer-receive-to-kernel-done (microseconds):\n";
std::cout << " mean: " << sum / 1000 / latencies_ns.size() << " us\n";
std::cout << " p50: " << pct(0.50) / 1000 << " us\n";
std::cout << " p95: " << pct(0.95) / 1000 << " us\n";
std::cout << " p99: " << pct(0.99) / 1000 << " us\n";
std::cout << " max: " << latencies_ns.back() / 1000 << " us\n";
return torn_frames == 0 ? 0 : 2;
}