Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
* [FFmpeg-devel] [PATCH 1/2] avcodec/thread: add support for frame threading receive_frame based decoders
@ 2022-12-05 13:39 Timo Rothenpieler
  2022-12-05 13:39 ` [FFmpeg-devel] [PATCH 2/2] avcodec/mjpegdec: add support for frame threading Timo Rothenpieler
  2022-12-06 14:37 ` [FFmpeg-devel] [PATCH 1/2] avcodec/thread: add support for frame threading receive_frame based decoders Anton Khirnov
  0 siblings, 2 replies; 13+ messages in thread
From: Timo Rothenpieler @ 2022-12-05 13:39 UTC (permalink / raw)
  To: ffmpeg-devel; +Cc: Timo Rothenpieler

This is fairly basic and makes a lot of assumptions, but it works
for the simplest cases.

For one, it only ever fetches exactly one packet per call to receive_frame.
Right now it's impossible for there to ever be more than one, but the API
allows for more, which might need to be handled in the future.

It also essentially translates the new API back to the old one, since that's
how the frame threading code operates. This feels backwards with regard to
the new API, but it was the path of least resistance for implementing this.
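
As a rough illustration of how a decoder would opt in (hypothetical sketch,
not the actual mjpegdec change from patch 2/2): a receive_frame based decoder
fetches its input through the ff_thread_decode_get_packet() wrapper added
below, so that a worker thread reads the packet that was submitted to it,
while the wrapper falls back to ff_decode_get_packet() when frame threading
is not in use:

    /* hypothetical decoder-side sketch, not part of this patch */
    static int mydec_receive_frame(AVCodecContext *avctx, AVFrame *frame)
    {
        AVPacket *pkt = avctx->internal->in_pkt;
        int ret;

        if (!pkt->data) {
            /* frame-threading aware packet fetch */
            ret = ff_thread_decode_get_packet(avctx, pkt);
            if (ret < 0)
                return ret;
        }

        /* ... decode the contents of pkt into frame as before ... */

        av_packet_unref(pkt);
        return 0;
    }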
---
 libavcodec/decode.c        |  6 +++-
 libavcodec/pthread_frame.c | 68 ++++++++++++++++++++++++++++++++++++--
 libavcodec/thread.h        | 17 ++++++++++
 3 files changed, 88 insertions(+), 3 deletions(-)

diff --git a/libavcodec/decode.c b/libavcodec/decode.c
index 6be2d3d6ed..72a8253aae 100644
--- a/libavcodec/decode.c
+++ b/libavcodec/decode.c
@@ -577,7 +577,11 @@ static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame)
     av_assert0(!frame->buf[0]);
 
     if (codec->cb_type == FF_CODEC_CB_TYPE_RECEIVE_FRAME) {
-        ret = codec->cb.receive_frame(avctx, frame);
+        if (HAVE_THREADS && avctx->active_thread_type & FF_THREAD_FRAME)
+            ret = ff_thread_receive_frame(avctx, frame);
+        else
+            ret = codec->cb.receive_frame(avctx, frame);
+
         if (ret != AVERROR(EAGAIN))
             av_packet_unref(avci->last_pkt_props);
     } else
diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c
index df82a4125f..8f704e35d3 100644
--- a/libavcodec/pthread_frame.c
+++ b/libavcodec/pthread_frame.c
@@ -92,6 +92,7 @@ typedef struct PerThreadContext {
     AVCodecContext *avctx;          ///< Context used to decode packets passed to this thread.
 
     AVPacket       *avpkt;          ///< Input packet (for decoding) or output (for encoding).
+    int             avpkt_read;     ///< Indicates if the packet has been read for this receive_frame call already.
 
     AVFrame *frame;                 ///< Output frame (for decoding) or input (for encoding).
     int     got_frame;              ///< The output of got_picture_ptr from the last avcodec_decode_video() call.
@@ -237,8 +238,14 @@ FF_ENABLE_DEPRECATION_WARNINGS
         }
 
         av_frame_unref(p->frame);
-        p->got_frame = 0;
-        p->result = codec->cb.decode(avctx, p->frame, &p->got_frame, p->avpkt);
+        if (codec->cb_type == FF_CODEC_CB_TYPE_RECEIVE_FRAME) {
+            p->avpkt_read = 0;
+            p->result = codec->cb.receive_frame(avctx, p->frame);
+            p->got_frame = !p->result;
+        } else {
+            p->got_frame = 0;
+            p->result = codec->cb.decode(avctx, p->frame, &p->got_frame, p->avpkt);
+        }
 
         if ((p->result < 0 || !p->got_frame) && p->frame->buf[0])
             ff_thread_release_buffer(avctx, p->frame);
@@ -621,6 +628,58 @@ finish:
     return err;
 }
 
+int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
+{
+    AVPacket *const avpkt = avctx->internal->in_pkt;
+    int got_picture = 0;
+    int draining = 0;
+    int ret;
+
+    av_packet_unref(avpkt);
+    ret = ff_decode_get_packet(avctx, avpkt);
+    if (ret < 0 && ret != AVERROR_EOF)
+        return ret;
+    draining = ret == AVERROR_EOF;
+
+    ret = ff_thread_decode_frame(avctx, frame, &got_picture, avpkt);
+
+    if (ret == avpkt->size) {
+        if (got_picture) {
+            return 0;
+        } else if (draining) {
+            return AVERROR_EOF;
+        }
+
+        return AVERROR(EAGAIN);
+    } else if (ret < 0) {
+        return ret;
+    }
+
+    return AVERROR_BUG;
+}
+
+int ff_thread_decode_get_packet(AVCodecContext *avctx, AVPacket *pkt)
+{
+    PerThreadContext *p;
+    int err;
+
+    if (!(avctx->active_thread_type & FF_THREAD_FRAME))
+        return ff_decode_get_packet(avctx, pkt);
+
+    p = avctx->internal->thread_ctx;
+
+    if (p->avpkt_read)
+        return AVERROR(EAGAIN);
+
+    err = av_packet_ref(pkt, p->avpkt);
+    if (err < 0)
+        return err;
+
+    p->avpkt_read = 1;
+
+    return 0;
+}
+
 void ff_thread_report_progress(ThreadFrame *f, int n, int field)
 {
     PerThreadContext *p;
@@ -775,6 +834,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
             av_freep(&ctx->slice_offset);
 
             av_buffer_unref(&ctx->internal->pool);
+            av_packet_free(&ctx->internal->in_pkt);
             av_freep(&ctx->internal);
             av_buffer_unref(&ctx->hw_frames_ctx);
         }
@@ -826,6 +886,10 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
         return AVERROR(ENOMEM);
     copy->internal->thread_ctx = p;
 
+    copy->internal->in_pkt = av_packet_alloc();
+    if (!copy->internal->in_pkt)
+        return AVERROR(ENOMEM);
+
     copy->delay = avctx->delay;
 
     if (codec->priv_data_size) {
diff --git a/libavcodec/thread.h b/libavcodec/thread.h
index d5673f25ea..76e7d44bd4 100644
--- a/libavcodec/thread.h
+++ b/libavcodec/thread.h
@@ -52,6 +52,15 @@ void ff_thread_flush(AVCodecContext *avctx);
 int ff_thread_decode_frame(AVCodecContext *avctx, AVFrame *picture,
                            int *got_picture_ptr, AVPacket *avpkt);
 
+/**
+ * Receive a new frame from a decoding thread.
+ * Returns the next available frame, or AVERROR(EAGAIN) if
+ * none is available.
+ *
+ * Parameters are the same as FFCodec.receive_frame.
+ */
+int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame);
+
 /**
  * If the codec defines update_thread_context(), call this
  * when they are ready for the next thread to start decoding
@@ -99,6 +108,14 @@ int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f, int flags);
  */
 void ff_thread_release_buffer(AVCodecContext *avctx, AVFrame *f);
 
+/**
+ * Wrapper around ff_decode_get_packet() for frame-multithreaded codecs.
+ * Call this function instead of ff_decode_get_packet().
+ *
+ * Parameters are the same as ff_decode_get_packet().
+ */
+int ff_thread_decode_get_packet(AVCodecContext *avctx, AVPacket *pkt);
+
 int ff_thread_init(AVCodecContext *s);
 int ff_slice_thread_execute_with_mainfunc(AVCodecContext *avctx,
         int (*action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr),
-- 
2.34.1

* [FFmpeg-devel] [PATCH 1/2] lavc: convert frame threading to the receive_frame() pattern
@ 2022-12-07 11:43 Timo Rothenpieler
  2022-12-07 11:43 ` [FFmpeg-devel] [PATCH 2/2] avcodec/mjpegdec: add support for frame threading Timo Rothenpieler
  0 siblings, 1 reply; 13+ messages in thread
From: Timo Rothenpieler @ 2022-12-07 11:43 UTC (permalink / raw)
  To: ffmpeg-devel; +Cc: Anton Khirnov

From: Anton Khirnov <anton@khirnov.net>

Reorganize the code such that the frame threading code does not call the
decoders directly, but instead calls back into the generic decoding
code. This avoids duplicating the logic that wraps the decoder
invocation and will be useful in the following commits.
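
The resulting call chain, in outline (a simplified sketch of the code below;
locking, draining and error paths omitted):

    avcodec_receive_frame()
      -> decode_receive_frame_internal()
           frame threading active:
             -> ff_thread_receive_frame()
                  -> ff_decode_get_packet()          (user context: packet
                                                      from the bsf)
                  -> submit_packet() to a worker thread
                  worker: ff_decode_receive_frame_internal()
                    -> decoder callback (decode()/receive_frame())
                         -> ff_decode_get_packet()   (worker context with
                                                      is_frame_mt set, routed
                                                      to ff_thread_get_packet())
           frame threading inactive:
             -> ff_decode_receive_frame_internal()   (calls the decoder directly)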
---
 libavcodec/decode.c        |  57 +++++---
 libavcodec/decode.h        |   7 +
 libavcodec/internal.h      |   7 +
 libavcodec/pthread_frame.c | 276 ++++++++++++++++++++++++-------------
 libavcodec/thread.h        |  18 +--
 5 files changed, 241 insertions(+), 124 deletions(-)

diff --git a/libavcodec/decode.c b/libavcodec/decode.c
index 6be2d3d6ed..bf3c0cbe0a 100644
--- a/libavcodec/decode.c
+++ b/libavcodec/decode.c
@@ -202,6 +202,10 @@ fail:
     return ret;
 }
 
+#if !HAVE_THREADS
+#define ff_thread_get_packet(avctx, pkt) (AVERROR_BUG)
+#endif
+
 int ff_decode_get_packet(AVCodecContext *avctx, AVPacket *pkt)
 {
     AVCodecInternal *avci = avctx->internal;
@@ -210,7 +214,14 @@ int ff_decode_get_packet(AVCodecContext *avctx, AVPacket *pkt)
     if (avci->draining)
         return AVERROR_EOF;
 
-    ret = av_bsf_receive_packet(avci->bsf, pkt);
+    /* If we are a worker thread, get the next packet from the threading
+     * context. Otherwise we are the main (user-facing) context, so we get the
+     * next packet from the input filterchain.
+     */
+    if (avctx->internal->is_frame_mt)
+        ret = ff_thread_get_packet(avctx, pkt);
+    else
+        ret = av_bsf_receive_packet(avci->bsf, pkt);
     if (ret == AVERROR_EOF)
         avci->draining = 1;
     if (ret < 0)
@@ -295,30 +306,25 @@ static inline int decode_simple_internal(AVCodecContext *avctx, AVFrame *frame,
         return AVERROR_EOF;
 
     if (!pkt->data &&
-        !(avctx->codec->capabilities & AV_CODEC_CAP_DELAY ||
-          avctx->active_thread_type & FF_THREAD_FRAME))
+        !(avctx->codec->capabilities & AV_CODEC_CAP_DELAY))
         return AVERROR_EOF;
 
     got_frame = 0;
 
-    if (HAVE_THREADS && avctx->active_thread_type & FF_THREAD_FRAME) {
-        ret = ff_thread_decode_frame(avctx, frame, &got_frame, pkt);
-    } else {
-        ret = codec->cb.decode(avctx, frame, &got_frame, pkt);
-
-        if (!(codec->caps_internal & FF_CODEC_CAP_SETS_PKT_DTS))
-            frame->pkt_dts = pkt->dts;
-        if (avctx->codec->type == AVMEDIA_TYPE_VIDEO) {
-            if(!avctx->has_b_frames)
-                frame->pkt_pos = pkt->pos;
-            //FIXME these should be under if(!avctx->has_b_frames)
-            /* get_buffer is supposed to set frame parameters */
-            if (!(avctx->codec->capabilities & AV_CODEC_CAP_DR1)) {
-                if (!frame->sample_aspect_ratio.num)  frame->sample_aspect_ratio = avctx->sample_aspect_ratio;
-                if (!frame->width)                    frame->width               = avctx->width;
-                if (!frame->height)                   frame->height              = avctx->height;
-                if (frame->format == AV_PIX_FMT_NONE) frame->format              = avctx->pix_fmt;
-            }
+    ret = codec->cb.decode(avctx, frame, &got_frame, pkt);
+
+    if (!(codec->caps_internal & FF_CODEC_CAP_SETS_PKT_DTS))
+        frame->pkt_dts = pkt->dts;
+    if (avctx->codec->type == AVMEDIA_TYPE_VIDEO) {
+        if(!avctx->has_b_frames)
+            frame->pkt_pos = pkt->pos;
+        //FIXME these should be under if(!avctx->has_b_frames)
+        /* get_buffer is supposed to set frame parameters */
+        if (!(avctx->codec->capabilities & AV_CODEC_CAP_DR1)) {
+            if (!frame->sample_aspect_ratio.num)  frame->sample_aspect_ratio = avctx->sample_aspect_ratio;
+            if (!frame->width)                    frame->width               = avctx->width;
+            if (!frame->height)                   frame->height              = avctx->height;
+            if (frame->format == AV_PIX_FMT_NONE) frame->format              = avctx->pix_fmt;
         }
     }
     emms_c();
@@ -568,7 +574,7 @@ static int decode_simple_receive_frame(AVCodecContext *avctx, AVFrame *frame)
     return 0;
 }
 
-static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame)
+int ff_decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame)
 {
     AVCodecInternal *avci = avctx->internal;
     const FFCodec *const codec = ffcodec(avctx->codec);
@@ -634,6 +640,13 @@ FF_ENABLE_DEPRECATION_WARNINGS
     return ret;
 }
 
+static int decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame)
+{
+    if (avctx->active_thread_type & FF_THREAD_FRAME)
+        return ff_thread_receive_frame(avctx, frame);
+    return ff_decode_receive_frame_internal(avctx, frame);
+}
+
 int attribute_align_arg avcodec_send_packet(AVCodecContext *avctx, const AVPacket *avpkt)
 {
     AVCodecInternal *avci = avctx->internal;
diff --git a/libavcodec/decode.h b/libavcodec/decode.h
index 5d95369b5e..34beb70f97 100644
--- a/libavcodec/decode.h
+++ b/libavcodec/decode.h
@@ -58,6 +58,13 @@ typedef struct FrameDecodeData {
  */
 int ff_decode_receive_frame(AVCodecContext *avctx, AVFrame *frame);
 
+/**
+ * Do the actual decoding and obtain a decoded frame from the decoder, if
+ * available. When frame threading is used, this is invoked by the worker
+ * threads, otherwise by the top layer directly.
+ */
+int ff_decode_receive_frame_internal(AVCodecContext *avctx, AVFrame *frame);
+
 /**
  * Called by decoders to get the next packet for decoding.
  *
diff --git a/libavcodec/internal.h b/libavcodec/internal.h
index 76a6ea6bc6..99e4bb3095 100644
--- a/libavcodec/internal.h
+++ b/libavcodec/internal.h
@@ -56,6 +56,13 @@ typedef struct AVCodecInternal {
      */
     int is_copy;
 
+    /**
+     * This field is set to 1 when frame threading is being used and the parent
+     * AVCodecContext of this AVCodecInternal is a worker-thread context (i.e.
+     * one of those actually doing the decoding), 0 otherwise.
+     */
+    int is_frame_mt;
+
     /**
      * An audio frame with less than required samples has been submitted (and
      * potentially padded with silence). Reject all subsequent frames.
diff --git a/libavcodec/pthread_frame.c b/libavcodec/pthread_frame.c
index df82a4125f..08550fc728 100644
--- a/libavcodec/pthread_frame.c
+++ b/libavcodec/pthread_frame.c
@@ -46,6 +46,7 @@
 #include "libavutil/log.h"
 #include "libavutil/mem.h"
 #include "libavutil/opt.h"
+#include "libavutil/fifo.h"
 #include "libavutil/thread.h"
 
 enum {
@@ -73,6 +74,12 @@ enum {
     INITIALIZED,    ///< Thread has been properly set up
 };
 
+typedef struct DecodedFrames {
+    AVFrame  **f;
+    size_t  nb_f;
+    size_t  nb_f_allocated;
+} DecodedFrames;
+
 /**
  * Context used by codec threads and stored in their AVCodecInternal thread_ctx.
  */
@@ -93,8 +100,10 @@ typedef struct PerThreadContext {
 
     AVPacket       *avpkt;          ///< Input packet (for decoding) or output (for encoding).
 
-    AVFrame *frame;                 ///< Output frame (for decoding) or input (for encoding).
-    int     got_frame;              ///< The output of got_picture_ptr from the last avcodec_decode_video() call.
+    /**
+     * Decoded frames from a single decode iteration.
+     */
+    DecodedFrames df;
     int     result;                 ///< The result of the last codec decode/encode() call.
 
     atomic_int state;
@@ -141,6 +150,14 @@ typedef struct FrameThreadContext {
     pthread_cond_t async_cond;
     int async_lock;
 
+    DecodedFrames df;
+    int result;
+
+    /**
+     * Packet to be submitted to the next thread for decoding.
+     */
+    AVPacket *next_pkt;
+
     int next_decoding;             ///< The next context to submit a packet to.
     int next_finished;             ///< The next context to return output from.
 
@@ -190,6 +207,51 @@ static void thread_set_name(PerThreadContext *p)
     ff_thread_setname(name);
 }
 
+// get a free frame to decode into
+static AVFrame *decoded_frames_get_free(DecodedFrames *df)
+{
+    if (df->nb_f == df->nb_f_allocated) {
+        AVFrame **tmp = av_realloc_array(df->f, df->nb_f + 1,
+                                         sizeof(*df->f));
+        if (!tmp)
+            return NULL;
+        df->f = tmp;
+
+        df->f[df->nb_f] = av_frame_alloc();
+        if (!df->f[df->nb_f])
+            return NULL;
+
+        df->nb_f_allocated++;
+    }
+
+    av_frame_unref(df->f[df->nb_f]);
+    return df->f[df->nb_f];
+}
+
+static void decoded_frames_pop(DecodedFrames *df, AVFrame *dst)
+{
+    AVFrame *tmp_frame = df->f[0];
+    av_frame_move_ref(dst, tmp_frame);
+    memmove(df->f, df->f + 1, (df->nb_f - 1) * sizeof(*df->f));
+    df->f[--df->nb_f] = tmp_frame;
+}
+
+static void decoded_frames_flush(DecodedFrames *df)
+{
+    for (int i = 0; i < df->nb_f; i++)
+        av_frame_unref(df->f[i]);
+    df->nb_f = 0;
+}
+
+static void decoded_frames_free(DecodedFrames *df)
+{
+    for (int i = 0; i < df->nb_f_allocated; i++)
+        av_frame_free(&df->f[i]);
+    av_freep(&df->f);
+    df->nb_f           = 0;
+    df->nb_f_allocated = 0;
+}
+
 /**
  * Codec worker thread.
  *
@@ -202,6 +264,7 @@ static attribute_align_arg void *frame_worker_thread(void *arg)
     PerThreadContext *p = arg;
     AVCodecContext *avctx = p->avctx;
     const FFCodec *codec = ffcodec(avctx->codec);
+    int ret;
 
     thread_set_name(p);
 
@@ -236,16 +299,31 @@ FF_ENABLE_DEPRECATION_WARNINGS
             p->hwaccel_serializing = 1;
         }
 
-        av_frame_unref(p->frame);
-        p->got_frame = 0;
-        p->result = codec->cb.decode(avctx, p->frame, &p->got_frame, p->avpkt);
+        ret = 0;
+        while (ret >= 0) {
+            AVFrame *frame;
 
-        if ((p->result < 0 || !p->got_frame) && p->frame->buf[0])
-            ff_thread_release_buffer(avctx, p->frame);
+            /* get the frame which will store the output */
+            frame = decoded_frames_get_free(&p->df);
+            if (!frame) {
+                p->result = AVERROR(ENOMEM);
+                goto alloc_fail;
+            }
+
+            /* do the actual decoding */
+            ret = ff_decode_receive_frame_internal(avctx, frame);
+            if (ret == 0)
+                p->df.nb_f++;
+            else if (ret < 0 && frame->buf[0])
+                ff_thread_release_buffer(avctx, frame);
+
+            p->result = (ret == AVERROR(EAGAIN)) ? 0 : ret;
+        }
 
         if (atomic_load(&p->state) == STATE_SETTING_UP)
             ff_thread_finish_setup(avctx);
 
+alloc_fail:
         if (p->hwaccel_serializing) {
             /* wipe hwaccel state to avoid stale pointers lying around;
              * the state was transferred to FrameThreadContext in
@@ -433,23 +511,26 @@ static void release_delayed_buffers(PerThreadContext *p)
 #endif
 
 static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
-                         AVPacket *avpkt)
+                         AVPacket *in_pkt)
 {
     FrameThreadContext *fctx = p->parent;
     PerThreadContext *prev_thread = fctx->prev_thread;
-    const AVCodec *codec = p->avctx->codec;
-    int ret;
-
-    if (!avpkt->size && !(codec->capabilities & AV_CODEC_CAP_DELAY))
-        return 0;
+    int err;
 
     pthread_mutex_lock(&p->mutex);
 
-    ret = update_context_from_user(p->avctx, user_avctx);
-    if (ret) {
+    av_packet_unref(p->avpkt);
+    av_packet_move_ref(p->avpkt, in_pkt);
+
+    p->avctx->internal->draining      = user_avctx->internal->draining;
+    p->avctx->internal->draining_done = user_avctx->internal->draining_done;
+
+    err = update_context_from_user(p->avctx, user_avctx);
+    if (err < 0) {
         pthread_mutex_unlock(&p->mutex);
-        return ret;
+        return err;
     }
+
     atomic_store_explicit(&p->debug_threads,
                           (p->avctx->debug & FF_DEBUG_THREADS) != 0,
                           memory_order_relaxed);
@@ -459,7 +540,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
 #endif
 
     if (prev_thread) {
-        int err;
         if (atomic_load(&prev_thread->state) == STATE_SETTING_UP) {
             pthread_mutex_lock(&prev_thread->progress_mutex);
             while (atomic_load(&prev_thread->state) == STATE_SETTING_UP)
@@ -480,14 +560,6 @@ static int submit_packet(PerThreadContext *p, AVCodecContext *user_avctx,
     FFSWAP(void*,            p->avctx->hwaccel_context,             fctx->stash_hwaccel_context);
     FFSWAP(void*,            p->avctx->internal->hwaccel_priv_data, fctx->stash_hwaccel_priv);
 
-    av_packet_unref(p->avpkt);
-    ret = av_packet_ref(p->avpkt, avpkt);
-    if (ret < 0) {
-        pthread_mutex_unlock(&p->mutex);
-        av_log(p->avctx, AV_LOG_ERROR, "av_packet_ref() failed in submit_packet()\n");
-        return ret;
-    }
-
     atomic_store(&p->state, STATE_SETTING_UP);
     pthread_cond_signal(&p->input_cond);
     pthread_mutex_unlock(&p->mutex);
@@ -531,57 +603,42 @@ FF_ENABLE_DEPRECATION_WARNINGS
 #endif
 
     fctx->prev_thread = p;
-    fctx->next_decoding++;
+    fctx->next_decoding = (fctx->next_decoding + 1) % p->avctx->thread_count;
 
     return 0;
 }
 
-int ff_thread_decode_frame(AVCodecContext *avctx,
-                           AVFrame *picture, int *got_picture_ptr,
-                           AVPacket *avpkt)
+int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame)
 {
     FrameThreadContext *fctx = avctx->internal->thread_ctx;
-    int finished = fctx->next_finished;
-    PerThreadContext *p;
-    int err;
+    int ret = 0;
 
     /* release the async lock, permitting blocked hwaccel threads to
      * go forward while we are in this function */
     async_unlock(fctx);
 
-    /*
-     * Submit a packet to the next decoding thread.
-     */
-
-    p = &fctx->threads[fctx->next_decoding];
-    err = submit_packet(p, avctx, avpkt);
-    if (err)
-        goto finish;
-
-    /*
-     * If we're still receiving the initial packets, don't return a frame.
-     */
+    /* submit packets to threads while there are no buffered results to return */
+    while (!fctx->df.nb_f && !fctx->result) {
+        PerThreadContext *p;
 
-    if (fctx->next_decoding > (avctx->thread_count-1-(avctx->codec_id == AV_CODEC_ID_FFV1)))
-        fctx->delaying = 0;
+        /* get a packet to be submitted to the next thread */
+        av_packet_unref(fctx->next_pkt);
+        ret = ff_decode_get_packet(avctx, fctx->next_pkt);
+        if (ret < 0 && ret != AVERROR_EOF)
+            goto finish;
 
-    if (fctx->delaying) {
-        *got_picture_ptr=0;
-        if (avpkt->size) {
-            err = avpkt->size;
+        ret = submit_packet(&fctx->threads[fctx->next_decoding], avctx,
+                            fctx->next_pkt);
+        if (ret < 0)
             goto finish;
-        }
-    }
 
-    /*
-     * Return the next available frame from the oldest thread.
-     * If we're at the end of the stream, then we have to skip threads that
-     * didn't output a frame/error, because we don't want to accidentally signal
-     * EOF (avpkt->size == 0 && *got_picture_ptr == 0 && err >= 0).
-     */
+        /* do not return any frames until all threads have something to do */
+        if (fctx->next_decoding != fctx->next_finished &&
+            !avctx->internal->draining)
+            continue;
 
-    do {
-        p = &fctx->threads[finished++];
+        p                   = &fctx->threads[fctx->next_finished];
+        fctx->next_finished = (fctx->next_finished + 1) % avctx->thread_count;
 
         if (atomic_load(&p->state) != STATE_INPUT_READY) {
             pthread_mutex_lock(&p->progress_mutex);
@@ -590,35 +647,26 @@ int ff_thread_decode_frame(AVCodecContext *avctx,
             pthread_mutex_unlock(&p->progress_mutex);
         }
 
-        av_frame_move_ref(picture, p->frame);
-        *got_picture_ptr = p->got_frame;
-        picture->pkt_dts = p->avpkt->dts;
-        err = p->result;
-
-        /*
-         * A later call with avkpt->size == 0 may loop over all threads,
-         * including this one, searching for a frame/error to return before being
-         * stopped by the "finished != fctx->next_finished" condition.
-         * Make sure we don't mistakenly return the same frame/error again.
-         */
-        p->got_frame = 0;
-        p->result = 0;
-
-        if (finished >= avctx->thread_count) finished = 0;
-    } while (!avpkt->size && !*got_picture_ptr && err >= 0 && finished != fctx->next_finished);
+        fctx->result = p->result;
+        p->result    = 0;
 
-    update_context_from_thread(avctx, p->avctx, 1);
-
-    if (fctx->next_decoding >= avctx->thread_count) fctx->next_decoding = 0;
+        if (p->df.nb_f)
+            FFSWAP(DecodedFrames, fctx->df, p->df);
+    }
 
-    fctx->next_finished = finished;
+    /* a thread may return multiple frames AND an error
+     * we first return all the frames, then the error */
+    if (fctx->df.nb_f) {
+        decoded_frames_pop(&fctx->df, frame);
+        ret = 0;
+    } else {
+        ret = fctx->result;
+        fctx->result = 0;
+    }
 
-    /* return the size of the consumed packet if no error occurred */
-    if (err >= 0)
-        err = avpkt->size;
 finish:
     async_lock(fctx);
-    return err;
+    return ret;
 }
 
 void ff_thread_report_progress(ThreadFrame *f, int n, int field)
@@ -718,7 +766,6 @@ static void park_frame_worker_threads(FrameThreadContext *fctx, int thread_count
                 pthread_cond_wait(&p->output_cond, &p->progress_mutex);
             pthread_mutex_unlock(&p->progress_mutex);
         }
-        p->got_frame = 0;
     }
 
     async_lock(fctx);
@@ -772,6 +819,17 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
                 av_freep(&ctx->priv_data);
             }
 
+            if (ctx->internal->pkt_props) {
+                while (av_fifo_can_read(ctx->internal->pkt_props)) {
+                    av_packet_unref(ctx->internal->last_pkt_props);
+                    av_fifo_read(ctx->internal->pkt_props, ctx->internal->last_pkt_props, 1);
+                }
+                av_fifo_freep2(&ctx->internal->pkt_props);
+            }
+
+            av_packet_free(&ctx->internal->last_pkt_props);
+            av_packet_free(&ctx->internal->in_pkt);
+
             av_freep(&ctx->slice_offset);
 
             av_buffer_unref(&ctx->internal->pool);
@@ -779,7 +837,7 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
             av_buffer_unref(&ctx->hw_frames_ctx);
         }
 
-        av_frame_free(&p->frame);
+        decoded_frames_free(&p->df);
 
         ff_pthread_free(p, per_thread_offsets);
         av_packet_free(&p->avpkt);
@@ -787,6 +845,9 @@ void ff_frame_thread_free(AVCodecContext *avctx, int thread_count)
         av_freep(&p->avctx);
     }
 
+    decoded_frames_free(&fctx->df);
+    av_packet_free(&fctx->next_pkt);
+
     av_freep(&fctx->threads);
     ff_pthread_free(fctx, thread_ctx_offsets);
 
@@ -845,14 +906,26 @@ static av_cold int init_thread(PerThreadContext *p, int *threads_to_free,
     if (err < 0)
         return err;
 
-    if (!(p->frame = av_frame_alloc()) ||
-        !(p->avpkt = av_packet_alloc()))
+    if (!(p->avpkt = av_packet_alloc()))
         return AVERROR(ENOMEM);
-    copy->internal->last_pkt_props = p->avpkt;
 
+    copy->internal->is_frame_mt = 1;
     if (!first)
         copy->internal->is_copy = 1;
 
+    copy->internal->in_pkt = av_packet_alloc();
+    if (!copy->internal->in_pkt)
+        return AVERROR(ENOMEM);
+
+    copy->internal->last_pkt_props = av_packet_alloc();
+    if (!copy->internal->last_pkt_props)
+        return AVERROR(ENOMEM);
+
+    copy->internal->pkt_props = av_fifo_alloc2(1, sizeof(*copy->internal->last_pkt_props),
+                                               AV_FIFO_FLAG_AUTO_GROW);
+    if (!copy->internal->pkt_props)
+        return AVERROR(ENOMEM);
+
     if (codec->init) {
         err = codec->init(copy);
         if (err < 0) {
@@ -908,6 +981,10 @@ int ff_frame_thread_init(AVCodecContext *avctx)
         return err;
     }
 
+    fctx->next_pkt = av_packet_alloc();
+    if (!fctx->next_pkt)
+        return AVERROR(ENOMEM);
+
     fctx->async_lock = 1;
     fctx->delaying = 1;
 
@@ -952,12 +1029,13 @@ void ff_thread_flush(AVCodecContext *avctx)
     fctx->next_decoding = fctx->next_finished = 0;
     fctx->delaying = 1;
     fctx->prev_thread = NULL;
+
+    decoded_frames_flush(&fctx->df);
+
     for (i = 0; i < avctx->thread_count; i++) {
         PerThreadContext *p = &fctx->threads[i];
-        // Make sure decode flush calls with size=0 won't return old frames
-        p->got_frame = 0;
-        av_frame_unref(p->frame);
-        p->result = 0;
+
+        decoded_frames_flush(&p->df);
 
 #if FF_API_THREAD_SAFE_CALLBACKS
         release_delayed_buffers(p);
@@ -1181,3 +1259,15 @@ void ff_thread_release_ext_buffer(AVCodecContext *avctx, ThreadFrame *f)
     f->owner[0] = f->owner[1] = NULL;
     ff_thread_release_buffer(avctx, f->f);
 }
+
+int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt)
+{
+    PerThreadContext *p = avctx->internal->thread_ctx;
+
+    if (p->avpkt->buf) {
+        av_packet_move_ref(pkt, p->avpkt);
+        return 0;
+    }
+
+    return avctx->internal->draining ? AVERROR_EOF : AVERROR(EAGAIN);
+}
diff --git a/libavcodec/thread.h b/libavcodec/thread.h
index d5673f25ea..7ae69990fb 100644
--- a/libavcodec/thread.h
+++ b/libavcodec/thread.h
@@ -40,17 +40,12 @@
 void ff_thread_flush(AVCodecContext *avctx);
 
 /**
- * Submit a new frame to a decoding thread.
- * Returns the next available frame in picture. *got_picture_ptr
- * will be 0 if none is available.
- * The return value on success is the size of the consumed packet for
- * compatibility with FFCodec.decode. This means the decoder
- * has to consume the full packet.
+ * Submit available packets for decoding to worker threads, return a
+ * decoded frame if available. Returns AVERROR(EAGAIN) if none is available.
  *
- * Parameters are the same as FFCodec.decode.
+ * Parameters are the same as FFCodec.receive_frame.
  */
-int ff_thread_decode_frame(AVCodecContext *avctx, AVFrame *picture,
-                           int *got_picture_ptr, AVPacket *avpkt);
+int ff_thread_receive_frame(AVCodecContext *avctx, AVFrame *frame);
 
 /**
  * If the codec defines update_thread_context(), call this
@@ -99,6 +94,11 @@ int ff_thread_get_buffer(AVCodecContext *avctx, AVFrame *f, int flags);
  */
 void ff_thread_release_buffer(AVCodecContext *avctx, AVFrame *f);
 
+/**
+ * Get a packet for decoding. This gets invoked by the worker threads.
+ */
+int ff_thread_get_packet(AVCodecContext *avctx, AVPacket *pkt);
+
 int ff_thread_init(AVCodecContext *s);
 int ff_slice_thread_execute_with_mainfunc(AVCodecContext *avctx,
         int (*action_func2)(AVCodecContext *c, void *arg, int jobnr, int threadnr),
-- 
2.34.1


Thread overview: 13+ messages
2022-12-05 13:39 [FFmpeg-devel] [PATCH 1/2] avcodec/thread: add support for frame threading receive_frame based decoders Timo Rothenpieler
2022-12-05 13:39 ` [FFmpeg-devel] [PATCH 2/2] avcodec/mjpegdec: add support for frame threading Timo Rothenpieler
2022-12-05 14:15   ` Andreas Rheinhardt
2022-12-05 14:28     ` Paul B Mahol
2022-12-05 23:02     ` Timo Rothenpieler
2023-09-07 17:17       ` Paul B Mahol
2023-09-07 17:28         ` Paul B Mahol
2022-12-06 14:37 ` [FFmpeg-devel] [PATCH 1/2] avcodec/thread: add support for frame threading receive_frame based decoders Anton Khirnov
2022-12-06 14:39   ` Timo Rothenpieler
2022-12-06 14:43     ` Anton Khirnov
2022-12-06 15:08       ` Timo Rothenpieler
2022-12-06 16:00         ` Anton Khirnov
2022-12-07 11:43 [FFmpeg-devel] [PATCH 1/2] lavc: convert frame threading to the receive_frame() pattern Timo Rothenpieler
2022-12-07 11:43 ` [FFmpeg-devel] [PATCH 2/2] avcodec/mjpegdec: add support for frame threading Timo Rothenpieler
