Phase 7 task #190: Frigate→cfc-grid overlay через MQTT subscribe

Robust pattern из docs/RESEARCH-frigate-overlay-integration.md:
композитор сам подписывается на frigate/events, парсит JSON, рисует
bounding rectangles в управляемый overlay-список с TTL. Никаких
sidecar'ов — root cause "перестало обновляться" в старом
cuda-grid-controller'е был именно two-hop ZMQ + state leak.

Содержимое:

- include/cuframes_composer/overlay.h — новый тип CFC_OVERLAY_DETECTION_BOXES
  + cfc_overlay_detbox_config_t (camera_key, detect_w/h, cell rect,
  thickness, color, TTL stale_ms) + API _create / _upsert / _end /
  _camera_key.
- src/overlay.c — реализация: массив 16 active детектов под mutex,
  draw как 4 fill_nv12 на каждый valid box (TTL фильтрация по now-update).
  Coordinate mapping detect→cell в момент draw — layout switch безопасен.
- include/cuframes_composer/frigate_mqtt.h + src/frigate_mqtt.c — MQTT
  subscriber на libmosquitto. Parses Frigate event JSON через json-c
  (after.camera, after.id, after.box, after.label). Lookup overlay по
  camera_key. Mosquitto reconnect_delay_set 1s→30s exp backoff,
  disconnect log rate-limited (раз в 30 сек).
- examples/grid_record.c — CLI флаги --frigate-mqtt host[:port],
  --frigate-topic frigate/events, --detection-cell key,camera,dx,dy,
  dw,dh,detect_w,detect_h. По одному --detection-cell создаётся overlay
  + регистрируется в MQTT subscriber.
- src/CMakeLists.txt — добавлен frigate_mqtt.c.

Параллельная подготовка на Frigate стороне:
- Frigate config /home/claude/cctv/frigate-config/config.yml — добавлен
  parking_overview.objects.filters.{car,person,bicycle,motorcycle}.
  required_zones=[canopy,parking_zone,private_area]. Это фильтрация
  на tracker уровне (review.required_zones влиял только на Frigate
  Review system). После: ~2 события/сутки реально с parking, вместо
  1000+ от проезжей части.

Live-test:
- Synthetic mosquitto_pub frigate/events с box [200,200,400,360]
  в detect 640×480 → cfc-grid render'ит зелёную рамку в parking cell
  правильной геометрии (300,225, 300×180 в 1920×1080 output).
- TTL 8 сек — рамка пропадает если update не пришёл.
- restarts=0, 12650 кадров stable 25 fps — frigate_mqtt thread
  не блокирует video.

Image gx/cuframes-composer:0.7 deployed.

Чеклист защиты от "перестало обновляться" (из research §6.4):
 MQTT auto-reconnect через mosquitto built-in backoff
 TTL независим от end event (страховка от потери MQTT)
 State полностью в composer-процессе (один источник истины)
 Mapping coords в момент draw (layout switch safe)
 Frigate-фильтр на источнике (objects.filters.required_zones)
 Health-метрики (frigate_mqtt_connected, events_received) — TODO
This commit is contained in:
2026-06-03 15:28:52 +01:00
parent fa6ab3069a
commit 17261377cb
6 changed files with 678 additions and 1 deletions
+101 -1
View File
@@ -25,6 +25,7 @@
#include "../include/cuframes_composer/health.h" #include "../include/cuframes_composer/health.h"
#include "../include/cuframes_composer/writer.h" #include "../include/cuframes_composer/writer.h"
#include "../include/cuframes_composer/audio.h" #include "../include/cuframes_composer/audio.h"
#include "../include/cuframes_composer/frigate_mqtt.h"
#include <cuda.h> #include <cuda.h>
@@ -122,6 +123,18 @@ int main(int argc, char **argv)
const char *mqtt_pass = NULL; const char *mqtt_pass = NULL;
const char *out_format = "h264"; /* --format h264|mpegts */ const char *out_format = "h264"; /* --format h264|mpegts */
const char *audio_source = NULL; /* --audio-source rtsp://.../live-audio */ const char *audio_source = NULL; /* --audio-source rtsp://.../live-audio */
const char *frigate_mqtt_host = NULL;
int frigate_mqtt_port = 1883;
const char *frigate_topic = "frigate/events";
/* --detection-cell key,camera,dx,dy,dw,dh,detect_w,detect_h
* key — символьное имя для логов (например "parking")
* camera — Frigate camera_key для MQTT match'а ("parking_overview")
* dx,dy,dw,dh — координаты ячейки composer'а на output frame
* detect_w,detect_h — Frigate detect.{width,height} (640,480) */
typedef struct { char key[32], camera[48];
int dx, dy, dw, dh, detect_w, detect_h; } detcell_t;
detcell_t detcells[MAX_CELLS] = { 0 };
int num_detcells = 0;
static struct option opts[] = { static struct option opts[] = {
{"out", required_argument, 0, 'o'}, {"out", required_argument, 0, 'o'},
@@ -142,10 +155,13 @@ int main(int argc, char **argv)
{"intra-refresh", no_argument, 0, 'R'}, {"intra-refresh", no_argument, 0, 'R'},
{"format", required_argument, 0, 'F'}, /* h264|mpegts */ {"format", required_argument, 0, 'F'}, /* h264|mpegts */
{"audio-source", required_argument, 0, 'A'}, /* RTSP audio URL */ {"audio-source", required_argument, 0, 'A'}, /* RTSP audio URL */
{"frigate-mqtt", required_argument, 0, 'G'}, /* host[:port] */
{"frigate-topic", required_argument, 0, 'T'},
{"detection-cell", required_argument, 0, 'D'},
{0, 0, 0, 0}, {0, 0, 0, 0},
}; };
int c; int c;
while ((c = getopt_long(argc, argv, "o:c:f:b:W:H:s:r:i:t:C:M:I:U:P:RF:A:", opts, NULL)) != -1) { while ((c = getopt_long(argc, argv, "o:c:f:b:W:H:s:r:i:t:C:M:I:U:P:RF:A:G:T:D:", opts, NULL)) != -1) {
switch (c) { switch (c) {
case 'o': out_path = optarg; break; case 'o': out_path = optarg; break;
case 'c': case 'c':
@@ -185,6 +201,42 @@ int main(int argc, char **argv)
case 'R': intra_refresh = 1; break; case 'R': intra_refresh = 1; break;
case 'F': out_format = optarg; break; case 'F': out_format = optarg; break;
case 'A': audio_source = optarg; break; case 'A': audio_source = optarg; break;
case 'G': {
frigate_mqtt_host = optarg;
const char *colon = strchr(optarg, ':');
if (colon) {
static char buf[64];
int n = colon - optarg;
if (n >= (int)sizeof(buf)) n = sizeof(buf) - 1;
memcpy(buf, optarg, n); buf[n] = '\0';
frigate_mqtt_host = buf;
frigate_mqtt_port = atoi(colon + 1);
}
break;
}
case 'T': frigate_topic = optarg; break;
case 'D': {
if (num_detcells >= MAX_CELLS) { fprintf(stderr, "max %d detcells\n", MAX_CELLS); return 1; }
char buf[256]; strncpy(buf, optarg, sizeof(buf) - 1); buf[sizeof(buf)-1] = '\0';
char *p = buf, *q;
q = strchr(p, ','); if (!q) { fprintf(stderr, "bad --detection-cell\n"); return 1; }
*q = '\0'; strncpy(detcells[num_detcells].key, p, 31); p = q + 1;
q = strchr(p, ','); if (!q) { fprintf(stderr, "bad --detection-cell\n"); return 1; }
*q = '\0'; strncpy(detcells[num_detcells].camera, p, 47); p = q + 1;
q = strchr(p, ','); if (!q) { fprintf(stderr, "bad --detection-cell\n"); return 1; }
*q = '\0'; detcells[num_detcells].dx = atoi(p); p = q + 1;
q = strchr(p, ','); if (!q) { fprintf(stderr, "bad --detection-cell\n"); return 1; }
*q = '\0'; detcells[num_detcells].dy = atoi(p); p = q + 1;
q = strchr(p, ','); if (!q) { fprintf(stderr, "bad --detection-cell\n"); return 1; }
*q = '\0'; detcells[num_detcells].dw = atoi(p); p = q + 1;
q = strchr(p, ','); if (!q) { fprintf(stderr, "bad --detection-cell\n"); return 1; }
*q = '\0'; detcells[num_detcells].dh = atoi(p); p = q + 1;
q = strchr(p, ','); if (!q) { fprintf(stderr, "bad --detection-cell\n"); return 1; }
*q = '\0'; detcells[num_detcells].detect_w = atoi(p); p = q + 1;
detcells[num_detcells].detect_h = atoi(p);
num_detcells++;
break;
}
case 't': { case 't': {
if (num_texts >= MAX_CELLS) { fprintf(stderr, "max %d texts\n", MAX_CELLS); return 1; } if (num_texts >= MAX_CELLS) { fprintf(stderr, "max %d texts\n", MAX_CELLS); return 1; }
/* Опциональный prefix "id=NAME:" — задаёт control-plane ID. */ /* Опциональный prefix "id=NAME:" — задаёт control-plane ID. */
@@ -349,6 +401,53 @@ int main(int argc, char **argv)
} }
} }
/* Detection-box overlay'и (Phase 7 task #190). По одному на каждый
* --detection-cell. Цвет — насыщенный жёлто-зелёный (BT.709 limited). */
cfc_overlay_t *detbox_overlays[MAX_CELLS] = { 0 };
for (int i = 0; i < num_detcells; i++) {
cfc_overlay_detbox_config_t dc = {
.camera_key = detcells[i].camera,
.detect_w = detcells[i].detect_w,
.detect_h = detcells[i].detect_h,
.cell_x = detcells[i].dx, .cell_y = detcells[i].dy,
.cell_w = detcells[i].dw, .cell_h = detcells[i].dh,
.thickness = 6,
.color_y = 210, .color_u = 50, .color_v = 100, /* кислотно-зелёный */
.alpha = 240,
.stale_ms = 8000,
};
if (cfc_overlay_create_detection_boxes(&dc, &detbox_overlays[i]) != 0) {
fprintf(stderr, "[grid_record] detbox create failed для '%s'\n",
detcells[i].camera);
continue;
}
cfc_composer_add_overlay(comp, detbox_overlays[i]);
fprintf(stderr, "[grid_record] detbox '%s' → cell %s (%d,%d %dx%d), detect %dx%d\n",
detcells[i].camera, detcells[i].key,
detcells[i].dx, detcells[i].dy, detcells[i].dw, detcells[i].dh,
detcells[i].detect_w, detcells[i].detect_h);
}
/* Frigate MQTT subscriber (если задан --frigate-mqtt). */
cfc_frigate_mqtt_t *frigate = NULL;
if (frigate_mqtt_host && num_detcells > 0) {
cfc_frigate_mqtt_config_t fc = {
.host = frigate_mqtt_host, .port = frigate_mqtt_port,
.username = mqtt_user, .password = mqtt_pass,
.topic = frigate_topic,
};
if (cfc_frigate_mqtt_create(&fc, &frigate) == 0) {
for (int i = 0; i < num_detcells; i++) {
if (detbox_overlays[i]) {
cfc_frigate_mqtt_register_overlay(frigate, detbox_overlays[i]);
}
}
cfc_frigate_mqtt_start(frigate);
} else {
fprintf(stderr, "[grid_record] frigate_mqtt create failed\n");
}
}
/* PNG иконки. */ /* PNG иконки. */
for (int i = 0; i < num_icons; i++) { for (int i = 0; i < num_icons; i++) {
cfc_overlay_png_config_t pc = { cfc_overlay_png_config_t pc = {
@@ -557,6 +656,7 @@ int main(int argc, char **argv)
wctx.bytes_written / 1048576.0); wctx.bytes_written / 1048576.0);
cfc_writer_close(wctx.writer); cfc_writer_close(wctx.writer);
if (frigate) cfc_frigate_mqtt_destroy(frigate);
if (audio) cfc_audio_destroy(audio); if (audio) cfc_audio_destroy(audio);
if (ctl) cfc_control_destroy(ctl); if (ctl) cfc_control_destroy(ctl);
if (hpub) cfc_health_destroy(hpub); if (hpub) cfc_health_destroy(hpub);
+52
View File
@@ -0,0 +1,52 @@
/* cuframes-composer — Frigate MQTT subscriber для detection overlay'ев.
*
* Subscribe на frigate/events topic, парсит JSON, lookup'ит overlay по
* camera_key, вызывает cfc_overlay_detbox_upsert / _end. Сам цикл и
* reconnect handle'ит mosquitto_loop_forever (паттерн отработан).
*
* Lifecycle:
* register_overlay(ov) для каждого CFC_OVERLAY_DETECTION_BOXES (composer
* map'ит camera_key → overlay внутри себя)
* create() → connect_async + loop_start → background thread
* destroy() → disconnect + loop_stop
*
* Лицензия: LGPL-2.1+
*/
#ifndef CUFRAMES_COMPOSER_FRIGATE_MQTT_H
#define CUFRAMES_COMPOSER_FRIGATE_MQTT_H
#include "overlay.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef struct cfc_frigate_mqtt_config {
const char *host; /* "cctv-mosquitto" */
int port; /* 1883 */
const char *username; /* MQTT user — обычно тот же, что у health */
const char *password;
const char *topic; /* default "frigate/events" */
} cfc_frigate_mqtt_config_t;
typedef struct cfc_frigate_mqtt cfc_frigate_mqtt_t;
int cfc_frigate_mqtt_create(const cfc_frigate_mqtt_config_t *cfg,
cfc_frigate_mqtt_t **out);
/* Зарегистрировать overlay для определённой камеры. Composer вызывает это
* для каждого CFC_OVERLAY_DETECTION_BOXES overlay'я до connect'а. */
int cfc_frigate_mqtt_register_overlay(cfc_frigate_mqtt_t *f,
cfc_overlay_t *ov);
/* Запустить connect + loop_start (вызывать ПОСЛЕ всех register_overlay'ев). */
int cfc_frigate_mqtt_start(cfc_frigate_mqtt_t *f);
int cfc_frigate_mqtt_destroy(cfc_frigate_mqtt_t *f);
#ifdef __cplusplus
}
#endif
#endif /* CUFRAMES_COMPOSER_FRIGATE_MQTT_H */
+56
View File
@@ -43,6 +43,7 @@ typedef enum cfc_overlay_type {
CFC_OVERLAY_BORDER = 0, CFC_OVERLAY_BORDER = 0,
CFC_OVERLAY_PNG = 1, /* Phase 3b */ CFC_OVERLAY_PNG = 1, /* Phase 3b */
CFC_OVERLAY_TEXT = 2, /* Phase 3c */ CFC_OVERLAY_TEXT = 2, /* Phase 3c */
CFC_OVERLAY_DETECTION_BOXES = 3, /* Phase 7 — Frigate-driven rectangles */
} cfc_overlay_type_t; } cfc_overlay_type_t;
typedef struct cfc_overlay cfc_overlay_t; typedef struct cfc_overlay cfc_overlay_t;
@@ -121,6 +122,61 @@ int cfc_overlay_update_text(cfc_overlay_t *ov,
/* Получить ширину/высоту текущего рендеренного текста (в пикселях). */ /* Получить ширину/высоту текущего рендеренного текста (в пикселях). */
int cfc_overlay_text_size(cfc_overlay_t *ov, int *width, int *height); int cfc_overlay_text_size(cfc_overlay_t *ov, int *width, int *height);
/* ── DETECTION_BOXES (Phase 7) ─────────────────────────────────────────
* Managed-список border'ов для bbox'ов от Frigate. Один overlay соответствует
* одной композитор-ячейке (одной камере). MQTT-subscriber (отдельный thread)
* вызывает _upsert при каждом frigate/events update и _end при event завершён.
* draw_detection_boxes итерирует по active box'ам, проверяет TTL, рисует
* рамку с координатным mapping'ом detect → cell.
*
* Координаты Frigate отдаются в detect-разрешении камеры (640×480 для
* parking_overview), composer ячейка обычно 960×540 на 1080p output → нужен
* линейный scale. Mapping вынесен внутрь draw, не кэшируется — layout switch
* безопасен. */
typedef struct cfc_overlay_detbox_config {
/* Какая Frigate-камера → какая ячейка composer'а. */
const char *camera_key; /* "parking_overview" — для lookup'а */
int detect_w, detect_h; /* Frigate detect.{width,height} */
int cell_x, cell_y, cell_w, cell_h; /* куда мапится на output frame */
/* Стиль рамки. */
int thickness; /* px (4-8 рекомендуется) */
int color_y, color_u, color_v; /* BT.709 limited (Y=16-235, UV=16-240) */
int alpha; /* 0..255 */
/* TTL — независимая страховка от потери MQTT events.
* Если последний update был > stale_ms назад — рамка пропадает.
* Рекомендуется 5000-8000 мс (Frigate publish раз в минуту для stationary,
* меньше при движении). */
int stale_ms;
} cfc_overlay_detbox_config_t;
int cfc_overlay_create_detection_boxes(
const cfc_overlay_detbox_config_t *cfg,
cfc_overlay_t **out
);
/* Получить camera_key из overlay'я — для MQTT subscriber'а чтобы найти
* правильный overlay по incoming event'у. */
const char *cfc_overlay_detbox_camera_key(cfc_overlay_t *ov);
/* Upsert одного active детекта.
* event_id — идентификатор Frigate event'а (для трекинга/end).
* label — "car", "person", и т.п. (для будущего цветового кодирования).
* x1/y1/x2/y2 — bbox в detect-разрешении (raw Frigate coords).
* frame_time_ms — Frigate frame_time для TTL.
* Thread-safe (mutex внутри). */
int cfc_overlay_detbox_upsert(
cfc_overlay_t *ov,
const char *event_id,
const char *label,
int x1, int y1, int x2, int y2,
int64_t frame_time_ms
);
/* End event'а — удалить рамку (вызывается при `type == "end"` в Frigate). */
int cfc_overlay_detbox_end(cfc_overlay_t *ov, const char *event_id);
/* Обновить параметры BORDER overlay'я (можно переключить visible, /* Обновить параметры BORDER overlay'я (можно переключить visible,
* сменить цвет, изменить позицию). Thread-safe? Нет — caller должен сам * сменить цвет, изменить позицию). Thread-safe? Нет — caller должен сам
* заботиться о том, чтобы update не пересекался с draw. В рамках одного * заботиться о том, чтобы update не пересекался с draw. В рамках одного
+1
View File
@@ -20,6 +20,7 @@ set(COMPOSER_SOURCES_C
health.c health.c
writer.c writer.c
audio.c audio.c
frigate_mqtt.c
) )
set(COMPOSER_SOURCES_CU set(COMPOSER_SOURCES_CU
cugrid/cugrid.cu cugrid/cugrid.cu
+266
View File
@@ -0,0 +1,266 @@
/* Frigate MQTT subscriber для detection overlay'ев.
*
* Subscribe'ится на `frigate/events` (default), парсит JSON каждого msg'а,
* мapping ov.camera_key → overlay → upsert/end.
*
* Robust pattern (защита от "перестало обновляться"):
* - mosquitto reconnect_on_failure=true + loop_start (auto-reconnect)
* - log every disconnect с rate-limit (раз в 30 сек)
* - TTL на каждый box overlay'я — независим от прихода `end`
* - State полностью в composer-процессе (один источник истины)
*
* Лицензия: LGPL-2.1+
*/
#include "../include/cuframes_composer/frigate_mqtt.h"
#include <json-c/json.h>
#include <mosquitto.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#define CFC_FRIGATE_MAX_OVERLAYS 32
struct cfc_frigate_mqtt {
cfc_frigate_mqtt_config_t cfg;
char host_copy[64], topic_copy[128];
char username_copy[64], password_copy[64];
struct mosquitto *mosq;
_Atomic int connected;
/* Registered overlays — lookup камера → overlay. Pre-connect register only,
* после connect read-only — без mutex'а. */
cfc_overlay_t *overlays[CFC_FRIGATE_MAX_OVERLAYS];
int num_overlays;
/* Stats для health. */
_Atomic int64_t events_received;
_Atomic int64_t parse_errors;
int64_t last_disconnect_log_ms;
};
static int64_t now_ms(void)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}
/* Найти зарегистрированный overlay по camera_key. */
static cfc_overlay_t *lookup_overlay(cfc_frigate_mqtt_t *f, const char *camera_key)
{
if (!camera_key) return NULL;
for (int i = 0; i < f->num_overlays; i++) {
const char *k = cfc_overlay_detbox_camera_key(f->overlays[i]);
if (k && !strcmp(k, camera_key)) return f->overlays[i];
}
return NULL;
}
/* Распарсить one Frigate-event JSON.
*
* Frigate 0.16 format:
* {
* "type": "new" | "update" | "end",
* "before": { ... }, "after": { ... }
* }
* .after.camera = "parking_overview"
* .after.id = event_id (стабильный через все update'ы)
* .after.label = "car" | "person" | ...
* .after.box = [x1, y1, x2, y2] в detect-разрешении
* .after.frame_time = float seconds
*/
static void parse_event(cfc_frigate_mqtt_t *f, const char *payload)
{
struct json_object *root = json_tokener_parse(payload);
if (!root) {
atomic_fetch_add(&f->parse_errors, 1);
return;
}
struct json_object *jtype = NULL, *jafter = NULL;
json_object_object_get_ex(root, "type", &jtype);
json_object_object_get_ex(root, "after", &jafter);
if (!jafter) { json_object_put(root); return; }
const char *type = jtype ? json_object_get_string(jtype) : "update";
struct json_object *jcam = NULL, *jid = NULL, *jlabel = NULL, *jbox = NULL,
*jft = NULL;
json_object_object_get_ex(jafter, "camera", &jcam);
json_object_object_get_ex(jafter, "id", &jid);
json_object_object_get_ex(jafter, "label", &jlabel);
json_object_object_get_ex(jafter, "box", &jbox);
json_object_object_get_ex(jafter, "frame_time", &jft);
if (!jcam || !jid) { json_object_put(root); return; }
const char *camera = json_object_get_string(jcam);
const char *event_id = json_object_get_string(jid);
cfc_overlay_t *ov = lookup_overlay(f, camera);
if (!ov) { json_object_put(root); return; } /* не наша камера */
atomic_fetch_add(&f->events_received, 1);
if (!strcmp(type, "end")) {
cfc_overlay_detbox_end(ov, event_id);
json_object_put(root);
return;
}
/* new или update — нужен box. */
if (!jbox || !json_object_is_type(jbox, json_type_array)) {
json_object_put(root);
return;
}
int boxlen = (int)json_object_array_length(jbox);
if (boxlen < 4) { json_object_put(root); return; }
int x1 = json_object_get_int(json_object_array_get_idx(jbox, 0));
int y1 = json_object_get_int(json_object_array_get_idx(jbox, 1));
int x2 = json_object_get_int(json_object_array_get_idx(jbox, 2));
int y2 = json_object_get_int(json_object_array_get_idx(jbox, 3));
const char *label = jlabel ? json_object_get_string(jlabel) : "";
double frame_time = jft ? json_object_get_double(jft) : 0.0;
cfc_overlay_detbox_upsert(ov, event_id, label, x1, y1, x2, y2,
(int64_t)(frame_time * 1000));
json_object_put(root);
}
static void on_connect(struct mosquitto *m, void *user, int rc)
{
cfc_frigate_mqtt_t *f = (cfc_frigate_mqtt_t *)user;
if (rc == 0) {
atomic_store(&f->connected, 1);
fprintf(stderr, "[cfc/frigate] connected, subscribe '%s'\n",
f->topic_copy);
mosquitto_subscribe(m, NULL, f->topic_copy, 0);
} else {
fprintf(stderr, "[cfc/frigate] connect failed: %s\n",
mosquitto_connack_string(rc));
}
}
static void on_disconnect(struct mosquitto *m, void *user, int rc)
{
(void)m;
cfc_frigate_mqtt_t *f = (cfc_frigate_mqtt_t *)user;
atomic_store(&f->connected, 0);
int64_t t = now_ms();
if (t - f->last_disconnect_log_ms > 30000) {
fprintf(stderr, "[cfc/frigate] disconnect: %s (rc=%d) — auto-reconnect\n",
mosquitto_strerror(rc), rc);
f->last_disconnect_log_ms = t;
}
}
static void on_message(struct mosquitto *m, void *user,
const struct mosquitto_message *msg)
{
(void)m;
cfc_frigate_mqtt_t *f = (cfc_frigate_mqtt_t *)user;
if (!msg || !msg->payload || msg->payloadlen <= 0) return;
/* Payload not NUL-terminated by mosquitto — копия в локальный буфер. */
char *buf = malloc(msg->payloadlen + 1);
if (!buf) return;
memcpy(buf, msg->payload, msg->payloadlen);
buf[msg->payloadlen] = '\0';
parse_event(f, buf);
free(buf);
}
int cfc_frigate_mqtt_create(const cfc_frigate_mqtt_config_t *cfg,
cfc_frigate_mqtt_t **out)
{
if (!cfg || !cfg->host || !out) return -1;
mosquitto_lib_init();
cfc_frigate_mqtt_t *f = calloc(1, sizeof(*f));
if (!f) return -1;
f->cfg = *cfg;
strncpy(f->host_copy, cfg->host, sizeof(f->host_copy) - 1);
strncpy(f->topic_copy, cfg->topic ? cfg->topic : "frigate/events",
sizeof(f->topic_copy) - 1);
if (cfg->username) strncpy(f->username_copy, cfg->username, sizeof(f->username_copy) - 1);
if (cfg->password) strncpy(f->password_copy, cfg->password, sizeof(f->password_copy) - 1);
atomic_init(&f->connected, 0);
atomic_init(&f->events_received, 0);
atomic_init(&f->parse_errors, 0);
char client_id[64];
snprintf(client_id, sizeof(client_id), "composer-frigate-%d", (int)now_ms());
f->mosq = mosquitto_new(client_id, true, f);
if (!f->mosq) goto fail;
if (f->username_copy[0]) {
mosquitto_username_pw_set(f->mosq, f->username_copy,
f->password_copy[0] ? f->password_copy : NULL);
}
mosquitto_connect_callback_set(f->mosq, on_connect);
mosquitto_disconnect_callback_set(f->mosq, on_disconnect);
mosquitto_message_callback_set(f->mosq, on_message);
/* reconnect delay 1s → 30s exponential (mosquitto built-in). */
mosquitto_reconnect_delay_set(f->mosq, 1, 30, true);
*out = f; /* register_overlay вызывается ДО connect_async */
return 0;
fail:
free(f);
return -1;
}
int cfc_frigate_mqtt_register_overlay(cfc_frigate_mqtt_t *f, cfc_overlay_t *ov)
{
if (!f || !ov) return -1;
if (f->num_overlays >= CFC_FRIGATE_MAX_OVERLAYS) return -1;
if (!cfc_overlay_detbox_camera_key(ov)) return -1; /* не detbox overlay */
f->overlays[f->num_overlays++] = ov;
return 0;
}
/* Start connection — отдельная функция, чтобы register_overlay'и были done
* до connect'а (хотя callback и так перерегистрирует subscription on each
* reconnect). Для simplicity — connect делаем lazy при первом register'е…
* Нет, чище — explicit start. */
int cfc_frigate_mqtt_start(cfc_frigate_mqtt_t *f)
{
if (!f) return -1;
int port = f->cfg.port > 0 ? f->cfg.port : 1883;
int r = mosquitto_connect_async(f->mosq, f->host_copy, port, 60);
if (r != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "[cfc/frigate] connect_async failed: %s\n",
mosquitto_strerror(r));
return -1;
}
r = mosquitto_loop_start(f->mosq);
if (r != MOSQ_ERR_SUCCESS) {
fprintf(stderr, "[cfc/frigate] loop_start failed: %s\n",
mosquitto_strerror(r));
return -1;
}
fprintf(stderr, "[cfc/frigate] subscriber started: %s:%d topic=%s overlays=%d\n",
f->host_copy, port, f->topic_copy, f->num_overlays);
return 0;
}
int cfc_frigate_mqtt_destroy(cfc_frigate_mqtt_t *f)
{
if (!f) return 0;
if (f->mosq) {
mosquitto_disconnect(f->mosq);
mosquitto_loop_stop(f->mosq, true);
mosquitto_destroy(f->mosq);
}
free(f);
return 0;
}
+202
View File
@@ -21,10 +21,29 @@
#include <png.h> #include <png.h>
#include <ft2build.h> #include <ft2build.h>
#include FT_FREETYPE_H #include FT_FREETYPE_H
#include <pthread.h>
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <string.h> #include <string.h>
#include <time.h>
/* Detection box — один active детект. event_id = key для upsert/end. */
#define CFC_DETBOX_MAX 16
typedef struct detbox_entry {
char event_id[48]; /* "" = slot пустой */
char label[16];
int x1, y1, x2, y2; /* raw detect coords */
int64_t last_update_ms; /* для TTL */
} detbox_entry_t;
typedef struct detbox_data {
cfc_overlay_detbox_config_t cfg;
char camera_key[64]; /* копия cfg.camera_key */
pthread_mutex_t mu;
detbox_entry_t entries[CFC_DETBOX_MAX];
int count;
} detbox_data_t;
typedef struct png_data { typedef struct png_data {
cfc_overlay_png_config_t cfg; cfc_overlay_png_config_t cfg;
@@ -52,9 +71,17 @@ struct cfc_overlay {
cfc_overlay_border_config_t border; cfc_overlay_border_config_t border;
png_data_t png; png_data_t png;
text_data_t text; text_data_t text;
detbox_data_t detbox;
} u; } u;
}; };
static int64_t now_ms(void)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (int64_t)ts.tv_sec * 1000 + ts.tv_nsec / 1000000;
}
int cfc_overlay_set_id(cfc_overlay_t *ov, const char *id) int cfc_overlay_set_id(cfc_overlay_t *ov, const char *id)
{ {
if (!ov || !id) return -1; if (!ov || !id) return -1;
@@ -538,6 +565,175 @@ static int draw_text(cfc_overlay_t *ov, CUstream stream,
t->cfg.extra_alpha); t->cfg.extra_alpha);
} }
/* ── DETECTION_BOXES (Phase 7) ────────────────────────────────────────── */
int cfc_overlay_create_detection_boxes(
const cfc_overlay_detbox_config_t *cfg, cfc_overlay_t **out)
{
if (!cfg || !cfg->camera_key || !out) return -1;
if (cfg->detect_w <= 0 || cfg->detect_h <= 0) return -1;
if (cfg->cell_w <= 0 || cfg->cell_h <= 0) return -1;
cfc_overlay_t *ov = calloc(1, sizeof(*ov));
if (!ov) return -1;
ov->type = CFC_OVERLAY_DETECTION_BOXES;
detbox_data_t *d = &ov->u.detbox;
d->cfg = *cfg;
strncpy(d->camera_key, cfg->camera_key, sizeof(d->camera_key) - 1);
d->cfg.camera_key = d->camera_key;
if (d->cfg.thickness <= 0) d->cfg.thickness = 4;
if (d->cfg.alpha <= 0) d->cfg.alpha = 230;
if (d->cfg.stale_ms <= 0) d->cfg.stale_ms = 6000;
pthread_mutex_init(&d->mu, NULL);
*out = ov;
return 0;
}
const char *cfc_overlay_detbox_camera_key(cfc_overlay_t *ov)
{
if (!ov || ov->type != CFC_OVERLAY_DETECTION_BOXES) return NULL;
return ov->u.detbox.camera_key;
}
int cfc_overlay_detbox_upsert(cfc_overlay_t *ov, const char *event_id,
const char *label,
int x1, int y1, int x2, int y2,
int64_t frame_time_ms)
{
if (!ov || ov->type != CFC_OVERLAY_DETECTION_BOXES) return -1;
if (!event_id) return -1;
(void)frame_time_ms; /* Frigate frame_time не используем — берём wallclock */
detbox_data_t *d = &ov->u.detbox;
pthread_mutex_lock(&d->mu);
/* Найти существующий слот по event_id или пустой. */
int slot = -1, first_empty = -1;
for (int i = 0; i < CFC_DETBOX_MAX; i++) {
if (!d->entries[i].event_id[0]) {
if (first_empty < 0) first_empty = i;
continue;
}
if (!strcmp(d->entries[i].event_id, event_id)) { slot = i; break; }
}
if (slot < 0) {
if (first_empty < 0) {
pthread_mutex_unlock(&d->mu);
return -1; /* overflow — все 16 слотов заняты, редкий случай */
}
slot = first_empty;
strncpy(d->entries[slot].event_id, event_id,
sizeof(d->entries[slot].event_id) - 1);
d->count++;
}
if (label) {
strncpy(d->entries[slot].label, label,
sizeof(d->entries[slot].label) - 1);
}
d->entries[slot].x1 = x1;
d->entries[slot].y1 = y1;
d->entries[slot].x2 = x2;
d->entries[slot].y2 = y2;
d->entries[slot].last_update_ms = now_ms();
pthread_mutex_unlock(&d->mu);
return 0;
}
int cfc_overlay_detbox_end(cfc_overlay_t *ov, const char *event_id)
{
if (!ov || ov->type != CFC_OVERLAY_DETECTION_BOXES) return -1;
if (!event_id) return -1;
detbox_data_t *d = &ov->u.detbox;
pthread_mutex_lock(&d->mu);
for (int i = 0; i < CFC_DETBOX_MAX; i++) {
if (d->entries[i].event_id[0] &&
!strcmp(d->entries[i].event_id, event_id)) {
d->entries[i].event_id[0] = '\0';
d->count--;
break;
}
}
pthread_mutex_unlock(&d->mu);
return 0;
}
static int draw_detection_boxes(cfc_overlay_t *ov,
CUstream stream,
CUdeviceptr dst_y, int pitch_y,
CUdeviceptr dst_uv, int pitch_uv,
int frame_w, int frame_h)
{
detbox_data_t *d = &ov->u.detbox;
int64_t cutoff = now_ms() - d->cfg.stale_ms;
/* Snapshot active boxes под mutex'ом — короткая критическая секция. */
typedef struct { int x1, y1, x2, y2; } box_t;
box_t snap[CFC_DETBOX_MAX];
int snap_n = 0;
pthread_mutex_lock(&d->mu);
for (int i = 0; i < CFC_DETBOX_MAX; i++) {
if (!d->entries[i].event_id[0]) continue;
if (d->entries[i].last_update_ms < cutoff) continue; /* TTL expired */
snap[snap_n++] = (box_t){
.x1 = d->entries[i].x1, .y1 = d->entries[i].y1,
.x2 = d->entries[i].x2, .y2 = d->entries[i].y2,
};
}
pthread_mutex_unlock(&d->mu);
if (snap_n == 0) return 0;
/* Coordinate mapping: detect → cell. Линейный scale + offset.
* detect_w/h не кэшированы — берутся из cfg в момент draw'а, layout
* switch не ломает. */
int dw = d->cfg.detect_w, dh = d->cfg.detect_h;
int cw = d->cfg.cell_w, ch = d->cfg.cell_h;
int cx = d->cfg.cell_x, cy = d->cfg.cell_y;
int t = d->cfg.thickness;
for (int i = 0; i < snap_n; i++) {
/* Map raw → cell pixel coords. */
int x = cx + snap[i].x1 * cw / dw;
int y = cy + snap[i].y1 * ch / dh;
int w = (snap[i].x2 - snap[i].x1) * cw / dw;
int h = (snap[i].y2 - snap[i].y1) * ch / dh;
/* Чётные координаты для NV12. */
x &= ~1; y &= ~1; w &= ~1; h &= ~1;
if (w <= 0 || h <= 0) continue;
/* Clamp to frame. */
if (x < 0) { w += x; x = 0; }
if (y < 0) { h += y; y = 0; }
if (x + w > frame_w) w = frame_w - x;
if (y + h > frame_h) h = frame_h - y;
if (w <= 0 || h <= 0) continue;
int tt = t;
if (tt * 2 > w) tt = w / 2;
if (tt * 2 > h) tt = h / 2;
if (tt < 2) tt = 2;
/* 4 fill_nv12 — top, bottom, left, right (та же кухня что border). */
cfc_cugrid_fill_nv12(stream, dst_y, pitch_y, dst_uv, pitch_uv,
x, y, w, tt,
d->cfg.color_y, d->cfg.color_u, d->cfg.color_v,
d->cfg.alpha);
cfc_cugrid_fill_nv12(stream, dst_y, pitch_y, dst_uv, pitch_uv,
x, y + h - tt, w, tt,
d->cfg.color_y, d->cfg.color_u, d->cfg.color_v,
d->cfg.alpha);
cfc_cugrid_fill_nv12(stream, dst_y, pitch_y, dst_uv, pitch_uv,
x, y + tt, tt, h - 2 * tt,
d->cfg.color_y, d->cfg.color_u, d->cfg.color_v,
d->cfg.alpha);
cfc_cugrid_fill_nv12(stream, dst_y, pitch_y, dst_uv, pitch_uv,
x + w - tt, y + tt, tt, h - 2 * tt,
d->cfg.color_y, d->cfg.color_u, d->cfg.color_v,
d->cfg.alpha);
}
return 0;
}
/* ── Public dispatch ─────────────────────────────────────────────────── */ /* ── Public dispatch ─────────────────────────────────────────────────── */
int cfc_overlay_draw(cfc_overlay_t *ov, int cfc_overlay_draw(cfc_overlay_t *ov,
@@ -557,6 +753,9 @@ int cfc_overlay_draw(cfc_overlay_t *ov,
case CFC_OVERLAY_TEXT: case CFC_OVERLAY_TEXT:
return draw_text(ov, stream, dst_y, pitch_y, dst_uv, pitch_uv, return draw_text(ov, stream, dst_y, pitch_y, dst_uv, pitch_uv,
frame_w, frame_h); frame_w, frame_h);
case CFC_OVERLAY_DETECTION_BOXES:
return draw_detection_boxes(ov, stream, dst_y, pitch_y, dst_uv, pitch_uv,
frame_w, frame_h);
} }
return -1; return -1;
} }
@@ -575,6 +774,9 @@ int cfc_overlay_destroy(cfc_overlay_t *ov)
free(td->text_owned); free(td->text_owned);
free(td->font_path_owned); free(td->font_path_owned);
} }
if (ov->type == CFC_OVERLAY_DETECTION_BOXES) {
pthread_mutex_destroy(&ov->u.detbox.mu);
}
free(ov); free(ov);
return 0; return 0;
} }