Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
 help / color / mirror / Atom feed
From: Anton Khirnov <anton@khirnov.net>
To: ffmpeg-devel@ffmpeg.org
Subject: [FFmpeg-devel] [PATCH 16/27] fftools/ffmpeg_demux: switch from AVThreadMessageQueue to ThreadQueue
Date: Tue, 19 Sep 2023 21:10:43 +0200
Message-ID: <20230919191044.18873-17-anton@khirnov.net> (raw)
In-Reply-To: <20230919191044.18873-1-anton@khirnov.net>

* the code is made shorter and simpler
* avoids constantly allocating and freeing AVPackets, thanks to
  ThreadQueue integration with ObjPool
* is consistent with decoding/filtering/muxing
* reduces the diff in the future switch to thread-aware scheduling

This makes ifile_get_packet() always block. Any potential issues caused
by this will be resolved by the switch to thread-aware scheduling in
future commits.
---
 fftools/ffmpeg.c        |  32 +++++------
 fftools/ffmpeg.h        |  14 ++++-
 fftools/ffmpeg_demux.c  | 116 +++++++++++++++-------------------------
 fftools/ffmpeg_filter.c |   2 +-
 4 files changed, 71 insertions(+), 93 deletions(-)

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 7c33b56cd3..7d6972f689 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -1029,9 +1029,6 @@ static int check_keyboard_interaction(int64_t cur_time)
 
 static void reset_eagain(void)
 {
-    int i;
-    for (i = 0; i < nb_input_files; i++)
-        input_files[i]->eagain = 0;
     for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost))
         ost->unavailable = 0;
 }
@@ -1055,19 +1052,14 @@ static void decode_flush(InputFile *ifile)
  *   this function should be called again
  * - AVERROR_EOF -- this function should not be called again
  */
-static int process_input(int file_index)
+static int process_input(int file_index, AVPacket *pkt)
 {
     InputFile *ifile = input_files[file_index];
     InputStream *ist;
-    AVPacket *pkt;
     int ret, i;
 
-    ret = ifile_get_packet(ifile, &pkt);
+    ret = ifile_get_packet(ifile, pkt);
 
-    if (ret == AVERROR(EAGAIN)) {
-        ifile->eagain = 1;
-        return ret;
-    }
     if (ret == 1) {
         /* the input file is looped: flush the decoders */
         decode_flush(ifile);
@@ -1114,7 +1106,7 @@ static int process_input(int file_index)
 
     ret = process_input_packet(ist, pkt, 0);
 
-    av_packet_free(&pkt);
+    av_packet_unref(pkt);
 
     return ret < 0 ? ret : 0;
 }
@@ -1124,7 +1116,7 @@ static int process_input(int file_index)
  *
  * @return  0 for success, <0 for error
  */
-static int transcode_step(OutputStream *ost)
+static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
 {
     InputStream  *ist = NULL;
     int ret;
@@ -1139,10 +1131,8 @@ static int transcode_step(OutputStream *ost)
         av_assert0(ist);
     }
 
-    ret = process_input(ist->file_index);
+    ret = process_input(ist->file_index, demux_pkt);
     if (ret == AVERROR(EAGAIN)) {
-        if (input_files[ist->file_index]->eagain)
-            ost->unavailable = 1;
         return 0;
     }
 
@@ -1168,12 +1158,19 @@ static int transcode(int *err_rate_exceeded)
     int ret = 0, i;
     InputStream *ist;
     int64_t timer_start;
+    AVPacket *demux_pkt = NULL;
 
     print_stream_maps();
 
     *err_rate_exceeded = 0;
     atomic_store(&transcode_init_done, 1);
 
+    demux_pkt = av_packet_alloc();
+    if (!demux_pkt) {
+        ret = AVERROR(ENOMEM);
+        goto fail;
+    }
+
     if (stdin_interaction) {
         av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n");
     }
@@ -1201,7 +1198,7 @@ static int transcode(int *err_rate_exceeded)
             break;
         }
 
-        ret = transcode_step(ost);
+        ret = transcode_step(ost, demux_pkt);
         if (ret < 0 && ret != AVERROR_EOF) {
             av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
             break;
@@ -1242,6 +1239,9 @@ static int transcode(int *err_rate_exceeded)
     /* dump report by using the first video and audio streams */
     print_report(1, timer_start, av_gettime_relative());
 
+fail:
+    av_packet_free(&demux_pkt);
+
     return ret;
 }
 
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 5a8f52ce00..88b8ed12c0 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -101,6 +101,17 @@ typedef struct {
 } AudioChannelMap;
 #endif
 
+/**
+ * AVPacket.opaque values we use.
+ */
+enum PacketOpaque {
+    /**
+     * Sent by demuxers after seeking, to signal that decoders should be flushed.
+     * The packet with this value is otherwise always empty.
+     */
+    PKT_OPAQUE_SEEK = 1,
+};
+
 typedef struct DemuxPktData {
     // estimated dts in AV_TIME_BASE_Q,
     // to be used when real dts is missing
@@ -406,7 +417,6 @@ typedef struct InputFile {
 
     AVFormatContext *ctx;
     int eof_reached;      /* true if eof reached */
-    int eagain;           /* true if last read attempt returned EAGAIN */
     int64_t input_ts_offset;
     int input_sync_ref;
     /**
@@ -856,7 +866,7 @@ void ifile_close(InputFile **f);
  *     caller should flush decoders and read from this demuxer again
  * - a negative error code on failure
  */
-int ifile_get_packet(InputFile *f, AVPacket **pkt);
+int ifile_get_packet(InputFile *f, AVPacket *pkt);
 
 int ist_output_add(InputStream *ist, OutputStream *ost);
 int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple);
diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
index c01852d4cf..0955956117 100644
--- a/fftools/ffmpeg_demux.c
+++ b/fftools/ffmpeg_demux.c
@@ -20,6 +20,8 @@
 #include <stdint.h>
 
 #include "ffmpeg.h"
+#include "objpool.h"
+#include "thread_queue.h"
 
 #include "libavutil/avassert.h"
 #include "libavutil/avstring.h"
@@ -32,7 +34,6 @@
 #include "libavutil/time.h"
 #include "libavutil/timestamp.h"
 #include "libavutil/thread.h"
-#include "libavutil/threadmessage.h"
 
 #include "libavcodec/packet.h"
 
@@ -109,19 +110,13 @@ typedef struct Demuxer {
 
     double readrate_initial_burst;
 
-    AVThreadMessageQueue *in_thread_queue;
+    ThreadQueue          *thread_queue;
     int                   thread_queue_size;
     pthread_t             thread;
-    int                   non_blocking;
 
     int                   read_started;
 } Demuxer;
 
-typedef struct DemuxMsg {
-    AVPacket *pkt;
-    int looping;
-} DemuxMsg;
-
 static DemuxStream *ds_from_ist(InputStream *ist)
 {
     return (DemuxStream*)ist;
@@ -456,26 +451,16 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt)
     return 0;
 }
 
-// process an input packet into a message to send to the consumer thread
-// src is always cleared by this function
-static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src)
+static int input_packet_process(Demuxer *d, AVPacket *pkt)
 {
     InputFile     *f = &d->f;
-    InputStream *ist = f->streams[src->stream_index];
+    InputStream *ist = f->streams[pkt->stream_index];
     DemuxStream  *ds = ds_from_ist(ist);
-    AVPacket *pkt;
     int ret = 0;
 
-    pkt = av_packet_alloc();
-    if (!pkt) {
-        av_packet_unref(src);
-        return AVERROR(ENOMEM);
-    }
-    av_packet_move_ref(pkt, src);
-
     ret = ts_fixup(d, pkt);
     if (ret < 0)
-        goto fail;
+        return ret;
 
     ds->data_size += pkt->size;
     ds->nb_packets++;
@@ -493,10 +478,8 @@ static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src)
                 continue;
 
             dst_data = av_packet_new_side_data(pkt, src_sd->type, src_sd->size);
-            if (!dst_data) {
-                ret = AVERROR(ENOMEM);
-                goto fail;
-            }
+            if (!dst_data)
+                return AVERROR(ENOMEM);
 
             memcpy(dst_data, src_sd->data, src_sd->size);
         }
@@ -513,13 +496,7 @@ static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src)
                av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q));
     }
 
-    msg->pkt = pkt;
-    pkt      = NULL;
-
-fail:
-    av_packet_free(&pkt);
-
-    return ret;
+    return 0;
 }
 
 static void readrate_sleep(Demuxer *d)
@@ -569,7 +546,6 @@ static void *input_thread(void *arg)
     Demuxer   *d = arg;
     InputFile *f = &d->f;
     AVPacket *pkt;
-    unsigned flags = d->non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0;
     int ret = 0;
 
     pkt = av_packet_alloc();
@@ -585,8 +561,6 @@ static void *input_thread(void *arg)
     d->wallclock_start = av_gettime_relative();
 
     while (1) {
-        DemuxMsg msg = { NULL };
-
         ret = av_read_frame(f->ctx, pkt);
 
         if (ret == AVERROR(EAGAIN)) {
@@ -596,8 +570,8 @@ static void *input_thread(void *arg)
         if (ret < 0) {
             if (d->loop) {
                 /* signal looping to the consumer thread */
-                msg.looping = 1;
-                ret = av_thread_message_queue_send(d->in_thread_queue, &msg, 0);
+                pkt->opaque = (void*)(intptr_t)PKT_OPAQUE_SEEK;
+                ret = tq_send(d->thread_queue, 0, pkt);
                 if (ret >= 0)
                     ret = seek_to_start(d);
                 if (ret >= 0)
@@ -640,35 +614,26 @@ static void *input_thread(void *arg)
             }
         }
 
-        ret = input_packet_process(d, &msg, pkt);
+        ret = input_packet_process(d, pkt);
         if (ret < 0)
             break;
 
         if (f->readrate)
             readrate_sleep(d);
 
-        ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags);
-        if (flags && ret == AVERROR(EAGAIN)) {
-            flags = 0;
-            ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags);
-            av_log(f, AV_LOG_WARNING,
-                   "Thread message queue blocking; consider raising the "
-                   "thread_queue_size option (current value: %d)\n",
-                   d->thread_queue_size);
-        }
+        ret = tq_send(d->thread_queue, 0, pkt);
         if (ret < 0) {
             if (ret != AVERROR_EOF)
                 av_log(f, AV_LOG_ERROR,
                        "Unable to send packet to main thread: %s\n",
                        av_err2str(ret));
-            av_packet_free(&msg.pkt);
             break;
         }
     }
 
 finish:
     av_assert0(ret < 0);
-    av_thread_message_queue_set_err_recv(d->in_thread_queue, ret);
+    tq_send_finish(d->thread_queue, 0);
 
     av_packet_free(&pkt);
 
@@ -680,16 +645,16 @@ finish:
 static void thread_stop(Demuxer *d)
 {
     InputFile *f = &d->f;
-    DemuxMsg msg;
 
-    if (!d->in_thread_queue)
+    if (!d->thread_queue)
         return;
-    av_thread_message_queue_set_err_send(d->in_thread_queue, AVERROR_EOF);
-    while (av_thread_message_queue_recv(d->in_thread_queue, &msg, 0) >= 0)
-        av_packet_free(&msg.pkt);
+
+    tq_receive_finish(d->thread_queue, 0);
 
     pthread_join(d->thread, NULL);
-    av_thread_message_queue_free(&d->in_thread_queue);
+
+    tq_free(&d->thread_queue);
+
     av_thread_message_queue_free(&f->audio_duration_queue);
 }
 
@@ -697,18 +662,20 @@ static int thread_start(Demuxer *d)
 {
     int ret;
     InputFile *f = &d->f;
+    ObjPool *op;
 
     if (d->thread_queue_size <= 0)
         d->thread_queue_size = (nb_input_files > 1 ? 8 : 1);
 
-    if (nb_input_files > 1 &&
-        (f->ctx->pb ? !f->ctx->pb->seekable :
-         strcmp(f->ctx->iformat->name, "lavfi")))
-        d->non_blocking = 1;
-    ret = av_thread_message_queue_alloc(&d->in_thread_queue,
-                                        d->thread_queue_size, sizeof(DemuxMsg));
-    if (ret < 0)
-        return ret;
+    op = objpool_alloc_packets();
+    if (!op)
+        return AVERROR(ENOMEM);
+
+    d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move);
+    if (!d->thread_queue) {
+        objpool_free(&op);
+        return AVERROR(ENOMEM);
+    }
 
     if (d->loop) {
         int nb_audio_dec = 0;
@@ -738,31 +705,32 @@ static int thread_start(Demuxer *d)
 
     return 0;
 fail:
-    av_thread_message_queue_free(&d->in_thread_queue);
+    tq_free(&d->thread_queue);
     return ret;
 }
 
-int ifile_get_packet(InputFile *f, AVPacket **pkt)
+int ifile_get_packet(InputFile *f, AVPacket *pkt)
 {
     Demuxer *d = demuxer_from_ifile(f);
-    DemuxMsg msg;
-    int ret;
+    int ret, dummy;
 
-    if (!d->in_thread_queue) {
+    if (!d->thread_queue) {
         ret = thread_start(d);
         if (ret < 0)
             return ret;
     }
 
-    ret = av_thread_message_queue_recv(d->in_thread_queue, &msg,
-                                       d->non_blocking ?
-                                       AV_THREAD_MESSAGE_NONBLOCK : 0);
+    ret = tq_receive(d->thread_queue, &dummy, pkt);
     if (ret < 0)
         return ret;
-    if (msg.looping)
-        return 1;
 
-    *pkt = msg.pkt;
+    if (pkt->opaque) {
+        av_assert0((intptr_t)pkt->opaque == PKT_OPAQUE_SEEK &&
+                   !pkt->data && !pkt->side_data_elems);
+        pkt->opaque = NULL;
+        return 1;
+    }
+
     return 0;
 }
 
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index cfd13dd81a..04c4b4ea7b 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -2656,7 +2656,7 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt)
         InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
         InputStream     *ist = ifp->ist;
 
-        if (input_files[ist->file_index]->eagain || fgt->eof_in[i])
+        if (fgt->eof_in[i])
             continue;
 
         nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter);
-- 
2.40.1

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".

  parent reply	other threads:[~2023-09-19 19:23 UTC|newest]

Thread overview: 30+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-09-19 19:10 [FFmpeg-devel] [RFC/PATCH] ffmpeg CLI multithreading Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 01/27] fftools/ffmpeg: move derivation of frame duration from filter framerate Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 02/27] fftools/ffmpeg_enc: move handling video frame duration to video_sync_process() Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 03/27] fftools/ffmpeg_enc: move remaining vsync-related code " Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 04/27] fftools/ffmpeg_enc: simplify adjust_frame_pts_to_encoder_tb() signature Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 05/27] ffools/ffmpeg_filter: stop trying to handle an unreachable state Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 06/27] tests/fate/ffmpeg: add tests for -force_key_frames source Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 07/27] fftools/ffmpeg_enc: unbreak -force_key_frames source_no_drop Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 08/27] fftools/ffmpeg_enc: merge -force_key_frames source/source_no_drop Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 09/27] fftools/ffmpeg: stop accessing OutputStream.last_dropped in print_report() Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 10/27] fftools/ffmpeg_enc: move framerate conversion state into a separate struct Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 11/27] fftools/ffmpeg_enc: move fps conversion code to ffmpeg_filter Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 12/27] fftools/ffmpeg_filter: fail on filtering errors Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 13/27] fftools/ffmpeg_enc: constify the frame passed to enc_open() Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 14/27] fftools/ffmpeg_filter: move filtering to a separate thread Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 15/27] fftools/ffmpeg_mux: add muxing thread private data Anton Khirnov
2023-09-19 19:10 ` Anton Khirnov [this message]
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 17/27] XXX: disable fix_sub_duration_heartbeat Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 18/27] XXX fftools/ffmpeg_enc: temporarily disable side data copying Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 19/27] XXX ffmpeg temporarily disable -stream_loop Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 20/27] WIP: fftools/ffmpeg_enc: move encoding to a separate thread Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 21/27] WIP fftools/ffmpeg: add thread-aware transcode scheduling infrastructure Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 22/27] WIP fftools/ffmpeg_demux: convert to the scheduler Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 23/27] WIP fftools/ffmpeg_dec: " Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 24/27] WIP fftools/ffmpeg_filter: " Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 25/27] WIP fftools/ffmpeg_enc: " Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 26/27] WIP fftools/ffmpeg_mux: " Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 27/27] WIP: ffmpeg: switch to scheduler Anton Khirnov
2023-09-20 14:49 ` [FFmpeg-devel] [RFC/PATCH] ffmpeg CLI multithreading Michael Niedermayer
2023-09-25 10:17   ` Anton Khirnov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230919191044.18873-17-anton@khirnov.net \
    --to=anton@khirnov.net \
    --cc=ffmpeg-devel@ffmpeg.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
		ffmpegdev@gitmailbox.com
	public-inbox-index ffmpegdev

Example config snippet for mirrors.


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git