* Re: [FFmpeg-devel] [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization
2022-08-20 21:35 [FFmpeg-devel] [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
@ 2022-08-20 21:53 ` Lukas Fellechner
2022-08-21 4:10 ` Steven Liu
2022-08-21 19:26 ` [FFmpeg-devel] [PATCH v2] " Lukas Fellechner
` (2 subsequent siblings)
3 siblings, 1 reply; 26+ messages in thread
From: Lukas Fellechner @ 2022-08-20 21:53 UTC (permalink / raw)
To: ffmpeg-devel
Trying with inline PATCH since attached file was not showing up...
---
From: Lukas Fellechner <lukas.fellechner@gmx.net>
Subject: [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization
Initializing DASH streams is currently slow, because each individual stream is opened and probed sequentially. With DASH streams often having somewhere between 10-20 streams, this can easily take up to half a minute. This patch adds an "init-threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times.
---
libavformat/dashdec.c | 421 +++++++++++++++++++++++++++++++++++++-----
1 file changed, 375 insertions(+), 46 deletions(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index 63bf7e96a5..69a6c2ba79 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -24,6 +24,7 @@
#include "libavutil/opt.h"
#include "libavutil/time.h"
#include "libavutil/parseutils.h"
+#include "libavutil/thread.h"
#include "internal.h"
#include "avio_internal.h"
#include "dash.h"
@@ -152,6 +153,8 @@ typedef struct DASHContext {
int max_url_size;
char *cenc_decryption_key;
+ int init_threads;
+
/* Flags for init section*/
int is_init_section_common_video;
int is_init_section_common_audio;
@@ -1918,22 +1921,40 @@ fail:
return ret;
}
-static int open_demux_for_component(AVFormatContext *s, struct representation *pls)
+static int open_demux_for_component(AVFormatContext* s, struct representation* pls)
+{
+ int ret = 0;
+
+ ret = begin_open_demux_for_component(s, pls);
+ if (ret < 0)
+ return ret;
+
+ ret = end_open_demux_for_component(s, pls);
+
+ return ret;
+}
+
+static int begin_open_demux_for_component(AVFormatContext* s, struct representation* pls)
{
int ret = 0;
- int i;
pls->parent = s;
- pls->cur_seq_no = calc_cur_seg_no(s, pls);
+ pls->cur_seq_no = calc_cur_seg_no(s, pls);
if (!pls->last_seq_no) {
pls->last_seq_no = calc_max_seg_no(pls, s->priv_data);
}
ret = reopen_demux_for_component(s, pls);
- if (ret < 0) {
- goto fail;
- }
+
+ return ret;
+}
+
+static int end_open_demux_for_component(AVFormatContext* s, struct representation* pls)
+{
+ int ret = 0;
+ int i;
+
for (i = 0; i < pls->ctx->nb_streams; i++) {
AVStream *st = avformat_new_stream(s, NULL);
AVStream *ist = pls->ctx->streams[i];
@@ -2015,6 +2036,131 @@ static void move_metadata(AVStream *st, const char *key, char **value)
}
}
+struct work_pool_data
+{
+ AVFormatContext* ctx;
+ struct representation* pls;
+ struct representation* common_pls;
+ pthread_mutex_t* common_mutex;
+ pthread_cond_t* common_condition;
+ int is_common;
+ int is_started;
+ int result;
+};
+
+struct thread_data
+{
+ pthread_t thread;
+ pthread_mutex_t* mutex;
+ struct work_pool_data* work_pool;
+ int work_pool_size;
+ int is_started;
+};
+
+static void *worker_thread(void *ptr)
+{
+ int ret = 0;
+ int i;
+ struct thread_data* thread_data = (struct thread_data*)ptr;
+ struct work_pool_data* work_pool = NULL;
+ struct work_pool_data* data = NULL;
+ for (;;) {
+
+ // get next work item
+ pthread_mutex_lock(thread_data->mutex);
+ data = NULL;
+ work_pool = thread_data->work_pool;
+ for (i = 0; i < thread_data->work_pool_size; i++) {
+ if (!work_pool->is_started) {
+ data = work_pool;
+ data->is_started = 1;
+ break;
+ }
+ work_pool++;
+ }
+ pthread_mutex_unlock(thread_data->mutex);
+
+ if (!data) {
+ // no more work to do
+ return NULL;
+ }
+
+ // if we are common section provider, init and signal
+ if (data->is_common) {
+ data->pls->parent = data->ctx;
+ ret = update_init_section(data->pls);
+ if (ret < 0) {
+ pthread_cond_signal(data->common_condition);
+ goto end;
+ }
+ else
+ ret = AVERROR(pthread_cond_signal(data->common_condition));
+ }
+
+ // if we depend on common section provider, wait for signal and copy
+ if (data->common_pls) {
+ ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
+ if (ret < 0)
+ goto end;
+
+ if (!data->common_pls->init_sec_buf) {
+ goto end;
+ ret = AVERROR(EFAULT);
+ }
+
+ ret = copy_init_section(data->pls, data->common_pls);
+ if (ret < 0)
+ goto end;
+ }
+
+ ret = begin_open_demux_for_component(data->ctx, data->pls);
+ if (ret < 0)
+ goto end;
+
+ end:
+ data->result = ret;
+ }
+
+
+ return NULL;
+}
+
+static void create_work_pool_data(AVFormatContext* ctx, int stream_index,
+ struct representation* pls, struct representation* common_pls,
+ struct work_pool_data* init_data, pthread_mutex_t* common_mutex,
+ pthread_cond_t* common_condition)
+{
+ init_data->ctx = ctx;
+ init_data->pls = pls;
+ init_data->pls->stream_index = stream_index;
+ init_data->common_condition = common_condition;
+ init_data->common_mutex = common_mutex;
+ init_data->result = -1;
+
+ if (pls == common_pls) {
+ init_data->is_common = 1;
+ }
+ else if (common_pls) {
+ init_data->common_pls = common_pls;
+ }
+}
+
+static int start_thread(struct thread_data *thread_data,
+ struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
+{
+ int ret;
+
+ thread_data->mutex = mutex;
+ thread_data->work_pool = work_pool;
+ thread_data->work_pool_size = work_pool_size;
+
+ ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
+ if (ret == 0)
+ thread_data->is_started = 1;
+
+ return ret;
+}
+
static int dash_read_header(AVFormatContext *s)
{
DASHContext *c = s->priv_data;
@@ -2040,63 +2186,245 @@ static int dash_read_header(AVFormatContext *s)
av_dict_set(&c->avio_opts, "seekable", "0", 0);
}
- if(c->n_videos)
+ if (c->n_videos)
c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos);
- /* Open the demuxer for video and audio components if available */
- for (i = 0; i < c->n_videos; i++) {
- rep = c->videos[i];
- if (i > 0 && c->is_init_section_common_video) {
- ret = copy_init_section(rep, c->videos[0]);
+ if (c->n_audios)
+ c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
+
+ if (c->n_subtitles)
+ c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+
+ int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
+ int threads = FFMIN(nstreams, c->init_threads);
+
+ if (threads > 1)
+ {
+ // alloc data
+ struct work_pool_data* init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
+ if (!init_data)
+ return AVERROR(ENOMEM);
+
+ struct thread_data* thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
+ if (!thread_data)
+ return AVERROR(ENOMEM);
+
+ // alloc mutex and conditions
+ pthread_mutex_t work_pool_mutex;
+
+ pthread_mutex_t common_video_mutex;
+ pthread_cond_t common_video_cond;
+
+ pthread_mutex_t common_audio_mutex;
+ pthread_cond_t common_audio_cond;
+
+ pthread_mutex_t common_subtitle_mutex;
+ pthread_cond_t common_subtitle_cond;
+
+ // init mutex and conditions
+ ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
+ if (ret < 0)
+ goto cleanup;
+
+ if (c->is_init_section_common_video) {
+ ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
if (ret < 0)
- return ret;
+ goto cleanup;
+
+ ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
+ if (c->is_init_section_common_audio) {
+ ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
+ if (ret < 0)
+ goto cleanup;
- if(c->n_audios)
- c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
+ ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
+ }
- for (i = 0; i < c->n_audios; i++) {
- rep = c->audios[i];
- if (i > 0 && c->is_init_section_common_audio) {
- ret = copy_init_section(rep, c->audios[0]);
+ if (c->is_init_section_common_subtitle) {
+ ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
if (ret < 0)
- return ret;
+ goto cleanup;
+
+ ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
+ // init work pool data
+ struct work_pool_data* current_data = init_data;
- if (c->n_subtitles)
- c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+ for (i = 0; i < c->n_videos; i++) {
+ create_work_pool_data(s, stream_index, c->videos[i],
+ c->is_init_section_common_video ? c->videos[0] : NULL,
+ current_data, &common_video_mutex, &common_video_cond);
- for (i = 0; i < c->n_subtitles; i++) {
- rep = c->subtitles[i];
- if (i > 0 && c->is_init_section_common_subtitle) {
- ret = copy_init_section(rep, c->subtitles[0]);
+ stream_index++;
+ current_data++;
+ }
+
+ for (i = 0; i < c->n_audios; i++) {
+ create_work_pool_data(s, stream_index, c->audios[i],
+ c->is_init_section_common_audio ? c->audios[0] : NULL,
+ current_data, &common_audio_mutex, &common_audio_cond);
+
+ stream_index++;
+ current_data++;
+ }
+
+ for (i = 0; i < c->n_subtitles; i++) {
+ create_work_pool_data(s, stream_index, c->subtitles[i],
+ c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
+ current_data, &common_subtitle_mutex, &common_subtitle_cond);
+
+ stream_index++;
+ current_data++;
+ }
+
+ // start threads
+ struct thread_data* current_thread = thread_data;
+ for (i = 0; i < threads; i++) {
+ ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
if (ret < 0)
- return ret;
+ goto cleanup;
+
+ current_thread++;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
+ cleanup:
+ // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
+ int initResult = ret;
+ int runResult = 0;
+ int cleanupResult = 0;
+
+ // join threads
+ current_thread = thread_data;
+ for (i = 0; i < threads; i++) {
+ if (current_thread->is_started) {
+ ret = AVERROR(pthread_join(current_thread->thread, NULL));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+ current_thread++;
+ }
+
+ // finalize streams and collect results
+ current_data = init_data;
+ for (i = 0; i < nstreams; i++) {
+ if (current_data->result < 0) {
+ // thread ran into error: collect result
+ runResult = current_data->result;
+ }
+ else {
+ // thread success: create streams on AVFormatContext
+ ret = end_open_demux_for_component(s, current_data->pls);
+ if (ret < 0)
+ runResult = ret;
+ }
+ current_data++;
+ }
+
+ // cleanup mutex and conditions
+ ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ if (c->is_init_section_common_video) {
+ ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_video_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ if (c->is_init_section_common_audio) {
+ ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ if (c->is_init_section_common_subtitle) {
+ ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ // return results if errors have occured in one of the phases
+ if (initResult < 0)
+ return initResult;
+
+ if (runResult < 0)
+ return runResult;
+
+ if (cleanupResult < 0)
+ return cleanupResult;
}
+ else
+ {
+ /* Open the demuxer for video and audio components if available */
+ for (i = 0; i < c->n_videos; i++) {
+ rep = c->videos[i];
+ if (i > 0 && c->is_init_section_common_video) {
+ ret = copy_init_section(rep, c->videos[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
- if (!stream_index)
- return AVERROR_INVALIDDATA;
+ if (ret)
+ return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
+ }
+
+ for (i = 0; i < c->n_audios; i++) {
+ rep = c->audios[i];
+ if (i > 0 && c->is_init_section_common_audio) {
+ ret = copy_init_section(rep, c->audios[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
+
+ if (ret)
+ return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
+ }
+
+ for (i = 0; i < c->n_subtitles; i++) {
+ rep = c->subtitles[i];
+ if (i > 0 && c->is_init_section_common_subtitle) {
+ ret = copy_init_section(rep, c->subtitles[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
+
+ if (ret)
+ return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
+ }
+
+ if (!stream_index)
+ return AVERROR_INVALIDDATA;
+ }
/* Create a program */
program = av_new_program(s, 0);
@@ -2349,6 +2677,7 @@ static const AVOption dash_options[] = {
{.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
INT_MIN, INT_MAX, FLAGS},
{ "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+ { "init_threads", "Number of threads to use for initializing the DASH stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
{NULL}
};
--
2.31.1.windows.1
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization
2022-08-20 21:53 ` Lukas Fellechner
@ 2022-08-21 4:10 ` Steven Liu
2022-08-21 12:47 ` Lukas Fellechner
0 siblings, 1 reply; 26+ messages in thread
From: Steven Liu @ 2022-08-21 4:10 UTC (permalink / raw)
To: FFmpeg development discussions and patches
Lukas Fellechner <Lukas.Fellechner@gmx.net> 于2022年8月21日周日 05:54写道:
>
> Trying with inline PATCH since attached file was not showing up...
>
> ---
>
> From: Lukas Fellechner <lukas.fellechner@gmx.net>
> Subject: [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization
>
> Initializing DASH streams is currently slow, because each individual stream is opened and probed sequentially. With DASH streams often having somewhere between 10-20 streams, this can easily take up to half a minute. This patch adds an "init-threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times.
> ---
> libavformat/dashdec.c | 421 +++++++++++++++++++++++++++++++++++++-----
> 1 file changed, 375 insertions(+), 46 deletions(-)
>
> diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
> index 63bf7e96a5..69a6c2ba79 100644
> --- a/libavformat/dashdec.c
> +++ b/libavformat/dashdec.c
> @@ -24,6 +24,7 @@
> #include "libavutil/opt.h"
> #include "libavutil/time.h"
> #include "libavutil/parseutils.h"
> +#include "libavutil/thread.h"
> #include "internal.h"
> #include "avio_internal.h"
> #include "dash.h"
> @@ -152,6 +153,8 @@ typedef struct DASHContext {
> int max_url_size;
> char *cenc_decryption_key;
>
> + int init_threads;
> +
> /* Flags for init section*/
> int is_init_section_common_video;
> int is_init_section_common_audio;
> @@ -1918,22 +1921,40 @@ fail:
> return ret;
> }
>
> -static int open_demux_for_component(AVFormatContext *s, struct representation *pls)
> +static int open_demux_for_component(AVFormatContext* s, struct representation* pls)
> +{
> + int ret = 0;
> +
> + ret = begin_open_demux_for_component(s, pls);
> + if (ret < 0)
> + return ret;
> +
> + ret = end_open_demux_for_component(s, pls);
> +
> + return ret;
> +}
> +
> +static int begin_open_demux_for_component(AVFormatContext* s, struct representation* pls)
> {
> int ret = 0;
> - int i;
>
> pls->parent = s;
> - pls->cur_seq_no = calc_cur_seg_no(s, pls);
> + pls->cur_seq_no = calc_cur_seg_no(s, pls);
>
> if (!pls->last_seq_no) {
> pls->last_seq_no = calc_max_seg_no(pls, s->priv_data);
> }
>
> ret = reopen_demux_for_component(s, pls);
> - if (ret < 0) {
> - goto fail;
> - }
> +
> + return ret;
> +}
> +
> +static int end_open_demux_for_component(AVFormatContext* s, struct representation* pls)
> +{
> + int ret = 0;
> + int i;
> +
> for (i = 0; i < pls->ctx->nb_streams; i++) {
> AVStream *st = avformat_new_stream(s, NULL);
> AVStream *ist = pls->ctx->streams[i];
> @@ -2015,6 +2036,131 @@ static void move_metadata(AVStream *st, const char *key, char **value)
> }
> }
>
> +struct work_pool_data
> +{
> + AVFormatContext* ctx;
> + struct representation* pls;
> + struct representation* common_pls;
> + pthread_mutex_t* common_mutex;
> + pthread_cond_t* common_condition;
Should add #if HAVE_THREADS to check if the pthread supported.
> + int is_common;
> + int is_started;
> + int result;
> +};
> +
> +struct thread_data
> +{
> + pthread_t thread;
> + pthread_mutex_t* mutex;
> + struct work_pool_data* work_pool;
> + int work_pool_size;
> + int is_started;
> +};
> +
> +static void *worker_thread(void *ptr)
> +{
> + int ret = 0;
> + int i;
> + struct thread_data* thread_data = (struct thread_data*)ptr;
> + struct work_pool_data* work_pool = NULL;
> + struct work_pool_data* data = NULL;
> + for (;;) {
> +
> + // get next work item
> + pthread_mutex_lock(thread_data->mutex);
> + data = NULL;
> + work_pool = thread_data->work_pool;
> + for (i = 0; i < thread_data->work_pool_size; i++) {
> + if (!work_pool->is_started) {
> + data = work_pool;
> + data->is_started = 1;
> + break;
> + }
> + work_pool++;
> + }
> + pthread_mutex_unlock(thread_data->mutex);
> +
> + if (!data) {
> + // no more work to do
> + return NULL;
> + }
> +
> + // if we are common section provider, init and signal
> + if (data->is_common) {
> + data->pls->parent = data->ctx;
> + ret = update_init_section(data->pls);
> + if (ret < 0) {
> + pthread_cond_signal(data->common_condition);
> + goto end;
> + }
> + else
> + ret = AVERROR(pthread_cond_signal(data->common_condition));
> + }
> +
> + // if we depend on common section provider, wait for signal and copy
> + if (data->common_pls) {
> + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
> + if (ret < 0)
> + goto end;
> +
> + if (!data->common_pls->init_sec_buf) {
> + goto end;
> + ret = AVERROR(EFAULT);
> + }
> +
> + ret = copy_init_section(data->pls, data->common_pls);
> + if (ret < 0)
> + goto end;
> + }
> +
> + ret = begin_open_demux_for_component(data->ctx, data->pls);
> + if (ret < 0)
> + goto end;
> +
> + end:
> + data->result = ret;
> + }
> +
> +
> + return NULL;
> +}
> +
> +static void create_work_pool_data(AVFormatContext* ctx, int stream_index,
> + struct representation* pls, struct representation* common_pls,
> + struct work_pool_data* init_data, pthread_mutex_t* common_mutex,
> + pthread_cond_t* common_condition)
> +{
> + init_data->ctx = ctx;
> + init_data->pls = pls;
> + init_data->pls->stream_index = stream_index;
> + init_data->common_condition = common_condition;
> + init_data->common_mutex = common_mutex;
> + init_data->result = -1;
> +
> + if (pls == common_pls) {
> + init_data->is_common = 1;
> + }
> + else if (common_pls) {
> + init_data->common_pls = common_pls;
> + }
> +}
> +
> +static int start_thread(struct thread_data *thread_data,
> + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
> +{
> + int ret;
> +
> + thread_data->mutex = mutex;
> + thread_data->work_pool = work_pool;
> + thread_data->work_pool_size = work_pool_size;
> +
> + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
> + if (ret == 0)
> + thread_data->is_started = 1;
> +
> + return ret;
> +}
> +
> static int dash_read_header(AVFormatContext *s)
> {
> DASHContext *c = s->priv_data;
> @@ -2040,63 +2186,245 @@ static int dash_read_header(AVFormatContext *s)
> av_dict_set(&c->avio_opts, "seekable", "0", 0);
> }
>
> - if(c->n_videos)
> + if (c->n_videos)
> c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos);
>
> - /* Open the demuxer for video and audio components if available */
> - for (i = 0; i < c->n_videos; i++) {
> - rep = c->videos[i];
> - if (i > 0 && c->is_init_section_common_video) {
> - ret = copy_init_section(rep, c->videos[0]);
> + if (c->n_audios)
> + c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
> +
> + if (c->n_subtitles)
> + c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> +
> + int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
> + int threads = FFMIN(nstreams, c->init_threads);
> +
> + if (threads > 1)
> + {
> + // alloc data
> + struct work_pool_data* init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
> + if (!init_data)
> + return AVERROR(ENOMEM);
> +
> + struct thread_data* thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
> + if (!thread_data)
> + return AVERROR(ENOMEM);
> +
> + // alloc mutex and conditions
> + pthread_mutex_t work_pool_mutex;
> +
> + pthread_mutex_t common_video_mutex;
> + pthread_cond_t common_video_cond;
> +
> + pthread_mutex_t common_audio_mutex;
> + pthread_cond_t common_audio_cond;
> +
> + pthread_mutex_t common_subtitle_mutex;
> + pthread_cond_t common_subtitle_cond;
> +
> + // init mutex and conditions
> + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
> + if (ret < 0)
> + goto cleanup;
> +
> + if (c->is_init_section_common_video) {
> + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
> if (ret < 0)
> - return ret;
> + goto cleanup;
> +
> + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> }
> - ret = open_demux_for_component(s, rep);
>
> - if (ret)
> - return ret;
> - rep->stream_index = stream_index;
> - ++stream_index;
> - }
> + if (c->is_init_section_common_audio) {
> + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
> + if (ret < 0)
> + goto cleanup;
>
> - if(c->n_audios)
> - c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
> + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> + }
>
> - for (i = 0; i < c->n_audios; i++) {
> - rep = c->audios[i];
> - if (i > 0 && c->is_init_section_common_audio) {
> - ret = copy_init_section(rep, c->audios[0]);
> + if (c->is_init_section_common_subtitle) {
> + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
> if (ret < 0)
> - return ret;
> + goto cleanup;
> +
> + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> }
> - ret = open_demux_for_component(s, rep);
>
> - if (ret)
> - return ret;
> - rep->stream_index = stream_index;
> - ++stream_index;
> - }
> + // init work pool data
> + struct work_pool_data* current_data = init_data;
>
> - if (c->n_subtitles)
> - c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> + for (i = 0; i < c->n_videos; i++) {
> + create_work_pool_data(s, stream_index, c->videos[i],
> + c->is_init_section_common_video ? c->videos[0] : NULL,
> + current_data, &common_video_mutex, &common_video_cond);
>
> - for (i = 0; i < c->n_subtitles; i++) {
> - rep = c->subtitles[i];
> - if (i > 0 && c->is_init_section_common_subtitle) {
> - ret = copy_init_section(rep, c->subtitles[0]);
> + stream_index++;
> + current_data++;
> + }
> +
> + for (i = 0; i < c->n_audios; i++) {
> + create_work_pool_data(s, stream_index, c->audios[i],
> + c->is_init_section_common_audio ? c->audios[0] : NULL,
> + current_data, &common_audio_mutex, &common_audio_cond);
> +
> + stream_index++;
> + current_data++;
> + }
> +
> + for (i = 0; i < c->n_subtitles; i++) {
> + create_work_pool_data(s, stream_index, c->subtitles[i],
> + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
> + current_data, &common_subtitle_mutex, &common_subtitle_cond);
> +
> + stream_index++;
> + current_data++;
> + }
> +
> + // start threads
> + struct thread_data* current_thread = thread_data;
> + for (i = 0; i < threads; i++) {
> + ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
> if (ret < 0)
> - return ret;
> + goto cleanup;
> +
> + current_thread++;
> }
> - ret = open_demux_for_component(s, rep);
>
> - if (ret)
> - return ret;
> - rep->stream_index = stream_index;
> - ++stream_index;
> + cleanup:
> + // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
> + int initResult = ret;
> + int runResult = 0;
> + int cleanupResult = 0;
> +
> + // join threads
> + current_thread = thread_data;
> + for (i = 0; i < threads; i++) {
> + if (current_thread->is_started) {
> + ret = AVERROR(pthread_join(current_thread->thread, NULL));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> + current_thread++;
> + }
> +
> + // finalize streams and collect results
> + current_data = init_data;
> + for (i = 0; i < nstreams; i++) {
> + if (current_data->result < 0) {
> + // thread ran into error: collect result
> + runResult = current_data->result;
> + }
> + else {
> + // thread success: create streams on AVFormatContext
> + ret = end_open_demux_for_component(s, current_data->pls);
> + if (ret < 0)
> + runResult = ret;
> + }
> + current_data++;
> + }
> +
> + // cleanup mutex and conditions
> + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + if (c->is_init_section_common_video) {
> + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_video_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + if (c->is_init_section_common_audio) {
> + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + if (c->is_init_section_common_subtitle) {
> + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + // return results if errors have occured in one of the phases
> + if (initResult < 0)
> + return initResult;
> +
> + if (runResult < 0)
> + return runResult;
> +
> + if (cleanupResult < 0)
> + return cleanupResult;
> }
> + else
> + {
> + /* Open the demuxer for video and audio components if available */
> + for (i = 0; i < c->n_videos; i++) {
> + rep = c->videos[i];
> + if (i > 0 && c->is_init_section_common_video) {
> + ret = copy_init_section(rep, c->videos[0]);
> + if (ret < 0)
> + return ret;
> + }
> + ret = open_demux_for_component(s, rep);
>
> - if (!stream_index)
> - return AVERROR_INVALIDDATA;
> + if (ret)
> + return ret;
> + rep->stream_index = stream_index;
> + ++stream_index;
> + }
> +
> + for (i = 0; i < c->n_audios; i++) {
> + rep = c->audios[i];
> + if (i > 0 && c->is_init_section_common_audio) {
> + ret = copy_init_section(rep, c->audios[0]);
> + if (ret < 0)
> + return ret;
> + }
> + ret = open_demux_for_component(s, rep);
> +
> + if (ret)
> + return ret;
> + rep->stream_index = stream_index;
> + ++stream_index;
> + }
> +
> + for (i = 0; i < c->n_subtitles; i++) {
> + rep = c->subtitles[i];
> + if (i > 0 && c->is_init_section_common_subtitle) {
> + ret = copy_init_section(rep, c->subtitles[0]);
> + if (ret < 0)
> + return ret;
> + }
> + ret = open_demux_for_component(s, rep);
> +
> + if (ret)
> + return ret;
> + rep->stream_index = stream_index;
> + ++stream_index;
> + }
> +
> + if (!stream_index)
> + return AVERROR_INVALIDDATA;
> + }
>
> /* Create a program */
> program = av_new_program(s, 0);
> @@ -2349,6 +2677,7 @@ static const AVOption dash_options[] = {
> {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
> INT_MIN, INT_MAX, FLAGS},
> { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> + { "init_threads", "Number of threads to use for initializing the DASH stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
> {NULL}
> };
>
> --
> 2.31.1.windows.1
>
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* [FFmpeg-devel] [PATCH v2] lavf/dashdec: Multithreaded DASH initialization
2022-08-20 21:35 [FFmpeg-devel] [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
2022-08-20 21:53 ` Lukas Fellechner
@ 2022-08-21 19:26 ` Lukas Fellechner
2022-08-23 3:19 ` Steven Liu
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 0/3] " Lukas Fellechner
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 0/4] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
3 siblings, 1 reply; 26+ messages in thread
From: Lukas Fellechner @ 2022-08-21 19:26 UTC (permalink / raw)
To: ffmpeg-devel; +Cc: Lukas Fellechner
Initializing DASH streams is currently slow, because each individual stream is opened and probed sequentially. With DASH streams often having somewhere between 10-20 streams, this can easily take up to half a minute. This patch adds an "init-threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times.
---
libavformat/dashdec.c | 432 +++++++++++++++++++++++++++++++++++++-----
1 file changed, 386 insertions(+), 46 deletions(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index 63bf7e96a5..7eca3e3415 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -24,6 +24,7 @@
#include "libavutil/opt.h"
#include "libavutil/time.h"
#include "libavutil/parseutils.h"
+#include "libavutil/thread.h"
#include "internal.h"
#include "avio_internal.h"
#include "dash.h"
@@ -152,6 +153,8 @@ typedef struct DASHContext {
int max_url_size;
char *cenc_decryption_key;
+ int init_threads;
+
/* Flags for init section*/
int is_init_section_common_video;
int is_init_section_common_audio;
@@ -1918,22 +1921,40 @@ fail:
return ret;
}
-static int open_demux_for_component(AVFormatContext *s, struct representation *pls)
+static int open_demux_for_component(AVFormatContext* s, struct representation* pls)
+{
+ int ret = 0;
+
+ ret = begin_open_demux_for_component(s, pls);
+ if (ret < 0)
+ return ret;
+
+ ret = end_open_demux_for_component(s, pls);
+
+ return ret;
+}
+
+static int begin_open_demux_for_component(AVFormatContext* s, struct representation* pls)
{
int ret = 0;
- int i;
pls->parent = s;
- pls->cur_seq_no = calc_cur_seg_no(s, pls);
+ pls->cur_seq_no = calc_cur_seg_no(s, pls);
if (!pls->last_seq_no) {
pls->last_seq_no = calc_max_seg_no(pls, s->priv_data);
}
ret = reopen_demux_for_component(s, pls);
- if (ret < 0) {
- goto fail;
- }
+
+ return ret;
+}
+
+static int end_open_demux_for_component(AVFormatContext* s, struct representation* pls)
+{
+ int ret = 0;
+ int i;
+
for (i = 0; i < pls->ctx->nb_streams; i++) {
AVStream *st = avformat_new_stream(s, NULL);
AVStream *ist = pls->ctx->streams[i];
@@ -2015,6 +2036,135 @@ static void move_metadata(AVStream *st, const char *key, char **value)
}
}
+#if HAVE_THREADS
+
+struct work_pool_data
+{
+ AVFormatContext* ctx;
+ struct representation* pls;
+ struct representation* common_pls;
+ pthread_mutex_t* common_mutex;
+ pthread_cond_t* common_condition;
+ int is_common;
+ int is_started;
+ int result;
+};
+
+struct thread_data
+{
+ pthread_t thread;
+ pthread_mutex_t* mutex;
+ struct work_pool_data* work_pool;
+ int work_pool_size;
+ int is_started;
+};
+
+static void *worker_thread(void *ptr)
+{
+ int ret = 0;
+ int i;
+ struct thread_data* thread_data = (struct thread_data*)ptr;
+ struct work_pool_data* work_pool = NULL;
+ struct work_pool_data* data = NULL;
+ for (;;) {
+
+ // get next work item
+ pthread_mutex_lock(thread_data->mutex);
+ data = NULL;
+ work_pool = thread_data->work_pool;
+ for (i = 0; i < thread_data->work_pool_size; i++) {
+ if (!work_pool->is_started) {
+ data = work_pool;
+ data->is_started = 1;
+ break;
+ }
+ work_pool++;
+ }
+ pthread_mutex_unlock(thread_data->mutex);
+
+ if (!data) {
+ // no more work to do
+ return NULL;
+ }
+
+ // if we are common section provider, init and signal
+ if (data->is_common) {
+ data->pls->parent = data->ctx;
+ ret = update_init_section(data->pls);
+ if (ret < 0) {
+ pthread_cond_signal(data->common_condition);
+ goto end;
+ }
+ else
+ ret = AVERROR(pthread_cond_signal(data->common_condition));
+ }
+
+ // if we depend on common section provider, wait for signal and copy
+ if (data->common_pls) {
+ ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
+ if (ret < 0)
+ goto end;
+
+ if (!data->common_pls->init_sec_buf) {
+ goto end;
+ ret = AVERROR(EFAULT);
+ }
+
+ ret = copy_init_section(data->pls, data->common_pls);
+ if (ret < 0)
+ goto end;
+ }
+
+ ret = begin_open_demux_for_component(data->ctx, data->pls);
+ if (ret < 0)
+ goto end;
+
+ end:
+ data->result = ret;
+ }
+
+
+ return NULL;
+}
+
+static void create_work_pool_data(AVFormatContext* ctx, int stream_index,
+ struct representation* pls, struct representation* common_pls,
+ struct work_pool_data* init_data, pthread_mutex_t* common_mutex,
+ pthread_cond_t* common_condition)
+{
+ init_data->ctx = ctx;
+ init_data->pls = pls;
+ init_data->pls->stream_index = stream_index;
+ init_data->common_condition = common_condition;
+ init_data->common_mutex = common_mutex;
+ init_data->result = -1;
+
+ if (pls == common_pls) {
+ init_data->is_common = 1;
+ }
+ else if (common_pls) {
+ init_data->common_pls = common_pls;
+ }
+}
+
+static int start_thread(struct thread_data *thread_data,
+ struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
+{
+ int ret;
+
+ thread_data->mutex = mutex;
+ thread_data->work_pool = work_pool;
+ thread_data->work_pool_size = work_pool_size;
+
+ ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
+ if (ret == 0)
+ thread_data->is_started = 1;
+
+ return ret;
+}
+
+#endif
+
static int dash_read_header(AVFormatContext *s)
{
DASHContext *c = s->priv_data;
@@ -2040,63 +2190,252 @@ static int dash_read_header(AVFormatContext *s)
av_dict_set(&c->avio_opts, "seekable", "0", 0);
}
- if(c->n_videos)
+ if (c->n_videos)
c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos);
- /* Open the demuxer for video and audio components if available */
- for (i = 0; i < c->n_videos; i++) {
- rep = c->videos[i];
- if (i > 0 && c->is_init_section_common_video) {
- ret = copy_init_section(rep, c->videos[0]);
+ if (c->n_audios)
+ c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
+
+ if (c->n_subtitles)
+ c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+
+ int threads = 0;
+ int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
+
+#if HAVE_THREADS
+ threads = FFMIN(nstreams, c->init_threads);
+#endif
+
+ if (threads > 1)
+ {
+#if HAVE_THREADS
+ // alloc data
+ struct work_pool_data* init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
+ if (!init_data)
+ return AVERROR(ENOMEM);
+
+ struct thread_data* thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
+ if (!thread_data)
+ return AVERROR(ENOMEM);
+
+ // alloc mutex and conditions
+ pthread_mutex_t work_pool_mutex;
+
+ pthread_mutex_t common_video_mutex;
+ pthread_cond_t common_video_cond;
+
+ pthread_mutex_t common_audio_mutex;
+ pthread_cond_t common_audio_cond;
+
+ pthread_mutex_t common_subtitle_mutex;
+ pthread_cond_t common_subtitle_cond;
+
+ // init mutex and conditions
+ ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
+ if (ret < 0)
+ goto cleanup;
+
+ if (c->is_init_section_common_video) {
+ ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
if (ret < 0)
- return ret;
+ goto cleanup;
+
+ ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
+ if (c->is_init_section_common_audio) {
+ ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
+ if (ret < 0)
+ goto cleanup;
- if(c->n_audios)
- c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
+ ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
+ }
- for (i = 0; i < c->n_audios; i++) {
- rep = c->audios[i];
- if (i > 0 && c->is_init_section_common_audio) {
- ret = copy_init_section(rep, c->audios[0]);
+ if (c->is_init_section_common_subtitle) {
+ ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
if (ret < 0)
- return ret;
+ goto cleanup;
+
+ ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
+ // init work pool data
+ struct work_pool_data* current_data = init_data;
- if (c->n_subtitles)
- c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+ for (i = 0; i < c->n_videos; i++) {
+ create_work_pool_data(s, stream_index, c->videos[i],
+ c->is_init_section_common_video ? c->videos[0] : NULL,
+ current_data, &common_video_mutex, &common_video_cond);
- for (i = 0; i < c->n_subtitles; i++) {
- rep = c->subtitles[i];
- if (i > 0 && c->is_init_section_common_subtitle) {
- ret = copy_init_section(rep, c->subtitles[0]);
+ stream_index++;
+ current_data++;
+ }
+
+ for (i = 0; i < c->n_audios; i++) {
+ create_work_pool_data(s, stream_index, c->audios[i],
+ c->is_init_section_common_audio ? c->audios[0] : NULL,
+ current_data, &common_audio_mutex, &common_audio_cond);
+
+ stream_index++;
+ current_data++;
+ }
+
+ for (i = 0; i < c->n_subtitles; i++) {
+ create_work_pool_data(s, stream_index, c->subtitles[i],
+ c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
+ current_data, &common_subtitle_mutex, &common_subtitle_cond);
+
+ stream_index++;
+ current_data++;
+ }
+
+ // start threads
+ struct thread_data* current_thread = thread_data;
+ for (i = 0; i < threads; i++) {
+ ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
if (ret < 0)
- return ret;
+ goto cleanup;
+
+ current_thread++;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
+ cleanup:
+ // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
+ int initResult = ret;
+ int runResult = 0;
+ int cleanupResult = 0;
+
+ // join threads
+ current_thread = thread_data;
+ for (i = 0; i < threads; i++) {
+ if (current_thread->is_started) {
+ ret = AVERROR(pthread_join(current_thread->thread, NULL));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+ current_thread++;
+ }
+
+ // finalize streams and collect results
+ current_data = init_data;
+ for (i = 0; i < nstreams; i++) {
+ if (current_data->result < 0) {
+ // thread ran into error: collect result
+ runResult = current_data->result;
+ }
+ else {
+ // thread success: create streams on AVFormatContext
+ ret = end_open_demux_for_component(s, current_data->pls);
+ if (ret < 0)
+ runResult = ret;
+ }
+ current_data++;
+ }
+
+ // cleanup mutex and conditions
+ ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ if (c->is_init_section_common_video) {
+ ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_video_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ if (c->is_init_section_common_audio) {
+ ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ if (c->is_init_section_common_subtitle) {
+ ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ // return results if errors have occured in one of the phases
+ if (initResult < 0)
+ return initResult;
+
+ if (runResult < 0)
+ return runResult;
+
+ if (cleanupResult < 0)
+ return cleanupResult;
+
+#endif
}
+ else
+ {
+ /* Open the demuxer for video and audio components if available */
+ for (i = 0; i < c->n_videos; i++) {
+ rep = c->videos[i];
+ if (i > 0 && c->is_init_section_common_video) {
+ ret = copy_init_section(rep, c->videos[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
- if (!stream_index)
- return AVERROR_INVALIDDATA;
+ if (ret)
+ return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
+ }
+
+ for (i = 0; i < c->n_audios; i++) {
+ rep = c->audios[i];
+ if (i > 0 && c->is_init_section_common_audio) {
+ ret = copy_init_section(rep, c->audios[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
+
+ if (ret)
+ return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
+ }
+
+ for (i = 0; i < c->n_subtitles; i++) {
+ rep = c->subtitles[i];
+ if (i > 0 && c->is_init_section_common_subtitle) {
+ ret = copy_init_section(rep, c->subtitles[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
+
+ if (ret)
+ return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
+ }
+
+ if (!stream_index)
+ return AVERROR_INVALIDDATA;
+ }
/* Create a program */
program = av_new_program(s, 0);
@@ -2349,6 +2688,7 @@ static const AVOption dash_options[] = {
{.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
INT_MIN, INT_MAX, FLAGS},
{ "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+ { "init_threads", "Number of threads to use for initializing the DASH stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
{NULL}
};
--
2.31.1.windows.1
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v2] lavf/dashdec: Multithreaded DASH initialization
2022-08-21 19:26 ` [FFmpeg-devel] [PATCH v2] " Lukas Fellechner
@ 2022-08-23 3:19 ` Steven Liu
2022-08-23 19:09 ` Lukas Fellechner
0 siblings, 1 reply; 26+ messages in thread
From: Steven Liu @ 2022-08-23 3:19 UTC (permalink / raw)
To: FFmpeg development discussions and patches
Lukas Fellechner <lukas.fellechner@gmx.net> 于2022年8月22日周一 03:27写道:
>
> Initializing DASH streams is currently slow, because each individual stream is opened and probed sequentially. With DASH streams often having somewhere between 10-20 streams, this can easily take up to half a minute. This patch adds an "init-threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times.
> ---
> libavformat/dashdec.c | 432 +++++++++++++++++++++++++++++++++++++-----
> 1 file changed, 386 insertions(+), 46 deletions(-)
>
> diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
> index 63bf7e96a5..7eca3e3415 100644
> --- a/libavformat/dashdec.c
> +++ b/libavformat/dashdec.c
> @@ -24,6 +24,7 @@
> #include "libavutil/opt.h"
> #include "libavutil/time.h"
> #include "libavutil/parseutils.h"
> +#include "libavutil/thread.h"
> #include "internal.h"
> #include "avio_internal.h"
> #include "dash.h"
> @@ -152,6 +153,8 @@ typedef struct DASHContext {
> int max_url_size;
> char *cenc_decryption_key;
>
> + int init_threads;
> +
> /* Flags for init section*/
> int is_init_section_common_video;
> int is_init_section_common_audio;
> @@ -1918,22 +1921,40 @@ fail:
> return ret;
> }
>
> -static int open_demux_for_component(AVFormatContext *s, struct representation *pls)
> +static int open_demux_for_component(AVFormatContext* s, struct representation* pls)
> +{
> + int ret = 0;
> +
> + ret = begin_open_demux_for_component(s, pls);
> + if (ret < 0)
> + return ret;
> +
> + ret = end_open_demux_for_component(s, pls);
> +
> + return ret;
> +}
> +
> +static int begin_open_demux_for_component(AVFormatContext* s, struct representation* pls)
> {
> int ret = 0;
> - int i;
>
> pls->parent = s;
> - pls->cur_seq_no = calc_cur_seg_no(s, pls);
> + pls->cur_seq_no = calc_cur_seg_no(s, pls);
>
> if (!pls->last_seq_no) {
> pls->last_seq_no = calc_max_seg_no(pls, s->priv_data);
> }
>
> ret = reopen_demux_for_component(s, pls);
> - if (ret < 0) {
> - goto fail;
> - }
> +
> + return ret;
> +}
> +
> +static int end_open_demux_for_component(AVFormatContext* s, struct representation* pls)
> +{
> + int ret = 0;
> + int i;
> +
> for (i = 0; i < pls->ctx->nb_streams; i++) {
> AVStream *st = avformat_new_stream(s, NULL);
> AVStream *ist = pls->ctx->streams[i];
> @@ -2015,6 +2036,135 @@ static void move_metadata(AVStream *st, const char *key, char **value)
> }
> }
I look at the new functions likes begin_open_demux_for_component and
end_open_demux_for_component maybe can separate patch.
maybe you can submit two patch, one is make code clarify, the other
one support multithreads,
BTW, i saw there have two warning message about patch commit cooments
in patchwork: https://patchwork.ffmpeg.org/project/ffmpeg/patch/20220821192636.734-1-lukas.fellechner@gmx.net/
>
> +#if HAVE_THREADS
> +
> +struct work_pool_data
> +{
> + AVFormatContext* ctx;
> + struct representation* pls;
> + struct representation* common_pls;
> + pthread_mutex_t* common_mutex;
> + pthread_cond_t* common_condition;
> + int is_common;
> + int is_started;
> + int result;
> +};
> +
> +struct thread_data
> +{
> + pthread_t thread;
> + pthread_mutex_t* mutex;
> + struct work_pool_data* work_pool;
> + int work_pool_size;
> + int is_started;
> +};
> +
> +static void *worker_thread(void *ptr)
> +{
> + int ret = 0;
> + int i;
> + struct thread_data* thread_data = (struct thread_data*)ptr;
> + struct work_pool_data* work_pool = NULL;
> + struct work_pool_data* data = NULL;
> + for (;;) {
> +
> + // get next work item
> + pthread_mutex_lock(thread_data->mutex);
> + data = NULL;
> + work_pool = thread_data->work_pool;
> + for (i = 0; i < thread_data->work_pool_size; i++) {
> + if (!work_pool->is_started) {
> + data = work_pool;
> + data->is_started = 1;
> + break;
> + }
> + work_pool++;
> + }
> + pthread_mutex_unlock(thread_data->mutex);
> +
> + if (!data) {
> + // no more work to do
> + return NULL;
> + }
> +
> + // if we are common section provider, init and signal
> + if (data->is_common) {
> + data->pls->parent = data->ctx;
> + ret = update_init_section(data->pls);
> + if (ret < 0) {
> + pthread_cond_signal(data->common_condition);
> + goto end;
> + }
> + else
> + ret = AVERROR(pthread_cond_signal(data->common_condition));
> + }
> +
> + // if we depend on common section provider, wait for signal and copy
> + if (data->common_pls) {
> + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
> + if (ret < 0)
> + goto end;
> +
> + if (!data->common_pls->init_sec_buf) {
> + goto end;
> + ret = AVERROR(EFAULT);
> + }
> +
> + ret = copy_init_section(data->pls, data->common_pls);
> + if (ret < 0)
> + goto end;
> + }
> +
> + ret = begin_open_demux_for_component(data->ctx, data->pls);
> + if (ret < 0)
> + goto end;
> +
> + end:
> + data->result = ret;
> + }
> +
> +
> + return NULL;
> +}
> +
> +static void create_work_pool_data(AVFormatContext* ctx, int stream_index,
> + struct representation* pls, struct representation* common_pls,
> + struct work_pool_data* init_data, pthread_mutex_t* common_mutex,
> + pthread_cond_t* common_condition)
> +{
> + init_data->ctx = ctx;
> + init_data->pls = pls;
> + init_data->pls->stream_index = stream_index;
> + init_data->common_condition = common_condition;
> + init_data->common_mutex = common_mutex;
> + init_data->result = -1;
> +
> + if (pls == common_pls) {
> + init_data->is_common = 1;
> + }
> + else if (common_pls) {
> + init_data->common_pls = common_pls;
> + }
> +}
> +
> +static int start_thread(struct thread_data *thread_data,
> + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
> +{
> + int ret;
> +
> + thread_data->mutex = mutex;
> + thread_data->work_pool = work_pool;
> + thread_data->work_pool_size = work_pool_size;
> +
> + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
> + if (ret == 0)
> + thread_data->is_started = 1;
> +
> + return ret;
> +}
> +
> +#endif
> +
> static int dash_read_header(AVFormatContext *s)
> {
> DASHContext *c = s->priv_data;
> @@ -2040,63 +2190,252 @@ static int dash_read_header(AVFormatContext *s)
> av_dict_set(&c->avio_opts, "seekable", "0", 0);
> }
>
> - if(c->n_videos)
> + if (c->n_videos)
> c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos);
>
> - /* Open the demuxer for video and audio components if available */
> - for (i = 0; i < c->n_videos; i++) {
> - rep = c->videos[i];
> - if (i > 0 && c->is_init_section_common_video) {
> - ret = copy_init_section(rep, c->videos[0]);
> + if (c->n_audios)
> + c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
> +
> + if (c->n_subtitles)
> + c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> +
> + int threads = 0;
> + int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
> +
> +#if HAVE_THREADS
> + threads = FFMIN(nstreams, c->init_threads);
> +#endif
> +
> + if (threads > 1)
> + {
> +#if HAVE_THREADS
> + // alloc data
> + struct work_pool_data* init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
> + if (!init_data)
> + return AVERROR(ENOMEM);
> +
> + struct thread_data* thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
> + if (!thread_data)
> + return AVERROR(ENOMEM);
> +
> + // alloc mutex and conditions
> + pthread_mutex_t work_pool_mutex;
> +
> + pthread_mutex_t common_video_mutex;
> + pthread_cond_t common_video_cond;
> +
> + pthread_mutex_t common_audio_mutex;
> + pthread_cond_t common_audio_cond;
> +
> + pthread_mutex_t common_subtitle_mutex;
> + pthread_cond_t common_subtitle_cond;
> +
> + // init mutex and conditions
> + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
> + if (ret < 0)
> + goto cleanup;
> +
> + if (c->is_init_section_common_video) {
> + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
> if (ret < 0)
> - return ret;
> + goto cleanup;
> +
> + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> }
> - ret = open_demux_for_component(s, rep);
>
> - if (ret)
> - return ret;
> - rep->stream_index = stream_index;
> - ++stream_index;
> - }
> + if (c->is_init_section_common_audio) {
> + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
> + if (ret < 0)
> + goto cleanup;
>
> - if(c->n_audios)
> - c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
> + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> + }
>
> - for (i = 0; i < c->n_audios; i++) {
> - rep = c->audios[i];
> - if (i > 0 && c->is_init_section_common_audio) {
> - ret = copy_init_section(rep, c->audios[0]);
> + if (c->is_init_section_common_subtitle) {
> + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
> if (ret < 0)
> - return ret;
> + goto cleanup;
> +
> + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> }
> - ret = open_demux_for_component(s, rep);
>
> - if (ret)
> - return ret;
> - rep->stream_index = stream_index;
> - ++stream_index;
> - }
> + // init work pool data
> + struct work_pool_data* current_data = init_data;
>
> - if (c->n_subtitles)
> - c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> + for (i = 0; i < c->n_videos; i++) {
> + create_work_pool_data(s, stream_index, c->videos[i],
> + c->is_init_section_common_video ? c->videos[0] : NULL,
> + current_data, &common_video_mutex, &common_video_cond);
>
> - for (i = 0; i < c->n_subtitles; i++) {
> - rep = c->subtitles[i];
> - if (i > 0 && c->is_init_section_common_subtitle) {
> - ret = copy_init_section(rep, c->subtitles[0]);
> + stream_index++;
> + current_data++;
> + }
> +
> + for (i = 0; i < c->n_audios; i++) {
> + create_work_pool_data(s, stream_index, c->audios[i],
> + c->is_init_section_common_audio ? c->audios[0] : NULL,
> + current_data, &common_audio_mutex, &common_audio_cond);
> +
> + stream_index++;
> + current_data++;
> + }
> +
> + for (i = 0; i < c->n_subtitles; i++) {
> + create_work_pool_data(s, stream_index, c->subtitles[i],
> + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
> + current_data, &common_subtitle_mutex, &common_subtitle_cond);
> +
> + stream_index++;
> + current_data++;
> + }
> +
> + // start threads
> + struct thread_data* current_thread = thread_data;
> + for (i = 0; i < threads; i++) {
> + ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
> if (ret < 0)
> - return ret;
> + goto cleanup;
> +
> + current_thread++;
> }
> - ret = open_demux_for_component(s, rep);
>
> - if (ret)
> - return ret;
> - rep->stream_index = stream_index;
> - ++stream_index;
> + cleanup:
> + // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
> + int initResult = ret;
> + int runResult = 0;
> + int cleanupResult = 0;
> +
> + // join threads
> + current_thread = thread_data;
> + for (i = 0; i < threads; i++) {
> + if (current_thread->is_started) {
> + ret = AVERROR(pthread_join(current_thread->thread, NULL));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> + current_thread++;
> + }
> +
> + // finalize streams and collect results
> + current_data = init_data;
> + for (i = 0; i < nstreams; i++) {
> + if (current_data->result < 0) {
> + // thread ran into error: collect result
> + runResult = current_data->result;
> + }
> + else {
> + // thread success: create streams on AVFormatContext
> + ret = end_open_demux_for_component(s, current_data->pls);
> + if (ret < 0)
> + runResult = ret;
> + }
> + current_data++;
> + }
> +
> + // cleanup mutex and conditions
> + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + if (c->is_init_section_common_video) {
> + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_video_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + if (c->is_init_section_common_audio) {
> + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + if (c->is_init_section_common_subtitle) {
> + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + // return results if errors have occured in one of the phases
> + if (initResult < 0)
> + return initResult;
> +
> + if (runResult < 0)
> + return runResult;
> +
> + if (cleanupResult < 0)
> + return cleanupResult;
> +
> +#endif
> }
> + else
> + {
> + /* Open the demuxer for video and audio components if available */
> + for (i = 0; i < c->n_videos; i++) {
> + rep = c->videos[i];
> + if (i > 0 && c->is_init_section_common_video) {
> + ret = copy_init_section(rep, c->videos[0]);
> + if (ret < 0)
> + return ret;
> + }
> + ret = open_demux_for_component(s, rep);
>
> - if (!stream_index)
> - return AVERROR_INVALIDDATA;
> + if (ret)
> + return ret;
> + rep->stream_index = stream_index;
> + ++stream_index;
> + }
> +
> + for (i = 0; i < c->n_audios; i++) {
> + rep = c->audios[i];
> + if (i > 0 && c->is_init_section_common_audio) {
> + ret = copy_init_section(rep, c->audios[0]);
> + if (ret < 0)
> + return ret;
> + }
> + ret = open_demux_for_component(s, rep);
> +
> + if (ret)
> + return ret;
> + rep->stream_index = stream_index;
> + ++stream_index;
> + }
> +
> + for (i = 0; i < c->n_subtitles; i++) {
> + rep = c->subtitles[i];
> + if (i > 0 && c->is_init_section_common_subtitle) {
> + ret = copy_init_section(rep, c->subtitles[0]);
> + if (ret < 0)
> + return ret;
> + }
> + ret = open_demux_for_component(s, rep);
> +
> + if (ret)
> + return ret;
> + rep->stream_index = stream_index;
> + ++stream_index;
> + }
> +
> + if (!stream_index)
> + return AVERROR_INVALIDDATA;
> + }
>
> /* Create a program */
> program = av_new_program(s, 0);
> @@ -2349,6 +2688,7 @@ static const AVOption dash_options[] = {
> {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
> INT_MIN, INT_MAX, FLAGS},
> { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> + { "init_threads", "Number of threads to use for initializing the DASH stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
> {NULL}
> };
>
> --
> 2.31.1.windows.1
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v2] lavf/dashdec: Multithreaded DASH initialization
2022-08-23 3:19 ` Steven Liu
@ 2022-08-23 19:09 ` Lukas Fellechner
0 siblings, 0 replies; 26+ messages in thread
From: Lukas Fellechner @ 2022-08-23 19:09 UTC (permalink / raw)
To: ffmpeg-devel
Gesendet: Dienstag, 23. August 2022 um 05:19 Uhr
Von: "Steven Liu" <lingjiujianke@gmail.com>
An: "FFmpeg development discussions and patches" <ffmpeg-devel@ffmpeg.org>
Betreff: Re: [FFmpeg-devel] [PATCH v2] lavf/dashdec: Multithreaded DASH initialization
Lukas Fellechner <lukas.fellechner@gmx.net> 于2022年8月22日周一 03:27写道:
>
> I look at the new functions likes begin_open_demux_for_component and
> end_open_demux_for_component maybe can separate patch.
> maybe you can submit two patch, one is make code clarify, the other
> one support multithreads,
Good idea. I split up the patch into three parts actually. Git seems to
be pretty bad in handling indentation changes, which made the first patch
look awful, although there were only very small changes.
So I pulled out the indentation change into a separate patch.
Not sure if that is a good idea, but it makes the other two patches
much more readable.
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* [FFmpeg-devel] [PATCH v3 0/3] lavf/dashdec: Multithreaded DASH initialization
2022-08-20 21:35 [FFmpeg-devel] [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
2022-08-20 21:53 ` Lukas Fellechner
2022-08-21 19:26 ` [FFmpeg-devel] [PATCH v2] " Lukas Fellechner
@ 2022-08-23 19:03 ` Lukas Fellechner
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 1/3] lavf/dashdec: Prepare DASH decoder for multithreading Lukas Fellechner
` (2 more replies)
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 0/4] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
3 siblings, 3 replies; 26+ messages in thread
From: Lukas Fellechner @ 2022-08-23 19:03 UTC (permalink / raw)
To: ffmpeg-devel
Initializing DASH streams is currently slow, because each individual
stream is opened and probed sequentially. With DASH streams often
having somewhere between 10-20 streams, this can easily take up to
half a minute on slow connections.
This patch adds an "init-threads" option, specifying the max number
of threads to use. Multiple worker threads are spun up to massively
bring down init times.
In-Reply-To: <trinity-36a68f08-f239-4450-b893-af6bfa783181-1661031307501@3c-app-gmx-bs35>
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* [FFmpeg-devel] [PATCH v3 1/3] lavf/dashdec: Prepare DASH decoder for multithreading
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 0/3] " Lukas Fellechner
@ 2022-08-23 19:03 ` Lukas Fellechner
2022-08-31 2:09 ` Steven Liu
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 3/3] lavf/dashdec: Fix indentation after multithreading Lukas Fellechner
2 siblings, 1 reply; 26+ messages in thread
From: Lukas Fellechner @ 2022-08-23 19:03 UTC (permalink / raw)
To: ffmpeg-devel; +Cc: Lukas Fellechner
For adding multithreading to the DASH decoder initialization,
the open_demux_for_component() method must be split up into two parts:
begin_open_demux_for_component(): Opens the stream and does probing
and format detection. This can be run in parallel.
end_open_demux_for_component(): Creates the AVStreams and adds
them to the common parent AVFormatContext. This method must always be
run synchronously, after all threads are finished.
---
libavformat/dashdec.c | 42 ++++++++++++++++++++++++++++++------------
1 file changed, 30 insertions(+), 12 deletions(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index 63bf7e96a5..e82da45e43 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -1918,10 +1918,9 @@ fail:
return ret;
}
-static int open_demux_for_component(AVFormatContext *s, struct representation *pls)
+static int begin_open_demux_for_component(AVFormatContext *s, struct representation *pls)
{
int ret = 0;
- int i;
pls->parent = s;
pls->cur_seq_no = calc_cur_seg_no(s, pls);
@@ -1931,9 +1930,15 @@ static int open_demux_for_component(AVFormatContext *s, struct representation *p
}
ret = reopen_demux_for_component(s, pls);
- if (ret < 0) {
- goto fail;
- }
+
+ return ret;
+}
+
+static int end_open_demux_for_component(AVFormatContext *s, struct representation *pls)
+{
+ int ret = 0;
+ int i;
+
for (i = 0; i < pls->ctx->nb_streams; i++) {
AVStream *st = avformat_new_stream(s, NULL);
AVStream *ist = pls->ctx->streams[i];
@@ -1965,6 +1970,19 @@ fail:
return ret;
}
+static int open_demux_for_component(AVFormatContext* s, struct representation* pls)
+{
+ int ret = 0;
+
+ ret = begin_open_demux_for_component(s, pls);
+ if (ret < 0)
+ return ret;
+
+ ret = end_open_demux_for_component(s, pls);
+
+ return ret;
+}
+
static int is_common_init_section_exist(struct representation **pls, int n_pls)
{
struct fragment *first_init_section = pls[0]->init_section;
@@ -2040,9 +2058,15 @@ static int dash_read_header(AVFormatContext *s)
av_dict_set(&c->avio_opts, "seekable", "0", 0);
}
- if(c->n_videos)
+ if (c->n_videos)
c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos);
+ if (c->n_audios)
+ c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
+
+ if (c->n_subtitles)
+ c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+
/* Open the demuxer for video and audio components if available */
for (i = 0; i < c->n_videos; i++) {
rep = c->videos[i];
@@ -2059,9 +2083,6 @@ static int dash_read_header(AVFormatContext *s)
++stream_index;
}
- if(c->n_audios)
- c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
-
for (i = 0; i < c->n_audios; i++) {
rep = c->audios[i];
if (i > 0 && c->is_init_section_common_audio) {
@@ -2077,9 +2098,6 @@ static int dash_read_header(AVFormatContext *s)
++stream_index;
}
- if (c->n_subtitles)
- c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
-
for (i = 0; i < c->n_subtitles; i++) {
rep = c->subtitles[i];
if (i > 0 && c->is_init_section_common_subtitle) {
--
2.31.1.windows.1
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v3 1/3] lavf/dashdec: Prepare DASH decoder for multithreading
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 1/3] lavf/dashdec: Prepare DASH decoder for multithreading Lukas Fellechner
@ 2022-08-31 2:09 ` Steven Liu
0 siblings, 0 replies; 26+ messages in thread
From: Steven Liu @ 2022-08-31 2:09 UTC (permalink / raw)
To: FFmpeg development discussions and patches; +Cc: Lukas Fellechner
Lukas Fellechner <lukas.fellechner@gmx.net> 于2022年8月24日周三 03:04写道:
>
> For adding multithreading to the DASH decoder initialization,
> the open_demux_for_component() method must be split up into two parts:
>
> begin_open_demux_for_component(): Opens the stream and does probing
> and format detection. This can be run in parallel.
>
> end_open_demux_for_component(): Creates the AVStreams and adds
> them to the common parent AVFormatContext. This method must always be
> run synchronously, after all threads are finished.
> ---
> libavformat/dashdec.c | 42 ++++++++++++++++++++++++++++++------------
> 1 file changed, 30 insertions(+), 12 deletions(-)
>
> diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
> index 63bf7e96a5..e82da45e43 100644
> --- a/libavformat/dashdec.c
> +++ b/libavformat/dashdec.c
> @@ -1918,10 +1918,9 @@ fail:
> return ret;
> }
>
> -static int open_demux_for_component(AVFormatContext *s, struct representation *pls)
> +static int begin_open_demux_for_component(AVFormatContext *s, struct representation *pls)
> {
> int ret = 0;
> - int i;
>
> pls->parent = s;
> pls->cur_seq_no = calc_cur_seg_no(s, pls);
> @@ -1931,9 +1930,15 @@ static int open_demux_for_component(AVFormatContext *s, struct representation *p
> }
>
> ret = reopen_demux_for_component(s, pls);
> - if (ret < 0) {
> - goto fail;
> - }
> +
> + return ret;
> +}
> +
> +static int end_open_demux_for_component(AVFormatContext *s, struct representation *pls)
> +{
> + int ret = 0;
> + int i;
> +
> for (i = 0; i < pls->ctx->nb_streams; i++) {
> AVStream *st = avformat_new_stream(s, NULL);
> AVStream *ist = pls->ctx->streams[i];
> @@ -1965,6 +1970,19 @@ fail:
> return ret;
> }
>
> +static int open_demux_for_component(AVFormatContext* s, struct representation* pls)
> +{
> + int ret = 0;
> +
> + ret = begin_open_demux_for_component(s, pls);
> + if (ret < 0)
> + return ret;
> +
> + ret = end_open_demux_for_component(s, pls);
> +
> + return ret;
> +}
> +
> static int is_common_init_section_exist(struct representation **pls, int n_pls)
> {
> struct fragment *first_init_section = pls[0]->init_section;
> @@ -2040,9 +2058,15 @@ static int dash_read_header(AVFormatContext *s)
> av_dict_set(&c->avio_opts, "seekable", "0", 0);
> }
>
> - if(c->n_videos)
> + if (c->n_videos)
> c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos);
>
> + if (c->n_audios)
> + c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
> +
> + if (c->n_subtitles)
> + c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> +
> /* Open the demuxer for video and audio components if available */
> for (i = 0; i < c->n_videos; i++) {
> rep = c->videos[i];
> @@ -2059,9 +2083,6 @@ static int dash_read_header(AVFormatContext *s)
> ++stream_index;
> }
>
> - if(c->n_audios)
> - c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
> -
> for (i = 0; i < c->n_audios; i++) {
> rep = c->audios[i];
> if (i > 0 && c->is_init_section_common_audio) {
> @@ -2077,9 +2098,6 @@ static int dash_read_header(AVFormatContext *s)
> ++stream_index;
> }
>
> - if (c->n_subtitles)
> - c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> -
> for (i = 0; i < c->n_subtitles; i++) {
> rep = c->subtitles[i];
> if (i > 0 && c->is_init_section_common_subtitle) {
> --
> 2.31.1.windows.1
>
> _______________________________________________
> ffmpeg-devel mailing list
> ffmpeg-devel@ffmpeg.org
> https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
>
> To unsubscribe, visit link above, or email
> ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
Patchset looks ok and test passed here. Any comments?
Thanks
Steven
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 0/3] " Lukas Fellechner
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 1/3] lavf/dashdec: Prepare DASH decoder for multithreading Lukas Fellechner
@ 2022-08-23 19:03 ` Lukas Fellechner
2022-08-31 2:54 ` Andreas Rheinhardt
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 3/3] lavf/dashdec: Fix indentation after multithreading Lukas Fellechner
2 siblings, 1 reply; 26+ messages in thread
From: Lukas Fellechner @ 2022-08-23 19:03 UTC (permalink / raw)
To: ffmpeg-devel; +Cc: Lukas Fellechner
This patch adds an "init-threads" option, specifying the max
number of threads to use. Multiple worker threads are spun up
to massively bring down init times.
---
libavformat/dashdec.c | 351 +++++++++++++++++++++++++++++++++++++++++-
1 file changed, 350 insertions(+), 1 deletion(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index e82da45e43..20f2557ea3 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -24,6 +24,7 @@
#include "libavutil/opt.h"
#include "libavutil/time.h"
#include "libavutil/parseutils.h"
+#include "libavutil/thread.h"
#include "internal.h"
#include "avio_internal.h"
#include "dash.h"
@@ -152,6 +153,8 @@ typedef struct DASHContext {
int max_url_size;
char *cenc_decryption_key;
+ int init_threads;
+
/* Flags for init section*/
int is_init_section_common_video;
int is_init_section_common_audio;
@@ -2033,6 +2036,331 @@ static void move_metadata(AVStream *st, const char *key, char **value)
}
}
+#if HAVE_THREADS
+
+struct work_pool_data
+{
+ AVFormatContext *ctx;
+ struct representation *pls;
+ struct representation *common_pls;
+ pthread_mutex_t *common_mutex;
+ pthread_cond_t *common_condition;
+ int is_common;
+ int is_started;
+ int result;
+};
+
+struct thread_data
+{
+ pthread_t thread;
+ pthread_mutex_t *mutex;
+ struct work_pool_data *work_pool;
+ int work_pool_size;
+ int is_started;
+ int has_error;
+};
+
+static void *worker_thread(void *ptr)
+{
+ int ret = 0;
+ int i;
+ struct thread_data *thread_data = (struct thread_data*)ptr;
+ struct work_pool_data *work_pool = NULL;
+ struct work_pool_data *data = NULL;
+ for (;;) {
+
+ // get next work item unless there was an error
+ pthread_mutex_lock(thread_data->mutex);
+ data = NULL;
+ if (!thread_data->has_error) {
+ work_pool = thread_data->work_pool;
+ for (i = 0; i < thread_data->work_pool_size; i++) {
+ if (!work_pool->is_started) {
+ data = work_pool;
+ data->is_started = 1;
+ break;
+ }
+ work_pool++;
+ }
+ }
+ pthread_mutex_unlock(thread_data->mutex);
+
+ if (!data) {
+ // no more work to do
+ return NULL;
+ }
+
+ // if we are common section provider, init and signal
+ if (data->is_common) {
+ data->pls->parent = data->ctx;
+ ret = update_init_section(data->pls);
+ if (ret < 0) {
+ pthread_cond_signal(data->common_condition);
+ goto end;
+ }
+ else
+ ret = AVERROR(pthread_cond_signal(data->common_condition));
+ }
+
+ // if we depend on common section provider, wait for signal and copy
+ if (data->common_pls) {
+ ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
+ if (ret < 0)
+ goto end;
+
+ if (!data->common_pls->init_sec_buf) {
+ goto end;
+ ret = AVERROR(EFAULT);
+ }
+
+ ret = copy_init_section(data->pls, data->common_pls);
+ if (ret < 0)
+ goto end;
+ }
+
+ ret = begin_open_demux_for_component(data->ctx, data->pls);
+ if (ret < 0)
+ goto end;
+
+ end:
+ data->result = ret;
+
+ // notify error to other threads and exit
+ if (ret < 0) {
+ pthread_mutex_lock(thread_data->mutex);
+ thread_data->has_error = 1;
+ pthread_mutex_unlock(thread_data->mutex);
+ return NULL;
+ }
+ }
+
+
+ return NULL;
+}
+
+static void create_work_pool_data(AVFormatContext *ctx, int stream_index,
+ struct representation *pls, struct representation *common_pls,
+ struct work_pool_data *init_data, pthread_mutex_t *common_mutex,
+ pthread_cond_t *common_condition)
+{
+ init_data->ctx = ctx;
+ init_data->pls = pls;
+ init_data->pls->stream_index = stream_index;
+ init_data->common_condition = common_condition;
+ init_data->common_mutex = common_mutex;
+ init_data->result = -1;
+
+ if (pls == common_pls) {
+ init_data->is_common = 1;
+ }
+ else if (common_pls) {
+ init_data->common_pls = common_pls;
+ }
+}
+
+static int start_thread(struct thread_data *thread_data,
+ struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
+{
+ int ret;
+
+ thread_data->mutex = mutex;
+ thread_data->work_pool = work_pool;
+ thread_data->work_pool_size = work_pool_size;
+
+ ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
+ if (ret == 0)
+ thread_data->is_started = 1;
+
+ return ret;
+}
+
+static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
+{
+ DASHContext *c = s->priv_data;
+ int ret = 0;
+ int stream_index = 0;
+ int i;
+
+ // alloc data
+ struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
+ if (!init_data)
+ return AVERROR(ENOMEM);
+
+ struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
+ if (!thread_data)
+ return AVERROR(ENOMEM);
+
+ // alloc mutex and conditions
+ pthread_mutex_t work_pool_mutex;
+
+ pthread_mutex_t common_video_mutex;
+ pthread_cond_t common_video_cond;
+
+ pthread_mutex_t common_audio_mutex;
+ pthread_cond_t common_audio_cond;
+
+ pthread_mutex_t common_subtitle_mutex;
+ pthread_cond_t common_subtitle_cond;
+
+ // init mutex and conditions
+ ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
+ if (ret < 0)
+ goto cleanup;
+
+ if (c->is_init_section_common_video) {
+ ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
+ if (ret < 0)
+ goto cleanup;
+
+ ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
+ }
+
+ if (c->is_init_section_common_audio) {
+ ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
+ if (ret < 0)
+ goto cleanup;
+
+ ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
+ }
+
+ if (c->is_init_section_common_subtitle) {
+ ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
+ if (ret < 0)
+ goto cleanup;
+
+ ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
+ if (ret < 0)
+ goto cleanup;
+ }
+
+ // init work pool data
+ struct work_pool_data* current_data = init_data;
+
+ for (i = 0; i < c->n_videos; i++) {
+ create_work_pool_data(s, stream_index, c->videos[i],
+ c->is_init_section_common_video ? c->videos[0] : NULL,
+ current_data, &common_video_mutex, &common_video_cond);
+
+ stream_index++;
+ current_data++;
+ }
+
+ for (i = 0; i < c->n_audios; i++) {
+ create_work_pool_data(s, stream_index, c->audios[i],
+ c->is_init_section_common_audio ? c->audios[0] : NULL,
+ current_data, &common_audio_mutex, &common_audio_cond);
+
+ stream_index++;
+ current_data++;
+ }
+
+ for (i = 0; i < c->n_subtitles; i++) {
+ create_work_pool_data(s, stream_index, c->subtitles[i],
+ c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
+ current_data, &common_subtitle_mutex, &common_subtitle_cond);
+
+ stream_index++;
+ current_data++;
+ }
+
+ // start threads
+ struct thread_data *current_thread = thread_data;
+ for (i = 0; i < threads; i++) {
+ ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
+ if (ret < 0)
+ goto cleanup;
+
+ current_thread++;
+ }
+
+cleanup:
+ // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
+ int initResult = ret;
+ int runResult = 0;
+ int cleanupResult = 0;
+
+ // join threads
+ current_thread = thread_data;
+ for (i = 0; i < threads; i++) {
+ if (current_thread->is_started) {
+ ret = AVERROR(pthread_join(current_thread->thread, NULL));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+ current_thread++;
+ }
+
+ // finalize streams and collect results
+ current_data = init_data;
+ for (i = 0; i < nstreams; i++) {
+ if (current_data->result < 0) {
+ // thread ran into error: collect result and break
+ runResult = current_data->result;
+ break;
+ }
+ else {
+ // thread success: create streams on AVFormatContext
+ ret = end_open_demux_for_component(s, current_data->pls);
+ if (ret < 0)
+ runResult = ret;
+ }
+ current_data++;
+ }
+
+ // cleanup mutex and conditions
+ ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ if (c->is_init_section_common_video) {
+ ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_video_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ if (c->is_init_section_common_audio) {
+ ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ if (c->is_init_section_common_subtitle) {
+ ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
+ if (ret < 0)
+ cleanupResult = ret;
+ }
+
+ // return results if errors have occured in one of the phases
+ if (initResult < 0)
+ return initResult;
+
+ if (runResult < 0)
+ return runResult;
+
+ if (cleanupResult < 0)
+ return cleanupResult;
+
+ return 0;
+}
+
+#endif
+
static int dash_read_header(AVFormatContext *s)
{
DASHContext *c = s->priv_data;
@@ -2067,6 +2395,23 @@ static int dash_read_header(AVFormatContext *s)
if (c->n_subtitles)
c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+ int threads = 0;
+ int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
+
+#if HAVE_THREADS
+ threads = FFMIN(nstreams, c->init_threads);
+#endif
+
+ if (threads > 1)
+ {
+#if HAVE_THREADS
+ ret = init_streams_multithreaded(s, nstreams, threads);
+ if (ret < 0)
+ return ret;
+#endif
+ }
+ else
+ {
/* Open the demuxer for video and audio components if available */
for (i = 0; i < c->n_videos; i++) {
rep = c->videos[i];
@@ -2115,6 +2460,7 @@ static int dash_read_header(AVFormatContext *s)
if (!stream_index)
return AVERROR_INVALIDDATA;
+ }
/* Create a program */
program = av_new_program(s, 0);
@@ -2366,7 +2712,10 @@ static const AVOption dash_options[] = {
OFFSET(allowed_extensions), AV_OPT_TYPE_STRING,
{.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
INT_MIN, INT_MAX, FLAGS},
- { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+ { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key),
+ AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+ { "init_threads", "Number of threads to use for initializing the DASH stream",
+ OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
{NULL}
};
--
2.31.1.windows.1
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
@ 2022-08-31 2:54 ` Andreas Rheinhardt
2022-08-31 7:25 ` Steven Liu
2022-09-04 21:29 ` Lukas Fellechner
0 siblings, 2 replies; 26+ messages in thread
From: Andreas Rheinhardt @ 2022-08-31 2:54 UTC (permalink / raw)
To: ffmpeg-devel
Lukas Fellechner:
> This patch adds an "init-threads" option, specifying the max
> number of threads to use. Multiple worker threads are spun up
> to massively bring down init times.
> ---
> libavformat/dashdec.c | 351 +++++++++++++++++++++++++++++++++++++++++-
> 1 file changed, 350 insertions(+), 1 deletion(-)
>
> diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
> index e82da45e43..20f2557ea3 100644
> --- a/libavformat/dashdec.c
> +++ b/libavformat/dashdec.c
> @@ -24,6 +24,7 @@
> #include "libavutil/opt.h"
> #include "libavutil/time.h"
> #include "libavutil/parseutils.h"
> +#include "libavutil/thread.h"
> #include "internal.h"
> #include "avio_internal.h"
> #include "dash.h"
> @@ -152,6 +153,8 @@ typedef struct DASHContext {
> int max_url_size;
> char *cenc_decryption_key;
>
> + int init_threads;
> +
> /* Flags for init section*/
> int is_init_section_common_video;
> int is_init_section_common_audio;
> @@ -2033,6 +2036,331 @@ static void move_metadata(AVStream *st, const char *key, char **value)
> }
> }
>
> +#if HAVE_THREADS
> +
> +struct work_pool_data
> +{
> + AVFormatContext *ctx;
> + struct representation *pls;
> + struct representation *common_pls;
> + pthread_mutex_t *common_mutex;
> + pthread_cond_t *common_condition;
> + int is_common;
> + int is_started;
> + int result;
> +};
> +
> +struct thread_data
This is against our naming conventions: CamelCase for struct tags and
typedefs, lowercase names with underscore for variable names.
> +{
> + pthread_t thread;
> + pthread_mutex_t *mutex;
> + struct work_pool_data *work_pool;
> + int work_pool_size;
> + int is_started;
> + int has_error;
> +};
> +
> +static void *worker_thread(void *ptr)
> +{
> + int ret = 0;
> + int i;
> + struct thread_data *thread_data = (struct thread_data*)ptr;
> + struct work_pool_data *work_pool = NULL;
> + struct work_pool_data *data = NULL;
> + for (;;) {
> +
> + // get next work item unless there was an error
> + pthread_mutex_lock(thread_data->mutex);
> + data = NULL;
> + if (!thread_data->has_error) {
> + work_pool = thread_data->work_pool;
> + for (i = 0; i < thread_data->work_pool_size; i++) {
> + if (!work_pool->is_started) {
> + data = work_pool;
> + data->is_started = 1;
> + break;
> + }
> + work_pool++;
> + }
> + }
> + pthread_mutex_unlock(thread_data->mutex);
> +
> + if (!data) {
> + // no more work to do
> + return NULL;
> + }
> +
> + // if we are common section provider, init and signal
> + if (data->is_common) {
> + data->pls->parent = data->ctx;
> + ret = update_init_section(data->pls);
> + if (ret < 0) {
> + pthread_cond_signal(data->common_condition);
> + goto end;
> + }
> + else
> + ret = AVERROR(pthread_cond_signal(data->common_condition));
> + }
> +
> + // if we depend on common section provider, wait for signal and copy
> + if (data->common_pls) {
> + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
> + if (ret < 0)
> + goto end;
> +
> + if (!data->common_pls->init_sec_buf) {
> + goto end;
> + ret = AVERROR(EFAULT);
> + }
> +
> + ret = copy_init_section(data->pls, data->common_pls);
> + if (ret < 0)
> + goto end;
> + }
> +
> + ret = begin_open_demux_for_component(data->ctx, data->pls);
> + if (ret < 0)
> + goto end;
> +
> + end:
> + data->result = ret;
> +
> + // notify error to other threads and exit
> + if (ret < 0) {
> + pthread_mutex_lock(thread_data->mutex);
> + thread_data->has_error = 1;
> + pthread_mutex_unlock(thread_data->mutex);
> + return NULL;
> + }
> + }
> +
> +
> + return NULL;
> +}
> +
> +static void create_work_pool_data(AVFormatContext *ctx, int stream_index,
> + struct representation *pls, struct representation *common_pls,
> + struct work_pool_data *init_data, pthread_mutex_t *common_mutex,
> + pthread_cond_t *common_condition)
> +{
> + init_data->ctx = ctx;
> + init_data->pls = pls;
> + init_data->pls->stream_index = stream_index;
> + init_data->common_condition = common_condition;
> + init_data->common_mutex = common_mutex;
> + init_data->result = -1;
> +
> + if (pls == common_pls) {
> + init_data->is_common = 1;
> + }
> + else if (common_pls) {
> + init_data->common_pls = common_pls;
> + }
> +}
> +
> +static int start_thread(struct thread_data *thread_data,
> + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
> +{
> + int ret;
> +
> + thread_data->mutex = mutex;
> + thread_data->work_pool = work_pool;
> + thread_data->work_pool_size = work_pool_size;
> +
> + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
> + if (ret == 0)
> + thread_data->is_started = 1;
> +
> + return ret;
> +}
> +
> +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
> +{
> + DASHContext *c = s->priv_data;
> + int ret = 0;
> + int stream_index = 0;
> + int i;
We allow "for (int i = 0;"
> +
> + // alloc data
> + struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
> + if (!init_data)
> + return AVERROR(ENOMEM);
> +
> + struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
> + if (!thread_data)
> + return AVERROR(ENOMEM);
1. init_data leaks here on error.
2. In fact, it seems to me that both init_data and thread_data are
nowhere freed.
> +
> + // alloc mutex and conditions
> + pthread_mutex_t work_pool_mutex;
> +
> + pthread_mutex_t common_video_mutex;
> + pthread_cond_t common_video_cond;
> +
> + pthread_mutex_t common_audio_mutex;
> + pthread_cond_t common_audio_cond;
> +
> + pthread_mutex_t common_subtitle_mutex;
> + pthread_cond_t common_subtitle_cond;
> +
> + // init mutex and conditions
> + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
> + if (ret < 0)
> + goto cleanup;
> +
> + if (c->is_init_section_common_video) {
> + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
> + if (ret < 0)
> + goto cleanup;
> +
> + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> + }
> +
> + if (c->is_init_section_common_audio) {
> + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
> + if (ret < 0)
> + goto cleanup;
> +
> + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> + }
> +
> + if (c->is_init_section_common_subtitle) {
> + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
> + if (ret < 0)
> + goto cleanup;
> +
> + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
> + if (ret < 0)
> + goto cleanup;
> + }
> +
> + // init work pool data
> + struct work_pool_data* current_data = init_data;
> +
> + for (i = 0; i < c->n_videos; i++) {
> + create_work_pool_data(s, stream_index, c->videos[i],
> + c->is_init_section_common_video ? c->videos[0] : NULL,
> + current_data, &common_video_mutex, &common_video_cond);
> +
> + stream_index++;
> + current_data++;
> + }
> +
> + for (i = 0; i < c->n_audios; i++) {
> + create_work_pool_data(s, stream_index, c->audios[i],
> + c->is_init_section_common_audio ? c->audios[0] : NULL,
> + current_data, &common_audio_mutex, &common_audio_cond);
> +
> + stream_index++;
> + current_data++;
> + }
> +
> + for (i = 0; i < c->n_subtitles; i++) {
> + create_work_pool_data(s, stream_index, c->subtitles[i],
> + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
> + current_data, &common_subtitle_mutex, &common_subtitle_cond);
> +
> + stream_index++;
> + current_data++;
> + }
This is very repetitive.
> +
> + // start threads
> + struct thread_data *current_thread = thread_data;
> + for (i = 0; i < threads; i++) {
> + ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
> + if (ret < 0)
> + goto cleanup;
> +
> + current_thread++;
> + }
> +
> +cleanup:
> + // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
> + int initResult = ret;
> + int runResult = 0;
> + int cleanupResult = 0;
> +
> + // join threads
> + current_thread = thread_data;
> + for (i = 0; i < threads; i++) {
> + if (current_thread->is_started) {
> + ret = AVERROR(pthread_join(current_thread->thread, NULL));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> + current_thread++;
> + }
> +
> + // finalize streams and collect results
> + current_data = init_data;
> + for (i = 0; i < nstreams; i++) {
> + if (current_data->result < 0) {
> + // thread ran into error: collect result and break
> + runResult = current_data->result;
> + break;
> + }
> + else {
> + // thread success: create streams on AVFormatContext
> + ret = end_open_demux_for_component(s, current_data->pls);
> + if (ret < 0)
> + runResult = ret;
> + }
> + current_data++;
> + }
> +
> + // cleanup mutex and conditions
> + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + if (c->is_init_section_common_video) {
> + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_video_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + if (c->is_init_section_common_audio) {
> + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + if (c->is_init_section_common_subtitle) {
> + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
> + if (ret < 0)
> + cleanupResult = ret;
> +
> + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
> + if (ret < 0)
> + cleanupResult = ret;
> + }
> +
> + // return results if errors have occured in one of the phases
> + if (initResult < 0)
> + return initResult;
> +
> + if (runResult < 0)
> + return runResult;
> +
> + if (cleanupResult < 0)
> + return cleanupResult;
> +
> + return 0;
> +}
> +
> +#endif
> +
> static int dash_read_header(AVFormatContext *s)
> {
> DASHContext *c = s->priv_data;
> @@ -2067,6 +2395,23 @@ static int dash_read_header(AVFormatContext *s)
> if (c->n_subtitles)
> c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
>
> + int threads = 0;
> + int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
> +
> +#if HAVE_THREADS
> + threads = FFMIN(nstreams, c->init_threads);
> +#endif
> +
> + if (threads > 1)
> + {
> +#if HAVE_THREADS
> + ret = init_streams_multithreaded(s, nstreams, threads);
> + if (ret < 0)
> + return ret;
> +#endif
> + }
> + else
> + {
> /* Open the demuxer for video and audio components if available */
> for (i = 0; i < c->n_videos; i++) {
> rep = c->videos[i];
> @@ -2115,6 +2460,7 @@ static int dash_read_header(AVFormatContext *s)
>
> if (!stream_index)
> return AVERROR_INVALIDDATA;
> + }
>
> /* Create a program */
> program = av_new_program(s, 0);
> @@ -2366,7 +2712,10 @@ static const AVOption dash_options[] = {
> OFFSET(allowed_extensions), AV_OPT_TYPE_STRING,
> {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
> INT_MIN, INT_MAX, FLAGS},
> - { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> + { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key),
> + AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> + { "init_threads", "Number of threads to use for initializing the DASH stream",
> + OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
> {NULL}
> };
>
> --
> 2.31.1.windows.1
>
1. We actually have an API to process multiple tasks by different
threads: Look at libavutil/slicethread.h. Why can't you reuse that?
2. In case initialization of one of the conditions/mutexes fails, you
are nevertheless destroying them; you are even destroying completely
uninitialized mutexes. This is undefined behaviour. Checking the result
of it does not fix this.
- Andreas
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
2022-08-31 2:54 ` Andreas Rheinhardt
@ 2022-08-31 7:25 ` Steven Liu
2022-08-31 12:17 ` Andreas Rheinhardt
2022-09-04 21:29 ` Lukas Fellechner
1 sibling, 1 reply; 26+ messages in thread
From: Steven Liu @ 2022-08-31 7:25 UTC (permalink / raw)
To: FFmpeg development discussions and patches
Andreas Rheinhardt <andreas.rheinhardt@outlook.com> 于2022年8月31日周三 10:54写道:
>
> Lukas Fellechner:
> > This patch adds an "init-threads" option, specifying the max
> > number of threads to use. Multiple worker threads are spun up
> > to massively bring down init times.
> > ---
> > libavformat/dashdec.c | 351 +++++++++++++++++++++++++++++++++++++++++-
> > 1 file changed, 350 insertions(+), 1 deletion(-)
> >
> > diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
> > index e82da45e43..20f2557ea3 100644
> > --- a/libavformat/dashdec.c
> > +++ b/libavformat/dashdec.c
> > @@ -24,6 +24,7 @@
> > #include "libavutil/opt.h"
> > #include "libavutil/time.h"
> > #include "libavutil/parseutils.h"
> > +#include "libavutil/thread.h"
> > #include "internal.h"
> > #include "avio_internal.h"
> > #include "dash.h"
> > @@ -152,6 +153,8 @@ typedef struct DASHContext {
> > int max_url_size;
> > char *cenc_decryption_key;
> >
> > + int init_threads;
> > +
> > /* Flags for init section*/
> > int is_init_section_common_video;
> > int is_init_section_common_audio;
> > @@ -2033,6 +2036,331 @@ static void move_metadata(AVStream *st, const char *key, char **value)
> > }
> > }
> >
> > +#if HAVE_THREADS
> > +
> > +struct work_pool_data
> > +{
> > + AVFormatContext *ctx;
> > + struct representation *pls;
> > + struct representation *common_pls;
> > + pthread_mutex_t *common_mutex;
> > + pthread_cond_t *common_condition;
> > + int is_common;
> > + int is_started;
> > + int result;
> > +};
> > +
> > +struct thread_data
>
> This is against our naming conventions: CamelCase for struct tags and
> typedefs, lowercase names with underscore for variable names.
>
> > +{
> > + pthread_t thread;
> > + pthread_mutex_t *mutex;
> > + struct work_pool_data *work_pool;
> > + int work_pool_size;
> > + int is_started;
> > + int has_error;
> > +};
> > +
> > +static void *worker_thread(void *ptr)
> > +{
> > + int ret = 0;
> > + int i;
> > + struct thread_data *thread_data = (struct thread_data*)ptr;
> > + struct work_pool_data *work_pool = NULL;
> > + struct work_pool_data *data = NULL;
> > + for (;;) {
> > +
> > + // get next work item unless there was an error
> > + pthread_mutex_lock(thread_data->mutex);
> > + data = NULL;
> > + if (!thread_data->has_error) {
> > + work_pool = thread_data->work_pool;
> > + for (i = 0; i < thread_data->work_pool_size; i++) {
> > + if (!work_pool->is_started) {
> > + data = work_pool;
> > + data->is_started = 1;
> > + break;
> > + }
> > + work_pool++;
> > + }
> > + }
> > + pthread_mutex_unlock(thread_data->mutex);
> > +
> > + if (!data) {
> > + // no more work to do
> > + return NULL;
> > + }
> > +
> > + // if we are common section provider, init and signal
> > + if (data->is_common) {
> > + data->pls->parent = data->ctx;
> > + ret = update_init_section(data->pls);
> > + if (ret < 0) {
> > + pthread_cond_signal(data->common_condition);
> > + goto end;
> > + }
> > + else
> > + ret = AVERROR(pthread_cond_signal(data->common_condition));
> > + }
> > +
> > + // if we depend on common section provider, wait for signal and copy
> > + if (data->common_pls) {
> > + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
> > + if (ret < 0)
> > + goto end;
> > +
> > + if (!data->common_pls->init_sec_buf) {
> > + goto end;
> > + ret = AVERROR(EFAULT);
> > + }
> > +
> > + ret = copy_init_section(data->pls, data->common_pls);
> > + if (ret < 0)
> > + goto end;
> > + }
> > +
> > + ret = begin_open_demux_for_component(data->ctx, data->pls);
> > + if (ret < 0)
> > + goto end;
> > +
> > + end:
> > + data->result = ret;
> > +
> > + // notify error to other threads and exit
> > + if (ret < 0) {
> > + pthread_mutex_lock(thread_data->mutex);
> > + thread_data->has_error = 1;
> > + pthread_mutex_unlock(thread_data->mutex);
> > + return NULL;
> > + }
> > + }
> > +
> > +
> > + return NULL;
> > +}
> > +
> > +static void create_work_pool_data(AVFormatContext *ctx, int stream_index,
> > + struct representation *pls, struct representation *common_pls,
> > + struct work_pool_data *init_data, pthread_mutex_t *common_mutex,
> > + pthread_cond_t *common_condition)
> > +{
> > + init_data->ctx = ctx;
> > + init_data->pls = pls;
> > + init_data->pls->stream_index = stream_index;
> > + init_data->common_condition = common_condition;
> > + init_data->common_mutex = common_mutex;
> > + init_data->result = -1;
> > +
> > + if (pls == common_pls) {
> > + init_data->is_common = 1;
> > + }
> > + else if (common_pls) {
> > + init_data->common_pls = common_pls;
> > + }
> > +}
> > +
> > +static int start_thread(struct thread_data *thread_data,
> > + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
> > +{
> > + int ret;
> > +
> > + thread_data->mutex = mutex;
> > + thread_data->work_pool = work_pool;
> > + thread_data->work_pool_size = work_pool_size;
> > +
> > + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
> > + if (ret == 0)
> > + thread_data->is_started = 1;
> > +
> > + return ret;
> > +}
> > +
> > +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
> > +{
> > + DASHContext *c = s->priv_data;
> > + int ret = 0;
> > + int stream_index = 0;
> > + int i;
>
> We allow "for (int i = 0;"
>
> > +
> > + // alloc data
> > + struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
> > + if (!init_data)
> > + return AVERROR(ENOMEM);
> > +
> > + struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
> > + if (!thread_data)
> > + return AVERROR(ENOMEM);
>
> 1. init_data leaks here on error.
> 2. In fact, it seems to me that both init_data and thread_data are
> nowhere freed.
>
> > +
> > + // alloc mutex and conditions
> > + pthread_mutex_t work_pool_mutex;
> > +
> > + pthread_mutex_t common_video_mutex;
> > + pthread_cond_t common_video_cond;
> > +
> > + pthread_mutex_t common_audio_mutex;
> > + pthread_cond_t common_audio_cond;
> > +
> > + pthread_mutex_t common_subtitle_mutex;
> > + pthread_cond_t common_subtitle_cond;
> > +
> > + // init mutex and conditions
> > + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
> > + if (ret < 0)
> > + goto cleanup;
> > +
> > + if (c->is_init_section_common_video) {
> > + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
> > + if (ret < 0)
> > + goto cleanup;
> > +
> > + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
> > + if (ret < 0)
> > + goto cleanup;
> > + }
> > +
> > + if (c->is_init_section_common_audio) {
> > + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
> > + if (ret < 0)
> > + goto cleanup;
> > +
> > + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
> > + if (ret < 0)
> > + goto cleanup;
> > + }
> > +
> > + if (c->is_init_section_common_subtitle) {
> > + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
> > + if (ret < 0)
> > + goto cleanup;
> > +
> > + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
> > + if (ret < 0)
> > + goto cleanup;
> > + }
> > +
> > + // init work pool data
> > + struct work_pool_data* current_data = init_data;
> > +
> > + for (i = 0; i < c->n_videos; i++) {
> > + create_work_pool_data(s, stream_index, c->videos[i],
> > + c->is_init_section_common_video ? c->videos[0] : NULL,
> > + current_data, &common_video_mutex, &common_video_cond);
> > +
> > + stream_index++;
> > + current_data++;
> > + }
> > +
> > + for (i = 0; i < c->n_audios; i++) {
> > + create_work_pool_data(s, stream_index, c->audios[i],
> > + c->is_init_section_common_audio ? c->audios[0] : NULL,
> > + current_data, &common_audio_mutex, &common_audio_cond);
> > +
> > + stream_index++;
> > + current_data++;
> > + }
> > +
> > + for (i = 0; i < c->n_subtitles; i++) {
> > + create_work_pool_data(s, stream_index, c->subtitles[i],
> > + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
> > + current_data, &common_subtitle_mutex, &common_subtitle_cond);
> > +
> > + stream_index++;
> > + current_data++;
> > + }
>
> This is very repetitive.
>
> > +
> > + // start threads
> > + struct thread_data *current_thread = thread_data;
> > + for (i = 0; i < threads; i++) {
> > + ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
> > + if (ret < 0)
> > + goto cleanup;
> > +
> > + current_thread++;
> > + }
> > +
> > +cleanup:
> > + // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup
> > + int initResult = ret;
> > + int runResult = 0;
> > + int cleanupResult = 0;
> > +
> > + // join threads
> > + current_thread = thread_data;
> > + for (i = 0; i < threads; i++) {
> > + if (current_thread->is_started) {
> > + ret = AVERROR(pthread_join(current_thread->thread, NULL));
> > + if (ret < 0)
> > + cleanupResult = ret;
> > + }
> > + current_thread++;
> > + }
> > +
> > + // finalize streams and collect results
> > + current_data = init_data;
> > + for (i = 0; i < nstreams; i++) {
> > + if (current_data->result < 0) {
> > + // thread ran into error: collect result and break
> > + runResult = current_data->result;
> > + break;
> > + }
> > + else {
> > + // thread success: create streams on AVFormatContext
> > + ret = end_open_demux_for_component(s, current_data->pls);
> > + if (ret < 0)
> > + runResult = ret;
> > + }
> > + current_data++;
> > + }
> > +
> > + // cleanup mutex and conditions
> > + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
> > + if (ret < 0)
> > + cleanupResult = ret;
> > +
> > + if (c->is_init_section_common_video) {
> > + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
> > + if (ret < 0)
> > + cleanupResult = ret;
> > +
> > + ret = AVERROR(pthread_cond_destroy(&common_video_cond));
> > + if (ret < 0)
> > + cleanupResult = ret;
> > + }
> > +
> > + if (c->is_init_section_common_audio) {
> > + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
> > + if (ret < 0)
> > + cleanupResult = ret;
> > +
> > + ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
> > + if (ret < 0)
> > + cleanupResult = ret;
> > + }
> > +
> > + if (c->is_init_section_common_subtitle) {
> > + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
> > + if (ret < 0)
> > + cleanupResult = ret;
> > +
> > + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
> > + if (ret < 0)
> > + cleanupResult = ret;
> > + }
> > +
> > + // return results if errors have occured in one of the phases
> > + if (initResult < 0)
> > + return initResult;
> > +
> > + if (runResult < 0)
> > + return runResult;
> > +
> > + if (cleanupResult < 0)
> > + return cleanupResult;
> > +
> > + return 0;
> > +}
> > +
> > +#endif
> > +
> > static int dash_read_header(AVFormatContext *s)
> > {
> > DASHContext *c = s->priv_data;
> > @@ -2067,6 +2395,23 @@ static int dash_read_header(AVFormatContext *s)
> > if (c->n_subtitles)
> > c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
> >
> > + int threads = 0;
> > + int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
> > +
> > +#if HAVE_THREADS
> > + threads = FFMIN(nstreams, c->init_threads);
> > +#endif
> > +
> > + if (threads > 1)
> > + {
> > +#if HAVE_THREADS
> > + ret = init_streams_multithreaded(s, nstreams, threads);
> > + if (ret < 0)
> > + return ret;
> > +#endif
> > + }
> > + else
> > + {
> > /* Open the demuxer for video and audio components if available */
> > for (i = 0; i < c->n_videos; i++) {
> > rep = c->videos[i];
> > @@ -2115,6 +2460,7 @@ static int dash_read_header(AVFormatContext *s)
> >
> > if (!stream_index)
> > return AVERROR_INVALIDDATA;
> > + }
> >
> > /* Create a program */
> > program = av_new_program(s, 0);
> > @@ -2366,7 +2712,10 @@ static const AVOption dash_options[] = {
> > OFFSET(allowed_extensions), AV_OPT_TYPE_STRING,
> > {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
> > INT_MIN, INT_MAX, FLAGS},
> > - { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> > + { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key),
> > + AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
> > + { "init_threads", "Number of threads to use for initializing the DASH stream",
> > + OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
> > {NULL}
> > };
> >
> > --
> > 2.31.1.windows.1
> >
>
> 1. We actually have an API to process multiple tasks by different
> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
I saw that usually be used in avfilters for slice multi-thread, or i
misunderstand something?
> 2. In case initialization of one of the conditions/mutexes fails, you
> are nevertheless destroying them; you are even destroying completely
> uninitialized mutexes. This is undefined behaviour. Checking the result
> of it does not fix this.
>
> - Andreas
Thanks
Steven
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
2022-08-31 7:25 ` Steven Liu
@ 2022-08-31 12:17 ` Andreas Rheinhardt
0 siblings, 0 replies; 26+ messages in thread
From: Andreas Rheinhardt @ 2022-08-31 12:17 UTC (permalink / raw)
To: ffmpeg-devel
Steven Liu:
> Andreas Rheinhardt <andreas.rheinhardt@outlook.com> 于2022年8月31日周三 10:54写道:
>> 1. We actually have an API to process multiple tasks by different
>> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
> I saw that usually be used in avfilters for slice multi-thread, or i
> misunderstand something?
>
It is also used by our slice-threaded decoders. In fact, it is
everywhere where we do slice threading.
- Andreas
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
2022-08-31 2:54 ` Andreas Rheinhardt
2022-08-31 7:25 ` Steven Liu
@ 2022-09-04 21:29 ` Lukas Fellechner
2022-09-04 22:50 ` Andreas Rheinhardt
1 sibling, 1 reply; 26+ messages in thread
From: Lukas Fellechner @ 2022-09-04 21:29 UTC (permalink / raw)
To: ffmpeg-devel
Andreas Rheinhardt andreas.rheinhardt at outlook.com
Wed Aug 31 05:54:12 EEST 2022
>
> > +#if HAVE_THREADS
> > +
> > +struct work_pool_data
> > +{
> > + AVFormatContext *ctx;
> > + struct representation *pls;
> > + struct representation *common_pls;
> > + pthread_mutex_t *common_mutex;
> > + pthread_cond_t *common_condition;
> > + int is_common;
> > + int is_started;
> > + int result;
> > +};
> > +
> > +struct thread_data
>
> This is against our naming conventions: CamelCase for struct tags and
> typedefs, lowercase names with underscore for variable names.
In the code files I looked at, CamelCase is only used for typedef structs.
All structs without typedef are lower case with underscores, so I aligned
with that, originally.
I will make this a typedef struct and use CamelCase for next patch.
> > +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
> > +{
> > + DASHContext *c = s->priv_data;
> > + int ret = 0;
> > + int stream_index = 0;
> > + int i;
>
> We allow "for (int i = 0;"
Oh, I did not know that, and I did not see it being used here anywhere.
Will use in next patch, it's my preferred style, actually.
> > +
> > + // alloc data
> > + struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
> > + if (!init_data)
> > + return AVERROR(ENOMEM);
> > +
> > + struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
> > + if (!thread_data)
> > + return AVERROR(ENOMEM);
>
> 1. init_data leaks here on error.
> 2. In fact, it seems to me that both init_data and thread_data are
> nowhere freed.
True, I must have lost the av_free call at some point.
> > + // init work pool data
> > + struct work_pool_data* current_data = init_data;
> > +
> > + for (i = 0; i < c->n_videos; i++) {
> > + create_work_pool_data(s, stream_index, c->videos[i],
> > + c->is_init_section_common_video ? c->videos[0] : NULL,
> > + current_data, &common_video_mutex, &common_video_cond);
> > +
> > + stream_index++;
> > + current_data++;
> > + }
> > +
> > + for (i = 0; i < c->n_audios; i++) {
> > + create_work_pool_data(s, stream_index, c->audios[i],
> > + c->is_init_section_common_audio ? c->audios[0] : NULL,
> > + current_data, &common_audio_mutex, &common_audio_cond);
> > +
> > + stream_index++;
> > + current_data++;
> > + }
> > +
> > + for (i = 0; i < c->n_subtitles; i++) {
> > + create_work_pool_data(s, stream_index, c->subtitles[i],
> > + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
> > + current_data, &common_subtitle_mutex, &common_subtitle_cond);
> > +
> > + stream_index++;
> > + current_data++;
> > + }
>
> This is very repetitive.
Will improve for next patch.
> 1. We actually have an API to process multiple tasks by different
> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
> 2. In case initialization of one of the conditions/mutexes fails, you
> are nevertheless destroying them; you are even destroying completely
> uninitialized mutexes. This is undefined behaviour. Checking the result
> of it does not fix this.
>
> - Andreas
1. The slicethread implementation is pretty hard to understand.
I was not sure if it can be used for normal parallelization, because
it looked more like some kind of thread pool for continuous data
processing. But after taking a second look, I think I can use it here.
I will try and see if it works well.
2. I was not aware that this is undefined behavior. Will switch to
PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros,
which return a safely initialized mutex/cond.
I also noticed one cross-thread issue that I will solve in the next patch.
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
2022-09-04 21:29 ` Lukas Fellechner
@ 2022-09-04 22:50 ` Andreas Rheinhardt
2022-09-05 10:15 ` Lukas Fellechner
0 siblings, 1 reply; 26+ messages in thread
From: Andreas Rheinhardt @ 2022-09-04 22:50 UTC (permalink / raw)
To: ffmpeg-devel
Lukas Fellechner:
> Andreas Rheinhardt andreas.rheinhardt at outlook.com
> Wed Aug 31 05:54:12 EEST 2022
>>
>>> +#if HAVE_THREADS
>>> +
>>> +struct work_pool_data
>>> +{
>>> + AVFormatContext *ctx;
>>> + struct representation *pls;
>>> + struct representation *common_pls;
>>> + pthread_mutex_t *common_mutex;
>>> + pthread_cond_t *common_condition;
>>> + int is_common;
>>> + int is_started;
>>> + int result;
>>> +};
>>> +
>>> +struct thread_data
>>
>> This is against our naming conventions: CamelCase for struct tags and
>> typedefs, lowercase names with underscore for variable names.
>
> In the code files I looked at, CamelCase is only used for typedef structs.
> All structs without typedef are lower case with underscores, so I aligned
> with that, originally.
>
> I will make this a typedef struct and use CamelCase for next patch.
>
>>> +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
>>> +{
>>> + DASHContext *c = s->priv_data;
>>> + int ret = 0;
>>> + int stream_index = 0;
>>> + int i;
>>
>> We allow "for (int i = 0;"
>
> Oh, I did not know that, and I did not see it being used here anywhere.
> Will use in next patch, it's my preferred style, actually.
>
>>> +
>>> + // alloc data
>>> + struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
>>> + if (!init_data)
>>> + return AVERROR(ENOMEM);
>>> +
>>> + struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
>>> + if (!thread_data)
>>> + return AVERROR(ENOMEM);
>>
>> 1. init_data leaks here on error.
>> 2. In fact, it seems to me that both init_data and thread_data are
>> nowhere freed.
>
> True, I must have lost the av_free call at some point.
>
>>> + // init work pool data
>>> + struct work_pool_data* current_data = init_data;
>>> +
>>> + for (i = 0; i < c->n_videos; i++) {
>>> + create_work_pool_data(s, stream_index, c->videos[i],
>>> + c->is_init_section_common_video ? c->videos[0] : NULL,
>>> + current_data, &common_video_mutex, &common_video_cond);
>>> +
>>> + stream_index++;
>>> + current_data++;
>>> + }
>>> +
>>> + for (i = 0; i < c->n_audios; i++) {
>>> + create_work_pool_data(s, stream_index, c->audios[i],
>>> + c->is_init_section_common_audio ? c->audios[0] : NULL,
>>> + current_data, &common_audio_mutex, &common_audio_cond);
>>> +
>>> + stream_index++;
>>> + current_data++;
>>> + }
>>> +
>>> + for (i = 0; i < c->n_subtitles; i++) {
>>> + create_work_pool_data(s, stream_index, c->subtitles[i],
>>> + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
>>> + current_data, &common_subtitle_mutex, &common_subtitle_cond);
>>> +
>>> + stream_index++;
>>> + current_data++;
>>> + }
>>
>> This is very repetitive.
>
> Will improve for next patch.
>
>> 1. We actually have an API to process multiple tasks by different
>> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
>> 2. In case initialization of one of the conditions/mutexes fails, you
>> are nevertheless destroying them; you are even destroying completely
>> uninitialized mutexes. This is undefined behaviour. Checking the result
>> of it does not fix this.
>>
>> - Andreas
>
> 1. The slicethread implementation is pretty hard to understand.
> I was not sure if it can be used for normal parallelization, because
> it looked more like some kind of thread pool for continuous data
> processing. But after taking a second look, I think I can use it here.
> I will try and see if it works well.
>
> 2. I was not aware that this is undefined behavior. Will switch to
> PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros,
> which return a safely initialized mutex/cond.
>
"The behavior is undefined if the value specified by the mutex argument
to pthread_mutex_destroy() does not refer to an initialized mutex."
(From
https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html)
Furthermore: "In cases where default mutex attributes are appropriate,
the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes.
The effect shall be equivalent to dynamic initialization by a call to
pthread_mutex_init() with parameter attr specified as NULL, except that
no error checks are performed." The last sentence sounds as if one would
then have to check mutex locking.
Moreover, older pthread standards did not allow to use
PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know
whether we can use that. Also our pthreads-wrapper on top of
OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used
nowhere in the codebase).
> I also noticed one cross-thread issue that I will solve in the next patch.
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
2022-09-04 22:50 ` Andreas Rheinhardt
@ 2022-09-05 10:15 ` Lukas Fellechner
2022-09-05 10:45 ` Andreas Rheinhardt
0 siblings, 1 reply; 26+ messages in thread
From: Lukas Fellechner @ 2022-09-05 10:15 UTC (permalink / raw)
To: ffmpeg-devel
> Gesendet: Montag, 05. September 2022 um 00:50 Uhr
> Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com>
> An: ffmpeg-devel@ffmpeg.org
> Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
> Lukas Fellechner:
> > Andreas Rheinhardt andreas.rheinhardt at outlook.com
> > Wed Aug 31 05:54:12 EEST 2022
> >>
> >
> >> 1. We actually have an API to process multiple tasks by different
> >> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
> >> 2. In case initialization of one of the conditions/mutexes fails, you
> >> are nevertheless destroying them; you are even destroying completely
> >> uninitialized mutexes. This is undefined behaviour. Checking the result
> >> of it does not fix this.
> >>
> >> - Andreas
> >
> > 1. The slicethread implementation is pretty hard to understand.
> > I was not sure if it can be used for normal parallelization, because
> > it looked more like some kind of thread pool for continuous data
> > processing. But after taking a second look, I think I can use it here.
> > I will try and see if it works well.
> >
> > 2. I was not aware that this is undefined behavior. Will switch to
> > PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros,
> > which return a safely initialized mutex/cond.
> >
>
> "The behavior is undefined if the value specified by the mutex argument
> to pthread_mutex_destroy() does not refer to an initialized mutex."
> (From
> https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html)
>
> Furthermore: "In cases where default mutex attributes are appropriate,
> the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes.
> The effect shall be equivalent to dynamic initialization by a call to
> pthread_mutex_init() with parameter attr specified as NULL, except that
> no error checks are performed." The last sentence sounds as if one would
> then have to check mutex locking.
>
> Moreover, older pthread standards did not allow to use
> PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know
> whether we can use that. Also our pthreads-wrapper on top of
> OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used
> nowhere in the codebase).
I missed that detail about the initializer macro. Thank you for clearing
that up.
After looking more into threads implementation in ffmpeg, I wonder if I
really need to check any results of init/destroy or other functions.
In slicethreads.c, there is zero checking on any of the lock functions.
The pthreads-based implementation does internally check the results of all
function calls and calls abort() in case of errors ("strict_" wrappers).
The Win32 implementation uses SRW locks which cannot even return errors.
And the OS2 implementation returns 0 on all calls as well.
So right now, I think that I should continue with normal _init() calls
(no macros) and drop all error checking, just like slicethread does.
Are you fine with that approach?
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
2022-09-05 10:15 ` Lukas Fellechner
@ 2022-09-05 10:45 ` Andreas Rheinhardt
2022-09-05 14:28 ` Lukas Fellechner
2022-09-11 20:35 ` Lukas Fellechner
0 siblings, 2 replies; 26+ messages in thread
From: Andreas Rheinhardt @ 2022-09-05 10:45 UTC (permalink / raw)
To: ffmpeg-devel
Lukas Fellechner:
>> Gesendet: Montag, 05. September 2022 um 00:50 Uhr
>> Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com>
>> An: ffmpeg-devel@ffmpeg.org
>> Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
>> Lukas Fellechner:
>>> Andreas Rheinhardt andreas.rheinhardt at outlook.com
>>> Wed Aug 31 05:54:12 EEST 2022
>>>>
>>>
>>>> 1. We actually have an API to process multiple tasks by different
>>>> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
>>>> 2. In case initialization of one of the conditions/mutexes fails, you
>>>> are nevertheless destroying them; you are even destroying completely
>>>> uninitialized mutexes. This is undefined behaviour. Checking the result
>>>> of it does not fix this.
>>>>
>>>> - Andreas
>>>
>>> 1. The slicethread implementation is pretty hard to understand.
>>> I was not sure if it can be used for normal parallelization, because
>>> it looked more like some kind of thread pool for continuous data
>>> processing. But after taking a second look, I think I can use it here.
>>> I will try and see if it works well.
>>>
>>> 2. I was not aware that this is undefined behavior. Will switch to
>>> PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros,
>>> which return a safely initialized mutex/cond.
>>>
>>
>> "The behavior is undefined if the value specified by the mutex argument
>> to pthread_mutex_destroy() does not refer to an initialized mutex."
>> (From
>> https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html)
>>
>> Furthermore: "In cases where default mutex attributes are appropriate,
>> the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes.
>> The effect shall be equivalent to dynamic initialization by a call to
>> pthread_mutex_init() with parameter attr specified as NULL, except that
>> no error checks are performed." The last sentence sounds as if one would
>> then have to check mutex locking.
>>
>> Moreover, older pthread standards did not allow to use
>> PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know
>> whether we can use that. Also our pthreads-wrapper on top of
>> OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used
>> nowhere in the codebase).
>
> I missed that detail about the initializer macro. Thank you for clearing
> that up.
>
> After looking more into threads implementation in ffmpeg, I wonder if I
> really need to check any results of init/destroy or other functions.
> In slicethreads.c, there is zero checking on any of the lock functions.
> The pthreads-based implementation does internally check the results of all
> function calls and calls abort() in case of errors ("strict_" wrappers).
> The Win32 implementation uses SRW locks which cannot even return errors.
> And the OS2 implementation returns 0 on all calls as well.
>
> So right now, I think that I should continue with normal _init() calls
> (no macros) and drop all error checking, just like slicethread does.
> Are you fine with that approach?
Zero checking is our old approach; the new approach checks for errors
and ensures that only mutexes/condition variables that have been
properly initialized are destroyed. See ff_pthread_init/free in
libavcodec/pthread.c (you can't use this in libavformat, because these
functions are local to libavcodec).
- Andreas
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
2022-09-05 10:45 ` Andreas Rheinhardt
@ 2022-09-05 14:28 ` Lukas Fellechner
2022-09-11 20:35 ` Lukas Fellechner
1 sibling, 0 replies; 26+ messages in thread
From: Lukas Fellechner @ 2022-09-05 14:28 UTC (permalink / raw)
To: ffmpeg-devel
>Gesendet: Montag, 05. September 2022 um 12:45 Uhr
>Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com>
>An: ffmpeg-devel@ffmpeg.org
>Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
>Lukas Fellechner:
>>> Gesendet: Montag, 05. September 2022 um 00:50 Uhr
>>> Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com>
>>> An: ffmpeg-devel@ffmpeg.org
>>> Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
>>> Lukas Fellechner:
>>>> Andreas Rheinhardt andreas.rheinhardt at outlook.com
>>>> Wed Aug 31 05:54:12 EEST 2022
>>>>>
>>>>
>>>>> 1. We actually have an API to process multiple tasks by different
>>>>> threads: Look at libavutil/slicethread.h. Why can't you reuse that?
>>>>> 2. In case initialization of one of the conditions/mutexes fails, you
>>>>> are nevertheless destroying them; you are even destroying completely
>>>>> uninitialized mutexes. This is undefined behaviour. Checking the result
>>>>> of it does not fix this.
>>>>>
>>>>> - Andreas
>>>>
>>>> 1. The slicethread implementation is pretty hard to understand.
>>>> I was not sure if it can be used for normal parallelization, because
>>>> it looked more like some kind of thread pool for continuous data
>>>> processing. But after taking a second look, I think I can use it here.
>>>> I will try and see if it works well.
>>>>
>>>> 2. I was not aware that this is undefined behavior. Will switch to
>>>> PTHREAD_MUTEX_INITIALIZER and PTHREAD_COND_INITIALIZER macros,
>>>> which return a safely initialized mutex/cond.
>>>>
>>>
>>> "The behavior is undefined if the value specified by the mutex argument
>>> to pthread_mutex_destroy() does not refer to an initialized mutex."
>>> (From
>>> https://pubs.opengroup.org/onlinepubs/9699919799/functions/pthread_mutex_destroy.html)
>>>
>>> Furthermore: "In cases where default mutex attributes are appropriate,
>>> the macro PTHREAD_MUTEX_INITIALIZER can be used to initialize mutexes.
>>> The effect shall be equivalent to dynamic initialization by a call to
>>> pthread_mutex_init() with parameter attr specified as NULL, except that
>>> no error checks are performed." The last sentence sounds as if one would
>>> then have to check mutex locking.
>>>
>>> Moreover, older pthread standards did not allow to use
>>> PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know
>>> whether we can use that. Also our pthreads-wrapper on top of
>>> OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used
>>> nowhere in the codebase).
>>
>> I missed that detail about the initializer macro. Thank you for clearing
>> that up.
>>
>> After looking more into threads implementation in ffmpeg, I wonder if I
>> really need to check any results of init/destroy or other functions.
>> In slicethreads.c, there is zero checking on any of the lock functions.
>> The pthreads-based implementation does internally check the results of all
>> function calls and calls abort() in case of errors ("strict_" wrappers).
>> The Win32 implementation uses SRW locks which cannot even return errors.
>> And the OS2 implementation returns 0 on all calls as well.
>>
>> So right now, I think that I should continue with normal _init() calls
>> (no macros) and drop all error checking, just like slicethread does.
>> Are you fine with that approach?
>
> Zero checking is our old approach; the new approach checks for errors
> and ensures that only mutexes/condition variables that have been
> properly initialized are destroyed. See ff_pthread_init/free in
> libavcodec/pthread.c (you can't use this in libavformat, because these
> functions are local to libavcodec).
>
> - Andreas
I see. I will try to do clean error checking then.
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
2022-09-05 10:45 ` Andreas Rheinhardt
2022-09-05 14:28 ` Lukas Fellechner
@ 2022-09-11 20:35 ` Lukas Fellechner
1 sibling, 0 replies; 26+ messages in thread
From: Lukas Fellechner @ 2022-09-11 20:35 UTC (permalink / raw)
To: ffmpeg-devel
Gesendet: Montag, 05. September 2022 um 12:45 Uhr
Von: "Andreas Rheinhardt" <andreas.rheinhardt@outlook.com>
An: ffmpeg-devel@ffmpeg.org
Betreff: Re: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization
Lukas Fellechner:
>>> Moreover, older pthread standards did not allow to use
>>> PTHREAD_MUTEX_INITIALIZER with non-static mutexes, so I don't know
>>> whether we can use that. Also our pthreads-wrapper on top of
>>> OS/2-threads does not provide PTHREAD_COND_INITIALIZER (which is used
>>> nowhere in the codebase).
>>
>> I missed that detail about the initializer macro. Thank you for clearing
>> that up.
>>
>> After looking more into threads implementation in ffmpeg, I wonder if I
>> really need to check any results of init/destroy or other functions.
>> In slicethreads.c, there is zero checking on any of the lock functions.
>> The pthreads-based implementation does internally check the results of all
>> function calls and calls abort() in case of errors ("strict_" wrappers).
>> The Win32 implementation uses SRW locks which cannot even return errors.
>> And the OS2 implementation returns 0 on all calls as well.
>>
>> So right now, I think that I should continue with normal _init() calls
>> (no macros) and drop all error checking, just like slicethread does.
>> Are you fine with that approach?
>
> Zero checking is our old approach; the new approach checks for errors
> and ensures that only mutexes/condition variables that have been
> properly initialized are destroyed. See ff_pthread_init/free in
> libavcodec/pthread.c (you can't use this in libavformat, because these
> functions are local to libavcodec).
>
> - Andreas
I was able to switch to using the slicethread implementation. It has a
very minor delay on init, because it waits for all threads to fully start
up before continueing. But it is only few ms and not worth adding a new
implementation just for that.
I also changed the initialization and release of mutex and conds, with
full return code checking and safe release.
There was one cross-thread issue I needed to address.
A multi hour duration test (connecting in endless loop) did not show
any issues after fixing the avio_opts cross thread access.
Please see the v4 patch for all the changes.
- Lukas
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* [FFmpeg-devel] [PATCH v3 3/3] lavf/dashdec: Fix indentation after multithreading
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 0/3] " Lukas Fellechner
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 1/3] lavf/dashdec: Prepare DASH decoder for multithreading Lukas Fellechner
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
@ 2022-08-23 19:03 ` Lukas Fellechner
2 siblings, 0 replies; 26+ messages in thread
From: Lukas Fellechner @ 2022-08-23 19:03 UTC (permalink / raw)
To: ffmpeg-devel; +Cc: Lukas Fellechner
---
libavformat/dashdec.c | 74 +++++++++++++++++++++----------------------
1 file changed, 37 insertions(+), 37 deletions(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index 20f2557ea3..f653b9850e 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -2412,54 +2412,54 @@ static int dash_read_header(AVFormatContext *s)
}
else
{
- /* Open the demuxer for video and audio components if available */
- for (i = 0; i < c->n_videos; i++) {
- rep = c->videos[i];
- if (i > 0 && c->is_init_section_common_video) {
- ret = copy_init_section(rep, c->videos[0]);
- if (ret < 0)
+ /* Open the demuxer for video and audio components if available */
+ for (i = 0; i < c->n_videos; i++) {
+ rep = c->videos[i];
+ if (i > 0 && c->is_init_section_common_video) {
+ ret = copy_init_section(rep, c->videos[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
+
+ if (ret)
return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
+ for (i = 0; i < c->n_audios; i++) {
+ rep = c->audios[i];
+ if (i > 0 && c->is_init_section_common_audio) {
+ ret = copy_init_section(rep, c->audios[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
- for (i = 0; i < c->n_audios; i++) {
- rep = c->audios[i];
- if (i > 0 && c->is_init_section_common_audio) {
- ret = copy_init_section(rep, c->audios[0]);
- if (ret < 0)
+ if (ret)
return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
+ for (i = 0; i < c->n_subtitles; i++) {
+ rep = c->subtitles[i];
+ if (i > 0 && c->is_init_section_common_subtitle) {
+ ret = copy_init_section(rep, c->subtitles[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
- for (i = 0; i < c->n_subtitles; i++) {
- rep = c->subtitles[i];
- if (i > 0 && c->is_init_section_common_subtitle) {
- ret = copy_init_section(rep, c->subtitles[0]);
- if (ret < 0)
+ if (ret)
return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
-
- if (!stream_index)
- return AVERROR_INVALIDDATA;
+ if (!stream_index)
+ return AVERROR_INVALIDDATA;
}
/* Create a program */
--
2.31.1.windows.1
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* [FFmpeg-devel] [PATCH v4 0/4] lavf/dashdec: Multithreaded DASH initialization
2022-08-20 21:35 [FFmpeg-devel] [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
` (2 preceding siblings ...)
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 0/3] " Lukas Fellechner
@ 2022-09-05 21:16 ` Lukas Fellechner
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 1/4] lavf/dashdec: Prepare DASH decoder for multithreading Lukas Fellechner
` (3 more replies)
3 siblings, 4 replies; 26+ messages in thread
From: Lukas Fellechner @ 2022-09-05 21:16 UTC (permalink / raw)
To: ffmpeg-devel
Initializing DASH streams is currently slow, because each individual
stream is opened and probed sequentially. With DASH streams often
having somewhere between 10-20 streams, this can easily take up to
half a minute on slow connections.
This patch adds an "init_threads" option, specifying the max number
of threads to use. Multiple worker threads are spun up to massively
bring down init times.
In-Reply-To: trinity-36a68f08-f239-4450-b893-af6bfa783181-1661031307501@3c-app-gmx-bs35
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* [FFmpeg-devel] [PATCH v4 1/4] lavf/dashdec: Prepare DASH decoder for multithreading
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 0/4] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
@ 2022-09-05 21:16 ` Lukas Fellechner
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 2/4] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
` (2 subsequent siblings)
3 siblings, 0 replies; 26+ messages in thread
From: Lukas Fellechner @ 2022-09-05 21:16 UTC (permalink / raw)
To: ffmpeg-devel; +Cc: Lukas Fellechner
For adding multithreading to the DASH decoder initialization,
the open_demux_for_component() method must be split up into two parts:
begin_open_demux_for_component(): Opens the stream and does probing
and format detection. This can be run in parallel.
end_open_demux_for_component(): Creates the AVStreams and adds
them to the common parent AVFormatContext. This method must always be
run synchronously, after all threads are finished.
---
libavformat/dashdec.c | 42 ++++++++++++++++++++++++++++++------------
1 file changed, 30 insertions(+), 12 deletions(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index 63bf7e96a5..e82da45e43 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -1918,10 +1918,9 @@ fail:
return ret;
}
-static int open_demux_for_component(AVFormatContext *s, struct representation *pls)
+static int begin_open_demux_for_component(AVFormatContext *s, struct representation *pls)
{
int ret = 0;
- int i;
pls->parent = s;
pls->cur_seq_no = calc_cur_seg_no(s, pls);
@@ -1931,9 +1930,15 @@ static int open_demux_for_component(AVFormatContext *s, struct representation *p
}
ret = reopen_demux_for_component(s, pls);
- if (ret < 0) {
- goto fail;
- }
+
+ return ret;
+}
+
+static int end_open_demux_for_component(AVFormatContext *s, struct representation *pls)
+{
+ int ret = 0;
+ int i;
+
for (i = 0; i < pls->ctx->nb_streams; i++) {
AVStream *st = avformat_new_stream(s, NULL);
AVStream *ist = pls->ctx->streams[i];
@@ -1965,6 +1970,19 @@ fail:
return ret;
}
+static int open_demux_for_component(AVFormatContext* s, struct representation* pls)
+{
+ int ret = 0;
+
+ ret = begin_open_demux_for_component(s, pls);
+ if (ret < 0)
+ return ret;
+
+ ret = end_open_demux_for_component(s, pls);
+
+ return ret;
+}
+
static int is_common_init_section_exist(struct representation **pls, int n_pls)
{
struct fragment *first_init_section = pls[0]->init_section;
@@ -2040,9 +2058,15 @@ static int dash_read_header(AVFormatContext *s)
av_dict_set(&c->avio_opts, "seekable", "0", 0);
}
- if(c->n_videos)
+ if (c->n_videos)
c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos);
+ if (c->n_audios)
+ c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
+
+ if (c->n_subtitles)
+ c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+
/* Open the demuxer for video and audio components if available */
for (i = 0; i < c->n_videos; i++) {
rep = c->videos[i];
@@ -2059,9 +2083,6 @@ static int dash_read_header(AVFormatContext *s)
++stream_index;
}
- if(c->n_audios)
- c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios);
-
for (i = 0; i < c->n_audios; i++) {
rep = c->audios[i];
if (i > 0 && c->is_init_section_common_audio) {
@@ -2077,9 +2098,6 @@ static int dash_read_header(AVFormatContext *s)
++stream_index;
}
- if (c->n_subtitles)
- c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
-
for (i = 0; i < c->n_subtitles; i++) {
rep = c->subtitles[i];
if (i > 0 && c->is_init_section_common_subtitle) {
--
2.28.0.windows.1
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* [FFmpeg-devel] [PATCH v4 2/4] lavf/dashdec: Multithreaded DASH initialization
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 0/4] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 1/4] lavf/dashdec: Prepare DASH decoder for multithreading Lukas Fellechner
@ 2022-09-05 21:16 ` Lukas Fellechner
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 3/4] lavf/dashdec: Prevent cross-thread avio_opts modification Lukas Fellechner
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 4/4] lavf/dashdec: Fix indentation after adding multithreading Lukas Fellechner
3 siblings, 0 replies; 26+ messages in thread
From: Lukas Fellechner @ 2022-09-05 21:16 UTC (permalink / raw)
To: ffmpeg-devel; +Cc: Lukas Fellechner
This patch adds an "init_threads" option, specifying the max
number of threads to use. Multiple worker threads are spun up
to massively bring down init times.
---
libavformat/dashdec.c | 286 +++++++++++++++++++++++++++++++++++++++++-
1 file changed, 285 insertions(+), 1 deletion(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index e82da45e43..0532e2c918 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -24,6 +24,8 @@
#include "libavutil/opt.h"
#include "libavutil/time.h"
#include "libavutil/parseutils.h"
+#include "libavutil/thread.h"
+#include "libavutil/slicethread.h"
#include "internal.h"
#include "avio_internal.h"
#include "dash.h"
@@ -152,6 +154,8 @@ typedef struct DASHContext {
int max_url_size;
char *cenc_decryption_key;
+ int init_threads;
+
/* Flags for init section*/
int is_init_section_common_video;
int is_init_section_common_audio;
@@ -2033,6 +2037,265 @@ static void move_metadata(AVStream *st, const char *key, char **value)
}
}
+#if HAVE_THREADS
+
+typedef struct WorkPoolData
+{
+ AVFormatContext *ctx;
+ struct representation *pls;
+ struct representation *common_pls;
+ pthread_mutex_t *common_mutex;
+ pthread_cond_t *common_condition;
+ int is_common;
+ int is_started;
+ int result;
+} WorkPoolData;
+
+static void thread_worker(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads)
+{
+ WorkPoolData *work_pool = (WorkPoolData*)priv;
+ WorkPoolData *data = work_pool + jobnr;
+ int ret;
+
+ // if we are common section provider, init and signal
+ if (data->is_common) {
+ data->pls->parent = data->ctx;
+ ret = update_init_section(data->pls);
+ if (ret < 0) {
+ pthread_cond_signal(data->common_condition);
+ goto end;
+ }
+ else
+ ret = AVERROR(pthread_cond_signal(data->common_condition));
+ }
+
+ // if we depend on common section provider, wait for signal and copy
+ if (data->common_pls) {
+ ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
+ if (ret < 0)
+ goto end;
+
+ if (!data->common_pls->init_sec_buf) {
+ goto end;
+ ret = AVERROR(EFAULT);
+ }
+
+ ret = copy_init_section(data->pls, data->common_pls);
+ if (ret < 0)
+ goto end;
+ }
+
+ ret = begin_open_demux_for_component(data->ctx, data->pls);
+ if (ret < 0)
+ goto end;
+
+end:
+ data->result = ret;
+}
+
+static void create_work_pool_data(AVFormatContext *ctx, int *stream_index,
+ struct representation **streams, int num_streams, int is_init_section_common,
+ WorkPoolData *work_pool, pthread_mutex_t* common_mutex,
+ pthread_cond_t* common_condition)
+{
+ work_pool += *stream_index;
+
+ for (int i = 0; i < num_streams; i++) {
+ work_pool->ctx = ctx;
+ work_pool->pls = streams[i];
+ work_pool->pls->stream_index = *stream_index;
+ work_pool->common_condition = common_condition;
+ work_pool->common_mutex = common_mutex;
+ work_pool->result = -1;
+
+ if (is_init_section_common) {
+ if (i == 0)
+ work_pool->is_common = 1;
+ else
+ work_pool->common_pls = streams[0];
+ }
+
+ work_pool++;
+ *stream_index = *stream_index + 1;
+ }
+}
+
+static pthread_mutex_t* create_mutex()
+{
+ pthread_mutex_t* mutex = (pthread_mutex_t*)av_malloc(sizeof(pthread_mutex_t));
+ if (!mutex)
+ return NULL;
+
+ if (pthread_mutex_init(mutex, NULL)) {
+ av_free(mutex);
+ return NULL;
+ }
+
+ return mutex;
+}
+
+static int free_mutex(pthread_mutex_t **mutex)
+{
+ int ret = 0;
+ if (*mutex) {
+ ret = pthread_mutex_destroy(*mutex);
+ av_free(*mutex);
+ *mutex = NULL;
+ }
+ return ret;
+}
+
+static pthread_cond_t* create_cond()
+{
+ pthread_cond_t* cond = (pthread_cond_t*)av_malloc(sizeof(pthread_cond_t));
+ if (!cond)
+ return NULL;
+
+ if (pthread_cond_init(cond, NULL)) {
+ av_free(cond);
+ return NULL;
+ }
+
+ return cond;
+}
+
+static int free_cond(pthread_cond_t **cond)
+{
+ int ret = 0;
+ if (*cond) {
+ ret = pthread_cond_destroy(*cond);
+ av_free(*cond);
+ *cond = NULL;
+ }
+ return ret;
+}
+
+static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
+{
+ DASHContext *c = s->priv_data;
+ int ret = 0;
+ int stream_index = 0;
+ AVSliceThread *slice_thread;
+
+ // we need to cleanup even in case of errors,
+ // so we need to store results of run and cleanup phase
+ int initResult = 0;
+ int runResult = 0;
+ int cleanupResult = 0;
+
+ // alloc data
+ WorkPoolData *work_pool = (WorkPoolData*)av_mallocz(
+ sizeof(WorkPoolData) * nstreams);
+ if (!work_pool)
+ return AVERROR(ENOMEM);
+
+ if (!avpriv_slicethread_create(&slice_thread, (void*)work_pool, &thread_worker, NULL, threads)) {
+ av_free(work_pool);
+ return AVERROR(ENOMEM);
+}
+
+ // alloc mutex and conditions
+ c->init_mutex = create_mutex();
+
+ pthread_mutex_t *common_video_mutex = create_mutex();
+ pthread_cond_t *common_video_cond = create_cond();
+
+ pthread_mutex_t *common_audio_mutex = create_mutex();
+ pthread_cond_t *common_audio_cond = create_cond();
+
+ pthread_mutex_t *common_subtitle_mutex = create_mutex();
+ pthread_cond_t *common_subtitle_cond = create_cond();
+
+ if (!(c->init_mutex && common_video_mutex && common_video_cond && common_audio_mutex &&
+ common_audio_cond && common_subtitle_mutex && common_subtitle_cond)) {
+ initResult = AVERROR(ENOMEM);
+ goto cleanup;
+ }
+
+ // set work pool data
+ create_work_pool_data(s, &stream_index, c->videos, c->n_videos,
+ c->is_init_section_common_video, work_pool,
+ common_video_mutex, common_video_cond);
+
+ create_work_pool_data(s, &stream_index, c->audios, c->n_audios,
+ c->is_init_section_common_audio, work_pool,
+ common_audio_mutex, common_audio_cond);
+
+ create_work_pool_data(s, &stream_index, c->subtitles, c->n_subtitles,
+ c->is_init_section_common_subtitle, work_pool,
+ common_subtitle_mutex, common_subtitle_cond);
+
+ // run threads
+ avpriv_slicethread_execute(slice_thread, nstreams, 0);
+
+ // finalize streams and collect results
+ WorkPoolData* current_data = work_pool;
+ for (int i = 0; i < nstreams; i++) {
+ if (current_data->result < 0) {
+ // thread ran into error: collect result and break
+ runResult = current_data->result;
+ break;
+ }
+ else {
+ // thread success: create streams on AVFormatContext
+ ret = end_open_demux_for_component(s, current_data->pls);
+ if (ret < 0) {
+ runResult = ret;
+ break;
+ }
+ }
+ current_data++;
+ }
+
+cleanup:
+ // cleanup mutex and conditions
+ ret = free_mutex(&c->init_mutex);
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = free_mutex(&common_video_mutex);
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = free_cond(&common_video_cond);
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = free_mutex(&common_audio_mutex);
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = free_cond(&common_audio_cond);
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = free_mutex(&common_subtitle_mutex);
+ if (ret < 0)
+ cleanupResult = ret;
+
+ ret = free_cond(&common_subtitle_cond);
+ if (ret < 0)
+ cleanupResult = ret;
+
+ // cleanup threads and workpool
+ av_free(work_pool);
+ avpriv_slicethread_free(&slice_thread);
+
+ // return results if errors have occured in one of the phases
+ if (initResult < 0)
+ return initResult;
+
+ if (runResult < 0)
+ return runResult;
+
+ if (cleanupResult < 0)
+ return cleanupResult;
+
+ return 0;
+}
+
+#endif
+
static int dash_read_header(AVFormatContext *s)
{
DASHContext *c = s->priv_data;
@@ -2067,6 +2330,23 @@ static int dash_read_header(AVFormatContext *s)
if (c->n_subtitles)
c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);
+ int threads = 1;
+ int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
+
+#if HAVE_THREADS
+ threads = FFMIN(nstreams, c->init_threads);
+#endif
+
+ if (threads > 1)
+ {
+#if HAVE_THREADS
+ ret = init_streams_multithreaded(s, nstreams, threads);
+ if (ret < 0)
+ return ret;
+#endif
+ }
+ else
+ {
/* Open the demuxer for video and audio components if available */
for (i = 0; i < c->n_videos; i++) {
rep = c->videos[i];
@@ -2115,6 +2395,7 @@ static int dash_read_header(AVFormatContext *s)
if (!stream_index)
return AVERROR_INVALIDDATA;
+ }
/* Create a program */
program = av_new_program(s, 0);
@@ -2366,7 +2647,10 @@ static const AVOption dash_options[] = {
OFFSET(allowed_extensions), AV_OPT_TYPE_STRING,
{.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
INT_MIN, INT_MAX, FLAGS},
- { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+ { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key),
+ AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+ { "init_threads", "Number of threads to use for initializing the DASH stream",
+ OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 1}, 1, INT_MAX, FLAGS },
{NULL}
};
--
2.28.0.windows.1
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* [FFmpeg-devel] [PATCH v4 3/4] lavf/dashdec: Prevent cross-thread avio_opts modification
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 0/4] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 1/4] lavf/dashdec: Prepare DASH decoder for multithreading Lukas Fellechner
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 2/4] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
@ 2022-09-05 21:16 ` Lukas Fellechner
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 4/4] lavf/dashdec: Fix indentation after adding multithreading Lukas Fellechner
3 siblings, 0 replies; 26+ messages in thread
From: Lukas Fellechner @ 2022-09-05 21:16 UTC (permalink / raw)
To: ffmpeg-devel; +Cc: Lukas Fellechner
open_url modifies the shared avio_opts dict (update cookies).
This can cause problems during multithreaded initialization.
To prevent this, I take a copy of avio_opts, use that in open_url,
and copy the updated dict back afterwards.
---
libavformat/dashdec.c | 34 ++++++++++++++++++++++++++++++++--
1 file changed, 32 insertions(+), 2 deletions(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index 0532e2c918..19e657d836 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -156,6 +156,11 @@ typedef struct DASHContext {
int init_threads;
+#if HAVE_THREADS
+ /* Set during parallel initialization, to allow locking of avio_opts */
+ pthread_mutex_t *init_mutex;
+#endif
+
/* Flags for init section*/
int is_init_section_common_video;
int is_init_section_common_audio;
@@ -1699,7 +1704,32 @@ static int open_input(DASHContext *c, struct representation *pls, struct fragmen
ff_make_absolute_url(url, c->max_url_size, c->base_url, seg->url);
av_log(pls->parent, AV_LOG_VERBOSE, "DASH request for url '%s', offset %"PRId64"\n",
url, seg->url_offset);
- ret = open_url(pls->parent, &pls->input, url, &c->avio_opts, opts, NULL);
+
+ AVDictionary *avio_opts = c->avio_opts;
+
+#if HAVE_THREADS
+ // If we are doing parallel initialization, take a snapshot of the avio_opts,
+ // and copy the modified dictionary ("cookies" updated) back, after the url is opened.
+ if (c->init_mutex) {
+ pthread_mutex_lock(c->init_mutex);
+ avio_opts = NULL;
+ ret = av_dict_copy(&avio_opts, c->avio_opts, 0);
+ pthread_mutex_unlock(c->init_mutex);
+ if (ret < 0)
+ goto cleanup;
+ }
+#endif
+
+ ret = open_url(pls->parent, &pls->input, url, &avio_opts, opts, NULL);
+
+#if HAVE_THREADS
+ if (c->init_mutex) {
+ pthread_mutex_lock(c->init_mutex);
+ av_dict_free(&c->avio_opts);
+ c->avio_opts = avio_opts;
+ pthread_mutex_unlock(c->init_mutex);
+ }
+#endif
cleanup:
av_free(url);
@@ -2192,7 +2222,7 @@ static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int thre
if (!avpriv_slicethread_create(&slice_thread, (void*)work_pool, &thread_worker, NULL, threads)) {
av_free(work_pool);
return AVERROR(ENOMEM);
-}
+ }
// alloc mutex and conditions
c->init_mutex = create_mutex();
--
2.28.0.windows.1
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread
* [FFmpeg-devel] [PATCH v4 4/4] lavf/dashdec: Fix indentation after adding multithreading
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 0/4] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
` (2 preceding siblings ...)
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 3/4] lavf/dashdec: Prevent cross-thread avio_opts modification Lukas Fellechner
@ 2022-09-05 21:16 ` Lukas Fellechner
3 siblings, 0 replies; 26+ messages in thread
From: Lukas Fellechner @ 2022-09-05 21:16 UTC (permalink / raw)
To: ffmpeg-devel; +Cc: Lukas Fellechner
Whitespace change only. No functional changes.
---
libavformat/dashdec.c | 74 +++++++++++++++++++++----------------------
1 file changed, 37 insertions(+), 37 deletions(-)
diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index 19e657d836..22f727da3b 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -2377,54 +2377,54 @@ static int dash_read_header(AVFormatContext *s)
}
else
{
- /* Open the demuxer for video and audio components if available */
- for (i = 0; i < c->n_videos; i++) {
- rep = c->videos[i];
- if (i > 0 && c->is_init_section_common_video) {
- ret = copy_init_section(rep, c->videos[0]);
- if (ret < 0)
+ /* Open the demuxer for video and audio components if available */
+ for (i = 0; i < c->n_videos; i++) {
+ rep = c->videos[i];
+ if (i > 0 && c->is_init_section_common_video) {
+ ret = copy_init_section(rep, c->videos[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
+
+ if (ret)
return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
+ for (i = 0; i < c->n_audios; i++) {
+ rep = c->audios[i];
+ if (i > 0 && c->is_init_section_common_audio) {
+ ret = copy_init_section(rep, c->audios[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
- for (i = 0; i < c->n_audios; i++) {
- rep = c->audios[i];
- if (i > 0 && c->is_init_section_common_audio) {
- ret = copy_init_section(rep, c->audios[0]);
- if (ret < 0)
+ if (ret)
return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
+ for (i = 0; i < c->n_subtitles; i++) {
+ rep = c->subtitles[i];
+ if (i > 0 && c->is_init_section_common_subtitle) {
+ ret = copy_init_section(rep, c->subtitles[0]);
+ if (ret < 0)
+ return ret;
+ }
+ ret = open_demux_for_component(s, rep);
- for (i = 0; i < c->n_subtitles; i++) {
- rep = c->subtitles[i];
- if (i > 0 && c->is_init_section_common_subtitle) {
- ret = copy_init_section(rep, c->subtitles[0]);
- if (ret < 0)
+ if (ret)
return ret;
+ rep->stream_index = stream_index;
+ ++stream_index;
}
- ret = open_demux_for_component(s, rep);
- if (ret)
- return ret;
- rep->stream_index = stream_index;
- ++stream_index;
- }
-
- if (!stream_index)
- return AVERROR_INVALIDDATA;
+ if (!stream_index)
+ return AVERROR_INVALIDDATA;
}
/* Create a program */
--
2.28.0.windows.1
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 26+ messages in thread