From: Anton Khirnov <anton@khirnov.net>
To: ffmpeg-devel@ffmpeg.org
Subject: [FFmpeg-devel] [PATCH 15/24] fftools/ffmpeg_demux: switch from AVThreadMessageQueue to ThreadQueue
Date: Sat, 4 Nov 2023 08:56:24 +0100
Message-ID: <20231104092125.10213-16-anton@khirnov.net> (raw)
In-Reply-To: <20231104092125.10213-1-anton@khirnov.net>
* the code is made shorter and simpler
* avoids constantly allocating and freeing AVPackets, thanks to
ThreadQueue integration with ObjPool
* is consistent with decoding/filtering/muxing
* reduces the diff in the future switch to thread-aware scheduling
This makes ifile_get_packet() always block. Any potential issues caused
by this will be resolved by the switch to thread-aware scheduling in
future commits.
---
fftools/ffmpeg.c | 32 ++++++------
fftools/ffmpeg.h | 3 +-
fftools/ffmpeg_demux.c | 108 ++++++++++++++--------------------------
fftools/ffmpeg_filter.c | 5 +-
4 files changed, 58 insertions(+), 90 deletions(-)
diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
index cdb16ef90b..038649d9b5 100644
--- a/fftools/ffmpeg.c
+++ b/fftools/ffmpeg.c
@@ -1030,9 +1030,6 @@ static int check_keyboard_interaction(int64_t cur_time)
static void reset_eagain(void)
{
- int i;
- for (i = 0; i < nb_input_files; i++)
- input_files[i]->eagain = 0;
for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost))
ost->unavailable = 0;
}
@@ -1056,19 +1053,14 @@ static void decode_flush(InputFile *ifile)
* this function should be called again
* - AVERROR_EOF -- this function should not be called again
*/
-static int process_input(int file_index)
+static int process_input(int file_index, AVPacket *pkt)
{
InputFile *ifile = input_files[file_index];
InputStream *ist;
- AVPacket *pkt;
int ret, i;
- ret = ifile_get_packet(ifile, &pkt);
+ ret = ifile_get_packet(ifile, pkt);
- if (ret == AVERROR(EAGAIN)) {
- ifile->eagain = 1;
- return ret;
- }
if (ret == 1) {
/* the input file is looped: flush the decoders */
decode_flush(ifile);
@@ -1115,7 +1107,7 @@ static int process_input(int file_index)
ret = process_input_packet(ist, pkt, 0);
- av_packet_free(&pkt);
+ av_packet_unref(pkt);
return ret < 0 ? ret : 0;
}
@@ -1125,7 +1117,7 @@ static int process_input(int file_index)
*
* @return 0 for success, <0 for error
*/
-static int transcode_step(OutputStream *ost)
+static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
{
InputStream *ist = NULL;
int ret;
@@ -1140,10 +1132,8 @@ static int transcode_step(OutputStream *ost)
av_assert0(ist);
}
- ret = process_input(ist->file_index);
+ ret = process_input(ist->file_index, demux_pkt);
if (ret == AVERROR(EAGAIN)) {
- if (input_files[ist->file_index]->eagain)
- ost->unavailable = 1;
return 0;
}
@@ -1169,12 +1159,19 @@ static int transcode(int *err_rate_exceeded)
int ret = 0, i;
InputStream *ist;
int64_t timer_start;
+ AVPacket *demux_pkt = NULL;
print_stream_maps();
*err_rate_exceeded = 0;
atomic_store(&transcode_init_done, 1);
+ demux_pkt = av_packet_alloc();
+ if (!demux_pkt) {
+ ret = AVERROR(ENOMEM);
+ goto fail;
+ }
+
if (stdin_interaction) {
av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n");
}
@@ -1202,7 +1199,7 @@ static int transcode(int *err_rate_exceeded)
break;
}
- ret = transcode_step(ost);
+ ret = transcode_step(ost, demux_pkt);
if (ret < 0 && ret != AVERROR_EOF) {
av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n", av_err2str(ret));
break;
@@ -1243,6 +1240,9 @@ static int transcode(int *err_rate_exceeded)
/* dump report by using the first video and audio streams */
print_report(1, timer_start, av_gettime_relative());
+fail:
+ av_packet_free(&demux_pkt);
+
return ret;
}
diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
index 9852df8320..8de91ab85a 100644
--- a/fftools/ffmpeg.h
+++ b/fftools/ffmpeg.h
@@ -407,7 +407,6 @@ typedef struct InputFile {
AVFormatContext *ctx;
int eof_reached; /* true if eof reached */
- int eagain; /* true if last read attempt returned EAGAIN */
int64_t input_ts_offset;
int input_sync_ref;
/**
@@ -857,7 +856,7 @@ void ifile_close(InputFile **f);
* caller should flush decoders and read from this demuxer again
* - a negative error code on failure
*/
-int ifile_get_packet(InputFile *f, AVPacket **pkt);
+int ifile_get_packet(InputFile *f, AVPacket *pkt);
int ist_output_add(InputStream *ist, OutputStream *ost);
int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple);
diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
index 791952f120..65a5e08ca5 100644
--- a/fftools/ffmpeg_demux.c
+++ b/fftools/ffmpeg_demux.c
@@ -21,6 +21,8 @@
#include "ffmpeg.h"
#include "ffmpeg_utils.h"
+#include "objpool.h"
+#include "thread_queue.h"
#include "libavutil/avassert.h"
#include "libavutil/avstring.h"
@@ -33,7 +35,6 @@
#include "libavutil/time.h"
#include "libavutil/timestamp.h"
#include "libavutil/thread.h"
-#include "libavutil/threadmessage.h"
#include "libavcodec/packet.h"
@@ -107,19 +108,13 @@ typedef struct Demuxer {
double readrate_initial_burst;
- AVThreadMessageQueue *in_thread_queue;
+ ThreadQueue *thread_queue;
int thread_queue_size;
pthread_t thread;
- int non_blocking;
int read_started;
} Demuxer;
-typedef struct DemuxMsg {
- AVPacket *pkt;
- int looping;
-} DemuxMsg;
-
static DemuxStream *ds_from_ist(InputStream *ist)
{
return (DemuxStream*)ist;
@@ -440,26 +435,16 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt)
return 0;
}
-// process an input packet into a message to send to the consumer thread
-// src is always cleared by this function
-static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src)
+static int input_packet_process(Demuxer *d, AVPacket *pkt)
{
InputFile *f = &d->f;
- InputStream *ist = f->streams[src->stream_index];
+ InputStream *ist = f->streams[pkt->stream_index];
DemuxStream *ds = ds_from_ist(ist);
- AVPacket *pkt;
int ret = 0;
- pkt = av_packet_alloc();
- if (!pkt) {
- av_packet_unref(src);
- return AVERROR(ENOMEM);
- }
- av_packet_move_ref(pkt, src);
-
ret = ts_fixup(d, pkt);
if (ret < 0)
- goto fail;
+ return ret;
ds->data_size += pkt->size;
ds->nb_packets++;
@@ -475,13 +460,7 @@ static int input_packet_process(Demuxer *d, DemuxMsg *msg, AVPacket *src)
av_ts2timestr(input_files[ist->file_index]->ts_offset, &AV_TIME_BASE_Q));
}
- msg->pkt = pkt;
- pkt = NULL;
-
-fail:
- av_packet_free(&pkt);
-
- return ret;
+ return 0;
}
static void readrate_sleep(Demuxer *d)
@@ -531,7 +510,6 @@ static void *input_thread(void *arg)
Demuxer *d = arg;
InputFile *f = &d->f;
AVPacket *pkt;
- unsigned flags = d->non_blocking ? AV_THREAD_MESSAGE_NONBLOCK : 0;
int ret = 0;
pkt = av_packet_alloc();
@@ -547,8 +525,6 @@ static void *input_thread(void *arg)
d->wallclock_start = av_gettime_relative();
while (1) {
- DemuxMsg msg = { NULL };
-
ret = av_read_frame(f->ctx, pkt);
if (ret == AVERROR(EAGAIN)) {
@@ -558,8 +534,8 @@ static void *input_thread(void *arg)
if (ret < 0) {
if (d->loop) {
/* signal looping to the consumer thread */
- msg.looping = 1;
- ret = av_thread_message_queue_send(d->in_thread_queue, &msg, 0);
+ pkt->stream_index = -1;
+ ret = tq_send(d->thread_queue, 0, pkt);
if (ret >= 0)
ret = seek_to_start(d);
if (ret >= 0)
@@ -602,35 +578,26 @@ static void *input_thread(void *arg)
}
}
- ret = input_packet_process(d, &msg, pkt);
+ ret = input_packet_process(d, pkt);
if (ret < 0)
break;
if (f->readrate)
readrate_sleep(d);
- ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags);
- if (flags && ret == AVERROR(EAGAIN)) {
- flags = 0;
- ret = av_thread_message_queue_send(d->in_thread_queue, &msg, flags);
- av_log(f, AV_LOG_WARNING,
- "Thread message queue blocking; consider raising the "
- "thread_queue_size option (current value: %d)\n",
- d->thread_queue_size);
- }
+ ret = tq_send(d->thread_queue, 0, pkt);
if (ret < 0) {
if (ret != AVERROR_EOF)
av_log(f, AV_LOG_ERROR,
"Unable to send packet to main thread: %s\n",
av_err2str(ret));
- av_packet_free(&msg.pkt);
break;
}
}
finish:
av_assert0(ret < 0);
- av_thread_message_queue_set_err_recv(d->in_thread_queue, ret);
+ tq_send_finish(d->thread_queue, 0);
av_packet_free(&pkt);
@@ -642,16 +609,16 @@ finish:
static void thread_stop(Demuxer *d)
{
InputFile *f = &d->f;
- DemuxMsg msg;
- if (!d->in_thread_queue)
+ if (!d->thread_queue)
return;
- av_thread_message_queue_set_err_send(d->in_thread_queue, AVERROR_EOF);
- while (av_thread_message_queue_recv(d->in_thread_queue, &msg, 0) >= 0)
- av_packet_free(&msg.pkt);
+
+ tq_receive_finish(d->thread_queue, 0);
pthread_join(d->thread, NULL);
- av_thread_message_queue_free(&d->in_thread_queue);
+
+ tq_free(&d->thread_queue);
+
av_thread_message_queue_free(&f->audio_ts_queue);
}
@@ -659,18 +626,20 @@ static int thread_start(Demuxer *d)
{
int ret;
InputFile *f = &d->f;
+ ObjPool *op;
if (d->thread_queue_size <= 0)
d->thread_queue_size = (nb_input_files > 1 ? 8 : 1);
- if (nb_input_files > 1 &&
- (f->ctx->pb ? !f->ctx->pb->seekable :
- strcmp(f->ctx->iformat->name, "lavfi")))
- d->non_blocking = 1;
- ret = av_thread_message_queue_alloc(&d->in_thread_queue,
- d->thread_queue_size, sizeof(DemuxMsg));
- if (ret < 0)
- return ret;
+ op = objpool_alloc_packets();
+ if (!op)
+ return AVERROR(ENOMEM);
+
+ d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move);
+ if (!d->thread_queue) {
+ objpool_free(&op);
+ return AVERROR(ENOMEM);
+ }
if (d->loop) {
int nb_audio_dec = 0;
@@ -700,31 +669,30 @@ static int thread_start(Demuxer *d)
return 0;
fail:
- av_thread_message_queue_free(&d->in_thread_queue);
+ tq_free(&d->thread_queue);
return ret;
}
-int ifile_get_packet(InputFile *f, AVPacket **pkt)
+int ifile_get_packet(InputFile *f, AVPacket *pkt)
{
Demuxer *d = demuxer_from_ifile(f);
- DemuxMsg msg;
- int ret;
+ int ret, dummy;
- if (!d->in_thread_queue) {
+ if (!d->thread_queue) {
ret = thread_start(d);
if (ret < 0)
return ret;
}
- ret = av_thread_message_queue_recv(d->in_thread_queue, &msg,
- d->non_blocking ?
- AV_THREAD_MESSAGE_NONBLOCK : 0);
+ ret = tq_receive(d->thread_queue, &dummy, pkt);
if (ret < 0)
return ret;
- if (msg.looping)
- return 1;
- *pkt = msg.pkt;
+ if (pkt->stream_index == -1) {
+ av_assert0(!pkt->data && !pkt->side_data_elems);
+ return 1;
+ }
+
return 0;
}
diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c
index e288ea4b80..a98a02e643 100644
--- a/fftools/ffmpeg_filter.c
+++ b/fftools/ffmpeg_filter.c
@@ -2002,9 +2002,8 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt)
for (int i = 0; i < fg->nb_inputs; i++) {
InputFilter *ifilter = fg->inputs[i];
InputFilterPriv *ifp = ifp_from_ifilter(ifilter);
- InputStream *ist = ifp->ist;
- if (input_files[ist->file_index]->eagain || fgt->eof_in[i])
+ if (fgt->eof_in[i])
continue;
nb_requests = av_buffersrc_get_nb_failed_requests(ifp->filter);
@@ -2014,6 +2013,8 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt)
}
}
+ av_assert0(best_input >= 0);
+
return best_input;
}
--
2.42.0
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
next prev parent reply other threads:[~2023-11-04 9:23 UTC|newest]
Thread overview: 51+ messages / expand[flat|nested] mbox.gz Atom feed top
2023-11-04 7:56 [FFmpeg-devel] [PATCH] ffmpeg CLI multithreading Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 01/24] lavf/mux: do not apply max_interleave_delta to subtitles Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 02/24] lavfi/af_amix: make sure the output does not depend on input ordering Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 03/24] lavc/8bps: fix exporting palette after 63767b79a570404628b2521b83104108b7b6884c Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 04/24] fftools/ffmpeg: move a few inline function into a new header Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 05/24] fftools/thread_queue: do not return elements for receive-finished streams Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 06/24] fftools/thread_queue: count receive-finished streams as finished Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 07/24] fftools/ffmpeg: rework keeping track of file duration for -stream_loop Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 08/24] fftools/ffmpeg_filter: remove an unnecessary sub2video_push_ref() call Anton Khirnov
2023-11-04 14:19 ` Nicolas George
2023-11-09 10:42 ` Anton Khirnov
2023-11-09 10:47 ` Nicolas George
2023-11-09 21:29 ` Anton Khirnov
2023-11-17 9:44 ` Nicolas George
2023-11-17 11:52 ` Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 09/24] fftools/ffmpeg_filter: track input/output index in {Input, Output}FilterPriv Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 10/24] fftools/ffmpeg_filter: move filtering to a separate thread Anton Khirnov
2023-11-04 19:54 ` Michael Niedermayer
2023-11-09 11:45 ` [FFmpeg-devel] [PATCH v2 " Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 11/24] fftools/ffmpeg_filter: buffer sub2video heartbeat frames like other frames Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 12/24] fftools/ffmpeg_filter: reindent Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 13/24] fftools/ffmpeg_mux: add muxing thread private data Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 14/24] fftools/ffmpeg_mux: move bitstream filtering to the muxer thread Anton Khirnov
2023-11-04 13:39 ` James Almer
2023-11-09 11:41 ` Anton Khirnov
2023-11-09 11:47 ` James Almer
2023-11-09 12:00 ` Anton Khirnov
2023-11-04 7:56 ` Anton Khirnov [this message]
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 16/24] fftools/ffmpeg: disable -fix_sub_duration_heartbeat Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 17/24] fftools/ffmpeg_enc: move encoding to a separate thread Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 18/24] fftools/ffmpeg: add thread-aware transcode scheduling infrastructure Anton Khirnov
2023-11-04 13:53 ` James Almer
2023-11-09 11:06 ` Anton Khirnov
2023-11-04 18:44 ` Michael Niedermayer
2023-11-09 11:36 ` [FFmpeg-devel] [PATCH v2 " Anton Khirnov
2023-11-11 15:21 ` [FFmpeg-devel] [PATCH v3 " Anton Khirnov
2023-11-09 11:39 ` [FFmpeg-devel] [PATCH " Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 19/24] fftools/ffmpeg_demux: convert to the scheduler Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 20/24] fftools/ffmpeg_dec: " Anton Khirnov
2023-11-04 18:30 ` Michael Niedermayer
2023-11-11 15:24 ` Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 21/24] fftools/ffmpeg_filter: " Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 22/24] fftools/ffmpeg_enc: " Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 23/24] fftools/ffmpeg_mux: " Anton Khirnov
2023-11-04 7:56 ` [FFmpeg-devel] [PATCH 24/24] ffmpeg: switch to scheduler Anton Khirnov
2023-11-09 11:46 ` [FFmpeg-devel] [PATCH] ffmpeg CLI multithreading Anton Khirnov
2023-11-09 12:46 ` Paul B Mahol
2023-11-11 20:10 ` Michael Niedermayer
2023-11-17 10:46 ` Anton Khirnov
2023-11-13 12:34 ` Jan Ekström
2023-11-13 12:38 ` James Almer
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=20231104092125.10213-16-anton@khirnov.net \
--to=anton@khirnov.net \
--cc=ffmpeg-devel@ffmpeg.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
This inbox may be cloned and mirrored by anyone:
git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git
# If you have public-inbox 1.1+ installed, you may
# initialize and index your mirror using the following commands:
public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
ffmpegdev@gitmailbox.com
public-inbox-index ffmpegdev
Example config snippet for mirrors.
AGPL code for this site: git clone https://public-inbox.org/public-inbox.git