* [FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes
@ 2024-01-24 19:29 Anton Khirnov
2024-01-24 19:35 ` Andreas Rheinhardt
0 siblings, 1 reply; 3+ messages in thread
From: Anton Khirnov @ 2024-01-24 19:29 UTC (permalink / raw)
To: ffmpeg-devel
Use 8 packets/frames by default rather than 1, which seems to provide
better throughput.
Allow -thread_queue_size to set the muxer queue size manually again.
---
fftools/ffmpeg_mux.h | 2 --
fftools/ffmpeg_mux_init.c | 3 +--
fftools/ffmpeg_opt.c | 2 +-
fftools/ffmpeg_sched.c | 15 ++++++++++-----
fftools/ffmpeg_sched.h | 4 +++-
tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat | 5 -----
6 files changed, 15 insertions(+), 16 deletions(-)
diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
index d0be8a51ea..e1b44142cf 100644
--- a/fftools/ffmpeg_mux.h
+++ b/fftools/ffmpeg_mux.h
@@ -94,8 +94,6 @@ typedef struct Muxer {
AVDictionary *opts;
- int thread_queue_size;
-
/* filesize limit expressed in bytes */
int64_t limit_filesize;
atomic_int_least64_t last_filesize;
diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
index 6b5e4f8b3c..8ada837555 100644
--- a/fftools/ffmpeg_mux_init.c
+++ b/fftools/ffmpeg_mux_init.c
@@ -3047,7 +3047,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
of->start_time = o->start_time;
of->shortest = o->shortest;
- mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8;
mux->limit_filesize = o->limit_filesize;
av_dict_copy(&mux->opts, o->g->format_opts, 0);
@@ -3081,7 +3080,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
}
err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
- !strcmp(oc->oformat->name, "rtp"));
+ !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size);
if (err < 0)
return err;
mux->sch = sch;
diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
index 304c493dcf..7505b0cf90 100644
--- a/fftools/ffmpeg_opt.c
+++ b/fftools/ffmpeg_opt.c
@@ -144,7 +144,7 @@ static void init_options(OptionsContext *o)
o->limit_filesize = INT64_MAX;
o->chapters_input_file = INT_MAX;
o->accurate_seek = 1;
- o->thread_queue_size = -1;
+ o->thread_queue_size = 0;
o->input_sync_ref = -1;
o->find_stream_info = 1;
o->shortest_buf_duration = 10.f;
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 4fc5a33941..62a40c6057 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -218,6 +218,7 @@ typedef struct SchMux {
*/
atomic_int mux_started;
ThreadQueue *queue;
+ unsigned queue_size;
AVPacket *sub_heartbeat_pkt;
} SchMux;
@@ -358,6 +359,8 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si
ThreadQueue *tq;
ObjPool *op;
+ queue_size = queue_size > 0 ? queue_size : 8;
+
op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
objpool_alloc_frames();
if (!op)
@@ -653,7 +656,7 @@ static const AVClass sch_mux_class = {
};
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
- void *arg, int sdp_auto)
+ void *arg, int sdp_auto, unsigned thread_queue_size)
{
const unsigned idx = sch->nb_mux;
@@ -667,6 +670,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
mux = &sch->mux[idx];
mux->class = &sch_mux_class;
mux->init = init;
+ mux->queue_size = thread_queue_size;
task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
@@ -773,7 +777,7 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
if (!dec->send_frame)
return AVERROR(ENOMEM);
- ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS);
+ ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
if (ret < 0)
return ret;
@@ -813,7 +817,7 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
- ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES);
+ ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
if (ret < 0)
return ret;
@@ -861,7 +865,7 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
if (ret < 0)
return ret;
- ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES);
+ ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
if (ret < 0)
return ret;
@@ -1313,7 +1317,8 @@ int sch_start(Scheduler *sch)
}
}
- ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS);
+ ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
+ QUEUE_PACKETS);
if (ret < 0)
return ret;
diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
index b167d8d158..d12affa69d 100644
--- a/fftools/ffmpeg_sched.h
+++ b/fftools/ffmpeg_sched.h
@@ -225,12 +225,14 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
* streams in the muxer.
* @param ctx Muxer state; will be passed to func/init and used for logging.
* @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
+ * @param thread_queue_size number of packets that can be buffered before
+ * sending to the muxer blocks
*
* @retval ">=0" Index of the newly-created muxer.
* @retval "<0" Error code.
*/
int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
- void *ctx, int sdp_auto);
+ void *ctx, int sdp_auto, unsigned thread_queue_size);
/**
* Add a muxed stream for a previously added muxer.
*
diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
index bc9b833799..3a3ec96637 100644
--- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
+++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
@@ -33,8 +33,3 @@
<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
>> Safety remains our numb</font>
-9
-00:00:03,704 --> 00:00:04,004
-<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
->> Safety remains our number one</font>
-
--
2.42.0
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes
2024-01-24 19:29 [FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes Anton Khirnov
@ 2024-01-24 19:35 ` Andreas Rheinhardt
2024-01-24 19:45 ` Anton Khirnov
0 siblings, 1 reply; 3+ messages in thread
From: Andreas Rheinhardt @ 2024-01-24 19:35 UTC (permalink / raw)
To: ffmpeg-devel
Anton Khirnov:
> Use 8 packets/frames by default rather than 1, which seems to provide
> better throughput.
>
> Allow -thread_queue_size to set the muxer queue size manually again.
> ---
> fftools/ffmpeg_mux.h | 2 --
> fftools/ffmpeg_mux_init.c | 3 +--
> fftools/ffmpeg_opt.c | 2 +-
> fftools/ffmpeg_sched.c | 15 ++++++++++-----
> fftools/ffmpeg_sched.h | 4 +++-
> tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat | 5 -----
> 6 files changed, 15 insertions(+), 16 deletions(-)
>
> diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h
> index d0be8a51ea..e1b44142cf 100644
> --- a/fftools/ffmpeg_mux.h
> +++ b/fftools/ffmpeg_mux.h
> @@ -94,8 +94,6 @@ typedef struct Muxer {
>
> AVDictionary *opts;
>
> - int thread_queue_size;
> -
> /* filesize limit expressed in bytes */
> int64_t limit_filesize;
> atomic_int_least64_t last_filesize;
> diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c
> index 6b5e4f8b3c..8ada837555 100644
> --- a/fftools/ffmpeg_mux_init.c
> +++ b/fftools/ffmpeg_mux_init.c
> @@ -3047,7 +3047,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
> of->start_time = o->start_time;
> of->shortest = o->shortest;
>
> - mux->thread_queue_size = o->thread_queue_size > 0 ? o->thread_queue_size : 8;
> mux->limit_filesize = o->limit_filesize;
> av_dict_copy(&mux->opts, o->g->format_opts, 0);
>
> @@ -3081,7 +3080,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch)
> }
>
> err = sch_add_mux(sch, muxer_thread, mux_check_init, mux,
> - !strcmp(oc->oformat->name, "rtp"));
> + !strcmp(oc->oformat->name, "rtp"), o->thread_queue_size);
> if (err < 0)
> return err;
> mux->sch = sch;
> diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c
> index 304c493dcf..7505b0cf90 100644
> --- a/fftools/ffmpeg_opt.c
> +++ b/fftools/ffmpeg_opt.c
> @@ -144,7 +144,7 @@ static void init_options(OptionsContext *o)
> o->limit_filesize = INT64_MAX;
> o->chapters_input_file = INT_MAX;
> o->accurate_seek = 1;
> - o->thread_queue_size = -1;
> + o->thread_queue_size = 0;
> o->input_sync_ref = -1;
> o->find_stream_info = 1;
> o->shortest_buf_duration = 10.f;
> diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
> index 4fc5a33941..62a40c6057 100644
> --- a/fftools/ffmpeg_sched.c
> +++ b/fftools/ffmpeg_sched.c
> @@ -218,6 +218,7 @@ typedef struct SchMux {
> */
> atomic_int mux_started;
> ThreadQueue *queue;
> + unsigned queue_size;
>
> AVPacket *sub_heartbeat_pkt;
> } SchMux;
> @@ -358,6 +359,8 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si
> ThreadQueue *tq;
> ObjPool *op;
>
> + queue_size = queue_size > 0 ? queue_size : 8;
> +
> op = (type == QUEUE_PACKETS) ? objpool_alloc_packets() :
> objpool_alloc_frames();
> if (!op)
> @@ -653,7 +656,7 @@ static const AVClass sch_mux_class = {
> };
>
> int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
> - void *arg, int sdp_auto)
> + void *arg, int sdp_auto, unsigned thread_queue_size)
> {
> const unsigned idx = sch->nb_mux;
>
> @@ -667,6 +670,7 @@ int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
> mux = &sch->mux[idx];
> mux->class = &sch_mux_class;
> mux->init = init;
> + mux->queue_size = thread_queue_size;
>
> task_init(sch, &mux->task, SCH_NODE_TYPE_MUX, idx, func, arg);
>
> @@ -773,7 +777,7 @@ int sch_add_dec(Scheduler *sch, SchThreadFunc func, void *ctx,
> if (!dec->send_frame)
> return AVERROR(ENOMEM);
>
> - ret = queue_alloc(&dec->queue, 1, 1, QUEUE_PACKETS);
> + ret = queue_alloc(&dec->queue, 1, 0, QUEUE_PACKETS);
> if (ret < 0)
> return ret;
>
> @@ -813,7 +817,7 @@ int sch_add_enc(Scheduler *sch, SchThreadFunc func, void *ctx,
>
> task_init(sch, &enc->task, SCH_NODE_TYPE_ENC, idx, func, ctx);
>
> - ret = queue_alloc(&enc->queue, 1, 1, QUEUE_FRAMES);
> + ret = queue_alloc(&enc->queue, 1, 0, QUEUE_FRAMES);
> if (ret < 0)
> return ret;
>
> @@ -861,7 +865,7 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
> if (ret < 0)
> return ret;
>
> - ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 1, QUEUE_FRAMES);
> + ret = queue_alloc(&fg->queue, fg->nb_inputs + 1, 0, QUEUE_FRAMES);
> if (ret < 0)
> return ret;
>
> @@ -1313,7 +1317,8 @@ int sch_start(Scheduler *sch)
> }
> }
>
> - ret = queue_alloc(&mux->queue, mux->nb_streams, 1, QUEUE_PACKETS);
> + ret = queue_alloc(&mux->queue, mux->nb_streams, mux->queue_size,
> + QUEUE_PACKETS);
> if (ret < 0)
> return ret;
>
> diff --git a/fftools/ffmpeg_sched.h b/fftools/ffmpeg_sched.h
> index b167d8d158..d12affa69d 100644
> --- a/fftools/ffmpeg_sched.h
> +++ b/fftools/ffmpeg_sched.h
> @@ -225,12 +225,14 @@ int sch_add_filtergraph(Scheduler *sch, unsigned nb_inputs, unsigned nb_outputs,
> * streams in the muxer.
> * @param ctx Muxer state; will be passed to func/init and used for logging.
> * @param sdp_auto Determines automatic SDP writing - see sch_sdp_filename().
> + * @param thread_queue_size number of packets that can be buffered before
> + * sending to the muxer blocks
> *
> * @retval ">=0" Index of the newly-created muxer.
> * @retval "<0" Error code.
> */
> int sch_add_mux(Scheduler *sch, SchThreadFunc func, int (*init)(void *),
> - void *ctx, int sdp_auto);
> + void *ctx, int sdp_auto, unsigned thread_queue_size);
> /**
> * Add a muxed stream for a previously added muxer.
> *
> diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> index bc9b833799..3a3ec96637 100644
> --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> @@ -33,8 +33,3 @@
> <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> >> Safety remains our numb</font>
>
> -9
> -00:00:03,704 --> 00:00:04,004
> -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> ->> Safety remains our number one</font>
> -
Why does the output of this test change?
- Andreas
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 3+ messages in thread
* Re: [FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes
2024-01-24 19:35 ` Andreas Rheinhardt
@ 2024-01-24 19:45 ` Anton Khirnov
0 siblings, 0 replies; 3+ messages in thread
From: Anton Khirnov @ 2024-01-24 19:45 UTC (permalink / raw)
To: FFmpeg development discussions and patches
Quoting Andreas Rheinhardt (2024-01-24 20:35:48)
> > diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> > index bc9b833799..3a3ec96637 100644
> > --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> > +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat
> > @@ -33,8 +33,3 @@
> > <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> > >> Safety remains our numb</font>
> >
> > -9
> > -00:00:03,704 --> 00:00:04,004
> > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )
> > ->> Safety remains our number one</font>
> > -
>
> Why does the output of this test change?
Because the feature being tested is semi-broken - exact output depends on the
amount of buffering between the decoder and the muxer. We also get
rare random failures in this test in current master. So far nobody has
suggested a way of fixing it properly.
--
Anton Khirnov
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 3+ messages in thread
end of thread, other threads:[~2024-01-24 19:45 UTC | newest]
Thread overview: 3+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-01-24 19:29 [FFmpeg-devel] [PATCH] fftools/ffmpeg: optimize inter-thread queue sizes Anton Khirnov
2024-01-24 19:35 ` Andreas Rheinhardt
2024-01-24 19:45 ` Anton Khirnov
Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
This inbox may be cloned and mirrored by anyone:
git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git
# If you have public-inbox 1.1+ installed, you may
# initialize and index your mirror using the following commands:
public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
ffmpegdev@gitmailbox.com
public-inbox-index ffmpegdev
Example config snippet for mirrors.
AGPL code for this site: git clone https://public-inbox.org/public-inbox.git