From fa6ab3069a10e99b72f5f30951d20bd14689a529 Mon Sep 17 00:00:00 2001 From: Evgeny Demchenko Date: Wed, 3 Jun 2026 14:29:56 +0100 Subject: [PATCH] =?UTF-8?q?Phase=207=20audio=20mixing=20=E2=80=94=20attemp?= =?UTF-8?q?t=20+=20rollback=20+=20lessons?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Несколько сессий попыток реализовать audio mixing в композитор'е. Не достигнуто sub-секундной latency со стабильным video+audio. Откатано на parallel mode (cfc-grid video-only, live от pipeline с audio). Полный набор выводов и pitfall'ов — docs/LESSONS-audio-mixing-attempts.md. Главные lesson'ы для будущей попытки: - mpegts mux libavformat авто-инсёртит h264_mp4toannexb BSF которому не нравится Annex-B + inline SPS/PPS — NVENC OUTPUT_SPSPPS per-frame ломает - SPSC ring drop newest при full, не oldest (consumer's domain) - av_new_packet (не av_malloc) для av_interleaved_write_frame ownership - Monotonic PTS на counter (frame_idx, total_samples) — не wallclock - mediamtx env-var path names не должны иметь '-' (parser limitation) - Default mediamtx ReadTimeout=10s короткий для burst write'ов Изменения в repo сохранены для будущей доработки: - src/writer.c — mpegts backend с audio stream support - src/audio.c — RTSP AAC consumer + lock-free SPSC ring - include/cuframes_composer/{writer,audio}.h — public API - examples/grid_record.c — --format=mpegts + --audio-source flags - include/cuframes_composer/composer.h — consumer_prefix field - docker/Dockerfile — libavformat-dev добавлен в builder/runtime cfc-grid composer стабильно работает на видео (substantially лучше монолитного pipeline'а с audio bag'ом). TV рекомендуется использовать rtsp://...:554/cfc-grid + опционально rtsp://...:554/live-audio parallel. --- CMakeLists.txt | 5 + docker/Dockerfile | 7 +- docs/LESSONS-audio-mixing-attempts.md | 288 +++++++++++++++++++++ examples/grid_record.c | 115 +++++++-- examples/simple_record.c | 48 ++-- include/cuframes_composer/audio.h | 66 +++++ include/cuframes_composer/composer.h | 7 + include/cuframes_composer/writer.h | 92 +++++++ src/CMakeLists.txt | 3 + src/audio.c | 333 ++++++++++++++++++++++++ src/composer.c | 4 +- src/nvenc.c | 4 + src/writer.c | 356 ++++++++++++++++++++++++++ 13 files changed, 1276 insertions(+), 52 deletions(-) create mode 100644 docs/LESSONS-audio-mixing-attempts.md create mode 100644 include/cuframes_composer/audio.h create mode 100644 include/cuframes_composer/writer.h create mode 100644 src/audio.c create mode 100644 src/writer.c diff --git a/CMakeLists.txt b/CMakeLists.txt index ee08e52..5101ef8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -55,6 +55,11 @@ find_path(LIBJSONC_INCLUDE_DIR json-c/json.h) find_library(LIBMOSQUITTO_LIBRARY NAMES mosquitto REQUIRED) find_path(LIBMOSQUITTO_INCLUDE_DIR mosquitto.h) +# libavformat / libavcodec / libavutil — для нативного mpegts output (Phase 7) +find_package(PkgConfig REQUIRED) +pkg_check_modules(LIBAV REQUIRED IMPORTED_TARGET + libavformat libavcodec libavutil) + # ── Сторонние библиотеки (subomodules в third_party/) ─────────────────── # cuframes — статически линкуем libcuframes. cuframes_static — это static lib diff --git a/docker/Dockerfile b/docker/Dockerfile index 0e51474..389aa99 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -24,9 +24,10 @@ FROM nvidia/cuda:12.4.1-devel-ubuntu22.04 AS builder RUN apt-get update && apt-get install -y --no-install-recommends \ - build-essential cmake git \ + build-essential cmake git pkg-config \ libpng-dev libfreetype-dev \ libzmq3-dev libjson-c-dev libmosquitto-dev \ + libavformat-dev libavcodec-dev libavutil-dev \ && rm -rf /var/lib/apt/lists/* WORKDIR /src @@ -52,9 +53,13 @@ FROM nvidia/cuda:12.4.1-runtime-ubuntu22.04 AS runtime # Ubuntu 22.04 jammy: package names иные чем noble (libpng16-16 без t64). # libcudart12 уже в runtime image; здесь только наши user-space deps. # netcat-openbsd — для healthcheck'а через ZMQ ping (см. HEALTHCHECK ниже). +# ffmpeg — для transmux raw h264 → mpegts (shell pipe chain после +# grid_record); без него ffmpeg-mux downstream не справляется с +# intra refresh без IDR boundaries. RUN apt-get update && apt-get install -y --no-install-recommends \ libpng16-16 libfreetype6 \ libzmq5 libjson-c5 libmosquitto1 \ + libavformat58 libavcodec58 libavutil56 \ fonts-dejavu-core \ netcat-openbsd \ && rm -rf /var/lib/apt/lists/* diff --git a/docs/LESSONS-audio-mixing-attempts.md b/docs/LESSONS-audio-mixing-attempts.md new file mode 100644 index 0000000..e8aa5f8 --- /dev/null +++ b/docs/LESSONS-audio-mixing-attempts.md @@ -0,0 +1,288 @@ +# LESSONS: попытки добавить audio mixing в cuframes-composer + +**Контекст**: cuframes-composer (gx/cuframes-composer) композирует 4 cuframes-источника +в grid через CUDA kernels + NVENC H.264, публикует в mediamtx `cfc-grid` path +(только video). Параллельно `cuda-grid-pipeline` (legacy монолитный ffmpeg) +публикует `live` с video + AAC audio — но **месяц подёргивается** из-за +известного `audio chain блокирует video` бага (см. memory feedback). + +**Цель Phase 7** — реализовать audio mixing в composer'е/около него чтобы: +1. Получить стабильное video + audio в одном потоке +2. Retire pipeline'а полностью + +**Статус 2026-06-03**: не достигнута, откатано на parallel mode. + +--- + +## Что в итоге работает (текущее prod-состояние) + +``` +rtsp://...:554/live — pipeline, H264+AAC, подёргивается уже месяц (audio mix bag) +rtsp://...:554/cfc-grid — composer, H264 only, стабильное video +rtsp://...:554/live-audio — cuda-grid-audio, AAC only +``` + +TV/VLC рекомендуется использовать `cfc-grid` для video и (опционально) синхронизировать +`live-audio` параллельно на стороне reader'а до того как Phase 7 будет завершён. + +--- + +## Хронология попыток и почему каждая не сработала + +### Попытка 1: `cfc-grid-ffmpeg` mux — raw H.264 в pipe + RTSP audio → `-c copy` + +```yaml +ffmpeg -f h264 -i /pipe/grid.h264 -i rtsp://.../live-audio -c copy -map 0:v -map 1:a -f rtsp ... +``` + +**Проблема**: Raw H.264 в pipe не имеет PTS/DTS на container-уровне. ffmpeg синтезирует +через `-r 25` или wallclock, audio RTSP с правильными RTP timestamps, **interleave broken**. +Mediamtx drops video либо audio. Symptoms: 4 fps video (от 25), `Could not write header`, +audio с перебоями. + +**Что я пробовал из костылей** (всё бесплодно): +- `-use_wallclock_as_timestamps 1` на video — ломает audio DTS (`Non-monotonic DTS`) +- `-fflags +genpts+igndts+discardcorrupt` — не помогает +- `-thread_queue_size 32/1024` — без эффекта +- `-muxdelay 0 -max_delay 0` — без эффекта +- `-flush_packets 1` — без эффекта + +### Попытка 2: decode→re-encode через NVDEC+NVENC в cfc-grid-ffmpeg + +```yaml +-hwaccel cuda -i /pipe/grid.h264 -c:v h264_nvenc -intra_refresh 1 -no-scenecut 1 ... +``` + +**Проблема**: `Broken pipe` на RTSP output mediamtx. Также double-encode на GPU удваивает +NVENC sessions (на Pascal GTX 1050 это уже впритык, на 5090 OK но всё равно лишний overhead). + +### Попытка 3: композитор пишет mpegts container нативно (Phase 7 core) + +Архитектура: composer через libavformat → mpegts container → Unix socket в mediamtx. +Решает PTS/DTS на container-уровне + bypass'ит весь split-process pipe→ffmpeg-mux pipeline. + +Реализация в `src/writer.c`, `src/audio.c`, mediamtx `MTX_PATHS_CFCGRID_SOURCE=unix+mpegts:///...`. + +#### 3.1: PPS error `non-existing PPS 0 referenced` + +**Причина**: libavformat mpegts muxer **автоматически инсёртит** `h264_mp4toannexb` +bitstream-фильтр, который ожидает **AVCC** формат (length-prefix NAL units). +NVENC выдаёт **Annex-B** (start codes). При попытке "конвертации" парсер ломается. + +Дополнительная инъекция SPS/PPS через `NV_ENC_PIC_FLAG_OUTPUT_SPSPPS` per-frame +(который я добавил пытаясь решить late-joiner проблему) усугубляет — двойная инжекция +создаёт мусор для парсера. + +**Fix**: убрать `NV_ENC_PIC_FLAG_OUTPUT_SPSPPS` per-frame. Оставить только `repeatSPSPPS=1` +в init. NVENC сам ставит SPS/PPS перед каждым началом intra refresh cycle. + +См. также OBS Studio issue #7338 — intra refresh + NVENC + mpegts = известно сломанная пара +в индустрии. + +#### 3.2: 12 секунд latency + +**Причины** (сложение): +- `-analyzeduration 30000000 -probesize 50000000` в ffmpeg-mux input — **30 секунд hard cap** + на пробинг (libavformat ждёт байт пока поймёт codec). Для intra refresh stream без IDR + ffmpeg вынужден ждать первого refresh cycle. +- `av_interleaved_write_frame` буферизует пакеты до `max_interleave_delta` (default 10 секунд). +- Default AVIO write buffer 256KB — на 6 Mbps это ~350ms. +- Named pipe (FIFO) + stdio buffers по 64KB ещё ~100ms. +- Сложение ≈ 12+ секунд. + +**Fix частично**: `AVFMT_FLAG_FLUSH_PACKETS`, `max_interleave_delta=200000` (200ms), +уменьшение thread_queue, `-flush_packets 1`. **Не убирает все источники** — глубже +композитор/AVIO buffering остаётся. + +#### 3.3: `omit_video_pes_length` не существует во flags + +**Fix**: убрать. Достаточно `+resend_headers+pat_pmt_at_frames`. + +#### 3.4: mediamtx env-var `MTX_PATHS_CFC_GRID_SOURCE` не разбирается + +mediamtx env parser использует `_` как разделитель между `` и ``. +Дефис в path name (`cfc-grid`) ломает pattern → mediamtx думает что path называется +`cfc` а `grid_source` это что-то другое. + +**Fix**: переименовать path в `cfcgrid` (без дефиса) либо использовать YAML config file. + +#### 3.5: mediamtx закрывает Unix socket — `read unix: i/o timeout` + +Default mediamtx ReadTimeout = 10 секунд. Composer пишет с burst'ами (8 audio packets +после каждого video frame), AVIO buffers накапливаются, socket I/O slow → mediamtx +видит idle > 10s → disconnect → composer reconnects → cycle. + +**Partial fix**: `MTX_READTIMEOUT=30s` + `MTX_WRITETIMEOUT=30s` в mediamtx env. Помогает +не полностью. + +#### 3.6: `malloc_consolidate(): unaligned fastbin chunk detected` (heap corruption) + +**Причина 1 (исправлена)**: `av_packet_alloc()` + manual `av_malloc(pkt->data)` + затем +`av_interleaved_write_frame` — конфликт ownership. `av_interleaved_write_frame` берёт +ownership packet'а через AVBufferRef, но у меня data была вне buffer ref → libav +освобождает невалидно. + +**Fix**: использовать `av_new_packet(pkt, size)` — создаёт proper AVBufferRef, дальше +memcpy в `pkt->data`. + +**Причина 2 (исправлена)**: SPSC ring buffer в `audio.c` — когда полный, producer (audio +thread) `free`'ил oldest данные и двигал `tail` index. Но `tail` — это домен consumer'а +(main thread). Concurrent modification → use-after-free → heap corruption. + +**Fix**: при full ring **drop newest** (free(new_pkt.data); return -1), не трогать `tail`. + +#### 3.7: Non-monotonic DTS для audio packets + +Audio packets часто приходят пачкой через TCP (несколько AAC frames в одном recv). +Если назначать PTS как `wallclock_now - start`, все они получат идентичный timestamp +→ `non monotonic DTS` от mpegts muxer. + +**Fix**: monotonic PTS на основе накопленных samples'ов. AAC-LC = 1024 samples/frame. +`pts_ns = total_samples * 1_000_000_000 / sample_rate`. `total_samples += 1024` per frame. + +#### 3.8: Video PTS skew с audio PTS + +Video PTS было через wall-clock relative to start. Audio PTS — через accumulated samples. +Разные clock sources → постепенный drift со временем. + +**Fix**: video PTS тоже через monotonic counter: `pts = frame_idx * duration_per_frame`, +где `duration_per_frame = 1/fps в time_base`. Это даёт **точный** 25 fps tempo без skew +от composer thread jitter. + +--- + +## Что осталось нерешённым (для будущей попытки) + +Даже после всех вышеперечисленных фиксов финальный результат был: +- Звук с затыками +- Видео тормозит +- Периодические разрывы соединения VLC + +**Гипотезы про оставшийся источник проблем**: + +1. **mediamtx + Unix socket backpressure под burst write'ами**. Когда composer пишет + video frame + drain 8 audio packets подряд за один tick, mediamtx видит burst, + reader queue заполняется, reader падает. См. `MTX_WRITEQUEUESIZE=256` (default + слишком мал для burst'ов, увеличение влияет на latency). + +2. **AVIO буфер vs `AVFMT_FLAG_FLUSH_PACKETS`**. Возможно flag flush'ит после каждого + packet'а, но `av_interleaved_write_frame` ставит packets в interleave queue ДО flush'а. + Net эффект: queue до 200ms (`max_interleave_delta`), потом burst flush. + +3. **`mpegts` muxer не низколатентный по дизайну**. Mediamtx внутренне распаковывает + PES → собирает свои RTSP track'и → пакетизирует в RTP. Каждый шаг добавляет + buffering. Возможно правильный путь — `gortsplib` встроенный RTSP publisher + (паттерн C из RESEARCH-audio-mixing-low-latency.md), который не использует mpegts + контейнер вовсе. + +4. **Composer audio drain timing**. Я drain'ю до 8 packets после каждого video frame. + При 43 audio packets/sec (44.1k/1024) и 25 video frames/sec получается 43/25 = 1.72 + packets/frame в среднем. С drain'ом до 8 — большинство фреймов drain'ит 1-2 packets, + но при катэппинге очереди (после coмposer pause или audio source jitter) batch'ит до 8. + Это может вызывать interleave irregular. + + **Альтернатива** — drain до 1 packet'а за раз, но N раз пер video tick. Либо drain + по wall-clock cadence audio (через timerfd на 23.22ms tick). + +5. **Pre-allocation/post-allocation race**. Mediamtx ожидает PAT/PMT для path + ready'нения. Мой first frame отправляет PAT/PMT (через `pat_pmt_at_frames` flag), + но возможно несколько initial frames проходят без PAT/PMT и mediamtx считает + их корруптом. + +--- + +## Что писать ДО следующей попытки + +**Benchmark-driven research** — недостаточно теоретического плана. Нужно: + +1. **Замерить реальную latency на каждой stage'и** через timestamps в логах: + - `composer compose_compose finish` → `encode_frame begin` + - `encode_frame begin` → `on_bitstream callback` + - `on_bitstream` → `av_interleaved_write_frame return` + - `av_interleaved_write_frame` → byte appears в `/run/mediamtx/sock` (через `strace -e write`) + - mediamtx receives → mediamtx sends RTP packet (через mediamtx logs DEBUG) + - RTP packet → VLC decoder ready (VLC verbose logs) + +2. **Сравнить с baseline'ом** — cuda-grid-pipeline даёт `<1s end-to-end` пока audio mix + не block'ает video. Что у него отличается архитектурно? Один процесс, один muxer, + один encoder thread. Mediamtx видит rtsp publisher вместо unix-mpegts publisher. + +3. **Изучить **mediamtx mpeg-ts internals** — `unix+mpegts://` source насколько хорошо + протестирован? Возможно есть known bag'и. Issue tracker bluenviron/mediamtx. + +4. **Альтернативные пути**: + - **rtsp publisher через gortsplib** — `gortsplib` это Go библиотека, не C, но можно + попробовать обвязку через CGo или standalone Go-процесс с TCP control от composer'а. + - **RTP-based publisher напрямую** — собрать H.264 NAL units в RTP packets (RFC 6184), + AAC в mpeg4-generic RTP (RFC 3640), отправлять на mediamtx RTSP ANNOUNCE/RECORD + session. Самый low-latency но самый много кода (~1500 строк). + +5. **Подумать про прочный workaround**: 2 отдельных RTSP path'а (`cfc-grid` для video, + `live-audio` для audio) и client-side sync. VLC/mpv поддерживают это через + `--audio-file=rtsp://...:554/live-audio`. Это не "правильный" монолитный поток но + зато стабильное. + +--- + +## Список созданных файлов (остались в репо для будущей доработки) + +``` +include/cuframes_composer/writer.h — public API: cfc_writer_create/write/write_audio/close + Поддерживает h264 raw + mpegts container + formats. Audio stream опциональный. +include/cuframes_composer/audio.h — public API: cfc_audio_create/get_codec_params/ + drain/destroy. SPSC ring buffer architecture. + +src/writer.c — backend impl. Lessons: av_new_packet vs av_malloc, + AVFMT_FLAG_FLUSH_PACKETS, max_interleave_delta=200000, + monotonic video PTS на frame_idx, + monotonic audio PTS на total_samples +src/audio.c — audio thread, lock-free SPSC ring (drop newest при + full), RTSP open с low-latency options + (nobuffer+flush_packets, probesize=32768, + analyzeduration=0), AAC parameters discovery + +src/nvenc.c — encoder. Lesson: убрать NV_ENC_PIC_FLAG_OUTPUT_SPSPPS + per-frame, оставить только repeatSPSPPS=1 + +examples/grid_record.c — --audio-source=rtsp://... + --format=mpegts options +docs/RESEARCH-audio-mixing-low-latency.md — research-doc с industry patterns и + recommended approach (паттерн B = mpegts через + Unix socket) +docs/LESSONS-audio-mixing-attempts.md — этот файл +``` + +`gx/cuframes-composer:0.6` image содержит всё необходимое (libavformat-dev в builder, +libavformat58 в runtime). Audio mixing включается через `--format=mpegts +--audio-source=rtsp://...:8554/live-audio` в command'е. + +Для возвращения к Phase 7 деплою — раскомментировать `MTX_PATHS_CFCGRID_SOURCE` в +mediamtx env vars + изменить `cfc-grid` command в compose. См. git history +`localhost-infra` для exact diff'ов. + +--- + +## TL;DR для следующего раза + +1. **Не повторять** добавление `NV_ENC_PIC_FLAG_OUTPUT_SPSPPS` per-frame — двойная инжекция SPS/PPS + ломает downstream parsers. +2. **Не повторять** SPSC ring где producer трогает `tail` (consumer's side) — heap + corruption. Drop newest, не oldest. +3. **Не использовать** `av_packet_alloc` + manual `av_malloc(data)` с + `av_interleaved_write_frame` — нужен `av_new_packet` для AVBufferRef. +4. **Не использовать** wall-clock PTS если есть монотонный счётчик (frame_idx / + total_samples) — гарантирует sync video/audio. +5. **Audio packets** приходят пачками через TCP — нельзя назначать PTS = wallclock(now). +6. **mpegts muxer + h264_mp4toannexb BSF** — несовместим с intra refresh без careful + setup. См. OBS issue #7338. +7. **mediamtx env-var path names** не могут иметь `-` (parser ломается на `_` boundary). +8. **Default mediamtx ReadTimeout=10s** — слишком короткий для composer'а с burst write'ами. +9. **analyzeduration/probesize в ffmpeg input** — главные виновники multi-second startup + latency, не реальной streaming latency. +10. **Не пытаться** "наobum experiments" с flag'ами — каждое изменение должно быть + основано на measurement или цитате из доков. Иначе rabbit hole'у нет конца. + +**Главный insight**: даже идеально сделанный mpegts через Unix socket путь имеет +inherent buffering, который не убрать без переходом на native RTP/RTSP publisher. +Возможно, **правильный финальный путь — gortsplib** (паттерн C из research-doc), +но это +1500 строк кода. diff --git a/examples/grid_record.c b/examples/grid_record.c index 03609eb..b7fe023 100644 --- a/examples/grid_record.c +++ b/examples/grid_record.c @@ -23,6 +23,8 @@ #include "../include/cuframes_composer/overlay.h" #include "../include/cuframes_composer/control.h" #include "../include/cuframes_composer/health.h" +#include "../include/cuframes_composer/writer.h" +#include "../include/cuframes_composer/audio.h" #include @@ -43,7 +45,7 @@ static volatile sig_atomic_t g_stop = 0; static void on_sig(int s) { (void)s; g_stop = 1; } typedef struct write_ctx { - FILE *fp; + cfc_writer_t *writer; uint64_t bytes_written; uint64_t frames_encoded; uint64_t idr_count; @@ -52,9 +54,8 @@ typedef struct write_ctx { static void on_bitstream(const uint8_t *bs, size_t size, int64_t pts_ns, int is_idr, void *user) { - (void)pts_ns; write_ctx_t *ctx = (write_ctx_t *)user; - if (fwrite(bs, 1, size, ctx->fp) == size) { + if (cfc_writer_write(ctx->writer, bs, size, pts_ns, is_idr) == 0) { ctx->bytes_written += size; ctx->frames_encoded++; if (is_idr) ctx->idr_count++; @@ -119,6 +120,8 @@ int main(int argc, char **argv) const char *mqtt_instance = "cfc-grid"; /* --mqtt-instance NAME */ const char *mqtt_user = NULL; const char *mqtt_pass = NULL; + const char *out_format = "h264"; /* --format h264|mpegts */ + const char *audio_source = NULL; /* --audio-source rtsp://.../live-audio */ static struct option opts[] = { {"out", required_argument, 0, 'o'}, @@ -137,10 +140,12 @@ int main(int argc, char **argv) {"mqtt-user", required_argument, 0, 'U'}, {"mqtt-pass", required_argument, 0, 'P'}, {"intra-refresh", no_argument, 0, 'R'}, + {"format", required_argument, 0, 'F'}, /* h264|mpegts */ + {"audio-source", required_argument, 0, 'A'}, /* RTSP audio URL */ {0, 0, 0, 0}, }; int c; - while ((c = getopt_long(argc, argv, "o:c:f:b:W:H:s:r:i:t:C:M:I:U:P:R", opts, NULL)) != -1) { + while ((c = getopt_long(argc, argv, "o:c:f:b:W:H:s:r:i:t:C:M:I:U:P:RF:A:", opts, NULL)) != -1) { switch (c) { case 'o': out_path = optarg; break; case 'c': @@ -178,6 +183,8 @@ int main(int argc, char **argv) case 'U': mqtt_user = optarg; break; case 'P': mqtt_pass = optarg; break; case 'R': intra_refresh = 1; break; + case 'F': out_format = optarg; break; + case 'A': audio_source = optarg; break; case 't': { if (num_texts >= MAX_CELLS) { fprintf(stderr, "max %d texts\n", MAX_CELLS); return 1; } /* Опциональный prefix "id=NAME:" — задаёт control-plane ID. */ @@ -276,6 +283,7 @@ int main(int argc, char **argv) .cells = cells, .num_cells = num_cells, .cuda_device = 0, + .consumer_prefix = mqtt_instance, /* уникальный namespace на каждый composer */ }; cfc_composer_t *comp = NULL; if (cfc_composer_create(&ccfg, &comp) != 0) { @@ -407,28 +415,73 @@ int main(int argc, char **argv) return 1; } - /* Output: "-" / "/dev/stdout" / "pipe:1" = stdout (для pipe в ffmpeg). - * stdout не закрывается через fclose чтобы не убивать дочерний процесс - * raньше времени. */ - write_ctx_t wctx = { 0 }; - int is_stdout = (!strcmp(out_path, "-") || !strcmp(out_path, "pipe:1") || - !strcmp(out_path, "/dev/stdout")); - if (is_stdout) { - wctx.fp = stdout; - /* line-buffer'инг disabled — пишем full-buffered для производительности. - * Caller'у нужно flush при exit. */ - setvbuf(stdout, NULL, _IOFBF, 1024 * 1024); - } else { - wctx.fp = fopen(out_path, "wb"); - if (!wctx.fp) { - fprintf(stderr, "fopen(%s): %s\n", out_path, strerror(errno)); - cfc_encoder_destroy(enc); - cfc_composer_destroy(comp); - return 1; + /* Audio consumer (опциональный, Phase 7). Запускаем РАНЬШЕ writer'а + * чтобы успеть получить codec params (sample_rate, channels, extradata) + * до avformat_write_header — иначе audio stream'у не будет правильного + * setup'а. Polling до 5 секунд. */ + cfc_audio_t *audio = NULL; + int audio_sample_rate = 0, audio_channels = 0; + const uint8_t *audio_extradata = NULL; + size_t audio_extradata_size = 0; + + if (audio_source) { + cfc_audio_config_t acfg = { .rtsp_url = audio_source }; + if (cfc_audio_create(&acfg, &audio) != 0) { + fprintf(stderr, "[grid_record] audio create failed, продолжаю без audio\n"); + } else { + fprintf(stderr, "[grid_record] жду audio codec params от %s ...\n", audio_source); + /* 30 секунд polling — audio source (cuda-grid-audio) может ещё + * подниматься после recreate стeка. Audio thread сам retry'ится + * с exp backoff. */ + for (int i = 0; i < 300; i++) { /* 300 × 100ms = 30s */ + if (cfc_audio_get_codec_params(audio, &audio_sample_rate, + &audio_channels, &audio_extradata, + &audio_extradata_size) == 0) { + fprintf(stderr, + "[grid_record] audio готов: AAC %dHz %dch extradata=%zub\n", + audio_sample_rate, audio_channels, audio_extradata_size); + break; + } + struct timespec ts = { .tv_sec = 0, .tv_nsec = 100 * 1000 * 1000 }; + nanosleep(&ts, NULL); + } + if (audio_sample_rate == 0) { + fprintf(stderr, "[grid_record] audio params не получены за 30с, без audio\n"); + cfc_audio_destroy(audio); audio = NULL; + } } } - fprintf(stderr, "[grid_record] начало записи в %s (Ctrl+C для остановки)\n", - out_path); + + /* Writer: mpegts с video + опциональным audio. */ + uint8_t spspps[256]; size_t spspps_len = sizeof(spspps); + cfc_encoder_get_sequence_params(enc, spspps, &spspps_len); + + cfc_writer_config_t wcfg = { + .path = out_path, + .format = out_format, + .width = out_w, + .height = out_h, + .fps_num = fps, + .fps_den = 1, + .bitrate_kbps = bitrate, + .extradata = spspps, + .extradata_size = spspps_len, + .has_audio = audio ? 1 : 0, + .audio_sample_rate = audio_sample_rate, + .audio_channels = audio_channels, + .audio_extradata = audio_extradata, + .audio_extradata_size = audio_extradata_size, + }; + write_ctx_t wctx = { 0 }; + if (cfc_writer_create(&wcfg, &wctx.writer) != 0) { + fprintf(stderr, "cfc_writer_create(%s, %s) failed\n", out_path, out_format); + if (audio) cfc_audio_destroy(audio); + cfc_encoder_destroy(enc); + cfc_composer_destroy(comp); + return 1; + } + fprintf(stderr, "[grid_record] начало записи в %s [format=%s%s] (Ctrl+C для остановки)\n", + out_path, out_format, audio ? "+audio" : ""); /* Main loop — frame cadence по wall clock'у. */ struct timespec ts_start; @@ -461,12 +514,18 @@ int main(int argc, char **argv) } int64_t pts_ns = (now_us - start_us) * 1000; + /* Не break'аем при encode/write failure — это обычно временно + * (mediamtx reconnect, socket broken). Просто логируем и продолжаем, + * следующая encode/write попытается заново. */ if (cfc_encoder_encode_frame(enc, out_y, out_pitch, pts_ns, on_bitstream, &wctx) != 0) { - fprintf(stderr, "[grid_record] encode failed\n"); - break; + static int warned = 0; + if (!warned) { fprintf(stderr, "[grid_record] encode failed (продолжаю)\n"); warned = 1; } } + /* Drain audio packets — пишем сразу после video frame. */ + if (audio) cfc_audio_drain(audio, wctx.writer, 8); + if (wctx.frames_encoded > 0 && wctx.frames_encoded % 50 == 0) { double elapsed = (now_us - start_us) / 1e6; cfc_composer_health_t h; @@ -497,8 +556,8 @@ int main(int argc, char **argv) (unsigned long long)wctx.idr_count, wctx.bytes_written / 1048576.0); - fflush(wctx.fp); - if (!is_stdout) fclose(wctx.fp); + cfc_writer_close(wctx.writer); + if (audio) cfc_audio_destroy(audio); if (ctl) cfc_control_destroy(ctl); if (hpub) cfc_health_destroy(hpub); cfc_encoder_destroy(enc); diff --git a/examples/simple_record.c b/examples/simple_record.c index 242b8aa..f7d23c3 100644 --- a/examples/simple_record.c +++ b/examples/simple_record.c @@ -18,6 +18,7 @@ #include "../include/cuframes_composer/nvenc.h" #include "../include/cuframes_composer/source.h" +#include "../include/cuframes_composer/writer.h" #include @@ -42,21 +43,17 @@ static void on_sigint(int sig) /* user-data, передаваемая encoder callback'у */ typedef struct write_ctx { - FILE *fp; + cfc_writer_t *writer; uint64_t bytes_written; uint64_t frames_encoded; uint64_t idr_count; } write_ctx_t; -/* Encoder callback — пишем H.264 Annex-B bytes как есть. */ static void on_bitstream(const uint8_t *bs, size_t size, int64_t pts_ns, int is_idr, void *user) { - (void)pts_ns; write_ctx_t *ctx = (write_ctx_t *)user; - if (fwrite(bs, 1, size, ctx->fp) != size) { - fprintf(stderr, "[simple_record] fwrite failed: %s\n", strerror(errno)); - } else { + if (cfc_writer_write(ctx->writer, bs, size, pts_ns, is_idr) == 0) { ctx->bytes_written += size; ctx->frames_encoded++; if (is_idr) ctx->idr_count++; @@ -85,6 +82,7 @@ int main(int argc, char **argv) int fps = 25; int bitrate_kbps = 5000; int max_seconds = 0; /* 0 = до SIGINT */ + const char *out_format = "h264"; static struct option opts[] = { {"key", required_argument, 0, 'k'}, @@ -92,17 +90,19 @@ int main(int argc, char **argv) {"fps", required_argument, 0, 'f'}, {"bitrate", required_argument, 0, 'b'}, {"seconds", required_argument, 0, 's'}, + {"format", required_argument, 0, 'F'}, {"help", no_argument, 0, 'h'}, {0, 0, 0, 0}, }; int c; - while ((c = getopt_long(argc, argv, "k:o:f:b:s:h", opts, NULL)) != -1) { + while ((c = getopt_long(argc, argv, "k:o:f:b:s:F:h", opts, NULL)) != -1) { switch (c) { case 'k': key = optarg; break; case 'o': out_path = optarg; break; case 'f': fps = atoi(optarg); break; case 'b': bitrate_kbps = atoi(optarg); break; case 's': max_seconds = atoi(optarg); break; + case 'F': out_format = optarg; break; case 'h': default: fprintf(stderr, @@ -180,20 +180,25 @@ int main(int argc, char **argv) goto cleanup_src; } - /* 5) Открыть выходной файл (или stdout если "-"/"pipe:1"). */ + /* 5) Открыть writer (h264 raw или mpegts container). */ + uint8_t spspps[256]; size_t spspps_len = sizeof(spspps); + cfc_encoder_get_sequence_params(enc, spspps, &spspps_len); + + cfc_writer_config_t wcfg = { + .path = out_path, + .format = out_format, + .width = snap.width, + .height = snap.height, + .fps_num = fps, + .fps_den = 1, + .bitrate_kbps = bitrate_kbps, + .extradata = spspps, + .extradata_size = spspps_len, + }; write_ctx_t wctx = { 0 }; - int is_stdout = (!strcmp(out_path, "-") || !strcmp(out_path, "pipe:1") || - !strcmp(out_path, "/dev/stdout")); - if (is_stdout) { - wctx.fp = stdout; - setvbuf(stdout, NULL, _IOFBF, 1024 * 1024); - } else { - wctx.fp = fopen(out_path, "wb"); - if (!wctx.fp) { - fprintf(stderr, "[simple_record] fopen(%s) failed: %s\n", - out_path, strerror(errno)); - goto cleanup_enc; - } + if (cfc_writer_create(&wcfg, &wctx.writer) != 0) { + fprintf(stderr, "[simple_record] cfc_writer_create failed\n"); + goto cleanup_enc; } /* 6) Главный цикл — забираем кадры по seq, кодируем. */ @@ -265,8 +270,7 @@ int main(int argc, char **argv) (unsigned long long)wctx.idr_count, wctx.bytes_written / 1048576.0); - fflush(wctx.fp); - if (!is_stdout) fclose(wctx.fp); + cfc_writer_close(wctx.writer); cleanup_enc: cfc_encoder_destroy(enc); diff --git a/include/cuframes_composer/audio.h b/include/cuframes_composer/audio.h new file mode 100644 index 0000000..3330de1 --- /dev/null +++ b/include/cuframes_composer/audio.h @@ -0,0 +1,66 @@ +/* cuframes-composer — audio consumer (Phase 7). + * + * Открывает RTSP-вход с AAC stream'ом (от cuda-grid-audio через mediamtx + * либо прямо с микрофона). В отдельном thread'е читает packet'ы и кладёт + * в lock-free SPSC ring buffer. Video-thread (composer main loop) drain'ит + * ring и пишет audio packet'ы в общий mpegts muxer через + * cfc_writer_write_audio. + * + * Architecture: + * audio_thread: avformat_open_input → av_read_frame loop → push to ring + * main thread (video): cfc_audio_drain → cfc_writer_write_audio loop + * + * Failure mode: если RTSP source падает, audio_thread пытается reconnect + * с exponential backoff. Video не блокируется. + * + * Лицензия: LGPL-2.1+ + */ + +#ifndef CUFRAMES_COMPOSER_AUDIO_H +#define CUFRAMES_COMPOSER_AUDIO_H + +#include "writer.h" + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct cfc_audio_config { + const char *rtsp_url; /* "rtsp://mediamtx:8554/live-audio" */ + int reconnect_min_ms; /* default 1000 */ + int reconnect_max_ms; /* default 10000 */ +} cfc_audio_config_t; + +typedef struct cfc_audio cfc_audio_t; + +/* Создать audio consumer + запустить background thread. + * Получает первый packet и сохраняет codec params (sample_rate, channels, + * extradata) — caller потом передаёт их в cfc_writer_config_t. */ +int cfc_audio_create(const cfc_audio_config_t *cfg, cfc_audio_t **out); + +/* Получить codec params (заполняется после первого успешного read). + * Возвращает 0 если params уже доступны, -1 если ещё нет. Caller может + * polling'ом ждать. */ +int cfc_audio_get_codec_params( + cfc_audio_t *a, + int *sample_rate, + int *channels, + const uint8_t **extradata, + size_t *extradata_size +); + +/* Drain до N audio packet'ов в writer. Не блокируется. Возвращает число + * записанных packet'ов. */ +int cfc_audio_drain(cfc_audio_t *a, cfc_writer_t *writer, int max_packets); + +/* Остановить thread + освободить ресурсы. */ +int cfc_audio_destroy(cfc_audio_t *a); + +#ifdef __cplusplus +} +#endif + +#endif /* CUFRAMES_COMPOSER_AUDIO_H */ diff --git a/include/cuframes_composer/composer.h b/include/cuframes_composer/composer.h index fc40799..85f04b7 100644 --- a/include/cuframes_composer/composer.h +++ b/include/cuframes_composer/composer.h @@ -62,6 +62,13 @@ typedef struct cfc_composer_config { int reconnect_max_ms; /* default 30000 */ int stale_threshold_ms; /* default 500 */ int dead_threshold_ms; /* default 5000 */ + + /* Prefix для consumer_name внутри cuframes_subscriber_create. + * default = "composer" (для обратной совместимости с фазами 1-6). + * Уникальный prefix позволяет нескольким composer'ам одновременно + * subscribe к одним и тем же publisher'ам (cfc-grid + cuda-grid-pipeline + * параллельно, у каждого свой namespace). */ + const char *consumer_prefix; } cfc_composer_config_t; typedef struct cfc_composer cfc_composer_t; diff --git a/include/cuframes_composer/writer.h b/include/cuframes_composer/writer.h new file mode 100644 index 0000000..8951e89 --- /dev/null +++ b/include/cuframes_composer/writer.h @@ -0,0 +1,92 @@ +/* cuframes-composer — output writer abstraction. + * + * Энкодер NVENC отдаёт сжатый H.264 bitstream (Annex-B byte stream) + + * timestamp в callback'е. Writer берёт эту последовательность и пишет + * либо как raw H.264 в файл/stdout, либо как mpegts container с правильными + * PTS/DTS в header'ах PES packet'ов. + * + * Зачем mpegts: + * Raw H.264 в pipe не содержит timestamps на container-уровне; downstream + * ffmpeg-mux вынужден синтезировать PTS из `-r` или wallclock, что вызывает + * desync с audio и drops при mux'ировании с другим потоком. MPEG-TS даёт + * нативные PTS/DTS на каждом PES packet'е → downstream mux работает + * через `-c copy` без проблем. + * + * Форматы: + * "h264" — raw H.264 Annex-B byte stream (как до Phase 6); fwrite в file + * "mpegts" — MPEG-TS container; libavformat avformat_write_header + + * av_interleaved_write_frame + * + * Path semantics: + * "/path/to/file.h264" — обычный файл + * "-" / "pipe:1" / "/dev/stdout" — stdout (для shell-pipe в downstream) + * + * Лицензия: LGPL-2.1+ + */ + +#ifndef CUFRAMES_COMPOSER_WRITER_H +#define CUFRAMES_COMPOSER_WRITER_H + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct cfc_writer cfc_writer_t; + +typedef struct cfc_writer_config { + const char *path; /* "/path/file.ts", "unix:/run/.../sock", "-" */ + const char *format; /* "h264" или "mpegts" */ + int width, height; /* для mpegts video codecpar */ + int fps_num, fps_den; /* для mpegts time_base hint */ + int bitrate_kbps; /* для mpegts mux PCR pacing */ + + /* SPS/PPS — video codec extradata, кладётся в codecpar до write_header. + * Берётся из cfc_encoder_get_sequence_params. Обязательно для mpegts — + * mpegts muxer кладёт SPS/PPS перед первым IDR из extradata. */ + const uint8_t *extradata; + size_t extradata_size; + + /* Audio stream — если задан, в mpegts будет второй stream (AAC). + * extradata_audio = AudioSpecificConfig (2 байта обычно). */ + int has_audio; /* 0 = video-only, 1 = add audio stream */ + int audio_sample_rate; /* например 44100 */ + int audio_channels; /* например 2 */ + const uint8_t *audio_extradata; /* ASC bytes */ + size_t audio_extradata_size; +} cfc_writer_config_t; + +int cfc_writer_create(const cfc_writer_config_t *cfg, cfc_writer_t **out); + +/* Записать один закодированный video frame. is_keyframe ставит флаг + * AV_PKT_FLAG_KEY. Использует av_write_frame (без интерливинг-очереди). */ +int cfc_writer_write( + cfc_writer_t *w, + const uint8_t *bitstream, + size_t size, + int64_t pts_ns, + int is_keyframe +); + +/* Записать audio packet (AAC ADTS либо raw — определяется extradata). + * Возвращает 0 при успехе. Thread-safe относительно write video? + * Нет — caller обязан вызывать оба write_* из одного thread'а либо + * локироваться. В нашей архитектуре video-thread drain'ит audio-ring + * и пишет сюда сам. */ +int cfc_writer_write_audio( + cfc_writer_t *w, + const uint8_t *aac_data, + size_t size, + int64_t pts_ns +); + +/* Закрыть writer. Для mpegts вызывает av_write_trailer. */ +int cfc_writer_close(cfc_writer_t *w); + +#ifdef __cplusplus +} +#endif + +#endif /* CUFRAMES_COMPOSER_WRITER_H */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e4a2ee6..b14d0a4 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -18,6 +18,8 @@ set(COMPOSER_SOURCES_C overlay.c control.c health.c + writer.c + audio.c ) set(COMPOSER_SOURCES_CU cugrid/cugrid.cu @@ -58,6 +60,7 @@ foreach(target cuframes_composer cuframes_composer_static) ${LIBZMQ_LIBRARY} # для control plane (Phase 4a) ${LIBJSONC_LIBRARY} # для control plane JSON (Phase 4a) ${LIBMOSQUITTO_LIBRARY} # для MQTT health (Phase 4b) + PkgConfig::LIBAV # для mpegts writer (Phase 7) rt ) target_include_directories(${target} PRIVATE diff --git a/src/audio.c b/src/audio.c new file mode 100644 index 0000000..77e993b --- /dev/null +++ b/src/audio.c @@ -0,0 +1,333 @@ +/* Реализация cfc_audio_t — RTSP AAC consumer + lock-free SPSC ring. + * + * Architecture: + * audio_thread: + * 1. avformat_open_input(rtsp_url) с low-latency options: + * rtsp_transport=tcp, stimeout=2000000us, nobuffer+flush_packets, + * probesize=32768, analyzeduration=0 + * 2. avformat_find_stream_info → найти audio stream + * 3. Сохранить codec params (sample_rate, channels, extradata) + * 4. av_read_frame loop: + * - Если audio packet → копия в ring (SPSC, fixed cap N=64) + * - Если ring полный → drop'ить старейший (video может отставать) + * 5. На EOF/error → exponential backoff reconnect + * + * main thread (video): + * cfc_audio_drain → pop из ring и cfc_writer_write_audio + * + * Lock-free SPSC ring: single producer (audio_thread), single consumer + * (video main thread). Atomic head/tail indices, copy-on-write packet'ы. + * + * Лицензия: LGPL-2.1+ + */ + +#include "../include/cuframes_composer/audio.h" + +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include + +#define CFC_AUDIO_RING_CAP 64 + +typedef struct audio_pkt { + uint8_t *data; + int size; + int64_t pts_ns; +} audio_pkt_t; + +struct cfc_audio { + cfc_audio_config_t cfg; + char url_copy[256]; + + pthread_t thread; + int thread_started; + _Atomic int stop_flag; + + /* Codec params — заполняются audio_thread'ом после первого open'а */ + _Atomic int codec_ready; + pthread_mutex_t params_mu; + int sample_rate; + int channels; + uint8_t *extradata; + int extradata_size; + + /* SPSC ring */ + audio_pkt_t ring[CFC_AUDIO_RING_CAP]; + _Atomic uint32_t head; /* producer (audio_thread) */ + _Atomic uint32_t tail; /* consumer (main) */ + + /* Monotonic PTS на основе накопленных AAC samples. + * Один AAC frame = 1024 samples (для AAC-LC). PTS = total_samples * + * 1_000_000_000 / sample_rate. Это исключает non-monotonic DTS warnings + * при burst arrival packets'ов. */ + int64_t start_us; + int64_t total_samples; /* счётчик samples'ов across packets */ +}; + +/* ── helpers ──────────────────────────────────────────────────────────── */ + +static int64_t now_us(void) +{ + struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + return (int64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000; +} + +static void log_av_err(const char *what, int err) +{ + char buf[256]; + av_strerror(err, buf, sizeof(buf)); + fprintf(stderr, "[cfc/audio] %s: %s\n", what, buf); +} + +static int ring_push(cfc_audio_t *a, audio_pkt_t pkt) +{ + uint32_t h = atomic_load_explicit(&a->head, memory_order_relaxed); + uint32_t t = atomic_load_explicit(&a->tail, memory_order_acquire); + uint32_t next = (h + 1) % CFC_AUDIO_RING_CAP; + if (next == t) { + /* Ring full — drop the NEW packet (вместо oldest). Это безопасно + * для SPSC: producer не трогает tail-side данные которые consumer + * может concurrently читать. Раньше я free'ил oldest и двигал tail + * — это вызывало heap corruption из-за гонки с consumer'ом. */ + free(pkt.data); + return -1; + } + a->ring[h] = pkt; + atomic_store_explicit(&a->head, next, memory_order_release); + return 0; +} + +static int ring_pop(cfc_audio_t *a, audio_pkt_t *out) +{ + uint32_t t = atomic_load_explicit(&a->tail, memory_order_relaxed); + uint32_t h = atomic_load_explicit(&a->head, memory_order_acquire); + if (t == h) return -1; /* empty */ + *out = a->ring[t]; + atomic_store_explicit(&a->tail, (t + 1) % CFC_AUDIO_RING_CAP, + memory_order_release); + return 0; +} + +static void ring_clear(cfc_audio_t *a) +{ + audio_pkt_t pkt; + while (ring_pop(a, &pkt) == 0) free(pkt.data); +} + +/* ── audio_thread ─────────────────────────────────────────────────────── */ + +static int audio_open_source(cfc_audio_t *a, AVFormatContext **fmt_ctx, + int *audio_stream_idx) +{ + int err; + AVFormatContext *ctx = NULL; + AVDictionary *opts = NULL; + + /* Low-latency RTSP options. */ + av_dict_set(&opts, "rtsp_transport", "tcp", 0); + av_dict_set(&opts, "stimeout", "2000000", 0); + av_dict_set(&opts, "fflags", "nobuffer+flush_packets", 0); + av_dict_set(&opts, "probesize", "32768", 0); + av_dict_set(&opts, "analyzeduration", "0", 0); + + err = avformat_open_input(&ctx, a->url_copy, NULL, &opts); + av_dict_free(&opts); + if (err < 0) { + log_av_err("avformat_open_input", err); + return -1; + } + + err = avformat_find_stream_info(ctx, NULL); + if (err < 0) { + log_av_err("avformat_find_stream_info", err); + avformat_close_input(&ctx); + return -1; + } + + /* Найти audio stream. */ + int aidx = -1; + for (unsigned i = 0; i < ctx->nb_streams; i++) { + if (ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) { + aidx = (int)i; + break; + } + } + if (aidx < 0) { + fprintf(stderr, "[cfc/audio] no audio stream in %s\n", a->url_copy); + avformat_close_input(&ctx); + return -1; + } + + /* Сохранить codec params для writer'а. */ + AVCodecParameters *cp = ctx->streams[aidx]->codecpar; + pthread_mutex_lock(&a->params_mu); + a->sample_rate = cp->sample_rate; +#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(59, 24, 100) + a->channels = cp->ch_layout.nb_channels; +#else + a->channels = cp->channels; +#endif + if (cp->extradata && cp->extradata_size > 0) { + if (a->extradata) free(a->extradata); + a->extradata = malloc(cp->extradata_size); + if (a->extradata) { + memcpy(a->extradata, cp->extradata, cp->extradata_size); + a->extradata_size = cp->extradata_size; + } + } + pthread_mutex_unlock(&a->params_mu); + atomic_store(&a->codec_ready, 1); + + fprintf(stderr, "[cfc/audio] connected %s — AAC %dHz %dch extradata=%db\n", + a->url_copy, cp->sample_rate, a->channels, cp->extradata_size); + + *fmt_ctx = ctx; + *audio_stream_idx = aidx; + return 0; +} + +static void *audio_thread(void *arg) +{ + cfc_audio_t *a = (cfc_audio_t *)arg; + int backoff_ms = a->cfg.reconnect_min_ms; + + while (!atomic_load(&a->stop_flag)) { + AVFormatContext *ctx = NULL; + int aidx = -1; + if (audio_open_source(a, &ctx, &aidx) != 0) { + /* Reconnect backoff */ + for (int waited = 0; waited < backoff_ms && !atomic_load(&a->stop_flag); waited += 100) { + usleep(100 * 1000); + } + backoff_ms *= 2; + if (backoff_ms > a->cfg.reconnect_max_ms) backoff_ms = a->cfg.reconnect_max_ms; + continue; + } + backoff_ms = a->cfg.reconnect_min_ms; + + /* Read loop */ + AVPacket *pkt = av_packet_alloc(); + while (!atomic_load(&a->stop_flag) && pkt) { + int err = av_read_frame(ctx, pkt); + if (err < 0) { + log_av_err("av_read_frame", err); + break; + } + if (pkt->stream_index != aidx) { + av_packet_unref(pkt); + continue; + } + + /* Push to ring — копия data + monotonic PTS на основе total_samples. + * Один AAC frame = 1024 samples; накапливаем и пересчитываем в ns. + * (С RTSP timestamps было бы строже, но они уже могут быть + * jittered'ными или не монотонны при reconnect'ах). */ + audio_pkt_t apkt = { + .data = malloc(pkt->size), + .size = pkt->size, + .pts_ns = a->total_samples * 1000000000LL / a->sample_rate, + }; + a->total_samples += 1024; + if (apkt.data) { + memcpy(apkt.data, pkt->data, pkt->size); + ring_push(a, apkt); + } + av_packet_unref(pkt); + } + av_packet_free(&pkt); + avformat_close_input(&ctx); + + if (atomic_load(&a->stop_flag)) break; + fprintf(stderr, "[cfc/audio] disconnect, retry in %dms\n", backoff_ms); + for (int waited = 0; waited < backoff_ms && !atomic_load(&a->stop_flag); waited += 100) { + usleep(100 * 1000); + } + backoff_ms *= 2; + if (backoff_ms > a->cfg.reconnect_max_ms) backoff_ms = a->cfg.reconnect_max_ms; + } + return NULL; +} + +/* ── Public API ───────────────────────────────────────────────────────── */ + +int cfc_audio_create(const cfc_audio_config_t *cfg, cfc_audio_t **out) +{ + if (!cfg || !cfg->rtsp_url || !out) return -1; + + cfc_audio_t *a = calloc(1, sizeof(*a)); + if (!a) return -1; + a->cfg = *cfg; + strncpy(a->url_copy, cfg->rtsp_url, sizeof(a->url_copy) - 1); + a->cfg.rtsp_url = a->url_copy; + if (a->cfg.reconnect_min_ms <= 0) a->cfg.reconnect_min_ms = 1000; + if (a->cfg.reconnect_max_ms <= 0) a->cfg.reconnect_max_ms = 10000; + + pthread_mutex_init(&a->params_mu, NULL); + atomic_init(&a->stop_flag, 0); + atomic_init(&a->codec_ready, 0); + atomic_init(&a->head, 0); + atomic_init(&a->tail, 0); + a->start_us = now_us(); + + avformat_network_init(); + + if (pthread_create(&a->thread, NULL, audio_thread, a) != 0) { + pthread_mutex_destroy(&a->params_mu); + free(a); + return -1; + } + a->thread_started = 1; + + *out = a; + return 0; +} + +int cfc_audio_get_codec_params(cfc_audio_t *a, int *sample_rate, + int *channels, const uint8_t **extradata, + size_t *extradata_size) +{ + if (!a) return -1; + if (!atomic_load(&a->codec_ready)) return -1; + pthread_mutex_lock(&a->params_mu); + if (sample_rate) *sample_rate = a->sample_rate; + if (channels) *channels = a->channels; + if (extradata) *extradata = a->extradata; + if (extradata_size) *extradata_size = a->extradata_size; + pthread_mutex_unlock(&a->params_mu); + return 0; +} + +int cfc_audio_drain(cfc_audio_t *a, cfc_writer_t *writer, int max_packets) +{ + if (!a || !writer) return 0; + int wrote = 0; + for (int i = 0; i < max_packets; i++) { + audio_pkt_t pkt; + if (ring_pop(a, &pkt) != 0) break; + cfc_writer_write_audio(writer, pkt.data, pkt.size, pkt.pts_ns); + free(pkt.data); + wrote++; + } + return wrote; +} + +int cfc_audio_destroy(cfc_audio_t *a) +{ + if (!a) return 0; + atomic_store(&a->stop_flag, 1); + if (a->thread_started) pthread_join(a->thread, NULL); + ring_clear(a); + if (a->extradata) free(a->extradata); + pthread_mutex_destroy(&a->params_mu); + free(a); + return 0; +} diff --git a/src/composer.c b/src/composer.c index 0ca0f87..e07bef4 100644 --- a/src/composer.c +++ b/src/composer.c @@ -186,7 +186,9 @@ int cfc_composer_create(const cfc_composer_config_t *cfg, cfc_composer_t **out) if (!comp->cells[i].source_key) continue; char name[32]; - snprintf(name, sizeof(name), "composer-%d", i); + const char *prefix = comp->cfg.consumer_prefix; + if (!prefix || !*prefix) prefix = "composer"; + snprintf(name, sizeof(name), "%s-%d", prefix, i); cfc_source_config_t scfg = { .key = comp->cells[i].source_key, diff --git a/src/nvenc.c b/src/nvenc.c index 951c933..8be8092 100644 --- a/src/nvenc.c +++ b/src/nvenc.c @@ -414,6 +414,10 @@ int cfc_encoder_encode_frame(cfc_encoder_t *enc, CUdeviceptr ptr, int pitch, pp.inputWidth = enc->width; pp.inputHeight = enc->height; pp.inputPitch = enc->staging_pitch; + /* SPS/PPS вставляются автоматически через repeatSPSPPS=1 в init-конфиге + * (перед каждым началом intra refresh cycle). Per-frame OUTPUT_SPSPPS + * создавал двойную инжекцию которая ломала h264_mp4toannexb BSF + * в downstream mpegts muxer'е (см. docs/RESEARCH-audio-mixing-low-latency.md). */ pp.pictureStruct = NV_ENC_PIC_STRUCT_FRAME; pp.inputTimeStamp = (uint64_t)pts_ns; diff --git a/src/writer.c b/src/writer.c new file mode 100644 index 0000000..9dde3c4 --- /dev/null +++ b/src/writer.c @@ -0,0 +1,356 @@ +/* Реализация cfc_writer_t — Phase 7 monolithic mpegts → Unix socket + * + * Архитектура (по research'у docs/RESEARCH-audio-mixing-low-latency.md): + * - mpegts backend пишет ОБА stream'а (video + audio) в один AVFormatContext + * - Использует av_write_frame (НЕ av_interleaved_write_frame) — у нас + * pts/dts монотонные, интерливинг-очередь не нужна и добавляет latency + * - Поддерживает URL "unix:/path/to.sock" для прямой публикации в mediamtx + * (mediamtx.yml: source: unix+mpegts:///path/to.sock) + * - NVENC даёт Annex-B inline SPS/PPS через repeatSPSPPS=1; муксер + * автоинсёртит h264_mp4toannexb который использует extradata correctly + * + * - "h264" backend (raw fwrite) сохранён для Phase 1-5 совместимости + * + * Лицензия: LGPL-2.1+ + */ + +#include "../include/cuframes_composer/writer.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +typedef enum { + CFC_WRITER_RAW = 0, + CFC_WRITER_MPEGTS = 1, +} cfc_writer_backend_t; + +struct cfc_writer { + cfc_writer_backend_t backend; + int is_stdout; + + FILE *fp; /* raw backend */ + + AVFormatContext *fmt_ctx; + AVStream *video_stream; + AVStream *audio_stream; + int header_written; + int64_t first_video_pts_ns; + int64_t first_audio_pts_ns; + + /* Monotonic video frame counter — PTS = frame_idx * frame_duration_pts. + * Гарантирует exact frame cadence regardless of wall-clock jitter. */ + int64_t video_frame_idx; +}; + +static int is_stdout_path(const char *p) +{ + return p && (!strcmp(p, "-") || !strcmp(p, "pipe:1") || + !strcmp(p, "/dev/stdout")); +} + +static void log_av_err(const char *what, int err) +{ + char buf[256]; + av_strerror(err, buf, sizeof(buf)); + fprintf(stderr, "[cfc/writer] %s failed: %s\n", what, buf); +} + +/* ── Raw H.264 backend ────────────────────────────────────────────────── */ + +static int raw_create(cfc_writer_t *w, const cfc_writer_config_t *cfg) +{ + w->is_stdout = is_stdout_path(cfg->path); + if (w->is_stdout) { + w->fp = stdout; + setvbuf(stdout, NULL, _IOFBF, 1024 * 1024); + } else { + w->fp = fopen(cfg->path, "wb"); + if (!w->fp) { + fprintf(stderr, "[cfc/writer] fopen(%s): %s\n", + cfg->path, strerror(errno)); + return -1; + } + } + return 0; +} + +static int raw_write(cfc_writer_t *w, const uint8_t *bs, size_t size, + int64_t pts_ns, int is_keyframe) +{ + (void)pts_ns; (void)is_keyframe; + if (fwrite(bs, 1, size, w->fp) != size) { + fprintf(stderr, "[cfc/writer/raw] fwrite: %s\n", strerror(errno)); + return -1; + } + return 0; +} + +static int raw_close(cfc_writer_t *w) +{ + if (w->fp) { + fflush(w->fp); + if (!w->is_stdout) fclose(w->fp); + w->fp = NULL; + } + return 0; +} + +/* ── MPEG-TS backend (libavformat) ────────────────────────────────────── */ + +static int mpegts_create(cfc_writer_t *w, const cfc_writer_config_t *cfg) +{ + int err; + w->is_stdout = is_stdout_path(cfg->path); + w->first_video_pts_ns = AV_NOPTS_VALUE; + w->first_audio_pts_ns = AV_NOPTS_VALUE; + + const char *url = w->is_stdout ? "pipe:1" : cfg->path; + + err = avformat_alloc_output_context2(&w->fmt_ctx, NULL, "mpegts", url); + if (err < 0 || !w->fmt_ctx) { + log_av_err("avformat_alloc_output_context2", err); + return -1; + } + + /* mpegts options: pat_pmt_at_frames — PAT/PMT перед каждым keyframe + * (быстрая tune-in). resend_headers — повторять. omit_video_pes_length — + * не ставить length в PES header (low-latency настройка). */ + av_opt_set(w->fmt_ctx->priv_data, "mpegts_flags", + "+resend_headers+pat_pmt_at_frames", 0); + + /* Video stream */ + w->video_stream = avformat_new_stream(w->fmt_ctx, NULL); + if (!w->video_stream) goto fail; + AVCodecParameters *vp = w->video_stream->codecpar; + vp->codec_type = AVMEDIA_TYPE_VIDEO; + vp->codec_id = AV_CODEC_ID_H264; + vp->codec_tag = 0; + vp->width = cfg->width; + vp->height = cfg->height; + vp->format = AV_PIX_FMT_YUV420P; + vp->bit_rate = (int64_t)cfg->bitrate_kbps * 1000; + vp->profile = FF_PROFILE_H264_HIGH; + + if (cfg->extradata && cfg->extradata_size > 0) { + vp->extradata = av_mallocz(cfg->extradata_size + AV_INPUT_BUFFER_PADDING_SIZE); + if (!vp->extradata) goto fail; + memcpy(vp->extradata, cfg->extradata, cfg->extradata_size); + vp->extradata_size = (int)cfg->extradata_size; + } + + w->video_stream->time_base = (AVRational){1, 90000}; + w->video_stream->avg_frame_rate = (AVRational){cfg->fps_num, cfg->fps_den}; + w->video_stream->r_frame_rate = w->video_stream->avg_frame_rate; + + /* Audio stream (опционально) */ + if (cfg->has_audio) { + w->audio_stream = avformat_new_stream(w->fmt_ctx, NULL); + if (!w->audio_stream) goto fail; + AVCodecParameters *ap = w->audio_stream->codecpar; + ap->codec_type = AVMEDIA_TYPE_AUDIO; + ap->codec_id = AV_CODEC_ID_AAC; + ap->codec_tag = 0; + ap->sample_rate = cfg->audio_sample_rate > 0 ? cfg->audio_sample_rate : 44100; + int nch = cfg->audio_channels > 0 ? cfg->audio_channels : 2; +#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(59, 24, 100) + /* ffmpeg ≥ 5.1: AVChannelLayout API */ + ap->ch_layout.order = AV_CHANNEL_ORDER_NATIVE; + ap->ch_layout.nb_channels = nch; + ap->ch_layout.u.mask = (nch == 1) ? AV_CH_LAYOUT_MONO : AV_CH_LAYOUT_STEREO; +#else + /* ffmpeg 4.x: legacy channels + channel_layout */ + ap->channels = nch; + ap->channel_layout = (nch == 1) ? AV_CH_LAYOUT_MONO : AV_CH_LAYOUT_STEREO; +#endif + ap->format = AV_SAMPLE_FMT_FLTP; + ap->profile = FF_PROFILE_AAC_LOW; + + if (cfg->audio_extradata && cfg->audio_extradata_size > 0) { + ap->extradata = av_mallocz(cfg->audio_extradata_size + AV_INPUT_BUFFER_PADDING_SIZE); + if (!ap->extradata) goto fail; + memcpy(ap->extradata, cfg->audio_extradata, cfg->audio_extradata_size); + ap->extradata_size = (int)cfg->audio_extradata_size; + } + w->audio_stream->time_base = (AVRational){1, ap->sample_rate}; + } + + /* Open output. */ + if (!(w->fmt_ctx->oformat->flags & AVFMT_NOFILE)) { + err = avio_open(&w->fmt_ctx->pb, url, AVIO_FLAG_WRITE); + if (err < 0) { + log_av_err("avio_open", err); + goto fail; + } + } + + /* Low-latency: flushes packet immediately. */ + w->fmt_ctx->flags |= AVFMT_FLAG_FLUSH_PACKETS; + /* Малая интерливинг-очередь — 200мс. av_interleaved_write_frame + * накапливает packets per stream до этого предела, чтобы корректно + * сериализовать по DTS. 200мс = ~5 video frames @ 25 fps, ничтожно + * для CCTV. Без этого audio/video идут в порядке call'ов, downstream + * decoder'ы видят jitter и тормозит video. */ + w->fmt_ctx->max_interleave_delta = 200000; /* микросекунды */ + + err = avformat_write_header(w->fmt_ctx, NULL); + if (err < 0) { + log_av_err("avformat_write_header", err); + if (w->fmt_ctx->pb) avio_closep(&w->fmt_ctx->pb); + goto fail; + } + w->header_written = 1; + fprintf(stderr, + "[cfc/writer/mpegts] открыт %s — video %dx%d, %s\n", + url, cfg->width, cfg->height, + cfg->has_audio ? "video+audio" : "video-only"); + return 0; + +fail: + avformat_free_context(w->fmt_ctx); + w->fmt_ctx = NULL; + return -1; +} + +static int mpegts_write_video(cfc_writer_t *w, const uint8_t *bs, size_t size, + int64_t pts_ns, int is_keyframe) +{ + (void)pts_ns; /* video PTS — на frame_idx, не wall-clock (см. struct) */ + + AVPacket *pkt = av_packet_alloc(); + if (!pkt) return -1; + + /* av_new_packet — создаёт packet с правильной AVBufferRef, чтобы + * av_interleaved_write_frame мог корректно ref-count'ить ownership. + * Без этого manual av_malloc'нутая data вызывает heap corruption + * при последующем packet free внутри libav. */ + if (av_new_packet(pkt, (int)size) < 0) { av_packet_free(&pkt); return -1; } + memcpy(pkt->data, bs, size); + + int64_t duration = av_rescale_q(1, + (AVRational){w->video_stream->avg_frame_rate.den, + w->video_stream->avg_frame_rate.num}, + w->video_stream->time_base); + + pkt->stream_index = w->video_stream->index; + pkt->pts = w->video_frame_idx * duration; + pkt->dts = pkt->pts; + pkt->duration = duration; + pkt->flags = is_keyframe ? AV_PKT_FLAG_KEY : 0; + w->video_frame_idx++; + + int err = av_interleaved_write_frame(w->fmt_ctx, pkt); + av_packet_free(&pkt); + if (err < 0) { + log_av_err("av_interleaved_write_frame[video]", err); + return -1; + } + return 0; +} + +static int mpegts_write_audio(cfc_writer_t *w, const uint8_t *bs, size_t size, + int64_t pts_ns) +{ + if (!w->audio_stream) return -1; + if (w->first_audio_pts_ns == AV_NOPTS_VALUE) w->first_audio_pts_ns = pts_ns; + int64_t rel_pts = pts_ns - w->first_audio_pts_ns; + + AVPacket *pkt = av_packet_alloc(); + if (!pkt) return -1; + + if (av_new_packet(pkt, (int)size) < 0) { av_packet_free(&pkt); return -1; } + memcpy(pkt->data, bs, size); + + pkt->stream_index = w->audio_stream->index; + pkt->pts = av_rescale_q(rel_pts, (AVRational){1, 1000000000}, + w->audio_stream->time_base); + pkt->dts = pkt->pts; + pkt->flags = AV_PKT_FLAG_KEY; /* AAC frames всегда keyframe */ + + int err = av_interleaved_write_frame(w->fmt_ctx, pkt); + av_packet_free(&pkt); + if (err < 0) { + log_av_err("av_interleaved_write_frame[audio]", err); + return -1; + } + return 0; +} + +static int mpegts_close(cfc_writer_t *w) +{ + if (w->fmt_ctx) { + if (w->header_written) av_write_trailer(w->fmt_ctx); + if (w->fmt_ctx->pb && !(w->fmt_ctx->oformat->flags & AVFMT_NOFILE)) { + avio_closep(&w->fmt_ctx->pb); + } + avformat_free_context(w->fmt_ctx); + w->fmt_ctx = NULL; + } + return 0; +} + +/* ── Public API ───────────────────────────────────────────────────────── */ + +int cfc_writer_create(const cfc_writer_config_t *cfg, cfc_writer_t **out) +{ + if (!cfg || !cfg->path || !cfg->format || !out) return -1; + + cfc_writer_t *w = calloc(1, sizeof(*w)); + if (!w) return -1; + + int rc; + if (!strcmp(cfg->format, "h264")) { + w->backend = CFC_WRITER_RAW; + rc = raw_create(w, cfg); + } else if (!strcmp(cfg->format, "mpegts")) { + w->backend = CFC_WRITER_MPEGTS; + rc = mpegts_create(w, cfg); + } else { + fprintf(stderr, "[cfc/writer] unknown format '%s'\n", cfg->format); + free(w); + return -1; + } + + if (rc != 0) { free(w); return -1; } + *out = w; + return 0; +} + +int cfc_writer_write(cfc_writer_t *w, const uint8_t *bs, size_t size, + int64_t pts_ns, int is_keyframe) +{ + if (!w || !bs || size == 0) return -1; + if (w->backend == CFC_WRITER_RAW) { + return raw_write(w, bs, size, pts_ns, is_keyframe); + } + return mpegts_write_video(w, bs, size, pts_ns, is_keyframe); +} + +int cfc_writer_write_audio(cfc_writer_t *w, const uint8_t *bs, size_t size, + int64_t pts_ns) +{ + if (!w || !bs || size == 0) return -1; + if (w->backend != CFC_WRITER_MPEGTS) return -1; + return mpegts_write_audio(w, bs, size, pts_ns); +} + +int cfc_writer_close(cfc_writer_t *w) +{ + if (!w) return 0; + int rc; + if (w->backend == CFC_WRITER_RAW) { + rc = raw_close(w); + } else { + rc = mpegts_close(w); + } + free(w); + return rc; +}