Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 656e36e9b0 | |||
| 8c7abbc4e8 | |||
| 517107d741 | |||
| 4d54173bb2 | |||
| 52fb2ad722 | |||
| 3779175737 |
@@ -117,3 +117,95 @@ cd build && cmake -DBUILD_TESTING=ON .. && cmake --build . && ctest -R stress -
|
|||||||
Production деplo замеры — см. интеграционные guides:
|
Production деplo замеры — см. интеграционные guides:
|
||||||
- [docs/integration.md](docs/integration.md) — cctv-processor C++ pipeline
|
- [docs/integration.md](docs/integration.md) — cctv-processor C++ pipeline
|
||||||
- [filter/README.md](filter/README.md) — FFmpeg demuxer (Frigate setup)
|
- [filter/README.md](filter/README.md) — FFmpeg demuxer (Frigate setup)
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Real-world production deployment (2026-05-19, v0.2.0)
|
||||||
|
|
||||||
|
**Setup**: 4 Dahua IP-камеры (HEVC main 1920×1080 / 2688×1520, 25 fps) → 3
|
||||||
|
одновременных consumer'а на одном RTX 5090 хосте:
|
||||||
|
- **Frigate** detect (ONNX D-FINE-S, 640×480) + record (full-res H.265 mp4)
|
||||||
|
- **cctv-backend** custom C++ mosaic processor (composes 4×grid → RTSP output для TV)
|
||||||
|
|
||||||
|
### Before → after (measured production, идентичный workload)
|
||||||
|
|
||||||
|
| Метрика | Без cuframes | С cuframes v0.2 dual-input | Reduction |
|
||||||
|
|---|---:|---:|---:|
|
||||||
|
| **RTSP connections к камерам** | 12 (4 cam × 3 consumer) | **4** (publishers only) | **−67%** |
|
||||||
|
| **NVDEC sessions** | ~8 (decode на каждый consumer) | **4** (publishers only) | **−50%** |
|
||||||
|
| **Camera-side bandwidth** | ~34 Mbps (main+main+sub per cam) | **~16 Mbps** (main per cam) | **−54%** |
|
||||||
|
| **PCIe D2H copies (consumer side)** | ~346 MB/s (decoded frames → host) | **~0** (zero-copy CUDA IPC) | **−100%** |
|
||||||
|
| **Frigate ffmpeg с прямым RTSP** | 8 (detect+record × 4) | **0** (all через cuframes) | **−100%** |
|
||||||
|
|
||||||
|
### Live nvidia-smi metrics в running system
|
||||||
|
|
||||||
|
```
|
||||||
|
GPU SM: 4-5% (compute: detector + cuframes consumers)
|
||||||
|
GPU NVDEC: 2-4% (без cuframes ожидаемо было 15-25%)
|
||||||
|
GPU NVENC: 0-1%
|
||||||
|
```
|
||||||
|
|
||||||
|
### VRAM breakdown (measured)
|
||||||
|
|
||||||
|
| Component | VRAM |
|
||||||
|
|---|---:|
|
||||||
|
| 4× cuframes publishers (3× FHD ring + 1× 2688×1520 для LPR) | **4.4 GB** |
|
||||||
|
| cctv-backend (composer + grid output) | 1.0 GB |
|
||||||
|
| frigate.embeddings_manager (face + LPR ONNX models) | 1.6 GB |
|
||||||
|
| frigate.detector:onnx (D-FINE-S COCO) | 0.6 GB |
|
||||||
|
| **Total cuframes-stack VRAM** | **~7.7 GB** |
|
||||||
|
|
||||||
|
Из них на сам cuframes accounting — только **4.4 GB** в publishers (ring buffers +
|
||||||
|
NVDEC decode buffers). Consumers (Frigate, cctv-backend) держат свои CUDA
|
||||||
|
contexts независимо.
|
||||||
|
|
||||||
|
### Network bandwidth (real tcpdump, 10-sec sample)
|
||||||
|
|
||||||
|
**31.5 Mbps** от camera subnet (4 cameras → R9), измерено через
|
||||||
|
`tcpdump -w cam-traffic.pcap` за 10 секунд.
|
||||||
|
|
||||||
|
Breakdown approximate:
|
||||||
|
- 4 publishers × main HEVC RTP/UDP: **~16 Mbps** (cuframes core)
|
||||||
|
- go2rtc on-demand streams (Frigate UI live preview, если открыт): **0-10 Mbps**
|
||||||
|
- ONVIF discovery, RTSP keepalives, NTP-from-cameras: **~1-2 Mbps**
|
||||||
|
|
||||||
|
Без cuframes тот же setup (cctv-backend + Frigate detect + Frigate record × 4
|
||||||
|
camera) дал бы **~45-50 Mbps** (главное: record path забирал отдельный
|
||||||
|
main stream от каждой camera).
|
||||||
|
|
||||||
|
### Camera-side benefits
|
||||||
|
|
||||||
|
Dahua/Hikvision камеры обычно cap'нуты на 4-5 одновременных RTSP streams.
|
||||||
|
До cuframes setup (4 cam × 3 RTSP) делал каждую camera на **60-75% capacity**
|
||||||
|
её RTSP server'а. После — **20-25%**, headroom на 2-3 дополнительных
|
||||||
|
consumer'а без замены оборудования.
|
||||||
|
|
||||||
|
### Что **сохранено** (важно)
|
||||||
|
|
||||||
|
- **Качество записи**: record path через `cuframes_packets://` это **passthrough**
|
||||||
|
(`-c:v copy`), bit-exact original encoded stream от камеры. Frigate пишет mp4
|
||||||
|
с full-resolution оригинала, без re-encode.
|
||||||
|
- **Latency**: <2 ms publisher → consumer (cuframes IPC) vs ~50-80 ms RTSP setup
|
||||||
|
latency для каждого нового consumer.
|
||||||
|
- **Backward compatibility**: v0.2 publishers принимают v1 subscribers
|
||||||
|
(frames-only), rolling upgrade.
|
||||||
|
|
||||||
|
### Hardware-agnostic projection (для другого setup)
|
||||||
|
|
||||||
|
| If you have | Expected reduction |
|
||||||
|
|---|---|
|
||||||
|
| 16 cameras × 2 consumers | 32 → 16 NVDEC (−50%), 32 → 16 RTSP (−50%) |
|
||||||
|
| 8 cameras × 3 consumers | 24 → 8 NVDEC (−67%), 24 → 8 RTSP (−67%) |
|
||||||
|
| 4 cameras × 4 consumers (multi-AI pipeline) | 16 → 4 NVDEC (−75%), 16 → 4 RTSP (−75%) |
|
||||||
|
|
||||||
|
Reduction масштабируется **линейно** с N (consumers per camera). v0.1 (frames
|
||||||
|
only) сэкономит NVDEC; v0.2 (frames + packets) **дополнительно** сэкономит
|
||||||
|
RTSP connections для record/mux consumers.
|
||||||
|
|
||||||
|
### Что **НЕ** сэкономлено (честно)
|
||||||
|
|
||||||
|
- **Disk space**: запись остаётся full-resolution H.265 mp4. Cuframes не сжимает.
|
||||||
|
- **Detector inference latency**: ONNX/TensorRT detector работает на decoded
|
||||||
|
frames независимо от source. Cuframes только меняет где decode произошёл.
|
||||||
|
- **Camera RTSP server CPU**: сама камера всё равно encode'ит видео. Cuframes
|
||||||
|
reduces **consumer-side** load, не producer-side.
|
||||||
|
|||||||
+1
-1
@@ -1,6 +1,6 @@
|
|||||||
cmake_minimum_required(VERSION 3.20)
|
cmake_minimum_required(VERSION 3.20)
|
||||||
project(cuframes
|
project(cuframes
|
||||||
VERSION 0.2.0
|
VERSION 0.3.0
|
||||||
DESCRIPTION "Zero-copy frame sharing via CUDA IPC"
|
DESCRIPTION "Zero-copy frame sharing via CUDA IPC"
|
||||||
LANGUAGES C CXX CUDA
|
LANGUAGES C CXX CUDA
|
||||||
)
|
)
|
||||||
|
|||||||
+10
-18
@@ -75,27 +75,19 @@ ETA: 1-2 недели focused работы.
|
|||||||
|
|
||||||
Open questions: какой memory-type — `memory:CUDAMemory` (mainline) vs `memory:NVMM` (NVIDIA DeepStream-specific). Возможно два варианта/build flags.
|
Open questions: какой memory-type — `memory:CUDAMemory` (mainline) vs `memory:NVMM` (NVIDIA DeepStream-specific). Возможно два варианта/build flags.
|
||||||
|
|
||||||
### `vf_cuda_grid` — FFmpeg filter с runtime grid composition
|
### `vf_cuda_grid` — **выделен в отдельный продукт `gx/vf-cuda-grid`** ([repo](https://git.goldix.org/gx/vf-cuda-grid))
|
||||||
|
|
||||||
CCTV mosaic composition как FFmpeg filter, **полностью на GPU**. Заменяет custom C++ GridComposer (см. [gx/cctv#22](https://git.goldix.org/gx/cctv/issues/22) — performance investigation cctv-processor: CPU round-trip pipeline).
|
FFmpeg filter для GPU-native video grid composition + control-plane sidecar
|
||||||
|
(ZeroMQ/MQTT/HTTP/HA Discovery). Дизайн зафиксирован, см.
|
||||||
|
[`gx/vf-cuda-grid` docs/design.md](https://git.goldix.org/gx/vf-cuda-grid/src/branch/main/docs/design.md)
|
||||||
|
и [epic issue #1](https://git.goldix.org/gx/vf-cuda-grid/issues/1).
|
||||||
|
|
||||||
| Capability | Зачем |
|
Cuframes остаётся frame source provider для vf-cuda-grid в нашей экосистеме
|
||||||
|---|---|
|
(но vf-cuda-grid работает и с любым другим CUDA frame source — стандартный FFmpeg).
|
||||||
| Filter принимает N cuda-frames (через `[in0][in1][in2]...` filter inputs) | Композиция в одном filter graph без custom code |
|
|
||||||
| Output — один cuda-frame с N cells в layout | Прямой вход в `hwdownload` или `h264_nvenc` |
|
|
||||||
| Layout templates (`single`, `quad`, `main_plus_preview`, `nine_grid`, ...) | Конфигурируемые из CLI или filter command'ом |
|
|
||||||
| `sendcmd` / API для runtime smena layout'а | Не нужно teardown filter graph для переключения сетки |
|
|
||||||
| Per-cell overlays (text, bbox) через side data в AVFrame | Frigate detection/LPR/face — overlay внутри pipeline |
|
|
||||||
| Полностью CUDA-side: scale/composition/text rendering | Zero CPU round-trip, frame не покидает VRAM |
|
|
||||||
|
|
||||||
Это превращает cuframes из IPC-библиотеки в полноценную **GPU-native video routing platform**. Эстетически близко к NVIDIA DeepStream `nvstreammux` + `nvmultistreamtiler`, но open-source и с conventional FFmpeg-stack.
|
Закрывает [`gx/cctv#22`](https://git.goldix.org/gx/cctv/issues/22) Phase 4
|
||||||
|
(end-to-end GPU pipeline для cctv-processor mosaic composer) после Phase 4 vf-cuda-grid +
|
||||||
Open questions:
|
миграция cctv-processor GridComposer → vf_cuda_grid filter.
|
||||||
- Filter input mode: pull-based (filter pull'ает N inputs) или push-based (через external lock-step). FFmpeg filter API больше pull-friendly.
|
|
||||||
- Text rendering в CUDA — `vf_drawtext` имеет CPU path; нужен либо GPU font-renderer (Pango/freetype + texture upload), либо CPU-precomputed glyph atlases.
|
|
||||||
- Runtime layout commands через filter `process_command` API.
|
|
||||||
|
|
||||||
Это **большой scope** — отдельная major version (v0.5+) или standalone проект.
|
|
||||||
|
|
||||||
## v1.0 — Stable ABI 📋
|
## v1.0 — Stable ABI 📋
|
||||||
|
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ extern "C" {
|
|||||||
/* ─────────────────────────────────────────────────────────────────────── */
|
/* ─────────────────────────────────────────────────────────────────────── */
|
||||||
|
|
||||||
#define CUFRAMES_VERSION_MAJOR 0
|
#define CUFRAMES_VERSION_MAJOR 0
|
||||||
#define CUFRAMES_VERSION_MINOR 2
|
#define CUFRAMES_VERSION_MINOR 3
|
||||||
#define CUFRAMES_VERSION_PATCH 0
|
#define CUFRAMES_VERSION_PATCH 0
|
||||||
|
|
||||||
/** @brief Runtime-версия библиотеки в формате "MAJOR.MINOR.PATCH". */
|
/** @brief Runtime-версия библиотеки в формате "MAJOR.MINOR.PATCH". */
|
||||||
|
|||||||
@@ -41,7 +41,7 @@ endforeach()
|
|||||||
|
|
||||||
# Set SOVERSION на shared lib для ABI tracking
|
# Set SOVERSION на shared lib для ABI tracking
|
||||||
set_target_properties(cuframes PROPERTIES
|
set_target_properties(cuframes PROPERTIES
|
||||||
VERSION 0.2.0
|
VERSION 0.3.0
|
||||||
SOVERSION 0
|
SOVERSION 0
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -44,7 +44,9 @@ struct cuframes_subscriber {
|
|||||||
cuframes_shm_header_t *hdr;
|
cuframes_shm_header_t *hdr;
|
||||||
char shm_name[80];
|
char shm_name[80];
|
||||||
|
|
||||||
cudaEvent_t producer_event;
|
cudaEvent_t producer_event; /* legacy fallback (v0.2 proto) */
|
||||||
|
cudaEvent_t slot_events[CUFRAMES_MAX_RING]; /* v0.3 — per-slot events */
|
||||||
|
int has_slot_events; /* 1 if v0.3 events opened OK */
|
||||||
void *mapped_ptrs[CUFRAMES_MAX_RING];
|
void *mapped_ptrs[CUFRAMES_MAX_RING];
|
||||||
|
|
||||||
uint32_t assigned_bit;
|
uint32_t assigned_bit;
|
||||||
@@ -201,13 +203,37 @@ int cuframes_subscriber_create(const cuframes_subscriber_config_t *cfg,
|
|||||||
r = CUFRAMES_ERR_CUDA; goto fail;
|
r = CUFRAMES_ERR_CUDA; goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Open producer's event */
|
/* Open producer's event (legacy single — v0.2 compat fallback) */
|
||||||
cerr = cudaIpcOpenEventHandle(&sub->producer_event, sub->hdr->ipc_event_handle);
|
cerr = cudaIpcOpenEventHandle(&sub->producer_event, sub->hdr->ipc_event_handle);
|
||||||
if (cerr != cudaSuccess) {
|
if (cerr != cudaSuccess) {
|
||||||
CUFRAMES_LOG_ERROR("cudaIpcOpenEventHandle: %s", cudaGetErrorString(cerr));
|
CUFRAMES_LOG_ERROR("cudaIpcOpenEventHandle: %s", cudaGetErrorString(cerr));
|
||||||
r = CUFRAMES_ERR_CUDA; goto fail;
|
r = CUFRAMES_ERR_CUDA; goto fail;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* v0.3 — open per-slot events если protocol supports. */
|
||||||
|
sub->has_slot_events = 0;
|
||||||
|
if (sub->hdr->proto_version >= CUFRAMES_PROTOCOL_V3) {
|
||||||
|
int ring_evt = (int)sub->hdr->ring_size;
|
||||||
|
if (ring_evt > CUFRAMES_MAX_RING) ring_evt = CUFRAMES_MAX_RING;
|
||||||
|
int evt_ok = 1;
|
||||||
|
for (int i = 0; i < ring_evt; i++) {
|
||||||
|
cerr = cudaIpcOpenEventHandle(&sub->slot_events[i],
|
||||||
|
sub->hdr->slot_event_handles[i]);
|
||||||
|
if (cerr != cudaSuccess) {
|
||||||
|
CUFRAMES_LOG_WARN("cudaIpcOpenEventHandle slot %d: %s — "
|
||||||
|
"fallback к legacy single event",
|
||||||
|
i, cudaGetErrorString(cerr));
|
||||||
|
for (int j = 0; j < i; j++) cudaEventDestroy(sub->slot_events[j]);
|
||||||
|
evt_ok = 0;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (evt_ok) {
|
||||||
|
sub->has_slot_events = 1;
|
||||||
|
CUFRAMES_LOG_INFO("subscribed с per-slot events (v0.3 proto)");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Open mem handles */
|
/* Open mem handles */
|
||||||
int ring = (int)sub->hdr->ring_size;
|
int ring = (int)sub->hdr->ring_size;
|
||||||
if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
|
if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
|
||||||
@@ -275,10 +301,16 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
|
|||||||
int64_t pts = atomic_load_explicit(&sub->hdr->slots[slot_idx].pts_ns,
|
int64_t pts = atomic_load_explicit(&sub->hdr->slots[slot_idx].pts_ns,
|
||||||
memory_order_acquire);
|
memory_order_acquire);
|
||||||
|
|
||||||
/* Cross-process sync: wait event on consumer's stream */
|
/* Cross-process sync: wait event on consumer's stream.
|
||||||
|
* v0.3: per-slot event точно соответствует slot[slot_idx] —
|
||||||
|
* no TOCTOU race possible. v0.2 fallback: single global event +
|
||||||
|
* post-sync verify (less precise, but still correct). */
|
||||||
|
cudaEvent_t sync_event = sub->has_slot_events
|
||||||
|
? sub->slot_events[slot_idx]
|
||||||
|
: sub->producer_event;
|
||||||
if (consumer_stream) {
|
if (consumer_stream) {
|
||||||
cudaError_t cerr = cudaStreamWaitEvent((cudaStream_t)consumer_stream,
|
cudaError_t cerr = cudaStreamWaitEvent((cudaStream_t)consumer_stream,
|
||||||
sub->producer_event, 0);
|
sync_event, 0);
|
||||||
if (cerr != cudaSuccess) {
|
if (cerr != cudaSuccess) {
|
||||||
CUFRAMES_LOG_WARN("cudaStreamWaitEvent: %s",
|
CUFRAMES_LOG_WARN("cudaStreamWaitEvent: %s",
|
||||||
cudaGetErrorString(cerr));
|
cudaGetErrorString(cerr));
|
||||||
@@ -286,10 +318,21 @@ int cuframes_subscriber_next(cuframes_subscriber_t *sub,
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/* Synchronize globally — для cudaMemcpyDeviceToHost users */
|
/* Synchronize globally — для cudaMemcpyDeviceToHost users */
|
||||||
cudaError_t cerr = cudaEventSynchronize(sub->producer_event);
|
cudaError_t cerr = cudaEventSynchronize(sync_event);
|
||||||
if (cerr != cudaSuccess) return CUFRAMES_ERR_CUDA;
|
if (cerr != cudaSuccess) return CUFRAMES_ERR_CUDA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* TOCTOU защита (v0.2 fallback only): legacy single event signals
|
||||||
|
* для последнего published frame. v0.3 per-slot events не нужны
|
||||||
|
* этой проверки — event[slot] = strict slot ordering guarantee. */
|
||||||
|
if (!sub->has_slot_events) {
|
||||||
|
uint64_t verify_seq = atomic_load_explicit(&sub->hdr->slots[slot_idx].seq,
|
||||||
|
memory_order_acquire);
|
||||||
|
if (verify_seq != target_seq) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Fill frame_out */
|
/* Fill frame_out */
|
||||||
struct cuframes_frame *f = &sub->frame_obj;
|
struct cuframes_frame *f = &sub->frame_obj;
|
||||||
f->cuda_ptr = sub->mapped_ptrs[slot_idx];
|
f->cuda_ptr = sub->mapped_ptrs[slot_idx];
|
||||||
@@ -358,6 +401,13 @@ int cuframes_subscriber_destroy(cuframes_subscriber_t *sub) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (sub->producer_event) cudaEventDestroy(sub->producer_event);
|
if (sub->producer_event) cudaEventDestroy(sub->producer_event);
|
||||||
|
if (sub->has_slot_events) {
|
||||||
|
int ring_evt = (int)sub->hdr->ring_size;
|
||||||
|
if (ring_evt > CUFRAMES_MAX_RING) ring_evt = CUFRAMES_MAX_RING;
|
||||||
|
for (int i = 0; i < ring_evt; i++) {
|
||||||
|
if (sub->slot_events[i]) cudaEventDestroy(sub->slot_events[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int ring = sub->hdr ? (int)sub->hdr->ring_size : 0;
|
int ring = sub->hdr ? (int)sub->hdr->ring_size : 0;
|
||||||
if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
|
if (ring > CUFRAMES_MAX_RING) ring = CUFRAMES_MAX_RING;
|
||||||
|
|||||||
@@ -24,6 +24,7 @@
|
|||||||
#define CUFRAMES_MAGIC 0xCC7C1DCCu
|
#define CUFRAMES_MAGIC 0xCC7C1DCCu
|
||||||
#define CUFRAMES_PROTOCOL_V1 1u
|
#define CUFRAMES_PROTOCOL_V1 1u
|
||||||
#define CUFRAMES_PROTOCOL_V2 2u /* v0.2 — packet ring support */
|
#define CUFRAMES_PROTOCOL_V2 2u /* v0.2 — packet ring support */
|
||||||
|
#define CUFRAMES_PROTOCOL_V3 3u /* v0.3 — per-slot CUDA events (no TOCTOU race) */
|
||||||
#define CUFRAMES_MAX_SUBSCRIBERS 32
|
#define CUFRAMES_MAX_SUBSCRIBERS 32
|
||||||
#define CUFRAMES_MAX_RING 16
|
#define CUFRAMES_MAX_RING 16
|
||||||
#define CUFRAMES_MAX_KEY_LEN 63
|
#define CUFRAMES_MAX_KEY_LEN 63
|
||||||
@@ -107,6 +108,11 @@ typedef struct __attribute__((packed)) cuframes_shm_header {
|
|||||||
/* offset 0x100 — variable-length tail */
|
/* offset 0x100 — variable-length tail */
|
||||||
cuframes_shm_slot_t slots[CUFRAMES_MAX_RING]; /* 192 × 16 = 3072 */
|
cuframes_shm_slot_t slots[CUFRAMES_MAX_RING]; /* 192 × 16 = 3072 */
|
||||||
cuframes_shm_subscriber_t subscribers[CUFRAMES_MAX_SUBSCRIBERS]; /* 128 × 32 = 4096 */
|
cuframes_shm_subscriber_t subscribers[CUFRAMES_MAX_SUBSCRIBERS]; /* 128 × 32 = 4096 */
|
||||||
|
/* v0.3 — per-slot CUDA event handles. Producer records event per publish;
|
||||||
|
* consumer waits event[slot_idx] specifically (не global ipc_event_handle
|
||||||
|
* который signals только для последнего published frame). Закрывает TOCTOU
|
||||||
|
* race в slot read. 64 × 16 = 1024 bytes. */
|
||||||
|
cudaIpcEventHandle_t slot_event_handles[CUFRAMES_MAX_RING];
|
||||||
} cuframes_shm_header_t;
|
} cuframes_shm_header_t;
|
||||||
|
|
||||||
/* Layout sanity checks (docs/protocol.md §2 table) */
|
/* Layout sanity checks (docs/protocol.md §2 table) */
|
||||||
|
|||||||
+97
-14
@@ -21,7 +21,8 @@ struct cuframes_publisher {
|
|||||||
char shm_name[80];
|
char shm_name[80];
|
||||||
|
|
||||||
/* CUDA */
|
/* CUDA */
|
||||||
cudaEvent_t event;
|
cudaEvent_t event; /* legacy single event (v0.2 compat) */
|
||||||
|
cudaEvent_t slot_events[CUFRAMES_MAX_RING]; /* v0.3 — per-slot events */
|
||||||
cudaIpcMemHandle_t ipc_mem[CUFRAMES_MAX_RING];
|
cudaIpcMemHandle_t ipc_mem[CUFRAMES_MAX_RING];
|
||||||
void *cuda_ptrs[CUFRAMES_MAX_RING]; /* mapped pointers */
|
void *cuda_ptrs[CUFRAMES_MAX_RING]; /* mapped pointers */
|
||||||
size_t frame_size_bytes;
|
size_t frame_size_bytes;
|
||||||
@@ -114,13 +115,28 @@ static int register_external_pool(struct cuframes_publisher *pub,
|
|||||||
}
|
}
|
||||||
|
|
||||||
static int create_event_handle(struct cuframes_publisher *pub) {
|
static int create_event_handle(struct cuframes_publisher *pub) {
|
||||||
|
/* Legacy single event — keep для v0.2 consumer compat fallback */
|
||||||
cudaError_t cerr = cudaEventCreateWithFlags(&pub->event,
|
cudaError_t cerr = cudaEventCreateWithFlags(&pub->event,
|
||||||
cudaEventDisableTiming | cudaEventInterprocess);
|
cudaEventDisableTiming | cudaEventInterprocess);
|
||||||
if (cerr != cudaSuccess) {
|
if (cerr != cudaSuccess) {
|
||||||
CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags: %s",
|
CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags (legacy): %s",
|
||||||
cudaGetErrorString(cerr));
|
cudaGetErrorString(cerr));
|
||||||
return CUFRAMES_ERR_CUDA;
|
return CUFRAMES_ERR_CUDA;
|
||||||
}
|
}
|
||||||
|
/* v0.3 — per-slot events. Каждый publish записывает event на свой slot;
|
||||||
|
* consumer waits event[slot_idx] specifically — закрывает TOCTOU race
|
||||||
|
* (один global event может signal'ить для другого frame). */
|
||||||
|
for (int32_t i = 0; i < pub->ring_size_actual; i++) {
|
||||||
|
cerr = cudaEventCreateWithFlags(&pub->slot_events[i],
|
||||||
|
cudaEventDisableTiming | cudaEventInterprocess);
|
||||||
|
if (cerr != cudaSuccess) {
|
||||||
|
CUFRAMES_LOG_ERROR("cudaEventCreateWithFlags (slot %d): %s",
|
||||||
|
i, cudaGetErrorString(cerr));
|
||||||
|
for (int32_t j = 0; j < i; j++) cudaEventDestroy(pub->slot_events[j]);
|
||||||
|
cudaEventDestroy(pub->event);
|
||||||
|
return CUFRAMES_ERR_CUDA;
|
||||||
|
}
|
||||||
|
}
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -172,7 +188,7 @@ static int setup_shm(struct cuframes_publisher *pub) {
|
|||||||
memset(pub->hdr, 0, sizeof(cuframes_shm_header_t));
|
memset(pub->hdr, 0, sizeof(cuframes_shm_header_t));
|
||||||
|
|
||||||
pub->hdr->magic = CUFRAMES_MAGIC;
|
pub->hdr->magic = CUFRAMES_MAGIC;
|
||||||
pub->hdr->proto_version = CUFRAMES_PROTOCOL_V1;
|
pub->hdr->proto_version = CUFRAMES_PROTOCOL_V3;
|
||||||
pub->hdr->lib_version_major = CUFRAMES_VERSION_MAJOR;
|
pub->hdr->lib_version_major = CUFRAMES_VERSION_MAJOR;
|
||||||
pub->hdr->lib_version_minor = CUFRAMES_VERSION_MINOR;
|
pub->hdr->lib_version_minor = CUFRAMES_VERSION_MINOR;
|
||||||
pub->hdr->lib_version_patch = CUFRAMES_VERSION_PATCH;
|
pub->hdr->lib_version_patch = CUFRAMES_VERSION_PATCH;
|
||||||
@@ -192,13 +208,22 @@ static int setup_shm(struct cuframes_publisher *pub) {
|
|||||||
pub->hdr->meta.pitch_uv = puv;
|
pub->hdr->meta.pitch_uv = puv;
|
||||||
pub->hdr->meta.frame_size_bytes = pub->frame_size_bytes;
|
pub->hdr->meta.frame_size_bytes = pub->frame_size_bytes;
|
||||||
|
|
||||||
/* Export event handle */
|
/* Export event handle (legacy single) */
|
||||||
cudaError_t cerr = cudaIpcGetEventHandle(&pub->hdr->ipc_event_handle, pub->event);
|
cudaError_t cerr = cudaIpcGetEventHandle(&pub->hdr->ipc_event_handle, pub->event);
|
||||||
if (cerr != cudaSuccess) {
|
if (cerr != cudaSuccess) {
|
||||||
CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle: %s", cudaGetErrorString(cerr));
|
CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle: %s", cudaGetErrorString(cerr));
|
||||||
return CUFRAMES_ERR_CUDA;
|
return CUFRAMES_ERR_CUDA;
|
||||||
}
|
}
|
||||||
|
/* v0.3 — export per-slot event handles */
|
||||||
|
for (int32_t i = 0; i < pub->ring_size_actual; i++) {
|
||||||
|
cerr = cudaIpcGetEventHandle(&pub->hdr->slot_event_handles[i],
|
||||||
|
pub->slot_events[i]);
|
||||||
|
if (cerr != cudaSuccess) {
|
||||||
|
CUFRAMES_LOG_ERROR("cudaIpcGetEventHandle (slot %d): %s",
|
||||||
|
i, cudaGetErrorString(cerr));
|
||||||
|
return CUFRAMES_ERR_CUDA;
|
||||||
|
}
|
||||||
|
}
|
||||||
/* Fill slot descriptors */
|
/* Fill slot descriptors */
|
||||||
for (int i = 0; i < pub->ring_size_actual; ++i) {
|
for (int i = 0; i < pub->ring_size_actual; ++i) {
|
||||||
pub->hdr->slots[i].mem_handle = pub->ipc_mem[i];
|
pub->hdr->slots[i].mem_handle = pub->ipc_mem[i];
|
||||||
@@ -407,10 +432,19 @@ int cuframes_publisher_acquire(cuframes_publisher_t *pub, void **cuda_ptr_out) {
|
|||||||
|
|
||||||
static int do_publish(cuframes_publisher_t *pub, int32_t slot,
|
static int do_publish(cuframes_publisher_t *pub, int32_t slot,
|
||||||
void *stream, int64_t pts_ns) {
|
void *stream, int64_t pts_ns) {
|
||||||
/* Record event on producer's stream */
|
/* v0.3 — record per-slot event для precise consumer sync. Closes TOCTOU
|
||||||
cudaError_t cerr = cudaEventRecord(pub->event, (cudaStream_t)stream);
|
* race где legacy `pub->event` signals "latest publish", not slot-specific. */
|
||||||
|
cudaError_t cerr = cudaEventRecord(pub->slot_events[slot], (cudaStream_t)stream);
|
||||||
if (cerr != cudaSuccess) {
|
if (cerr != cudaSuccess) {
|
||||||
CUFRAMES_LOG_ERROR("cudaEventRecord: %s", cudaGetErrorString(cerr));
|
CUFRAMES_LOG_ERROR("cudaEventRecord (slot %d): %s",
|
||||||
|
slot, cudaGetErrorString(cerr));
|
||||||
|
return CUFRAMES_ERR_CUDA;
|
||||||
|
}
|
||||||
|
/* Legacy event — keep recording для v0.2 consumer compat fallback */
|
||||||
|
cerr = cudaEventRecord(pub->event, (cudaStream_t)stream);
|
||||||
|
if (cerr != cudaSuccess) {
|
||||||
|
CUFRAMES_LOG_ERROR("cudaEventRecord (legacy): %s",
|
||||||
|
cudaGetErrorString(cerr));
|
||||||
return CUFRAMES_ERR_CUDA;
|
return CUFRAMES_ERR_CUDA;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -509,6 +543,9 @@ int cuframes_publisher_destroy(cuframes_publisher_t *pub) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pub->event) cudaEventDestroy(pub->event);
|
if (pub->event) cudaEventDestroy(pub->event);
|
||||||
|
for (int32_t i = 0; i < pub->ring_size_actual; i++) {
|
||||||
|
if (pub->slot_events[i]) cudaEventDestroy(pub->slot_events[i]);
|
||||||
|
}
|
||||||
|
|
||||||
/* Packet ring cleanup (если активирован) */
|
/* Packet ring cleanup (если активирован) */
|
||||||
if (pub->has_pkt_ring) {
|
if (pub->has_pkt_ring) {
|
||||||
@@ -591,6 +628,38 @@ int cuframes_publisher_publish_packet(cuframes_publisher_t *pub,
|
|||||||
|
|
||||||
/* ─── Accept thread + handshake ──────────────────────────────────────── */
|
/* ─── Accept thread + handshake ──────────────────────────────────────── */
|
||||||
|
|
||||||
|
/* Per-subscriber lifecycle monitor — detects socket close (subscriber container
|
||||||
|
* exited / crashed) и освобождает bit + subscribers[] slot. Без этого каждый
|
||||||
|
* pipeline recreate leaks bit → bitmap overflows after 32 connections. */
|
||||||
|
struct sub_monitor_args {
|
||||||
|
struct cuframes_publisher *pub;
|
||||||
|
int fd;
|
||||||
|
uint32_t bit;
|
||||||
|
};
|
||||||
|
|
||||||
|
static void *subscriber_monitor_thread(void *arg) {
|
||||||
|
struct sub_monitor_args *m = (struct sub_monitor_args *)arg;
|
||||||
|
char buf[64];
|
||||||
|
/* Blocking read — return 0 (EOF) когда other side close socket, или
|
||||||
|
* <0 on error. Любой control message (PING — TODO в будущем) just consumed. */
|
||||||
|
while (1) {
|
||||||
|
ssize_t n = recv(m->fd, buf, sizeof(buf), 0);
|
||||||
|
if (n <= 0) {
|
||||||
|
/* Subscriber dead — clear bit + slot state. */
|
||||||
|
atomic_fetch_and_explicit(&m->pub->hdr->subscriber_bitmap,
|
||||||
|
~(1ULL << m->bit), memory_order_release);
|
||||||
|
atomic_store_explicit(&m->pub->hdr->subscribers[m->bit].state, 0,
|
||||||
|
memory_order_release);
|
||||||
|
close(m->fd);
|
||||||
|
CUFRAMES_LOG_INFO("subscriber bit=%u disconnected — freed",
|
||||||
|
m->bit);
|
||||||
|
free(m);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
/* future: parse control msgs (PING, UNSUBSCRIBE) here */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void *accept_thread_main(void *arg) {
|
static void *accept_thread_main(void *arg) {
|
||||||
struct cuframes_publisher *pub = (struct cuframes_publisher *)arg;
|
struct cuframes_publisher *pub = (struct cuframes_publisher *)arg;
|
||||||
while (!pub->stop_flag) {
|
while (!pub->stop_flag) {
|
||||||
@@ -603,14 +672,12 @@ static void *accept_thread_main(void *arg) {
|
|||||||
CUFRAMES_LOG_WARN("accept: %s", strerror(errno));
|
CUFRAMES_LOG_WARN("accept: %s", strerror(errno));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
/* Synchronous handshake — после ответа socket остаётся открытым для
|
/* Handshake — на error close socket (no monitor spawned). На success
|
||||||
* lifetime signals (SHUTDOWN, PING). Close на error. */
|
* monitor thread становится owner socket'a + cleanup'ит при disconnect. */
|
||||||
int r = handshake_subscriber(pub, client);
|
int r = handshake_subscriber(pub, client);
|
||||||
if (r != CUFRAMES_OK) {
|
if (r != CUFRAMES_OK) {
|
||||||
close(client);
|
close(client);
|
||||||
}
|
}
|
||||||
/* TODO v0.2: track client fds для broadcast SHUTDOWN. Сейчас clients
|
|
||||||
* сами detect socket EOF при publisher_destroy через shutdown(). */
|
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
@@ -727,7 +794,23 @@ static int handshake_subscriber(struct cuframes_publisher *pub, int client_fd) {
|
|||||||
|
|
||||||
CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u)", name, bit);
|
CUFRAMES_LOG_INFO("subscriber '%s' connected (bit=%u)", name, bit);
|
||||||
|
|
||||||
/* TODO v0.2: spawn per-client thread для liveness/PING/UNSUBSCRIBE.
|
/* Spawn detached monitor thread — owns client_fd, frees bit on socket
|
||||||
* Сейчас socket остаётся открытым на heap'е до publisher_destroy. */
|
* close (subscriber container exit / crash). Без этого bitmap утекал
|
||||||
|
* каждый pipeline recreate. */
|
||||||
|
struct sub_monitor_args *m = malloc(sizeof(*m));
|
||||||
|
if (!m) {
|
||||||
|
/* OOM — fallback: leak fd, bit будет released только publisher_destroy */
|
||||||
|
return CUFRAMES_OK;
|
||||||
|
}
|
||||||
|
m->pub = pub;
|
||||||
|
m->fd = client_fd;
|
||||||
|
m->bit = bit;
|
||||||
|
pthread_t monitor_tid;
|
||||||
|
if (pthread_create(&monitor_tid, NULL, subscriber_monitor_thread, m) != 0) {
|
||||||
|
CUFRAMES_LOG_WARN("monitor pthread_create fail — bit %u may leak", bit);
|
||||||
|
free(m);
|
||||||
|
} else {
|
||||||
|
pthread_detach(monitor_tid);
|
||||||
|
}
|
||||||
return CUFRAMES_OK;
|
return CUFRAMES_OK;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user