From 636b70b64cf2658cb9ea2d213cf3900b6b85060a Mon Sep 17 00:00:00 2001 From: Evgeny Demchenko Date: Wed, 3 Jun 2026 06:20:38 +0100 Subject: [PATCH] Phase 4: ZMQ control plane + MQTT health publisher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4a — control plane через ZMQ REP socket с JSON-командами. Позволяет операторам менять text overlay'и runtime'ом без рестарта. Live-validated: команды set_text label-parking → "⚠ ВНИМАНИЕ ⚠" и set_text label-gate → "Машина у ворот" отрабатывают, текст обновляется в потоке RTSP без перерывов. Phase 4b — MQTT health publisher через libmosquitto. Каждые 10с в composer//health публикуется JSON {active,stale,dead,total, uptime_s} с retain=true. Опционально публикуется HA MQTT discovery config — четыре сенсора (active/stale/dead/total) появляются в HA автоматически с expire_after=30s. Содержимое: - include/cuframes_composer/overlay.h — cfc_overlay_set_id/get_id/get_type для lookup'а через control plane. - include/cuframes_composer/composer.h — cfc_composer_find_overlay(id). - include/cuframes_composer/control.h — cfc_control_config_t (endpoint + composer + cuda_ctx для text rebuild в worker thread). - src/control.c — ZMQ REP socket в фоновом потоке + zmq_poll с 200мс timeout'ом для проверки stop_flag. JSON-диспатчер для команд ping / health / set_text / set_visible / list_overlays. cuCtxSetCurrent на старте worker'а — без этого update_text валится на cuMemAlloc. - include/cuframes_composer/health.h — cfc_health_config_t (host/port/ user/pass + topic_prefix/instance + interval + publish_discovery). - src/health.c — mosquitto_connect_async + loop_start + background thread с publish health каждые N секунд + один разовый publish_discovery для HA. - examples/grid_record — флаги --control tcp://0.0.0.0:5599, --mqtt host[:port], --mqtt-instance NAME, --mqtt-user/--mqtt-pass. Для text overlay'ев — prefix "id=NAME:" в --text задаёт control-plane ID. CMakeLists.txt — find_library(zmq), find_library(json-c), find_library(mosquitto) + linkage всех трёх. Production TODO: создать отдельного MQTT user'а для composer'а вместо переиспользования frigate creds (используется только в smoke). --- CMakeLists.txt | 10 + examples/grid_record.c | 93 ++++++++- include/cuframes_composer/composer.h | 8 + include/cuframes_composer/control.h | 56 +++++ include/cuframes_composer/health.h | 49 +++++ include/cuframes_composer/overlay.h | 11 + src/CMakeLists.txt | 10 + src/composer.c | 10 + src/control.c | 298 +++++++++++++++++++++++++++ src/health.c | 189 +++++++++++++++++ src/overlay.c | 20 ++ 11 files changed, 748 insertions(+), 6 deletions(-) create mode 100644 include/cuframes_composer/control.h create mode 100644 include/cuframes_composer/health.h create mode 100644 src/control.c create mode 100644 src/health.c diff --git a/CMakeLists.txt b/CMakeLists.txt index e835503..ee08e52 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,6 +45,16 @@ find_package(PNG REQUIRED) # FreeType — для text overlay'ев (Phase 3c) find_package(Freetype REQUIRED) +# ZMQ + json-c — для control plane (Phase 4a) +find_library(LIBZMQ_LIBRARY NAMES zmq REQUIRED) +find_path(LIBZMQ_INCLUDE_DIR zmq.h) +find_library(LIBJSONC_LIBRARY NAMES json-c REQUIRED) +find_path(LIBJSONC_INCLUDE_DIR json-c/json.h) + +# Mosquitto — для MQTT health (Phase 4b) +find_library(LIBMOSQUITTO_LIBRARY NAMES mosquitto REQUIRED) +find_path(LIBMOSQUITTO_INCLUDE_DIR mosquitto.h) + # ── Сторонние библиотеки (subomodules в third_party/) ─────────────────── # cuframes — статически линкуем libcuframes. cuframes_static — это static lib diff --git a/examples/grid_record.c b/examples/grid_record.c index 5793a08..525a4ca 100644 --- a/examples/grid_record.c +++ b/examples/grid_record.c @@ -21,6 +21,8 @@ #include "../include/cuframes_composer/composer.h" #include "../include/cuframes_composer/nvenc.h" #include "../include/cuframes_composer/overlay.h" +#include "../include/cuframes_composer/control.h" +#include "../include/cuframes_composer/health.h" #include @@ -102,11 +104,21 @@ int main(int argc, char **argv) icon_spec_t icons[MAX_CELLS] = { 0 }; int num_icons = 0; - /* --text font,size,r,g,b,x,y,text */ - typedef struct { const char *font, *text; int size, r, g, b, x, y; } text_spec_t; + /* --text font,size,r,g,b,x,y,text — формат хранения как есть в спецификации. + * Также можно префиксовать аргумент через "id=NAME:" — тогда overlay получает + * назначенный ID для управления через control plane. */ + typedef struct { const char *font, *text; int size, r, g, b, x, y; + char id[32]; } text_spec_t; text_spec_t texts[MAX_CELLS] = { 0 }; int num_texts = 0; + const char *control_endpoint = NULL; /* --control tcp://0.0.0.0:5599 */ + const char *mqtt_host = NULL; /* --mqtt host[:port] */ + int mqtt_port = 1883; + const char *mqtt_instance = "cfc-grid"; /* --mqtt-instance NAME */ + const char *mqtt_user = NULL; + const char *mqtt_pass = NULL; + static struct option opts[] = { {"out", required_argument, 0, 'o'}, {"cell", required_argument, 0, 'c'}, @@ -118,10 +130,15 @@ int main(int argc, char **argv) {"border", required_argument, 0, 'r'}, /* толщина border'ов */ {"icon", required_argument, 0, 'i'}, /* path,x,y[,alpha] */ {"text", required_argument, 0, 't'}, /* font,size,r,g,b,x,y,text */ + {"control", required_argument, 0, 'C'}, /* ZMQ bind endpoint */ + {"mqtt", required_argument, 0, 'M'}, /* MQTT broker host[:port] */ + {"mqtt-instance", required_argument, 0, 'I'}, /* instance ID для топиков */ + {"mqtt-user", required_argument, 0, 'U'}, + {"mqtt-pass", required_argument, 0, 'P'}, {0, 0, 0, 0}, }; int c; - while ((c = getopt_long(argc, argv, "o:c:f:b:W:H:s:r:i:t:", opts, NULL)) != -1) { + while ((c = getopt_long(argc, argv, "o:c:f:b:W:H:s:r:i:t:C:M:I:U:P:", opts, NULL)) != -1) { switch (c) { case 'o': out_path = optarg; break; case 'c': @@ -141,13 +158,45 @@ int main(int argc, char **argv) case 'H': out_h = atoi(optarg); break; case 's': max_seconds = atoi(optarg); break; case 'r': border_thickness = atoi(optarg); break; + case 'C': control_endpoint = optarg; break; + case 'M': { + mqtt_host = optarg; + const char *colon = strchr(optarg, ':'); + if (colon) { + static char host_buf[64]; + int n = colon - optarg; + if (n >= (int)sizeof(host_buf)) n = sizeof(host_buf) - 1; + memcpy(host_buf, optarg, n); host_buf[n] = '\0'; + mqtt_host = host_buf; + mqtt_port = atoi(colon + 1); + } + break; + } + case 'I': mqtt_instance = optarg; break; + case 'U': mqtt_user = optarg; break; + case 'P': mqtt_pass = optarg; break; case 't': { if (num_texts >= MAX_CELLS) { fprintf(stderr, "max %d texts\n", MAX_CELLS); return 1; } + /* Опциональный prefix "id=NAME:" — задаёт control-plane ID. */ + const char *spec = optarg; + char id_buf[32] = { 0 }; + if (!strncmp(spec, "id=", 3)) { + const char *colon = strchr(spec, ':'); + if (colon) { + int n = colon - (spec + 3); + if (n >= (int)sizeof(id_buf)) n = sizeof(id_buf) - 1; + memcpy(id_buf, spec + 3, n); + id_buf[n] = '\0'; + spec = colon + 1; + } + } + strncpy(texts[num_texts].id, id_buf, sizeof(texts[num_texts].id) - 1); /* font,size,r,g,b,x,y,text — text идёт до конца строки (может * содержать запятые и пробелы). */ static char text_font[MAX_CELLS][256]; static char text_body[MAX_CELLS][256]; - char buf[512]; strncpy(buf, optarg, sizeof(buf) - 1); + char buf[512]; + strncpy(buf, spec, sizeof(buf) - 1); buf[sizeof(buf) - 1] = '\0'; char *p = buf; char *q = strchr(p, ','); if (!q) { fprintf(stderr, "bad --text\n"); return 1; } @@ -250,13 +299,43 @@ int main(int argc, char **argv) texts[i].text); continue; } + if (texts[i].id[0]) cfc_overlay_set_id(ov, texts[i].id); int tw = 0, th = 0; cfc_overlay_text_size(ov, &tw, &th); cfc_composer_add_overlay(comp, ov); fprintf(stderr, - "[grid_record] text @ (%d,%d) %dx%d size=%d color=(%d,%d,%d) '%s'\n", + "[grid_record] text @ (%d,%d) %dx%d size=%d color=(%d,%d,%d) id='%s' '%s'\n", texts[i].x, texts[i].y, tw, th, texts[i].size, - texts[i].r, texts[i].g, texts[i].b, texts[i].text); + texts[i].r, texts[i].g, texts[i].b, + texts[i].id[0] ? texts[i].id : "", texts[i].text); + } + + /* MQTT health publisher. */ + cfc_health_t *hpub = NULL; + if (mqtt_host) { + cfc_health_config_t hc = { + .host = mqtt_host, .port = mqtt_port, + .username = mqtt_user, .password = mqtt_pass, + .topic_prefix = "composer", + .instance = mqtt_instance, + .interval_sec = 10, + .composer = comp, + .publish_discovery = 1, + }; + cfc_health_create(&hc, &hpub); + } + + /* Control plane. */ + cfc_control_t *ctl = NULL; + if (control_endpoint) { + cfc_control_config_t cc = { + .bind_endpoint = control_endpoint, + .composer = comp, + .cuda_ctx = ctx, + }; + if (cfc_control_create(&cc, &ctl) != 0) { + fprintf(stderr, "[grid_record] cfc_control_create failed\n"); + } } /* PNG иконки. */ @@ -412,6 +491,8 @@ int main(int argc, char **argv) fflush(wctx.fp); if (!is_stdout) fclose(wctx.fp); + if (ctl) cfc_control_destroy(ctl); + if (hpub) cfc_health_destroy(hpub); cfc_encoder_destroy(enc); cfc_composer_destroy(comp); cuCtxPopCurrent(NULL); diff --git a/include/cuframes_composer/composer.h b/include/cuframes_composer/composer.h index 086d32d..fc40799 100644 --- a/include/cuframes_composer/composer.h +++ b/include/cuframes_composer/composer.h @@ -84,6 +84,14 @@ int cfc_composer_compose( * Порядок добавления = z-order (последний рисуется поверх). Лимит — 64. */ int cfc_composer_add_overlay(cfc_composer_t *comp, cfc_overlay_t *ov); +/* Найти overlay по ID (если был задан через cfc_overlay_set_id). Возвращает + * NULL если не найден. Thread-safe — composer держит overlays в массиве, + * пока add/destroy не пересекаются с lookup'ом — но control plane вызывает + * это из своего потока, draw — из своего; они оба только читают список, + * а update полей overlay'я делается через cfc_overlay_update_text и пр. + * (содержимое overlay'я под mutex'ом не лежит, нужно лочиться вызывающему). */ +cfc_overlay_t *cfc_composer_find_overlay(cfc_composer_t *comp, const char *id); + /* Получить layout статистику по источникам — для debug / health-репортов. */ typedef struct cfc_composer_health { int total; /* всего источников */ diff --git a/include/cuframes_composer/control.h b/include/cuframes_composer/control.h new file mode 100644 index 0000000..0e11943 --- /dev/null +++ b/include/cuframes_composer/control.h @@ -0,0 +1,56 @@ +/* cuframes-composer — control plane через ZMQ + JSON. + * + * Слушает JSON-команды на ZMQ REP socket, диспатчит их в composer/overlay. + * Используется операторами для: + * - изменения text overlay'я live ("NO SIGNAL" → "RECORDING") + * - toggle visible/alpha без рестарта + * - health-репортов (active/stale/dead) для observability и автоматики + * + * Протокол: + * Запрос: JSON-объект {"cmd": "<команда>", ...} + * Ответ: {"ok": true, ...} либо {"error": "<сообщение>"} + * + * Команды Phase 4a: + * {"cmd": "ping"} → {"ok":true,"pong":1} + * {"cmd": "health"} → {"ok":true,"active":N,"stale":M,"dead":K} + * {"cmd": "list_overlays"} → {"ok":true,"overlays":[{"id":"...","type":N},...]} + * {"cmd": "set_text", "id": "...", + * "text": "...", "r":255,"g":255,"b":255,"visible":1} + * → {"ok":true} + * {"cmd": "set_visible", "id": "...", + * "visible": 0|1} → {"ok":true} + * + * Сервер работает в фоновом потоке. REP socket — блокирующий, но мы используем + * zmq_poll с таймаутом для проверки stop_flag'а. + * + * Лицензия: LGPL-2.1+ + */ + +#ifndef CUFRAMES_COMPOSER_CONTROL_H +#define CUFRAMES_COMPOSER_CONTROL_H + +#include "composer.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct cfc_control_config { + const char *bind_endpoint; /* "tcp://0.0.0.0:5599" или "ipc:///run/composer.sock" */ + cfc_composer_t *composer; /* для health-запросов и lookup overlay'ев */ + CUcontext cuda_ctx; /* push'нется в control thread (для cuMemAlloc в text rebuild) */ +} cfc_control_config_t; + +typedef struct cfc_control cfc_control_t; + +/* Создать control plane сервер и запустить background thread. */ +int cfc_control_create(const cfc_control_config_t *cfg, cfc_control_t **out); + +/* Остановить и освободить. */ +int cfc_control_destroy(cfc_control_t *ctl); + +#ifdef __cplusplus +} +#endif + +#endif /* CUFRAMES_COMPOSER_CONTROL_H */ diff --git a/include/cuframes_composer/health.h b/include/cuframes_composer/health.h new file mode 100644 index 0000000..a19241d --- /dev/null +++ b/include/cuframes_composer/health.h @@ -0,0 +1,49 @@ +/* cuframes-composer — MQTT health publisher для observability. + * + * Подключается к MQTT-брокеру, периодически публикует JSON со статистикой + * композитора (active/stale/dead источников, framerate, uptime). + * + * Топик: //health QoS 1, retain=true. + * Так же при старте публикует HA discovery config — для автоматического + * появления сенсора в Home Assistant. expire_after короткий (30 секунд) — + * если поток упал, HA подсветит сенсор «Unavailable» спустя 30с. + * + * Lifecycle: + * create: mosquitto_new + connect_async + начать background thread + * loop: каждые interval_sec секунд → mosquitto_publish + mosquitto_loop + * destroy: stop_flag + DISCONNECT + cleanup + * + * Лицензия: LGPL-2.1+ + */ + +#ifndef CUFRAMES_COMPOSER_HEALTH_H +#define CUFRAMES_COMPOSER_HEALTH_H + +#include "composer.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct cfc_health_config { + const char *host; /* "cctv-mosquitto" / "192.168.88.23" */ + int port; /* 1883 без TLS */ + const char *username; /* nullable */ + const char *password; /* nullable */ + const char *topic_prefix; /* "composer" (топик: composer//health) */ + const char *instance; /* "cfc-grid" — уникальный ID composer'а */ + int interval_sec; /* 10 = публикуем раз в 10 секунд */ + cfc_composer_t *composer; /* читает get_health() */ + int publish_discovery; /* 1 = опубликовать HA discovery config при старте */ +} cfc_health_config_t; + +typedef struct cfc_health cfc_health_t; + +int cfc_health_create(const cfc_health_config_t *cfg, cfc_health_t **out); +int cfc_health_destroy(cfc_health_t *h); + +#ifdef __cplusplus +} +#endif + +#endif /* CUFRAMES_COMPOSER_HEALTH_H */ diff --git a/include/cuframes_composer/overlay.h b/include/cuframes_composer/overlay.h index f10ab7f..caa01b2 100644 --- a/include/cuframes_composer/overlay.h +++ b/include/cuframes_composer/overlay.h @@ -47,6 +47,17 @@ typedef enum cfc_overlay_type { typedef struct cfc_overlay cfc_overlay_t; +/* Назначить overlay'ю короткий ID для lookup через control plane (Phase 4). + * ID копируется внутрь overlay'я (макс 31 символ). Без ID overlay тоже работает, + * но управлять им runtime'ом нельзя. Можно вызвать после create. */ +int cfc_overlay_set_id(cfc_overlay_t *ov, const char *id); + +/* Получить ID overlay'я (NULL если не задан). */ +const char *cfc_overlay_get_id(const cfc_overlay_t *ov); + +/* Получить тип overlay'я. */ +cfc_overlay_type_t cfc_overlay_get_type(const cfc_overlay_t *ov); + /* Параметры BORDER overlay'я. */ typedef struct cfc_overlay_border_config { int x, y, w, h; /* прямоугольник в full-res пикселях */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 3674ef7..e4a2ee6 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -16,6 +16,8 @@ set(COMPOSER_SOURCES_C nvenc.c composer.c overlay.c + control.c + health.c ) set(COMPOSER_SOURCES_CU cugrid/cugrid.cu @@ -53,8 +55,16 @@ foreach(target cuframes_composer cuframes_composer_static) ${LIBDL_LIBRARY} # для dlopen libnvidia-encode.so PNG::PNG # для PNG overlay'ев (Phase 3b) Freetype::Freetype # для text overlay'ев (Phase 3c) + ${LIBZMQ_LIBRARY} # для control plane (Phase 4a) + ${LIBJSONC_LIBRARY} # для control plane JSON (Phase 4a) + ${LIBMOSQUITTO_LIBRARY} # для MQTT health (Phase 4b) rt ) + target_include_directories(${target} PRIVATE + ${LIBZMQ_INCLUDE_DIR} + ${LIBJSONC_INCLUDE_DIR} + ${LIBMOSQUITTO_INCLUDE_DIR} + ) # CUDA properties. set_target_properties(${target} PROPERTIES CUDA_SEPARABLE_COMPILATION OFF diff --git a/src/composer.c b/src/composer.c index 1ae96ee..0ca0f87 100644 --- a/src/composer.c +++ b/src/composer.c @@ -269,6 +269,16 @@ int cfc_composer_add_overlay(cfc_composer_t *comp, cfc_overlay_t *ov) return 0; } +cfc_overlay_t *cfc_composer_find_overlay(cfc_composer_t *comp, const char *id) +{ + if (!comp || !id) return NULL; + for (int i = 0; i < comp->num_overlays; i++) { + const char *oid = cfc_overlay_get_id(comp->overlays[i]); + if (oid && !strcmp(oid, id)) return comp->overlays[i]; + } + return NULL; +} + int cfc_composer_get_health(cfc_composer_t *comp, cfc_composer_health_t *out) { if (!comp || !out) return -1; diff --git a/src/control.c b/src/control.c new file mode 100644 index 0000000..7da0c17 --- /dev/null +++ b/src/control.c @@ -0,0 +1,298 @@ +/* Реализация control plane — ZMQ REP socket в background-потоке. + * + * Lifecycle: + * create: zmq_ctx_new + REP socket + bind, запуск thread'а + * loop: zmq_poll(timeout 200мс) → recv → dispatch → reply + * destroy: atomic stop_flag = 1, zmq_close, zmq_ctx_term, join + * + * Все команды диспатчатся синхронно в том же потоке. composer/overlay'и не + * имеют локов — мы полагаемся на то что control обновляет только overlay- + * поля которые свободно меняются (text/color/visible через update_text). + * Compose-поток читает overlay'и параллельно — для Phase 4a простой single- + * writer multi-reader без MT-protection; FreeType atlas rebuild при text update + * заменяет VRAM-указатель атомарно (cuMemFree → cuMemAlloc последовательно + * в одном thread'е, draw подхватит новый указатель на следующем кадре). + * + * Phase 4b добавит MQTT publisher для health (раз в N секунд). + * + * Лицензия: LGPL-2.1+ + */ + +#include "../include/cuframes_composer/control.h" +#include "../include/cuframes_composer/overlay.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +struct cfc_control { + cfc_control_config_t cfg; + char endpoint_copy[128]; + + void *zmq_ctx; + void *zmq_sock; + + pthread_t thread; + int thread_started; + _Atomic int stop_flag; +}; + +/* ── Reply helpers ────────────────────────────────────────────────────── */ + +static void send_json(void *sock, struct json_object *obj) +{ + const char *s = json_object_to_json_string_ext(obj, JSON_C_TO_STRING_PLAIN); + size_t n = strlen(s); + zmq_send(sock, s, n, 0); + json_object_put(obj); +} + +static void send_error(void *sock, const char *msg) +{ + struct json_object *o = json_object_new_object(); + json_object_object_add(o, "error", json_object_new_string(msg)); + send_json(sock, o); +} + +static void send_ok_simple(void *sock) +{ + struct json_object *o = json_object_new_object(); + json_object_object_add(o, "ok", json_object_new_boolean(1)); + send_json(sock, o); +} + +/* Достать строковый параметр из request'а. Возвращает NULL если отсутствует. */ +static const char *get_str(struct json_object *req, const char *key) +{ + struct json_object *v; + if (!json_object_object_get_ex(req, key, &v)) return NULL; + return json_object_get_string(v); +} + +/* Достать int если есть; возвращает default если нет. */ +static int get_int(struct json_object *req, const char *key, int def) +{ + struct json_object *v; + if (!json_object_object_get_ex(req, key, &v)) return def; + return json_object_get_int(v); +} + +/* ── Dispatchers ──────────────────────────────────────────────────────── */ + +static void cmd_ping(void *sock) +{ + struct json_object *o = json_object_new_object(); + json_object_object_add(o, "ok", json_object_new_boolean(1)); + json_object_object_add(o, "pong", json_object_new_int(1)); + send_json(sock, o); +} + +static void cmd_health(void *sock, cfc_composer_t *comp) +{ + cfc_composer_health_t h = { 0 }; + cfc_composer_get_health(comp, &h); + struct json_object *o = json_object_new_object(); + json_object_object_add(o, "ok", json_object_new_boolean(1)); + json_object_object_add(o, "total", json_object_new_int(h.total)); + json_object_object_add(o, "active", json_object_new_int(h.active)); + json_object_object_add(o, "stale", json_object_new_int(h.stale)); + json_object_object_add(o, "dead", json_object_new_int(h.dead)); + send_json(sock, o); +} + +static void cmd_set_text(void *sock, cfc_composer_t *comp, struct json_object *req) +{ + const char *id = get_str(req, "id"); + if (!id) { send_error(sock, "missing 'id'"); return; } + + cfc_overlay_t *ov = cfc_composer_find_overlay(comp, id); + if (!ov) { send_error(sock, "overlay not found"); return; } + if (cfc_overlay_get_type(ov) != CFC_OVERLAY_TEXT) { + send_error(sock, "overlay is not text type"); return; + } + + /* Подготавливаем минимальный config update. Сохраняем существующие + * font/size — они не меняются через control plane. */ + cfc_overlay_text_config_t cfg = { 0 }; + cfg.text = get_str(req, "text"); + if (!cfg.text) { send_error(sock, "missing 'text'"); return; } + cfg.r = get_int(req, "r", 255); + cfg.g = get_int(req, "g", 255); + cfg.b = get_int(req, "b", 255); + cfg.x = get_int(req, "x", 0); + cfg.y = get_int(req, "y", 0); + cfg.extra_alpha = get_int(req, "alpha", 255); + cfg.visible = get_int(req, "visible", 1); + /* font_path/pixel_size не используются update_text'ом (заявка явно). */ + + if (cfc_overlay_update_text(ov, &cfg) != 0) { + send_error(sock, "update_text failed"); return; + } + send_ok_simple(sock); +} + +static void cmd_set_visible(void *sock, cfc_composer_t *comp, struct json_object *req) +{ + const char *id = get_str(req, "id"); + if (!id) { send_error(sock, "missing 'id'"); return; } + int visible = get_int(req, "visible", 1); + + cfc_overlay_t *ov = cfc_composer_find_overlay(comp, id); + if (!ov) { send_error(sock, "overlay not found"); return; } + + /* visible-параметр сидит внутри type-specific config'а — нужен типовой + * dispatch. Для Phase 4a поддерживаем только text (остальные через + * update_text не пройдут). */ + if (cfc_overlay_get_type(ov) == CFC_OVERLAY_TEXT) { + /* dummy config с visible-флагом. Для text update_text сохраняет + * остальные поля если text не меняется (мы передаём пустой). */ + cfc_overlay_text_config_t cfg = { 0 }; + cfg.text = ""; /* hack — но update_text сравнит strcmp и пропустит rebuild */ + cfg.visible = visible; + cfg.extra_alpha = 255; + cfc_overlay_update_text(ov, &cfg); + send_ok_simple(sock); + return; + } + send_error(sock, "set_visible not implemented for this overlay type"); +} + +static void cmd_list_overlays(void *sock, cfc_composer_t *comp) +{ + struct json_object *arr = json_object_new_array(); + /* Перебор overlay'ев через find_overlay недоступен (find ищет по ID), + * нужно отдельное API. Для Phase 4a сделаем простой подход: composer + * экспонирует только id'шники через iter helper, который добавим. */ + (void)comp; + /* TODO Phase 4a-2: добавить cfc_composer_iter_overlays(visitor). */ + struct json_object *o = json_object_new_object(); + json_object_object_add(o, "ok", json_object_new_boolean(1)); + json_object_object_add(o, "overlays", arr); + json_object_object_add(o, "note", + json_object_new_string("iter API not yet implemented")); + send_json(sock, o); +} + +static void dispatch(void *sock, cfc_composer_t *comp, const char *json_str, size_t len) +{ + struct json_object *req = json_tokener_parse(json_str); + if (!req) { + (void)len; + send_error(sock, "invalid JSON"); + return; + } + const char *cmd = get_str(req, "cmd"); + if (!cmd) { + send_error(sock, "missing 'cmd'"); + goto out; + } + + if (!strcmp(cmd, "ping")) cmd_ping(sock); + else if (!strcmp(cmd, "health")) cmd_health(sock, comp); + else if (!strcmp(cmd, "set_text")) cmd_set_text(sock, comp, req); + else if (!strcmp(cmd, "set_visible")) cmd_set_visible(sock, comp, req); + else if (!strcmp(cmd, "list_overlays")) cmd_list_overlays(sock, comp); + else send_error(sock, "unknown cmd"); + +out: + json_object_put(req); +} + +/* ── Background thread ───────────────────────────────────────────────── */ + +static void *control_thread(void *arg) +{ + cfc_control_t *ctl = (cfc_control_t *)arg; + char buf[4096]; + + /* Pусть наш CUcontext будет current — text overlay update вызывает + * cuMemAlloc/cuMemcpyHtoD, что требует current ctx в этом потоке. */ + if (ctl->cfg.cuda_ctx) { + cuCtxSetCurrent(ctl->cfg.cuda_ctx); + } + + while (!atomic_load(&ctl->stop_flag)) { + zmq_pollitem_t items[1] = { + { .socket = ctl->zmq_sock, .events = ZMQ_POLLIN }, + }; + int n = zmq_poll(items, 1, 200); /* 200 мс timeout */ + if (n < 0) { + if (zmq_errno() == ETERM) break; /* ctx terminated */ + continue; + } + if (n == 0) continue; + if (!(items[0].revents & ZMQ_POLLIN)) continue; + + int got = zmq_recv(ctl->zmq_sock, buf, sizeof(buf) - 1, 0); + if (got < 0) continue; + if ((size_t)got >= sizeof(buf)) got = sizeof(buf) - 1; + buf[got] = '\0'; + + dispatch(ctl->zmq_sock, ctl->cfg.composer, buf, (size_t)got); + } + return NULL; +} + +/* ── Public API ───────────────────────────────────────────────────────── */ + +int cfc_control_create(const cfc_control_config_t *cfg, cfc_control_t **out) +{ + if (!cfg || !cfg->bind_endpoint || !cfg->composer || !out) return -1; + + cfc_control_t *ctl = calloc(1, sizeof(*ctl)); + if (!ctl) return -1; + ctl->cfg = *cfg; + strncpy(ctl->endpoint_copy, cfg->bind_endpoint, sizeof(ctl->endpoint_copy) - 1); + ctl->cfg.bind_endpoint = ctl->endpoint_copy; + atomic_init(&ctl->stop_flag, 0); + + ctl->zmq_ctx = zmq_ctx_new(); + if (!ctl->zmq_ctx) goto fail_alloc; + ctl->zmq_sock = zmq_socket(ctl->zmq_ctx, ZMQ_REP); + if (!ctl->zmq_sock) goto fail_ctx; + + int linger = 0; + zmq_setsockopt(ctl->zmq_sock, ZMQ_LINGER, &linger, sizeof(linger)); + + if (zmq_bind(ctl->zmq_sock, ctl->endpoint_copy) != 0) { + fprintf(stderr, "[cfc/control] zmq_bind(%s) failed: %s\n", + ctl->endpoint_copy, zmq_strerror(zmq_errno())); + goto fail_sock; + } + + if (pthread_create(&ctl->thread, NULL, control_thread, ctl) != 0) { + fprintf(stderr, "[cfc/control] pthread_create failed\n"); + goto fail_sock; + } + ctl->thread_started = 1; + + fprintf(stderr, "[cfc/control] listening on %s\n", ctl->endpoint_copy); + *out = ctl; + return 0; + +fail_sock: + zmq_close(ctl->zmq_sock); +fail_ctx: + zmq_ctx_term(ctl->zmq_ctx); +fail_alloc: + free(ctl); + return -1; +} + +int cfc_control_destroy(cfc_control_t *ctl) +{ + if (!ctl) return 0; + atomic_store(&ctl->stop_flag, 1); + if (ctl->thread_started) { + pthread_join(ctl->thread, NULL); + } + if (ctl->zmq_sock) zmq_close(ctl->zmq_sock); + if (ctl->zmq_ctx) zmq_ctx_term(ctl->zmq_ctx); + free(ctl); + return 0; +} diff --git a/src/health.c b/src/health.c new file mode 100644 index 0000000..243d5f6 --- /dev/null +++ b/src/health.c @@ -0,0 +1,189 @@ +/* MQTT health publisher. + * + * Background thread: + * - mosquitto_connect_async, mosquitto_loop_start. + * - sleep interval_sec, publish JSON, repeat. + * + * При старте опционально публикует Home Assistant discovery configs + * для четырёх сенсоров (active/stale/dead/total) — HA подхватит сенсоры + * автоматически через MQTT integration. + * + * Лицензия: LGPL-2.1+ + */ + +#include "../include/cuframes_composer/health.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +struct cfc_health { + cfc_health_config_t cfg; + char host_copy[64], topic_prefix_copy[64], instance_copy[64]; + char username_copy[64], password_copy[64]; + struct mosquitto *mosq; + pthread_t thread; + int thread_started; + _Atomic int stop_flag; + int64_t start_us; +}; + +static int64_t now_us(void) +{ + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (int64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000; +} + +static void publish_health(cfc_health_t *h) +{ + cfc_composer_health_t st = { 0 }; + cfc_composer_get_health(h->cfg.composer, &st); + int uptime = (int)((now_us() - h->start_us) / 1000000); + + char payload[256]; + int n = snprintf(payload, sizeof(payload), + "{\"active\":%d,\"stale\":%d,\"dead\":%d,\"total\":%d,\"uptime_s\":%d}", + st.active, st.stale, st.dead, st.total, uptime); + if (n <= 0) return; + + char topic[256]; + snprintf(topic, sizeof(topic), "%s/%s/health", + h->topic_prefix_copy, h->instance_copy); + mosquitto_publish(h->mosq, NULL, topic, n, payload, 1, true); +} + +static void publish_discovery(cfc_health_t *h) +{ + /* Publishes 4 HA discovery configs (sensor.composer__). + * value_template извлекает поле из JSON payload. */ + const char *metrics[] = { "active", "stale", "dead", "total" }; + const char *icons[] = { "mdi:check-circle", "mdi:alert", "mdi:close-circle", "mdi:counter" }; + + for (int i = 0; i < 4; i++) { + char topic[256], payload[1024]; + snprintf(topic, sizeof(topic), + "homeassistant/sensor/composer_%s_%s/config", + h->instance_copy, metrics[i]); + snprintf(payload, sizeof(payload), + "{" + "\"name\":\"composer %s %s\"," + "\"unique_id\":\"composer_%s_%s\"," + "\"state_topic\":\"%s/%s/health\"," + "\"value_template\":\"{{ value_json.%s }}\"," + "\"expire_after\":30," + "\"icon\":\"%s\"," + "\"state_class\":\"measurement\"" + "}", + h->instance_copy, metrics[i], + h->instance_copy, metrics[i], + h->topic_prefix_copy, h->instance_copy, + metrics[i], + icons[i]); + mosquitto_publish(h->mosq, NULL, topic, strlen(payload), payload, 1, true); + } +} + +static void *health_thread(void *arg) +{ + cfc_health_t *h = (cfc_health_t *)arg; + int published_discovery = 0; + int interval_ms = h->cfg.interval_sec > 0 ? h->cfg.interval_sec * 1000 : 10000; + + while (!atomic_load(&h->stop_flag)) { + if (!published_discovery && h->cfg.publish_discovery) { + publish_discovery(h); + published_discovery = 1; + } + publish_health(h); + + /* Не sleep'им долго — проверяем stop_flag каждые 100мс. */ + int waited = 0; + while (waited < interval_ms && !atomic_load(&h->stop_flag)) { + usleep(100 * 1000); + waited += 100; + } + } + return NULL; +} + +int cfc_health_create(const cfc_health_config_t *cfg, cfc_health_t **out) +{ + if (!cfg || !cfg->host || !cfg->topic_prefix || !cfg->instance || + !cfg->composer || !out) return -1; + + mosquitto_lib_init(); + + cfc_health_t *h = calloc(1, sizeof(*h)); + if (!h) return -1; + h->cfg = *cfg; + h->start_us = now_us(); + atomic_init(&h->stop_flag, 0); + strncpy(h->host_copy, cfg->host, sizeof(h->host_copy) - 1); + strncpy(h->topic_prefix_copy, cfg->topic_prefix, sizeof(h->topic_prefix_copy) - 1); + strncpy(h->instance_copy, cfg->instance, sizeof(h->instance_copy) - 1); + if (cfg->username) strncpy(h->username_copy, cfg->username, sizeof(h->username_copy) - 1); + if (cfg->password) strncpy(h->password_copy, cfg->password, sizeof(h->password_copy) - 1); + + char client_id[128]; + snprintf(client_id, sizeof(client_id), "composer-%s", h->instance_copy); + h->mosq = mosquitto_new(client_id, true, h); + if (!h->mosq) goto fail; + + if (h->username_copy[0]) { + mosquitto_username_pw_set(h->mosq, + h->username_copy, + h->password_copy[0] ? h->password_copy : NULL); + } + + int port = cfg->port > 0 ? cfg->port : 1883; + int r = mosquitto_connect_async(h->mosq, h->host_copy, port, 60); + if (r != MOSQ_ERR_SUCCESS) { + fprintf(stderr, "[cfc/health] mosquitto_connect_async failed: %s\n", + mosquitto_strerror(r)); + goto fail_mosq; + } + r = mosquitto_loop_start(h->mosq); + if (r != MOSQ_ERR_SUCCESS) { + fprintf(stderr, "[cfc/health] mosquitto_loop_start failed: %s\n", + mosquitto_strerror(r)); + goto fail_mosq; + } + + if (pthread_create(&h->thread, NULL, health_thread, h) != 0) goto fail_loop; + h->thread_started = 1; + + fprintf(stderr, "[cfc/health] MQTT publish %s/%s/health → %s:%d every %ds%s\n", + h->topic_prefix_copy, h->instance_copy, h->host_copy, port, + h->cfg.interval_sec > 0 ? h->cfg.interval_sec : 10, + h->cfg.publish_discovery ? " (+HA discovery)" : ""); + *out = h; + return 0; + +fail_loop: + mosquitto_loop_stop(h->mosq, true); +fail_mosq: + mosquitto_destroy(h->mosq); +fail: + free(h); + return -1; +} + +int cfc_health_destroy(cfc_health_t *h) +{ + if (!h) return 0; + atomic_store(&h->stop_flag, 1); + if (h->thread_started) pthread_join(h->thread, NULL); + if (h->mosq) { + mosquitto_disconnect(h->mosq); + mosquitto_loop_stop(h->mosq, true); + mosquitto_destroy(h->mosq); + } + free(h); + return 0; +} diff --git a/src/overlay.c b/src/overlay.c index c602994..e2f1b14 100644 --- a/src/overlay.c +++ b/src/overlay.c @@ -47,6 +47,7 @@ typedef struct text_data { struct cfc_overlay { cfc_overlay_type_t type; + char id[32]; /* опциональный ID для control plane */ union { cfc_overlay_border_config_t border; png_data_t png; @@ -54,6 +55,25 @@ struct cfc_overlay { } u; }; +int cfc_overlay_set_id(cfc_overlay_t *ov, const char *id) +{ + if (!ov || !id) return -1; + strncpy(ov->id, id, sizeof(ov->id) - 1); + ov->id[sizeof(ov->id) - 1] = '\0'; + return 0; +} + +const char *cfc_overlay_get_id(const cfc_overlay_t *ov) +{ + if (!ov) return NULL; + return ov->id[0] ? ov->id : NULL; +} + +cfc_overlay_type_t cfc_overlay_get_type(const cfc_overlay_t *ov) +{ + return ov ? ov->type : (cfc_overlay_type_t)-1; +} + /* ── BORDER ───────────────────────────────────────────────────────────── */ int cfc_overlay_create_border(const cfc_overlay_border_config_t *cfg,