From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100]) by master.gitmailbox.com (Postfix) with ESMTP id C488247683 for ; Tue, 19 Sep 2023 19:23:44 +0000 (UTC) Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 2F5CD68C98A; Tue, 19 Sep 2023 22:21:36 +0300 (EEST) Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id CE53968C8ED for ; Tue, 19 Sep 2023 22:21:29 +0300 (EEST) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id 66C5F10 for ; Tue, 19 Sep 2023 21:21:29 +0200 (CEST) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id fKt4zWWMr14x for ; Tue, 19 Sep 2023 21:21:24 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id CA8F9516C for ; Tue, 19 Sep 2023 21:20:42 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id BE18F3A150F for ; Tue, 19 Sep 2023 21:20:42 +0200 (CEST) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Tue, 19 Sep 2023 21:10:43 +0200 Message-Id: <20230919191044.18873-17-anton@khirnov.net> X-Mailer: git-send-email 2.40.1 In-Reply-To: <20230919191044.18873-1-anton@khirnov.net> References: <20230919191044.18873-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 16/27] fftools/ffmpeg_demux: switch from AVThreadMessageQueue to ThreadQueue X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Archived-At: List-Archive: List-Post: * the code is made shorter and simpler * avoids constantly allocating and freeing AVPackets, thanks to ThreadQueue integration with ObjPool * is consistent with decoding/filtering/muxing * reduces the diff in the future switch to thread-aware scheduling This makes ifile_get_packet() always block. Any potential issues caused by this will be resolved by the switch to thread-aware scheduling in future commits. --- fftools/ffmpeg.c | 32 +++++------ fftools/ffmpeg.h | 14 ++++- fftools/ffmpeg_demux.c | 116 +++++++++++++++------------------------- fftools/ffmpeg_filter.c | 2 +- 4 files changed, 71 insertions(+), 93 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 7c33b56cd3..7d6972f689 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -1029,9 +1029,6 @@ static int check_keyboard_interaction(int64_t cur_time) static void reset_eagain(void) { - int i; - for (i = 0; i < nb_input_files; i++) - input_files[i]->eagain = 0; for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) ost->unavailable = 0; } @@ -1055,19 +1052,14 @@ static void decode_flush(InputFile *ifile) * this function should be called again * - AVERROR_EOF -- this function should not be called again */ -static int process_input(int file_index) +static int process_input(int file_index, AVPacket *pkt) { InputFile *ifile = input_files[file_index]; InputStream *ist; - AVPacket *pkt; int ret, i; - ret = ifile_get_packet(ifile, &pkt); + ret = ifile_get_packet(ifile, pkt); - if (ret == AVERROR(EAGAIN)) { - ifile->eagain = 1; - return ret; - } if (ret == 1) { /* the input file is looped: flush the decoders */ decode_flush(ifile); @@ -1114,7 +1106,7 @@ static int process_input(int file_index) ret = process_input_packet(ist, pkt, 0); - av_packet_free(&pkt); + av_packet_unref(pkt); return ret < 0 ? ret : 0; } @@ -1124,7 +1116,7 @@ static int process_input(int file_index) * * @return 0 for success, <0 for error */ -static int transcode_step(OutputStream *ost) +static int transcode_step(OutputStream *ost, AVPacket *demux_pkt) { InputStream *ist = NULL; int ret; @@ -1139,10 +1131,8 @@ static int transcode_step(OutputStream *ost) av_assert0(ist); } - ret = process_input(ist->file_index); + ret = process_input(ist->file_index, demux_pkt); if (ret == AVERROR(EAGAIN)) { - if (input_files[ist->file_index]->eagain) - ost->unavailable = 1; return 0; } @@ -1168,12 +1158,19 @@ static int transcode(int *err_rate_exceeded) int ret = 0, i; InputStream *ist; int64_t timer_start; + AVPacket *demux_pkt = NULL; print_stream_maps(); *err_rate_exceeded = 0; atomic_store(&transcode_init_done, 1); + demux_pkt = av_packet_alloc(); + if (!demux_pkt) { + ret = AVERROR(ENOMEM); + goto fail; + } + if (stdin_interaction) { av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n"); } @@ -1201,7 +1198,7 @@ static int transcode(int *err_rate_exceeded) break; } - ret = transcode_step(ost); + ret = transcode_step(ost, demux_pkt); if (ret < 0 && ret != AVERROR_EOF) { av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret)); break; @@ -1242,6 +1239,9 @@ static int transcode(int *err_rate_exceeded) /* dump report by using the first video and audio streams */ print_report(1, timer_start, av_gettime_relative()); +fail: + av_packet_free(&demux_pkt); + return ret; } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 5a8f52ce00..88b8ed12c0 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -101,6 +101,17 @@ typedef struct { } AudioChannelMap; #endif +/** + * AVPacket.opaque values we use. + */ +enum PacketOpaque { + /** + * Sent by demuxers after seeking, so that decoders should be flushed. + * The packet wit this value is otherwise always empty. + */ + PKT_OPAQUE_SEEK = 1, +}; + typedef struct DemuxPktData { // estimated dts in AV_TIME_BASE_Q, // to be used when real dts is missing @@ -406,7 +417,6 @@ typedef struct InputFile { AVFormatContext *ctx; int eof_reached; /* true if eof reached */ - int eagain; /* true if last read attempt returned EAGAIN */ int64_t input_ts_offset; int input_sync_ref; /** @@ -856,7 +866,7 @@ void ifile_close(InputFile **f); * caller should flush decoders and read from this demuxer again * - a negative error code on failure */ -int ifile_get_packet(InputFile *f, AVPacket **pkt); +int ifile_get_packet(InputFile *f, AVPacket *pkt); int ist_output_add(InputStream *ist, OutputStream *ost); int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple); diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c index c01852d4cf..0955956117 100644 --- a/fftools/ffmpeg_demux.c +++ b/fftools/ffmpeg_demux.c @@ -20,6 +20,8 @@ #include #include "ffmpeg.h" +#include "objpool.h" +#include "thread_queue.h" #include "libavutil/avassert.h" #include "libavutil/avstring.h" @@ -32,7 +34,6 @@ #include "libavutil/time.h" #include "libavutil/timestamp.h" #include "libavutil/thread.h" -#include "libavutil/threadmessage.h" #include "libavcodec/packet.h" @@ -109,19 +110,13 @@ typedef struct Demuxer { double readrate_initial_burst; - AVThreadMessageQueue *in_thread_queue; + ThreadQueue *thread_queue; int thread_queue_size; pthread_t thread; - int non_blocking; int read_started; } Demuxer; -typedef struct DemuxMsg { - AVPacket *pkt; - int looping; -} DemuxMsg; - static DemuxStream *ds_from_ist(InputStream *ist) { return (DemuxStream*)ist; @@ -456,26 +451,16 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) return 0; } -// process an input packet into a message to send to the consumer thread -// src is always cleared by this function -static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src) +static int input_packet_process(Demuxer *d, AVPacket *pkt) { InputFile *f = &d->f; - InputStream *ist = f->streams[src->stream_index]; + InputStream *ist = f->streams[pkt->stream_index]; DemuxStream *ds = ds_from_ist(ist); - AVPacket *pkt; int ret = 0; - pkt = av_packet_alloc(); - if (!pkt) { - av_packet_unref(src); - return AVERROR(ENOMEM); - } - av_packet_move_ref(pkt, src); - ret = ts_fixup(d, pkt); if (ret < 0) - goto fail; + return ret; ds->data_size += pkt->size; ds->nb_packets++; @@ -493,10 +478,8 @@ static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src) continue; dst_data = av_packet_new_side_data(pkt, src_sd->type, src_sd->size); - if (!dst_data) { - ret = AVERROR(ENOMEM); - goto fail; - } + if (!dst_data) + return AVERROR(ENOMEM); memcpy(dst_data, src_sd->data, src_sd->size); } @@ -513,13 +496,7 @@ static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src) av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q)); } - msg->pkt = pkt; - pkt = NULL; - -fail: - av_packet_free(&pkt); - - return ret; + return 0; } static void readrate_sleep(Demuxer *d) @@ -569,7 +546,6 @@ static void *input_thread(void *arg) Demuxer *d = arg; InputFile *f = &d->f; AVPacket *pkt; - unsigned flags = d->non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0; int ret = 0; pkt = av_packet_alloc(); @@ -585,8 +561,6 @@ static void *input_thread(void *arg) d->wallclock_start = av_gettime_relative(); while (1) { - DemuxMsg msg = { NULL }; - ret = av_read_frame(f->ctx, pkt); if (ret == AVERROR(EAGAIN)) { @@ -596,8 +570,8 @@ static void *input_thread(void *arg) if (ret < 0) { if (d->loop) { /* signal looping to the consumer thread */ - msg.looping = 1; - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, 0); + pkt->opaque = (void*)(intptr_t)PKT_OPAQUE_SEEK; + ret = tq_send(d->thread_queue, 0, pkt); if (ret >= 0) ret = seek_to_start(d); if (ret >= 0) @@ -640,35 +614,26 @@ static void *input_thread(void *arg) } } - ret = input_packet_process(d, &msg, pkt); + ret = input_packet_process(d, pkt); if (ret < 0) break; if (f->readrate) readrate_sleep(d); - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags); - if (flags && ret == AVERROR(EAGAIN)) { - flags = 0; - ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags); - av_log(f, AV_LOG_WARNING, - "Thread message queue blocking; consider raising the " - "thread_queue_size option (current value: %d)\n", - d->thread_queue_size); - } + ret = tq_send(d->thread_queue, 0, pkt); if (ret < 0) { if (ret != AVERROR_EOF) av_log(f, AV_LOG_ERROR, "Unable to send packet to main thread: %s\n", av_err2str(ret)); - av_packet_free(&msg.pkt); break; } } finish: av_assert0(ret < 0); - av_thread_message_queue_set_err_recv(d->in_thread_queue, ret); + tq_send_finish(d->thread_queue, 0); av_packet_free(&pkt); @@ -680,16 +645,16 @@ finish: static void thread_stop(Demuxer *d) { InputFile *f = &d->f; - DemuxMsg msg; - if (!d->in_thread_queue) + if (!d->thread_queue) return; - av_thread_message_queue_set_err_send(d->in_thread_queue, AVERROR_EOF); - while (av_thread_message_queue_recv(d->in_thread_queue, &msg, 0) >= 0) - av_packet_free(&msg.pkt); + + tq_receive_finish(d->thread_queue, 0); pthread_join(d->thread, NULL); - av_thread_message_queue_free(&d->in_thread_queue); + + tq_free(&d->thread_queue); + av_thread_message_queue_free(&f->audio_duration_queue); } @@ -697,18 +662,20 @@ static int thread_start(Demuxer *d) { int ret; InputFile *f = &d->f; + ObjPool *op; if (d->thread_queue_size <= 0) d->thread_queue_size = (nb_input_files > 1 ? 8 : 1); - if (nb_input_files > 1 && - (f->ctx->pb ? !f->ctx->pb->seekable : - strcmp(f->ctx->iformat->name, "lavfi"))) - d->non_blocking = 1; - ret = av_thread_message_queue_alloc(&d->in_thread_queue, - d->thread_queue_size, sizeof(DemuxMsg)); - if (ret < 0) - return ret; + op = objpool_alloc_packets(); + if (!op) + return AVERROR(ENOMEM); + + d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move); + if (!d->thread_queue) { + objpool_free(&op); + return AVERROR(ENOMEM); + } if (d->loop) { int nb_audio_dec = 0; @@ -738,31 +705,32 @@ static int thread_start(Demuxer *d) return 0; fail: - av_thread_message_queue_free(&d->in_thread_queue); + tq_free(&d->thread_queue); return ret; } -int ifile_get_packet(InputFile *f, AVPacket **pkt) +int ifile_get_packet(InputFile *f, AVPacket *pkt) { Demuxer *d = demuxer_from_ifile(f); - DemuxMsg msg; - int ret; + int ret, dummy; - if (!d->in_thread_queue) { + if (!d->thread_queue) { ret = thread_start(d); if (ret < 0) return ret; } - ret = av_thread_message_queue_recv(d->in_thread_queue, &msg, - d->non_blocking ? - AV_THREAD_MESSAGE_NONBLOCK : 0); + ret = tq_receive(d->thread_queue, &dummy, pkt); if (ret < 0) return ret; - if (msg.looping) - return 1; - *pkt = msg.pkt; + if (pkt->opaque) { + av_assert0((intptr_t)pkt->opaque == PKT_OPAQUE_SEEK && + !pkt->data && !pkt->side_data_elems); + pkt->opaque = NULL; + return 1; + } + return 0; } diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index cfd13dd81a..04c4b4ea7b 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -2656,7 +2656,7 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) InputFilterPriv *ifp = ifp_from_ifilter(ifilter); InputStream *ist = ifp->ist; - if (input_files[ist->file_index]->eagain || fgt->eof_in[i]) + if (fgt->eof_in[i]) continue; nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter); -- 2.40.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".