From: Anton Khirnov <anton@khirnov.net> To: ffmpeg-devel@ffmpeg.org Subject: [FFmpeg-devel] [PATCH 23/27] WIP fftools/ffmpeg_dec: convert to the scheduler Date: Tue, 19 Sep 2023 21:10:50 +0200 Message-ID: <20230919191044.18873-24-anton@khirnov.net> (raw) In-Reply-To: <20230919191044.18873-1-anton@khirnov.net> --- fftools/ffmpeg.c | 9 +-- fftools/ffmpeg.h | 11 --- fftools/ffmpeg_dec.c | 189 +++++++++---------------------------------- 3 files changed, 42 insertions(+), 167 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 00e57c4382..a09a9e1200 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -810,11 +810,6 @@ static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eo int ret = 0; int eof_reached = 0; - if (ist->decoding_needed) { - ret = dec_packet(ist, pkt, no_eof); - if (ret < 0 && ret != AVERROR_EOF) - return ret; - } if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed)) eof_reached = 1; @@ -1036,6 +1031,7 @@ static void reset_eagain(void) ost->unavailable = 0; } +#if 0 static void decode_flush(InputFile *ifile) { for (int i = 0; i < ifile->nb_streams; i++) { @@ -1047,6 +1043,7 @@ static void decode_flush(InputFile *ifile) dec_packet(ist, NULL, 1); } } +#endif /* * Return @@ -1063,11 +1060,13 @@ static int process_input(int file_index, AVPacket *pkt) ret = 0; +#if 0 if (ret == 1) { /* the input file is looped: flush the decoders */ decode_flush(ifile); return AVERROR(EAGAIN); } +#endif if (ret < 0) { if (ret != AVERROR_EOF) { av_log(ifile, AV_LOG_ERROR, diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 4646c05bea..841f8d0d68 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -815,17 +815,6 @@ int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input); int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx); void dec_free(Decoder **pdec); -/** - * Submit a packet for decoding - * - * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and - * mark all downstreams as finished. - * - * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush - * decoders and await further input. - */ -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof); - int enc_alloc(Encoder **penc, const AVCodec *codec, Scheduler *sch, unsigned sch_idx); void enc_free(Encoder **penc); diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c index dc8d0374a3..400fa666b9 100644 --- a/fftools/ffmpeg_dec.c +++ b/fftools/ffmpeg_dec.c @@ -53,24 +53,6 @@ struct Decoder { Scheduler *sch; unsigned sch_idx; - - pthread_t thread; - /** - * Queue for sending coded packets from the main thread to - * the decoder thread. - * - * An empty packet is sent to flush the decoder without terminating - * decoding. - */ - ThreadQueue *queue_in; - /** - * Queue for sending decoded frames from the decoder thread - * to the main thread. - * - * An empty frame is sent to signal that a single packet has been fully - * processed. - */ - ThreadQueue *queue_out; }; // data that is local to the decoder thread and not visible outside of it @@ -79,24 +61,6 @@ typedef struct DecThreadContext { AVPacket *pkt; } DecThreadContext; -static int dec_thread_stop(Decoder *d) -{ - void *ret; - - if (!d->queue_in) - return 0; - - tq_send_finish(d->queue_in, 0); - tq_receive_finish(d->queue_out, 0); - - pthread_join(d->thread, &ret); - - tq_free(&d->queue_in); - tq_free(&d->queue_out); - - return (intptr_t)ret; -} - void dec_free(Decoder **pdec) { Decoder *dec = *pdec; @@ -104,8 +68,6 @@ void dec_free(Decoder **pdec) if (!dec) return; - dec_thread_stop(dec); - av_frame_free(&dec->frame); av_packet_free(&dec->pkt); @@ -147,25 +109,6 @@ fail: return AVERROR(ENOMEM); } -static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) -{ - int i, ret = 0; - - for (i = 0; i < ist->nb_filters; i++) { - ret = ifilter_send_frame(ist->filters[i], decoded_frame, - i < ist->nb_filters - 1 || - ist->dec->type == AVMEDIA_TYPE_SUBTITLE); - if (ret == AVERROR_EOF) - ret = 0; /* ignore */ - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, - "Failed to inject frame into filter network: %s\n", av_err2str(ret)); - break; - } - } - return ret; -} - static AVRational audio_samplerate_update(void *logctx, Decoder *d, const AVFrame *frame) { @@ -420,7 +363,8 @@ static int process_subtitle(InputStream *ist, AVFrame *frame) if (!subtitle) return 0; - ret = send_frame_to_filters(ist, frame); + // XXX + //ret = send_frame_to_filters(ist, frame); if (ret < 0) return ret; @@ -495,7 +439,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, ist->frames_decoded++; - // XXX the queue for transferring data back to the main thread runs + // XXX the queue for transferring data to consumers runs // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that // inside the frame // eventually, subtitles should be switched to use AVFrames natively @@ -508,26 +452,11 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, frame->width = ist->dec_ctx->width; frame->height = ist->dec_ctx->height; - ret = tq_send(d->queue_out, 0, frame); + ret = sch_dec_send(d->sch, d->sch_idx, frame); if (ret < 0) av_frame_unref(frame); - return ret; -} - -static int send_filter_eof(InputStream *ist) -{ - Decoder *d = ist->decoder; - int i, ret; - - for (i = 0; i < ist->nb_filters; i++) { - int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE : - d->last_frame_pts + d->last_frame_duration_est; - ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb); - if (ret < 0) - return ret; - } - return 0; + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; } static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame) @@ -629,9 +558,9 @@ static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame) ist->frames_decoded++; - ret = tq_send(d->queue_out, 0, frame); + ret = sch_dec_send(d->sch, d->sch_idx, frame); if (ret < 0) - return ret; + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; } } @@ -685,9 +614,9 @@ void *decoder_thread(void *arg) dec_thread_set_name(ist); while (!input_status) { - int dummy, flush_buffers; + int flush_buffers; - input_status = tq_receive(d->queue_in, &dummy, dt.pkt); + input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt); flush_buffers = input_status >= 0 && !dt.pkt->buf; if (!dt.pkt->buf) av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n", @@ -698,6 +627,14 @@ void *decoder_thread(void *arg) av_packet_unref(dt.pkt); av_frame_unref(dt.frame); + // AVERROR_EOF - EOF from the decoder + // AVERROR_EXIT - EOF from the scheduler + // we treat them differently when flushing + if (ret == AVERROR_EXIT) { + ret = AVERROR_EOF; + flush_buffers = 0; + } + if (ret == AVERROR_EOF) { av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n", flush_buffers ? "resetting" : "finishing"); @@ -725,23 +662,32 @@ void *decoder_thread(void *arg) av_err2str(ret)); break; } - - // signal to the consumer thread that the entire packet was processed - ret = tq_send(d->queue_out, 0, dt.frame); - if (ret < 0) { - if (ret != AVERROR_EOF) - av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n"); - break; - } } // EOF is normal thread termination if (ret == AVERROR_EOF) ret = 0; + // on success send EOF timestamp to our downstreams + if (ret >= 0) { + av_frame_unref(dt.frame); + + dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE : + d->last_frame_pts + d->last_frame_duration_est; + dt.frame->time_base = d->last_frame_tb; + + // XXX check EOF/EXIT handling + ret = sch_dec_send(d->sch, d->sch_idx, dt.frame); + if (ret < 0 && ret != AVERROR_EOF) { + av_log(NULL, AV_LOG_FATAL, + "Error signalling EOF timestamp: %s\n", av_err2str(ret)); + goto finish; + } + ret = 0; + } + finish: - tq_receive_finish(d->queue_in, 0); - tq_send_finish (d->queue_out, 0); + sch_dec_send(d->sch, d->sch_idx, NULL); #if 0 // make sure the demuxer does not get stuck waiting for audio durations @@ -757,15 +703,12 @@ finish: return (void*)(intptr_t)ret; } +#if 0 int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) { Decoder *d = ist->decoder; int ret = 0, thread_ret; - // thread already joined - if (!d->queue_in) - return AVERROR_EOF; - // send the packet/flush request/EOF to the decoder thread if (pkt || no_eof) { av_packet_unref(d->pkt); @@ -816,59 +759,10 @@ finish: if (ret < 0 && ret != AVERROR_EOF) return ret; - // signal EOF to our downstreams - ret = send_filter_eof(ist); - if (ret < 0) { - av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); - return ret; - } return AVERROR_EOF; } - -static int dec_thread_start(InputStream *ist) -{ - Decoder *d = ist->decoder; - ObjPool *op; - int ret = 0; - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - d->queue_in = tq_alloc(1, 1, op, pkt_move); - if (!d->queue_in) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - op = objpool_alloc_frames(); - if (!op) - goto fail; - - d->queue_out = tq_alloc(1, 4, op, frame_move); - if (!d->queue_out) { - objpool_free(&op); - goto fail; - } - - ret = pthread_create(&d->thread, NULL, decoder_thread, ist); - if (ret) { - ret = AVERROR(ret); - av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n", - av_err2str(ret)); - goto fail; - } - - return 0; -fail: - if (ret >= 0) - ret = AVERROR(ENOMEM); - - tq_free(&d->queue_in); - tq_free(&d->queue_out); - return ret; -} +#endif static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts) { @@ -1121,12 +1015,5 @@ int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx) if (ret < 0) return ret; - ret = dec_thread_start(ist); - if (ret < 0) { - av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n", - av_err2str(ret)); - return ret; - } - return 0; } -- 2.40.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
next prev parent reply other threads:[~2023-09-19 19:24 UTC|newest] Thread overview: 30+ messages / expand[flat|nested] mbox.gz Atom feed top 2023-09-19 19:10 [FFmpeg-devel] [RFC/PATCH] ffmpeg CLI multithreading Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 01/27] fftools/ffmpeg: move derivation of frame duration from filter framerate Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 02/27] fftools/ffmpeg_enc: move handling video frame duration to video_sync_process() Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 03/27] fftools/ffmpeg_enc: move remaining vsync-related code " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 04/27] fftools/ffmpeg_enc: simplify adjust_frame_pts_to_encoder_tb() signature Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 05/27] ffools/ffmpeg_filter: stop trying to handle an unreachable state Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 06/27] tests/fate/ffmpeg: add tests for -force_key_frames source Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 07/27] fftools/ffmpeg_enc: unbreak -force_key_frames source_no_drop Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 08/27] fftools/ffmpeg_enc: merge -force_key_frames source/source_no_drop Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 09/27] fftools/ffmpeg: stop accessing OutputStream.last_dropped in print_report() Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 10/27] fftools/ffmpeg_enc: move framerate conversion state into a separate struct Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 11/27] fftools/ffmpeg_enc: move fps conversion code to ffmpeg_filter Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 12/27] fftools/ffmpeg_filter: fail on filtering errors Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 13/27] fftools/ffmpeg_enc: constify the frame passed to enc_open() Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 14/27] fftools/ffmpeg_filter: move filtering to a separate thread Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 15/27] fftools/ffmpeg_mux: add muxing thread private data Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 16/27] fftools/ffmpeg_demux: switch from AVThreadMessageQueue to ThreadQueue Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 17/27] XXX: disable fix_sub_duration_heartbeat Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 18/27] XXX fftools/ffmpeg_enc: temporarily disable side data copying Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 19/27] XXX ffmpeg temporarily disable -stream_loop Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 20/27] WIP: fftools/ffmpeg_enc: move encoding to a separate thread Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 21/27] WIP fftools/ffmpeg: add thread-aware transcode scheduling infrastructure Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 22/27] WIP fftools/ffmpeg_demux: convert to the scheduler Anton Khirnov 2023-09-19 19:10 ` Anton Khirnov [this message] 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 24/27] WIP fftools/ffmpeg_filter: " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 25/27] WIP fftools/ffmpeg_enc: " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 26/27] WIP fftools/ffmpeg_mux: " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 27/27] WIP: ffmpeg: switch to scheduler Anton Khirnov 2023-09-20 14:49 ` [FFmpeg-devel] [RFC/PATCH] ffmpeg CLI multithreading Michael Niedermayer 2023-09-25 10:17 ` Anton Khirnov
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20230919191044.18873-24-anton@khirnov.net \ --to=anton@khirnov.net \ --cc=ffmpeg-devel@ffmpeg.org \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel This inbox may be cloned and mirrored by anyone: git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git # If you have public-inbox 1.1+ installed, you may # initialize and index your mirror using the following commands: public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \ ffmpegdev@gitmailbox.com public-inbox-index ffmpegdev Example config snippet for mirrors. AGPL code for this site: git clone https://public-inbox.org/public-inbox.git