From mboxrd@z Thu Jan  1 00:00:00 1970
Return-Path: <ffmpeg-devel-bounces@ffmpeg.org>
Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100])
	by master.gitmailbox.com (Postfix) with ESMTP id 707B343FB7
	for <ffmpegdev@gitmailbox.com>; Tue, 23 Aug 2022 19:04:22 +0000 (UTC)
Received: from [127.0.1.1] (localhost [127.0.0.1])
	by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 2B1C468B9CB;
	Tue, 23 Aug 2022 22:04:15 +0300 (EEST)
Received: from mout.gmx.net (mout.gmx.net [212.227.15.19])
 by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 71C8E68B9C8
 for <ffmpeg-devel@ffmpeg.org>; Tue, 23 Aug 2022 22:04:08 +0300 (EEST)
DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/simple; d=gmx.net;
 s=badeba3b8450; t=1661281447;
 bh=V93ic9gnpqW79zuBnuFoKGkmOoiABM6pcY3Kbs0vlZ4=;
 h=X-UI-Sender-Class:From:To:Cc:Subject:Date:In-Reply-To:References;
 b=D0eRKmo+TaAJGTo/mgBkbgwgT7pI2/3bvH5BgKgaXB5RlinpcvdtSY/0Z3GoLSp1P
 BWc8+fS6aZ2Ht0U2yTVjdQPnUram4AtFoVjUk9YIQtzAbwSOUUxuqJcV4mT/jAJJ2J
 TDNDkrG8KF1CPNS1M+Z7Ubr5lM1q8Xay0im3q4OE=
X-UI-Sender-Class: 01bb95c1-4bf8-414a-932a-4f6e2808ef9c
Received: from localhost.localdomain ([94.134.107.162]) by mail.gmx.net
 (mrgmx004 [212.227.17.190]) with ESMTPSA (Nemesis) id
 1Ml6mE-1p9lpj31FU-00lRBs; Tue, 23 Aug 2022 21:04:07 +0200
From: Lukas Fellechner <lukas.fellechner@gmx.net>
To: ffmpeg-devel@ffmpeg.org
Date: Tue, 23 Aug 2022 21:03:25 +0200
Message-Id: <20220823190326.249-3-lukas.fellechner@gmx.net>
X-Mailer: git-send-email 2.31.1.windows.1
In-Reply-To: <20220823190326.249-1-lukas.fellechner@gmx.net>
References: <trinity-36a68f08-f239-4450-b893-af6bfa783181-1661031307501@3c-app-gmx-bs35>
 <20220823190326.249-1-lukas.fellechner@gmx.net>
MIME-Version: 1.0
X-Provags-ID: V03:K1:G56h+r4vSBVqAa6adNqkBFSFbm1lPHaIjef5LNRyHIKmdChjXZa
 bC3+RCsLA8IlKDeUDP/DECPPehK3eBCgNUXeYI/YQ2ZoVb6u5bM4UWrblsc5HPTKlWWWgZZ
 lARl4OcJ9mnq4IpeSaRB4iRmxoRThUZEPlhAYuG9MLZsQe5k2wpCew8n3Lc/HHzpbOcfLKo
 a23zLEbJnPefAabD9c2Ng==
X-Spam-Flag: NO
X-UI-Out-Filterresults: notjunk:1;V03:K0:kCGSDhhcJEY=:UOfuDFJPwyF9cglbcKt/kQ
 KNMk3gA+JZkX3ikpABL969SIY0jgale2u2W1Qrts8HW7VKo3vdcWoAwLcprtktzcWMVqfBxvG
 NiFy+79FOoKp+OPm9yFOSTGNj+JdH71BTyqQfCznxyy00PuL27F0jvDAFCTrj8+AMPBN+Gdmv
 k0pG4TNaHaXSSWhlJLFVlzZRX6wTBajXvrCmUcxQ1IGzVl/MUkxzRNbvUqbkf+VL8CGS4Mg2l
 JSMT/NhrJqWdj9mcKo6IgQJu8KwSgaWDv2lu3jx2esMzeUJmcJF5ADm1XSM1ze2UlaevKtb7J
 XNZxHIBwhGvmHguh+4oy81IqTH2K8JZI1tljwRYiGKqDqaSahjNFNsvJLcux+codUBdIMraL3
 GgudK/Rft4nAcQSYapMQTKXvZ4xNbePAUfJd23xUyvaVjCjCcAEh/d3lAJGlAmUVvVngloaoT
 tpbEpB0WxPpGEjBDXo5y/ZwNc/km62zcCu8U5gr74qtzMr2byvcwg33C+VweSO6bTT42n0BXa
 ui5G6TJ3XbcL6BvaHvkB8ASbX4GC5dxe2BMoFypPqkWcXYCBGeVSk+H4xyDCCBq6ycrKtlbRE
 sFUPAYjbl9vp8ecV/GDW3G4+hn1DDJoe0dqJuauDS3HJTd0H8UTnVmQDM9nACgd3iNBZD89kF
 DdLdR4ozYDszXr9bPljgVfwhRvK2weEl02PIBooQ7E2DfRRq6O9VfKzEV5OU8p1a8zdcgadOb
 QjccPJn92K4fOEr8voZlLuURHeNbGVG5EgKFRcoCdr1MEd46bqvL2tniMS60yWb55rmxf/iIY
 8/JI0w/xOHDb37bnwqpYIsQPAXPANqJluPjj5MhMMg3hV7dFO7/M+tz5nNiYgkddbMsQxFjtT
 aPky0EjOE74m5hdGpz6k6acRSDMXHc7UN5WAGYKSHI6DW9/Y1ar/6V6nQ/FPhgzkNiWBs9XXH
 KpYGuCLIgmZx5LBblXq7ZahcUQvuAkIy//h2n6gAA4LCwHi9Wn4DvisvKhTpPs5tJQ3K+BNyY
 QQeoU1Eby1QAyCKt7Ci/5Js03WhaJ6HLbL49GaQNQf+Shy0ceilFvW4tXunh/ZJ8Ni2VOepMh
 VVJQjvemaPyxJjC0qhMU/ac6+mq2OtSscJcuUMdrtp94ZtX7J/PoATffQ==
Subject: [FFmpeg-devel] [PATCH v3 2/3] lavf/dashdec: Multithreaded DASH
 initialization
X-BeenThere: ffmpeg-devel@ffmpeg.org
X-Mailman-Version: 2.1.29
Precedence: list
List-Id: FFmpeg development discussions and patches <ffmpeg-devel.ffmpeg.org>
List-Unsubscribe: <https://ffmpeg.org/mailman/options/ffmpeg-devel>,
 <mailto:ffmpeg-devel-request@ffmpeg.org?subject=unsubscribe>
List-Archive: <https://ffmpeg.org/pipermail/ffmpeg-devel>
List-Post: <mailto:ffmpeg-devel@ffmpeg.org>
List-Help: <mailto:ffmpeg-devel-request@ffmpeg.org?subject=help>
List-Subscribe: <https://ffmpeg.org/mailman/listinfo/ffmpeg-devel>,
 <mailto:ffmpeg-devel-request@ffmpeg.org?subject=subscribe>
Reply-To: FFmpeg development discussions and patches <ffmpeg-devel@ffmpeg.org>
Cc: Lukas Fellechner <lukas.fellechner@gmx.net>
Content-Type: text/plain; charset="us-ascii"
Content-Transfer-Encoding: 7bit
Errors-To: ffmpeg-devel-bounces@ffmpeg.org
Sender: "ffmpeg-devel" <ffmpeg-devel-bounces@ffmpeg.org>
Archived-At: <https://master.gitmailbox.com/ffmpegdev/20220823190326.249-3-lukas.fellechner@gmx.net/>
List-Archive: <https://master.gitmailbox.com/ffmpegdev/>
List-Post: <mailto:ffmpegdev@gitmailbox.com>

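This patch adds an "init_threads" option that sets the maximum
number of threads used to initialize the DASH streams. When the
value is greater than 1, worker threads open the representations
in parallel to reduce initialization time. The default of 0 keeps
the existing single-threaded initialization.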
---
 libavformat/dashdec.c | 351 +++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 350 insertions(+), 1 deletion(-)
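
Note on the synchronization pattern: the worker loop in this patch
claims work items under a mutex and lets dependent representations
wait for the representation that provides the common init section.
As far as I know, POSIX requires pthread_cond_wait() to be called with
the mutex locked, and the wait should be guarded by a predicate because
wakeups can be spurious or can arrive before the waiter blocks. Below
is a minimal standalone sketch of that pattern, not the code of this
patch. All names in it (struct pool, struct item, worker, run_pool,
common_ready, common_value) are hypothetical, and the value 42 only
stands in for the shared init section.

```c
#include <assert.h>
#include <pthread.h>

#define N_ITEMS     6
#define MAX_THREADS 8

/* Hypothetical work item; item 0 plays the "common init section" provider. */
struct item {
    int started;   /* claimed by a worker, guarded by pool.lock */
    int result;    /* -1 until the item has been processed */
};

struct pool {
    pthread_mutex_t lock;
    pthread_cond_t  common_ready_cond;
    int common_ready;   /* predicate: the provider has published its value */
    int common_value;   /* stands in for the shared init section */
    struct item items[N_ITEMS];
};

static void *worker(void *arg)
{
    struct pool *p = arg;

    for (;;) {
        int idx = -1;

        /* Claim the first unstarted item while holding the lock. */
        pthread_mutex_lock(&p->lock);
        for (int i = 0; i < N_ITEMS; i++) {
            if (!p->items[i].started) {
                p->items[i].started = 1;
                idx = i;
                break;
            }
        }
        pthread_mutex_unlock(&p->lock);
        if (idx < 0)
            return NULL;   /* no work left */

        if (idx == 0) {
            /* Provider: publish the value, then wake every waiter. */
            pthread_mutex_lock(&p->lock);
            p->common_value = 42;
            p->common_ready = 1;
            pthread_cond_broadcast(&p->common_ready_cond);
            pthread_mutex_unlock(&p->lock);
            p->items[idx].result = 42;
        } else {
            /* Dependent: hold the lock and re-check the predicate in a loop. */
            pthread_mutex_lock(&p->lock);
            while (!p->common_ready)
                pthread_cond_wait(&p->common_ready_cond, &p->lock);
            int v = p->common_value;
            pthread_mutex_unlock(&p->lock);
            p->items[idx].result = v + idx;
        }
    }
}

/* Runs the pool with nthreads workers; returns 0 if every item completed. */
static int run_pool(int nthreads)
{
    struct pool p = { .common_ready = 0 };
    pthread_t tid[MAX_THREADS];
    int started = 0;

    assert(nthreads >= 1 && nthreads <= MAX_THREADS);
    pthread_mutex_init(&p.lock, NULL);
    pthread_cond_init(&p.common_ready_cond, NULL);
    for (int i = 0; i < N_ITEMS; i++)
        p.items[i].result = -1;

    for (int i = 0; i < nthreads; i++)
        if (pthread_create(&tid[started], NULL, worker, &p) == 0)
            started++;
    for (int i = 0; i < started; i++)
        pthread_join(tid[i], NULL);

    pthread_cond_destroy(&p.common_ready_cond);
    pthread_mutex_destroy(&p.lock);

    for (int i = 0; i < N_ITEMS; i++)
        if (p.items[i].result != 42 + i)
            return -1;
    return 0;
}
```

Because items are claimed in index order, the provider (item 0) is
always claimed before any dependent item, so the sketch also works with
a single worker thread. Calling run_pool(4) from a test program built
with -pthread is one way to try it.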

diff --git a/libavformat/dashdec.c b/libavformat/dashdec.c
index e82da45e43..20f2557ea3 100644
--- a/libavformat/dashdec.c
+++ b/libavformat/dashdec.c
@@ -24,6 +24,7 @@
 #include "libavutil/opt.h"
 #include "libavutil/time.h"
 #include "libavutil/parseutils.h"
+#include "libavutil/thread.h"
 #include "internal.h"
 #include "avio_internal.h"
 #include "dash.h"
@@ -152,6 +153,8 @@ typedef struct DASHContext {
     int max_url_size;
     char *cenc_decryption_key;

+    int init_threads;
+
     /* Flags for init section*/
     int is_init_section_common_video;
     int is_init_section_common_audio;
@@ -2033,6 +2036,331 @@ static void move_metadata(AVStream *st, const char *key, char **value)
     }
 }

+#if HAVE_THREADS
+
+struct work_pool_data
+{
+    AVFormatContext *ctx;
+    struct representation *pls;
+    struct representation *common_pls;
+    pthread_mutex_t *common_mutex;
+    pthread_cond_t *common_condition;
+    int is_common;
+    int is_started;
+    int result;
+};
+
+struct thread_data
+{
+    pthread_t thread;
+    pthread_mutex_t *mutex;
+    struct work_pool_data *work_pool;
+    int work_pool_size;
+    int is_started;
+    int has_error;
+};
+
+static void *worker_thread(void *ptr)
+{
+    int ret = 0;
+    int i;
+    struct thread_data *thread_data = (struct thread_data*)ptr;
+    struct work_pool_data *work_pool = NULL;
+    struct work_pool_data *data = NULL;
+    for (;;) {
+
+        // get next work item unless there was an error
+        pthread_mutex_lock(thread_data->mutex);
+        data = NULL;
+        if (!thread_data->has_error) {
+            work_pool = thread_data->work_pool;
+            for (i = 0; i < thread_data->work_pool_size; i++) {
+                if (!work_pool->is_started) {
+                    data = work_pool;
+                    data->is_started = 1;
+                    break;
+                }
+                work_pool++;
+            }
+        }
+        pthread_mutex_unlock(thread_data->mutex);
+
+        if (!data) {
+            // no more work to do
+            return NULL;
+        }
+
+        // if we are common section provider, init and signal
+        if (data->is_common) {
+            data->pls->parent = data->ctx;
+            ret = update_init_section(data->pls);
+            if (ret < 0) {
+                pthread_cond_broadcast(data->common_condition);
+                goto end;
+            }
+            else
+                ret = AVERROR(pthread_cond_broadcast(data->common_condition));
+        }
+
+        // if we depend on common section provider, wait for signal and copy
+        if (data->common_pls) {
+            ret = AVERROR(pthread_cond_wait(data->common_condition, data->common_mutex));
+            if (ret < 0)
+                goto end;
+
+            if (!data->common_pls->init_sec_buf) {
+                ret = AVERROR(EFAULT);
+                goto end;
+            }
+
+            ret = copy_init_section(data->pls, data->common_pls);
+            if (ret < 0)
+                goto end;
+        }
+
+        ret = begin_open_demux_for_component(data->ctx, data->pls);
+        if (ret < 0)
+            goto end;
+
+    end:
+        data->result = ret;
+
+        // notify error to other threads and exit
+        if (ret < 0) {
+            pthread_mutex_lock(thread_data->mutex);
+            thread_data->has_error = 1;
+            pthread_mutex_unlock(thread_data->mutex);
+            return NULL;
+        }
+    }
+
+
+    return NULL;
+}
+
+static void create_work_pool_data(AVFormatContext *ctx, int stream_index,
+    struct representation *pls, struct representation *common_pls,
+    struct work_pool_data *init_data, pthread_mutex_t *common_mutex,
+    pthread_cond_t *common_condition)
+{
+    init_data->ctx = ctx;
+    init_data->pls = pls;
+    init_data->pls->stream_index = stream_index;
+    init_data->common_condition = common_condition;
+    init_data->common_mutex = common_mutex;
+    init_data->result = -1;
+
+    if (pls == common_pls) {
+        init_data->is_common = 1;
+    }
+    else if (common_pls) {
+        init_data->common_pls = common_pls;
+    }
+}
+
+static int start_thread(struct thread_data *thread_data,
+    struct work_pool_data *work_pool, int work_pool_size, pthread_mutex_t *mutex)
+{
+    int ret;
+
+    thread_data->mutex = mutex;
+    thread_data->work_pool = work_pool;
+    thread_data->work_pool_size = work_pool_size;
+
+    ret = AVERROR(pthread_create(&thread_data->thread, NULL, worker_thread, (void*)thread_data));
+    if (ret == 0)
+        thread_data->is_started = 1;
+
+    return ret;
+}
+
+static int init_streams_multithreaded(AVFormatContext *s, int nstreams, int threads)
+{
+    DASHContext *c = s->priv_data;
+    int ret = 0;
+    int stream_index = 0;
+    int i;
+
+    // alloc data
+    struct work_pool_data *init_data = (struct work_pool_data*)av_mallocz(sizeof(struct work_pool_data) * nstreams);
+    if (!init_data)
+        return AVERROR(ENOMEM);
+
+    struct thread_data *thread_data = (struct thread_data*)av_mallocz(sizeof(struct thread_data) * threads);
+    if (!thread_data)
+        return AVERROR(ENOMEM);
+
+    // alloc mutex and conditions
+    pthread_mutex_t work_pool_mutex;
+
+    pthread_mutex_t common_video_mutex;
+    pthread_cond_t common_video_cond;
+
+    pthread_mutex_t common_audio_mutex;
+    pthread_cond_t common_audio_cond;
+
+    pthread_mutex_t common_subtitle_mutex;
+    pthread_cond_t common_subtitle_cond;
+
+    // init mutex and conditions
+    ret = AVERROR(pthread_mutex_init(&work_pool_mutex, NULL));
+    if (ret < 0)
+        goto cleanup;
+
+    if (c->is_init_section_common_video) {
+        ret = AVERROR(pthread_mutex_init(&common_video_mutex, NULL));
+        if (ret < 0)
+            goto cleanup;
+
+        ret = AVERROR(pthread_cond_init(&common_video_cond, NULL));
+        if (ret < 0)
+            goto cleanup;
+    }
+
+    if (c->is_init_section_common_audio) {
+        ret = AVERROR(pthread_mutex_init(&common_audio_mutex, NULL));
+        if (ret < 0)
+            goto cleanup;
+
+        ret = AVERROR(pthread_cond_init(&common_audio_cond, NULL));
+        if (ret < 0)
+            goto cleanup;
+    }
+
+    if (c->is_init_section_common_subtitle) {
+        ret = AVERROR(pthread_mutex_init(&common_subtitle_mutex, NULL));
+        if (ret < 0)
+            goto cleanup;
+
+        ret = AVERROR(pthread_cond_init(&common_subtitle_cond, NULL));
+        if (ret < 0)
+            goto cleanup;
+    }
+
+    // init work pool data
+    struct work_pool_data* current_data = init_data;
+
+    for (i = 0; i < c->n_videos; i++) {
+        create_work_pool_data(s, stream_index, c->videos[i],
+            c->is_init_section_common_video ? c->videos[0] : NULL,
+            current_data, &common_video_mutex, &common_video_cond);
+
+        stream_index++;
+        current_data++;
+    }
+
+    for (i = 0; i < c->n_audios; i++) {
+        create_work_pool_data(s, stream_index, c->audios[i],
+            c->is_init_section_common_audio ? c->audios[0] : NULL,
+            current_data, &common_audio_mutex, &common_audio_cond);
+
+        stream_index++;
+        current_data++;
+    }
+
+    for (i = 0; i < c->n_subtitles; i++) {
+        create_work_pool_data(s, stream_index, c->subtitles[i],
+            c->is_init_section_common_subtitle ? c->subtitles[0] : NULL,
+            current_data, &common_subtitle_mutex, &common_subtitle_cond);
+
+        stream_index++;
+        current_data++;
+    }
+
+    // start threads
+    struct thread_data *current_thread = thread_data;
+    for (i = 0; i < threads; i++) {
+        ret = start_thread(current_thread, init_data, nstreams, &work_pool_mutex);
+        if (ret < 0)
+            goto cleanup;
+
+        current_thread++;
+    }
+
+cleanup:
+    // clean up even after errors, so keep the results of init, run and cleanup separately
+    int initResult = ret;
+    int runResult = 0;
+    int cleanupResult = 0;
+
+    // join threads
+    current_thread = thread_data;
+    for (i = 0; i < threads; i++) {
+        if (current_thread->is_started) {
+            ret = AVERROR(pthread_join(current_thread->thread, NULL));
+            if (ret < 0)
+                cleanupResult = ret;
+        }
+        current_thread++;
+    }
+
+    // finalize streams and collect results
+    current_data = init_data;
+    for (i = 0; i < nstreams; i++) {
+        if (current_data->result < 0) {
+            // thread ran into error: collect result and break
+            runResult = current_data->result;
+            break;
+        }
+        else {
+            // thread success: create streams on AVFormatContext
+            ret = end_open_demux_for_component(s, current_data->pls);
+            if (ret < 0)
+                runResult = ret;
+        }
+        current_data++;
+    }
+
+    // cleanup mutex and conditions
+    ret = AVERROR(pthread_mutex_destroy(&work_pool_mutex));
+    if (ret < 0)
+        cleanupResult = ret;
+
+    if (c->is_init_section_common_video) {
+        ret = AVERROR(pthread_mutex_destroy(&common_video_mutex));
+        if (ret < 0)
+            cleanupResult = ret;
+
+        ret = AVERROR(pthread_cond_destroy(&common_video_cond));
+        if (ret < 0)
+            cleanupResult = ret;
+    }
+
+    if (c->is_init_section_common_audio) {
+        ret = AVERROR(pthread_mutex_destroy(&common_audio_mutex));
+        if (ret < 0)
+            cleanupResult = ret;
+
+        ret = AVERROR(pthread_cond_destroy(&common_audio_cond));
+        if (ret < 0)
+            cleanupResult = ret;
+    }
+
+    if (c->is_init_section_common_subtitle) {
+        ret = AVERROR(pthread_mutex_destroy(&common_subtitle_mutex));
+        if (ret < 0)
+            cleanupResult = ret;
+
+        ret = AVERROR(pthread_cond_destroy(&common_subtitle_cond));
+        if (ret < 0)
+            cleanupResult = ret;
+    }
+
+    // return results if errors have occurred in one of the phases
+    if (initResult < 0)
+        return initResult;
+
+    if (runResult < 0)
+        return runResult;
+
+    if (cleanupResult < 0)
+        return cleanupResult;
+
+    return 0;
+}
+
+#endif
+
 static int dash_read_header(AVFormatContext *s)
 {
     DASHContext *c = s->priv_data;
@@ -2067,6 +2395,23 @@ static int dash_read_header(AVFormatContext *s)
     if (c->n_subtitles)
         c->is_init_section_common_subtitle = is_common_init_section_exist(c->subtitles, c->n_subtitles);

+    int threads = 0;
+    int nstreams = c->n_videos + c->n_audios + c->n_subtitles;
+
+#if HAVE_THREADS
+    threads = FFMIN(nstreams, c->init_threads);
+#endif
+
+    if (threads > 1)
+    {
+#if HAVE_THREADS
+        ret = init_streams_multithreaded(s, nstreams, threads);
+        if (ret < 0)
+            return ret;
+#endif
+    }
+    else
+    {
     /* Open the demuxer for video and audio components if available */
     for (i = 0; i < c->n_videos; i++) {
         rep = c->videos[i];
@@ -2115,6 +2460,7 @@ static int dash_read_header(AVFormatContext *s)

     if (!stream_index)
         return AVERROR_INVALIDDATA;
+    }

     /* Create a program */
     program = av_new_program(s, 0);
@@ -2366,7 +2712,10 @@ static const AVOption dash_options[] = {
         OFFSET(allowed_extensions), AV_OPT_TYPE_STRING,
         {.str = "aac,m4a,m4s,m4v,mov,mp4,webm,ts"},
         INT_MIN, INT_MAX, FLAGS},
-    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key), AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+    { "cenc_decryption_key", "Media decryption key (hex)", OFFSET(cenc_decryption_key),
+        AV_OPT_TYPE_STRING, {.str = NULL}, INT_MIN, INT_MAX, .flags = FLAGS },
+    { "init_threads", "Number of threads to use for initializing the DASH stream",
+        OFFSET(init_threads), AV_OPT_TYPE_INT, {.i64 = 0}, 0, 64, FLAGS },
     {NULL}
 };

--
2.31.1.windows.1
