Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
 help / color / mirror / Atom feed
From: Lukas Fellechner <lukas.fellechner@gmx.net>
To: ffmpeg-devel@ffmpeg.org
Cc: Lukas Fellechner <lukas.fellechner@gmx.net>
Subject: [FFmpeg-devel] [PATCH v4 2/4] lavf/dashdec: Multithreaded DASH initialization
Date: Mon,  5 Sep 2022 23:16:32 +0200
Message-ID: <20220905211634.1460-3-lukas.fellechner@gmx.net> (raw)
In-Reply-To: <20220905211634.1460-1-lukas.fellechner@gmx.net>

This patch adds an "init_threads" option, specifying the max
number of threads to use. Multiple worker threads are spun up
to massively bring down init times.
---
 libavformat/dashdec.c | 286 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 285 insertions(+), 1 deletion(-)

diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index e82da45e43..0532e2c918 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -24,6 +24,8 @@
 #include "libavutil/opt.h"
 #include "libavutil/time.h"
 #include "libavutil/parseutils.h"
+#include "libavutil/thread.h"
+#include "libavutil/slicethread.h"
 #include "internal.h"
 #include "avio_internal.h"
 #include "dash.h"
@@ -152,6 +154,8 @@ typedef struct DASHContext {
     int max_url_size;
     char *cenc_decryption_key;

+    int init_threads;
+
     /* Flags for init section*/
     int is_init_section_common_video;
     int is_init_section_common_audio;
@@ -2033,6 +2037,265 @@ static void move_metadata(AVStream *st, const char *key, char **value)
     }
 }

+#if HAVE_THREADS
+
+/**
+ * Description of one initialization job; the pool holds one entry per
+ * representation and is indexed by the slice-thread job number.
+ */
+typedef struct WorkPoolData
+{
+    AVFormatContext *ctx;               ///< demuxer context handed to the open helpers
+    struct representation *pls;         ///< representation opened by this job
+    struct representation *common_pls;  ///< provider of the shared init section, NULL if this job does not depend on one
+    pthread_mutex_t *common_mutex;      ///< mutex guarding common_done of the provider
+    pthread_cond_t *common_condition;   ///< signalled once the provider is done with its init section
+    int is_common;                      ///< non-zero if this job loads the shared init section
+    int is_started;                     ///< not referenced by the code in this patch
+    int common_done;                    ///< provider only: set (under common_mutex) once update_init_section() has returned
+    int result;                         ///< job outcome, negative on failure
+} WorkPoolData;
+
+/**
+ * Slice-thread worker: opens the demuxer for the representation of job jobnr.
+ *
+ * A job flagged is_common loads the shared init section first and then wakes
+ * every job depending on it. A job with common_pls set waits until the
+ * provider has finished (successfully or not) and copies the init section
+ * from it. The outcome is stored in the job's result field.
+ *
+ * @param priv       start of the WorkPoolData array
+ * @param jobnr      index of the entry to process
+ * @param threadnr   unused
+ * @param nb_jobs    number of entries in the array
+ * @param nb_threads unused
+ */
+static void thread_worker(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads)
+{
+    WorkPoolData *work_pool = priv;
+    WorkPoolData *data = work_pool + jobnr;
+    int ret;
+
+    // if we are common section provider, init and publish the outcome
+    if (data->is_common) {
+        data->pls->parent = data->ctx;
+        ret = update_init_section(data->pls);
+
+        // The flag is set under the mutex and all waiters are woken, even
+        // on failure, so that no dependent job can miss the wake-up or
+        // block forever.
+        pthread_mutex_lock(data->common_mutex);
+        data->common_done = 1;
+        pthread_cond_broadcast(data->common_condition);
+        pthread_mutex_unlock(data->common_mutex);
+
+        if (ret < 0)
+            goto end;
+    }
+
+    // if we depend on common section provider, wait for it and copy
+    if (data->common_pls) {
+        WorkPoolData *provider = NULL;
+
+        for (int i = 0; i < nb_jobs; i++) {
+            if (work_pool[i].is_common && work_pool[i].pls == data->common_pls) {
+                provider = &work_pool[i];
+                break;
+            }
+        }
+        if (!provider) {
+            ret = AVERROR_BUG;
+            goto end;
+        }
+
+        // pthread_cond_wait() requires the mutex to be held and may wake
+        // spuriously, hence the predicate loop.
+        pthread_mutex_lock(data->common_mutex);
+        while (!provider->common_done)
+            pthread_cond_wait(data->common_condition, data->common_mutex);
+        pthread_mutex_unlock(data->common_mutex);
+
+        // the provider failed to load the init section
+        if (!data->common_pls->init_sec_buf) {
+            ret = AVERROR(EFAULT);
+            goto end;
+        }
+
+        ret = copy_init_section(data->pls, data->common_pls);
+        if (ret < 0)
+            goto end;
+    }
+
+    ret = begin_open_demux_for_component(data->ctx, data->pls);
+
+end:
+    data->result = ret;
+}
+
+/**
+ * Fill the work pool entries for one group of representations.
+ *
+ * Entries are written starting at index *stream_index; each representation
+ * gets that running index stored in its stream_index field, and
+ * *stream_index is advanced by num_streams on return. If the group shares
+ * its init section, the first entry is flagged as the provider and every
+ * other entry references streams[0] as its provider.
+ *
+ * @param ctx                    demuxer context stored in every entry
+ * @param stream_index           in/out: next free pool slot / stream index
+ * @param streams                representations of this group
+ * @param num_streams            number of elements in streams
+ * @param is_init_section_common non-zero if the group shares one init section
+ * @param work_pool              start of the whole pool (not of the group)
+ * @param common_mutex           mutex stored in every entry of the group
+ * @param common_condition       condition stored in every entry of the group
+ */
+static void create_work_pool_data(AVFormatContext *ctx, int *stream_index,
+    struct representation **streams, int num_streams, int is_init_section_common,
+    WorkPoolData *work_pool, pthread_mutex_t* common_mutex,
+    pthread_cond_t* common_condition)
+{
+    WorkPoolData *entry = work_pool + *stream_index;
+    int n = 0;
+
+    while (n < num_streams) {
+        struct representation *rep = streams[n];
+
+        entry->ctx = ctx;
+        entry->pls = rep;
+        rep->stream_index = *stream_index;
+        entry->common_condition = common_condition;
+        entry->common_mutex = common_mutex;
+        entry->result = -1;
+
+        // first stream of a sharing group provides, all others consume
+        if (is_init_section_common && n == 0)
+            entry->is_common = 1;
+        else if (is_init_section_common)
+            entry->common_pls = streams[0];
+
+        entry++;
+        n++;
+        (*stream_index)++;
+    }
+}
+
+/**
+ * Allocate and initialize a mutex with default attributes.
+ *
+ * @return the new mutex, or NULL if allocation or initialization failed;
+ *         release it with free_mutex()
+ */
+static pthread_mutex_t* create_mutex(void)
+{
+    pthread_mutex_t *mutex = av_malloc(sizeof(*mutex));
+    if (!mutex)
+        return NULL;
+
+    if (pthread_mutex_init(mutex, NULL)) {
+        av_free(mutex);
+        return NULL;
+    }
+
+    return mutex;
+}
+
+/**
+ * Destroy and free a mutex obtained from create_mutex() and reset the
+ * caller's pointer to NULL. A NULL *mutex is accepted and ignored.
+ *
+ * @return 0 on success, a negative AVERROR code if destroying the mutex failed
+ */
+static int free_mutex(pthread_mutex_t **mutex)
+{
+    int ret = 0;
+
+    if (*mutex) {
+        // pthread functions report failure as a positive errno value;
+        // convert it so that callers can test for ret < 0
+        ret = AVERROR(pthread_mutex_destroy(*mutex));
+        av_freep(mutex);
+    }
+    return ret;
+}
+
+/**
+ * Allocate and initialize a condition variable with default attributes.
+ *
+ * @return the new condition variable, or NULL if allocation or
+ *         initialization failed; release it with free_cond()
+ */
+static pthread_cond_t* create_cond(void)
+{
+    pthread_cond_t *cond = av_malloc(sizeof(*cond));
+    if (!cond)
+        return NULL;
+
+    if (pthread_cond_init(cond, NULL)) {
+        av_free(cond);
+        return NULL;
+    }
+
+    return cond;
+}
+
+/**
+ * Destroy and free a condition variable obtained from create_cond() and
+ * reset the caller's pointer to NULL. A NULL *cond is accepted and ignored.
+ *
+ * @return 0 on success, a negative AVERROR code if destroying the
+ *         condition variable failed
+ */
+static int free_cond(pthread_cond_t **cond)
+{
+    int ret = 0;
+
+    if (*cond) {
+        // pthread functions report failure as a positive errno value;
+        // convert it so that callers can test for ret < 0
+        ret = AVERROR(pthread_cond_destroy(*cond));
+        av_freep(cond);
+    }
+    return ret;
+}
+
+/**
+ * Remember a failure reported while tearing down a mutex or condition.
+ * Both a positive errno value and a negative AVERROR code are accepted and
+ * stored as a negative AVERROR code; the most recent failure wins.
+ */
+static void record_cleanup_error(int *cleanup_result, int ret)
+{
+    if (ret)
+        *cleanup_result = ret > 0 ? AVERROR(ret) : ret;
+}
+
+/**
+ * Open the demuxers of all representations using a pool of worker threads.
+ *
+ * One job per representation is run through thread_worker(); afterwards the
+ * streams are finalized on the calling thread, in pool order, with
+ * end_open_demux_for_component().
+ *
+ * @param s        DASH demuxer context
+ * @param nstreams total number of video, audio and subtitle representations
+ * @param threads  number of threads to create
+ * @return 0 on success, otherwise a negative AVERROR code: the setup error,
+ *         else the first job/finalization error, else the last cleanup error
+ */
+static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
+{
+    DASHContext *c = s->priv_data;
+    AVSliceThread *slice_thread = NULL;
+    WorkPoolData *work_pool;
+    pthread_mutex_t *common_video_mutex = NULL;
+    pthread_cond_t *common_video_cond = NULL;
+    pthread_mutex_t *common_audio_mutex = NULL;
+    pthread_cond_t *common_audio_cond = NULL;
+    pthread_mutex_t *common_subtitle_mutex = NULL;
+    pthread_cond_t *common_subtitle_cond = NULL;
+    int stream_index = 0;
+    int ret;
+
+    // we need to cleanup even in case of errors,
+    // so we need to store results of run and cleanup phase
+    int init_result = 0;
+    int run_result = 0;
+    int cleanup_result = 0;
+
+    // alloc data (zeroed; av_calloc also guards the size multiplication)
+    work_pool = av_calloc(nstreams, sizeof(*work_pool));
+    if (!work_pool)
+        return AVERROR(ENOMEM);
+
+    // avpriv_slicethread_create() reports failure as a negative AVERROR
+    // code and the number of threads otherwise
+    ret = avpriv_slicethread_create(&slice_thread, work_pool, thread_worker, NULL, threads);
+    if (ret < 0) {
+        av_free(work_pool);
+        return ret;
+    }
+
+    // alloc mutex and conditions
+    c->init_mutex = create_mutex();
+
+    common_video_mutex = create_mutex();
+    common_video_cond = create_cond();
+
+    common_audio_mutex = create_mutex();
+    common_audio_cond = create_cond();
+
+    common_subtitle_mutex = create_mutex();
+    common_subtitle_cond = create_cond();
+
+    if (!(c->init_mutex && common_video_mutex && common_video_cond && common_audio_mutex &&
+        common_audio_cond && common_subtitle_mutex && common_subtitle_cond)) {
+        init_result = AVERROR(ENOMEM);
+        goto cleanup;
+    }
+
+    // set work pool data
+    create_work_pool_data(s, &stream_index, c->videos, c->n_videos,
+        c->is_init_section_common_video, work_pool,
+        common_video_mutex, common_video_cond);
+
+    create_work_pool_data(s, &stream_index, c->audios, c->n_audios,
+        c->is_init_section_common_audio, work_pool,
+        common_audio_mutex, common_audio_cond);
+
+    create_work_pool_data(s, &stream_index, c->subtitles, c->n_subtitles,
+        c->is_init_section_common_subtitle, work_pool,
+        common_subtitle_mutex, common_subtitle_cond);
+
+    // run threads
+    avpriv_slicethread_execute(slice_thread, nstreams, 0);
+
+    // finalize streams and collect results; stop at the first failure
+    // NOTE(review): jobs after the failing one may have completed
+    // begin_open_demux_for_component() without being finalized here -
+    // confirm that the caller's error path releases them.
+    for (int i = 0; i < nstreams; i++) {
+        ret = work_pool[i].result;
+        if (ret >= 0) {
+            // thread success: create streams on AVFormatContext
+            ret = end_open_demux_for_component(s, work_pool[i].pls);
+        }
+        if (ret < 0) {
+            run_result = ret;
+            break;
+        }
+    }
+
+cleanup:
+    // cleanup mutex and conditions
+    record_cleanup_error(&cleanup_result, free_mutex(&c->init_mutex));
+
+    record_cleanup_error(&cleanup_result, free_mutex(&common_video_mutex));
+    record_cleanup_error(&cleanup_result, free_cond(&common_video_cond));
+
+    record_cleanup_error(&cleanup_result, free_mutex(&common_audio_mutex));
+    record_cleanup_error(&cleanup_result, free_cond(&common_audio_cond));
+
+    record_cleanup_error(&cleanup_result, free_mutex(&common_subtitle_mutex));
+    record_cleanup_error(&cleanup_result, free_cond(&common_subtitle_cond));
+
+    // cleanup threads first, then the job data they were created with
+    avpriv_slicethread_free(&slice_thread);
+    av_free(work_pool);
+
+    // return results if errors have occurred in one of the phases
+    if (init_result < 0)
+        return init_result;
+
+    if (run_result < 0)
+        return run_result;
+
+    if (cleanup_result < 0)
+        return cleanup_result;
+
+    return 0;
+}
+
+#endif
+
 static int dash_read_header(AVFormatContext *s)
 {
     DASHContext *c = s->priv_data;
@@ -2067,6 +2330,23 @@ static int dash_read_header(AVFormatContext *s)
     if (c->n_subtitles)
         c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);

+    int threads = 1;
+    int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
+
+#if HAVE_THREADS
+    threads = FFMIN(nstreams, c->init_threads);
+#endif
+
+    if (threads > 1)
+    {
+#if HAVE_THREADS
+        ret = init_streams_multithreaded(s, nstreams, threads);
+        if (ret < 0)
+            return ret;
+#endif
+    }
+    else
+    {
     /* Open the demuxer for video and audio components if available */
     for (i = 0; i < c->n_videos; i++) {
         rep = c->videos[i];
@@ -2115,6 +2395,7 @@ static int dash_read_header(AVFormatContext *s)

     if (!stream_index)
         return AVERROR_INVALIDDATA;
+    }

     /* Create a program */
     program = av_new_program(s, 0);
@@ -2366,7 +2647,10 @@ static const AVOption dash_options[] = {
         OFFSET(allowed_extensions), AV_OPT_TYPE_STRING,
         {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
         INT_MIN, INT_MAX, FLAGS},
-    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key),
+        AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+    { "init_threads", "Number of threads to use for initializing the DASH stream",
+        OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 1}, 1, INT_MAX, FLAGS },
     {NULL}
 };

--
2.28.0.windows.1

_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".

  parent reply	other threads:[~2022-09-05 21:17 UTC|newest]

Thread overview: 26+ messages / expand[flat|nested]  mbox.gz  Atom feed  top
2022-08-20 21:35 [FFmpeg-devel] [PATCH 1/1] " Lukas Fellechner
2022-08-20 21:53 ` Lukas Fellechner
2022-08-21  4:10   ` Steven Liu
2022-08-21 12:47     ` Lukas Fellechner
2022-08-21 19:26 ` [FFmpeg-devel] [PATCH v2] " Lukas Fellechner
2022-08-23  3:19   ` Steven Liu
2022-08-23 19:09     ` Lukas Fellechner
2022-08-23 19:03 ` [FFmpeg-devel] [PATCH v3 0/3] " Lukas Fellechner
2022-08-23 19:03   ` [FFmpeg-devel] [PATCH v3 1/3] lavf/dashdec: Prepare DASH decoder for multithreading Lukas Fellechner
2022-08-31  2:09     ` Steven Liu
2022-08-23 19:03   ` [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
2022-08-31  2:54     ` Andreas Rheinhardt
2022-08-31  7:25       ` Steven Liu
2022-08-31 12:17         ` Andreas Rheinhardt
2022-09-04 21:29       ` Lukas Fellechner
2022-09-04 22:50         ` Andreas Rheinhardt
2022-09-05 10:15           ` Lukas Fellechner
2022-09-05 10:45             ` Andreas Rheinhardt
2022-09-05 14:28               ` Lukas Fellechner
2022-09-11 20:35               ` Lukas Fellechner
2022-08-23 19:03   ` [FFmpeg-devel] [PATCH v3 3/3] lavf/dashdec: Fix indentation after multithreading Lukas Fellechner
2022-09-05 21:16 ` [FFmpeg-devel] [PATCH v4 0/4] lavf/dashdec: Multithreaded DASH initialization Lukas Fellechner
2022-09-05 21:16   ` [FFmpeg-devel] [PATCH v4 1/4] lavf/dashdec: Prepare DASH decoder for multithreading Lukas Fellechner
2022-09-05 21:16   ` Lukas Fellechner [this message]
2022-09-05 21:16   ` [FFmpeg-devel] [PATCH v4 3/4] lavf/dashdec: Prevent cross-thread avio_opts modification Lukas Fellechner
2022-09-05 21:16   ` [FFmpeg-devel] [PATCH v4 4/4] lavf/dashdec: Fix indentation after adding multithreading Lukas Fellechner

Reply instructions:

You may reply publicly to this message via plain-text email
using any one of the following methods:

* Save the following mbox file, import it into your mail client,
  and reply-to-all from there: mbox

  Avoid top-posting and favor interleaved quoting:
  https://en.wikipedia.org/wiki/Posting_style#Interleaved_style

* Reply using the --to, --cc, and --in-reply-to
  switches of git-send-email(1):

  git send-email \
    --in-reply-to=20220905211634.1460-3-lukas.fellechner@gmx.net \
    --to=lukas.fellechner@gmx.net \
    --cc=ffmpeg-devel@ffmpeg.org \
    /path/to/YOUR_REPLY

  https://kernel.org/pub/software/scm/git/docs/git-send-email.html

* If your mail client supports setting the In-Reply-To header
  via mailto: links, try the mailto: link

Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel

This inbox may be cloned and mirrored by anyone:

	git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git

	# If you have public-inbox 1.1+ installed, you may
	# initialize and index your mirror using the following commands:
	public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
		ffmpegdev@gitmailbox.com
	public-inbox-index ffmpegdev

Example config snippet for mirrors.


AGPL code for this site: git clone https://public-inbox.org/public-inbox.git