From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100]) by master.gitmailbox.com (Postfix) with ESMTP id BD18D400AB for ; Sat, 20 Aug 2022 21:54:04 +0000 (UTC) Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 4403C68B989; Sun, 21 Aug 2022 00:54:01 +0300 (EEST) Received: from mout.gmx.net (mout.gmx.net [212.227.17.21]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id A7BB768B405 for ; Sun, 21 Aug 2022 00:53:55 +0300 (EEST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=gmx.net; s=badeba3b8450; t=1661032435; bh=7yZaS49dEB77ydLMbWXwwFlSdtJEDL9Op/PTSXY2/uA=; h=X-UI-Sender-Class:From:To:Subject:Date:In-Reply-To:References; b=b74tApmnMj0WsuT34MhQhhgc009XVlYAnheEQ2VP4VPv6iHKyOfgRhdN+kgDOvyAF xa5HBu8Fz3xZN/KvV+fJwMS8JgqPrm7GbpYKi5fT6E2jGWJrtVNEElURSvv0tRNz0h fyz4onmBvj6C9kOyfCgCfoe0yrnXvq5CcLyc3SE4= X-UI-Sender-Class: 01bb95c1-4bf8-414a-932a-4f6e2808ef9c Received: from [94.134.107.190] ([94.134.107.190]) by web-mail.gmx.net (3c-app-gmx-bs35.server.lan [172.19.170.87]) (via HTTP); Sat, 20 Aug 2022 23:53:54 +0200 MIME-Version: 1.0 Message-ID: From: Lukas Fellechner To: ffmpeg-devel@ffmpeg.org Date: Sat, 20 Aug 2022 23:53:54 +0200 Importance: normal Sensitivity: Normal In-Reply-To: References: X-UI-Message-Type: mail X-Priority: 3 X-Provags-ID: V03:K1:k3RftN6Hg+3T9l/1J70Ple7Yczir+BjZxTiAg9x3lUAy9nqyztwa+jjEVGEgubuTZouiu 52Knm/5RO1HVaXEVNuapzNwIzRgiS2jCa9BBahy+KkU4SYsfXgJWT29Xw2HWATb88Kx5ccfET3v0 dhfAegCmexigEIUKO6O5XSW1fLqRNDzRfDocxh3vq5HDiV/jqT3WrsX30eUThUfERRKCPG4jUOa7 pHVAQyTzsSoplCR/6IsuAwG+wCSXb7jb6Lk60bjXLdcklQiD7pqBzZuSodDk0NkWfl3w6sxM+t98 xU= X-Spam-Flag: NO X-UI-Out-Filterresults: notjunk:1;V03:K0:oRkGqsMVfGg=:1881eiWVjiZOHRtEov4+qz E4J0EQpvwjMzdnJxDrLTUITyvaoFvUpbeLFwWC5kMiC2+9pxaE4/BOhd6Urok+a0vfz3dlK+s m5nM8T3GCsd7gAiI1LXfdB4FABCyxIJ8xCeGxPm5XS/E3FlAtTl9ZN5VJ9GX7jhxsBN+SRRVU 
f+uwKY2mybIemmPaHC+BJaQHZCsJGaKtDPOWBj5fnWcf2voCcrJ9eTGQ2AnZ+fXCbHPa26qpT 3h2F0Gsf4u/13RIpFIwUDHUk/PE+XQ5BOAObmcWCe4frGTGiYWaqAb7Y+9MbwC+tgDCYZU2lw MZnlC1hMo1nZqrDm4tO2DMMbLbpNQIuMfsGRu7X1NKpj/hbIkoUuFB9nu8FAatX4IWuFQzXkR AcwOD+PYRjWFXgRAZWZGIJDnHd3PXwmP5FsDzu0MlVeTYBbrZiliKU/pRLDKIdCMr5GGU/h4j MtTRYm+R9m12TPUY1K6bL3n+T1opTWIxLUU8qCFeCD7fBdaw0Tv+JwF4GiuNXmH2LyLG6vCgT V2/sgZWUroYkHLV3xBwT02hQ9iGaEzX9FGR5Syud1bIqipE4fhbUqwCuCoFWB3MXlZt7yIh+g 2E+t4uPmkyIbYZK7epmnWZhym2mNojiJbl+JjYUh7UEFKg8SnG14L84xsF/i7RNBtYG7FSFIW FJ1vZS3HHnMYkBeLDU3w3I4Gy3rV8+DyZV7iObeQ7/rgcBquDz0KwuqR1qQplrsTgGd3FL7OS WlCkZUon5rHbr9Jsq1zVlzxwkQKw4/aekRFlvwXVwIySte82lffo2cGtCZdZyulAz8Em9+hm1 cwgwV1c Subject: Re: [FFmpeg-devel] [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Archived-At: List-Archive: List-Post: Trying with inline PATCH since attached file was not showing up... --- From: Lukas Fellechner Subject: [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization Initializing DASH streams is currently slow, because each individual stream is opened and probed sequentially. With DASH streams often having between 10 and 20 streams, this can easily take up to half a minute. This patch adds an "init_threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times. 
--- libavformat/dashdec.c | 421 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 375 insertions(+), 46 deletions(-) diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c index 63bf7e96a5..69a6c2ba79 100644 --- a/libavformat/dashdec.c +++ b/libavformat/dashdec.c @@ -24,6 +24,7 @@ #include "libavutil/opt.h" #include "libavutil/time.h" #include "libavutil/parseutils.h" +#include "libavutil/thread.h" #include "internal.h" #include "avio_internal.h" #include "dash.h" @@ -152,6 +153,8 @@ typedef struct DASHContext { int max_url_size; char *cenc_decryption_key; + int init_threads; + /* Flags for init section*/ int is_init_section_common_video; int is_init_section_common_audio; @@ -1918,22 +1921,40 @@ fail: return ret; } -static int open_demux_for_component(AVFormatContext *s, struct representation *pls) +static int open_demux_for_component(AVFormatContext* s, struct representation* pls) +{ + int ret = 0; + + ret = begin_open_demux_for_component(s, pls); + if (ret < 0) + return ret; + + ret = end_open_demux_for_component(s, pls); + + return ret; +} + +static int begin_open_demux_for_component(AVFormatContext* s, struct representation* pls) { int ret = 0; - int i; pls->parent = s; - pls->cur_seq_no = calc_cur_seg_no(s, pls); + pls->cur_seq_no = calc_cur_seg_no(s, pls); if (!pls->last_seq_no) { pls->last_seq_no = calc_max_seg_no(pls, s->priv_data); } ret = reopen_demux_for_component(s, pls); - if (ret < 0) { - goto fail; - } + + return ret; +} + +static int end_open_demux_for_component(AVFormatContext* s, struct representation* pls) +{ + int ret = 0; + int i; + for (i = 0; i < pls->ctx->nb_streams; i++) { AVStream *st = avformat_new_stream(s, NULL); AVStream *ist = pls->ctx->streams[i]; @@ -2015,6 +2036,131 @@ static void move_metadata(AVStream *st, const char *key, char **value) } } +struct work_pool_data +{ + AVFormatContext* ctx; + struct representation* pls; + struct representation* common_pls; + pthread_mutex_t* common_mutex; + pthread_cond_t* 
common_condition; + int is_common; + int is_started; + int result; +}; + +struct thread_data +{ + pthread_t thread; + pthread_mutex_t* mutex; + struct work_pool_data* work_pool; + int work_pool_size; + int is_started; +}; + +static void *worker_thread(void *ptr) +{ + int ret = 0; + int i; + struct thread_data* thread_data = (struct thread_data*)ptr; + struct work_pool_data* work_pool = NULL; + struct work_pool_data* data = NULL; + for (;;) { + + // get next work item + pthread_mutex_lock(thread_data->mutex); + data = NULL; + work_pool = thread_data->work_pool; + for (i = 0; i < thread_data->work_pool_size; i++) { + if (!work_pool->is_started) { + data = work_pool; + data->is_started = 1; + break; + } + work_pool++; + } + pthread_mutex_unlock(thread_data->mutex); + + if (!data) { + // no more work to do + return NULL; + } + + // if we are common section provider, init and signal + if (data->is_common) { + data->pls->parent = data->ctx; + ret = update_init_section(data->pls); + if (ret < 0) { + pthread_cond_signal(data->common_condition); + goto end; + } + else + ret = AVERROR(pthread_cond_signal(data->common_condition)); + } + + // if we depend on common section provider, wait for signal and copy + if (data->common_pls) { + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex)); + if (ret < 0) + goto end; + + if (!data->common_pls->init_sec_buf) { + goto end; + ret = AVERROR(EFAULT); + } + + ret = copy_init_section(data->pls, data->common_pls); + if (ret < 0) + goto end; + } + + ret = begin_open_demux_for_component(data->ctx, data->pls); + if (ret < 0) + goto end; + + end: + data->result = ret; + } + + + return NULL; +} + +static void create_work_pool_data(AVFormatContext* ctx, int stream_index, + struct representation* pls, struct representation* common_pls, + struct work_pool_data* init_data, pthread_mutex_t* common_mutex, + pthread_cond_t* common_condition) +{ + init_data->ctx = ctx; + init_data->pls = pls; + init_data->pls->stream_index = 
stream_index; + init_data->common_condition = common_condition; + init_data->common_mutex = common_mutex; + init_data->result = -1; + + if (pls == common_pls) { + init_data->is_common = 1; + } + else if (common_pls) { + init_data->common_pls = common_pls; + } +} + +static int start_thread(struct thread_data *thread_data, + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex) +{ + int ret; + + thread_data->mutex = mutex; + thread_data->work_pool = work_pool; + thread_data->work_pool_size = work_pool_size; + + ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data)); + if (ret == 0) + thread_data->is_started = 1; + + return ret; +} + static int dash_read_header(AVFormatContext *s) { DASHContext *c = s->priv_data; @@ -2040,63 +2186,245 @@ static int dash_read_header(AVFormatContext *s) av_dict_set(&c->avio_opts, "seekable", "0", 0); } - if(c->n_videos) + if (c->n_videos) c->is_init_section_common_video = is_common_init_section_exist(c->videos, c->n_videos); - /* Open the demuxer for video and audio components if available */ - for (i = 0; i < c->n_videos; i++) { - rep = c->videos[i]; - if (i > 0 && c->is_init_section_common_video) { - ret = copy_init_section(rep, c->videos[0]); + if (c->n_audios) + c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios); + + if (c->n_subtitles) + c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); + + int nstreams = c->n_videos + c->n_audios + c->n_subtitles; + int threads = FFMIN(nstreams, c->init_threads); + + if (threads > 1) + { + // alloc data + struct work_pool_data* init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams); + if (!init_data) + return AVERROR(ENOMEM); + + struct thread_data* thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads); + if (!thread_data) + return AVERROR(ENOMEM); + + // alloc mutex and conditions + 
pthread_mutex_t work_pool_mutex; + + pthread_mutex_t common_video_mutex; + pthread_cond_t common_video_cond; + + pthread_mutex_t common_audio_mutex; + pthread_cond_t common_audio_cond; + + pthread_mutex_t common_subtitle_mutex; + pthread_cond_t common_subtitle_cond; + + // init mutex and conditions + ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL)); + if (ret < 0) + goto cleanup; + + if (c->is_init_section_common_video) { + ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL)); if (ret < 0) - return ret; + goto cleanup; + + ret = AVERROR(pthread_cond_init(&common_video_cond, NULL)); + if (ret < 0) + goto cleanup; } - ret = open_demux_for_component(s, rep); - if (ret) - return ret; - rep->stream_index = stream_index; - ++stream_index; - } + if (c->is_init_section_common_audio) { + ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL)); + if (ret < 0) + goto cleanup; - if(c->n_audios) - c->is_init_section_common_audio = is_common_init_section_exist(c->audios, c->n_audios); + ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL)); + if (ret < 0) + goto cleanup; + } - for (i = 0; i < c->n_audios; i++) { - rep = c->audios[i]; - if (i > 0 && c->is_init_section_common_audio) { - ret = copy_init_section(rep, c->audios[0]); + if (c->is_init_section_common_subtitle) { + ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL)); if (ret < 0) - return ret; + goto cleanup; + + ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL)); + if (ret < 0) + goto cleanup; } - ret = open_demux_for_component(s, rep); - if (ret) - return ret; - rep->stream_index = stream_index; - ++stream_index; - } + // init work pool data + struct work_pool_data* current_data = init_data; - if (c->n_subtitles) - c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); + for (i = 0; i < c->n_videos; i++) { + create_work_pool_data(s, stream_index, c->videos[i], + c->is_init_section_common_video ? 
c->videos[0] : NULL, + current_data, &common_video_mutex, &common_video_cond); - for (i = 0; i < c->n_subtitles; i++) { - rep = c->subtitles[i]; - if (i > 0 && c->is_init_section_common_subtitle) { - ret = copy_init_section(rep, c->subtitles[0]); + stream_index++; + current_data++; + } + + for (i = 0; i < c->n_audios; i++) { + create_work_pool_data(s, stream_index, c->audios[i], + c->is_init_section_common_audio ? c->audios[0] : NULL, + current_data, &common_audio_mutex, &common_audio_cond); + + stream_index++; + current_data++; + } + + for (i = 0; i < c->n_subtitles; i++) { + create_work_pool_data(s, stream_index, c->subtitles[i], + c->is_init_section_common_subtitle ? c->subtitles[0] : NULL, + current_data, &common_subtitle_mutex, &common_subtitle_cond); + + stream_index++; + current_data++; + } + + // start threads + struct thread_data* current_thread = thread_data; + for (i = 0; i < threads; i++) { + ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex); if (ret < 0) - return ret; + goto cleanup; + + current_thread++; } - ret = open_demux_for_component(s, rep); - if (ret) - return ret; - rep->stream_index = stream_index; - ++stream_index; + cleanup: + // we need to cleanup even in case of errors, so we need to store results of init, run and cleanup + int initResult = ret; + int runResult = 0; + int cleanupResult = 0; + + // join threads + current_thread = thread_data; + for (i = 0; i < threads; i++) { + if (current_thread->is_started) { + ret = AVERROR(pthread_join(current_thread->thread, NULL)); + if (ret < 0) + cleanupResult = ret; + } + current_thread++; + } + + // finalize streams and collect results + current_data = init_data; + for (i = 0; i < nstreams; i++) { + if (current_data->result < 0) { + // thread ran into error: collect result + runResult = current_data->result; + } + else { + // thread success: create streams on AVFormatContext + ret = end_open_demux_for_component(s, current_data->pls); + if (ret < 0) + runResult = ret; + } + 
current_data++; + } + + // cleanup mutex and conditions + ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex)); + if (ret < 0) + cleanupResult = ret; + + if (c->is_init_section_common_video) { + ret = AVERROR(pthread_mutex_destroy(&common_video_mutex)); + if (ret < 0) + cleanupResult = ret; + + ret = AVERROR(pthread_cond_destroy(&common_video_cond)); + if (ret < 0) + cleanupResult = ret; + } + + if (c->is_init_section_common_audio) { + ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex)); + if (ret < 0) + cleanupResult = ret; + + ret = AVERROR(pthread_cond_destroy(&common_audio_cond)); + if (ret < 0) + cleanupResult = ret; + } + + if (c->is_init_section_common_subtitle) { + ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex)); + if (ret < 0) + cleanupResult = ret; + + ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond)); + if (ret < 0) + cleanupResult = ret; + } + + // return results if errors have occured in one of the phases + if (initResult < 0) + return initResult; + + if (runResult < 0) + return runResult; + + if (cleanupResult < 0) + return cleanupResult; } + else + { + /* Open the demuxer for video and audio components if available */ + for (i = 0; i < c->n_videos; i++) { + rep = c->videos[i]; + if (i > 0 && c->is_init_section_common_video) { + ret = copy_init_section(rep, c->videos[0]); + if (ret < 0) + return ret; + } + ret = open_demux_for_component(s, rep); - if (!stream_index) - return AVERROR_INVALIDDATA; + if (ret) + return ret; + rep->stream_index = stream_index; + ++stream_index; + } + + for (i = 0; i < c->n_audios; i++) { + rep = c->audios[i]; + if (i > 0 && c->is_init_section_common_audio) { + ret = copy_init_section(rep, c->audios[0]); + if (ret < 0) + return ret; + } + ret = open_demux_for_component(s, rep); + + if (ret) + return ret; + rep->stream_index = stream_index; + ++stream_index; + } + + for (i = 0; i < c->n_subtitles; i++) { + rep = c->subtitles[i]; + if (i > 0 && c->is_init_section_common_subtitle) { + ret = 
copy_init_section(rep, c->subtitles[0]); + if (ret < 0) + return ret; + } + ret = open_demux_for_component(s, rep); + + if (ret) + return ret; + rep->stream_index = stream_index; + ++stream_index; + } + + if (!stream_index) + return AVERROR_INVALIDDATA; + } /* Create a program */ program = av_new_program(s, 0); @@ -2349,6 +2677,7 @@ static const AVOption dash_options[] = { {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, INT_MIN, INT_MAX, FLAGS}, { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, + { "init_threads", "Number of threads to use for initializing the DASH stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS }, {NULL} }; -- 2.31.1.windows.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".