Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
 help / color / mirror / Atom feed
From: Niklas Haas via ffmpeg-devel <ffmpeg-devel@ffmpeg.org>
To: ffmpeg-devel@ffmpeg.org
Cc: Niklas Haas <code@ffmpeg.org>
Subject: [FFmpeg-devel] [PATCH] WIP: Several improvements around early decoding latency and memory usage of unused inputs (PR #20457)
Date: Sun, 07 Sep 2025 22:30:17 -0000
Message-ID: <175728421754.25.209555194598833183@463a07221176> (raw)

PR #20457 opened by Niklas Haas (haasn)
URL: https://code.ffmpeg.org/FFmpeg/FFmpeg/pulls/20457
Patch URL: https://code.ffmpeg.org/FFmpeg/FFmpeg/pulls/20457.patch

A series of improvements aimed at reducing the amount of wasted memory inside ffmpeg, in particular for unused inputs. Generally speaking, this PR has two parts:

1. Allow threaded decoders to decode a single frame ahead of time (before saturating frame threads); as is often the case during initialization routines.
2. Various improvements inside the `ffmpeg` tool aimed at making sure it does not process/decode more frames than needed, in particular during early filtergraph init.

I marked this as WIP because I first want to substantiate my changes with some sort of benchmarks.


>From 78cc209b51b2cfda45cdb3747554d7a0ae8a1e20 Mon Sep 17 00:00:00 2001
From: Niklas Haas <git@haasn.dev>
Date: Wed, 3 Sep 2025 14:10:55 +0200
Subject: [PATCH 1/7] fftools/ffmpeg_sched: relax queue size assertion

The code in the decoder just cares about allocating enough extra hw frames
to cover the size of the queue; but there's no reason we actually *have* to
use this many. We can safely relax the assertion to a <= check.
---
 fftools/ffmpeg_sched.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 3180367576..4f0d446007 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -389,7 +389,7 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si
         // for frames held in queues inside the ffmpeg utility.  If this
         // can ever dynamically change then the corresponding decode
         // code needs to be updated as well.
-        av_assert0(queue_size == DEFAULT_FRAME_THREAD_QUEUE_SIZE);
+        av_assert0(queue_size <= DEFAULT_FRAME_THREAD_QUEUE_SIZE);
     }
 
     tq = tq_alloc(nb_streams, queue_size,
-- 
2.49.1


>From 5165822b5c26277476db061a93e9dd401d8e01ad Mon Sep 17 00:00:00 2001
From: Niklas Haas <git@haasn.dev>
Date: Wed, 3 Sep 2025 14:38:38 +0200
Subject: [PATCH 2/7] fftools/ffmpeg_sched: lower default frame queue size

I tested this extensively under different conditions and could not come up
with any scenario where using a larger queue size was actually beneficial.
Moreover, having such a large default queue is very wasteful especially
for larger frame sizes; and can in the worst case lead to an extra ~50% memory
footprint per input (with the default 16 threads), regardless of whether that
input is currently in use or not.

My methodology was to add logging in the event of a queue underrun/overrun,
and then observe and then observe the frequency of such events in practice,
as well as the impact on performance. I came up with an example filter graph
involving decoding, filtering and encoding with several input files and
various changes to move the bottleneck around.

I found that, in all configurations I tested, with all thread counts and
bottlenecks, using a queue size of 2 frames yielded practically identical
performance to a queue size of 8 frames. I was only able to consistently
measure a slowdown when restricting the queue to a single frame, where the
underruns ended up making up almost 1.1% of frame events in the worst case.

A summary of my test log follows:

= Bottleneck in decoder =

ffmpeg -i A -i B -i C -filter_complex "concat=n=3" -f null -

== 16 threads ==

=== Queue statistics (dec -> filtergraph) ===
- 8 frames = 91355 underruns, 1 overrun
- 4 frames = 91381 underruns, 2 overruns
- 2 frames = 91326 underruns, 21 overruns
- 1 frame  = 91284 underruns, 102 overruns

=== Time elapsed ===
- 8 frames = 14.37s
- 4 frames = 14.28s
- 2 frames = 14.27s
- 1 frame  = 14.35s

== 1 thread ==

=== Queue statistics (dec -> filtergraph) ===
- 8 frames = 91801 underruns, 0 overruns
- 4 frames = 91929 underruns, 1 overrun
- 2 frames = 91854 underruns, 7 overruns
- 1 frame  = 91745 underrons, 83 overruns

=== Time elapsed ===
- 8 frames = 39.51s
- 4 frames = 39.94s
- 2 frames = 39.91s
- 1 frame  = 41.69s

= Bottleneck in filter graph: =

ffmpeg -i A -i B -i C -filter_complex "concat=n=3,scale=3840x2160" -f null -

== 16 threads ==

=== Queue statistics (dec -> filtergraph) ===
- 8 frames =  277 underruns, 84673 overruns
- 4 frames =  640 underruns, 86523 overruns
- 2 frames =  850 underruns, 88751 overruns
- 1 frame  = 1028 underruns, 89957 overruns

=== Time elapsed ===
- 8 frames = 26.35s
- 4 frames = 26.31s
- 2 frames = 26.38s
- 1 frame  = 26.55s

== 1 thread ==

=== Queue statistics (dec -> filtergraph) ===
- 8 frames = 29746 underruns, 57033 overruns
- 4 frames = 29940 underruns, 58948 overruns
- 2 frames = 30160 underruns, 60185 overruns
- 1 frame  = 30259 underruns, 61126 overruns

=== Time elapsed ===
- 8 frames = 52.08s
- 4 frames = 52.49s
- 2 frames = 52.25s
- 1 frame  = 52.69s

= Bottleneck in encoder: =

ffmpeg -i A -i B -i C -filter_complex "concat=n=3" -c:v libx264 -preset veryfast -f null -

== 1 thread ==

== Queue statistics (filtergraph -> enc) ==
- 8 frames = 26763 underruns, 63535 overruns
- 4 frames = 26863 underruns, 63810 overruns
- 2 frames = 27243 underruns, 63839 overruns
- 1 frame  = 27670 underruns, 63953 overruns

== Time elapsed ==
- 8 frames = 89.45s
- 4 frames = 89.04s
- 2 frames = 89.24s
- 1 frame  = 90.26s
---
 fftools/ffmpeg_sched.h | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index fb7a77ddfc..24ad37b778 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -257,7 +257,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
 /**
  * Default size of a frame thread queue.
  */
-#define DEFAULT_FRAME_THREAD_QUEUE_SIZE 8
+#define DEFAULT_FRAME_THREAD_QUEUE_SIZE 2
 
 /**
  * Add a muxed stream for a previously added muxer.
-- 
2.49.1


>From 578e7a6a298bb1501b872bfb820d6a9208bc4f0b Mon Sep 17 00:00:00 2001
From: Niklas Haas <git@haasn.dev>
Date: Thu, 4 Sep 2025 16:53:28 +0200
Subject: [PATCH 3/7] fftools/ffmpeg_sched: get rid of src_sched

This field is just saving (typically) a single pointer indirection; and IMO
makes the logic and graph relations unnecessarily complicated. I am also
considering adding choking logic to decoders and encoders as well, which this
field would get in the way of.

Apart from the unchoking logic in unchoke_for_input(), the only other place
that uses this field is the (cold) function check_acyclic(), which can be
served just as well with a simple function to do the graph traversal there.
---
 fftools/ffmpeg_sched.c | 100 ++++++++++++++++++++---------------------
 1 file changed, 49 insertions(+), 51 deletions(-)

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 4f0d446007..3c5cffa594 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -189,7 +189,6 @@ typedef struct PreMuxQueue {
 
 typedef struct SchMuxStream {
     SchedulerNode       src;
-    SchedulerNode       src_sched;
 
     unsigned           *sub_heartbeat_dst;
     unsigned         nb_sub_heartbeat_dst;
@@ -235,7 +234,6 @@ typedef struct SchMux {
 
 typedef struct SchFilterIn {
     SchedulerNode       src;
-    SchedulerNode       src_sched;
     int                 send_finished;
     int                 receive_finished;
 } SchFilterIn;
@@ -1268,24 +1266,31 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
 {
     while (1) {
         SchFilterGraph *fg;
-
-        // fed directly by a demuxer (i.e. not through a filtergraph)
-        if (src.type == SCH_NODE_TYPE_DEMUX) {
+        switch (src.type) {
+        case SCH_NODE_TYPE_DEMUX:
+            // fed directly by a demuxer (i.e. not through a filtergraph)
             sch->demux[src.idx].waiter.choked_next = 0;
             return;
-        }
-
-        av_assert0(src.type == SCH_NODE_TYPE_FILTER_OUT);
-        fg = &sch->filters[src.idx];
-
-        // the filtergraph contains internal sources and
-        // requested to be scheduled directly
-        if (fg->best_input == fg->nb_inputs) {
-            fg->waiter.choked_next = 0;
+        case SCH_NODE_TYPE_DEC:
+            src = sch->dec[src.idx].src;
+            continue;
+        case SCH_NODE_TYPE_ENC:
+            src = sch->enc[src.idx].src;
+            continue;
+        case SCH_NODE_TYPE_FILTER_OUT:
+            fg = &sch->filters[src.idx];
+            // the filtergraph contains internal sources and
+            // requested to be scheduled directly
+            if (fg->best_input == fg->nb_inputs) {
+                fg->waiter.choked_next = 0;
+                return;
+            }
+            src = fg->inputs[fg->best_input].src;
+            continue;
+        default:
+            av_unreachable("Invalid source node type?");
             return;
         }
-
-        src = fg->inputs[fg->best_input].src_sched;
     }
 }
 
@@ -1328,7 +1333,7 @@ static void schedule_update_locked(Scheduler *sch)
                 continue;
 
             // resolve the source to unchoke
-            unchoke_for_stream(sch, ms->src_sched);
+            unchoke_for_stream(sch, ms->src);
             have_unchoked = 1;
         }
     }
@@ -1361,6 +1366,27 @@ enum {
     CYCLE_NODE_DONE,
 };
 
+// Finds the filtergraph or muxer upstream of a scheduler node
+static SchedulerNode src_filtergraph(const Scheduler *sch, SchedulerNode src)
+{
+    while (1) {
+        switch (src.type) {
+        case SCH_NODE_TYPE_DEMUX:
+        case SCH_NODE_TYPE_FILTER_OUT:
+            return src;
+        case SCH_NODE_TYPE_DEC:
+            src = sch->dec[src.idx].src;
+            continue;
+        case SCH_NODE_TYPE_ENC:
+            src = sch->enc[src.idx].src;
+            continue;
+        default:
+            av_unreachable("Invalid source node type?");
+            return (SchedulerNode) {0};
+        }
+    }
+}
+
 static int
 check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
                          uint8_t *filters_visited, SchedulerNode *filters_stack)
@@ -1377,22 +1403,23 @@ check_acyclic_for_output(const Scheduler *sch, SchedulerNode src,
         // descend into every input, depth first
         if (src.idx_stream < fg->nb_inputs) {
             const SchFilterIn *fi = &fg->inputs[src.idx_stream++];
+            SchedulerNode node = src_filtergraph(sch, fi->src);
 
             // connected to demuxer, no cycles possible
-            if (fi->src_sched.type == SCH_NODE_TYPE_DEMUX)
+            if (node.type == SCH_NODE_TYPE_DEMUX)
                 continue;
 
             // otherwise connected to another filtergraph
-            av_assert0(fi->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
+            av_assert0(node.type == SCH_NODE_TYPE_FILTER_OUT);
 
             // found a cycle
-            if (filters_visited[fi->src_sched.idx] == CYCLE_NODE_STARTED)
+            if (filters_visited[node.idx] == CYCLE_NODE_STARTED)
                 return AVERROR(EINVAL);
 
             // place current position on stack and descend
             av_assert0(nb_filters_stack < sch->nb_filters);
             filters_stack[nb_filters_stack++] = src;
-            src = (SchedulerNode){ .idx = fi->src_sched.idx, .idx_stream = 0 };
+            src = (SchedulerNode){ .idx = node.idx, .idx_stream = 0 };
             continue;
         }
 
@@ -1514,22 +1541,7 @@ static int start_prepare(Scheduler *sch)
         for (unsigned j = 0; j < mux->nb_streams; j++) {
             SchMuxStream *ms = &mux->streams[j];
 
-            switch (ms->src.type) {
-            case SCH_NODE_TYPE_ENC: {
-                SchEnc *enc = &sch->enc[ms->src.idx];
-                if (enc->src.type == SCH_NODE_TYPE_DEC) {
-                    ms->src_sched = sch->dec[enc->src.idx].src;
-                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_DEMUX);
-                } else {
-                    ms->src_sched = enc->src;
-                    av_assert0(ms->src_sched.type == SCH_NODE_TYPE_FILTER_OUT);
-                }
-                break;
-                }
-            case SCH_NODE_TYPE_DEMUX:
-                ms->src_sched = ms->src;
-                break;
-            default:
+            if (!ms->src.type) {
                 av_log(mux, AV_LOG_ERROR,
                        "Muxer stream #%u not connected to a source\n", j);
                 return AVERROR(EINVAL);
@@ -1547,26 +1559,12 @@ static int start_prepare(Scheduler *sch)
 
         for (unsigned j = 0; j < fg->nb_inputs; j++) {
             SchFilterIn *fi = &fg->inputs[j];
-            SchDec     *dec;
 
             if (!fi->src.type) {
                 av_log(fg, AV_LOG_ERROR,
                        "Filtergraph input %u not connected to a source\n", j);
                 return AVERROR(EINVAL);
             }
-
-            if (fi->src.type == SCH_NODE_TYPE_FILTER_OUT)
-                fi->src_sched = fi->src;
-            else {
-                av_assert0(fi->src.type == SCH_NODE_TYPE_DEC);
-                dec = &sch->dec[fi->src.idx];
-
-                switch (dec->src.type) {
-                case SCH_NODE_TYPE_DEMUX: fi->src_sched = dec->src;                   break;
-                case SCH_NODE_TYPE_ENC:   fi->src_sched = sch->enc[dec->src.idx].src; break;
-                default: av_assert0(0);
-                }
-            }
         }
 
         for (unsigned j = 0; j < fg->nb_outputs; j++) {
-- 
2.49.1


>From 1593181148281d3424fc284e2b4fae334716c6ea Mon Sep 17 00:00:00 2001
From: Niklas Haas <git@haasn.dev>
Date: Thu, 4 Sep 2025 15:11:46 +0200
Subject: [PATCH 4/7] fftools/ffmpeg_sched: choke inputs during filtergraph
 configuration

Currently, while the filter graph is being initially created, the scheduler
continues demuxing frames on the last input that happened to be active before
the filter graph was complete.

This can lead to an excess number of decoded frames "piling" up on this input,
regardless of whether or not it will actually be requested by the configured
filter graph. Suspending the filter graph during this initialization phase
reduces the amount of wasted memory.
---
 fftools/ffmpeg_filter.c |  6 ++++++
 fftools/ffmpeg_sched.c  | 12 ++++++++++++
 fftools/ffmpeg_sched.h  |  7 +++++++
 3 files changed, 25 insertions(+)

diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index 2dae6400c8..c1c8eeb2d8 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -2878,6 +2878,7 @@ static const char *unknown_if_null(const char *str)
 static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
                       InputFilter *ifilter, AVFrame *frame)
 {
+    FilterGraphPriv *fgp = fgp_from_fg(fg);
     InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
     FrameData       *fd;
     AVFrameSideData *sd;
@@ -2986,6 +2987,11 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt,
             if (reason.len > 1)
                 reason.str[reason.len - 2] = '\0'; // remove last comma
             av_log(fg, AV_LOG_INFO, "Reconfiguring filter graph%s%s\n", reason.len ? " because " : "", reason.str);
+        } else {
+            /* Choke all input to avoid buffering excessive frames while the
+             * initial filter graph is being configured, and before we have a
+             * preferred input */
+            sch_filter_choke_inputs(fgp->sch, fgp->sch_idx);
         }
 
         ret = configure_filtergraph(fg, fgt);
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 3c5cffa594..039cd1c9aa 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -2510,6 +2510,18 @@ int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
     return send_to_filter(sch, fg, fg->nb_inputs, frame);
 }
 
+void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx)
+{
+    SchFilterGraph *fg;
+    av_assert0(fg_idx < sch->nb_filters);
+    fg = &sch->filters[fg_idx];
+
+    pthread_mutex_lock(&sch->schedule_lock);
+    fg->best_input = fg->nb_inputs;
+    schedule_update_locked(sch);
+    pthread_mutex_unlock(&sch->schedule_lock);
+}
+
 static int task_cleanup(Scheduler *sch, SchedulerNode node)
 {
     switch (node.type) {
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index 24ad37b778..0c01f558e4 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -443,6 +443,13 @@ int sch_filter_send(Scheduler *sch, unsigned fg_idx, unsigned out_idx,
 
 int sch_filter_command(Scheduler *sch, unsigned fg_idx, struct AVFrame *frame);
 
+/**
+ * Called by filtergraph tasks to choke all filter inputs, preventing them from
+ * receiving more frames until woken up again by the scheduler. Used during
+ * initial graph configuration to avoid unnecessary buffering.
+ */
+void sch_filter_choke_inputs(Scheduler *sch, unsigned fg_idx);
+
 /**
  * Called by encoder tasks to obtain frames for encoding. Will wait for a frame
  * to become available and return it in frame.
-- 
2.49.1


>From 92a4315b395dc477a3c776be7fb39eabad3cfbc7 Mon Sep 17 00:00:00 2001
From: Niklas Haas <git@haasn.dev>
Date: Thu, 4 Sep 2025 18:09:31 +0200
Subject: [PATCH 5/7] fftools/thread_queue: allow choking thread queues
 directly

Currently, when a demuxer thread is choked, it will avoid queuing more
packets, but any packets already present on the thread queue will still be
processed.

This can be quite wasteful if the choke is due to e.g. decoder not being
needed yet, such as in a filter graph involving concatenation-style filters.
Adding the ability to propagate the choke status to the thread queue directly
allows downstream decoders and filter graphs to avoid unnecessary work and
buffering.

Reduces the effective latency between scheduler updates and changes in the
thread workfload.
---
 fftools/thread_queue.c | 17 +++++++++++++++++
 fftools/thread_queue.h |  9 +++++++++
 2 files changed, 26 insertions(+)

diff --git a/fftools/thread_queue.c b/fftools/thread_queue.c
index b035ffe11d..eb33431c98 100644
--- a/fftools/thread_queue.c
+++ b/fftools/thread_queue.c
@@ -38,6 +38,7 @@ enum {
 };
 
 struct ThreadQueue {
+    int             choked;
     int              *finished;
     unsigned int    nb_streams;
 
@@ -157,6 +158,9 @@ static int receive_locked(ThreadQueue *tq, int *stream_idx,
 {
     unsigned int nb_finished = 0;
 
+    if (tq->choked)
+        return AVERROR(EAGAIN);
+
     while (av_container_fifo_read(tq->fifo, data, 0) >= 0) {
         unsigned idx;
         int ret;
@@ -230,6 +234,7 @@ void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx)
      * next time the consumer thread tries to read this stream it will get
      * an EOF and recv-finished flag will be set */
     tq->finished[stream_idx] |= FINISHED_SEND;
+    tq->choked = 0;
     pthread_cond_broadcast(&tq->cond);
 
     pthread_mutex_unlock(&tq->lock);
@@ -249,3 +254,15 @@ void tq_receive_finish(ThreadQueue *tq, unsigned int stream_idx)
 
     pthread_mutex_unlock(&tq->lock);
 }
+
+void tq_choke(ThreadQueue *tq, int choked)
+{
+    pthread_mutex_lock(&tq->lock);
+
+    int prev_choked = tq->choked;
+    tq->choked = choked;
+    if (choked != prev_choked)
+        pthread_cond_broadcast(&tq->cond);
+
+    pthread_mutex_unlock(&tq->lock);
+}
diff --git a/fftools/thread_queue.h b/fftools/thread_queue.h
index cc01c8a2c9..ad7669f131 100644
--- a/fftools/thread_queue.h
+++ b/fftools/thread_queue.h
@@ -58,6 +58,15 @@ int tq_send(ThreadQueue *tq, unsigned int stream_idx, void *data);
  */
 void tq_send_finish(ThreadQueue *tq, unsigned int stream_idx);
 
+/**
+ * Prevent further reads from the thread queue until it is unchoked. Threads
+ * attempting to read from the queue will block, similar to when the queue is
+ * empty.
+ *
+ * @param choked 1 to choke, 0 to unchoke
+ */
+void tq_choke(ThreadQueue *tq, int choked);
+
 /**
  * Read the next item from the queue.
  *
-- 
2.49.1


>From 706d33ff945856339fd4af7c208bebb4ff2d9905 Mon Sep 17 00:00:00 2001
From: Niklas Haas <git@haasn.dev>
Date: Thu, 4 Sep 2025 18:22:27 +0200
Subject: [PATCH 6/7] fftools/ffmpeg_sched: forward demuxer choke status to dst
 queues

Cut off a choked demuxer's output codec/filter queues, effectively preventing
them from processing packets while the demuxer is choked. Avoids downstream
nodes from piling up extra input that a demuxer shouldn't currently be
sending.

The main benefit of this is to avoid queuing up excess packets that don't want
to be decoded yet, reducing memory consumption for idle inputs by preventing
them from being read earlier than needed.
---
 fftools/ffmpeg_sched.c | 41 ++++++++++++++++++++++++++++++++++++++---
 1 file changed, 38 insertions(+), 3 deletions(-)

diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 039cd1c9aa..589f5360f2 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -1294,6 +1294,36 @@ static void unchoke_for_stream(Scheduler *sch, SchedulerNode src)
     }
 }
 
+static void choke_demux(const Scheduler *sch, int demux_id, int choked)
+{
+    av_assert1(demux_id < sch->nb_demux);
+    SchDemux *demux = &sch->demux[demux_id];
+
+    for (int i = 0; i < demux->nb_streams; i++) {
+        SchedulerNode *dst = demux->streams[i].dst;
+        SchFilterGraph *fg;
+
+        switch (dst->type) {
+        case SCH_NODE_TYPE_DEC:
+            tq_choke(sch->dec[dst->idx].queue, choked);
+            break;
+        case SCH_NODE_TYPE_ENC:
+            tq_choke(sch->enc[dst->idx].queue, choked);
+            break;
+        case SCH_NODE_TYPE_MUX:
+            break;
+        case SCH_NODE_TYPE_FILTER_IN:
+            fg = &sch->filters[dst->idx];
+            if (fg->nb_inputs == 1)
+                tq_choke(fg->queue, choked);
+            break;
+        default:
+            av_unreachable("Invalid destination node type?");
+            break;
+        }
+    }
+}
+
 static void schedule_update_locked(Scheduler *sch)
 {
     int64_t dts;
@@ -1350,13 +1380,16 @@ static void schedule_update_locked(Scheduler *sch)
             }
         }
 
-
-    for (unsigned type = 0; type < 2; type++)
+    for (unsigned type = 0; type < 2; type++) {
         for (unsigned i = 0; i < (type ? sch->nb_filters : sch->nb_demux); i++) {
             SchWaiter *w = type ? &sch->filters[i].waiter : &sch->demux[i].waiter;
-            if (w->choked_prev != w->choked_next)
+            if (w->choked_prev != w->choked_next) {
                 waiter_set(w, w->choked_next);
+                if (!type)
+                    choke_demux(sch, i, w->choked_next);
+            }
         }
+    }
 
 }
 
@@ -2595,6 +2628,8 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
         for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
             SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
             waiter_set(w, 1);
+            if (type)
+                choke_demux(sch, i, 0); // unfreeze to allow draining
         }
 
     for (unsigned i = 0; i < sch->nb_demux; i++) {
-- 
2.49.1


>From 554cbafb9ca4035f522f5346b54213314df70705 Mon Sep 17 00:00:00 2001
From: Niklas Haas <git@haasn.dev>
Date: Thu, 4 Sep 2025 15:56:16 +0200
Subject: [PATCH 7/7] avcodec/pthread_frame: allow early-decoding a single
 frame

Normally, this function tries to make sure all threads are saturated with
work to do before returning any frames; and will continue requesting packets
until that is the case.

However, this significantly slows down initial decoding latency when only
requesting a single frame (to e.g. configure the filter graph), and also
wastes a lot of unnecessary memory.

With a very slight tweak, we can make the *first* frame decode immediately,
without waiting for the threads to fully buffer. In theory, this could be
a user-configurable parameter down the line, but in my own performance
testing I could not measure any performance downside to leaving it on 1,
so I just hard-coded this default for now.
---
 libavcodec/pthread_frame.c | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c
index 4ea5dd3698..1f01914a12 100644
--- a/libavcodec/pthread_frame.c
+++ b/libavcodec/pthread_frame.c
@@ -149,6 +149,7 @@ typedef struct FrameThreadContext {
 
     int next_decoding;             ///< The next context to submit a packet to.
     int next_finished;             ///< The next context to return output from.
+    int nb_early_frames;           ///< Frames to immediately decode before saturating threads
 
     /* hwaccel state for thread-unsafe hwaccels is temporarily stored here in
      * order to transfer its ownership to the next decoding thread without the
@@ -588,7 +589,7 @@ int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
 
         /* do not return any frames until all threads have something to do */
         if (fctx->next_decoding != fctx->next_finished &&
-            !avctx->internal->draining)
+            !avctx->internal->draining && !fctx->nb_early_frames)
             continue;
 
         p                   = &fctx->threads[fctx->next_finished];
@@ -612,6 +613,8 @@ int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
      * we first return all the frames, then the error */
     if (fctx->df.nb_f) {
         decoded_frames_pop(&fctx->df, frame);
+        if (fctx->nb_early_frames)
+            fctx->nb_early_frames--;
         ret = 0;
     } else {
         ret = fctx->result;
@@ -973,6 +976,7 @@ av_cold int ff_frame_thread_init(AVCodecContext *avctx)
             goto error;
     }
 
+    fctx->nb_early_frames = 1;
     return 0;
 
 error:
-- 
2.49.1

_______________________________________________
ffmpeg-devel mailing list -- ffmpeg-devel@ffmpeg.org
To unsubscribe send an email to ffmpeg-devel-leave@ffmpeg.org

                 reply	other threads:[~2025-09-07 22:30 UTC|newest]

Thread overview: [no followups] expand[flat|nested]  mbox.gz  Atom feed

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=175728421754.25.209555194598833183@463a07221176 \
    --to=ffmpeg-devel@ffmpeg.org \
    --cc=code@ffmpeg.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
		ffmpegdev@gitmailbox.com
	public-inbox-index ffmpegdev

Example config snippet for mirrors.


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git