Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
 help / color / mirror / Atom feed
* [FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes
@ 2024-01-24 19:29 Anton Khirnov
  2024-01-24 19:35 ` Andreas Rheinhardt
  0 siblings, 1 reply; 3+ messages in thread
From: Anton Khirnov @ 2024-01-24 19:29 UTC (permalink / raw)
  To: ffmpeg-devel

Use 8 packets/frames by default rather than 1, which seems to provide
better throughput.

Allow -thread_queue_size to set the muxer queue size manually again.
---
 fftools/ffmpeg_mux.h                             |  2 --
 fftools/ffmpeg_mux_init.c                        |  3 +--
 fftools/ffmpeg_opt.c                             |  2 +-
 fftools/ffmpeg_sched.c                           | 15 ++++++++++-----
 fftools/ffmpeg_sched.h                           |  4 +++-
 tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat |  5 -----
 6 files changed, 15 insertions(+), 16 deletions(-)

diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index d0be8a51ea..e1b44142cf 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -94,8 +94,6 @@ typedef struct Muxer {
 
     AVDictionary *opts;
 
-    int thread_queue_size;
-
     /* filesize limit expressed in bytes */
     int64_t limit_filesize;
     atomic_int_least64_t last_filesize;
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index 6b5e4f8b3c..8ada837555 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -3047,7 +3047,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
     of->start_time     = o->start_time;
     of->shortest       = o->shortest;
 
-    mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8;
     mux->limit_filesize    = o->limit_filesize;
     av_dict_copy(&mux->opts, o->g->format_opts, 0);
 
@@ -3081,7 +3080,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
     }
 
     err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
-                      !strcmp(oc->oformat->name, "rtp"));
+                      !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size);
     if (err < 0)
         return err;
     mux->sch     = sch;
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index 304c493dcf..7505b0cf90 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -144,7 +144,7 @@ static void init_options(OptionsContext *o)
     o->limit_filesize = INT64_MAX;
     o->chapters_input_file = INT_MAX;
     o->accurate_seek  = 1;
-    o->thread_queue_size = -1;
+    o->thread_queue_size = 0;
     o->input_sync_ref = -1;
     o->find_stream_info = 1;
     o->shortest_buf_duration = 10.f;
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 4fc5a33941..62a40c6057 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -218,6 +218,7 @@ typedef struct SchMux {
      */
     atomic_int          mux_started;
     ThreadQueue        *queue;
+    unsigned            queue_size;
 
     AVPacket           *sub_heartbeat_pkt;
 } SchMux;
@@ -358,6 +359,8 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si
     ThreadQueue *tq;
     ObjPool *op;
 
+    queue_size = queue_size > 0 ? queue_size : 8;
+
     op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
                                    objpool_alloc_frames();
     if (!op)
@@ -653,7 +656,7 @@ static const AVClass sch_mux_class = {
 };
 
 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
-                void *arg, int sdp_auto)
+                void *arg, int sdp_auto, unsigned thread_queue_size)
 {
     const unsigned idx = sch->nb_mux;
 
@@ -667,6 +670,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
     mux             = &sch->mux[idx];
     mux->class      = &sch_mux_class;
     mux->init       = init;
+    mux->queue_size = thread_queue_size;
 
     task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
 
@@ -773,7 +777,7 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
     if (!dec->send_frame)
         return AVERROR(ENOMEM);
 
-    ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS);
+    ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
     if (ret < 0)
         return ret;
 
@@ -813,7 +817,7 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
 
     task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
 
-    ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES);
+    ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
     if (ret < 0)
         return ret;
 
@@ -861,7 +865,7 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
     if (ret < 0)
         return ret;
 
-    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES);
+    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
     if (ret < 0)
         return ret;
 
@@ -1313,7 +1317,8 @@ int sch_start(Scheduler *sch)
             }
         }
 
-        ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS);
+        ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
+                          QUEUE_PACKETS);
         if (ret < 0)
             return ret;
 
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index b167d8d158..d12affa69d 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -225,12 +225,14 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
  *             streams in the muxer.
  * @param ctx Muxer state; will be passed to func/init and used for logging.
  * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
+ * @param thread_queue_size number of packets that can be buffered before
+ *                          sending to the muxer blocks
  *
  * @retval ">=0" Index of the newly-created muxer.
  * @retval "<0"  Error code.
  */
 int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
-                void *ctx, int sdp_auto);
+                void *ctx, int sdp_auto, unsigned thread_queue_size);
 /**
  * Add a muxed stream for a previously added muxer.
  *
diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
index bc9b833799..3a3ec96637 100644
--- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
+++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
@@ -33,8 +33,3 @@
 <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
 >> Safety remains our numb</font>
 
-9
-00:00:03,704 --> 00:00:04,004
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
->> Safety remains our number one</font>
-
-- 
2.42.0

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".

^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes
  2024-01-24 19:29 [FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes Anton Khirnov
@ 2024-01-24 19:35 ` Andreas Rheinhardt
  2024-01-24 19:45   ` Anton Khirnov
  0 siblings, 1 reply; 3+ messages in thread
From: Andreas Rheinhardt @ 2024-01-24 19:35 UTC (permalink / raw)
  To: ffmpeg-devel

Anton Khirnov:
> Use 8 packets/frames by default rather than 1, which seems to provide
> better throughput.
> 
> Allow -thread_queue_size to set the muxer queue size manually again.
> ---
>  fftools/ffmpeg_mux.h                             |  2 --
>  fftools/ffmpeg_mux_init.c                        |  3 +--
>  fftools/ffmpeg_opt.c                             |  2 +-
>  fftools/ffmpeg_sched.c                           | 15 ++++++++++-----
>  fftools/ffmpeg_sched.h                           |  4 +++-
>  tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat |  5 -----
>  6 files changed, 15 insertions(+), 16 deletions(-)
> 
> diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
> index d0be8a51ea..e1b44142cf 100644
> --- a/fftools/ffmpeg_mux.h
> +++ b/fftools/ffmpeg_mux.h
> @@ -94,8 +94,6 @@ typedef struct Muxer {
>  
>      AVDictionary *opts;
>  
> -    int thread_queue_size;
> -
>      /* filesize limit expressed in bytes */
>      int64_t limit_filesize;
>      atomic_int_least64_t last_filesize;
> diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
> index 6b5e4f8b3c..8ada837555 100644
> --- a/fftools/ffmpeg_mux_init.c
> +++ b/fftools/ffmpeg_mux_init.c
> @@ -3047,7 +3047,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
>      of->start_time     = o->start_time;
>      of->shortest       = o->shortest;
>  
> -    mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8;
>      mux->limit_filesize    = o->limit_filesize;
>      av_dict_copy(&mux->opts, o->g->format_opts, 0);
>  
> @@ -3081,7 +3080,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
>      }
>  
>      err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
> -                      !strcmp(oc->oformat->name, "rtp"));
> +                      !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size);
>      if (err < 0)
>          return err;
>      mux->sch     = sch;
> diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
> index 304c493dcf..7505b0cf90 100644
> --- a/fftools/ffmpeg_opt.c
> +++ b/fftools/ffmpeg_opt.c
> @@ -144,7 +144,7 @@ static void init_options(OptionsContext *o)
>      o->limit_filesize = INT64_MAX;
>      o->chapters_input_file = INT_MAX;
>      o->accurate_seek  = 1;
> -    o->thread_queue_size = -1;
> +    o->thread_queue_size = 0;
>      o->input_sync_ref = -1;
>      o->find_stream_info = 1;
>      o->shortest_buf_duration = 10.f;
> diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
> index 4fc5a33941..62a40c6057 100644
> --- a/fftools/ffmpeg_sched.c
> +++ b/fftools/ffmpeg_sched.c
> @@ -218,6 +218,7 @@ typedef struct SchMux {
>       */
>      atomic_int          mux_started;
>      ThreadQueue        *queue;
> +    unsigned            queue_size;
>  
>      AVPacket           *sub_heartbeat_pkt;
>  } SchMux;
> @@ -358,6 +359,8 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si
>      ThreadQueue *tq;
>      ObjPool *op;
>  
> +    queue_size = queue_size > 0 ? queue_size : 8;
> +
>      op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
>                                     objpool_alloc_frames();
>      if (!op)
> @@ -653,7 +656,7 @@ static const AVClass sch_mux_class = {
>  };
>  
>  int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
> -                void *arg, int sdp_auto)
> +                void *arg, int sdp_auto, unsigned thread_queue_size)
>  {
>      const unsigned idx = sch->nb_mux;
>  
> @@ -667,6 +670,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
>      mux             = &sch->mux[idx];
>      mux->class      = &sch_mux_class;
>      mux->init       = init;
> +    mux->queue_size = thread_queue_size;
>  
>      task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
>  
> @@ -773,7 +777,7 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
>      if (!dec->send_frame)
>          return AVERROR(ENOMEM);
>  
> -    ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS);
> +    ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
>      if (ret < 0)
>          return ret;
>  
> @@ -813,7 +817,7 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
>  
>      task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
>  
> -    ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES);
> +    ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
>      if (ret < 0)
>          return ret;
>  
> @@ -861,7 +865,7 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
>      if (ret < 0)
>          return ret;
>  
> -    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES);
> +    ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
>      if (ret < 0)
>          return ret;
>  
> @@ -1313,7 +1317,8 @@ int sch_start(Scheduler *sch)
>              }
>          }
>  
> -        ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS);
> +        ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
> +                          QUEUE_PACKETS);
>          if (ret < 0)
>              return ret;
>  
> diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
> index b167d8d158..d12affa69d 100644
> --- a/fftools/ffmpeg_sched.h
> +++ b/fftools/ffmpeg_sched.h
> @@ -225,12 +225,14 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
>   *             streams in the muxer.
>   * @param ctx Muxer state; will be passed to func/init and used for logging.
>   * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
> + * @param thread_queue_size number of packets that can be buffered before
> + *                          sending to the muxer blocks
>   *
>   * @retval ">=0" Index of the newly-created muxer.
>   * @retval "<0"  Error code.
>   */
>  int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
> -                void *ctx, int sdp_auto);
> +                void *ctx, int sdp_auto, unsigned thread_queue_size);
>  /**
>   * Add a muxed stream for a previously added muxer.
>   *
> diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> index bc9b833799..3a3ec96637 100644
> --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> @@ -33,8 +33,3 @@
>  <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>  >> Safety remains our numb</font>
>  
> -9
> -00:00:03,704 --> 00:00:04,004
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> ->> Safety remains our number one</font>
> -

Why does the output of this test change?

- Andreas


_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".

^ permalink raw reply	[flat|nested] 3+ messages in thread

* Re: [FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes
  2024-01-24 19:35 ` Andreas Rheinhardt
@ 2024-01-24 19:45   ` Anton Khirnov
  0 siblings, 0 replies; 3+ messages in thread
From: Anton Khirnov @ 2024-01-24 19:45 UTC (permalink / raw)
  To: FFmpeg development discussions and patches

Quoting Andreas Rheinhardt (2024-01-24 20:35:48)
> > diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> > index bc9b833799..3a3ec96637 100644
> > --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> > +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> > @@ -33,8 +33,3 @@
> >  <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> >  >> Safety remains our numb</font>
> >  
> > -9
> > -00:00:03,704 --> 00:00:04,004
> > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> > ->> Safety remains our number one</font>
> > -
> 
> Why does the output of this test change?

Because the feature being tested is semi-broken: the exact output depends
on the amount of buffering between the decoder and the muxer. We also see
rare random failures of this test on current master. So far nobody has
suggested a way to fix it properly.

-- 
Anton Khirnov
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".

^ permalink raw reply	[flat|nested] 3+ messages in thread

end of thread, other threads:[~2024-01-24 19:45 UTC | newest]

Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-01-24 19:29 [FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes Anton Khirnov
2024-01-24 19:35 ` Andreas Rheinhardt
2024-01-24 19:45   ` Anton Khirnov

Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
		ffmpegdev@gitmailbox.com
	public-inbox-index ffmpegdev

Example config snippet for mirrors.


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git