Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
 help / color / mirror / Atom feed
From: Anton Khirnov <anton@khirnov.net>
To: ffmpeg-devel@ffmpeg.org
Subject: [FFmpeg-devel] [PATCH 22/27] WIP fftools/ffmpeg_demux: convert to the scheduler
Date: Tue, 19 Sep 2023 21:10:49 +0200
Message-ID: <20230919191044.18873-23-anton@khirnov.net> (raw)
In-Reply-To: <20230919191044.18873-1-anton@khirnov.net>

---
 fftools/ffmpeg.c       |   2 +-
 fftools/ffmpeg.h       |  12 ----
 fftools/ffmpeg_demux.c | 131 +++++++++++++++++++----------------------
 3 files changed, 60 insertions(+), 85 deletions(-)

diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index 995424ca93..00e57c4382 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -1061,7 +1061,7 @@ static int process_input(int file_index, AVPacket *pkt)
     InputStream *ist;
     int ret, i;
 
-    ret = ifile_get_packet(ifile, pkt);
+    ret = 0;
 
     if (ret == 1) {
         /* the input file is looped: flush the decoders */
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 278216e5ff..4646c05bea 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -860,18 +860,6 @@ int64_t of_filesize(OutputFile *of);
 int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch);
 void ifile_close(InputFile **f);
 
-/**
- * Get next input packet from the demuxer.
- *
- * @param pkt the packet is written here when this function returns 0
- * @return
- * - 0 when a packet has been read successfully
- * - 1 when stream end was reached, but the stream is looped;
- *     caller should flush decoders and read from this demuxer again
- * - a negative error code on failure
- */
-int ifile_get_packet(InputFile *f, AVPacket *pkt);
-
 int ist_output_add(InputStream *ist, OutputStream *ost);
 int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple);
 
diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
index 074546d517..9c0e54979e 100644
--- a/fftools/ffmpeg_demux.c
+++ b/fftools/ffmpeg_demux.c
@@ -21,8 +21,6 @@
 
 #include "ffmpeg.h"
 #include "ffmpeg_sched.h"
-#include "objpool.h"
-#include "thread_queue.h"
 
 #include "libavutil/avassert.h"
 #include "libavutil/avstring.h"
@@ -34,7 +32,6 @@
 #include "libavutil/pixdesc.h"
 #include "libavutil/time.h"
 #include "libavutil/timestamp.h"
-#include "libavutil/thread.h"
 
 #include "libavcodec/packet.h"
 
@@ -65,6 +62,9 @@ typedef struct DemuxStream {
 
     double ts_scale;
 
+    // scheduler returned EOF for this stream
+    int finished;
+
     int streamcopy_needed;
 
     int wrap_correction_done;
@@ -115,11 +115,10 @@ typedef struct Demuxer {
     double readrate_initial_burst;
 
     Scheduler            *sch;
-    ThreadQueue          *thread_queue;
-    int                   thread_queue_size;
-    pthread_t             thread;
 
     int                   read_started;
+    int                   nb_streams_used;
+    int                   nb_streams_finished;
 } Demuxer;
 
 static DemuxStream *ds_from_ist(InputStream *ist)
@@ -503,6 +502,8 @@ static int input_packet_process(Demuxer *d, AVPacket *pkt)
                av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q));
     }
 
+    pkt->stream_index = ds->sch_idx_stream;
+
     return 0;
 }
 
@@ -565,9 +566,14 @@ static void *input_thread(void *arg)
 
     discard_unused_programs(f);
 
+    // XXX
+    d->read_started = 1;
+
     d->wallclock_start = av_gettime_relative();
 
     while (1) {
+        DemuxStream *ds;
+
         ret = av_read_frame(f->ctx, pkt);
 
         if (ret == AVERROR(EAGAIN)) {
@@ -575,25 +581,32 @@ static void *input_thread(void *arg)
             continue;
         }
         if (ret < 0) {
-#if 0
             if (d->loop) {
-                /* signal looping to the consumer thread */
-                pkt->opaque = (void*)(intptr_t)PKT_OPAQUE_SEEK;
-                ret = tq_send(d->thread_queue, 0, pkt);
-                if (ret >= 0)
-                    ret = seek_to_start(d);
+                /* signal looping to our consumers */
+                for (int i = 0; i < f->nb_streams; i++) {
+                    pkt->opaque       = (void*)(intptr_t)PKT_OPAQUE_SEEK;
+                    pkt->stream_index = i;
+
+                    ret = sch_demux_send(d->sch, f->index, pkt);
+                    // XXX
+                    if (ret >= 0)
+                        //ret = seek_to_start(d);
+                    if (ret < 0)
+                        break;
+                }
                 if (ret >= 0)
                     continue;
 
                 /* fallthrough to the error path */
             }
-#endif
 
             if (ret == AVERROR_EOF)
                 av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n");
-            else
+            else {
                 av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n",
                        av_err2str(ret));
+                ret = exit_on_error ? ret : 0;
+            }
 
             break;
         }
@@ -605,8 +618,9 @@ static void *input_thread(void *arg)
 
         /* the following test is needed in case new streams appear
            dynamically in stream : we ignore them */
-        if (pkt->stream_index >= f->nb_streams ||
-            f->streams[pkt->stream_index]->discard) {
+        ds = pkt->stream_index < f->nb_streams ?
+             ds_from_ist(f->streams[pkt->stream_index]) : NULL;
+        if (!ds || ds->ist.discard || ds->finished) {
             report_new_stream(d, pkt);
             av_packet_unref(pkt);
             continue;
@@ -630,40 +644,47 @@ static void *input_thread(void *arg)
         if (f->readrate)
             readrate_sleep(d);
 
-        ret = tq_send(d->thread_queue, 0, pkt);
-        if (ret < 0) {
-            if (ret != AVERROR_EOF)
+        ret = sch_demux_send(d->sch, f->index, pkt);
+        if (ret == AVERROR_EOF) {
+            av_packet_unref(pkt);
+
+            av_log(ds, AV_LOG_VERBOSE, "All consumers done\n");
+            ds->finished = 1;
+
+            if (++d->nb_streams_finished == d->nb_streams_used) {
+                av_log(f, AV_LOG_VERBOSE, "All streams' consumers done\n");
+                break;
+            }
+            continue;
+        } else if (ret < 0) {
+            if (ret != AVERROR_EXIT)
                 av_log(f, AV_LOG_ERROR,
-                       "Unable to send packet to main thread: %s\n",
+                       "Unable to send demuxed packet to consumers: %s\n",
                        av_err2str(ret));
             break;
         }
     }
 
+    // EOF/EXIT is normal termination
+    if (ret == AVERROR_EOF || ret == AVERROR_EXIT)
+        ret = 0;
+
 finish:
-    av_assert0(ret < 0);
-    tq_send_finish(d->thread_queue, 0);
+    sch_demux_send(d->sch, f->index, NULL);
 
     av_packet_free(&pkt);
 
     av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n");
 
-    return NULL;
+    return (void*)(intptr_t)ret;
 }
 
+// XXX
+#if 0
 static void thread_stop(Demuxer *d)
 {
     InputFile *f = &d->f;
 
-    if (!d->thread_queue)
-        return;
-
-    tq_receive_finish(d->thread_queue, 0);
-
-    pthread_join(d->thread, NULL);
-
-    tq_free(&d->thread_queue);
-
     //av_thread_message_queue_free(&f->audio_duration_queue);
 }
 
@@ -671,22 +692,7 @@ static int thread_start(Demuxer *d)
 {
     int ret;
     InputFile *f = &d->f;
-    ObjPool *op;
 
-    if (d->thread_queue_size <= 0)
-        d->thread_queue_size = (nb_input_files > 1 ? 8 : 1);
-
-    op = objpool_alloc_packets();
-    if (!op)
-        return AVERROR(ENOMEM);
-
-    d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move);
-    if (!d->thread_queue) {
-        objpool_free(&op);
-        return AVERROR(ENOMEM);
-    }
-
-#if 0
     if (d->loop) {
         int nb_audio_dec = 0;
 
@@ -704,20 +710,6 @@ static int thread_start(Demuxer *d)
             f->audio_duration_queue_size = nb_audio_dec;
         }
     }
-#endif
-
-    if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) {
-        av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret));
-        ret = AVERROR(ret);
-        goto fail;
-    }
-
-    d->read_started = 1;
-
-    return 0;
-fail:
-    tq_free(&d->thread_queue);
-    return ret;
 }
 
 int ifile_get_packet(InputFile *f, AVPacket *pkt)
@@ -725,12 +717,6 @@ int ifile_get_packet(InputFile *f, AVPacket *pkt)
     Demuxer *d = demuxer_from_ifile(f);
     int ret, dummy;
 
-    if (!d->thread_queue) {
-        ret = thread_start(d);
-        if (ret < 0)
-            return ret;
-    }
-
     ret = tq_receive(d->thread_queue, &dummy, pkt);
     if (ret < 0)
         return ret;
@@ -744,6 +730,7 @@ int ifile_get_packet(InputFile *f, AVPacket *pkt)
 
     return 0;
 }
+#endif
 
 static void demux_final_stats(Demuxer *d)
 {
@@ -813,8 +800,6 @@ void ifile_close(InputFile **pf)
     if (!f)
         return;
 
-    thread_stop(d);
-
     if (d->read_started)
         demux_final_stats(d);
 
@@ -846,7 +831,11 @@ static int ist_use(InputStream *ist, int decoding_needed)
         ds->sch_idx_stream = ret;
     }
 
-    ist->discard          = 0;
+    if (ist->discard) {
+        ist->discard = 0;
+        d->nb_streams_used++;
+    }
+
     ist->st->discard      = ist->user_set_discard;
     ist->decoding_needed |= decoding_needed;
     ds->streamcopy_needed |= !decoding_needed;
@@ -1647,8 +1636,6 @@ int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch)
                "since neither -readrate nor -re were given\n");
     }
 
-    d->thread_queue_size = o->thread_queue_size;
-
     /* Add all the streams from the given input file to the demuxer */
     for (int i = 0; i < ic->nb_streams; i++) {
         ret = ist_add(o, d, ic->streams[i]);
-- 
2.40.1

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".

  parent reply	other threads:[~2023-09-19 19:24 UTC|newest]

Thread overview: 30+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2023-09-19 19:10 [FFmpeg-devel] [RFC/PATCH] ffmpeg CLI multithreading Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 01/27] fftools/ffmpeg: move derivation of frame duration from filter framerate Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 02/27] fftools/ffmpeg_enc: move handling video frame duration to video_sync_process() Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 03/27] fftools/ffmpeg_enc: move remaining vsync-related code " Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 04/27] fftools/ffmpeg_enc: simplify adjust_frame_pts_to_encoder_tb() signature Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 05/27] ffools/ffmpeg_filter: stop trying to handle an unreachable state Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 06/27] tests/fate/ffmpeg: add tests for -force_key_frames source Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 07/27] fftools/ffmpeg_enc: unbreak -force_key_frames source_no_drop Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 08/27] fftools/ffmpeg_enc: merge -force_key_frames source/source_no_drop Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 09/27] fftools/ffmpeg: stop accessing OutputStream.last_dropped in print_report() Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 10/27] fftools/ffmpeg_enc: move framerate conversion state into a separate struct Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 11/27] fftools/ffmpeg_enc: move fps conversion code to ffmpeg_filter Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 12/27] fftools/ffmpeg_filter: fail on filtering errors Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 13/27] fftools/ffmpeg_enc: constify the frame passed to enc_open() Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 14/27] fftools/ffmpeg_filter: move filtering to a separate thread Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 15/27] fftools/ffmpeg_mux: add muxing thread private data Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 16/27] fftools/ffmpeg_demux: switch from AVThreadMessageQueue to ThreadQueue Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 17/27] XXX: disable fix_sub_duration_heartbeat Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 18/27] XXX fftools/ffmpeg_enc: temporarily disable side data copying Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 19/27] XXX ffmpeg temporarily disable -stream_loop Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 20/27] WIP: fftools/ffmpeg_enc: move encoding to a separate thread Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 21/27] WIP fftools/ffmpeg: add thread-aware transcode scheduling infrastructure Anton Khirnov
2023-09-19 19:10 ` Anton Khirnov [this message]
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 23/27] WIP fftools/ffmpeg_dec: convert to the scheduler Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 24/27] WIP fftools/ffmpeg_filter: " Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 25/27] WIP fftools/ffmpeg_enc: " Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 26/27] WIP fftools/ffmpeg_mux: " Anton Khirnov
2023-09-19 19:10 ` [FFmpeg-devel] [PATCH 27/27] WIP: ffmpeg: switch to scheduler Anton Khirnov
2023-09-20 14:49 ` [FFmpeg-devel] [RFC/PATCH] ffmpeg CLI multithreading Michael Niedermayer
2023-09-25 10:17   ` Anton Khirnov

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20230919191044.18873-23-anton@khirnov.net \
    --to=anton@khirnov.net \
    --cc=ffmpeg-devel@ffmpeg.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
		ffmpegdev@gitmailbox.com
	public-inbox-index ffmpegdev

Example config snippet for mirrors.


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git