From mboxrd@z Thu Jan 1 00:00:00 1970
From: Lukas Fellechner
To: ffmpeg-devel@ffmpeg.org
Date: Sat, 20 Aug 2022 23:35:07 +0200
Subject: [FFmpeg-devel] [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization
MIME-Version: 1.0
Content-Type: multipart/mixed; boundary=rehcsed-5bfa0b9a-3921-4474-88ab-82f1cf7ae0c6
List-Id: FFmpeg development discussions and patches
Reply-To: FFmpeg development discussions and patches
Sender: "ffmpeg-devel"

--rehcsed-5bfa0b9a-3921-4474-88ab-82f1cf7ae0c6
Content-Type: text/plain; charset=UTF-8

This patch adds multithreading support to DASH initialization.

Initializing DASH streams is currently slow, because each individual stream is opened and probed sequentially. With DASH streams often having between 10 and 20 substreams, this can easily take up to half a minute on slow connections.

This patch adds an "init_threads" option, specifying the max number of threads to use for parallel probing and initialization of substreams. If "init_threads" is set to a value larger than 1, multiple worker threads are spun up to massively bring down init times.

Here is a free DASH stream for testing:
http://www.bok.net/dash/tears_of_steel/cleartext/stream.mpd

It has 7 substreams. I currently get init times of 3.5 seconds without the patch.
The patch brings it down to about 0.5 seconds, so using 7 threads cuts init time by nearly a factor of 7. On a slower connection (100 Mbit/s), the same stream took 7-8 seconds to initialize; the patch brought it down to just over 1 second.

In the current patch, the behavior is disabled by default (init_threads = 0). But I think it could make sense to enable it by default, maybe with a reasonable value of 8? Not sure though, open for discussion.

Some notes on the actual implementation:

- DASH streams sometimes share a common init section. If this is the case, a mutex and a condition variable are used to make sure that the first stream reads the common section before the following streams start initialization.
- Only the init and probing part is done in parallel. After all threads are joined, I collect the results and add the AVStreams to the parent AVFormatContext. That is why I split open_demux_for_component() into begin_open_demux_for_component() and end_open_demux_for_component().
- I tried to do this as cleanly as possible and added multiple comments.
- Multithreading is never simple, so a proper review is needed.

If this gets merged, I might try to do the same for HLS.

This is my first PR by the way, so please be nice :)

Lukas

--rehcsed-5bfa0b9a-3921-4474-88ab-82f1cf7ae0c6
Content-Type: application/octet-stream
Content-Disposition: attachment; filename=0001-lavf-dashdec-Multithreaded-DASH-initialization.patch
Content-Transfer-Encoding: quoted-printable

=46rom ca906837c9b7534f245cb76b0df17cde7de51dba Mon Sep 17 00:00:00 2001
From: Lukas Fellechner
Date: Fri, 19 Aug 2022 15:29:14 +0200
Subject: [PATCH 1/1] lavf/dashdec: Multithreaded DASH initialization

Initializing DASH streams is currently slow, because each individual strea=
m is opened and probed sequentially. With DASH streams often having somewh=
ere between 10-20 streams, this can easily take up to half a minute. This =
patch adds an "init_threads" option, specifying the max number of threads =
to use.
Multiple worker threads are spun up to massively bring down init t= imes. =2D-- libavformat/dashdec.c | 421 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 375 insertions(+), 46 deletions(-) diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c index 63bf7e96a5..69a6c2ba79 100644 =2D-- a/libavformat/dashdec.c +++ b/libavformat/dashdec.c @@ -24,6 +24,7 @@ #include "libavutil/opt.h" #include "libavutil/time.h" #include "libavutil/parseutils.h" +#include "libavutil/thread.h" #include "internal.h" #include "avio_internal.h" #include "dash.h" @@ -152,6 +153,8 @@ typedef struct DASHContext { int max_url_size; char *cenc_decryption_key; + int init_threads; + /* Flags for init section*/ int is_init_section_common_video; int is_init_section_common_audio; @@ -1918,22 +1921,40 @@ fail: return ret; } -static int open_demux_for_component(AVFormatContext *s, struct representa= tion *pls) +static int open_demux_for_component(AVFormatContext* s, struct representa= tion* pls) +{ + int ret =3D 0; + + ret =3D begin_open_demux_for_component(s, pls); + if (ret < 0) + return ret; + + ret =3D end_open_demux_for_component(s, pls); + + return ret; +} + +static int begin_open_demux_for_component(AVFormatContext* s, struct repr= esentation* pls) { int ret =3D 0; - int i; pls->parent =3D s; - pls->cur_seq_no =3D calc_cur_seg_no(s, pls); + pls->cur_seq_no =3D calc_cur_seg_no(s, pls); if (!pls->last_seq_no) { pls->last_seq_no =3D calc_max_seg_no(pls, s->priv_data); } ret =3D reopen_demux_for_component(s, pls); - if (ret < 0) { - goto fail; - } + + return ret; +} + +static int end_open_demux_for_component(AVFormatContext* s, struct repres= entation* pls) +{ + int ret =3D 0; + int i; + for (i =3D 0; i < pls->ctx->nb_streams; i++) { AVStream *st =3D avformat_new_stream(s, NULL); AVStream *ist =3D pls->ctx->streams[i]; @@ -2015,6 +2036,131 @@ static void move_metadata(AVStream *st, const char= *key, char **value) } } +struct work_pool_data +{ + AVFormatContext* ctx; + struct 
representation* pls; + struct representation* common_pls; + pthread_mutex_t* common_mutex; + pthread_cond_t* common_condition; + int is_common; + int is_started; + int result; +}; + +struct thread_data +{ + pthread_t thread; + pthread_mutex_t* mutex; + struct work_pool_data* work_pool; + int work_pool_size; + int is_started; +}; + +static void *worker_thread(void *ptr) +{ + int ret =3D 0; + int i; + struct thread_data* thread_data =3D (struct thread_data*)ptr; + struct work_pool_data* work_pool =3D NULL; + struct work_pool_data* data =3D NULL; + for (;;) { + + // get next work item + pthread_mutex_lock(thread_data->mutex); + data =3D NULL; + work_pool =3D thread_data->work_pool; + for (i =3D 0; i < thread_data->work_pool_size; i++) { + if (!work_pool->is_started) { + data =3D work_pool; + data->is_started =3D 1; + break; + } + work_pool++; + } + pthread_mutex_unlock(thread_data->mutex); + + if (!data) { + // no more work to do + return NULL; + } + + // if we are common section provider, init and signal + if (data->is_common) { + data->pls->parent =3D data->ctx; + ret =3D update_init_section(data->pls); + if (ret < 0) { + pthread_cond_signal(data->common_condition); + goto end; + } + else + ret =3D AVERROR(pthread_cond_signal(data->common_conditio= n)); + } + + // if we depend on common section provider, wait for signal and c= opy + if (data->common_pls) { + ret =3D AVERROR(pthread_cond_wait(data->common_condition, dat= a->common_mutex)); + if (ret < 0) + goto end; + + if (!data->common_pls->init_sec_buf) { + goto end; + ret =3D AVERROR(EFAULT); + } + + ret =3D copy_init_section(data->pls, data->common_pls); + if (ret < 0) + goto end; + } + + ret =3D begin_open_demux_for_component(data->ctx, data->pls); + if (ret < 0) + goto end; + + end: + data->result =3D ret; + } + + + return NULL; +} + +static void create_work_pool_data(AVFormatContext* ctx, int stream_index, + struct representation* pls, struct representation* common_pls, + struct work_pool_data* init_data, 
pthread_mutex_t* common_mutex, + pthread_cond_t* common_condition) +{ + init_data->ctx =3D ctx; + init_data->pls =3D pls; + init_data->pls->stream_index =3D stream_index; + init_data->common_condition =3D common_condition; + init_data->common_mutex =3D common_mutex; + init_data->result =3D -1; + + if (pls =3D=3D common_pls) { + init_data->is_common =3D 1; + } + else if (common_pls) { + init_data->common_pls =3D common_pls; + } +} + +static int start_thread(struct thread_data *thread_data, + struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t= *mutex) +{ + int ret; + + thread_data->mutex =3D mutex; + thread_data->work_pool =3D work_pool; + thread_data->work_pool_size =3D work_pool_size; + + ret =3D AVERROR(pthread_create(&thread_data->thread, NULL, worker_thr= ead, (void*)thread_data)); + if (ret =3D=3D 0) + thread_data->is_started =3D 1; + + return ret; +} + static int dash_read_header(AVFormatContext *s) { DASHContext *c =3D s->priv_data; @@ -2040,63 +2186,245 @@ static int dash_read_header(AVFormatContext *s) av_dict_set(&c->avio_opts, "seekable", "0", 0); } - if(c->n_videos) + if (c->n_videos) c->is_init_section_common_video =3D is_common_init_section_exist(= c->videos, c->n_videos); - /* Open the demuxer for video and audio components if available */ - for (i =3D 0; i < c->n_videos; i++) { - rep =3D c->videos[i]; - if (i > 0 && c->is_init_section_common_video) { - ret =3D copy_init_section(rep, c->videos[0]); + if (c->n_audios) + c->is_init_section_common_audio =3D is_common_init_section_exist(= c->audios, c->n_audios); + + if (c->n_subtitles) + c->is_init_section_common_subtitle =3D is_common_init_section_exi= st(c->subtitles, c->n_subtitles); + + int nstreams =3D c->n_videos + c->n_audios + c->n_subtitles; + int threads =3D FFMIN(nstreams, c->init_threads); + + if (threads > 1) + { + // alloc data + struct work_pool_data* init_data =3D (struct work_pool_data*)av_m= allocz(sizeof(struct work_pool_data) * nstreams); + if (!init_data) + return 
AVERROR(ENOMEM); + + struct thread_data* thread_data =3D (struct thread_data*)av_mallo= cz(sizeof(struct thread_data) * threads); + if (!thread_data) + return AVERROR(ENOMEM); + + // alloc mutex and conditions + pthread_mutex_t work_pool_mutex; + + pthread_mutex_t common_video_mutex; + pthread_cond_t common_video_cond; + + pthread_mutex_t common_audio_mutex; + pthread_cond_t common_audio_cond; + + pthread_mutex_t common_subtitle_mutex; + pthread_cond_t common_subtitle_cond; + + // init mutex and conditions + ret =3D AVERROR(pthread_mutex_init(&work_pool_mutex, NULL)); + if (ret < 0) + goto cleanup; + + if (c->is_init_section_common_video) { + ret =3D AVERROR(pthread_mutex_init(&common_video_mutex, NULL)= ); if (ret < 0) - return ret; + goto cleanup; + + ret =3D AVERROR(pthread_cond_init(&common_video_cond, NULL)); + if (ret < 0) + goto cleanup; } - ret =3D open_demux_for_component(s, rep); - if (ret) - return ret; - rep->stream_index =3D stream_index; - ++stream_index; - } + if (c->is_init_section_common_audio) { + ret =3D AVERROR(pthread_mutex_init(&common_audio_mutex, NULL)= ); + if (ret < 0) + goto cleanup; - if(c->n_audios) - c->is_init_section_common_audio =3D is_common_init_section_exist(= c->audios, c->n_audios); + ret =3D AVERROR(pthread_cond_init(&common_audio_cond, NULL)); + if (ret < 0) + goto cleanup; + } - for (i =3D 0; i < c->n_audios; i++) { - rep =3D c->audios[i]; - if (i > 0 && c->is_init_section_common_audio) { - ret =3D copy_init_section(rep, c->audios[0]); + if (c->is_init_section_common_subtitle) { + ret =3D AVERROR(pthread_mutex_init(&common_subtitle_mutex, NU= LL)); if (ret < 0) - return ret; + goto cleanup; + + ret =3D AVERROR(pthread_cond_init(&common_subtitle_cond, NULL= )); + if (ret < 0) + goto cleanup; } - ret =3D open_demux_for_component(s, rep); - if (ret) - return ret; - rep->stream_index =3D stream_index; - ++stream_index; - } + // init work pool data + struct work_pool_data* current_data =3D init_data; - if (c->n_subtitles) - 
c->is_init_section_common_subtitle =3D is_common_init_section_exi= st(c->subtitles, c->n_subtitles); + for (i =3D 0; i < c->n_videos; i++) { + create_work_pool_data(s, stream_index, c->videos[i], + c->is_init_section_common_video ? c->videos[0] : NULL, + current_data, &common_video_mutex, &common_video_cond); - for (i =3D 0; i < c->n_subtitles; i++) { - rep =3D c->subtitles[i]; - if (i > 0 && c->is_init_section_common_subtitle) { - ret =3D copy_init_section(rep, c->subtitles[0]); + stream_index++; + current_data++; + } + + for (i =3D 0; i < c->n_audios; i++) { + create_work_pool_data(s, stream_index, c->audios[i], + c->is_init_section_common_audio ? c->audios[0] : NULL, + current_data, &common_audio_mutex, &common_audio_cond); + + stream_index++; + current_data++; + } + + for (i =3D 0; i < c->n_subtitles; i++) { + create_work_pool_data(s, stream_index, c->subtitles[i], + c->is_init_section_common_subtitle ? c->subtitles[0] : NU= LL, + current_data, &common_subtitle_mutex, &common_subtitle_co= nd); + + stream_index++; + current_data++; + } + + // start threads + struct thread_data* current_thread =3D thread_data; + for (i =3D 0; i < threads; i++) { + ret =3D start_thread(current_thread, init_data, nstreams, &wo= rk_pool_mutex); if (ret < 0) - return ret; + goto cleanup; + + current_thread++; } - ret =3D open_demux_for_component(s, rep); - if (ret) - return ret; - rep->stream_index =3D stream_index; - ++stream_index; + cleanup: + // we need to cleanup even in case of errors, so we need to store= results of init, run and cleanup + int initResult =3D ret; + int runResult =3D 0; + int cleanupResult =3D 0; + + // join threads + current_thread =3D thread_data; + for (i =3D 0; i < threads; i++) { + if (current_thread->is_started) { + ret =3D AVERROR(pthread_join(current_thread->thread, NULL= )); + if (ret < 0) + cleanupResult =3D ret; + } + current_thread++; + } + + // finalize streams and collect results + current_data =3D init_data; + for (i =3D 0; i < nstreams; i++) { + 
if (current_data->result < 0) { + // thread ran into error: collect result + runResult =3D current_data->result; + } + else { + // thread success: create streams on AVFormatContext + ret =3D end_open_demux_for_component(s, current_data->pls= ); + if (ret < 0) + runResult =3D ret; + } + current_data++; + } + + // cleanup mutex and conditions + ret =3D AVERROR(pthread_mutex_destroy(&work_pool_mutex)); + if (ret < 0) + cleanupResult =3D ret; + + if (c->is_init_section_common_video) { + ret =3D AVERROR(pthread_mutex_destroy(&common_video_mutex)); + if (ret < 0) + cleanupResult =3D ret; + + ret =3D AVERROR(pthread_cond_destroy(&common_video_cond)); + if (ret < 0) + cleanupResult =3D ret; + } + + if (c->is_init_section_common_audio) { + ret =3D AVERROR(pthread_mutex_destroy(&common_audio_mutex)); + if (ret < 0) + cleanupResult =3D ret; + + ret =3D AVERROR(pthread_cond_destroy(&common_audio_cond)); + if (ret < 0) + cleanupResult =3D ret; + } + + if (c->is_init_section_common_subtitle) { + ret =3D AVERROR(pthread_mutex_destroy(&common_subtitle_mutex)= ); + if (ret < 0) + cleanupResult =3D ret; + + ret =3D AVERROR(pthread_cond_destroy(&common_subtitle_cond)); + if (ret < 0) + cleanupResult =3D ret; + } + + // return results if errors have occured in one of the phases + if (initResult < 0) + return initResult; + + if (runResult < 0) + return runResult; + + if (cleanupResult < 0) + return cleanupResult; } + else + { + /* Open the demuxer for video and audio components if available *= / + for (i =3D 0; i < c->n_videos; i++) { + rep =3D c->videos[i]; + if (i > 0 && c->is_init_section_common_video) { + ret =3D copy_init_section(rep, c->videos[0]); + if (ret < 0) + return ret; + } + ret =3D open_demux_for_component(s, rep); - if (!stream_index) - return AVERROR_INVALIDDATA; + if (ret) + return ret; + rep->stream_index =3D stream_index; + ++stream_index; + } + + for (i =3D 0; i < c->n_audios; i++) { + rep =3D c->audios[i]; + if (i > 0 && c->is_init_section_common_audio) { + ret 
=3D copy_init_section(rep, c->audios[0]); + if (ret < 0) + return ret; + } + ret =3D open_demux_for_component(s, rep); + + if (ret) + return ret; + rep->stream_index =3D stream_index; + ++stream_index; + } + + for (i =3D 0; i < c->n_subtitles; i++) { + rep =3D c->subtitles[i]; + if (i > 0 && c->is_init_section_common_subtitle) { + ret =3D copy_init_section(rep, c->subtitles[0]); + if (ret < 0) + return ret; + } + ret =3D open_demux_for_component(s, rep); + + if (ret) + return ret; + rep->stream_index =3D stream_index; + ++stream_index; + } + + if (!stream_index) + return AVERROR_INVALIDDATA; + } /* Create a program */ program =3D av_new_program(s, 0); @@ -2349,6 +2677,7 @@ static const AVOption dash_options[] =3D { {.str =3D "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, INT_MIN, INT_MAX, FLAGS}, { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_de= cryption_key), AV_OPT_TYPE_STRING, {.str =3D NULL}, INT_MIN, INT_MAX, .fla= gs =3D FLAGS }, + { "init_threads", "Number of threads to use for initializing the DASH= stream", OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 =3D 0}, 0, 64, FLAG= S }, {NULL} }; =2D- 2.31.1.windows.1 --rehcsed-5bfa0b9a-3921-4474-88ab-82f1cf7ae0c6 Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Content-Disposition: inline _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe". --rehcsed-5bfa0b9a-3921-4474-88ab-82f1cf7ae0c6--
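
The implementation notes describe a shared work pool plus a mutex and condition variable that make dependent streams wait for the common init section. Below is a minimal, self-contained sketch of that hand-off. It is not the code from the patch: work_item, pool, worker, run_pool and the common_ready flag are hypothetical names, the actual "init" work is only simulated, and re-checking a flag under the mutex is an assumption about how the wait could be made safe against spurious wakeups or a signal sent before the waiter arrives.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical stand-in for one DASH representation to initialize. */
struct work_item {
    int is_started;   /* claimed by a worker (guarded by pool.lock) */
    int needs_common; /* must wait for item 0's shared init section */
    int result;       /* 0 on success, negative until processed */
};

struct pool {
    pthread_mutex_t lock;  /* guards is_started and common_ready */
    pthread_cond_t cond;   /* signalled when common_ready changes */
    int common_ready;      /* predicate: shared init section is available */
    struct work_item *items;
    int n_items;
};

static void *worker(void *arg)
{
    struct pool *p = arg;

    for (;;) {
        struct work_item *it = NULL;

        /* Claim the next unstarted item while holding the lock. */
        pthread_mutex_lock(&p->lock);
        for (int i = 0; i < p->n_items; i++) {
            if (!p->items[i].is_started) {
                it = &p->items[i];
                it->is_started = 1;
                break;
            }
        }
        pthread_mutex_unlock(&p->lock);
        if (!it)
            return NULL; /* no work left */

        if (it == &p->items[0]) {
            /* Provider: "read" the shared section, then wake all waiters. */
            pthread_mutex_lock(&p->lock);
            p->common_ready = 1;
            pthread_cond_broadcast(&p->cond);
            pthread_mutex_unlock(&p->lock);
        } else if (it->needs_common) {
            /* Dependent: wait with the mutex held and re-check the flag. */
            pthread_mutex_lock(&p->lock);
            while (!p->common_ready)
                pthread_cond_wait(&p->cond, &p->lock);
            pthread_mutex_unlock(&p->lock);
        }

        it->result = 0; /* simulated per-stream open and probe */
    }
}

/* Runs n_threads workers over n_items items; returns how many succeeded. */
int run_pool(int n_items, int n_threads)
{
    assert(n_items > 0 && n_threads > 0);

    struct pool p = { .n_items = n_items };
    pthread_t *threads = calloc(n_threads, sizeof(*threads));
    int started = 0, ok = 0;

    p.items = calloc(n_items, sizeof(*p.items));
    if (!threads || !p.items) {
        free(threads);
        free(p.items);
        return -1;
    }
    pthread_mutex_init(&p.lock, NULL);
    pthread_cond_init(&p.cond, NULL);
    for (int i = 0; i < n_items; i++) {
        p.items[i].needs_common = i > 0; /* item 0 provides the section */
        p.items[i].result = -1;
    }

    for (int i = 0; i < n_threads; i++)
        if (pthread_create(&threads[started], NULL, worker, &p) == 0)
            started++;
    for (int i = 0; i < started; i++)
        pthread_join(threads[i], NULL);

    /* Single-threaded again: collect results after all joins. */
    for (int i = 0; i < n_items; i++)
        ok += p.items[i].result == 0;

    pthread_cond_destroy(&p.cond);
    pthread_mutex_destroy(&p.lock);
    free(p.items);
    free(threads);
    return ok;
}
```

Calling run_pool(7, 3) from a small main() mirrors the 7-substream test stream handled by 3 worker threads. Because items are claimed in index order, the provider item is always picked up before any dependent one, so the waiters cannot starve even when there are fewer threads than items. A real implementation would also have to wake the waiters when the provider fails.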