From: Anton Khirnov <anton@khirnov.net> To: ffmpeg-devel@ffmpeg.org Subject: [FFmpeg-devel] [PATCH 22/27] WIP fftools/ffmpeg_demux: convert to the scheduler Date: Tue, 19 Sep 2023 21:10:49 +0200 Message-ID: <20230919191044.18873-23-anton@khirnov.net> (raw) In-Reply-To: <20230919191044.18873-1-anton@khirnov.net> --- fftools/ffmpeg.c | 2 +- fftools/ffmpeg.h | 12 ---- fftools/ffmpeg_demux.c | 131 +++++++++++++++++++---------------------- 3 files changed, 60 insertions(+), 85 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 995424ca93..00e57c4382 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -1061,7 +1061,7 @@ static int process_input(int file_index, AVPacket *pkt) InputStream *ist; int ret, i; - ret = ifile_get_packet(ifile, pkt); + ret = 0; if (ret == 1) { /* the input file is looped: flush the decoders */ diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 278216e5ff..4646c05bea 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -860,18 +860,6 @@ int64_t of_filesize(OutputFile *of); int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch); void ifile_close(InputFile **f); -/** - * Get next input packet from the demuxer. 
- * - * @param pkt the packet is written here when this function returns 0 - * @return - * - 0 when a packet has been read successfully - * - 1 when stream end was reached, but the stream is looped; - * caller should flush decoders and read from this demuxer again - * - a negative error code on failure - */ -int ifile_get_packet(InputFile *f, AVPacket *pkt); - int ist_output_add(InputStream *ist, OutputStream *ost); int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple); diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index 074546d517..9c0e54979e 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -21,8 +21,6 @@ #include "ffmpeg.h" #include "ffmpeg_sched.h" -#include "objpool.h" -#include "thread_queue.h" #include "libavutil/avassert.h" #include "libavutil/avstring.h" @@ -34,7 +32,6 @@ #include "libavutil/pixdesc.h" #include "libavutil/time.h" #include "libavutil/timestamp.h" -#include "libavutil/thread.h" #include "libavcodec/packet.h" @@ -65,6 +62,9 @@ typedef struct DemuxStream { double ts_scale; + // scheduler returned EOF for this stream + int finished; + int streamcopy_needed; int wrap_correction_done; @@ -115,11 +115,10 @@ typedef struct Demuxer { double readrate_initial_burst; Scheduler *sch; - ThreadQueue *thread_queue; - int thread_queue_size; - pthread_t thread; int read_started; + int nb_streams_used; + int nb_streams_finished; } Demuxer; static DemuxStream *ds_from_ist(InputStream *ist) @@ -503,6 +502,8 @@ static int input_packet_process(Demuxer *d, AVPacket *pkt) av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q)); } + pkt->stream_index = ds->sch_idx_stream; + return 0; } @@ -565,9 +566,14 @@ static void *input_thread(void *arg) discard_unused_programs(f); + // XXX + d->read_started = 1; + d->wallclock_start = av_gettime_relative(); while (1) { + DemuxStream *ds; + ret = av_read_frame(f->ctx, pkt); if (ret == AVERROR(EAGAIN)) { @@ -575,25 +581,32 @@ static void *input_thread(void 
*arg) continue; } if (ret < 0) { -#if 0 if (d->loop) { - /* signal looping to the consumer thread */ - pkt->opaque = (void*)(intptr_t)PKT_OPAQUE_SEEK; - ret = tq_send(d->thread_queue, 0, pkt); - if (ret >= 0) - ret = seek_to_start(d); + /* signal looping to our consumers */ + for (int i = 0; i < f->nb_streams; i++) { + pkt->opaque = (void*)(intptr_t)PKT_OPAQUE_SEEK; + pkt->stream_index = i; + + ret = sch_demux_send(d->sch, f->index, pkt); + // XXX + if (ret >= 0) + //ret = seek_to_start(d); + if (ret < 0) + break; + } if (ret >= 0) continue; /* fallthrough to the error path */ } -#endif if (ret == AVERROR_EOF) av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n"); - else + else { av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n", av_err2str(ret)); + ret = exit_on_error ? ret : 0; + } break; } @@ -605,8 +618,9 @@ static void *input_thread(void *arg) /* the following test is needed in case new streams appear dynamically in stream : we ignore them */ - if (pkt->stream_index >= f->nb_streams || - f->streams[pkt->stream_index]->discard) { + ds = pkt->stream_index < f->nb_streams ? 
+ ds_from_ist(f->streams[pkt->stream_index]) : NULL; + if (!ds || ds->ist.discard || ds->finished) { report_new_stream(d, pkt); av_packet_unref(pkt); continue; @@ -630,40 +644,47 @@ static void *input_thread(void *arg) if (f->readrate) readrate_sleep(d); - ret = tq_send(d->thread_queue, 0, pkt); - if (ret < 0) { - if (ret != AVERROR_EOF) + ret = sch_demux_send(d->sch, f->index, pkt); + if (ret == AVERROR_EOF) { + av_packet_unref(pkt); + + av_log(ds, AV_LOG_VERBOSE, "All consumers done\n"); + ds->finished = 1; + + if (++d->nb_streams_finished == d->nb_streams_used) { + av_log(f, AV_LOG_VERBOSE, "All streams' consumers done\n"); + break; + } + continue; + } else if (ret < 0) { + if (ret != AVERROR_EXIT) av_log(f, AV_LOG_ERROR, - "Unable to send packet to main thread: %s\n", + "Unable to send demuxed packet to consumers: %s\n", av_err2str(ret)); break; } } + // EOF/EXIT is normal termination + if (ret == AVERROR_EOF || ret == AVERROR_EXIT) + ret = 0; + finish: - av_assert0(ret < 0); - tq_send_finish(d->thread_queue, 0); + sch_demux_send(d->sch, f->index, NULL); av_packet_free(&pkt); av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n"); - return NULL; + return (void*)(intptr_t)ret; } +// XXX +#if 0 static void thread_stop(Demuxer *d) { InputFile *f = &d->f; - if (!d->thread_queue) - return; - - tq_receive_finish(d->thread_queue, 0); - - pthread_join(d->thread, NULL); - - tq_free(&d->thread_queue); - //av_thread_message_queue_free(&f->audio_duration_queue); } @@ -671,22 +692,7 @@ static int thread_start(Demuxer *d) { int ret; InputFile *f = &d->f; - ObjPool *op; - if (d->thread_queue_size <= 0) - d->thread_queue_size = (nb_input_files > 1 ? 
8 : 1); - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move); - if (!d->thread_queue) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - -#if 0 if (d->loop) { int nb_audio_dec = 0; @@ -704,20 +710,6 @@ static int thread_start(Demuxer *d) f->audio_duration_queue_size = nb_audio_dec; } } -#endif - - if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) { - av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret)); - ret = AVERROR(ret); - goto fail; - } - - d->read_started = 1; - - return 0; -fail: - tq_free(&d->thread_queue); - return ret; } int ifile_get_packet(InputFile *f, AVPacket *pkt) @@ -725,12 +717,6 @@ int ifile_get_packet(InputFile *f, AVPacket *pkt) Demuxer *d = demuxer_from_ifile(f); int ret, dummy; - if (!d->thread_queue) { - ret = thread_start(d); - if (ret < 0) - return ret; - } - ret = tq_receive(d->thread_queue, &dummy, pkt); if (ret < 0) return ret; @@ -744,6 +730,7 @@ int ifile_get_packet(InputFile *f, AVPacket *pkt) return 0; } +#endif static void demux_final_stats(Demuxer *d) { @@ -813,8 +800,6 @@ void ifile_close(InputFile **pf) if (!f) return; - thread_stop(d); - if (d->read_started) demux_final_stats(d); @@ -846,7 +831,11 @@ static int ist_use(InputStream *ist, int decoding_needed) ds->sch_idx_stream = ret; } - ist->discard = 0; + if (ist->discard) { + ist->discard = 0; + d->nb_streams_used++; + } + ist->st->discard = ist->user_set_discard; ist->decoding_needed |= decoding_needed; ds->streamcopy_needed |= !decoding_needed; @@ -1647,8 +1636,6 @@ int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch) "since neither -readrate nor -re were given\n"); } - d->thread_queue_size = o->thread_queue_size; - /* Add all the streams from the given input file to the demuxer */ for (int i = 0; i < ic->nb_streams; i++) { ret = ist_add(o, d, ic->streams[i]); -- 2.40.1 
_______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
next prev parent reply other threads:[~2023-09-19 19:24 UTC|newest] Thread overview: 30+ messages / expand[flat|nested] mbox.gz Atom feed top 2023-09-19 19:10 [FFmpeg-devel] [RFC/PATCH] ffmpeg CLI multithreading Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 01/27] fftools/ffmpeg: move derivation of frame duration from filter framerate Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 02/27] fftools/ffmpeg_enc: move handling video frame duration to video_sync_process() Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 03/27] fftools/ffmpeg_enc: move remaining vsync-related code " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 04/27] fftools/ffmpeg_enc: simplify adjust_frame_pts_to_encoder_tb() signature Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 05/27] ffools/ffmpeg_filter: stop trying to handle an unreachable state Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 06/27] tests/fate/ffmpeg: add tests for -force_key_frames source Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 07/27] fftools/ffmpeg_enc: unbreak -force_key_frames source_no_drop Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 08/27] fftools/ffmpeg_enc: merge -force_key_frames source/source_no_drop Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 09/27] fftools/ffmpeg: stop accessing OutputStream.last_dropped in print_report() Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 10/27] fftools/ffmpeg_enc: move framerate conversion state into a separate struct Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 11/27] fftools/ffmpeg_enc: move fps conversion code to ffmpeg_filter Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 12/27] fftools/ffmpeg_filter: fail on filtering errors Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 13/27] fftools/ffmpeg_enc: constify the frame passed to enc_open() Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 14/27] fftools/ffmpeg_filter: move filtering to a separate thread Anton 
Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 15/27] fftools/ffmpeg_mux: add muxing thread private data Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 16/27] fftools/ffmpeg_demux: switch from AVThreadMessageQueue to ThreadQueue Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 17/27] XXX: disable fix_sub_duration_heartbeat Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 18/27] XXX fftools/ffmpeg_enc: temporarily disable side data copying Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 19/27] XXX ffmpeg temporarily disable -stream_loop Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 20/27] WIP: fftools/ffmpeg_enc: move encoding to a separate thread Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 21/27] WIP fftools/ffmpeg: add thread-aware transcode scheduling infrastructure Anton Khirnov 2023-09-19 19:10 ` Anton Khirnov [this message] 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 23/27] WIP fftools/ffmpeg_dec: convert to the scheduler Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 24/27] WIP fftools/ffmpeg_filter: " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 25/27] WIP fftools/ffmpeg_enc: " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 26/27] WIP fftools/ffmpeg_mux: " Anton Khirnov 2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 27/27] WIP: ffmpeg: switch to scheduler Anton Khirnov 2023-09-20 14:49 ` [FFmpeg-devel] [RFC/PATCH] ffmpeg CLI multithreading Michael Niedermayer 2023-09-25 10:17 ` Anton Khirnov
Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230919191044.18873-23-anton@khirnov.net \
    --to=anton@khirnov.net \
    --cc=ffmpeg-devel@ffmpeg.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link
Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
		ffmpegdev@gitmailbox.com
	public-inbox-index ffmpegdev

Example config snippet for mirrors.

AGPL code for this site: git clone https://public-inbox.org/public-inbox.git