From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100]) by master.gitmailbox.com (Postfix) with ESMTP id 37FF244311 for ; Mon, 5 Sep 2022 21:17:25 +0000 (UTC) Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 6059F68B9D5; Tue, 6 Sep 2022 00:17:06 +0300 (EEST) Received: from mout.gmx.net (mout.gmx.net [212.227.17.21]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id CA33468B800 for ; Tue, 6 Sep 2022 00:16:58 +0300 (EEST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=gmx.net; s=badeba3b8450; t=1662412618; bh=21rWWfcHsQa+8QrNdx1n0Pf7In6mstWmZS0691x0o0g=; h=X-UI-Sender-Class:From:To:Cc:Subject:Date:In-Reply-To:References; b=V/mfa+vZLf+95pQ/BE6qrlFPA/UJliURKRFetD5+IoRgGywNXyHo16HBGyp61OXPN c/fMoIWqGRgvm7aigqfYR/9/cEHmJtnLMMiJWv0h7IHLxvXoS20dIWWRqgl2CYiDpu HcGTz8LsXysJ0QxvGPdNuL8Yeej/pvRwv9izEk9I= X-UI-Sender-Class: 01bb95c1-4bf8-414a-932a-4f6e2808ef9c Received: from localhost.localdomain ([94.134.107.163]) by mail.gmx.net (mrgmx105 [212.227.17.168]) with ESMTPSA (Nemesis) id 1MplXp-1pAPS845iX-00q8ua; Mon, 05 Sep 2022 23:16:58 +0200 From: Lukas Fellechner To: ffmpeg-devel@ffmpeg.org Date: Mon, 5 Sep 2022 23:16:32 +0200 Message-Id: <20220905211634.1460-3-lukas.fellechner@gmx.net> X-Mailer: git-send-email 2.31.1.windows.1 In-Reply-To: <20220905211634.1460-1-lukas.fellechner@gmx.net> References: <20220905211634.1460-1-lukas.fellechner@gmx.net> MIME-Version: 1.0 X-Provags-ID: V03:K1:3l55Or0MUsmUe1HAjtlDxMZJWK25NaSHlQuZIY/qHOkRk8WE/Mc 6d3pIc4B32aRGzKgqcllqD/mGW1qcQ2f5vglj0NxTgVdFIx05d+ZF6OiVjPUv67y82hfdl8 vL9HUaxsUwIt50mAkqnvkYlNMhh4wpHvGpYtq1yx759/thQPzdElGjSyVBzz9EOfYicYvrx UbGL/ETds/q2GOCHCdhBw== X-Spam-Flag: NO X-UI-Out-Filterresults: notjunk:1;V03:K0:w6WmU/gesC8=:BJ1p8dSqRZ2CLkcvjE/m2K h4+CE2OaS/y7VmtlyU4EGwSCLYqBE2saMuGVY6H2bKgx1j+QWN4vsUb6M84FNv941goe2pbFa 92L4JEbNU5acFg4Bp57hmtY+JaUdI6X7xo39H03qsSYPD2blX/Pw+a86Mg13py3+krkzRMLaY 54SRVTfxbTFZOUDt4PH2/d9dDW7SHPBKLu7YxHHi61UbT+apAtx/9y/2WjrP+jArg9zrhLTb1 sUARbn2sHkDDnEjtwznX3CMZpjKVqQOI51RRlmqt1NtRcCfCsyBPhCL0xfkenJsjqVWmkggp5 AKxBvLmcPQdjNztlwjoI3bg2MXQ6XPF4cqxOXXSQTMjK8dR3ilQ/vV7QZwccqs97FdCl5pWhs 4EdYh4priFxaetlrOI+9G3XzV0cup3yzxxFx+g+0DoY4shw2LHzPxTHRWhcxP8+oK0WnDAXUu WWbwEUb6BdbaeM8d3S7ZRhZl+bomrwb/eGs/1ZBz3G65bP+z60rZi8ngmLAQZRplrN/5/xzqI 1oPy9snowyUk/vmBqLRt/6669OIybktXXhxdObkdjNUgZdeKJS8ES6xfyu6sa7l0PoDoiRpqK VmICwKs13Zs5HtUc+LRRVoorXwBLIolL8+YYWMB/TT44YuenuMybrJUPjcEWAjUvvMfPmjnaZ Dc1bRtEvtlD3LJoCaR+5nGGlPwqg9/wgK64R2tS2/T8yjPrIpigxty31r39CKu2mjLEinH7IQ 5PKyCIoqWgyX/+BwHbnrQ5oGn23ppjdVZ4xJsDr571L9hEUSfAqvTmWYqNRM8q4ogUoVneqvY u8MSvo6sn/oZ6H4TNaINNlGcF8XoX1dvApwRXFR0l9+uncp1A/lSnA+JOwpGMIArmNF5zsM7N 0XNjrhr6uHS2S6If666Pyd3MMN5k3O+8SMXyA6EyrHKAXKzZbjcLAqDs3THq2qj2NH1g6qFqE csfP19NH44B+usckOUTPoAtLuw7KHoynQCFDhDWoR04fP8vTw+t9UYNgge0NPKGec4UMFObOD Z4Gn7YVp5PDKMspQ9NfzgyWmNCXBw61PebdAunYxFnxjarpChX/gZ3iBGTCb9MhCb3eWO6OFy w7TKo2hWsb++aOJBPNCvtlho7uu+navfVJcgIQ+zSGUMQFp5LtTc5AzOweAVMG0L02FxwRMET RxRury5CXaCeINgv4wsY9NWnPM Subject: [FFmpeg-devel] [PATCH v4 2/4] lavf/dashdec: Multithreaded DASH initialization X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Cc: Lukas Fellechner Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Archived-At: List-Archive: List-Post: This patch adds an "init_threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times. --- libavformat/dashdec.c | 286 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 285 insertions(+), 1 deletion(-) diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c index e82da45e43..0532e2c918 100644 --- a/libavformat/dashdec.c +++ b/libavformat/dashdec.c @@ -24,6 +24,8 @@ #include "libavutil/opt.h" #include "libavutil/time.h" #include "libavutil/parseutils.h" +#include "libavutil/thread.h" +#include "libavutil/slicethread.h" #include "internal.h" #include "avio_internal.h" #include "dash.h" @@ -152,6 +154,8 @@ typedef struct DASHContext { int max_url_size; char *cenc_decryption_key; + int init_threads; + /* Flags for init section*/ int is_init_section_common_video; int is_init_section_common_audio; @@ -2033,6 +2037,265 @@ static void move_metadata(AVStream *st, const char *key, char **value) } } +#if HAVE_THREADS + +typedef struct WorkPoolData +{ + AVFormatContext *ctx; + struct representation *pls; + struct representation *common_pls; + pthread_mutex_t *common_mutex; + pthread_cond_t *common_condition; + int is_common; + int is_started; + int result; +} WorkPoolData; + +static void thread_worker(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads) +{ + WorkPoolData *work_pool = (WorkPoolData*)priv; + WorkPoolData *data = work_pool + jobnr; + int ret; + + // if we are common section provider, init and signal + if (data->is_common) { + data->pls->parent = data->ctx; + ret = update_init_section(data->pls); + if (ret < 0) { + pthread_cond_signal(data->common_condition); + goto end; + } + else + ret = AVERROR(pthread_cond_signal(data->common_condition)); + } + + // if we depend on common section provider, wait for signal and copy + if (data->common_pls) { + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex)); + if (ret < 0) + goto end; + + if (!data->common_pls->init_sec_buf) { + goto end; + ret = AVERROR(EFAULT); + } + + ret = copy_init_section(data->pls, data->common_pls); + if (ret < 0) + goto end; + } + + ret = begin_open_demux_for_component(data->ctx, data->pls); + if (ret < 0) + goto end; + +end: + data->result = ret; +} + +static void create_work_pool_data(AVFormatContext *ctx, int *stream_index, + struct representation **streams, int num_streams, int is_init_section_common, + WorkPoolData *work_pool, pthread_mutex_t* common_mutex, + pthread_cond_t* common_condition) +{ + work_pool += *stream_index; + + for (int i = 0; i < num_streams; i++) { + work_pool->ctx = ctx; + work_pool->pls = streams[i]; + work_pool->pls->stream_index = *stream_index; + work_pool->common_condition = common_condition; + work_pool->common_mutex = common_mutex; + work_pool->result = -1; + + if (is_init_section_common) { + if (i == 0) + work_pool->is_common = 1; + else + work_pool->common_pls = streams[0]; + } + + work_pool++; + *stream_index = *stream_index + 1; + } +} + +static pthread_mutex_t* create_mutex() +{ + pthread_mutex_t* mutex = (pthread_mutex_t*)av_malloc(sizeof(pthread_mutex_t)); + if (!mutex) + return NULL; + + if (pthread_mutex_init(mutex, NULL)) { + av_free(mutex); + return NULL; + } + + return mutex; +} + +static int free_mutex(pthread_mutex_t **mutex) +{ + int ret = 0; + if (*mutex) { + ret = pthread_mutex_destroy(*mutex); + av_free(*mutex); + *mutex = NULL; + } + return ret; +} + +static pthread_cond_t* create_cond() +{ + pthread_cond_t* cond = (pthread_cond_t*)av_malloc(sizeof(pthread_cond_t)); + if (!cond) + return NULL; + + if (pthread_cond_init(cond, NULL)) { + av_free(cond); + return NULL; + } + + return cond; +} + +static int free_cond(pthread_cond_t **cond) +{ + int ret = 0; + if (*cond) { + ret = pthread_cond_destroy(*cond); + av_free(*cond); + *cond = NULL; + } + return ret; +} + +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads) +{ + DASHContext *c = s->priv_data; + int ret = 0; + int stream_index = 0; + AVSliceThread *slice_thread; + + // we need to cleanup even in case of errors, + // so we need to store results of run and cleanup phase + int initResult = 0; + int runResult = 0; + int cleanupResult = 0; + + // alloc data + WorkPoolData *work_pool = (WorkPoolData*)av_mallocz( + sizeof(WorkPoolData) * nstreams); + if (!work_pool) + return AVERROR(ENOMEM); + + if (!avpriv_slicethread_create(&slice_thread, (void*)work_pool, &thread_worker, NULL, threads)) { + av_free(work_pool); + return AVERROR(ENOMEM); +} + + // alloc mutex and conditions + c->init_mutex = create_mutex(); + + pthread_mutex_t *common_video_mutex = create_mutex(); + pthread_cond_t *common_video_cond = create_cond(); + + pthread_mutex_t *common_audio_mutex = create_mutex(); + pthread_cond_t *common_audio_cond = create_cond(); + + pthread_mutex_t *common_subtitle_mutex = create_mutex(); + pthread_cond_t *common_subtitle_cond = create_cond(); + + if (!(c->init_mutex && common_video_mutex && common_video_cond && common_audio_mutex && + common_audio_cond && common_subtitle_mutex && common_subtitle_cond)) { + initResult = AVERROR(ENOMEM); + goto cleanup; + } + + // set work pool data + create_work_pool_data(s, &stream_index, c->videos, c->n_videos, + c->is_init_section_common_video, work_pool, + common_video_mutex, common_video_cond); + + create_work_pool_data(s, &stream_index, c->audios, c->n_audios, + c->is_init_section_common_audio, work_pool, + common_audio_mutex, common_audio_cond); + + create_work_pool_data(s, &stream_index, c->subtitles, c->n_subtitles, + c->is_init_section_common_subtitle, work_pool, + common_subtitle_mutex, common_subtitle_cond); + + // run threads + avpriv_slicethread_execute(slice_thread, nstreams, 0); + + // finalize streams and collect results + WorkPoolData* current_data = work_pool; + for (int i = 0; i < nstreams; i++) { + if (current_data->result < 0) { + // thread ran into error: collect result and break + runResult = current_data->result; + break; + } + else { + // thread success: create streams on AVFormatContext + ret = end_open_demux_for_component(s, current_data->pls); + if (ret < 0) { + runResult = ret; + break; + } + } + current_data++; + } + +cleanup: + // cleanup mutex and conditions + ret = free_mutex(&c->init_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_mutex(&common_video_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_cond(&common_video_cond); + if (ret < 0) + cleanupResult = ret; + + ret = free_mutex(&common_audio_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_cond(&common_audio_cond); + if (ret < 0) + cleanupResult = ret; + + ret = free_mutex(&common_subtitle_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_cond(&common_subtitle_cond); + if (ret < 0) + cleanupResult = ret; + + // cleanup threads and workpool + av_free(work_pool); + avpriv_slicethread_free(&slice_thread); + + // return results if errors have occured in one of the phases + if (initResult < 0) + return initResult; + + if (runResult < 0) + return runResult; + + if (cleanupResult < 0) + return cleanupResult; + + return 0; +} + +#endif + static int dash_read_header(AVFormatContext *s) { DASHContext *c = s->priv_data; @@ -2067,6 +2330,23 @@ static int dash_read_header(AVFormatContext *s) if (c->n_subtitles) c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); + int threads = 1; + int nstreams = c->n_videos + c->n_audios + c->n_subtitles; + +#if HAVE_THREADS + threads = FFMIN(nstreams, c->init_threads); +#endif + + if (threads > 1) + { +#if HAVE_THREADS + ret = init_streams_multithreaded(s, nstreams, threads); + if (ret < 0) + return ret; +#endif + } + else + { /* Open the demuxer for video and audio components if available */ for (i = 0; i < c->n_videos; i++) { rep = c->videos[i]; @@ -2115,6 +2395,7 @@ static int dash_read_header(AVFormatContext *s) if (!stream_index) return AVERROR_INVALIDDATA; + } /* Create a program */ program = av_new_program(s, 0); @@ -2366,7 +2647,10 @@ static const AVOption dash_options[] = { OFFSET(allowed_extensions), AV_OPT_TYPE_STRING, {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, INT_MIN, INT_MAX, FLAGS}, - { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, + { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), + AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, + { "init_threads", "Number of threads to use for initializing the DASH stream", + OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 1}, 1, INT_MAX, FLAGS }, {NULL} }; -- 2.28.0.windows.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".