From: Anton Khirnov <anton@khirnov.net> To: ffmpeg-devel@ffmpeg.org Subject: [FFmpeg-devel] [PATCH 20/27] WIP: fftools/ffmpeg_enc: move encoding to a separate thread Date: Tue, 19 Sep 2023 21:10:47 +0200 Message-ID: <20230919191044.18873-21-anton@khirnov.net> (raw) In-Reply-To: <20230919191044.18873-1-anton@khirnov.net> As for the analogous decoding change, this is only a preparatory step to a fully threaded architecture and does not yet make encoding truly parallel. The main thread will currently submit a frame and wait until it has been fully processed by the encoder before moving on. That will change in future commits after filters are moved to threads and a thread-aware scheduler is added. WIP: resolve all // XXX left in the code Also, if an encoder with a sync queue receives EOF it will terminate after processing everything it currently has, even though the sync queue might still be triggered by other threads. --- fftools/ffmpeg_enc.c | 384 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 340 insertions(+), 44 deletions(-) diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c index d8d7c3416d..ea542173c5 100644 --- a/fftools/ffmpeg_enc.c +++ b/fftools/ffmpeg_enc.c @@ -20,6 +20,7 @@ #include <stdint.h> #include "ffmpeg.h" +#include "thread_queue.h" #include "libavutil/avassert.h" #include "libavutil/avstring.h" @@ -43,6 +44,7 @@ struct Encoder { // packet for receiving encoded output AVPacket *pkt; + AVFrame *sub_frame; // combined size of all the packets received from the encoder uint64_t data_size; @@ -51,8 +53,48 @@ struct Encoder { uint64_t packets_encoded; int opened; + int finished; + + pthread_t thread; + /** + * Queue for sending frames from the main thread to + * the encoder thread. + */ + ThreadQueue *queue_in; + /** + * Queue for sending encoded packets from the encoder thread + * to the main thread. + * + * An empty packet is sent to signal that a previously sent + * frame has been fully processed. + */ + ThreadQueue *queue_out; }; +// data that is local to the decoder thread and not visible outside of it +typedef struct EncoderThread { + AVFrame *frame; + AVPacket *pkt; +} EncoderThread; + +static int enc_thread_stop(Encoder *e) +{ + void *ret; + + if (!e->queue_in) + return 0; + + tq_send_finish(e->queue_in, 0); + tq_receive_finish(e->queue_out, 0); + + pthread_join(e->thread, &ret); + + tq_free(&e->queue_in); + tq_free(&e->queue_out); + + return (int)(intptr_t)ret; +} + void enc_free(Encoder **penc) { Encoder *enc = *penc; @@ -60,7 +102,10 @@ void enc_free(Encoder **penc) if (!enc) return; + enc_thread_stop(enc); + av_frame_free(&enc->sq_frame); + av_frame_free(&enc->sub_frame); av_packet_free(&enc->pkt); @@ -77,6 +122,12 @@ int enc_alloc(Encoder **penc, const AVCodec *codec) if (!enc) return AVERROR(ENOMEM); + if (codec->type == AVMEDIA_TYPE_SUBTITLE) { + enc->sub_frame = av_frame_alloc(); + if (!enc->sub_frame) + goto fail; + } + enc->pkt = av_packet_alloc(); if (!enc->pkt) goto fail; @@ -165,6 +216,52 @@ static int set_encoder_id(OutputFile *of, OutputStream *ost) return 0; } +static void *encoder_thread(void *arg); + +static int enc_thread_start(OutputStream *ost) +{ + Encoder *e = ost->enc; + ObjPool *op; + int ret = 0; + + op = objpool_alloc_frames(); + if (!op) + return AVERROR(ENOMEM); + + e->queue_in = tq_alloc(1, 1, op, frame_move); + if (!e->queue_in) { + objpool_free(&op); + return AVERROR(ENOMEM); + } + + op = objpool_alloc_packets(); + if (!op) + goto fail; + + e->queue_out = tq_alloc(1, 4, op, pkt_move); + if (!e->queue_out) { + objpool_free(&op); + goto fail; + } + + ret = pthread_create(&e->thread, NULL, encoder_thread, ost); + if (ret) { + ret = AVERROR(ret); + av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n", + av_err2str(ret)); + goto fail; + } + + return 0; +fail: + if (ret >= 0) + ret = AVERROR(ENOMEM); + + tq_free(&e->queue_in); + tq_free(&e->queue_out); + return ret; +} + int enc_open(OutputStream *ost, const AVFrame *frame) { InputStream *ist = ost->ist; @@ -387,6 +484,13 @@ int enc_open(OutputStream *ost, const AVFrame *frame) if (ost->st->time_base.num <= 0 || ost->st->time_base.den <= 0) ost->st->time_base = av_add_q(ost->enc_ctx->time_base, (AVRational){0, 1}); + ret = enc_thread_start(ost); + if (ret < 0) { + av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n", + av_err2str(ret)); + return ret; + } + ret = of_stream_init(of, ost); if (ret < 0) return ret; @@ -400,19 +504,18 @@ static int check_recording_time(OutputStream *ost, int64_t ts, AVRational tb) if (of->recording_time != INT64_MAX && av_compare_ts(ts, tb, of->recording_time, AV_TIME_BASE_Q) >= 0) { - close_output_stream(ost); return 0; } return 1; } -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) +static int do_subtitle_out(OutputFile *of, OutputStream *ost, const AVSubtitle *sub, + AVPacket *pkt) { Encoder *e = ost->enc; int subtitle_out_max_size = 1024 * 1024; int subtitle_out_size, nb, i, ret; AVCodecContext *enc; - AVPacket *pkt = e->pkt; int64_t pts; if (sub->pts == AV_NOPTS_VALUE) { @@ -442,8 +545,9 @@ int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) for (i = 0; i < nb; i++) { AVSubtitle local_sub = *sub; + // XXX if (!check_recording_time(ost, pts, AV_TIME_BASE_Q)) - return 0; + return AVERROR_EOF; ret = av_new_packet(pkt, subtitle_out_max_size); if (ret < 0) @@ -484,9 +588,11 @@ int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) } pkt->dts = pkt->pts; - ret = of_output_packet(of, ost, pkt); - if (ret < 0) + ret = tq_send(e->queue_out, 0, pkt); + if (ret < 0) { + av_packet_unref(pkt); return ret; + } } return 0; @@ -624,11 +730,11 @@ static int update_video_stats(OutputStream *ost, const AVPacket *pkt, int write_ return 0; } -static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame) +static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame, + AVPacket *pkt) { Encoder *e = ost->enc; AVCodecContext *enc = ost->enc_ctx; - AVPacket *pkt = e->pkt; const char *type_desc = av_get_media_type_string(enc->codec_type); const char *action = frame ? "encode" : "flush"; int ret; @@ -678,11 +784,9 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame) if (ret == AVERROR(EAGAIN)) { av_assert0(frame); // should never happen during flushing return 0; - } else if (ret == AVERROR_EOF) { - ret = of_output_packet(of, ost, NULL); - return ret < 0 ? ret : AVERROR_EOF; } else if (ret < 0) { - av_log(ost, AV_LOG_ERROR, "%s encoding failed\n", type_desc); + if (ret != AVERROR_EOF) + av_log(ost, AV_LOG_ERROR, "%s encoding failed\n", type_desc); return ret; } @@ -706,33 +810,36 @@ static int encode_frame(OutputFile *of, OutputStream *ost, AVFrame *frame) av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, &enc->time_base)); } - if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) { - av_log(NULL, AV_LOG_ERROR, - "Subtitle heartbeat logic failed in %s! (%s)\n", - __func__, av_err2str(ret)); - return ret; - } + // XXX + //if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) { + // av_log(NULL, AV_LOG_ERROR, + // "Subtitle heartbeat logic failed in %s! (%s)\n", + // __func__, av_err2str(ret)); + // return ret; + //} e->data_size += pkt->size; e->packets_encoded++; - ret = of_output_packet(of, ost, pkt); - if (ret < 0) + ret = tq_send(e->queue_out, 0, pkt); + if (ret < 0) { + av_packet_unref(pkt); return ret; + } } av_assert0(0); } static int submit_encode_frame(OutputFile *of, OutputStream *ost, - AVFrame *frame) + AVFrame *frame, AVPacket *pkt) { Encoder *e = ost->enc; int ret; if (ost->sq_idx_encode < 0) - return encode_frame(of, ost, frame); + return encode_frame(of, ost, frame, pkt); if (frame) { ret = av_frame_ref(e->sq_frame, frame); @@ -761,22 +868,22 @@ static int submit_encode_frame(OutputFile *of, OutputStream *ost, return (ret == AVERROR(EAGAIN)) ? 0 : ret; } - ret = encode_frame(of, ost, enc_frame); + ret = encode_frame(of, ost, enc_frame, pkt); if (enc_frame) av_frame_unref(enc_frame); if (ret < 0) { - if (ret == AVERROR_EOF) - close_output_stream(ost); + // XXX + //if (ret == AVERROR_EOF) + // close_output_stream(ost); return ret; } } } static int do_audio_out(OutputFile *of, OutputStream *ost, - AVFrame *frame) + AVFrame *frame, AVPacket *pkt) { AVCodecContext *enc = ost->enc_ctx; - int ret; if (!(enc->codec->capabilities & AV_CODEC_CAP_PARAM_CHANGE) && enc->ch_layout.nb_channels != frame->ch_layout.nb_channels) { @@ -785,11 +892,12 @@ static int do_audio_out(OutputFile *of, OutputStream *ost, return 0; } + // XXX if (!check_recording_time(ost, frame->pts, frame->time_base)) - return 0; + return AVERROR_EOF; - ret = submit_encode_frame(of, ost, frame); - return (ret < 0 && ret != AVERROR_EOF) ? ret : 0; + // XXX check EOF handling + return submit_encode_frame(of, ost, frame, pkt); } static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx *kf, @@ -839,13 +947,14 @@ force_keyframe: } /* May modify/reset frame */ -static int do_video_out(OutputFile *of, OutputStream *ost, AVFrame *in_picture) +static int do_video_out(OutputFile *of, OutputStream *ost, + AVFrame *in_picture, AVPacket *pkt) { - int ret; AVCodecContext *enc = ost->enc_ctx; + // XXX if (!check_recording_time(ost, in_picture->pts, ost->enc_ctx->time_base)) - return 0; + return AVERROR_EOF; in_picture->quality = enc->global_quality; in_picture->pict_type = forced_kf_apply(ost, &ost->kf, enc->time_base, in_picture); @@ -857,26 +966,210 @@ static int do_video_out(OutputFile *of, OutputStream *ost, AVFrame *in_picture) } #endif - ret = submit_encode_frame(of, ost, in_picture); - return (ret == AVERROR_EOF) ? 0 : ret; + // XXX check EOF handling + return submit_encode_frame(of, ost, in_picture, pkt); +} + +static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt) +{ + OutputFile *of = output_files[ost->file_index]; + enum AVMediaType type = ost->type; + int ret; + + if (type == AVMEDIA_TYPE_SUBTITLE) { + // no flushing for subtitles + return frame ? + do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, pkt) : 0; + } + + // XXX + if (frame) { + ret = (type == AVMEDIA_TYPE_VIDEO) ? do_video_out(of, ost, frame, pkt) : + do_audio_out(of, ost, frame, pkt); + if (ret < 0) + return ret; + } + + return frame ? 0 : submit_encode_frame(of, ost, NULL, pkt); +} + +static void enc_thread_set_name(const OutputStream *ost) +{ + char name[16]; + snprintf(name, sizeof(name), "enc%d:%d:%s", ost->file_index, ost->index, + ost->enc_ctx->codec->name); + ff_thread_setname(name); +} + +static void enc_thread_uninit(EncoderThread *et) +{ + av_packet_free(&et->pkt); + av_frame_free(&et->frame); + + memset(et, 0, sizeof(*et)); +} + +static int enc_thread_init(EncoderThread *et) +{ + memset(et, 0, sizeof(*et)); + + et->frame = av_frame_alloc(); + if (!et->frame) + goto fail; + + et->pkt = av_packet_alloc(); + if (!et->pkt) + goto fail; + + return 0; + +fail: + enc_thread_uninit(et); + return AVERROR(ENOMEM); +} + +static void *encoder_thread(void *arg) +{ + OutputStream *ost = arg; + OutputFile *of = output_files[ost->file_index]; + Encoder *e = ost->enc; + EncoderThread et; + int ret = 0, input_status = 0; + + ret = enc_thread_init(&et); + if (ret < 0) + goto finish; + + enc_thread_set_name(ost); + + while (!input_status) { + int dummy; + + input_status = tq_receive(e->queue_in, &dummy, et.frame); + if (input_status < 0) + av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n"); + + ret = frame_encode(ost, input_status >= 0 ? et.frame : NULL, et.pkt); + + av_packet_unref(et.pkt); + av_frame_unref(et.frame); + + if (ret < 0) { + if (ret == AVERROR_EOF) + av_log(ost, AV_LOG_VERBOSE, "Encoder returned EOF, finishing\n"); + else + av_log(ost, AV_LOG_ERROR, "Error encoding a frame: %s\n", + av_err2str(ret)); + break; + } + + // signal to the consumer thread that the frame was encoded + ret = tq_send(e->queue_out, 0, et.pkt); + if (ret < 0) { + if (ret != AVERROR_EOF) + av_log(ost, AV_LOG_ERROR, + "Error communicating with the main thread\n"); + break; + } + } + + // EOF is normal thread termination + if (ret == AVERROR_EOF) + ret = 0; + +finish: + if (ost->sq_idx_encode >= 0) + sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); + + tq_receive_finish(e->queue_in, 0); + tq_send_finish (e->queue_out, 0); + + enc_thread_uninit(&et); + + av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n"); + + return (void*)(intptr_t)ret; } int enc_frame(OutputStream *ost, AVFrame *frame) { OutputFile *of = output_files[ost->file_index]; - int ret; + Encoder *e = ost->enc; + int ret, thread_ret; ret = enc_open(ost, frame); if (ret < 0) return ret; - return ost->enc_ctx->codec_type == AVMEDIA_TYPE_VIDEO ? - do_video_out(of, ost, frame) : do_audio_out(of, ost, frame); + // thread already joined + // XXX check EOF handling + if (!e->queue_in) + return AVERROR_EOF; + + // send the frame/EOF to the encoder thread + if (frame) { + ret = tq_send(e->queue_in, 0, frame); + if (ret < 0) + goto finish; + } else + tq_send_finish(e->queue_in, 0); + + // retrieve all encoded data for the frame + while (1) { + int dummy; + + ret = tq_receive(e->queue_out, &dummy, e->pkt); + if (ret < 0) + break; + + // frame fully encoded + if (!e->pkt->data && !e->pkt->side_data_elems) + return 0; + + // process the encoded packet + ret = of_output_packet(of, ost, e->pkt); + if (ret < 0) + goto finish; + } + +finish: + thread_ret = enc_thread_stop(e); + if (thread_ret < 0) { + av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n", + av_err2str(thread_ret)); + ret = err_merge(ret, thread_ret); + } + + if (ret < 0 && ret != AVERROR_EOF) + return ret; + + // signal EOF to the muxer + return of_output_packet(of, ost, NULL); +} + +int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) +{ + Encoder *e = ost->enc; + AVFrame *f = e->sub_frame; + int ret; + + // XXX the queue for transferring data to the encoder thread runs + // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put + // that inside the frame + // eventually, subtitles should be switched to use AVFrames natively + ret = subtitle_wrap_frame(f, sub, 1); + if (ret < 0) + return ret; + + ret = enc_frame(ost, f); + av_frame_unref(f); + + return ret; } int enc_flush(void) { - int ret; + int ret = 0; for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { OutputFile *of = output_files[ost->file_index]; @@ -887,16 +1180,19 @@ int enc_flush(void) for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { Encoder *e = ost->enc; AVCodecContext *enc = ost->enc_ctx; - OutputFile *of = output_files[ost->file_index]; + int err; if (!enc || !e->opened || (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != AVMEDIA_TYPE_AUDIO)) continue; - ret = submit_encode_frame(of, ost, NULL); - if (ret != AVERROR_EOF) - return ret; + err = enc_frame(ost, NULL); + // XXX check EOF handling + if (err != AVERROR_EOF && ret < 0) + ret = err_merge(ret, err); + + av_assert0(!e->queue_in); } - return 0; + return ret; } -- 2.40.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
next prev parent reply other threads:[~2023-09-19 19:24 UTC|newest] Thread overview: 30+ messages / expand[flat|nested] mbox.gz Atom feed top 2023-09-19 19:10 [FFmpeg-devel] [RFC/PATCH] ffmpeg CLI multithreading Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 01/27] fftools/ffmpeg: move derivation of frame duration from filter framerate Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 02/27] fftools/ffmpeg_enc: move handling video frame duration to video_sync_process() Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 03/27] fftools/ffmpeg_enc: move remaining vsync-related code " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 04/27] fftools/ffmpeg_enc: simplify adjust_frame_pts_to_encoder_tb() signature Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 05/27] ffools/ffmpeg_filter: stop trying to handle an unreachable state Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 06/27] tests/fate/ffmpeg: add tests for -force_key_frames source Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 07/27] fftools/ffmpeg_enc: unbreak -force_key_frames source_no_drop Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 08/27] fftools/ffmpeg_enc: merge -force_key_frames source/source_no_drop Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 09/27] fftools/ffmpeg: stop accessing OutputStream.last_dropped in print_report() Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 10/27] fftools/ffmpeg_enc: move framerate conversion state into a separate struct Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 11/27] fftools/ffmpeg_enc: move fps conversion code to ffmpeg_filter Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 12/27] fftools/ffmpeg_filter: fail on filtering errors Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 13/27] fftools/ffmpeg_enc: constify the frame passed to enc_open() Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 14/27] fftools/ffmpeg_filter: move filtering to a separate thread Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 15/27] fftools/ffmpeg_mux: add muxing thread private data Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 16/27] fftools/ffmpeg_demux: switch from AVThreadMessageQueue to ThreadQueue Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 17/27] XXX: disable fix_sub_duration_heartbeat Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 18/27] XXX fftools/ffmpeg_enc: temporarily disable side data copying Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 19/27] XXX ffmpeg temporarily disable -stream_loop Anton Khirnov 2023-09-19 19:10 ` Anton Khirnov [this message] 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 21/27] WIP fftools/ffmpeg: add thread-aware transcode scheduling infrastructure Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 22/27] WIP fftools/ffmpeg_demux: convert to the scheduler Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 23/27] WIP fftools/ffmpeg_dec: " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 24/27] WIP fftools/ffmpeg_filter: " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 25/27] WIP fftools/ffmpeg_enc: " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 26/27] WIP fftools/ffmpeg_mux: " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 27/27] WIP: ffmpeg: switch to scheduler Anton Khirnov 2023-09-20 14:49 ` [FFmpeg-devel] [RFC/PATCH] ffmpeg CLI multithreading Michael Niedermayer 2023-09-25 10:17 ` Anton Khirnov
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20230919191044.18873-21-anton@khirnov.net \ --to=anton@khirnov.net \ --cc=ffmpeg-devel@ffmpeg.org \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel This inbox may be cloned and mirrored by anyone: git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git # If you have public-inbox 1.1+ installed, you may # initialize and index your mirror using the following commands: public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \ ffmpegdev@gitmailbox.com public-inbox-index ffmpegdev Example config snippet for mirrors. AGPL code for this site: git clone https://public-inbox.org/public-inbox.git