46c2b94939
Implements Steps 3-6 of Phase 1 according to docs/protocol.md.
libcuframes/src/:
- internal.h (660 lines) — shared structs (byte-exact protocol.md layout)
+ _Static_assert on offsets/sizes
- utils.c — error strings, frame size calc, now_ns, key validation
- protocol.c — TLV framing for the Unix socket with a poll-based timeout
- producer.c (~700 lines) — Step 3:
* LIBRARY mode: cudaMalloc pool, IPC handle export
* EXTERNAL mode: register user-provided pointers
* cudaIpcEventHandle_t для cross-process sync (R1/R2)
* Unix socket accept thread, handshake state machine
* Bit allocation 1..31, name collision check (Y5)
* STRICT_WAIT policy: timeout with dead-subscriber eviction
- consumer.c (~400 lines) — Step 4:
* Synchronous next() with poll-based wait
* cudaStreamWaitEvent on the consumer stream (R1/R2)
* Opaque cuframes_frame_t with accessor functions (Y6)
* NEWEST_ONLY and STRICT_ORDER modes
* ACK via atomic_fetch_or on the bitmap
- consumer_async.c — Step 5: thread + callback wrapper over the sync API
libcuframes/tests/:
- test_pingpong.cu — single producer × single consumer, 200 frames @ 60fps,
verified via a kernel on the consumer stream (the correct way to test
sync semantics; see spike-v2)
- test_multi.cu — 1 producer × 3 consumers via fork()
Build:
- Top-level CMakeLists.txt with options
- libcuframes/CMakeLists.txt: shared + static library, c_std_11
- Suppress -Waddress-of-packed-member (a known-safe warning on x86_64)
Results (inside the cuframes-dev container, RTX 5090):
- pingpong_basic PASS 4.5s 200 frames, 0 torn
- multi_consumer PASS 4.1s 1 × 3 consumers, all PASS
Phase 1 Step 6 done. Next: Step 7 (C++ wrapper), Step 9 (FFmpeg filter).
143 lines
4.8 KiB
Plaintext
143 lines
4.8 KiB
Plaintext
/* Multi-consumer test: 1 producer × 3 consumers. */
|
||
|
||
#include <cuframes/cuframes.h>
|
||
#include <cuda_runtime.h>
|
||
#include <sys/wait.h>
|
||
#include <unistd.h>
|
||
#include <atomic>
|
||
#include <chrono>
|
||
#include <cstdio>
|
||
#include <cstdlib>
|
||
#include <cstring>
|
||
#include <thread>
|
||
|
||
/*
 * Test assertion helpers. Each evaluates `call` exactly once; on failure it
 * prints file:line plus a decoded message to stderr and terminates the
 * process with exit status 2.
 *   CHECK      - for cuframes API calls (int result, 0 == success).
 *   CHECK_CUDA - for CUDA runtime calls (cudaError_t, cudaSuccess == success).
 */
#define CHECK(call)                                                        \
    do {                                                                   \
        int check_rc_ = (call);                                            \
        if (check_rc_ != 0) {                                              \
            fprintf(stderr, "FAIL %s:%d: %d (%s)\n", __FILE__, __LINE__,   \
                    check_rc_, cuframes_strerror(check_rc_));              \
            std::exit(2);                                                  \
        }                                                                  \
    } while (0)

#define CHECK_CUDA(call)                                                   \
    do {                                                                   \
        cudaError_t check_err_ = (call);                                   \
        if (check_err_ != cudaSuccess) {                                   \
            fprintf(stderr, "CUDA FAIL %s:%d: %s\n", __FILE__, __LINE__,   \
                    cudaGetErrorString(check_err_));                       \
            std::exit(2);                                                  \
        }                                                                  \
    } while (0)
|
||
|
||
static const char *KEY = "test_multi";
|
||
static const int W = 320, H = 240;
|
||
static const int N = 150;
|
||
|
||
/*
 * Expected 8-bit value for every pixel in row `row` of frame `seq`.
 * The value is constant along a row and depends on both the row and the
 * sequence number. Because 31 is odd, two sequence numbers that differ by
 * less than 256 yield different values on every row, so a frame holding
 * data from another seq fails verification.
 * Shared by fill_y (writer) and verify_y (checker).
 */
__host__ __device__ inline uint8_t pat(uint64_t seq, int row) {
    const uint64_t mixed = seq * 31u + row * 7u;
    return static_cast<uint8_t>(mixed & 0xFF);
}
|
||
/*
 * Writes pat(seq, row) into every pixel of a w x h plane of 8-bit samples
 * whose rows are `py` bytes apart. Presumably this is the Y (luma) plane at
 * the start of the NV12 buffer returned by the publisher (TODO confirm the
 * buffer layout).
 * Launch: 2D grid/block covering at least w x h threads; threads outside the
 * plane return without writing. No shared memory.
 */
__global__ void fill_y(uint8_t *y, int w, int h, int py, uint64_t seq) {
    const int col = blockIdx.x * blockDim.x + threadIdx.x;
    const int row = blockIdx.y * blockDim.y + threadIdx.y;
    if (col >= w || row >= h) return;
    y[row * py + col] = pat(seq, row);
}
|
||
/*
 * Checks a w x h plane of 8-bit samples (row pitch `py` bytes) against
 * pat(seq, row). It adds 1 to *bad for every mismatching pixel, so the caller
 * must zero *bad before launching.
 * Launch: 2D grid/block covering at least w x h threads; threads outside the
 * plane do nothing. No shared memory.
 */
__global__ void verify_y(const uint8_t *y, int w, int h, int py, uint64_t seq, int *bad) {
    const int col = blockIdx.x * blockDim.x + threadIdx.x;
    const int row = blockIdx.y * blockDim.y + threadIdx.y;
    // Short-circuit keeps out-of-range threads from reading past the plane.
    if (col < w && row < h && y[row * py + col] != pat(seq, row)) {
        atomicAdd(bad, 1);
    }
}
|
||
|
||
/*
 * Consumer half of the test, run in a forked child process.
 *
 * Subscribes to KEY in NEWEST_ONLY mode as `name`. For every delivered frame
 * it launches verify_y on its own stream `s`, the stream passed to
 * cuframes_subscriber_next, so the check exercises the library's GPU-side
 * ordering rather than a host-side read. It then reads back the mismatch
 * count and releases the frame.
 *
 * The receive loop ends when next() returns CUFRAMES_ERR_TIMEOUT (no frame
 * within 2000 ms) or CUFRAMES_ERR_DISCONNECTED. Any other cuframes or CUDA
 * error exits the process with status 2 via CHECK/CHECK_CUDA.
 *
 * NOTE(review): verification assumes the library-assigned frame seq equals
 * the producer's 0-based loop index (run_producer fills with `i`). Confirm
 * that seq numbering starts at 0 and increments by 1 per publish.
 *
 * Returns 0 if at least N / 3 frames arrived and none were torn, 1 otherwise.
 * The threshold is lenient, presumably because NEWEST_ONLY can skip frames.
 */
int run_consumer(const char *name) {
    cuframes_subscriber_config_t cfg = {};
    cfg.key = KEY;
    cfg.consumer_name = name;
    cfg.mode = CUFRAMES_MODE_NEWEST_ONLY;
    cfg.connect_timeout_ms = 5000;

    cuframes_subscriber_t *sub = NULL;
    CHECK(cuframes_subscriber_create(&cfg, &sub));

    cudaStream_t s;
    CHECK_CUDA(cudaStreamCreate(&s));
    int *d_bad;
    CHECK_CUDA(cudaMalloc(&d_bad, sizeof(int)));

    dim3 b(32, 8);
    dim3 g((W + b.x - 1) / b.x, (H + b.y - 1) / b.y);

    int recv = 0, torn = 0;
    while (1) {
        cuframes_frame_t *f = NULL;
        int r = cuframes_subscriber_next(sub, s, &f, 2000);
        if (r == CUFRAMES_ERR_TIMEOUT || r == CUFRAMES_ERR_DISCONNECTED) break;
        if (r != 0) {
            fprintf(stderr, "[%s] next: %s\n", name, cuframes_strerror(r));
            std::exit(2);
        }

        CHECK_CUDA(cudaMemsetAsync(d_bad, 0, sizeof(int), s));
        verify_y<<<g, b, 0, s>>>((const uint8_t *)cuframes_frame_cuda_ptr(f),
                                 W, H, cuframes_frame_pitch_y(f),
                                 cuframes_frame_seq(f), d_bad);
        // Launch errors are not returned by <<<>>> and are not sticky. Without
        // this check a failed launch leaves d_bad at 0, and the frame would be
        // silently counted as clean.
        CHECK_CUDA(cudaGetLastError());

        int bad = 0;
        CHECK_CUDA(cudaMemcpyAsync(&bad, d_bad, sizeof(int), cudaMemcpyDeviceToHost, s));
        CHECK_CUDA(cudaStreamSynchronize(s));
        if (bad > 0) torn++;
        recv++;

        CHECK(cuframes_subscriber_release(sub, f));
    }

    fprintf(stderr, "[%s] received=%d torn=%d\n", name, recv, torn);

    CHECK_CUDA(cudaFree(d_bad));
    CHECK_CUDA(cudaStreamDestroy(s));
    cuframes_subscriber_destroy(sub);
    return (recv >= N / 3 && torn == 0) ? 0 : 1;
}
|
||
|
||
/*
 * Producer half of the test, run in the parent process after the consumers
 * have been forked.
 *
 * Creates a LIBRARY-owned NV12 publisher on KEY (ring of 4, DROP_OLDEST).
 * It waits 800 ms for the consumers to connect, then publishes N frames at
 * roughly 60 fps. For each frame it acquires a buffer, fills its Y plane with
 * pat(i, row) via fill_y on stream `s`, and publishes on `s`.
 *
 * Before teardown it synchronizes `s`, so an asynchronous fill_y fault is
 * reported instead of lost. It then waits another 500 ms before destroying
 * the publisher.
 *
 * Returns 0. Any cuframes or CUDA failure exits the process with status 2 via
 * CHECK/CHECK_CUDA.
 */
int run_producer() {
    cuframes_publisher_config_t cfg = {};
    cfg.key = KEY;
    cfg.width = W;
    cfg.height = H;
    cfg.format = CUFRAMES_FORMAT_NV12;
    cfg.ownership = CUFRAMES_OWNERSHIP_LIBRARY;
    cfg.ring_size = 4;
    cfg.policy = CUFRAMES_POLICY_DROP_OLDEST;

    cuframes_publisher_t *pub = NULL;
    CHECK(cuframes_publisher_create(&cfg, &pub));

    // Y-plane row pitch for W x H NV12. Assumed to match the layout of the
    // buffers returned by cuframes_publisher_acquire (TODO confirm).
    int32_t pitch_y = 0;
    CHECK(cuframes_calc_frame_size(CUFRAMES_FORMAT_NV12, W, H, NULL, &pitch_y, NULL));

    cudaStream_t s;
    CHECK_CUDA(cudaStreamCreate(&s));

    dim3 b(32, 8);
    dim3 g((W + b.x - 1) / b.x, (H + b.y - 1) / b.y);

    // Give the forked consumers time to connect before the first frame.
    std::this_thread::sleep_for(std::chrono::milliseconds(800));

    // Absolute-deadline pacing (sleep_until) so per-frame overhead does not
    // accumulate as drift.
    const auto iv = std::chrono::nanoseconds(1000000000LL / 60);
    auto t = std::chrono::steady_clock::now();
    for (int i = 0; i < N; ++i) {
        void *p = NULL;
        CHECK(cuframes_publisher_acquire(pub, &p));
        fill_y<<<g, b, 0, s>>>((uint8_t *)p, W, H, pitch_y, (uint64_t)i);
        // Catch launch failures before publishing a frame that was never written.
        CHECK_CUDA(cudaGetLastError());
        CHECK(cuframes_publisher_publish(pub, s, cuframes_now_ns()));
        t += iv;
        std::this_thread::sleep_until(t);
    }

    // Nothing else ever synchronizes `s` in this process. Without this,
    // asynchronous kernel faults would go unreported.
    CHECK_CUDA(cudaStreamSynchronize(s));

    // Grace period so consumers can drain the last frames before the
    // publisher disappears.
    std::this_thread::sleep_for(std::chrono::milliseconds(500));
    cuframes_publisher_destroy(pub);
    CHECK_CUDA(cudaStreamDestroy(s));
    return 0;
}
|
||
|
||
/*
 * Test driver.
 *
 * First it makes a best-effort removal of the shm and socket paths derived
 * from KEY (unlink errors such as ENOENT are ignored). The paths are assumed
 * to match what the library creates; TODO confirm.
 *
 * It then forks the consumers before this process makes any CUDA call; the
 * parent first touches CUDA inside run_producer. A CUDA context created
 * before fork() cannot be used in the child.
 *
 * The parent runs the producer and reaps every child. It prints
 * "test_multi PASS" and returns 0 only if the producer succeeded, every fork
 * succeeded, and every consumer was reaped with exit status 0.
 */
int main() {
    char shm[80];
    snprintf(shm, sizeof shm, "/dev/shm/cuframes-%s", KEY);
    unlink(shm);
    char sock[128];
    snprintf(sock, sizeof sock, "/run/cuframes/%s.sock", KEY);
    unlink(sock);

    const int kConsumers = 3;
    const char *names[kConsumers] = {"c1", "c2", "c3"};
    // -1 marks "no child": never pass it to waitpid, where -1 means "any child".
    pid_t pids[kConsumers] = {-1, -1, -1};
    int fail = 0;

    for (int i = 0; i < kConsumers; ++i) {
        pid_t pid = fork();
        if (pid == 0) return run_consumer(names[i]);  // child
        if (pid < 0) {
            perror("fork");
            fail = 1;
            break;  // still run the producer so already-forked children can finish
        }
        pids[i] = pid;
    }

    if (run_producer() != 0) fail = 1;

    for (int i = 0; i < kConsumers; ++i) {
        if (pids[i] <= 0) continue;
        int st = 0;
        // If waitpid fails, st stays 0, which would look like a clean exit.
        if (waitpid(pids[i], &st, 0) != pids[i]) {
            perror("waitpid");
            fail = 1;
            continue;
        }
        if (!WIFEXITED(st) || WEXITSTATUS(st) != 0) fail = 1;
    }

    if (fail) { fprintf(stderr, "test_multi FAIL\n"); return 1; }
    fprintf(stderr, "test_multi PASS\n");
    return 0;
}
|