From: Lukas Fellechner <lukas.fellechner@gmx.net> To: ffmpeg-devel@ffmpeg.org Cc: Lukas Fellechner <lukas.fellechner@gmx.net> Subject: [FFmpeg-devel] [PATCH v4 2/4] lavf/dashdec: Multithreaded DASH initialization Date: Mon, 5 Sep 2022 23:16:32 +0200 Message-ID: <20220905211634.1460-3-lukas.fellechner@gmx.net> (raw) In-Reply-To: <20220905211634.1460-1-lukas.fellechner@gmx.net> This patch adds an "init_threads" option, specifying the max number of threads to use. Multiple worker threads are spun up to massively bring down init times. --- libavformat/dashdec.c | 286 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 285 insertions(+), 1 deletion(-) diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c index e82da45e43..0532e2c918 100644 --- a/libavformat/dashdec.c +++ b/libavformat/dashdec.c @@ -24,6 +24,8 @@ #include "libavutil/opt.h" #include "libavutil/time.h" #include "libavutil/parseutils.h" +#include "libavutil/thread.h" +#include "libavutil/slicethread.h" #include "internal.h" #include "avio_internal.h" #include "dash.h" @@ -152,6 +154,8 @@ typedef struct DASHContext { int max_url_size; char *cenc_decryption_key; + int init_threads; + /* Flags for init section*/ int is_init_section_common_video; int is_init_section_common_audio; @@ -2033,6 +2037,265 @@ static void move_metadata(AVStream *st, const char *key, char **value) } } +#if HAVE_THREADS + +typedef struct WorkPoolData +{ + AVFormatContext *ctx; + struct representation *pls; + struct representation *common_pls; + pthread_mutex_t *common_mutex; + pthread_cond_t *common_condition; + int is_common; + int is_started; + int result; +} WorkPoolData; + +static void thread_worker(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads) +{ + WorkPoolData *work_pool = (WorkPoolData*)priv; + WorkPoolData *data = work_pool + jobnr; + int ret; + + // if we are common section provider, init and signal + if (data->is_common) { + data->pls->parent = data->ctx; + ret = update_init_section(data->pls); + if (ret < 0) { + pthread_cond_signal(data->common_condition); + goto end; + } + else + ret = AVERROR(pthread_cond_signal(data->common_condition)); + } + + // if we depend on common section provider, wait for signal and copy + if (data->common_pls) { + ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex)); + if (ret < 0) + goto end; + + if (!data->common_pls->init_sec_buf) { + goto end; + ret = AVERROR(EFAULT); + } + + ret = copy_init_section(data->pls, data->common_pls); + if (ret < 0) + goto end; + } + + ret = begin_open_demux_for_component(data->ctx, data->pls); + if (ret < 0) + goto end; + +end: + data->result = ret; +} + +static void create_work_pool_data(AVFormatContext *ctx, int *stream_index, + struct representation **streams, int num_streams, int is_init_section_common, + WorkPoolData *work_pool, pthread_mutex_t* common_mutex, + pthread_cond_t* common_condition) +{ + work_pool += *stream_index; + + for (int i = 0; i < num_streams; i++) { + work_pool->ctx = ctx; + work_pool->pls = streams[i]; + work_pool->pls->stream_index = *stream_index; + work_pool->common_condition = common_condition; + work_pool->common_mutex = common_mutex; + work_pool->result = -1; + + if (is_init_section_common) { + if (i == 0) + work_pool->is_common = 1; + else + work_pool->common_pls = streams[0]; + } + + work_pool++; + *stream_index = *stream_index + 1; + } +} + +static pthread_mutex_t* create_mutex() +{ + pthread_mutex_t* mutex = (pthread_mutex_t*)av_malloc(sizeof(pthread_mutex_t)); + if (!mutex) + return NULL; + + if (pthread_mutex_init(mutex, NULL)) { + av_free(mutex); + return NULL; + } + + return mutex; +} + +static int free_mutex(pthread_mutex_t **mutex) +{ + int ret = 0; + if (*mutex) { + ret = pthread_mutex_destroy(*mutex); + av_free(*mutex); + *mutex = NULL; + } + return ret; +} + +static pthread_cond_t* create_cond() +{ + pthread_cond_t* cond = (pthread_cond_t*)av_malloc(sizeof(pthread_cond_t)); + if (!cond) + return NULL; + + if (pthread_cond_init(cond, NULL)) { + av_free(cond); + return NULL; + } + + return cond; +} + +static int free_cond(pthread_cond_t **cond) +{ + int ret = 0; + if (*cond) { + ret = pthread_cond_destroy(*cond); + av_free(*cond); + *cond = NULL; + } + return ret; +} + +static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads) +{ + DASHContext *c = s->priv_data; + int ret = 0; + int stream_index = 0; + AVSliceThread *slice_thread; + + // we need to cleanup even in case of errors, + // so we need to store results of run and cleanup phase + int initResult = 0; + int runResult = 0; + int cleanupResult = 0; + + // alloc data + WorkPoolData *work_pool = (WorkPoolData*)av_mallocz( + sizeof(WorkPoolData) * nstreams); + if (!work_pool) + return AVERROR(ENOMEM); + + if (!avpriv_slicethread_create(&slice_thread, (void*)work_pool, &thread_worker, NULL, threads)) { + av_free(work_pool); + return AVERROR(ENOMEM); +} + + // alloc mutex and conditions + c->init_mutex = create_mutex(); + + pthread_mutex_t *common_video_mutex = create_mutex(); + pthread_cond_t *common_video_cond = create_cond(); + + pthread_mutex_t *common_audio_mutex = create_mutex(); + pthread_cond_t *common_audio_cond = create_cond(); + + pthread_mutex_t *common_subtitle_mutex = create_mutex(); + pthread_cond_t *common_subtitle_cond = create_cond(); + + if (!(c->init_mutex && common_video_mutex && common_video_cond && common_audio_mutex && + common_audio_cond && common_subtitle_mutex && common_subtitle_cond)) { + initResult = AVERROR(ENOMEM); + goto cleanup; + } + + // set work pool data + create_work_pool_data(s, &stream_index, c->videos, c->n_videos, + c->is_init_section_common_video, work_pool, + common_video_mutex, common_video_cond); + + create_work_pool_data(s, &stream_index, c->audios, c->n_audios, + c->is_init_section_common_audio, work_pool, + common_audio_mutex, common_audio_cond); + + create_work_pool_data(s, &stream_index, c->subtitles, c->n_subtitles, + c->is_init_section_common_subtitle, work_pool, + common_subtitle_mutex, common_subtitle_cond); + + // run threads + avpriv_slicethread_execute(slice_thread, nstreams, 0); + + // finalize streams and collect results + WorkPoolData* current_data = work_pool; + for (int i = 0; i < nstreams; i++) { + if (current_data->result < 0) { + // thread ran into error: collect result and break + runResult = current_data->result; + break; + } + else { + // thread success: create streams on AVFormatContext + ret = end_open_demux_for_component(s, current_data->pls); + if (ret < 0) { + runResult = ret; + break; + } + } + current_data++; + } + +cleanup: + // cleanup mutex and conditions + ret = free_mutex(&c->init_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_mutex(&common_video_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_cond(&common_video_cond); + if (ret < 0) + cleanupResult = ret; + + ret = free_mutex(&common_audio_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_cond(&common_audio_cond); + if (ret < 0) + cleanupResult = ret; + + ret = free_mutex(&common_subtitle_mutex); + if (ret < 0) + cleanupResult = ret; + + ret = free_cond(&common_subtitle_cond); + if (ret < 0) + cleanupResult = ret; + + // cleanup threads and workpool + av_free(work_pool); + avpriv_slicethread_free(&slice_thread); + + // return results if errors have occured in one of the phases + if (initResult < 0) + return initResult; + + if (runResult < 0) + return runResult; + + if (cleanupResult < 0) + return cleanupResult; + + return 0; +} + +#endif + static int dash_read_header(AVFormatContext *s) { DASHContext *c = s->priv_data; @@ -2067,6 +2330,23 @@ static int dash_read_header(AVFormatContext *s) if (c->n_subtitles) c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles); + int threads = 1; + int nstreams = c->n_videos + c->n_audios + c->n_subtitles; + +#if HAVE_THREADS + threads = FFMIN(nstreams, c->init_threads); +#endif + + if (threads > 1) + { +#if HAVE_THREADS + ret = init_streams_multithreaded(s, nstreams, threads); + if (ret < 0) + return ret; +#endif + } + else + { /* Open the demuxer for video and audio components if available */ for (i = 0; i < c->n_videos; i++) { rep = c->videos[i]; @@ -2115,6 +2395,7 @@ static int dash_read_header(AVFormatContext *s) if (!stream_index) return AVERROR_INVALIDDATA; + } /* Create a program */ program = av_new_program(s, 0); @@ -2366,7 +2647,10 @@ static const AVOption dash_options[] = { OFFSET(allowed_extensions), AV_OPT_TYPE_STRING, {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"}, INT_MIN, INT_MAX, FLAGS}, - { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, + { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), + AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS }, + { "init_threads", "Number of threads to use for initializing the DASH stream", + OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 1}, 1, INT_MAX, FLAGS }, {NULL} }; -- 2.28.0.windows.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
next prev parent reply other threads:[~2022-09-05 21:17 UTC|newest] Thread overview: 26+ messages / expand[flat|nested] mbox.gz Atom feed top 2022-08-20 21:35 [FFmpeg-devel] [PATCH 1/1] " Lukas Fellechner 2022-08-20 21:53 ` Lukas Fellechner 2022-08-21 4:10 ` Steven Liu 2022-08-21 12:47 ` Lukas Fellechner 2022-08-21 19:26 ` [FFmpeg-devel] [PATCH v2] " Lukas Fellechner 2022-08-23 3:19 ` Steven Liu 2022-08-23 19:09 ` Lukas Fellechner 2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 0/3] " Lukas Fellechner 2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 1/3] lavf/dashdec: Prepare DASH decoder for multithreading Lukas Fellechner 2022-08-31 2:09 ` Steven Liu 2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner 2022-08-31 2:54 ` Andreas Rheinhardt 2022-08-31 7:25 ` Steven Liu 2022-08-31 12:17 ` Andreas Rheinhardt 2022-09-04 21:29 ` Lukas Fellechner 2022-09-04 22:50 ` Andreas Rheinhardt 2022-09-05 10:15 ` Lukas Fellechner 2022-09-05 10:45 ` Andreas Rheinhardt 2022-09-05 14:28 ` Lukas Fellechner 2022-09-11 20:35 ` Lukas Fellechner 2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 3/3] lavf/dashdec: Fix indentation after multithreading Lukas Fellechner 2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 0/4] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner 2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 1/4] lavf/dashdec: Prepare DASH decoder for multithreading Lukas Fellechner 2022-09-05 21:16 ` Lukas Fellechner [this message] 2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 3/4] lavf/dashdec: Prevent cross-thread avio_opts modification Lukas Fellechner 2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 4/4] lavf/dashdec: Fix indentation after adding multithreading Lukas Fellechner
Reply instructions: You may reply publicly to this message via plain-text email using any one of the following methods: * Save the following mbox file, import it into your mail client, and reply-to-all from there: mbox Avoid top-posting and favor interleaved quoting: https://en.wikipedia.org/wiki/Posting_style#Interleaved_style * Reply using the --to, --cc, and --in-reply-to switches of git-send-email(1): git send-email \ --in-reply-to=20220905211634.1460-3-lukas.fellechner@gmx.net \ --to=lukas.fellechner@gmx.net \ --cc=ffmpeg-devel@ffmpeg.org \ /path/to/YOUR_REPLY https://kernel.org/pub/software/scm/git/docs/git-send-email.html * If your mail client supports setting the In-Reply-To header via mailto: links, try the mailto: link
Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel This inbox may be cloned and mirrored by anyone: git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git # If you have public-inbox 1.1+ installed, you may # initialize and index your mirror using the following commands: public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \ ffmpegdev@gitmailbox.com public-inbox-index ffmpegdev Example config snippet for mirrors. AGPL code for this site: git clone https://public-inbox.org/public-inbox.git