Files
gx 46c2b94939 libcuframes v0.1: producer + consumer (sync + async) + tests
Implements Steps 3-6 of Phase 1 according to docs/protocol.md.

libcuframes/src/:
- internal.h     (660 lines) — shared structs (byte-exact protocol.md layout)
                                + _Static_assert on offsets/sizes
- utils.c        — error strings, frame size calc, now_ns, key validation
- protocol.c     — TLV framing for the Unix socket with a poll-based timeout
- producer.c     (~700 lines) — Step 3:
                    * LIBRARY mode: cudaMalloc pool, IPC handle export
                    * EXTERNAL mode: register user-provided pointers
                    * cudaIpcEventHandle_t для cross-process sync (R1/R2)
                    * Unix socket accept thread, handshake state machine
                    * Bit allocation 1..31, name collision check (Y5)
                    * STRICT_WAIT policy: timeout with dead-subscriber eviction
- consumer.c     (~400 lines) — Step 4:
                    * Synchronous next() with poll-based wait
                    * cudaStreamWaitEvent on the consumer stream (R1/R2)
                    * Opaque cuframes_frame_t with accessor functions (Y6)
                    * NEWEST_ONLY and STRICT_ORDER modes
                    * ACK via atomic_fetch_or on the bitmap
- consumer_async.c — Step 5: thread + callback wrapper over the sync API

libcuframes/tests/:
- test_pingpong.cu — single producer × single consumer, 200 frames @ 60fps,
                     verified via a kernel on the consumer stream (the correct
                     test for sync semantics; see spike-v2)
- test_multi.cu    — 1 producer × 3 consumers via fork()

Build:
- Top-level CMakeLists.txt with options
- libcuframes/CMakeLists.txt: shared + static library, c_std_11
- Suppress -Waddress-of-packed-member (a known-safe warning on x86_64)

Results (inside the cuframes-dev container, RTX 5090):
- pingpong_basic PASS  4.5s  200 frames, 0 torn
- multi_consumer PASS  4.1s  1 × 3 consumers, all PASS

Phase 1 Step 6 done. Next: Step 7 (C++ wrapper), Step 9 (FFmpeg filter).
2026-05-14 23:21:30 +01:00

143 lines
4.8 KiB
Plaintext
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/* Multi-consumer test: 1 producer × 3 consumers, each consumer in its own forked process. */
#include <cuframes/cuframes.h>
#include <cuda_runtime.h>
#include <sys/wait.h>
#include <unistd.h>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <thread>
// Terminates the test process with exit status 2 when a cuframes call
// returns a non-zero status. The call site, the numeric status and its
// cuframes_strerror() text are printed to stderr first.
#define CHECK(call)                                                           \
    do {                                                                      \
        const int check_rc_ = (call);                                         \
        if (check_rc_ == 0) break;                                            \
        fprintf(stderr, "FAIL %s:%d: %d (%s)\n", __FILE__, __LINE__,          \
                check_rc_, cuframes_strerror(check_rc_));                     \
        std::exit(2);                                                         \
    } while (0)

// Same contract as CHECK, for CUDA runtime calls: anything other than
// cudaSuccess prints cudaGetErrorString() and exits with status 2.
#define CHECK_CUDA(call)                                                      \
    do {                                                                      \
        const cudaError_t check_err_ = (call);                                \
        if (check_err_ == cudaSuccess) break;                                 \
        fprintf(stderr, "CUDA FAIL %s:%d: %s\n", __FILE__, __LINE__,          \
                cudaGetErrorString(check_err_));                              \
        std::exit(2);                                                         \
    } while (0)

// Test parameters shared by the producer (parent) and the forked consumers.
static const char *KEY = "test_multi";  // publisher/subscriber key; also names the shm and socket files
static const int W = 320;               // frame width
static const int H = 240;               // frame height
static const int N = 150;               // number of frames the producer publishes
// Expected byte value for every pixel in row `row` of frame `seq`.
// It depends only on (seq, row), so fill_y and verify_y agree without
// sharing any state. `row * 7u` is evaluated in 32-bit unsigned arithmetic
// and only then widened for the 64-bit sum.
__host__ __device__ inline uint8_t pat(uint64_t seq, int row) {
    const uint64_t seq_term = seq * 31u;
    const unsigned row_term = row * 7u;
    return static_cast<uint8_t>((seq_term + row_term) & 0xFFu);
}
// Writes pat(seq, row) into every byte of a w x h plane whose rows are py
// bytes apart. Expects a 2D launch: x covers columns, y covers rows. Threads
// outside the w x h region exit without touching memory.
__global__ void fill_y(uint8_t *y, int w, int h, int py, uint64_t seq) {
    const int col = blockIdx.x * blockDim.x + threadIdx.x;
    const int row = blockIdx.y * blockDim.y + threadIdx.y;
    if (col >= w || row >= h) return;
    y[row * py + col] = pat(seq, row);
}
// Compares every byte of a w x h plane (row pitch py) with pat(seq, row) and
// atomically increments *bad once per mismatching byte. *bad is only ever
// incremented here, so the caller must zero it before the launch. Expects the
// same 2D launch shape as fill_y; out-of-range threads read nothing.
__global__ void verify_y(const uint8_t *y, int w, int h, int py, uint64_t seq, int *bad) {
    const int col = blockIdx.x * blockDim.x + threadIdx.x;
    const int row = blockIdx.y * blockDim.y + threadIdx.y;
    const bool inside = (col < w) && (row < h);
    // Short-circuit keeps the load below guarded by the bounds test.
    if (inside && y[row * py + col] != pat(seq, row)) {
        atomicAdd(bad, 1);
    }
}
// Consumer process body. Subscribes to KEY as `name` in NEWEST_ONLY mode,
// checks each received frame's Y plane on the GPU against pat(seq, row), and
// prints a received/torn summary to stderr.
// Returns 0 when at least N/3 frames arrived and none were torn, 1 otherwise.
// Any unexpected cuframes or CUDA error terminates the process with status 2.
int run_consumer(const char *name) {
    cuframes_subscriber_config_t cfg = {};
    cfg.key = KEY;
    cfg.consumer_name = name;
    // NEWEST_ONLY presumably lets the subscriber skip frames, hence the N/3
    // success threshold at the end rather than requiring all N.
    cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
    cfg.connect_timeout_ms = 5000;
    cuframes_subscriber_t *sub = NULL;
    CHECK(cuframes_subscriber_create(&cfg, &sub));

    cudaStream_t s;
    CHECK_CUDA(cudaStreamCreate(&s));
    int *d_bad;  // device-side mismatch counter written by verify_y
    CHECK_CUDA(cudaMalloc(&d_bad, sizeof(int)));

    dim3 b(32, 8);
    dim3 g((W + b.x - 1) / b.x, (H + b.y - 1) / b.y);
    int recv = 0, torn = 0;
    while (1) {
        cuframes_frame_t *f = NULL;
        // Wait up to 2000 ms; a timeout or a disconnect is the normal end of the run.
        int r = cuframes_subscriber_next(sub, s, &f, 2000);
        if (r == CUFRAMES_ERR_TIMEOUT || r == CUFRAMES_ERR_DISCONNECTED) break;
        if (r != 0) { fprintf(stderr, "[%s] next: %s\n", name, cuframes_strerror(r)); std::exit(2); }

        // Everything below is queued on s, the same stream handed to next().
        CHECK_CUDA(cudaMemsetAsync(d_bad, 0, sizeof(int), s));
        verify_y<<<g, b, 0, s>>>((const uint8_t *)cuframes_frame_cuda_ptr(f),
                                 W, H, cuframes_frame_pitch_y(f),
                                 cuframes_frame_seq(f), d_bad);
        // A launch that never ran (bad config, no kernel image for this GPU,
        // ...) leaves d_bad at 0 and would silently count the frame as
        // intact, so launch errors must be surfaced explicitly.
        CHECK_CUDA(cudaGetLastError());
        int bad = 0;
        CHECK_CUDA(cudaMemcpyAsync(&bad, d_bad, sizeof(int), cudaMemcpyDeviceToHost, s));
        CHECK_CUDA(cudaStreamSynchronize(s));  // `bad` is valid only after this
        if (bad > 0) torn++;
        recv++;
        // Released only after the sync, so verify_y has finished reading the frame.
        CHECK(cuframes_subscriber_release(sub, f));
    }
    fprintf(stderr, "[%s] received=%d torn=%d\n", name, recv, torn);
    CHECK_CUDA(cudaFree(d_bad));
    CHECK_CUDA(cudaStreamDestroy(s));
    cuframes_subscriber_destroy(sub);
    return (recv >= N / 3 && torn == 0) ? 0 : 1;
}
// Producer body (runs in the parent process). Publishes N NV12 frames of
// W x H at roughly 60 fps through a library-owned ring (ring_size 4, policy
// DROP_OLDEST), filling frame i's Y plane with pat(i, row).
// Returns 0; any cuframes or CUDA error terminates the process with status 2.
int run_producer() {
    cuframes_publisher_config_t cfg = {};
    cfg.key = KEY;
    cfg.width = W;
    cfg.height = H;
    cfg.format = CUFRAMES_FORMAT_NV12;
    cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
    cfg.ring_size = 4;
    cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;
    cuframes_publisher_t *pub = NULL;
    CHECK(cuframes_publisher_create(&cfg, &pub));

    // NOTE(review): assumes the library's pool lays out W x H NV12 frames
    // with this same Y pitch; confirm against the publisher's allocation.
    int32_t pitch_y = 0;
    CHECK(cuframes_calc_frame_size(CUFRAMES_FORMAT_NV12, W, H, NULL, &pitch_y, NULL));

    cudaStream_t s;
    CHECK_CUDA(cudaStreamCreate(&s));
    dim3 b(32, 8);
    dim3 g((W + b.x - 1) / b.x, (H + b.y - 1) / b.y);

    // Head start so the forked consumers can connect before the first frame.
    std::this_thread::sleep_for(std::chrono::milliseconds(800));

    // Pace against absolute deadlines so per-frame work does not accumulate drift.
    auto iv = std::chrono::nanoseconds(1000000000LL / 60);
    auto t = std::chrono::steady_clock::now();
    for (int i = 0; i < N; ++i) {
        void *p = NULL;
        CHECK(cuframes_publisher_acquire(pub, &p));
        // Consumers verify against cuframes_frame_seq(f); this relies on the
        // library numbering published frames 0, 1, 2, ... (NOTE(review)).
        fill_y<<<g, b, 0, s>>>((uint8_t *)p, W, H, pitch_y, (uint64_t)i);
        CHECK_CUDA(cudaGetLastError());  // kernel launches report errors only here
        CHECK(cuframes_publisher_publish(pub, s, cuframes_now_ns()));
        t += iv;
        std::this_thread::sleep_until(t);
    }
    // All queued fills must be complete before the publisher and its buffers go away.
    CHECK_CUDA(cudaStreamSynchronize(s));
    // Grace period before destroy, presumably so consumers can pick up the last frames.
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    cuframes_publisher_destroy(pub);
    CHECK_CUDA(cudaStreamDestroy(s));
    return 0;
}
// Test driver. Forks three consumer processes (c1..c3), runs the producer in
// the parent, then reaps every child. The test passes (exit 0) only if the
// producer succeeds and every consumer exits with status 0.
// The children are forked before the parent makes any CUDA call, because
// CUDA does not support using a context initialized before fork() in the child.
int main() {
    // Remove a stale shm segment / socket left behind by a previous run.
    char shm[80]; snprintf(shm, 80, "/dev/shm/cuframes-%s", KEY); unlink(shm);
    char sock[128]; snprintf(sock, 128, "/run/cuframes/%s.sock", KEY); unlink(sock);

    const int kConsumers = 3;
    pid_t pids[kConsumers];
    const char *names[kConsumers] = {"c1", "c2", "c3"};
    int fail = 0;
    for (int i = 0; i < kConsumers; ++i) {
        pids[i] = fork();
        if (pids[i] == 0) return run_consumer(names[i]);
        if (pids[i] < 0) {
            // Record the failure but still run the producer so the children
            // that did start can finish normally.
            perror("fork");
            fail = 1;
        }
    }

    if (run_producer() != 0) fail = 1;

    for (int i = 0; i < kConsumers; ++i) {
        // Never started: waitpid(-1, ...) would reap an arbitrary child instead.
        if (pids[i] < 0) continue;
        int st = 0;
        pid_t w;
        do {
            w = waitpid(pids[i], &st, 0);
        } while (w < 0 && errno == EINTR);
        // A failed waitpid leaves st == 0, which would otherwise read as exit(0).
        if (w != pids[i] || !WIFEXITED(st) || WEXITSTATUS(st) != 0) fail = 1;
    }

    if (fail) { fprintf(stderr, "test_multi FAIL\n"); return 1; }
    fprintf(stderr, "test_multi PASS\n");
    return 0;
}