From: Anton Khirnov <anton@khirnov.net> To: ffmpeg-devel@ffmpeg.org Subject: [FFmpeg-devel] [PATCH 49/49] fftools/ffmpeg: move each muxer to a separate thread Date: Mon, 4 Apr 2022 13:30:37 +0200 Message-ID: <20220404113037.13070-50-anton@khirnov.net> (raw) In-Reply-To: <20220404113037.13070-1-anton@khirnov.net> --- fftools/ffmpeg.c | 38 +++------ fftools/ffmpeg.h | 7 +- fftools/ffmpeg_mux.c | 197 +++++++++++++++++++++++++++++++++++-------- 3 files changed, 178 insertions(+), 64 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 9dfbc4216a..8ea27d3422 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -1286,10 +1286,7 @@ static void finish_output_stream(OutputStream *ost) OutputFile *of = output_files[ost->file_index]; ost->finished = ENCODER_FINISHED; - if (ost->sq_idx_mux >= 0) - sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL)); - else - ost->finished |= MUXER_FINISHED; + output_packet(of, ost->pkt, ost, 1); } /** @@ -3421,9 +3418,8 @@ static int need_output(void) for (i = 0; i < nb_output_streams; i++) { OutputStream *ost = output_streams[i]; - OutputFile *of = output_files[ost->file_index]; - if (ost->finished || of_finished(of)) + if (ost->finished) continue; return 1; @@ -4269,26 +4265,6 @@ static int transcode_step(void) return reap_filters(0); } -static void flush_sync_queues_mux(void) -{ - /* mark all queue inputs as done */ - for (int i = 0; i < nb_output_streams; i++) { - OutputStream *ost = output_streams[i]; - OutputFile *of = output_files[ost->file_index]; - if (ost->sq_idx_mux >= 0) - sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(NULL)); - } - - /* encode all packets remaining in the sync queues */ - for (int i = 0; i < nb_output_streams; i++) { - OutputStream *ost = output_streams[i]; - OutputFile *of = output_files[ost->file_index]; - - if (!(ost->finished & MUXER_FINISHED)) - output_packet(of, ost->pkt, ost, 1); - } -} - /* * The following code is the main loop of the file converter */ @@ -4310,6 +4286,12 @@ static int 
transcode(void) timer_start = av_gettime_relative(); + for (i = 0; i < nb_output_files; i++) { + ret = of_thread_start(output_files[i]); + if (ret < 0) + goto fail; + } + if ((ret = init_input_threads()) < 0) goto fail; @@ -4346,7 +4328,9 @@ static int transcode(void) } } flush_encoders(); - flush_sync_queues_mux(); + + for (i = 0; i < nb_output_files; i++) + of_thread_stop(output_files[i]); term_exit(); diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 407342462f..c4a5c2a0a2 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -583,6 +583,8 @@ typedef struct OutputFile { const AVOutputFormat *format; const char *url; + AVThreadMessageQueue *mux_queue; + SyncQueue *sq_encode; SyncQueue *sq_mux; @@ -697,11 +699,14 @@ int hwaccel_decode_init(AVCodecContext *avctx); int of_muxer_init(OutputFile *of, AVFormatContext *fc, AVDictionary *opts, int64_t limit_filesize); + +int of_thread_start(OutputFile *of); +void of_thread_stop(OutputFile *of); + int of_write_trailer(OutputFile *of); void of_close(OutputFile **pof); int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof); -int of_finished(OutputFile *of); int64_t of_filesize(OutputFile *of); AVChapter * const * of_get_chapters(OutputFile *of, unsigned int *nb_chapters); diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index 6ca9a51dd6..f99dd5ec3e 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -16,17 +16,20 @@ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA */ +#include <stdatomic.h> #include <stdio.h> #include <string.h> #include "ffmpeg.h" #include "sync_queue.h" +#include "thread_queue.h" #include "libavutil/fifo.h" #include "libavutil/intreadwrite.h" #include "libavutil/log.h" #include "libavutil/mem.h" #include "libavutil/timestamp.h" +#include "libavutil/thread.h" #include "libavcodec/packet.h" @@ -46,18 +49,24 @@ typedef struct MuxStream { /* dts of the last packet sent to the muxer, in the stream timebase * used for making up 
missing dts values */ int64_t last_mux_dts; + + /* data (a real or a flush packet) was received for this stream */ + int got_data; } MuxStream; struct Muxer { AVFormatContext *fc; + pthread_t thread; + ThreadQueue *tq; + MuxStream *streams; AVDictionary *opts; /* filesize limit expressed in bytes */ int64_t limit_filesize; - int64_t final_filesize; + atomic_int_least64_t last_filesize; int header_written; }; @@ -221,13 +230,32 @@ static int queue_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) return 0; } +static int64_t filesize(AVIOContext *pb) +{ + int64_t ret = -1; + + if (pb) { + ret = avio_size(pb); + if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too + ret = avio_tell(pb); + } + + return ret; +} + static int write_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) { MuxStream *ms = &of->mux->streams[ost->index]; AVFormatContext *s = of->mux->fc; AVStream *st = ost->st; + int64_t fs; int ret; + fs = filesize(s->pb); + atomic_store(&of->mux->last_filesize, fs); + if (fs >= of->mux->limit_filesize) + return AVERROR_EOF; + if ((st->codecpar->codec_type == AVMEDIA_TYPE_VIDEO && video_sync_method == VSYNC_DROP) || (st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO && audio_sync_method < 0)) pkt->pts = pkt->dts = AV_NOPTS_VALUE; @@ -333,8 +361,8 @@ static int check_write_header(OutputFile *of) int ret, i; for (i = 0; i < fc->nb_streams; i++) { - OutputStream *ost = output_streams[of->ost_index + i]; - if (!ost->initialized) + MuxStream *ms = &of->mux->streams[i]; + if (!ms->got_data) return 0; } @@ -378,12 +406,15 @@ static int check_write_header(OutputFile *of) return 0; } -int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof) +static int sync_queue_process(OutputFile *of, OutputStream *ost, AVPacket *pkt) { + Muxer *mux = of->mux; + MuxStream *ms = &mux->streams[ost->index]; int ret; - if (!of->mux->header_written) { - ret = check_write_header(of); + ms->got_data = 1; + if 
(!mux->header_written) { + ret = check_write_header(of); if (ret < 0) { av_packet_unref(pkt); return ret; @@ -391,34 +422,102 @@ int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof) } if (ost->sq_idx_mux >= 0) { - ret = sq_send(of->sq_mux, ost->sq_idx_mux, - SQPKT(eof ? NULL: pkt)); + int ret = sq_send(of->sq_mux, ost->sq_idx_mux, SQPKT(pkt)); if (ret < 0) { - av_packet_unref(pkt); - if (ret == AVERROR_EOF) { - ost->finished |= MUXER_FINISHED; - return 0; - } else - return ret; + if (pkt) + av_packet_unref(pkt); + return ret; } while (1) { + pkt = av_packet_alloc(); + if (!pkt) + // XXX + abort(); + ret = sq_receive(of->sq_mux, -1, SQPKT(pkt)); - if (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) - return 0; - else if (ret < 0) - return ret; + if (ret < 0) { + av_packet_free(&pkt); + return (ret == AVERROR_EOF || ret == AVERROR(EAGAIN)) ? 0 : ret; + } ret = submit_packet(of, pkt, output_streams[of->ost_index + ret]); + av_packet_free(&pkt); if (ret < 0) return ret; } - } else if (!eof) + } else if (pkt) return submit_packet(of, pkt, ost); return 0; } +static void *muxer_thread(void *arg) +{ + OutputFile *of = arg; + Muxer *mux = of->mux; + + while (1) { + OutputStream *ost; + AVPacket *pkt = NULL; + int stream_idx, ret; + + ret = tq_receive(mux->tq, &stream_idx, &pkt); + if (stream_idx < 0) { + av_log(NULL, AV_LOG_DEBUG, + "All streams finished for output file #%d\n", of->index); + break; + } + + ost = output_streams[of->ost_index + stream_idx]; + ret = sync_queue_process(of, ost, ret < 0 ? 
NULL : pkt); + av_packet_free(&pkt); + if (ret == AVERROR_EOF) + tq_receive_finish(mux->tq, stream_idx); + else if (ret < 0) { + av_log(NULL, AV_LOG_ERROR, + "Error muxing a packet for output file #%d\n", of->index); + break; + } + } + + for (unsigned int i = 0; i < mux->fc->nb_streams; i++) { + sync_queue_process(of, output_streams[of->ost_index], NULL); + tq_receive_finish(mux->tq, i); + } + + av_log(NULL, AV_LOG_DEBUG, "Terminating muxer thread %d\n", of->index); + + return NULL; +} + +int of_submit_packet(OutputFile *of, AVPacket *pkt, OutputStream *ost, int eof) +{ + AVPacket *pkt1; + int ret = 0; + + if (eof) { + tq_send_finish(of->mux->tq, ost->index); + return 0; + } + + pkt1 = av_packet_alloc(); + if (!pkt1) { + av_packet_unref(pkt); + return AVERROR(ENOMEM); + } + + av_packet_move_ref(pkt1, pkt); + + ret = tq_send(of->mux->tq, ost->index, &pkt1); + if (ret < 0) { + av_packet_free(&pkt1); + ost->finished |= MUXER_FINISHED; + } + + return ret == AVERROR_EOF ? 0 : ret; +} + int of_write_trailer(OutputFile *of) { AVFormatContext *fc = of->mux->fc; @@ -438,7 +537,7 @@ int of_write_trailer(OutputFile *of) return ret; } - of->mux->final_filesize = of_filesize(of); + of->mux->last_filesize = filesize(fc->pb); if (!(of->format->flags & AVFMT_NOFILE)) { ret = avio_closep(&fc->pb); @@ -487,6 +586,9 @@ static void mux_free(Muxer **pmux) av_freep(&mux->streams); av_dict_free(&mux->opts); + if (mux->tq) { + } + fc_close(&mux->fc); av_freep(pmux); @@ -558,30 +660,53 @@ fail: return ret; } -int of_finished(OutputFile *of) +int64_t of_filesize(OutputFile *of) { - return of_filesize(of) >= of->mux->limit_filesize; + return atomic_load(&of->mux->last_filesize); } -int64_t of_filesize(OutputFile *of) +AVChapter * const * +of_get_chapters(OutputFile *of, unsigned int *nb_chapters) { - AVIOContext *pb = of->mux->fc->pb; - int64_t ret = -1; + *nb_chapters = of->mux->fc->nb_chapters; + return of->mux->fc->chapters; +} - if (of->mux->final_filesize) - ret = 
of->mux->final_filesize; - else if (pb) { - ret = avio_size(pb); - if (ret <= 0) // FIXME improve avio_size() so it works with non seekable output too - ret = avio_tell(pb); +static void pkt_free(void *pkt) +{ + av_packet_free((AVPacket**)&pkt); +} + +int of_thread_start(OutputFile *of) +{ + Muxer *mux = of->mux; + int ret; + + mux->tq = tq_alloc(mux->fc->nb_streams, 8, sizeof(AVPacket*), + pkt_free); + if (!mux->tq) + return AVERROR(ENOMEM); + + ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)of); + if (ret) { + tq_free(&mux->tq); + return AVERROR(ret); } - return ret; + return 0; } -AVChapter * const * -of_get_chapters(OutputFile *of, unsigned int *nb_chapters) +void of_thread_stop(OutputFile *of) { - *nb_chapters = of->mux->fc->nb_chapters; - return of->mux->fc->chapters; + Muxer *mux = of->mux; + + if (!mux || !mux->tq) + return; + + for (unsigned int i = 0; i < mux->fc->nb_streams; i++) + tq_send_finish(mux->tq, i); + + pthread_join(mux->thread, NULL); + + tq_free(&mux->tq); } -- 2.34.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
next prev parent reply other threads:[~2022-04-04 11:41 UTC|newest] Thread overview: 81+ messages / expand[flat|nested] mbox.gz Atom feed top 2022-04-04 11:29 [FFmpeg-devel] [RFC] Switching ffmpeg.c to a threaded architecture Anton Khirnov 2022-04-04 11:29 ` [FFmpeg-devel] [PATCH 01/49] fftools/ffmpeg: drop an obsolete hack Anton Khirnov 2022-04-04 11:29 ` [FFmpeg-devel] [PATCH 02/49] fftools/ffmpeg: move a comment to a more appropriate place Anton Khirnov 2022-04-06 11:20 ` James Almer 2022-04-04 11:29 ` [FFmpeg-devel] [PATCH 03/49] fftools/ffmpeg: stop using OutputStream.frame_number for streamcopy Anton Khirnov 2022-04-06 11:20 ` James Almer 2022-04-04 11:29 ` [FFmpeg-devel] [PATCH 04/49] fftools/ffmpeg: pass the muxer context explicitly to some functions Anton Khirnov 2022-04-04 11:29 ` [FFmpeg-devel] [PATCH 05/49] fftools/ffmpeg: store the output file index in OutputFile Anton Khirnov 2022-04-04 11:29 ` [FFmpeg-devel] [PATCH 06/49] fftools/ffmpeg: move some muxing-related code into a separate file Anton Khirnov 2022-04-04 11:29 ` [FFmpeg-devel] [PATCH 07/49] fftools/ffmpeg: move writing the trailer to ffmpeg_mux.c Anton Khirnov 2022-04-04 11:29 ` [FFmpeg-devel] [PATCH 08/49] fftools/ffmpeg: move freeing the output file " Anton Khirnov 2022-04-04 11:29 ` [FFmpeg-devel] [PATCH 09/49] fftools/ffmpeg: store output format separately from the muxer context Anton Khirnov 2022-04-06 12:00 ` James Almer 2022-04-13 10:18 ` Anton Khirnov 2022-04-04 11:29 ` [FFmpeg-devel] [PATCH 10/49] fftools/ffmpeg_mux: add private " Anton Khirnov 2022-04-04 11:29 ` [FFmpeg-devel] [PATCH 11/49] fftools/ffmpeg: add a helper function to access output file size Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 12/49] fftools/ffmpeg: fix the type of limit_filesize Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 13/49] fftools/ffmpeg: refactor limiting output file size with -fs Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 14/49] fftools/ffmpeg: set want_sdp when 
initializing the muxer Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 15/49] fftools/ffmpeg: write the header for stream-less outputs " Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 16/49] fftools/ffmpeg: move closing the file into of_write_trailer() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 17/49] fftools/ffmpeg: refactor the code checking for bitexact output Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 18/49] fftools/ffmpeg: access output file chapters through a wrapper Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 19/49] fftools/ffmpeg: do not log to the muxer context Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 20/49] fftools/ffmpeg: move the mux queue into muxer private data Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 21/49] fftools/ffmpeg_mux: split queuing packets into a separate function Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 22/49] fftools/ffmpeg_mux: split of_write_packet() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 23/49] fftools/ffmpeg: move output file opts into private context Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 24/49] fftools/ffmpeg: move processing video stats to ffmpeg_mux Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 25/49] fftools/ffmpeg_mux: drop a useless check and reduce indentation Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 26/49] fftools/ffmpeg_mux: stop using AVStream.nb_frames in do_video_stats() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 27/49] fftools/ffmpeg_mux: stop using av_stream_get_end_pts() " Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 28/49] fftools/ffmpeg_mux: merge variable declaration and initialization Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 29/49] fftools/ffmpeg_mux: move processing AV_PKT_DATA_QUALITY_STATS to do_video_stats() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 30/49] fftools/ffmpeg: share the code encoding a single frame between 
video and audio Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 31/49] fftools/ffmpeg: reuse the encoding code for flushing encoders Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 32/49] fftools/ffmpeg: reindent after previous commit Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 33/49] fftools/ffmpeg: use refcounted packets for encoded subtitles Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 34/49] fftools/ffmpeg: rework -shortest implementation Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 35/49] fftools/ffmpeg: use the sync queues to handle -frames Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 36/49] fftools/ffmpeg: stop using OutputStream.frame_number in print_report() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 37/49] fftools/ffmpeg: only set OutputStream.frame_number for video encoding Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 38/49] fftools/ffmpeg: make the muxer AVFormatContext private to ffmpeg_mux.c Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 39/49] fftools/ffmpeg_mux: return errors from of_submit_packet() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 40/49] fftools/ffmpeg_mux: return errors from submit_packet() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 41/49] fftools/ffmpeg_mux: return errors from write_packet() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 42/49] fftools/ffmpeg_mux: simplify submit_packet() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 43/49] fftools/ffmpeg_mux: return errors from update_video_stats() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 44/49] fftools/ffmpeg_mux: do not call exit_program() in print_sdp() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 45/49] fftools/ffmpeg: stop using av_stream_get_end_pts() Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 46/49] fftools/ffmpeg: do not write the output file header from init_output_stream() Anton Khirnov 2022-04-04 11:30 ` 
[FFmpeg-devel] [PATCH 47/49] fftools/ffmpeg: depend on threads Anton Khirnov 2022-04-04 11:30 ` [FFmpeg-devel] [PATCH 48/49] fftools: add a multistream thread-safe queue Anton Khirnov 2022-04-04 11:30 ` Anton Khirnov [this message] 2022-04-05 9:00 ` [FFmpeg-devel] [RFC] Switching ffmpeg.c to a threaded architecture Anton Khirnov 2022-04-05 19:15 ` Michael Niedermayer 2022-04-05 19:46 ` Anton Khirnov 2022-04-05 21:05 ` Soft Works 2022-04-05 21:18 ` Paul B Mahol 2022-04-05 21:19 ` Soft Works 2022-04-06 11:17 ` Paul B Mahol 2022-04-06 15:46 ` Soft Works 2022-04-06 15:54 ` Matt Zagrabelny 2022-04-06 8:41 ` Anton Khirnov 2022-04-06 16:29 ` Soft Works 2022-04-06 17:38 ` Paul B Mahol 2022-04-06 17:56 ` Kieran Kunhya 2022-04-06 18:53 ` Soft Works 2022-04-07 8:32 ` Anton Khirnov 2022-04-08 15:27 ` Soft Works 2022-04-11 8:28 ` Anton Khirnov 2022-04-11 20:09 ` Soft Works 2022-04-11 20:52 ` Paul B Mahol 2022-04-11 20:58 ` Soft Works 2022-04-12 9:29 ` Paul B Mahol 2022-04-12 22:43 ` Soft Works 2022-04-13 9:42 ` Nicolas George 2022-04-13 22:14 ` Soft Works 2022-04-14 10:02 ` Paul B Mahol 2022-04-14 21:37 ` Soft Works 2022-04-06 10:02 ` Anton Khirnov
Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20220404113037.13070-50-anton@khirnov.net \
    --to=anton@khirnov.net \
    --cc=ffmpeg-devel@ffmpeg.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

This inbox may be cloned and mirrored by anyone:

  git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git

  # If you have public-inbox 1.1+ installed, you may
  # initialize and index your mirror using the following commands:
  public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
    ffmpegdev@gitmailbox.com
  public-inbox-index ffmpegdev

Example config snippet for mirrors.

AGPL code for this site: git clone https://public-inbox.org/public-inbox.git