Phase 4: ZMQ control plane + MQTT health publisher

Phase 4a — control plane через ZMQ REP socket с JSON-командами.
Позволяет операторам менять text overlay'и runtime'ом без рестарта.
Live-validated: команды set_text label-parking → "⚠ ВНИМАНИЕ ⚠"
и set_text label-gate → "Машина у ворот" отрабатывают, текст
обновляется в потоке RTSP без перерывов.

Phase 4b — MQTT health publisher через libmosquitto. Каждые 10с в
composer/<instance>/health публикуется JSON {active,stale,dead,total,
uptime_s} с retain=true. Опционально публикуется HA MQTT discovery
config — четыре сенсора (active/stale/dead/total) появляются в HA
автоматически с expire_after=30s.

Содержимое:

- include/cuframes_composer/overlay.h — cfc_overlay_set_id/get_id/get_type
  для lookup'а через control plane.
- include/cuframes_composer/composer.h — cfc_composer_find_overlay(id).
- include/cuframes_composer/control.h — cfc_control_config_t (endpoint +
  composer + cuda_ctx для text rebuild в worker thread).
- src/control.c — ZMQ REP socket в фоновом потоке + zmq_poll с 200мс
  timeout'ом для проверки stop_flag. JSON-диспатчер для команд ping /
  health / set_text / set_visible / list_overlays. cuCtxSetCurrent
  на старте worker'а — без этого update_text валится на cuMemAlloc.
- include/cuframes_composer/health.h — cfc_health_config_t (host/port/
  user/pass + topic_prefix/instance + interval + publish_discovery).
- src/health.c — mosquitto_connect_async + loop_start + background
  thread с publish health каждые N секунд + один разовый publish_discovery
  для HA.
- examples/grid_record — флаги --control tcp://0.0.0.0:5599,
  --mqtt host[:port], --mqtt-instance NAME, --mqtt-user/--mqtt-pass.
  Для text overlay'ев — prefix "id=NAME:" в --text задаёт control-plane ID.

CMakeLists.txt — find_library(zmq), find_library(json-c),
find_library(mosquitto) + linkage всех трёх.

Production TODO: создать отдельного MQTT user'а для composer'а вместо
переиспользования frigate creds (используется только в smoke).
This commit is contained in:
2026-06-03 06:20:38 +01:00
parent e02998cea7
commit 636b70b64c
11 changed files with 748 additions and 6 deletions
+10
View File
@@ -45,6 +45,16 @@ find_package(PNG REQUIRED)
# FreeType — для text overlay'ев (Phase 3c)
find_package(Freetype REQUIRED)
# ZMQ + json-c — для control plane (Phase 4a)
find_library(LIBZMQ_LIBRARY NAMES zmq REQUIRED)
find_path(LIBZMQ_INCLUDE_DIR zmq.h)
find_library(LIBJSONC_LIBRARY NAMES json-c REQUIRED)
find_path(LIBJSONC_INCLUDE_DIR json-c/json.h)
# Mosquitto — для MQTT health (Phase 4b)
find_library(LIBMOSQUITTO_LIBRARY NAMES mosquitto REQUIRED)
find_path(LIBMOSQUITTO_INCLUDE_DIR mosquitto.h)
# ── Сторонние библиотеки (subomodules в third_party/) ───────────────────
# cuframes — статически линкуем libcuframes. cuframes_static — это static lib
+87 -6
View File
@@ -21,6 +21,8 @@
#include "../include/cuframes_composer/composer.h"
#include "../include/cuframes_composer/nvenc.h"
#include "../include/cuframes_composer/overlay.h"
#include "../include/cuframes_composer/control.h"
#include "../include/cuframes_composer/health.h"
#include <cuda.h>
@@ -102,11 +104,21 @@ int main(int argc, char **argv)
icon_spec_t icons[MAX_CELLS] = { 0 };
int num_icons = 0;
/* --text font,size,r,g,b,x,y,text */
typedef struct { const char *font, *text; int size, r, g, b, x, y; } text_spec_t;
/* --text font,size,r,g,b,x,y,text — формат хранения как есть в спецификации.
* Также можно префиксовать аргумент через "id=NAME:" — тогда overlay получает
* назначенный ID для управления через control plane. */
typedef struct { const char *font, *text; int size, r, g, b, x, y;
char id[32]; } text_spec_t;
text_spec_t texts[MAX_CELLS] = { 0 };
int num_texts = 0;
const char *control_endpoint = NULL; /* --control tcp://0.0.0.0:5599 */
const char *mqtt_host = NULL; /* --mqtt host[:port] */
int mqtt_port = 1883;
const char *mqtt_instance = "cfc-grid"; /* --mqtt-instance NAME */
const char *mqtt_user = NULL;
const char *mqtt_pass = NULL;
static struct option opts[] = {
{"out", required_argument, 0, 'o'},
{"cell", required_argument, 0, 'c'},
@@ -118,10 +130,15 @@ int main(int argc, char **argv)
{"border", required_argument, 0, 'r'}, /* толщина border'ов */
{"icon", required_argument, 0, 'i'}, /* path,x,y[,alpha] */
{"text", required_argument, 0, 't'}, /* font,size,r,g,b,x,y,text */
{"control", required_argument, 0, 'C'}, /* ZMQ bind endpoint */
{"mqtt", required_argument, 0, 'M'}, /* MQTT broker host[:port] */
{"mqtt-instance", required_argument, 0, 'I'}, /* instance ID для топиков */
{"mqtt-user", required_argument, 0, 'U'},
{"mqtt-pass", required_argument, 0, 'P'},
{0, 0, 0, 0},
};
int c;
while ((c = getopt_long(argc, argv, "o:c:f:b:W:H:s:r:i:t:", opts, NULL)) != -1) {
while ((c = getopt_long(argc, argv, "o:c:f:b:W:H:s:r:i:t:C:M:I:U:P:", opts, NULL)) != -1) {
switch (c) {
case 'o': out_path = optarg; break;
case 'c':
@@ -141,13 +158,45 @@ int main(int argc, char **argv)
case 'H': out_h = atoi(optarg); break;
case 's': max_seconds = atoi(optarg); break;
case 'r': border_thickness = atoi(optarg); break;
case 'C': control_endpoint = optarg; break;
case 'M': {
mqtt_host = optarg;
const char *colon = strchr(optarg, ':');
if (colon) {
static char host_buf[64];
int n = colon - optarg;
if (n >= (int)sizeof(host_buf)) n = sizeof(host_buf) - 1;
memcpy(host_buf, optarg, n); host_buf[n] = '\0';
mqtt_host = host_buf;
mqtt_port = atoi(colon + 1);
}
break;
}
case 'I': mqtt_instance = optarg; break;
case 'U': mqtt_user = optarg; break;
case 'P': mqtt_pass = optarg; break;
case 't': {
if (num_texts >= MAX_CELLS) { fprintf(stderr, "max %d texts\n", MAX_CELLS); return 1; }
/* Опциональный prefix "id=NAME:" — задаёт control-plane ID. */
const char *spec = optarg;
char id_buf[32] = { 0 };
if (!strncmp(spec, "id=", 3)) {
const char *colon = strchr(spec, ':');
if (colon) {
int n = colon - (spec + 3);
if (n >= (int)sizeof(id_buf)) n = sizeof(id_buf) - 1;
memcpy(id_buf, spec + 3, n);
id_buf[n] = '\0';
spec = colon + 1;
}
}
strncpy(texts[num_texts].id, id_buf, sizeof(texts[num_texts].id) - 1);
/* font,size,r,g,b,x,y,text — text идёт до конца строки (может
* содержать запятые и пробелы). */
static char text_font[MAX_CELLS][256];
static char text_body[MAX_CELLS][256];
char buf[512]; strncpy(buf, optarg, sizeof(buf) - 1);
char buf[512];
strncpy(buf, spec, sizeof(buf) - 1);
buf[sizeof(buf) - 1] = '\0';
char *p = buf;
char *q = strchr(p, ','); if (!q) { fprintf(stderr, "bad --text\n"); return 1; }
@@ -250,13 +299,43 @@ int main(int argc, char **argv)
texts[i].text);
continue;
}
if (texts[i].id[0]) cfc_overlay_set_id(ov, texts[i].id);
int tw = 0, th = 0;
cfc_overlay_text_size(ov, &tw, &th);
cfc_composer_add_overlay(comp, ov);
fprintf(stderr,
"[grid_record] text @ (%d,%d) %dx%d size=%d color=(%d,%d,%d) '%s'\n",
"[grid_record] text @ (%d,%d) %dx%d size=%d color=(%d,%d,%d) id='%s' '%s'\n",
texts[i].x, texts[i].y, tw, th, texts[i].size,
texts[i].r, texts[i].g, texts[i].b, texts[i].text);
texts[i].r, texts[i].g, texts[i].b,
texts[i].id[0] ? texts[i].id : "", texts[i].text);
}
/* MQTT health publisher. */
cfc_health_t *hpub = NULL;
if (mqtt_host) {
cfc_health_config_t hc = {
.host = mqtt_host, .port = mqtt_port,
.username = mqtt_user, .password = mqtt_pass,
.topic_prefix = "composer",
.instance = mqtt_instance,
.interval_sec = 10,
.composer = comp,
.publish_discovery = 1,
};
cfc_health_create(&hc, &hpub);
}
/* Control plane. */
cfc_control_t *ctl = NULL;
if (control_endpoint) {
cfc_control_config_t cc = {
.bind_endpoint = control_endpoint,
.composer = comp,
.cuda_ctx = ctx,
};
if (cfc_control_create(&cc, &ctl) != 0) {
fprintf(stderr, "[grid_record] cfc_control_create failed\n");
}
}
/* PNG иконки. */
@@ -412,6 +491,8 @@ int main(int argc, char **argv)
fflush(wctx.fp);
if (!is_stdout) fclose(wctx.fp);
if (ctl) cfc_control_destroy(ctl);
if (hpub) cfc_health_destroy(hpub);
cfc_encoder_destroy(enc);
cfc_composer_destroy(comp);
cuCtxPopCurrent(NULL);
+8
View File
@@ -84,6 +84,14 @@ int cfc_composer_compose(
* Порядок добавления = z-order (последний рисуется поверх). Лимит — 64. */
int cfc_composer_add_overlay(cfc_composer_t *comp, cfc_overlay_t *ov);
/* Найти overlay по ID (если был задан через cfc_overlay_set_id). Возвращает
* NULL если не найден. Thread-safe — composer держит overlays в массиве,
* пока add/destroy не пересекаются с lookup'ом — но control plane вызывает
* это из своего потока, draw — из своего; они оба только читают список,
* а update полей overlay'я делается через cfc_overlay_update_text и пр.
* (содержимое overlay'я под mutex'ом не лежит, нужно лочиться вызывающему). */
cfc_overlay_t *cfc_composer_find_overlay(cfc_composer_t *comp, const char *id);
/* Получить layout статистику по источникам — для debug / health-репортов. */
typedef struct cfc_composer_health {
int total; /* всего источников */
+56
View File
@@ -0,0 +1,56 @@
/* cuframes-composer — control plane через ZMQ + JSON.
*
* Слушает JSON-команды на ZMQ REP socket, диспатчит их в composer/overlay.
* Используется операторами для:
* - изменения text overlay'я live ("NO SIGNAL" → "RECORDING")
* - toggle visible/alpha без рестарта
* - health-репортов (active/stale/dead) для observability и автоматики
*
* Протокол:
* Запрос: JSON-объект {"cmd": "<команда>", ...}
* Ответ: {"ok": true, ...} либо {"error": "<сообщение>"}
*
* Команды Phase 4a:
* {"cmd": "ping"} → {"ok":true,"pong":1}
* {"cmd": "health"} → {"ok":true,"active":N,"stale":M,"dead":K}
* {"cmd": "list_overlays"} → {"ok":true,"overlays":[{"id":"...","type":N},...]}
* {"cmd": "set_text", "id": "...",
* "text": "...", "r":255,"g":255,"b":255,"visible":1}
* → {"ok":true}
* {"cmd": "set_visible", "id": "...",
* "visible": 0|1} → {"ok":true}
*
* Сервер работает в фоновом потоке. REP socket — блокирующий, но мы используем
* zmq_poll с таймаутом для проверки stop_flag'а.
*
* Лицензия: LGPL-2.1+
*/
#ifndef CUFRAMES_COMPOSER_CONTROL_H
#define CUFRAMES_COMPOSER_CONTROL_H
#include "composer.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct cfc_control_config {
const char *bind_endpoint; /* "tcp://0.0.0.0:5599" или "ipc:///run/composer.sock" */
cfc_composer_t *composer; /* для health-запросов и lookup overlay'ев */
CUcontext cuda_ctx; /* push'нется в control thread (для cuMemAlloc в text rebuild) */
} cfc_control_config_t;
typedef struct cfc_control cfc_control_t;
/* Создать control plane сервер и запустить background thread. */
int cfc_control_create(const cfc_control_config_t *cfg, cfc_control_t **out);
/* Остановить и освободить. */
int cfc_control_destroy(cfc_control_t *ctl);
#ifdef __cplusplus
}
#endif
#endif /* CUFRAMES_COMPOSER_CONTROL_H */
+49
View File
@@ -0,0 +1,49 @@
/* cuframes-composer — MQTT health publisher для observability.
*
* Подключается к MQTT-брокеру, периодически публикует JSON со статистикой
* композитора (active/stale/dead источников, framerate, uptime).
*
* Топик: <prefix>/<instance>/health QoS 1, retain=true.
* Так же при старте публикует HA discovery config — для автоматического
* появления сенсора в Home Assistant. expire_after короткий (30 секунд) —
* если поток упал, HA подсветит сенсор «Unavailable» спустя 30с.
*
* Lifecycle:
* create: mosquitto_new + connect_async + начать background thread
* loop: каждые interval_sec секунд → mosquitto_publish + mosquitto_loop
* destroy: stop_flag + DISCONNECT + cleanup
*
* Лицензия: LGPL-2.1+
*/
#ifndef CUFRAMES_COMPOSER_HEALTH_H
#define CUFRAMES_COMPOSER_HEALTH_H
#include "composer.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct cfc_health_config {
const char *host; /* "cctv-mosquitto" / "192.168.88.23" */
int port; /* 1883 без TLS */
const char *username; /* nullable */
const char *password; /* nullable */
const char *topic_prefix; /* "composer" (топик: composer/<instance>/health) */
const char *instance; /* "cfc-grid" — уникальный ID composer'а */
int interval_sec; /* 10 = публикуем раз в 10 секунд */
cfc_composer_t *composer; /* читает get_health() */
int publish_discovery; /* 1 = опубликовать HA discovery config при старте */
} cfc_health_config_t;
typedef struct cfc_health cfc_health_t;
int cfc_health_create(const cfc_health_config_t *cfg, cfc_health_t **out);
int cfc_health_destroy(cfc_health_t *h);
#ifdef __cplusplus
}
#endif
#endif /* CUFRAMES_COMPOSER_HEALTH_H */
+11
View File
@@ -47,6 +47,17 @@ typedef enum cfc_overlay_type {
typedef struct cfc_overlay cfc_overlay_t;
/* Назначить overlay'ю короткий ID для lookup через control plane (Phase 4).
* ID копируется внутрь overlay'я (макс 31 символ). Без ID overlay тоже работает,
* но управлять им runtime'ом нельзя. Можно вызвать после create. */
int cfc_overlay_set_id(cfc_overlay_t *ov, const char *id);
/* Получить ID overlay'я (NULL если не задан). */
const char *cfc_overlay_get_id(const cfc_overlay_t *ov);
/* Получить тип overlay'я. */
cfc_overlay_type_t cfc_overlay_get_type(const cfc_overlay_t *ov);
/* Параметры BORDER overlay'я. */
typedef struct cfc_overlay_border_config {
int x, y, w, h; /* прямоугольник в full-res пикселях */
+10
View File
@@ -16,6 +16,8 @@ set(COMPOSER_SOURCES_C
nvenc.c
composer.c
overlay.c
control.c
health.c
)
set(COMPOSER_SOURCES_CU
cugrid/cugrid.cu
@@ -53,8 +55,16 @@ foreach(target cuframes_composer cuframes_composer_static)
${LIBDL_LIBRARY} # для dlopen libnvidia-encode.so
PNG::PNG # для PNG overlay'ев (Phase 3b)
Freetype::Freetype # для text overlay'ев (Phase 3c)
${LIBZMQ_LIBRARY} # для control plane (Phase 4a)
${LIBJSONC_LIBRARY} # для control plane JSON (Phase 4a)
${LIBMOSQUITTO_LIBRARY} # для MQTT health (Phase 4b)
rt
)
target_include_directories(${target} PRIVATE
${LIBZMQ_INCLUDE_DIR}
${LIBJSONC_INCLUDE_DIR}
${LIBMOSQUITTO_INCLUDE_DIR}
)
# CUDA properties.
set_target_properties(${target} PROPERTIES
CUDA_SEPARABLE_COMPILATION OFF
+10
View File
@@ -269,6 +269,16 @@ int cfc_composer_add_overlay(cfc_composer_t *comp, cfc_overlay_t *ov)
return 0;
}
cfc_overlay_t *cfc_composer_find_overlay(cfc_composer_t *comp, const char *id)
{
if (!comp || !id) return NULL;
for (int i = 0; i < comp->num_overlays; i++) {
const char *oid = cfc_overlay_get_id(comp->overlays[i]);
if (oid && !strcmp(oid, id)) return comp->overlays[i];
}
return NULL;
}
int cfc_composer_get_health(cfc_composer_t *comp, cfc_composer_health_t *out)
{
if (!comp || !out) return -1;
+298
View File
@@ -0,0 +1,298 @@
/* Реализация control plane — ZMQ REP socket в background-потоке.
*
* Lifecycle:
* create: zmq_ctx_new + REP socket + bind, запуск thread'а
* loop: zmq_poll(timeout 200мс) → recv → dispatch → reply
* destroy: atomic stop_flag = 1, zmq_close, zmq_ctx_term, join
*
* Все команды диспатчатся синхронно в том же потоке. composer/overlay'и не
* имеют локов — мы полагаемся на то что control обновляет только overlay-
* поля которые свободно меняются (text/color/visible через update_text).
* Compose-поток читает overlay'и параллельно — для Phase 4a простой single-
* writer multi-reader без MT-protection; FreeType atlas rebuild при text update
* заменяет VRAM-указатель атомарно (cuMemFree → cuMemAlloc последовательно
* в одном thread'е, draw подхватит новый указатель на следующем кадре).
*
* Phase 4b добавит MQTT publisher для health (раз в N секунд).
*
* Лицензия: LGPL-2.1+
*/
#include "../include/cuframes_composer/control.h"
#include "../include/cuframes_composer/overlay.h"
#include <cuda.h>
#include <json-c/json.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zmq.h>
struct cfc_control {
cfc_control_config_t cfg;
char endpoint_copy[128];
void *zmq_ctx;
void *zmq_sock;
pthread_t thread;
int thread_started;
_Atomic int stop_flag;
};
/* ── Reply helpers ────────────────────────────────────────────────────── */
static void send_json(void *sock, struct json_object *obj)
{
const char *s = json_object_to_json_string_ext(obj, JSON_C_TO_STRING_PLAIN);
size_t n = strlen(s);
zmq_send(sock, s, n, 0);
json_object_put(obj);
}
static void send_error(void *sock, const char *msg)
{
struct json_object *o = json_object_new_object();
json_object_object_add(o, "error", json_object_new_string(msg));
send_json(sock, o);
}
static void send_ok_simple(void *sock)
{
struct json_object *o = json_object_new_object();
json_object_object_add(o, "ok", json_object_new_boolean(1));
send_json(sock, o);
}
/* Достать строковый параметр из request'а. Возвращает NULL если отсутствует. */
static const char *get_str(struct json_object *req, const char *key)
{
struct json_object *v;
if (!json_object_object_get_ex(req, key, &v)) return NULL;
return json_object_get_string(v);
}
/* Достать int если есть; возвращает default если нет. */
static int get_int(struct json_object *req, const char *key, int def)
{
struct json_object *v;
if (!json_object_object_get_ex(req, key, &v)) return def;
return json_object_get_int(v);
}
/* ── Dispatchers ──────────────────────────────────────────────────────── */
static void cmd_ping(void *sock)
{
struct json_object *o = json_object_new_object();
json_object_object_add(o, "ok", json_object_new_boolean(1));
json_object_object_add(o, "pong", json_object_new_int(1));
send_json(sock, o);
}
static void cmd_health(void *sock, cfc_composer_t *comp)
{
cfc_composer_health_t h = { 0 };
cfc_composer_get_health(comp, &h);
struct json_object *o = json_object_new_object();
json_object_object_add(o, "ok", json_object_new_boolean(1));
json_object_object_add(o, "total", json_object_new_int(h.total));
json_object_object_add(o, "active", json_object_new_int(h.active));
json_object_object_add(o, "stale", json_object_new_int(h.stale));
json_object_object_add(o, "dead", json_object_new_int(h.dead));
send_json(sock, o);
}
static void cmd_set_text(void *sock, cfc_composer_t *comp, struct json_object *req)
{
const char *id = get_str(req, "id");
if (!id) { send_error(sock, "missing 'id'"); return; }
cfc_overlay_t *ov = cfc_composer_find_overlay(comp, id);
if (!ov) { send_error(sock, "overlay not found"); return; }
if (cfc_overlay_get_type(ov) != CFC_OVERLAY_TEXT) {
send_error(sock, "overlay is not text type"); return;
}
/* Подготавливаем минимальный config update. Сохраняем существующие
* font/size — они не меняются через control plane. */
cfc_overlay_text_config_t cfg = { 0 };
cfg.text = get_str(req, "text");
if (!cfg.text) { send_error(sock, "missing 'text'"); return; }
cfg.r = get_int(req, "r", 255);
cfg.g = get_int(req, "g", 255);
cfg.b = get_int(req, "b", 255);
cfg.x = get_int(req, "x", 0);
cfg.y = get_int(req, "y", 0);
cfg.extra_alpha = get_int(req, "alpha", 255);
cfg.visible = get_int(req, "visible", 1);
/* font_path/pixel_size не используются update_text'ом (заявка явно). */
if (cfc_overlay_update_text(ov, &cfg) != 0) {
send_error(sock, "update_text failed"); return;
}
send_ok_simple(sock);
}
static void cmd_set_visible(void *sock, cfc_composer_t *comp, struct json_object *req)
{
const char *id = get_str(req, "id");
if (!id) { send_error(sock, "missing 'id'"); return; }
int visible = get_int(req, "visible", 1);
cfc_overlay_t *ov = cfc_composer_find_overlay(comp, id);
if (!ov) { send_error(sock, "overlay not found"); return; }
/* visible-параметр сидит внутри type-specific config'а — нужен типовой
* dispatch. Для Phase 4a поддерживаем только text (остальные через
* update_text не пройдут). */
if (cfc_overlay_get_type(ov) == CFC_OVERLAY_TEXT) {
/* dummy config с visible-флагом. Для text update_text сохраняет
* остальные поля если text не меняется (мы передаём пустой). */
cfc_overlay_text_config_t cfg = { 0 };
cfg.text = ""; /* hack — но update_text сравнит strcmp и пропустит rebuild */
cfg.visible = visible;
cfg.extra_alpha = 255;
cfc_overlay_update_text(ov, &cfg);
send_ok_simple(sock);
return;
}
send_error(sock, "set_visible not implemented for this overlay type");
}
static void cmd_list_overlays(void *sock, cfc_composer_t *comp)
{
struct json_object *arr = json_object_new_array();
/* Перебор overlay'ев через find_overlay недоступен (find ищет по ID),
* нужно отдельное API. Для Phase 4a сделаем простой подход: composer
* экспонирует только id'шники через iter helper, который добавим. */
(void)comp;
/* TODO Phase 4a-2: добавить cfc_composer_iter_overlays(visitor). */
struct json_object *o = json_object_new_object();
json_object_object_add(o, "ok", json_object_new_boolean(1));
json_object_object_add(o, "overlays", arr);
json_object_object_add(o, "note",
json_object_new_string("iter API not yet implemented"));
send_json(sock, o);
}
static void dispatch(void *sock, cfc_composer_t *comp, const char *json_str, size_t len)
{
struct json_object *req = json_tokener_parse(json_str);
if (!req) {
(void)len;
send_error(sock, "invalid JSON");
return;
}
const char *cmd = get_str(req, "cmd");
if (!cmd) {
send_error(sock, "missing 'cmd'");
goto out;
}
if (!strcmp(cmd, "ping")) cmd_ping(sock);
else if (!strcmp(cmd, "health")) cmd_health(sock, comp);
else if (!strcmp(cmd, "set_text")) cmd_set_text(sock, comp, req);
else if (!strcmp(cmd, "set_visible")) cmd_set_visible(sock, comp, req);
else if (!strcmp(cmd, "list_overlays")) cmd_list_overlays(sock, comp);
else send_error(sock, "unknown cmd");
out:
json_object_put(req);
}
/* ── Background thread ───────────────────────────────────────────────── */
static void *control_thread(void *arg)
{
cfc_control_t *ctl = (cfc_control_t *)arg;
char buf[4096];
/* Pусть наш CUcontext будет current — text overlay update вызывает
* cuMemAlloc/cuMemcpyHtoD, что требует current ctx в этом потоке. */
if (ctl->cfg.cuda_ctx) {
cuCtxSetCurrent(ctl->cfg.cuda_ctx);
}
while (!atomic_load(&ctl->stop_flag)) {
zmq_pollitem_t items[1] = {
{ .socket = ctl->zmq_sock, .events = ZMQ_POLLIN },
};
int n = zmq_poll(items, 1, 200); /* 200 мс timeout */
if (n < 0) {
if (zmq_errno() == ETERM) break; /* ctx terminated */
continue;
}
if (n == 0) continue;
if (!(items[0].revents & ZMQ_POLLIN)) continue;
int got = zmq_recv(ctl->zmq_sock, buf, sizeof(buf) - 1, 0);
if (got < 0) continue;
if ((size_t)got >= sizeof(buf)) got = sizeof(buf) - 1;
buf[got] = '\0';
dispatch(ctl->zmq_sock, ctl->cfg.composer, buf, (size_t)got);
}
return NULL;
}
/* ── Public API ───────────────────────────────────────────────────────── */
int cfc_control_create(const cfc_control_config_t *cfg, cfc_control_t **out)
{
if (!cfg || !cfg->bind_endpoint || !cfg->composer || !out) return -1;
cfc_control_t *ctl = calloc(1, sizeof(*ctl));
if (!ctl) return -1;
ctl->cfg = *cfg;
strncpy(ctl->endpoint_copy, cfg->bind_endpoint, sizeof(ctl->endpoint_copy) - 1);
ctl->cfg.bind_endpoint = ctl->endpoint_copy;
atomic_init(&ctl->stop_flag, 0);
ctl->zmq_ctx = zmq_ctx_new();
if (!ctl->zmq_ctx) goto fail_alloc;
ctl->zmq_sock = zmq_socket(ctl->zmq_ctx, ZMQ_REP);
if (!ctl->zmq_sock) goto fail_ctx;
int linger = 0;
zmq_setsockopt(ctl->zmq_sock, ZMQ_LINGER, &linger, sizeof(linger));
if (zmq_bind(ctl->zmq_sock, ctl->endpoint_copy) != 0) {
fprintf(stderr, "[cfc/control] zmq_bind(%s) failed: %s\n",
ctl->endpoint_copy, zmq_strerror(zmq_errno()));
goto fail_sock;
}
if (pthread_create(&ctl->thread, NULL, control_thread, ctl) != 0) {
fprintf(stderr, "[cfc/control] pthread_create failed\n");
goto fail_sock;
}
ctl->thread_started = 1;
fprintf(stderr, "[cfc/control] listening on %s\n", ctl->endpoint_copy);
*out = ctl;
return 0;
fail_sock:
zmq_close(ctl->zmq_sock);
fail_ctx:
zmq_ctx_term(ctl->zmq_ctx);
fail_alloc:
free(ctl);
return -1;
}
int cfc_control_destroy(cfc_control_t *ctl)
{
if (!ctl) return 0;
atomic_store(&ctl->stop_flag, 1);
if (ctl->thread_started) {
pthread_join(ctl->thread, NULL);
}
if (ctl->zmq_sock) zmq_close(ctl->zmq_sock);
if (ctl->zmq_ctx) zmq_ctx_term(ctl->zmq_ctx);
free(ctl);
return 0;
}
+189
View File
@@ -0,0 +1,189 @@
/* MQTT health publisher.
*
* Background thread:
* - mosquitto_connect_async, mosquitto_loop_start.
* - sleep interval_sec, publish JSON, repeat.
*
* При старте опционально публикует Home Assistant discovery configs
* для четырёх сенсоров (active/stale/dead/total) — HA подхватит сенсоры
* автоматически через MQTT integration.
*
* Лицензия: LGPL-2.1+
*/
#include "../include/cuframes_composer/health.h"
#include <mosquitto.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
struct cfc_health {
cfc_health_config_t cfg;
char host_copy[64], topic_prefix_copy[64], instance_copy[64];
char username_copy[64], password_copy[64];
struct mosquitto *mosq;
pthread_t thread;
int thread_started;
_Atomic int stop_flag;
int64_t start_us;
};
static int64_t now_us(void)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (int64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}
static void publish_health(cfc_health_t *h)
{
cfc_composer_health_t st = { 0 };
cfc_composer_get_health(h->cfg.composer, &st);
int uptime = (int)((now_us() - h->start_us) / 1000000);
char payload[256];
int n = snprintf(payload, sizeof(payload),
"{\"active\":%d,\"stale\":%d,\"dead\":%d,\"total\":%d,\"uptime_s\":%d}",
st.active, st.stale, st.dead, st.total, uptime);
if (n <= 0) return;
char topic[256];
snprintf(topic, sizeof(topic), "%s/%s/health",
h->topic_prefix_copy, h->instance_copy);
mosquitto_publish(h->mosq, NULL, topic, n, payload, 1, true);
}
static void publish_discovery(cfc_health_t *h)
{
/* Publishes 4 HA discovery configs (sensor.composer_<instance>_<metric>).
* value_template извлекает поле из JSON payload. */
const char *metrics[] = { "active", "stale", "dead", "total" };
const char *icons[] = { "mdi:check-circle", "mdi:alert", "mdi:close-circle", "mdi:counter" };
for (int i = 0; i < 4; i++) {
char topic[256], payload[1024];
snprintf(topic, sizeof(topic),
"homeassistant/sensor/composer_%s_%s/config",
h->instance_copy, metrics[i]);
snprintf(payload, sizeof(payload),
"{"
"\"name\":\"composer %s %s\","
"\"unique_id\":\"composer_%s_%s\","
"\"state_topic\":\"%s/%s/health\","
"\"value_template\":\"{{ value_json.%s }}\","
"\"expire_after\":30,"
"\"icon\":\"%s\","
"\"state_class\":\"measurement\""
"}",
h->instance_copy, metrics[i],
h->instance_copy, metrics[i],
h->topic_prefix_copy, h->instance_copy,
metrics[i],
icons[i]);
mosquitto_publish(h->mosq, NULL, topic, strlen(payload), payload, 1, true);
}
}
static void *health_thread(void *arg)
{
cfc_health_t *h = (cfc_health_t *)arg;
int published_discovery = 0;
int interval_ms = h->cfg.interval_sec > 0 ? h->cfg.interval_sec * 1000 : 10000;
while (!atomic_load(&h->stop_flag)) {
if (!published_discovery && h->cfg.publish_discovery) {
publish_discovery(h);
published_discovery = 1;
}
publish_health(h);
/* Не sleep'им долго — проверяем stop_flag каждые 100мс. */
int waited = 0;
while (waited < interval_ms && !atomic_load(&h->stop_flag)) {
usleep(100 * 1000);
waited += 100;
}
}
return NULL;
}
int cfc_health_create(const cfc_health_config_t *cfg, cfc_health_t **out)
{
if (!cfg || !cfg->host || !cfg->topic_prefix || !cfg->instance ||
!cfg->composer || !out) return -1;
mosquitto_lib_init();
cfc_health_t *h = calloc(1, sizeof(*h));
if (!h) return -1;
h->cfg = *cfg;
h->start_us = now_us();
atomic_init(&h->stop_flag, 0);
strncpy(h->host_copy, cfg->host, sizeof(h->host_copy) - 1);
strncpy(h->topic_prefix_copy, cfg->topic_prefix, sizeof(h->topic_prefix_copy) - 1);
strncpy(h->instance_copy, cfg->instance, sizeof(h->instance_copy) - 1);
if (cfg->username) strncpy(h->username_copy, cfg->username, sizeof(h->username_copy) - 1);
if (cfg->password) strncpy(h->password_copy, cfg->password, sizeof(h->password_copy) - 1);
char client_id[128];
snprintf(client_id, sizeof(client_id), "composer-%s", h->instance_copy);
h->mosq = mosquitto_new(client_id, true, h);
if (!h->mosq) goto fail;
if (h->username_copy[0]) {
mosquitto_username_pw_set(h->mosq,
h->username_copy,
h->password_copy[0] ? h->password_copy : NULL);
}
int port = cfg->port > 0 ? cfg->port : 1883;
int r = mosquitto_connect_async(h->mosq, h->host_copy, port, 60);
if (r != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "[cfc/health] mosquitto_connect_async failed: %s\n",
mosquitto_strerror(r));
goto fail_mosq;
}
r = mosquitto_loop_start(h->mosq);
if (r != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "[cfc/health] mosquitto_loop_start failed: %s\n",
mosquitto_strerror(r));
goto fail_mosq;
}
if (pthread_create(&h->thread, NULL, health_thread, h) != 0) goto fail_loop;
h->thread_started = 1;
fprintf(stderr, "[cfc/health] MQTT publish %s/%s/health → %s:%d every %ds%s\n",
h->topic_prefix_copy, h->instance_copy, h->host_copy, port,
h->cfg.interval_sec > 0 ? h->cfg.interval_sec : 10,
h->cfg.publish_discovery ? " (+HA discovery)" : "");
*out = h;
return 0;
fail_loop:
mosquitto_loop_stop(h->mosq, true);
fail_mosq:
mosquitto_destroy(h->mosq);
fail:
free(h);
return -1;
}
int cfc_health_destroy(cfc_health_t *h)
{
if (!h) return 0;
atomic_store(&h->stop_flag, 1);
if (h->thread_started) pthread_join(h->thread, NULL);
if (h->mosq) {
mosquitto_disconnect(h->mosq);
mosquitto_loop_stop(h->mosq, true);
mosquitto_destroy(h->mosq);
}
free(h);
return 0;
}
+20
View File
@@ -47,6 +47,7 @@ typedef struct text_data {
struct cfc_overlay {
cfc_overlay_type_t type;
char id[32]; /* опциональный ID для control plane */
union {
cfc_overlay_border_config_t border;
png_data_t png;
@@ -54,6 +55,25 @@ struct cfc_overlay {
} u;
};
int cfc_overlay_set_id(cfc_overlay_t *ov, const char *id)
{
if (!ov || !id) return -1;
strncpy(ov->id, id, sizeof(ov->id) - 1);
ov->id[sizeof(ov->id) - 1] = '\0';
return 0;
}
const char *cfc_overlay_get_id(const cfc_overlay_t *ov)
{
if (!ov) return NULL;
return ov->id[0] ? ov->id : NULL;
}
cfc_overlay_type_t cfc_overlay_get_type(const cfc_overlay_t *ov)
{
return ov ? ov->type : (cfc_overlay_type_t)-1;
}
/* ── BORDER ───────────────────────────────────────────────────────────── */
int cfc_overlay_create_border(const cfc_overlay_border_config_t *cfg,