Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
 help / color / mirror / Atom feed
From: Anton Khirnov <anton@khirnov.net>
To: ffmpeg-devel@ffmpeg.org
Subject: [FFmpeg-devel] [PATCH 22/25] fftools/ffmpeg: move -stream_loop handling to the demuxer thread
Date: Wed,  3 Aug 2022 15:58:41 +0200
Message-ID: <20220803135844.16662-22-anton@khirnov.net> (raw)
In-Reply-To: <20220803135844.16662-1-anton@khirnov.net>

-stream_loop is currently handled by destroying the demuxer thread,
seeking, then recreating it anew. This is very messy and conflicts with
the future goal of moving each major ffmpeg component into its own
thread.

Handle -stream_loop directly in the demuxer thread. Looping requires the
demuxer to know the duration of the file, which takes into account the
duration of the last decoded audio frame (if any). Use a thread message
queue to communicate this information from the main thread to the
demuxer thread.
---
 fftools/ffmpeg.c       |  61 ++++++++-------
 fftools/ffmpeg.h       |  23 +++++-
 fftools/ffmpeg_demux.c | 166 +++++++++++++++++++++++++----------------
 3 files changed, 158 insertions(+), 92 deletions(-)

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 90e25973d3..0218f330b9 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -3628,6 +3628,37 @@ static void reset_eagain(void)
         output_streams[i]->unavailable = 0;
 }
 
+static void decode_flush(InputFile *ifile)
+{
+    for (int i = 0; i < ifile->nb_streams; i++) {
+        InputStream *ist = input_streams[ifile->ist_index + i];
+        int ret;
+
+        if (!ist->processing_needed)
+            continue;
+
+        do {
+            ret = process_input_packet(ist, NULL, 1);
+        } while (ret > 0);
+
+        if (ist->decoding_needed) {
+            /* report last frame duration to the demuxer thread */
+            if (ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) {
+                LastFrameDuration dur;
+
+                dur.stream_idx = i;
+                dur.duration   = av_rescale_q(ist->nb_samples,
+                                              (AVRational){ 1, ist->dec_ctx->sample_rate},
+                                              ist->st->time_base);
+
+                av_thread_message_queue_send(ifile->audio_duration_queue, &dur, 0);
+            }
+
+            avcodec_flush_buffers(ist->dec_ctx);
+        }
+    }
+}
+
 /*
  * Return
  * - 0 -- one packet was read and processed
@@ -3641,7 +3672,7 @@ static int process_input(int file_index)
     AVFormatContext *is;
     InputStream *ist;
     AVPacket *pkt;
-    int ret, thread_ret, i, j;
+    int ret, i, j;
     int64_t duration;
     int64_t pkt_dts;
     int disable_discontinuity_correction = copy_ts;
@@ -3653,30 +3684,10 @@ static int process_input(int file_index)
         ifile->eagain = 1;
         return ret;
     }
-    if (ret < 0 && ifile->loop) {
-        for (i = 0; i < ifile->nb_streams; i++) {
-            ist = input_streams[ifile->ist_index + i];
-            if (ist->processing_needed) {
-                ret = process_input_packet(ist, NULL, 1);
-                if (ret>0)
-                    return 0;
-                if (ist->decoding_needed)
-                    avcodec_flush_buffers(ist->dec_ctx);
-            }
-        }
-        free_input_thread(file_index);
-        ret = seek_to_start(ifile, is);
-        thread_ret = init_input_thread(file_index);
-        if (thread_ret < 0)
-            return thread_ret;
-        if (ret < 0)
-            av_log(NULL, AV_LOG_WARNING, "Seek to start failed.\n");
-        else
-            ret = ifile_get_packet(ifile, &pkt);
-        if (ret == AVERROR(EAGAIN)) {
-            ifile->eagain = 1;
-            return ret;
-        }
+    if (ret == 1) {
+        /* the input file is looped: flush the decoders */
+        decode_flush(ifile);
+        return AVERROR(EAGAIN);
     }
     if (ret < 0) {
         if (ret != AVERROR_EOF) {
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 2a9c34eb93..aa97f35310 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -407,6 +407,11 @@ typedef struct InputStream {
     int got_output;
 } InputStream;
 
+typedef struct LastFrameDuration {
+    int     stream_idx;
+    int64_t duration;
+} LastFrameDuration;
+
 typedef struct InputFile {
     int index;
 
@@ -438,6 +443,11 @@ typedef struct InputFile {
     pthread_t thread;           /* thread reading from this file */
     int non_blocking;           /* reading packets from the thread should not block */
     int thread_queue_size;      /* maximum number of queued packets */
+
+    /* when looping the input file, this queue is used by decoders to report
+     * the last frame duration back to the demuxer thread */
+    AVThreadMessageQueue *audio_duration_queue;
+    int                   audio_duration_queue_size;
 } InputFile;
 
 enum forced_keyframes_const {
@@ -710,11 +720,18 @@ int64_t of_filesize(OutputFile *of);
 AVChapter * const *
 of_get_chapters(OutputFile *of, unsigned int *nb_chapters);
 
+/**
+ * Get next input packet from the demuxer.
+ *
+ * @param pkt the packet is written here when this function returns 0
+ * @return
+ * - 0 when a packet has been read successfully
+ * - 1 when stream end was reached, but the stream is looped;
+ *     caller should flush decoders and read from this demuxer again
+ * - a negative error code on failure
+ */
 int ifile_get_packet(InputFile *f, AVPacket **pkt);
 int init_input_threads(void);
-int init_input_thread(int i);
 void free_input_threads(void);
-void free_input_thread(int i);
-int seek_to_start(InputFile *ifile, AVFormatContext *is);
 
 #endif /* FFTOOLS_FFMPEG_H */
diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
index ff9313b040..e02d2d9656 100644
--- a/fftools/ffmpeg_demux.c
+++ b/fftools/ffmpeg_demux.c
@@ -28,6 +28,11 @@
 
 #include "libavformat/avformat.h"
 
+typedef struct DemuxMsg {
+    AVPacket *pkt;
+    int looping;
+} DemuxMsg;
+
 static void report_new_stream(InputFile *file, AVPacket *pkt)
 {
     AVStream *st = file->ctx->streams[pkt->stream_index];
@@ -42,61 +47,54 @@ static void report_new_stream(InputFile *file, AVPacket *pkt)
     file->nb_streams_warn = pkt->stream_index + 1;
 }
 
-// set duration to max(tmp, duration) in a proper time base and return duration's time_base
-static AVRational duration_max(int64_t tmp, int64_t *duration, AVRational tmp_time_base,
-                               AVRational time_base)
+static void ifile_duration_update(InputFile *f, InputStream *ist,
+                                  int64_t last_duration)
 {
-    int ret;
-
-    if (!*duration) {
-        *duration = tmp;
-        return tmp_time_base;
+    /* the total duration of the stream, max_pts - min_pts is
+     * the duration of the stream without the last frame */
+    if (ist->max_pts > ist->min_pts &&
+        ist->max_pts - (uint64_t)ist->min_pts < INT64_MAX - last_duration)
+        last_duration += ist->max_pts - ist->min_pts;
+
+    if (!f->duration ||
+        av_compare_ts(f->duration, f->time_base,
+                      last_duration, ist->st->time_base) < 0) {
+        f->duration = last_duration;
+        f->time_base = ist->st->time_base;
     }
-
-    ret = av_compare_ts(*duration, time_base, tmp, tmp_time_base);
-    if (ret < 0) {
-        *duration = tmp;
-        return tmp_time_base;
-    }
-
-    return time_base;
 }
 
-int seek_to_start(InputFile *ifile, AVFormatContext *is)
+static int seek_to_start(InputFile *ifile)
 {
+    AVFormatContext *is = ifile->ctx;
     InputStream *ist;
-    AVCodecContext *avctx;
-    int i, ret, has_audio = 0;
-    int64_t duration = 0;
+    int ret;
 
     ret = avformat_seek_file(is, -1, INT64_MIN, is->start_time, is->start_time, 0);
     if (ret < 0)
         return ret;
 
-    for (i = 0; i < ifile->nb_streams; i++) {
-        ist   = input_streams[ifile->ist_index + i];
-        avctx = ist->dec_ctx;
-
+    if (ifile->audio_duration_queue_size) {
         /* duration is the length of the last frame in a stream
          * when audio stream is present we don't care about
          * last video frame length because it's not defined exactly */
-        if (avctx->codec_type == AVMEDIA_TYPE_AUDIO && ist->nb_samples)
-            has_audio = 1;
-    }
+        int got_durations = 0;
 
-    for (i = 0; i < ifile->nb_streams; i++) {
-        ist   = input_streams[ifile->ist_index + i];
-        avctx = ist->dec_ctx;
+        while (got_durations < ifile->audio_duration_queue_size) {
+            LastFrameDuration dur;
+            ret = av_thread_message_queue_recv(ifile->audio_duration_queue, &dur, 0);
+            if (ret < 0)
+                return ret;
+            got_durations++;
 
-        if (has_audio) {
-            if (avctx->codec_type == AVMEDIA_TYPE_AUDIO && ist->nb_samples) {
-                AVRational sample_rate = {1, avctx->sample_rate};
+            ist = input_streams[ifile->ist_index + dur.stream_idx];
+            ifile_duration_update(ifile, ist, dur.duration);
+        }
+    } else {
+        for (int i = 0; i < ifile->nb_streams; i++) {
+            int64_t duration = 0;
+            ist   = input_streams[ifile->ist_index + i];
 
-                duration = av_rescale_q(ist->nb_samples, sample_rate, ist->st->time_base);
-            } else {
-                continue;
-            }
-        } else {
             if (ist->framerate.num) {
                 duration = av_rescale_q(1, av_inv_q(ist->framerate), ist->st->time_base);
             } else if (ist->st->avg_frame_rate.num) {
@@ -104,15 +102,9 @@ int seek_to_start(InputFile *ifile, AVFormatContext *is)
             } else {
                 duration = 1;
             }
+
+            ifile_duration_update(ifile, ist, duration);
         }
-        if (!ifile->duration)
-            ifile->time_base = ist->st->time_base;
-        /* the total duration of the stream, max_pts - min_pts is
-         * the duration of the stream without the last frame */
-        if (ist->max_pts > ist->min_pts && ist->max_pts - (uint64_t)ist->min_pts < INT64_MAX - duration)
-            duration += ist->max_pts - ist->min_pts;
-        ifile->time_base = duration_max(duration, &ifile->duration, ist->st->time_base,
-                                        ifile->time_base);
     }
 
     if (ifile->loop > 0)
@@ -124,11 +116,13 @@ int seek_to_start(InputFile *ifile, AVFormatContext *is)
 static void *input_thread(void *arg)
 {
     InputFile *f = arg;
-    AVPacket *pkt = f->pkt, *queue_pkt;
+    AVPacket *pkt = f->pkt;
     unsigned flags = f->non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0;
     int ret = 0;
 
     while (1) {
+        DemuxMsg msg = { NULL };
+
         ret = av_read_frame(f->ctx, pkt);
 
         if (ret == AVERROR(EAGAIN)) {
@@ -136,6 +130,18 @@ static void *input_thread(void *arg)
             continue;
         }
         if (ret < 0) {
+            if (f->loop) {
+                /* signal looping to the consumer thread */
+                msg.looping = 1;
+                ret = av_thread_message_queue_send(f->in_thread_queue, &msg, 0);
+                if (ret >= 0)
+                    ret = seek_to_start(f);
+                if (ret >= 0)
+                    continue;
+
+                /* fallthrough to the error path */
+            }
+
             av_thread_message_queue_set_err_recv(f->in_thread_queue, ret);
             break;
         }
@@ -153,17 +159,17 @@ static void *input_thread(void *arg)
             continue;
         }
 
-        queue_pkt = av_packet_alloc();
-        if (!queue_pkt) {
+        msg.pkt = av_packet_alloc();
+        if (!msg.pkt) {
             av_packet_unref(pkt);
             av_thread_message_queue_set_err_recv(f->in_thread_queue, AVERROR(ENOMEM));
             break;
         }
-        av_packet_move_ref(queue_pkt, pkt);
-        ret = av_thread_message_queue_send(f->in_thread_queue, &queue_pkt, flags);
+        av_packet_move_ref(msg.pkt, pkt);
+        ret = av_thread_message_queue_send(f->in_thread_queue, &msg, flags);
         if (flags && ret == AVERROR(EAGAIN)) {
             flags = 0;
-            ret = av_thread_message_queue_send(f->in_thread_queue, &queue_pkt, flags);
+            ret = av_thread_message_queue_send(f->in_thread_queue, &msg, flags);
             av_log(f->ctx, AV_LOG_WARNING,
                    "Thread message queue blocking; consider raising the "
                    "thread_queue_size option (current value: %d)\n",
@@ -174,7 +180,7 @@ static void *input_thread(void *arg)
                 av_log(f->ctx, AV_LOG_ERROR,
                        "Unable to send packet to main thread: %s\n",
                        av_err2str(ret));
-            av_packet_free(&queue_pkt);
+            av_packet_free(&msg.pkt);
             av_thread_message_queue_set_err_recv(f->in_thread_queue, ret);
             break;
         }
@@ -183,19 +189,20 @@ static void *input_thread(void *arg)
     return NULL;
 }
 
-void free_input_thread(int i)
+static void free_input_thread(int i)
 {
     InputFile *f = input_files[i];
-    AVPacket *pkt;
+    DemuxMsg msg;
 
     if (!f || !f->in_thread_queue)
         return;
     av_thread_message_queue_set_err_send(f->in_thread_queue, AVERROR_EOF);
-    while (av_thread_message_queue_recv(f->in_thread_queue, &pkt, 0) >= 0)
-        av_packet_free(&pkt);
+    while (av_thread_message_queue_recv(f->in_thread_queue, &msg, 0) >= 0)
+        av_packet_free(&msg.pkt);
 
     pthread_join(f->thread, NULL);
     av_thread_message_queue_free(&f->in_thread_queue);
+    av_thread_message_queue_free(&f->audio_duration_queue);
 }
 
 void free_input_threads(void)
@@ -206,7 +213,7 @@ void free_input_threads(void)
         free_input_thread(i);
 }
 
-int init_input_thread(int i)
+static int init_input_thread(int i)
 {
     int ret;
     InputFile *f = input_files[i];
@@ -218,17 +225,38 @@ int init_input_thread(int i)
         strcmp(f->ctx->iformat->name, "lavfi"))
         f->non_blocking = 1;
     ret = av_thread_message_queue_alloc(&f->in_thread_queue,
-                                        f->thread_queue_size, sizeof(f->pkt));
+                                        f->thread_queue_size, sizeof(DemuxMsg));
     if (ret < 0)
         return ret;
 
+    if (f->loop) {
+        int nb_audio_dec = 0;
+
+        for (int i = 0; i < f->nb_streams; i++) {
+            InputStream *ist = input_streams[f->ist_index + i];
+            nb_audio_dec += !!(ist->decoding_needed &&
+                               ist->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO);
+        }
+
+        if (nb_audio_dec) {
+            ret = av_thread_message_queue_alloc(&f->audio_duration_queue,
+                                                nb_audio_dec, sizeof(LastFrameDuration));
+            if (ret < 0)
+                goto fail;
+            f->audio_duration_queue_size = nb_audio_dec;
+        }
+    }
+
     if ((ret = pthread_create(&f->thread, NULL, input_thread, f))) {
         av_log(NULL, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
-        av_thread_message_queue_free(&f->in_thread_queue);
-        return AVERROR(ret);
+        ret = AVERROR(ret);
+        goto fail;
     }
 
     return 0;
+fail:
+    av_thread_message_queue_free(&f->in_thread_queue);
+    return ret;
 }
 
 int init_input_threads(void)
@@ -245,6 +273,9 @@ int init_input_threads(void)
 
 int ifile_get_packet(InputFile *f, AVPacket **pkt)
 {
+    DemuxMsg msg;
+    int ret;
+
     if (f->readrate || f->rate_emu) {
         int i;
         int64_t file_start = copy_ts * (
@@ -264,7 +295,14 @@ int ifile_get_packet(InputFile *f, AVPacket **pkt)
         }
     }
 
-    return av_thread_message_queue_recv(f->in_thread_queue, pkt,
-                                        f->non_blocking ?
-                                        AV_THREAD_MESSAGE_NONBLOCK : 0);
+    ret = av_thread_message_queue_recv(f->in_thread_queue, &msg,
+                                       f->non_blocking ?
+                                       AV_THREAD_MESSAGE_NONBLOCK : 0);
+    if (ret < 0)
+        return ret;
+    if (msg.looping)
+        return 1;
+
+    *pkt = msg.pkt;
+    return 0;
 }
-- 
2.34.1

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".

  parent reply	other threads:[~2022-08-03 14:03 UTC|newest]

Thread overview: 35+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-08-03 13:58 [FFmpeg-devel] [PATCH 01/25] fftools/ffmpeg_opt: move adding attachments out of open_output_file() Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 02/25] fftools/ffmpeg_opt: move adding programs " Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 03/25] fftools/ffmpeg_opt: move adding metadata " Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 04/25] fftools/ffmpeg_hw: stop logging to the decoder context Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 05/25] fftools/ffmpeg: stop accessing the decoder context unnecessarily Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 06/25] fftools/ffmpeg_opt: drop redundant decoder selection Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 07/25] fftools/ffmpeg: remove OutputStream.stream_copy Anton Khirnov
2022-08-04 12:40   ` Michael Niedermayer
2022-08-04 12:54     ` Andreas Rheinhardt
2022-08-04 14:37     ` Anton Khirnov
2022-08-04 14:51       ` Andreas Rheinhardt
2022-08-06  4:26     ` [FFmpeg-devel] [PATCH] " Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 08/25] fftools/ffmpeg: remove OutputStream.encoding_needed Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 09/25] fftools/ffmpeg: remove OutputStream.sync_ist Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 10/25] fftools/ffmpeg: deprecate specifying a sync stream with -map Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 11/25] doc/ffmpeg: update -map documentation Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 12/25] fftools/ffmpeg: drop a superfluous stack variable Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 13/25] fftools/ffmpeg: store the input file index in InputFile Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 14/25] fftools/ffmpeg: always read input in a thread Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 15/25] fftools/ffmpeg: drop a write-only variable Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 16/25] fftools/ffmpeg: move the input thread into its own file Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 17/25] fftools/ffmpeg: drop the 'h' key handling Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 18/25] fftools/ffmpeg: handle dumping input packets in input_thread() Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 19/25] fftools/ffmpeg: report new streams from the input thread Anton Khirnov
2022-08-03 18:47   ` Andreas Rheinhardt
2022-08-04  8:20   ` Anton Khirnov
2022-08-04  8:23     ` Nicolas George
2022-08-04  8:25       ` Andreas Rheinhardt
2022-08-04  8:29         ` Nicolas George
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 20/25] fftools/ffmpeg: move get_input_packet() to ffmpeg_demux.c Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 21/25] fftools/ffmpeg: move seek_to_start() " Anton Khirnov
2022-08-03 13:58 ` Anton Khirnov [this message]
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 23/25] fftools/ffmpeg_demux: factorize signalling end of demuxing Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 24/25] fftools/ffmpeg_demux: do not store demux packet in the context Anton Khirnov
2022-08-03 13:58 ` [FFmpeg-devel] [PATCH 25/25] fftools/ffmpeg: move handling corrupt packets to the input thread Anton Khirnov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20220803135844.16662-22-anton@khirnov.net \
    --to=anton@khirnov.net \
    --cc=ffmpeg-devel@ffmpeg.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
		ffmpegdev@gitmailbox.com
	public-inbox-index ffmpegdev

Example config snippet for mirrors.


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git