Phase 7 audio mixing — attempt + rollback + lessons

Несколько сессий попыток реализовать audio mixing в композитор'е.
Не достигнуто sub-секундной latency со стабильным video+audio.
Откатано на parallel mode (cfc-grid video-only, live от pipeline с audio).

Полный набор выводов и pitfall'ов — docs/LESSONS-audio-mixing-attempts.md.

Главные lesson'ы для будущей попытки:
- mpegts mux libavformat авто-инсёртит h264_mp4toannexb BSF которому
  не нравится Annex-B + inline SPS/PPS — NVENC OUTPUT_SPSPPS per-frame ломает
- SPSC ring drop newest при full, не oldest (consumer's domain)
- av_new_packet (не av_malloc) для av_interleaved_write_frame ownership
- Monotonic PTS на counter (frame_idx, total_samples) — не wallclock
- mediamtx env-var path names не должны иметь '-' (parser limitation)
- Default mediamtx ReadTimeout=10s короткий для burst write'ов

Изменения в repo сохранены для будущей доработки:
- src/writer.c — mpegts backend с audio stream support
- src/audio.c — RTSP AAC consumer + lock-free SPSC ring
- include/cuframes_composer/{writer,audio}.h — public API
- examples/grid_record.c — --format=mpegts + --audio-source flags
- include/cuframes_composer/composer.h — consumer_prefix field
- docker/Dockerfile — libavformat-dev добавлен в builder/runtime

cfc-grid composer стабильно работает на видео (substantially лучше
монолитного pipeline'а с audio bag'ом). TV рекомендуется использовать
rtsp://...:554/cfc-grid + опционально rtsp://...:554/live-audio
parallel.
This commit is contained in:
2026-06-03 14:29:56 +01:00
parent 20b5234c41
commit fa6ab3069a
13 changed files with 1276 additions and 52 deletions
+5
View File
@@ -55,6 +55,11 @@ find_path(LIBJSONC_INCLUDE_DIR json-c/json.h)
find_library(LIBMOSQUITTO_LIBRARY NAMES mosquitto REQUIRED)
find_path(LIBMOSQUITTO_INCLUDE_DIR mosquitto.h)
# libavformat / libavcodec / libavutil — для нативного mpegts output (Phase 7)
find_package(PkgConfig REQUIRED)
pkg_check_modules(LIBAV REQUIRED IMPORTED_TARGET
libavformat libavcodec libavutil)
# ── Сторонние библиотеки (subomodules в third_party/) ───────────────────
# cuframes — статически линкуем libcuframes. cuframes_static — это static lib
+6 -1
View File
@@ -24,9 +24,10 @@
FROM nvidia/cuda:12.4.1-devel-ubuntu22.04 AS builder
RUN apt-get update && apt-get install -y --no-install-recommends \
build-essential cmake git \
build-essential cmake git pkg-config \
libpng-dev libfreetype-dev \
libzmq3-dev libjson-c-dev libmosquitto-dev \
libavformat-dev libavcodec-dev libavutil-dev \
&& rm -rf /var/lib/apt/lists/*
WORKDIR /src
@@ -52,9 +53,13 @@ FROM nvidia/cuda:12.4.1-runtime-ubuntu22.04 AS runtime
# Ubuntu 22.04 jammy: package names иные чем noble (libpng16-16 без t64).
# libcudart12 уже в runtime image; здесь только наши user-space deps.
# netcat-openbsd — для healthcheck'а через ZMQ ping (см. HEALTHCHECK ниже).
# ffmpeg — для transmux raw h264 → mpegts (shell pipe chain после
# grid_record); без него ffmpeg-mux downstream не справляется с
# intra refresh без IDR boundaries.
RUN apt-get update && apt-get install -y --no-install-recommends \
libpng16-16 libfreetype6 \
libzmq5 libjson-c5 libmosquitto1 \
libavformat58 libavcodec58 libavutil56 \
fonts-dejavu-core \
netcat-openbsd \
&& rm -rf /var/lib/apt/lists/*
+288
View File
@@ -0,0 +1,288 @@
# LESSONS: попытки добавить audio mixing в cuframes-composer
**Контекст**: cuframes-composer (gx/cuframes-composer) композирует 4 cuframes-источника
в grid через CUDA kernels + NVENC H.264, публикует в mediamtx `cfc-grid` path
(только video). Параллельно `cuda-grid-pipeline` (legacy монолитный ffmpeg)
публикует `live` с video + AAC audio — но **месяц подёргивается** из-за
известного `audio chain блокирует video` бага (см. memory feedback).
**Цель Phase 7** — реализовать audio mixing в composer'е/около него чтобы:
1. Получить стабильное video + audio в одном потоке
2. Retire pipeline'а полностью
**Статус 2026-06-03**: не достигнута, откатано на parallel mode.
---
## Что в итоге работает (текущее prod-состояние)
```
rtsp://...:554/live — pipeline, H264+AAC, подёргивается уже месяц (audio mix bag)
rtsp://...:554/cfc-grid — composer, H264 only, стабильное video
rtsp://...:554/live-audio — cuda-grid-audio, AAC only
```
TV/VLC рекомендуется использовать `cfc-grid` для video и (опционально) синхронизировать
`live-audio` параллельно на стороне reader'а до того как Phase 7 будет завершён.
---
## Хронология попыток и почему каждая не сработала
### Попытка 1: `cfc-grid-ffmpeg` mux — raw H.264 в pipe + RTSP audio → `-c copy`
```yaml
ffmpeg -f h264 -i /pipe/grid.h264 -i rtsp://.../live-audio -c copy -map 0:v -map 1:a -f rtsp ...
```
**Проблема**: Raw H.264 в pipe не имеет PTS/DTS на container-уровне. ffmpeg синтезирует
через `-r 25` или wallclock, audio RTSP с правильными RTP timestamps, **interleave broken**.
Mediamtx drops video либо audio. Symptoms: 4 fps video (от 25), `Could not write header`,
audio с перебоями.
**Что я пробовал из костылей** (всё бесплодно):
- `-use_wallclock_as_timestamps 1` на video — ломает audio DTS (`Non-monotonic DTS`)
- `-fflags +genpts+igndts+discardcorrupt` — не помогает
- `-thread_queue_size 32/1024` — без эффекта
- `-muxdelay 0 -max_delay 0` — без эффекта
- `-flush_packets 1` — без эффекта
### Попытка 2: decode→re-encode через NVDEC+NVENC в cfc-grid-ffmpeg
```yaml
-hwaccel cuda -i /pipe/grid.h264 -c:v h264_nvenc -intra_refresh 1 -no-scenecut 1 ...
```
**Проблема**: `Broken pipe` на RTSP output mediamtx. Также double-encode на GPU удваивает
NVENC sessions (на Pascal GTX 1050 это уже впритык, на 5090 OK но всё равно лишний overhead).
### Попытка 3: композитор пишет mpegts container нативно (Phase 7 core)
Архитектура: composer через libavformat → mpegts container → Unix socket в mediamtx.
Решает PTS/DTS на container-уровне + bypass'ит весь split-process pipe→ffmpeg-mux pipeline.
Реализация в `src/writer.c`, `src/audio.c`, mediamtx `MTX_PATHS_CFCGRID_SOURCE=unix+mpegts:///...`.
#### 3.1: PPS error `non-existing PPS 0 referenced`
**Причина**: libavformat mpegts muxer **автоматически инсёртит** `h264_mp4toannexb`
bitstream-фильтр, который ожидает **AVCC** формат (length-prefix NAL units).
NVENC выдаёт **Annex-B** (start codes). При попытке "конвертации" парсер ломается.
Дополнительная инъекция SPS/PPS через `NV_ENC_PIC_FLAG_OUTPUT_SPSPPS` per-frame
(который я добавил пытаясь решить late-joiner проблему) усугубляет — двойная инжекция
создаёт мусор для парсера.
**Fix**: убрать `NV_ENC_PIC_FLAG_OUTPUT_SPSPPS` per-frame. Оставить только `repeatSPSPPS=1`
в init. NVENC сам ставит SPS/PPS перед каждым началом intra refresh cycle.
См. также OBS Studio issue #7338 — intra refresh + NVENC + mpegts = известно сломанная пара
в индустрии.
#### 3.2: 12 секунд latency
**Причины** (сложение):
- `-analyzeduration 30000000 -probesize 50000000` в ffmpeg-mux input — **30 секунд hard cap**
на пробинг (libavformat ждёт байт пока поймёт codec). Для intra refresh stream без IDR
ffmpeg вынужден ждать первого refresh cycle.
- `av_interleaved_write_frame` буферизует пакеты до `max_interleave_delta` (default 10 секунд).
- Default AVIO write buffer 256KB — на 6 Mbps это ~350ms.
- Named pipe (FIFO) + stdio buffers по 64KB ещё ~100ms.
- Сложение ≈ 12+ секунд.
**Fix частично**: `AVFMT_FLAG_FLUSH_PACKETS`, `max_interleave_delta=200000` (200ms),
уменьшение thread_queue, `-flush_packets 1`. **Не убирает все источники** — глубже
композитор/AVIO buffering остаётся.
#### 3.3: `omit_video_pes_length` не существует во flags
**Fix**: убрать. Достаточно `+resend_headers+pat_pmt_at_frames`.
#### 3.4: mediamtx env-var `MTX_PATHS_CFC_GRID_SOURCE` не разбирается
mediamtx env parser использует `_` как разделитель между `<NAME>` и `<FIELD>`.
Дефис в path name (`cfc-grid`) ломает pattern → mediamtx думает что path называется
`cfc` а `grid_source` это что-то другое.
**Fix**: переименовать path в `cfcgrid` (без дефиса) либо использовать YAML config file.
#### 3.5: mediamtx закрывает Unix socket — `read unix: i/o timeout`
Default mediamtx ReadTimeout = 10 секунд. Composer пишет с burst'ами (8 audio packets
после каждого video frame), AVIO buffers накапливаются, socket I/O slow → mediamtx
видит idle > 10s → disconnect → composer reconnects → cycle.
**Partial fix**: `MTX_READTIMEOUT=30s` + `MTX_WRITETIMEOUT=30s` в mediamtx env. Помогает
не полностью.
#### 3.6: `malloc_consolidate(): unaligned fastbin chunk detected` (heap corruption)
**Причина 1 (исправлена)**: `av_packet_alloc()` + manual `av_malloc(pkt->data)` + затем
`av_interleaved_write_frame` — конфликт ownership. `av_interleaved_write_frame` берёт
ownership packet'а через AVBufferRef, но у меня data была вне buffer ref → libav
освобождает невалидно.
**Fix**: использовать `av_new_packet(pkt, size)` — создаёт proper AVBufferRef, дальше
memcpy в `pkt->data`.
**Причина 2 (исправлена)**: SPSC ring buffer в `audio.c` — когда полный, producer (audio
thread) `free`'ил oldest данные и двигал `tail` index. Но `tail` — это домен consumer'а
(main thread). Concurrent modification → use-after-free → heap corruption.
**Fix**: при full ring **drop newest** (free(new_pkt.data); return -1), не трогать `tail`.
#### 3.7: Non-monotonic DTS для audio packets
Audio packets часто приходят пачкой через TCP (несколько AAC frames в одном recv).
Если назначать PTS как `wallclock_now - start`, все они получат идентичный timestamp
`non monotonic DTS` от mpegts muxer.
**Fix**: monotonic PTS на основе накопленных samples'ов. AAC-LC = 1024 samples/frame.
`pts_ns = total_samples * 1_000_000_000 / sample_rate`. `total_samples += 1024` per frame.
#### 3.8: Video PTS skew с audio PTS
Video PTS было через wall-clock relative to start. Audio PTS — через accumulated samples.
Разные clock sources → постепенный drift со временем.
**Fix**: video PTS тоже через monotonic counter: `pts = frame_idx * duration_per_frame`,
где `duration_per_frame = 1/fps в time_base`. Это даёт **точный** 25 fps tempo без skew
от composer thread jitter.
---
## Что осталось нерешённым (для будущей попытки)
Даже после всех вышеперечисленных фиксов финальный результат был:
- Звук с затыками
- Видео тормозит
- Периодические разрывы соединения VLC
**Гипотезы про оставшийся источник проблем**:
1. **mediamtx + Unix socket backpressure под burst write'ами**. Когда composer пишет
video frame + drain 8 audio packets подряд за один tick, mediamtx видит burst,
reader queue заполняется, reader падает. См. `MTX_WRITEQUEUESIZE=256` (default
слишком мал для burst'ов, увеличение влияет на latency).
2. **AVIO буфер vs `AVFMT_FLAG_FLUSH_PACKETS`**. Возможно flag flush'ит после каждого
packet'а, но `av_interleaved_write_frame` ставит packets в interleave queue ДО flush'а.
Net эффект: queue до 200ms (`max_interleave_delta`), потом burst flush.
3. **`mpegts` muxer не низколатентный по дизайну**. Mediamtx внутренне распаковывает
PES → собирает свои RTSP track'и → пакетизирует в RTP. Каждый шаг добавляет
buffering. Возможно правильный путь — `gortsplib` встроенный RTSP publisher
(паттерн C из RESEARCH-audio-mixing-low-latency.md), который не использует mpegts
контейнер вовсе.
4. **Composer audio drain timing**. Я drain'ю до 8 packets после каждого video frame.
При 43 audio packets/sec (44.1k/1024) и 25 video frames/sec получается 43/25 = 1.72
packets/frame в среднем. С drain'ом до 8 — большинство фреймов drain'ит 1-2 packets,
но при катэппинге очереди (после coмposer pause или audio source jitter) batch'ит до 8.
Это может вызывать interleave irregular.
**Альтернатива** — drain до 1 packet'а за раз, но N раз пер video tick. Либо drain
по wall-clock cadence audio (через timerfd на 23.22ms tick).
5. **Pre-allocation/post-allocation race**. Mediamtx ожидает PAT/PMT для path
ready'нения. Мой first frame отправляет PAT/PMT (через `pat_pmt_at_frames` flag),
но возможно несколько initial frames проходят без PAT/PMT и mediamtx считает
их корруптом.
---
## Что писать ДО следующей попытки
**Benchmark-driven research** — недостаточно теоретического плана. Нужно:
1. **Замерить реальную latency на каждой stage'и** через timestamps в логах:
- `composer compose_compose finish``encode_frame begin`
- `encode_frame begin``on_bitstream callback`
- `on_bitstream``av_interleaved_write_frame return`
- `av_interleaved_write_frame` → byte appears в `/run/mediamtx/sock` (через `strace -e write`)
- mediamtx receives → mediamtx sends RTP packet (через mediamtx logs DEBUG)
- RTP packet → VLC decoder ready (VLC verbose logs)
2. **Сравнить с baseline'ом** — cuda-grid-pipeline даёт `<1s end-to-end` пока audio mix
не block'ает video. Что у него отличается архитектурно? Один процесс, один muxer,
один encoder thread. Mediamtx видит rtsp publisher вместо unix-mpegts publisher.
3. **Изучить **mediamtx mpeg-ts internals**`unix+mpegts://` source насколько хорошо
протестирован? Возможно есть known bag'и. Issue tracker bluenviron/mediamtx.
4. **Альтернативные пути**:
- **rtsp publisher через gortsplib** — `gortsplib` это Go библиотека, не C, но можно
попробовать обвязку через CGo или standalone Go-процесс с TCP control от composer'а.
- **RTP-based publisher напрямую** — собрать H.264 NAL units в RTP packets (RFC 6184),
AAC в mpeg4-generic RTP (RFC 3640), отправлять на mediamtx RTSP ANNOUNCE/RECORD
session. Самый low-latency но самый много кода (~1500 строк).
5. **Подумать про прочный workaround**: 2 отдельных RTSP path'а (`cfc-grid` для video,
`live-audio` для audio) и client-side sync. VLC/mpv поддерживают это через
`--audio-file=rtsp://...:554/live-audio`. Это не "правильный" монолитный поток но
зато стабильное.
---
## Список созданных файлов (остались в репо для будущей доработки)
```
include/cuframes_composer/writer.h — public API: cfc_writer_create/write/write_audio/close
Поддерживает h264 raw + mpegts container
formats. Audio stream опциональный.
include/cuframes_composer/audio.h — public API: cfc_audio_create/get_codec_params/
drain/destroy. SPSC ring buffer architecture.
src/writer.c — backend impl. Lessons: av_new_packet vs av_malloc,
AVFMT_FLAG_FLUSH_PACKETS, max_interleave_delta=200000,
monotonic video PTS на frame_idx,
monotonic audio PTS на total_samples
src/audio.c — audio thread, lock-free SPSC ring (drop newest при
full), RTSP open с low-latency options
(nobuffer+flush_packets, probesize=32768,
analyzeduration=0), AAC parameters discovery
src/nvenc.c — encoder. Lesson: убрать NV_ENC_PIC_FLAG_OUTPUT_SPSPPS
per-frame, оставить только repeatSPSPPS=1
examples/grid_record.c — --audio-source=rtsp://... + --format=mpegts options
docs/RESEARCH-audio-mixing-low-latency.md — research-doc с industry patterns и
recommended approach (паттерн B = mpegts через
Unix socket)
docs/LESSONS-audio-mixing-attempts.md — этот файл
```
`gx/cuframes-composer:0.6` image содержит всё необходимое (libavformat-dev в builder,
libavformat58 в runtime). Audio mixing включается через `--format=mpegts
--audio-source=rtsp://...:8554/live-audio` в command'е.
Для возвращения к Phase 7 деплою — раскомментировать `MTX_PATHS_CFCGRID_SOURCE` в
mediamtx env vars + изменить `cfc-grid` command в compose. См. git history
`localhost-infra` для exact diff'ов.
---
## TL;DR для следующего раза
1. **Не повторять** добавление `NV_ENC_PIC_FLAG_OUTPUT_SPSPPS` per-frame — двойная инжекция SPS/PPS
ломает downstream parsers.
2. **Не повторять** SPSC ring где producer трогает `tail` (consumer's side) — heap
corruption. Drop newest, не oldest.
3. **Не использовать** `av_packet_alloc` + manual `av_malloc(data)` с
`av_interleaved_write_frame` — нужен `av_new_packet` для AVBufferRef.
4. **Не использовать** wall-clock PTS если есть монотонный счётчик (frame_idx /
total_samples) — гарантирует sync video/audio.
5. **Audio packets** приходят пачками через TCP — нельзя назначать PTS = wallclock(now).
6. **mpegts muxer + h264_mp4toannexb BSF** — несовместим с intra refresh без careful
setup. См. OBS issue #7338.
7. **mediamtx env-var path names** не могут иметь `-` (parser ломается на `_` boundary).
8. **Default mediamtx ReadTimeout=10s** — слишком короткий для composer'а с burst write'ами.
9. **analyzeduration/probesize в ffmpeg input** — главные виновники multi-second startup
latency, не реальной streaming latency.
10. **Не пытаться** "наobum experiments" с flag'ами — каждое изменение должно быть
основано на measurement или цитате из доков. Иначе rabbit hole'у нет конца.
**Главный insight**: даже идеально сделанный mpegts через Unix socket путь имеет
inherent buffering, который не убрать без переходом на native RTP/RTSP publisher.
Возможно, **правильный финальный путь — gortsplib** (паттерн C из research-doc),
но это +1500 строк кода.
+87 -28
View File
@@ -23,6 +23,8 @@
#include "../include/cuframes_composer/overlay.h"
#include "../include/cuframes_composer/control.h"
#include "../include/cuframes_composer/health.h"
#include "../include/cuframes_composer/writer.h"
#include "../include/cuframes_composer/audio.h"
#include <cuda.h>
@@ -43,7 +45,7 @@ static volatile sig_atomic_t g_stop = 0;
static void on_sig(int s) { (void)s; g_stop = 1; }
typedef struct write_ctx {
FILE *fp;
cfc_writer_t *writer;
uint64_t bytes_written;
uint64_t frames_encoded;
uint64_t idr_count;
@@ -52,9 +54,8 @@ typedef struct write_ctx {
static void on_bitstream(const uint8_t *bs, size_t size, int64_t pts_ns,
int is_idr, void *user)
{
(void)pts_ns;
write_ctx_t *ctx = (write_ctx_t *)user;
if (fwrite(bs, 1, size, ctx->fp) == size) {
if (cfc_writer_write(ctx->writer, bs, size, pts_ns, is_idr) == 0) {
ctx->bytes_written += size;
ctx->frames_encoded++;
if (is_idr) ctx->idr_count++;
@@ -119,6 +120,8 @@ int main(int argc, char **argv)
const char *mqtt_instance = "cfc-grid"; /* --mqtt-instance NAME */
const char *mqtt_user = NULL;
const char *mqtt_pass = NULL;
const char *out_format = "h264"; /* --format h264|mpegts */
const char *audio_source = NULL; /* --audio-source rtsp://.../live-audio */
static struct option opts[] = {
{"out", required_argument, 0, 'o'},
@@ -137,10 +140,12 @@ int main(int argc, char **argv)
{"mqtt-user", required_argument, 0, 'U'},
{"mqtt-pass", required_argument, 0, 'P'},
{"intra-refresh", no_argument, 0, 'R'},
{"format", required_argument, 0, 'F'}, /* h264|mpegts */
{"audio-source", required_argument, 0, 'A'}, /* RTSP audio URL */
{0, 0, 0, 0},
};
int c;
while ((c = getopt_long(argc, argv, "o:c:f:b:W:H:s:r:i:t:C:M:I:U:P:R", opts, NULL)) != -1) {
while ((c = getopt_long(argc, argv, "o:c:f:b:W:H:s:r:i:t:C:M:I:U:P:RF:A:", opts, NULL)) != -1) {
switch (c) {
case 'o': out_path = optarg; break;
case 'c':
@@ -178,6 +183,8 @@ int main(int argc, char **argv)
case 'U': mqtt_user = optarg; break;
case 'P': mqtt_pass = optarg; break;
case 'R': intra_refresh = 1; break;
case 'F': out_format = optarg; break;
case 'A': audio_source = optarg; break;
case 't': {
if (num_texts >= MAX_CELLS) { fprintf(stderr, "max %d texts\n", MAX_CELLS); return 1; }
/* Опциональный prefix "id=NAME:" — задаёт control-plane ID. */
@@ -276,6 +283,7 @@ int main(int argc, char **argv)
.cells = cells,
.num_cells = num_cells,
.cuda_device = 0,
.consumer_prefix = mqtt_instance, /* уникальный namespace на каждый composer */
};
cfc_composer_t *comp = NULL;
if (cfc_composer_create(&ccfg, &comp) != 0) {
@@ -407,28 +415,73 @@ int main(int argc, char **argv)
return 1;
}
/* Output: "-" / "/dev/stdout" / "pipe:1" = stdout (для pipe в ffmpeg).
* stdout не закрывается через fclose чтобы не убивать дочерний процесс
* raньше времени. */
write_ctx_t wctx = { 0 };
int is_stdout = (!strcmp(out_path, "-") || !strcmp(out_path, "pipe:1") ||
!strcmp(out_path, "/dev/stdout"));
if (is_stdout) {
wctx.fp = stdout;
/* line-buffer'инг disabled — пишем full-buffered для производительности.
* Caller'у нужно flush при exit. */
setvbuf(stdout, NULL, _IOFBF, 1024 * 1024);
} else {
wctx.fp = fopen(out_path, "wb");
if (!wctx.fp) {
fprintf(stderr, "fopen(%s): %s\n", out_path, strerror(errno));
cfc_encoder_destroy(enc);
cfc_composer_destroy(comp);
return 1;
/* Audio consumer (опциональный, Phase 7). Запускаем РАНЬШЕ writer'а
* чтобы успеть получить codec params (sample_rate, channels, extradata)
* до avformat_write_header — иначе audio stream'у не будет правильного
* setup'а. Polling до 5 секунд. */
cfc_audio_t *audio = NULL;
int audio_sample_rate = 0, audio_channels = 0;
const uint8_t *audio_extradata = NULL;
size_t audio_extradata_size = 0;
if (audio_source) {
cfc_audio_config_t acfg = { .rtsp_url = audio_source };
if (cfc_audio_create(&acfg, &audio) != 0) {
fprintf(stderr, "[grid_record] audio create failed, продолжаю без audio\n");
} else {
fprintf(stderr, "[grid_record] жду audio codec params от %s ...\n", audio_source);
/* 30 секунд polling — audio source (cuda-grid-audio) может ещё
* подниматься после recreate стeка. Audio thread сам retry'ится
* с exp backoff. */
for (int i = 0; i < 300; i++) { /* 300 × 100ms = 30s */
if (cfc_audio_get_codec_params(audio, &audio_sample_rate,
&audio_channels, &audio_extradata,
&audio_extradata_size) == 0) {
fprintf(stderr,
"[grid_record] audio готов: AAC %dHz %dch extradata=%zub\n",
audio_sample_rate, audio_channels, audio_extradata_size);
break;
}
struct timespec ts = { .tv_sec = 0, .tv_nsec = 100 * 1000 * 1000 };
nanosleep(&ts, NULL);
}
if (audio_sample_rate == 0) {
fprintf(stderr, "[grid_record] audio params не получены за 30с, без audio\n");
cfc_audio_destroy(audio); audio = NULL;
}
}
}
fprintf(stderr, "[grid_record] начало записи в %s (Ctrl+C для остановки)\n",
out_path);
/* Writer: mpegts с video + опциональным audio. */
uint8_t spspps[256]; size_t spspps_len = sizeof(spspps);
cfc_encoder_get_sequence_params(enc, spspps, &spspps_len);
cfc_writer_config_t wcfg = {
.path = out_path,
.format = out_format,
.width = out_w,
.height = out_h,
.fps_num = fps,
.fps_den = 1,
.bitrate_kbps = bitrate,
.extradata = spspps,
.extradata_size = spspps_len,
.has_audio = audio ? 1 : 0,
.audio_sample_rate = audio_sample_rate,
.audio_channels = audio_channels,
.audio_extradata = audio_extradata,
.audio_extradata_size = audio_extradata_size,
};
write_ctx_t wctx = { 0 };
if (cfc_writer_create(&wcfg, &wctx.writer) != 0) {
fprintf(stderr, "cfc_writer_create(%s, %s) failed\n", out_path, out_format);
if (audio) cfc_audio_destroy(audio);
cfc_encoder_destroy(enc);
cfc_composer_destroy(comp);
return 1;
}
fprintf(stderr, "[grid_record] начало записи в %s [format=%s%s] (Ctrl+C для остановки)\n",
out_path, out_format, audio ? "+audio" : "");
/* Main loop — frame cadence по wall clock'у. */
struct timespec ts_start;
@@ -461,12 +514,18 @@ int main(int argc, char **argv)
}
int64_t pts_ns = (now_us - start_us) * 1000;
/* Не break'аем при encode/write failure — это обычно временно
* (mediamtx reconnect, socket broken). Просто логируем и продолжаем,
* следующая encode/write попытается заново. */
if (cfc_encoder_encode_frame(enc, out_y, out_pitch, pts_ns,
on_bitstream, &wctx) != 0) {
fprintf(stderr, "[grid_record] encode failed\n");
break;
static int warned = 0;
if (!warned) { fprintf(stderr, "[grid_record] encode failed (продолжаю)\n"); warned = 1; }
}
/* Drain audio packets — пишем сразу после video frame. */
if (audio) cfc_audio_drain(audio, wctx.writer, 8);
if (wctx.frames_encoded > 0 && wctx.frames_encoded % 50 == 0) {
double elapsed = (now_us - start_us) / 1e6;
cfc_composer_health_t h;
@@ -497,8 +556,8 @@ int main(int argc, char **argv)
(unsigned long long)wctx.idr_count,
wctx.bytes_written / 1048576.0);
fflush(wctx.fp);
if (!is_stdout) fclose(wctx.fp);
cfc_writer_close(wctx.writer);
if (audio) cfc_audio_destroy(audio);
if (ctl) cfc_control_destroy(ctl);
if (hpub) cfc_health_destroy(hpub);
cfc_encoder_destroy(enc);
+26 -22
View File
@@ -18,6 +18,7 @@
#include "../include/cuframes_composer/nvenc.h"
#include "../include/cuframes_composer/source.h"
#include "../include/cuframes_composer/writer.h"
#include <cuda.h>
@@ -42,21 +43,17 @@ static void on_sigint(int sig)
/* user-data, передаваемая encoder callback'у */
typedef struct write_ctx {
FILE *fp;
cfc_writer_t *writer;
uint64_t bytes_written;
uint64_t frames_encoded;
uint64_t idr_count;
} write_ctx_t;
/* Encoder callback — пишем H.264 Annex-B bytes как есть. */
static void on_bitstream(const uint8_t *bs, size_t size, int64_t pts_ns,
int is_idr, void *user)
{
(void)pts_ns;
write_ctx_t *ctx = (write_ctx_t *)user;
if (fwrite(bs, 1, size, ctx->fp) != size) {
fprintf(stderr, "[simple_record] fwrite failed: %s\n", strerror(errno));
} else {
if (cfc_writer_write(ctx->writer, bs, size, pts_ns, is_idr) == 0) {
ctx->bytes_written += size;
ctx->frames_encoded++;
if (is_idr) ctx->idr_count++;
@@ -85,6 +82,7 @@ int main(int argc, char **argv)
int fps = 25;
int bitrate_kbps = 5000;
int max_seconds = 0; /* 0 = до SIGINT */
const char *out_format = "h264";
static struct option opts[] = {
{"key", required_argument, 0, 'k'},
@@ -92,17 +90,19 @@ int main(int argc, char **argv)
{"fps", required_argument, 0, 'f'},
{"bitrate", required_argument, 0, 'b'},
{"seconds", required_argument, 0, 's'},
{"format", required_argument, 0, 'F'},
{"help", no_argument, 0, 'h'},
{0, 0, 0, 0},
};
int c;
while ((c = getopt_long(argc, argv, "k:o:f:b:s:h", opts, NULL)) != -1) {
while ((c = getopt_long(argc, argv, "k:o:f:b:s:F:h", opts, NULL)) != -1) {
switch (c) {
case 'k': key = optarg; break;
case 'o': out_path = optarg; break;
case 'f': fps = atoi(optarg); break;
case 'b': bitrate_kbps = atoi(optarg); break;
case 's': max_seconds = atoi(optarg); break;
case 'F': out_format = optarg; break;
case 'h':
default:
fprintf(stderr,
@@ -180,20 +180,25 @@ int main(int argc, char **argv)
goto cleanup_src;
}
/* 5) Открыть выходной файл (или stdout если "-"/"pipe:1"). */
/* 5) Открыть writer (h264 raw или mpegts container). */
uint8_t spspps[256]; size_t spspps_len = sizeof(spspps);
cfc_encoder_get_sequence_params(enc, spspps, &spspps_len);
cfc_writer_config_t wcfg = {
.path = out_path,
.format = out_format,
.width = snap.width,
.height = snap.height,
.fps_num = fps,
.fps_den = 1,
.bitrate_kbps = bitrate_kbps,
.extradata = spspps,
.extradata_size = spspps_len,
};
write_ctx_t wctx = { 0 };
int is_stdout = (!strcmp(out_path, "-") || !strcmp(out_path, "pipe:1") ||
!strcmp(out_path, "/dev/stdout"));
if (is_stdout) {
wctx.fp = stdout;
setvbuf(stdout, NULL, _IOFBF, 1024 * 1024);
} else {
wctx.fp = fopen(out_path, "wb");
if (!wctx.fp) {
fprintf(stderr, "[simple_record] fopen(%s) failed: %s\n",
out_path, strerror(errno));
goto cleanup_enc;
}
if (cfc_writer_create(&wcfg, &wctx.writer) != 0) {
fprintf(stderr, "[simple_record] cfc_writer_create failed\n");
goto cleanup_enc;
}
/* 6) Главный цикл — забираем кадры по seq, кодируем. */
@@ -265,8 +270,7 @@ int main(int argc, char **argv)
(unsigned long long)wctx.idr_count,
wctx.bytes_written / 1048576.0);
fflush(wctx.fp);
if (!is_stdout) fclose(wctx.fp);
cfc_writer_close(wctx.writer);
cleanup_enc:
cfc_encoder_destroy(enc);
+66
View File
@@ -0,0 +1,66 @@
/* cuframes-composer — audio consumer (Phase 7).
*
* Открывает RTSP-вход с AAC stream'ом (от cuda-grid-audio через mediamtx
* либо прямо с микрофона). В отдельном thread'е читает packet'ы и кладёт
* в lock-free SPSC ring buffer. Video-thread (composer main loop) drain'ит
* ring и пишет audio packet'ы в общий mpegts muxer через
* cfc_writer_write_audio.
*
* Architecture:
* audio_thread: avformat_open_input → av_read_frame loop → push to ring
* main thread (video): cfc_audio_drain → cfc_writer_write_audio loop
*
* Failure mode: если RTSP source падает, audio_thread пытается reconnect
* с exponential backoff. Video не блокируется.
*
* Лицензия: LGPL-2.1+
*/
#ifndef CUFRAMES_COMPOSER_AUDIO_H
#define CUFRAMES_COMPOSER_AUDIO_H
#include "writer.h"
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef struct cfc_audio_config {
const char *rtsp_url; /* "rtsp://mediamtx:8554/live-audio" */
int reconnect_min_ms; /* default 1000 */
int reconnect_max_ms; /* default 10000 */
} cfc_audio_config_t;
typedef struct cfc_audio cfc_audio_t;
/* Создать audio consumer + запустить background thread.
* Получает первый packet и сохраняет codec params (sample_rate, channels,
* extradata) — caller потом передаёт их в cfc_writer_config_t. */
int cfc_audio_create(const cfc_audio_config_t *cfg, cfc_audio_t **out);
/* Получить codec params (заполняется после первого успешного read).
* Возвращает 0 если params уже доступны, -1 если ещё нет. Caller может
* polling'ом ждать. */
int cfc_audio_get_codec_params(
cfc_audio_t *a,
int *sample_rate,
int *channels,
const uint8_t **extradata,
size_t *extradata_size
);
/* Drain до N audio packet'ов в writer. Не блокируется. Возвращает число
* записанных packet'ов. */
int cfc_audio_drain(cfc_audio_t *a, cfc_writer_t *writer, int max_packets);
/* Остановить thread + освободить ресурсы. */
int cfc_audio_destroy(cfc_audio_t *a);
#ifdef __cplusplus
}
#endif
#endif /* CUFRAMES_COMPOSER_AUDIO_H */
+7
View File
@@ -62,6 +62,13 @@ typedef struct cfc_composer_config {
int reconnect_max_ms; /* default 30000 */
int stale_threshold_ms; /* default 500 */
int dead_threshold_ms; /* default 5000 */
/* Prefix для consumer_name внутри cuframes_subscriber_create.
* default = "composer" (для обратной совместимости с фазами 1-6).
* Уникальный prefix позволяет нескольким composer'ам одновременно
* subscribe к одним и тем же publisher'ам (cfc-grid + cuda-grid-pipeline
* параллельно, у каждого свой namespace). */
const char *consumer_prefix;
} cfc_composer_config_t;
typedef struct cfc_composer cfc_composer_t;
+92
View File
@@ -0,0 +1,92 @@
/* cuframes-composer — output writer abstraction.
*
* Энкодер NVENC отдаёт сжатый H.264 bitstream (Annex-B byte stream) +
* timestamp в callback'е. Writer берёт эту последовательность и пишет
* либо как raw H.264 в файл/stdout, либо как mpegts container с правильными
* PTS/DTS в header'ах PES packet'ов.
*
* Зачем mpegts:
* Raw H.264 в pipe не содержит timestamps на container-уровне; downstream
* ffmpeg-mux вынужден синтезировать PTS из `-r` или wallclock, что вызывает
* desync с audio и drops при mux'ировании с другим потоком. MPEG-TS даёт
* нативные PTS/DTS на каждом PES packet'е → downstream mux работает
* через `-c copy` без проблем.
*
* Форматы:
* "h264" — raw H.264 Annex-B byte stream (как до Phase 6); fwrite в file
* "mpegts" — MPEG-TS container; libavformat avformat_write_header +
* av_interleaved_write_frame
*
* Path semantics:
* "/path/to/file.h264" — обычный файл
* "-" / "pipe:1" / "/dev/stdout" — stdout (для shell-pipe в downstream)
*
* Лицензия: LGPL-2.1+
*/
#ifndef CUFRAMES_COMPOSER_WRITER_H
#define CUFRAMES_COMPOSER_WRITER_H
#include <stddef.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
#endif
typedef struct cfc_writer cfc_writer_t;
typedef struct cfc_writer_config {
const char *path; /* "/path/file.ts", "unix:/run/.../sock", "-" */
const char *format; /* "h264" или "mpegts" */
int width, height; /* для mpegts video codecpar */
int fps_num, fps_den; /* для mpegts time_base hint */
int bitrate_kbps; /* для mpegts mux PCR pacing */
/* SPS/PPS — video codec extradata, кладётся в codecpar до write_header.
* Берётся из cfc_encoder_get_sequence_params. Обязательно для mpegts —
* mpegts muxer кладёт SPS/PPS перед первым IDR из extradata. */
const uint8_t *extradata;
size_t extradata_size;
/* Audio stream — если задан, в mpegts будет второй stream (AAC).
* extradata_audio = AudioSpecificConfig (2 байта обычно). */
int has_audio; /* 0 = video-only, 1 = add audio stream */
int audio_sample_rate; /* например 44100 */
int audio_channels; /* например 2 */
const uint8_t *audio_extradata; /* ASC bytes */
size_t audio_extradata_size;
} cfc_writer_config_t;
int cfc_writer_create(const cfc_writer_config_t *cfg, cfc_writer_t **out);
/* Записать один закодированный video frame. is_keyframe ставит флаг
* AV_PKT_FLAG_KEY. Использует av_write_frame (без интерливинг-очереди). */
int cfc_writer_write(
cfc_writer_t *w,
const uint8_t *bitstream,
size_t size,
int64_t pts_ns,
int is_keyframe
);
/* Записать audio packet (AAC ADTS либо raw — определяется extradata).
* Возвращает 0 при успехе. Thread-safe относительно write video?
* Нет — caller обязан вызывать оба write_* из одного thread'а либо
* локироваться. В нашей архитектуре video-thread drain'ит audio-ring
* и пишет сюда сам. */
int cfc_writer_write_audio(
cfc_writer_t *w,
const uint8_t *aac_data,
size_t size,
int64_t pts_ns
);
/* Закрыть writer. Для mpegts вызывает av_write_trailer. */
int cfc_writer_close(cfc_writer_t *w);
#ifdef __cplusplus
}
#endif
#endif /* CUFRAMES_COMPOSER_WRITER_H */
+3
View File
@@ -18,6 +18,8 @@ set(COMPOSER_SOURCES_C
overlay.c
control.c
health.c
writer.c
audio.c
)
set(COMPOSER_SOURCES_CU
cugrid/cugrid.cu
@@ -58,6 +60,7 @@ foreach(target cuframes_composer cuframes_composer_static)
${LIBZMQ_LIBRARY} # для control plane (Phase 4a)
${LIBJSONC_LIBRARY} # для control plane JSON (Phase 4a)
${LIBMOSQUITTO_LIBRARY} # для MQTT health (Phase 4b)
PkgConfig::LIBAV # для mpegts writer (Phase 7)
rt
)
target_include_directories(${target} PRIVATE
+333
View File
@@ -0,0 +1,333 @@
/* Реализация cfc_audio_t — RTSP AAC consumer + lock-free SPSC ring.
*
* Architecture:
* audio_thread:
* 1. avformat_open_input(rtsp_url) с low-latency options:
* rtsp_transport=tcp, stimeout=2000000us, nobuffer+flush_packets,
* probesize=32768, analyzeduration=0
* 2. avformat_find_stream_info → найти audio stream
* 3. Сохранить codec params (sample_rate, channels, extradata)
* 4. av_read_frame loop:
* - Если audio packet → копия в ring (SPSC, fixed cap N=64)
* - Если ring полный → drop'ить старейший (video может отставать)
* 5. На EOF/error → exponential backoff reconnect
*
* main thread (video):
* cfc_audio_drain → pop из ring и cfc_writer_write_audio
*
* Lock-free SPSC ring: single producer (audio_thread), single consumer
* (video main thread). Atomic head/tail indices, copy-on-write packet'ы.
*
* Лицензия: LGPL-2.1+
*/
#include "../include/cuframes_composer/audio.h"
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/opt.h>
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#define CFC_AUDIO_RING_CAP 64
typedef struct audio_pkt {
uint8_t *data;
int size;
int64_t pts_ns;
} audio_pkt_t;
struct cfc_audio {
cfc_audio_config_t cfg;
char url_copy[256];
pthread_t thread;
int thread_started;
_Atomic int stop_flag;
/* Codec params — заполняются audio_thread'ом после первого open'а */
_Atomic int codec_ready;
pthread_mutex_t params_mu;
int sample_rate;
int channels;
uint8_t *extradata;
int extradata_size;
/* SPSC ring */
audio_pkt_t ring[CFC_AUDIO_RING_CAP];
_Atomic uint32_t head; /* producer (audio_thread) */
_Atomic uint32_t tail; /* consumer (main) */
/* Monotonic PTS на основе накопленных AAC samples.
* Один AAC frame = 1024 samples (для AAC-LC). PTS = total_samples *
* 1_000_000_000 / sample_rate. Это исключает non-monotonic DTS warnings
* при burst arrival packets'ов. */
int64_t start_us;
int64_t total_samples; /* счётчик samples'ов across packets */
};
/* ── helpers ──────────────────────────────────────────────────────────── */
static int64_t now_us(void)
{
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
return (int64_t)ts.tv_sec * 1000000 + ts.tv_nsec / 1000;
}
static void log_av_err(const char *what, int err)
{
char buf[256];
av_strerror(err, buf, sizeof(buf));
fprintf(stderr, "[cfc/audio] %s: %s\n", what, buf);
}
static int ring_push(cfc_audio_t *a, audio_pkt_t pkt)
{
uint32_t h = atomic_load_explicit(&a->head, memory_order_relaxed);
uint32_t t = atomic_load_explicit(&a->tail, memory_order_acquire);
uint32_t next = (h + 1) % CFC_AUDIO_RING_CAP;
if (next == t) {
/* Ring full — drop the NEW packet (вместо oldest). Это безопасно
* для SPSC: producer не трогает tail-side данные которые consumer
* может concurrently читать. Раньше я free'ил oldest и двигал tail
* — это вызывало heap corruption из-за гонки с consumer'ом. */
free(pkt.data);
return -1;
}
a->ring[h] = pkt;
atomic_store_explicit(&a->head, next, memory_order_release);
return 0;
}
static int ring_pop(cfc_audio_t *a, audio_pkt_t *out)
{
uint32_t t = atomic_load_explicit(&a->tail, memory_order_relaxed);
uint32_t h = atomic_load_explicit(&a->head, memory_order_acquire);
if (t == h) return -1; /* empty */
*out = a->ring[t];
atomic_store_explicit(&a->tail, (t + 1) % CFC_AUDIO_RING_CAP,
memory_order_release);
return 0;
}
static void ring_clear(cfc_audio_t *a)
{
audio_pkt_t pkt;
while (ring_pop(a, &pkt) == 0) free(pkt.data);
}
/* ── audio_thread ─────────────────────────────────────────────────────── */
static int audio_open_source(cfc_audio_t *a, AVFormatContext **fmt_ctx,
int *audio_stream_idx)
{
int err;
AVFormatContext *ctx = NULL;
AVDictionary *opts = NULL;
/* Low-latency RTSP options. */
av_dict_set(&opts, "rtsp_transport", "tcp", 0);
av_dict_set(&opts, "stimeout", "2000000", 0);
av_dict_set(&opts, "fflags", "nobuffer+flush_packets", 0);
av_dict_set(&opts, "probesize", "32768", 0);
av_dict_set(&opts, "analyzeduration", "0", 0);
err = avformat_open_input(&ctx, a->url_copy, NULL, &opts);
av_dict_free(&opts);
if (err < 0) {
log_av_err("avformat_open_input", err);
return -1;
}
err = avformat_find_stream_info(ctx, NULL);
if (err < 0) {
log_av_err("avformat_find_stream_info", err);
avformat_close_input(&ctx);
return -1;
}
/* Найти audio stream. */
int aidx = -1;
for (unsigned i = 0; i < ctx->nb_streams; i++) {
if (ctx->streams[i]->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
aidx = (int)i;
break;
}
}
if (aidx < 0) {
fprintf(stderr, "[cfc/audio] no audio stream in %s\n", a->url_copy);
avformat_close_input(&ctx);
return -1;
}
/* Сохранить codec params для writer'а. */
AVCodecParameters *cp = ctx->streams[aidx]->codecpar;
pthread_mutex_lock(&a->params_mu);
a->sample_rate = cp->sample_rate;
#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(59, 24, 100)
a->channels = cp->ch_layout.nb_channels;
#else
a->channels = cp->channels;
#endif
if (cp->extradata && cp->extradata_size > 0) {
if (a->extradata) free(a->extradata);
a->extradata = malloc(cp->extradata_size);
if (a->extradata) {
memcpy(a->extradata, cp->extradata, cp->extradata_size);
a->extradata_size = cp->extradata_size;
}
}
pthread_mutex_unlock(&a->params_mu);
atomic_store(&a->codec_ready, 1);
fprintf(stderr, "[cfc/audio] connected %s — AAC %dHz %dch extradata=%db\n",
a->url_copy, cp->sample_rate, a->channels, cp->extradata_size);
*fmt_ctx = ctx;
*audio_stream_idx = aidx;
return 0;
}
static void *audio_thread(void *arg)
{
cfc_audio_t *a = (cfc_audio_t *)arg;
int backoff_ms = a->cfg.reconnect_min_ms;
while (!atomic_load(&a->stop_flag)) {
AVFormatContext *ctx = NULL;
int aidx = -1;
if (audio_open_source(a, &ctx, &aidx) != 0) {
/* Reconnect backoff */
for (int waited = 0; waited < backoff_ms && !atomic_load(&a->stop_flag); waited += 100) {
usleep(100 * 1000);
}
backoff_ms *= 2;
if (backoff_ms > a->cfg.reconnect_max_ms) backoff_ms = a->cfg.reconnect_max_ms;
continue;
}
backoff_ms = a->cfg.reconnect_min_ms;
/* Read loop */
AVPacket *pkt = av_packet_alloc();
while (!atomic_load(&a->stop_flag) && pkt) {
int err = av_read_frame(ctx, pkt);
if (err < 0) {
log_av_err("av_read_frame", err);
break;
}
if (pkt->stream_index != aidx) {
av_packet_unref(pkt);
continue;
}
/* Push to ring — копия data + monotonic PTS на основе total_samples.
* Один AAC frame = 1024 samples; накапливаем и пересчитываем в ns.
* (С RTSP timestamps было бы строже, но они уже могут быть
* jittered'ными или не монотонны при reconnect'ах). */
audio_pkt_t apkt = {
.data = malloc(pkt->size),
.size = pkt->size,
.pts_ns = a->total_samples * 1000000000LL / a->sample_rate,
};
a->total_samples += 1024;
if (apkt.data) {
memcpy(apkt.data, pkt->data, pkt->size);
ring_push(a, apkt);
}
av_packet_unref(pkt);
}
av_packet_free(&pkt);
avformat_close_input(&ctx);
if (atomic_load(&a->stop_flag)) break;
fprintf(stderr, "[cfc/audio] disconnect, retry in %dms\n", backoff_ms);
for (int waited = 0; waited < backoff_ms && !atomic_load(&a->stop_flag); waited += 100) {
usleep(100 * 1000);
}
backoff_ms *= 2;
if (backoff_ms > a->cfg.reconnect_max_ms) backoff_ms = a->cfg.reconnect_max_ms;
}
return NULL;
}
/* ── Public API ───────────────────────────────────────────────────────── */
int cfc_audio_create(const cfc_audio_config_t *cfg, cfc_audio_t **out)
{
if (!cfg || !cfg->rtsp_url || !out) return -1;
cfc_audio_t *a = calloc(1, sizeof(*a));
if (!a) return -1;
a->cfg = *cfg;
strncpy(a->url_copy, cfg->rtsp_url, sizeof(a->url_copy) - 1);
a->cfg.rtsp_url = a->url_copy;
if (a->cfg.reconnect_min_ms <= 0) a->cfg.reconnect_min_ms = 1000;
if (a->cfg.reconnect_max_ms <= 0) a->cfg.reconnect_max_ms = 10000;
pthread_mutex_init(&a->params_mu, NULL);
atomic_init(&a->stop_flag, 0);
atomic_init(&a->codec_ready, 0);
atomic_init(&a->head, 0);
atomic_init(&a->tail, 0);
a->start_us = now_us();
avformat_network_init();
if (pthread_create(&a->thread, NULL, audio_thread, a) != 0) {
pthread_mutex_destroy(&a->params_mu);
free(a);
return -1;
}
a->thread_started = 1;
*out = a;
return 0;
}
int cfc_audio_get_codec_params(cfc_audio_t *a, int *sample_rate,
int *channels, const uint8_t **extradata,
size_t *extradata_size)
{
if (!a) return -1;
if (!atomic_load(&a->codec_ready)) return -1;
pthread_mutex_lock(&a->params_mu);
if (sample_rate) *sample_rate = a->sample_rate;
if (channels) *channels = a->channels;
if (extradata) *extradata = a->extradata;
if (extradata_size) *extradata_size = a->extradata_size;
pthread_mutex_unlock(&a->params_mu);
return 0;
}
int cfc_audio_drain(cfc_audio_t *a, cfc_writer_t *writer, int max_packets)
{
if (!a || !writer) return 0;
int wrote = 0;
for (int i = 0; i < max_packets; i++) {
audio_pkt_t pkt;
if (ring_pop(a, &pkt) != 0) break;
cfc_writer_write_audio(writer, pkt.data, pkt.size, pkt.pts_ns);
free(pkt.data);
wrote++;
}
return wrote;
}
int cfc_audio_destroy(cfc_audio_t *a)
{
if (!a) return 0;
atomic_store(&a->stop_flag, 1);
if (a->thread_started) pthread_join(a->thread, NULL);
ring_clear(a);
if (a->extradata) free(a->extradata);
pthread_mutex_destroy(&a->params_mu);
free(a);
return 0;
}
+3 -1
View File
@@ -186,7 +186,9 @@ int cfc_composer_create(const cfc_composer_config_t *cfg, cfc_composer_t **out)
if (!comp->cells[i].source_key) continue;
char name[32];
snprintf(name, sizeof(name), "composer-%d", i);
const char *prefix = comp->cfg.consumer_prefix;
if (!prefix || !*prefix) prefix = "composer";
snprintf(name, sizeof(name), "%s-%d", prefix, i);
cfc_source_config_t scfg = {
.key = comp->cells[i].source_key,
+4
View File
@@ -414,6 +414,10 @@ int cfc_encoder_encode_frame(cfc_encoder_t *enc, CUdeviceptr ptr, int pitch,
pp.inputWidth = enc->width;
pp.inputHeight = enc->height;
pp.inputPitch = enc->staging_pitch;
/* SPS/PPS вставляются автоматически через repeatSPSPPS=1 в init-конфиге
* (перед каждым началом intra refresh cycle). Per-frame OUTPUT_SPSPPS
* создавал двойную инжекцию которая ломала h264_mp4toannexb BSF
* в downstream mpegts muxer'е (см. docs/RESEARCH-audio-mixing-low-latency.md). */
pp.pictureStruct = NV_ENC_PIC_STRUCT_FRAME;
pp.inputTimeStamp = (uint64_t)pts_ns;
+356
View File
@@ -0,0 +1,356 @@
/* Реализация cfc_writer_t — Phase 7 monolithic mpegts → Unix socket
*
* Архитектура (по research'у docs/RESEARCH-audio-mixing-low-latency.md):
* - mpegts backend пишет ОБА stream'а (video + audio) в один AVFormatContext
* - Использует av_write_frame (НЕ av_interleaved_write_frame) — у нас
* pts/dts монотонные, интерливинг-очередь не нужна и добавляет latency
* - Поддерживает URL "unix:/path/to.sock" для прямой публикации в mediamtx
* (mediamtx.yml: source: unix+mpegts:///path/to.sock)
* - NVENC даёт Annex-B inline SPS/PPS через repeatSPSPPS=1; муксер
* автоинсёртит h264_mp4toannexb который использует extradata correctly
*
* - "h264" backend (raw fwrite) сохранён для Phase 1-5 совместимости
*
* Лицензия: LGPL-2.1+
*/
#include "../include/cuframes_composer/writer.h"
#include <libavcodec/avcodec.h>
#include <libavformat/avformat.h>
#include <libavutil/avutil.h>
#include <libavutil/channel_layout.h>
#include <libavutil/mathematics.h>
#include <libavutil/opt.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
typedef enum {
CFC_WRITER_RAW = 0,
CFC_WRITER_MPEGTS = 1,
} cfc_writer_backend_t;
struct cfc_writer {
cfc_writer_backend_t backend;
int is_stdout;
FILE *fp; /* raw backend */
AVFormatContext *fmt_ctx;
AVStream *video_stream;
AVStream *audio_stream;
int header_written;
int64_t first_video_pts_ns;
int64_t first_audio_pts_ns;
/* Monotonic video frame counter — PTS = frame_idx * frame_duration_pts.
* Гарантирует exact frame cadence regardless of wall-clock jitter. */
int64_t video_frame_idx;
};
static int is_stdout_path(const char *p)
{
return p && (!strcmp(p, "-") || !strcmp(p, "pipe:1") ||
!strcmp(p, "/dev/stdout"));
}
static void log_av_err(const char *what, int err)
{
char buf[256];
av_strerror(err, buf, sizeof(buf));
fprintf(stderr, "[cfc/writer] %s failed: %s\n", what, buf);
}
/* ── Raw H.264 backend ────────────────────────────────────────────────── */
static int raw_create(cfc_writer_t *w, const cfc_writer_config_t *cfg)
{
w->is_stdout = is_stdout_path(cfg->path);
if (w->is_stdout) {
w->fp = stdout;
setvbuf(stdout, NULL, _IOFBF, 1024 * 1024);
} else {
w->fp = fopen(cfg->path, "wb");
if (!w->fp) {
fprintf(stderr, "[cfc/writer] fopen(%s): %s\n",
cfg->path, strerror(errno));
return -1;
}
}
return 0;
}
static int raw_write(cfc_writer_t *w, const uint8_t *bs, size_t size,
int64_t pts_ns, int is_keyframe)
{
(void)pts_ns; (void)is_keyframe;
if (fwrite(bs, 1, size, w->fp) != size) {
fprintf(stderr, "[cfc/writer/raw] fwrite: %s\n", strerror(errno));
return -1;
}
return 0;
}
static int raw_close(cfc_writer_t *w)
{
if (w->fp) {
fflush(w->fp);
if (!w->is_stdout) fclose(w->fp);
w->fp = NULL;
}
return 0;
}
/* ── MPEG-TS backend (libavformat) ────────────────────────────────────── */
static int mpegts_create(cfc_writer_t *w, const cfc_writer_config_t *cfg)
{
int err;
w->is_stdout = is_stdout_path(cfg->path);
w->first_video_pts_ns = AV_NOPTS_VALUE;
w->first_audio_pts_ns = AV_NOPTS_VALUE;
const char *url = w->is_stdout ? "pipe:1" : cfg->path;
err = avformat_alloc_output_context2(&w->fmt_ctx, NULL, "mpegts", url);
if (err < 0 || !w->fmt_ctx) {
log_av_err("avformat_alloc_output_context2", err);
return -1;
}
/* mpegts options: pat_pmt_at_frames — PAT/PMT перед каждым keyframe
* (быстрая tune-in). resend_headers — повторять. omit_video_pes_length —
* не ставить length в PES header (low-latency настройка). */
av_opt_set(w->fmt_ctx->priv_data, "mpegts_flags",
"+resend_headers+pat_pmt_at_frames", 0);
/* Video stream */
w->video_stream = avformat_new_stream(w->fmt_ctx, NULL);
if (!w->video_stream) goto fail;
AVCodecParameters *vp = w->video_stream->codecpar;
vp->codec_type = AVMEDIA_TYPE_VIDEO;
vp->codec_id = AV_CODEC_ID_H264;
vp->codec_tag = 0;
vp->width = cfg->width;
vp->height = cfg->height;
vp->format = AV_PIX_FMT_YUV420P;
vp->bit_rate = (int64_t)cfg->bitrate_kbps * 1000;
vp->profile = FF_PROFILE_H264_HIGH;
if (cfg->extradata && cfg->extradata_size > 0) {
vp->extradata = av_mallocz(cfg->extradata_size + AV_INPUT_BUFFER_PADDING_SIZE);
if (!vp->extradata) goto fail;
memcpy(vp->extradata, cfg->extradata, cfg->extradata_size);
vp->extradata_size = (int)cfg->extradata_size;
}
w->video_stream->time_base = (AVRational){1, 90000};
w->video_stream->avg_frame_rate = (AVRational){cfg->fps_num, cfg->fps_den};
w->video_stream->r_frame_rate = w->video_stream->avg_frame_rate;
/* Audio stream (опционально) */
if (cfg->has_audio) {
w->audio_stream = avformat_new_stream(w->fmt_ctx, NULL);
if (!w->audio_stream) goto fail;
AVCodecParameters *ap = w->audio_stream->codecpar;
ap->codec_type = AVMEDIA_TYPE_AUDIO;
ap->codec_id = AV_CODEC_ID_AAC;
ap->codec_tag = 0;
ap->sample_rate = cfg->audio_sample_rate > 0 ? cfg->audio_sample_rate : 44100;
int nch = cfg->audio_channels > 0 ? cfg->audio_channels : 2;
#if LIBAVCODEC_VERSION_INT >= AV_VERSION_INT(59, 24, 100)
/* ffmpeg ≥ 5.1: AVChannelLayout API */
ap->ch_layout.order = AV_CHANNEL_ORDER_NATIVE;
ap->ch_layout.nb_channels = nch;
ap->ch_layout.u.mask = (nch == 1) ? AV_CH_LAYOUT_MONO : AV_CH_LAYOUT_STEREO;
#else
/* ffmpeg 4.x: legacy channels + channel_layout */
ap->channels = nch;
ap->channel_layout = (nch == 1) ? AV_CH_LAYOUT_MONO : AV_CH_LAYOUT_STEREO;
#endif
ap->format = AV_SAMPLE_FMT_FLTP;
ap->profile = FF_PROFILE_AAC_LOW;
if (cfg->audio_extradata && cfg->audio_extradata_size > 0) {
ap->extradata = av_mallocz(cfg->audio_extradata_size + AV_INPUT_BUFFER_PADDING_SIZE);
if (!ap->extradata) goto fail;
memcpy(ap->extradata, cfg->audio_extradata, cfg->audio_extradata_size);
ap->extradata_size = (int)cfg->audio_extradata_size;
}
w->audio_stream->time_base = (AVRational){1, ap->sample_rate};
}
/* Open output. */
if (!(w->fmt_ctx->oformat->flags & AVFMT_NOFILE)) {
err = avio_open(&w->fmt_ctx->pb, url, AVIO_FLAG_WRITE);
if (err < 0) {
log_av_err("avio_open", err);
goto fail;
}
}
/* Low-latency: flushes packet immediately. */
w->fmt_ctx->flags |= AVFMT_FLAG_FLUSH_PACKETS;
/* Малая интерливинг-очередь — 200мс. av_interleaved_write_frame
* накапливает packets per stream до этого предела, чтобы корректно
* сериализовать по DTS. 200мс = ~5 video frames @ 25 fps, ничтожно
* для CCTV. Без этого audio/video идут в порядке call'ов, downstream
* decoder'ы видят jitter и тормозит video. */
w->fmt_ctx->max_interleave_delta = 200000; /* микросекунды */
err = avformat_write_header(w->fmt_ctx, NULL);
if (err < 0) {
log_av_err("avformat_write_header", err);
if (w->fmt_ctx->pb) avio_closep(&w->fmt_ctx->pb);
goto fail;
}
w->header_written = 1;
fprintf(stderr,
"[cfc/writer/mpegts] открыт %s — video %dx%d, %s\n",
url, cfg->width, cfg->height,
cfg->has_audio ? "video+audio" : "video-only");
return 0;
fail:
avformat_free_context(w->fmt_ctx);
w->fmt_ctx = NULL;
return -1;
}
static int mpegts_write_video(cfc_writer_t *w, const uint8_t *bs, size_t size,
int64_t pts_ns, int is_keyframe)
{
(void)pts_ns; /* video PTS — на frame_idx, не wall-clock (см. struct) */
AVPacket *pkt = av_packet_alloc();
if (!pkt) return -1;
/* av_new_packet — создаёт packet с правильной AVBufferRef, чтобы
* av_interleaved_write_frame мог корректно ref-count'ить ownership.
* Без этого manual av_malloc'нутая data вызывает heap corruption
* при последующем packet free внутри libav. */
if (av_new_packet(pkt, (int)size) < 0) { av_packet_free(&pkt); return -1; }
memcpy(pkt->data, bs, size);
int64_t duration = av_rescale_q(1,
(AVRational){w->video_stream->avg_frame_rate.den,
w->video_stream->avg_frame_rate.num},
w->video_stream->time_base);
pkt->stream_index = w->video_stream->index;
pkt->pts = w->video_frame_idx * duration;
pkt->dts = pkt->pts;
pkt->duration = duration;
pkt->flags = is_keyframe ? AV_PKT_FLAG_KEY : 0;
w->video_frame_idx++;
int err = av_interleaved_write_frame(w->fmt_ctx, pkt);
av_packet_free(&pkt);
if (err < 0) {
log_av_err("av_interleaved_write_frame[video]", err);
return -1;
}
return 0;
}
static int mpegts_write_audio(cfc_writer_t *w, const uint8_t *bs, size_t size,
int64_t pts_ns)
{
if (!w->audio_stream) return -1;
if (w->first_audio_pts_ns == AV_NOPTS_VALUE) w->first_audio_pts_ns = pts_ns;
int64_t rel_pts = pts_ns - w->first_audio_pts_ns;
AVPacket *pkt = av_packet_alloc();
if (!pkt) return -1;
if (av_new_packet(pkt, (int)size) < 0) { av_packet_free(&pkt); return -1; }
memcpy(pkt->data, bs, size);
pkt->stream_index = w->audio_stream->index;
pkt->pts = av_rescale_q(rel_pts, (AVRational){1, 1000000000},
w->audio_stream->time_base);
pkt->dts = pkt->pts;
pkt->flags = AV_PKT_FLAG_KEY; /* AAC frames всегда keyframe */
int err = av_interleaved_write_frame(w->fmt_ctx, pkt);
av_packet_free(&pkt);
if (err < 0) {
log_av_err("av_interleaved_write_frame[audio]", err);
return -1;
}
return 0;
}
static int mpegts_close(cfc_writer_t *w)
{
if (w->fmt_ctx) {
if (w->header_written) av_write_trailer(w->fmt_ctx);
if (w->fmt_ctx->pb && !(w->fmt_ctx->oformat->flags & AVFMT_NOFILE)) {
avio_closep(&w->fmt_ctx->pb);
}
avformat_free_context(w->fmt_ctx);
w->fmt_ctx = NULL;
}
return 0;
}
/* ── Public API ───────────────────────────────────────────────────────── */
int cfc_writer_create(const cfc_writer_config_t *cfg, cfc_writer_t **out)
{
if (!cfg || !cfg->path || !cfg->format || !out) return -1;
cfc_writer_t *w = calloc(1, sizeof(*w));
if (!w) return -1;
int rc;
if (!strcmp(cfg->format, "h264")) {
w->backend = CFC_WRITER_RAW;
rc = raw_create(w, cfg);
} else if (!strcmp(cfg->format, "mpegts")) {
w->backend = CFC_WRITER_MPEGTS;
rc = mpegts_create(w, cfg);
} else {
fprintf(stderr, "[cfc/writer] unknown format '%s'\n", cfg->format);
free(w);
return -1;
}
if (rc != 0) { free(w); return -1; }
*out = w;
return 0;
}
int cfc_writer_write(cfc_writer_t *w, const uint8_t *bs, size_t size,
int64_t pts_ns, int is_keyframe)
{
if (!w || !bs || size == 0) return -1;
if (w->backend == CFC_WRITER_RAW) {
return raw_write(w, bs, size, pts_ns, is_keyframe);
}
return mpegts_write_video(w, bs, size, pts_ns, is_keyframe);
}
int cfc_writer_write_audio(cfc_writer_t *w, const uint8_t *bs, size_t size,
int64_t pts_ns)
{
if (!w || !bs || size == 0) return -1;
if (w->backend != CFC_WRITER_MPEGTS) return -1;
return mpegts_write_audio(w, bs, size, pts_ns);
}
int cfc_writer_close(cfc_writer_t *w)
{
if (!w) return 0;
int rc;
if (w->backend == CFC_WRITER_RAW) {
rc = raw_close(w);
} else {
rc = mpegts_close(w);
}
free(w);
return rc;
}