From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100]) by master.gitmailbox.com (Postfix) with ESMTP id 648B847699 for ; Tue, 19 Sep 2023 19:24:33 +0000 (UTC) Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 230BF68C9B2; Tue, 19 Sep 2023 22:21:47 +0300 (EEST) Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 7030F68C901 for ; Tue, 19 Sep 2023 22:21:45 +0300 (EEST) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id 2519A10 for ; Tue, 19 Sep 2023 21:21:45 +0200 (CEST) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id BdTd6PMSxMPD for ; Tue, 19 Sep 2023 21:21:39 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id 22982520E for ; Tue, 19 Sep 2023 21:20:43 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id 14E333A0212 for ; Tue, 19 Sep 2023 21:20:43 +0200 (CEST) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Tue, 19 Sep 2023 21:10:49 +0200 Message-Id: <20230919191044.18873-23-anton@khirnov.net> X-Mailer: git-send-email 2.40.1 In-Reply-To: <20230919191044.18873-1-anton@khirnov.net> References: <20230919191044.18873-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 22/27] WIP fftools/ffmpeg_demux: convert to the scheduler X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Archived-At: List-Archive: List-Post: --- fftools/ffmpeg.c | 2 +- fftools/ffmpeg.h | 12 ---- fftools/ffmpeg_demux.c | 131 +++++++++++++++++++---------------------- 3 files changed, 60 insertions(+), 85 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 995424ca93..00e57c4382 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -1061,7 +1061,7 @@ static int process_input(int file_index, AVPacket *pkt) InputStream *ist; int ret, i; - ret = ifile_get_packet(ifile, pkt); + ret = 0; if (ret == 1) { /* the input file is looped: flush the decoders */ diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 278216e5ff..4646c05bea 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -860,18 +860,6 @@ int64_t of_filesize(OutputFile *of); int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch); void ifile_close(InputFile **f); -/** - * Get next input packet from the demuxer. - * - * @param pkt the packet is written here when this function returns 0 - * @return - * - 0 when a packet has been read successfully - * - 1 when stream end was reached, but the stream is looped; - * caller should flush decoders and read from this demuxer again - * - a negative error code on failure - */ -int ifile_get_packet(InputFile *f, AVPacket *pkt); - int ist_output_add(InputStream *ist, OutputStream *ost); int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple); diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index 074546d517..9c0e54979e 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -21,8 +21,6 @@ #include "ffmpeg.h" #include "ffmpeg_sched.h" -#include "objpool.h" -#include "thread_queue.h" #include "libavutil/avassert.h" #include "libavutil/avstring.h" @@ -34,7 +32,6 @@ #include "libavutil/pixdesc.h" #include "libavutil/time.h" #include "libavutil/timestamp.h" -#include "libavutil/thread.h" #include "libavcodec/packet.h" @@ -65,6 +62,9 @@ typedef struct DemuxStream { double ts_scale; + // scheduler returned EOF for this stream + int finished; + int streamcopy_needed; int wrap_correction_done; @@ -115,11 +115,10 @@ typedef struct Demuxer { double readrate_initial_burst; Scheduler *sch; - ThreadQueue *thread_queue; - int thread_queue_size; - pthread_t thread; int read_started; + int nb_streams_used; + int nb_streams_finished; } Demuxer; static DemuxStream *ds_from_ist(InputStream *ist) @@ -503,6 +502,8 @@ static int input_packet_process(Demuxer *d, AVPacket *pkt) av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q)); } + pkt->stream_index = ds->sch_idx_stream; + return 0; } @@ -565,9 +566,14 @@ static void *input_thread(void *arg) discard_unused_programs(f); + // XXX + d->read_started = 1; + d->wallclock_start = av_gettime_relative(); while (1) { + DemuxStream *ds; + ret = av_read_frame(f->ctx, pkt); if (ret == AVERROR(EAGAIN)) { @@ -575,25 +581,32 @@ static void *input_thread(void *arg) continue; } if (ret < 0) { -#if 0 if (d->loop) { - /* signal looping to the consumer thread */ - pkt->opaque = (void*)(intptr_t)PKT_OPAQUE_SEEK; - ret = tq_send(d->thread_queue, 0, pkt); - if (ret >= 0) - ret = seek_to_start(d); + /* signal looping to our consumers */ + for (int i = 0; i < f->nb_streams; i++) { + pkt->opaque = (void*)(intptr_t)PKT_OPAQUE_SEEK; + pkt->stream_index = i; + + ret = sch_demux_send(d->sch, f->index, pkt); + // XXX + if (ret >= 0) + //ret = seek_to_start(d); + if (ret < 0) + break; + } if (ret >= 0) continue; /* fallthrough to the error path */ } -#endif if (ret == AVERROR_EOF) av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n"); - else + else { av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n", av_err2str(ret)); + ret = exit_on_error ? ret : 0; + } break; } @@ -605,8 +618,9 @@ static void *input_thread(void *arg) /* the following test is needed in case new streams appear dynamically in stream : we ignore them */ - if (pkt->stream_index >= f->nb_streams || - f->streams[pkt->stream_index]->discard) { + ds = pkt->stream_index < f->nb_streams ? + ds_from_ist(f->streams[pkt->stream_index]) : NULL; + if (!ds || ds->ist.discard || ds->finished) { report_new_stream(d, pkt); av_packet_unref(pkt); continue; @@ -630,40 +644,47 @@ static void *input_thread(void *arg) if (f->readrate) readrate_sleep(d); - ret = tq_send(d->thread_queue, 0, pkt); - if (ret < 0) { - if (ret != AVERROR_EOF) + ret = sch_demux_send(d->sch, f->index, pkt); + if (ret == AVERROR_EOF) { + av_packet_unref(pkt); + + av_log(ds, AV_LOG_VERBOSE, "All consumers done\n"); + ds->finished = 1; + + if (++d->nb_streams_finished == d->nb_streams_used) { + av_log(f, AV_LOG_VERBOSE, "All streams' consumers done\n"); + break; + } + continue; + } else if (ret < 0) { + if (ret != AVERROR_EXIT) av_log(f, AV_LOG_ERROR, - "Unable to send packet to main thread: %s\n", + "Unable to send demuxed packet to consumers: %s\n", av_err2str(ret)); break; } } + // EOF/EXIT is normal termination + if (ret == AVERROR_EOF || ret == AVERROR_EXIT) + ret = 0; + finish: - av_assert0(ret < 0); - tq_send_finish(d->thread_queue, 0); + sch_demux_send(d->sch, f->index, NULL); av_packet_free(&pkt); av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n"); - return NULL; + return (void*)(intptr_t)ret; } +// XXX +#if 0 static void thread_stop(Demuxer *d) { InputFile *f = &d->f; - if (!d->thread_queue) - return; - - tq_receive_finish(d->thread_queue, 0); - - pthread_join(d->thread, NULL); - - tq_free(&d->thread_queue); - //av_thread_message_queue_free(&f->audio_duration_queue); } @@ -671,22 +692,7 @@ static int thread_start(Demuxer *d) { int ret; InputFile *f = &d->f; - ObjPool *op; - if (d->thread_queue_size <= 0) - d->thread_queue_size = (nb_input_files > 1 ? 8 : 1); - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move); - if (!d->thread_queue) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - -#if 0 if (d->loop) { int nb_audio_dec = 0; @@ -704,20 +710,6 @@ static int thread_start(Demuxer *d) f->audio_duration_queue_size = nb_audio_dec; } } -#endif - - if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) { - av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret)); - ret = AVERROR(ret); - goto fail; - } - - d->read_started = 1; - - return 0; -fail: - tq_free(&d->thread_queue); - return ret; } int ifile_get_packet(InputFile *f, AVPacket *pkt) @@ -725,12 +717,6 @@ int ifile_get_packet(InputFile *f, AVPacket *pkt) Demuxer *d = demuxer_from_ifile(f); int ret, dummy; - if (!d->thread_queue) { - ret = thread_start(d); - if (ret < 0) - return ret; - } - ret = tq_receive(d->thread_queue, &dummy, pkt); if (ret < 0) return ret; @@ -744,6 +730,7 @@ int ifile_get_packet(InputFile *f, AVPacket *pkt) return 0; } +#endif static void demux_final_stats(Demuxer *d) { @@ -813,8 +800,6 @@ void ifile_close(InputFile **pf) if (!f) return; - thread_stop(d); - if (d->read_started) demux_final_stats(d); @@ -846,7 +831,11 @@ static int ist_use(InputStream *ist, int decoding_needed) ds->sch_idx_stream = ret; } - ist->discard = 0; + if (ist->discard) { + ist->discard = 0; + d->nb_streams_used++; + } + ist->st->discard = ist->user_set_discard; ist->decoding_needed |= decoding_needed; ds->streamcopy_needed |= !decoding_needed; @@ -1647,8 +1636,6 @@ int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch) "since neither -readrate nor -re were given\n"); } - d->thread_queue_size = o->thread_queue_size; - /* Add all the streams from the given input file to the demuxer */ for (int i = 0; i < ic->nb_streams; i++) { ret = ist_add(o, d, ic->streams[i]); -- 2.40.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".