/* Обвязка над cuframes_subscriber. Реализация публичного API * cuframes_composer/source.h. * * Архитектура: * Отдельный поток на источник. cuframes_subscriber_next блокирующий, поток * циклически зовёт его с таймаутом. На каждый успешный кадр — обновляет * snapshot (под mutex'ом). На таймаут — переходит к проверке состояния. * * Snapshot pattern: * Главный поток (тот что зовёт get_latest) не блокируется на subscribe. * Он читает under mutex последний сохранённый CUdeviceptr + meta. * Mutex короткий (просто копия указателя и нескольких int'ов). * * State machine: * DISCONNECTED → создаём подписку → CONNECTING * CONNECTING → подписка успешна, первый кадр получен → ACTIVE * → подписка fail → DEAD (ждём reconnect_backoff) * ACTIVE → последний кадр < stale_threshold_ms → остаёмся ACTIVE * → > stale_threshold_ms → STALE * STALE → новый кадр → ACTIVE * → нет > dead_threshold_ms → DEAD (destroy subscriber) * DEAD → ждём backoff (exp от reconnect_min до reconnect_max) → CONNECTING * * Phase 1 особенности: * - Один источник на cfc_source_t (нет multi-source агрегации, это compose). * - cuframes_subscriber_release вызывается при следующем get_latest либо * при выходе. Это означает что caller не может «удерживать» snapshot * дольше чем до следующего get_latest — должен прочитать сразу. * В Phase 2 это уточнится при переходе на double buffering. */ #include "../include/cuframes_composer/source.h" #include #include #include #include #include #include #include #include #include /* Внутренний таймаут блокирующего cuframes_subscriber_next. Короткий * чтобы поток мог периодически проверять stop_flag и состояние. */ #define CFC_SOURCE_NEXT_TIMEOUT_MS 200 struct cfc_source { cfc_source_config_t cfg; char key_copy[64]; /* персистентная копия cfg.key */ char name_copy[32]; /* персистентная копия cfg.consumer_name */ pthread_t thread; int thread_started; _Atomic int stop_flag; pthread_mutex_t state_mu; cfc_source_state_t state; /* Snapshot — обновляется потоком, читается через get_latest. */ cuframes_subscriber_t *sub; /* nullable — есть только в ACTIVE/STALE */ cuframes_frame_t *current_frame; /* удерживаемый frame; release при следующем next() */ cfc_source_snapshot_t snapshot; int64_t last_frame_us; /* CLOCK_MONOTONIC момент последнего успешного кадра */ }; static int64_t now_us(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return (int64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000; } static void set_state(cfc_source_t *src, cfc_source_state_t s) { pthread_mutex_lock(&src->state_mu); if (src->state != s) { src->state = s; src->snapshot.state = s; } pthread_mutex_unlock(&src->state_mu); } static void release_current_frame(cfc_source_t *src) { if (src->current_frame && src->sub) { cuframes_subscriber_release(src->sub, src->current_frame); src->current_frame = NULL; } } static int try_subscribe(cfc_source_t *src) { cuframes_subscriber_config_t scfg = { 0 }; scfg.key = src->key_copy; scfg.consumer_name = src->name_copy; scfg.mode = CUFRAMES_MODE_NEWEST_ONLY; scfg.cuda_device = src->cfg.cuda_device; scfg.connect_timeout_ms = 2000; int r = cuframes_subscriber_create(&scfg, &src->sub); if (r != CUFRAMES_OK) { fprintf(stderr, "[cfc/source:%s] subscriber_create failed: %s\n", src->name_copy, cuframes_strerror(r)); return -1; } return 0; } static void destroy_subscriber(cfc_source_t *src) { release_current_frame(src); if (src->sub) { cuframes_subscriber_destroy(src->sub); src->sub = NULL; } } /* Основной поток. Цикл: subscribe → next → update snapshot → проверка * stale/dead → reconnect при необходимости. */ static void *source_thread(void *arg) { cfc_source_t *src = (cfc_source_t *)arg; int64_t reconnect_backoff_ms = src->cfg.reconnect_min_ms; while (!atomic_load(&src->stop_flag)) { cfc_source_state_t cur; pthread_mutex_lock(&src->state_mu); cur = src->state; pthread_mutex_unlock(&src->state_mu); switch (cur) { case CFC_SOURCE_DISCONNECTED: case CFC_SOURCE_DEAD: { /* Ждём backoff либо stop. */ int64_t wait_ms = reconnect_backoff_ms; while (wait_ms > 0 && !atomic_load(&src->stop_flag)) { int chunk = wait_ms > 100 ? 100 : (int)wait_ms; struct timespec ts = {.tv_sec = chunk / 1000, .tv_nsec = (long)(chunk % 1000) * 1000000L}; nanosleep(&ts, NULL); wait_ms -= chunk; } if (atomic_load(&src->stop_flag)) break; set_state(src, CFC_SOURCE_CONNECTING); break; } case CFC_SOURCE_CONNECTING: { if (try_subscribe(src) == 0) { set_state(src, CFC_SOURCE_ACTIVE); reconnect_backoff_ms = src->cfg.reconnect_min_ms; /* сброс backoff */ } else { set_state(src, CFC_SOURCE_DEAD); /* Удвоить backoff до max. */ reconnect_backoff_ms *= 2; if (reconnect_backoff_ms > src->cfg.reconnect_max_ms) { reconnect_backoff_ms = src->cfg.reconnect_max_ms; } } break; } case CFC_SOURCE_ACTIVE: case CFC_SOURCE_STALE: { /* cuframes требует release предыдущего frame ДО next (frame_busy * проверяется в начале subscriber_next; см. libcuframes/src/consumer.c * line 334). Поэтому release заранее. Это значит snapshot.ptr * может стать невалидным к моменту чтения caller'ом — Phase 2 * это исправит double-buffering'ом. */ release_current_frame(src); cuframes_frame_t *frame = NULL; /* consumer_stream=NULL — default stream; sync через cuMemcpy2D * на default stream. Phase 2 — свой stream + waitEvent. */ int r = cuframes_subscriber_next(src->sub, (void *)0, &frame, CFC_SOURCE_NEXT_TIMEOUT_MS); if (r == CUFRAMES_OK) { src->current_frame = frame; /* Обновляем snapshot под mutex'ом. */ int32_t w = 0, h = 0; cuframes_frame_size(frame, &w, &h); pthread_mutex_lock(&src->state_mu); src->snapshot.ptr = (CUdeviceptr)cuframes_frame_cuda_ptr(frame); src->snapshot.width = w; src->snapshot.height = h; src->snapshot.pitch_y = cuframes_frame_pitch_y(frame); src->snapshot.pitch_uv = cuframes_frame_pitch_uv(frame); src->snapshot.pts_ns = cuframes_frame_pts_ns(frame); src->snapshot.seq = cuframes_frame_seq(frame); src->last_frame_us = now_us(); src->snapshot.last_frame_age_us = 0; if (src->state == CFC_SOURCE_STALE) { src->state = CFC_SOURCE_ACTIVE; src->snapshot.state = CFC_SOURCE_ACTIVE; } pthread_mutex_unlock(&src->state_mu); } else if (r == CUFRAMES_ERR_TIMEOUT || r == CUFRAMES_ERR_WOULD_BLOCK) { /* Нет нового кадра — проверим age, может быть STALE/DEAD. */ int64_t age_ms = (now_us() - src->last_frame_us) / 1000; if (age_ms > src->cfg.dead_threshold_ms) { fprintf(stderr, "[cfc/source:%s] no frame for %lldms → DEAD\n", src->name_copy, (long long)age_ms); destroy_subscriber(src); set_state(src, CFC_SOURCE_DEAD); } else if (age_ms > src->cfg.stale_threshold_ms) { set_state(src, CFC_SOURCE_STALE); } } else if (r == CUFRAMES_ERR_DISCONNECTED) { fprintf(stderr, "[cfc/source:%s] DISCONNECTED from publisher\n", src->name_copy); destroy_subscriber(src); set_state(src, CFC_SOURCE_DEAD); } else { fprintf(stderr, "[cfc/source:%s] cuframes_subscriber_next failed: %s\n", src->name_copy, cuframes_strerror(r)); destroy_subscriber(src); set_state(src, CFC_SOURCE_DEAD); } break; } } } destroy_subscriber(src); return NULL; } /* ── Public API ───────────────────────────────────────────────────────── */ int cfc_source_create(const cfc_source_config_t *cfg, cfc_source_t **out) { if (!cfg || !cfg->key || !cfg->consumer_name || !out) return -1; cfc_source_t *src = calloc(1, sizeof(*src)); if (!src) return -1; src->cfg = *cfg; strncpy(src->key_copy, cfg->key, sizeof(src->key_copy) - 1); strncpy(src->name_copy, cfg->consumer_name, sizeof(src->name_copy) - 1); src->cfg.key = src->key_copy; src->cfg.consumer_name = src->name_copy; /* Дефолты */ if (src->cfg.reconnect_min_ms <= 0) src->cfg.reconnect_min_ms = 1000; if (src->cfg.reconnect_max_ms <= 0) src->cfg.reconnect_max_ms = 30000; if (src->cfg.stale_threshold_ms <= 0) src->cfg.stale_threshold_ms = 500; if (src->cfg.dead_threshold_ms <= 0) src->cfg.dead_threshold_ms = 5000; pthread_mutex_init(&src->state_mu, NULL); src->state = CFC_SOURCE_DISCONNECTED; src->snapshot.state = CFC_SOURCE_DISCONNECTED; src->snapshot.last_frame_age_us = -1; atomic_init(&src->stop_flag, 0); if (pthread_create(&src->thread, NULL, source_thread, src) != 0) { pthread_mutex_destroy(&src->state_mu); free(src); return -1; } src->thread_started = 1; *out = src; return 0; } int cfc_source_get_latest(cfc_source_t *src, cfc_source_snapshot_t *out) { if (!src || !out) return -1; pthread_mutex_lock(&src->state_mu); *out = src->snapshot; if (src->last_frame_us > 0) { out->last_frame_age_us = now_us() - src->last_frame_us; } else { out->last_frame_age_us = -1; } pthread_mutex_unlock(&src->state_mu); return 0; } int cfc_source_destroy(cfc_source_t *src) { if (!src) return 0; atomic_store(&src->stop_flag, 1); if (src->thread_started) { pthread_join(src->thread, NULL); } pthread_mutex_destroy(&src->state_mu); free(src); return 0; }