From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100]) by master.gitmailbox.com (Postfix) with ESMTP id 71CBD47676 for ; Tue, 19 Sep 2023 19:24:42 +0000 (UTC) Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 5A08468C9BC; Tue, 19 Sep 2023 22:21:48 +0300 (EEST) Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 87FF868C9A0 for ; Tue, 19 Sep 2023 22:21:45 +0300 (EEST) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id 353355101 for ; Tue, 19 Sep 2023 21:21:45 +0200 (CEST) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id mPEI7qXTserB for ; Tue, 19 Sep 2023 21:21:40 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id 2DF5C5216 for ; Tue, 19 Sep 2023 21:20:43 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id 21D1D3A0D2D for ; Tue, 19 Sep 2023 21:20:43 +0200 (CEST) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Tue, 19 Sep 2023 21:10:50 +0200 Message-Id: <20230919191044.18873-24-anton@khirnov.net> X-Mailer: git-send-email 2.40.1 In-Reply-To: <20230919191044.18873-1-anton@khirnov.net> References: <20230919191044.18873-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 23/27] WIP fftools/ffmpeg_dec: convert to the scheduler X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Archived-At: List-Archive: List-Post: --- fftools/ffmpeg.c | 9 +-- fftools/ffmpeg.h | 11 --- fftools/ffmpeg_dec.c | 189 +++++++++---------------------------------- 3 files changed, 42 insertions(+), 167 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 00e57c4382..a09a9e1200 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -810,11 +810,6 @@ static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eo int ret = 0; int eof_reached = 0; - if (ist->decoding_needed) { - ret = dec_packet(ist, pkt, no_eof); - if (ret < 0 && ret != AVERROR_EOF) - return ret; - } if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed)) eof_reached = 1; @@ -1036,6 +1031,7 @@ static void reset_eagain(void) ost->unavailable = 0; } +#if 0 static void decode_flush(InputFile *ifile) { for (int i = 0; i < ifile->nb_streams; i++) { @@ -1047,6 +1043,7 @@ static void decode_flush(InputFile *ifile) dec_packet(ist, NULL, 1); } } +#endif /* * Return @@ -1063,11 +1060,13 @@ static int process_input(int file_index, AVPacket *pkt) ret = 0; +#if 0 if (ret == 1) { /* the input file is looped: flush the decoders */ decode_flush(ifile); return AVERROR(EAGAIN); } +#endif if (ret < 0) { if (ret != AVERROR_EOF) { av_log(ifile, AV_LOG_ERROR, diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 4646c05bea..841f8d0d68 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -815,17 +815,6 @@ int hwaccel_retrieve_data(AVCodecContext *avctx, AVFrame *input); int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx); void dec_free(Decoder **pdec); -/** - * Submit a packet for decoding - * - * When pkt==NULL and no_eof=0, there will be no more input. Flush decoders and - * mark all downstreams as finished. - * - * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek). Flush - * decoders and await further input. - */ -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof); - int enc_alloc(Encoder **penc, const AVCodec *codec, Scheduler *sch, unsigned sch_idx); void enc_free(Encoder **penc); diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c index dc8d0374a3..400fa666b9 100644 --- a/fftools/ffmpeg_dec.c +++ b/fftools/ffmpeg_dec.c @@ -53,24 +53,6 @@ struct Decoder { Scheduler *sch; unsigned sch_idx; - - pthread_t thread; - /** - * Queue for sending coded packets from the main thread to - * the decoder thread. - * - * An empty packet is sent to flush the decoder without terminating - * decoding. - */ - ThreadQueue *queue_in; - /** - * Queue for sending decoded frames from the decoder thread - * to the main thread. - * - * An empty frame is sent to signal that a single packet has been fully - * processed. - */ - ThreadQueue *queue_out; }; // data that is local to the decoder thread and not visible outside of it @@ -79,24 +61,6 @@ typedef struct DecThreadContext { AVPacket *pkt; } DecThreadContext; -static int dec_thread_stop(Decoder *d) -{ - void *ret; - - if (!d->queue_in) - return 0; - - tq_send_finish(d->queue_in, 0); - tq_receive_finish(d->queue_out, 0); - - pthread_join(d->thread, &ret); - - tq_free(&d->queue_in); - tq_free(&d->queue_out); - - return (intptr_t)ret; -} - void dec_free(Decoder **pdec) { Decoder *dec = *pdec; @@ -104,8 +68,6 @@ void dec_free(Decoder **pdec) if (!dec) return; - dec_thread_stop(dec); - av_frame_free(&dec->frame); av_packet_free(&dec->pkt); @@ -147,25 +109,6 @@ fail: return AVERROR(ENOMEM); } -static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame) -{ - int i, ret = 0; - - for (i = 0; i < ist->nb_filters; i++) { - ret = ifilter_send_frame(ist->filters[i], decoded_frame, - i < ist->nb_filters - 1 || - ist->dec->type == AVMEDIA_TYPE_SUBTITLE); - if (ret == AVERROR_EOF) - ret = 0; /* ignore */ - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, - "Failed to inject frame into filter network: %s\n", av_err2str(ret)); - break; - } - } - return ret; -} - static AVRational audio_samplerate_update(void *logctx, Decoder *d, const AVFrame *frame) { @@ -420,7 +363,8 @@ static int process_subtitle(InputStream *ist, AVFrame *frame) if (!subtitle) return 0; - ret = send_frame_to_filters(ist, frame); + // XXX + //ret = send_frame_to_filters(ist, frame); if (ret < 0) return ret; @@ -495,7 +439,7 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, ist->frames_decoded++; - // XXX the queue for transferring data back to the main thread runs + // XXX the queue for transferring data to consumers runs // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that // inside the frame // eventually, subtitles should be switched to use AVFrames natively @@ -508,26 +452,11 @@ static int transcode_subtitles(InputStream *ist, const AVPacket *pkt, frame->width = ist->dec_ctx->width; frame->height = ist->dec_ctx->height; - ret = tq_send(d->queue_out, 0, frame); + ret = sch_dec_send(d->sch, d->sch_idx, frame); if (ret < 0) av_frame_unref(frame); - return ret; -} - -static int send_filter_eof(InputStream *ist) -{ - Decoder *d = ist->decoder; - int i, ret; - - for (i = 0; i < ist->nb_filters; i++) { - int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE : - d->last_frame_pts + d->last_frame_duration_est; - ret = ifilter_send_eof(ist->filters[i], end_pts, d->last_frame_tb); - if (ret < 0) - return ret; - } - return 0; + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; } static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame) @@ -629,9 +558,9 @@ static int packet_decode(InputStream *ist, const AVPacket *pkt, AVFrame *frame) ist->frames_decoded++; - ret = tq_send(d->queue_out, 0, frame); + ret = sch_dec_send(d->sch, d->sch_idx, frame); if (ret < 0) - return ret; + return ret == AVERROR_EOF ? AVERROR_EXIT : ret; } } @@ -685,9 +614,9 @@ void *decoder_thread(void *arg) dec_thread_set_name(ist); while (!input_status) { - int dummy, flush_buffers; + int flush_buffers; - input_status = tq_receive(d->queue_in, &dummy, dt.pkt); + input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt); flush_buffers = input_status >= 0 && !dt.pkt->buf; if (!dt.pkt->buf) av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s packet\n", @@ -698,6 +627,14 @@ void *decoder_thread(void *arg) av_packet_unref(dt.pkt); av_frame_unref(dt.frame); + // AVERROR_EOF - EOF from the decoder + // AVERROR_EXIT - EOF from the scheduler + // we treat them differently when flushing + if (ret == AVERROR_EXIT) { + ret = AVERROR_EOF; + flush_buffers = 0; + } + if (ret == AVERROR_EOF) { av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n", flush_buffers ? "resetting" : "finishing"); @@ -725,23 +662,32 @@ void *decoder_thread(void *arg) av_err2str(ret)); break; } - - // signal to the consumer thread that the entire packet was processed - ret = tq_send(d->queue_out, 0, dt.frame); - if (ret < 0) { - if (ret != AVERROR_EOF) - av_log(ist, AV_LOG_ERROR, "Error communicating with the main thread\n"); - break; - } } // EOF is normal thread termination if (ret == AVERROR_EOF) ret = 0; + // on success send EOF timestamp to our downstreams + if (ret >= 0) { + av_frame_unref(dt.frame); + + dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ? AV_NOPTS_VALUE : + d->last_frame_pts + d->last_frame_duration_est; + dt.frame->time_base = d->last_frame_tb; + + // XXX check EOF/EXIT handling + ret = sch_dec_send(d->sch, d->sch_idx, dt.frame); + if (ret < 0 && ret != AVERROR_EOF) { + av_log(NULL, AV_LOG_FATAL, + "Error signalling EOF timestamp: %s\n", av_err2str(ret)); + goto finish; + } + ret = 0; + } + finish: - tq_receive_finish(d->queue_in, 0); - tq_send_finish (d->queue_out, 0); + sch_dec_send(d->sch, d->sch_idx, NULL); #if 0 // make sure the demuxer does not get stuck waiting for audio durations @@ -757,15 +703,12 @@ finish: return (void*)(intptr_t)ret; } +#if 0 int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof) { Decoder *d = ist->decoder; int ret = 0, thread_ret; - // thread already joined - if (!d->queue_in) - return AVERROR_EOF; - // send the packet/flush request/EOF to the decoder thread if (pkt || no_eof) { av_packet_unref(d->pkt); @@ -816,59 +759,10 @@ finish: if (ret < 0 && ret != AVERROR_EOF) return ret; - // signal EOF to our downstreams - ret = send_filter_eof(ist); - if (ret < 0) { - av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n"); - return ret; - } return AVERROR_EOF; } - -static int dec_thread_start(InputStream *ist) -{ - Decoder *d = ist->decoder; - ObjPool *op; - int ret = 0; - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - d->queue_in = tq_alloc(1, 1, op, pkt_move); - if (!d->queue_in) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - op = objpool_alloc_frames(); - if (!op) - goto fail; - - d->queue_out = tq_alloc(1, 4, op, frame_move); - if (!d->queue_out) { - objpool_free(&op); - goto fail; - } - - ret = pthread_create(&d->thread, NULL, decoder_thread, ist); - if (ret) { - ret = AVERROR(ret); - av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n", - av_err2str(ret)); - goto fail; - } - - return 0; -fail: - if (ret >= 0) - ret = AVERROR(ENOMEM); - - tq_free(&d->queue_in); - tq_free(&d->queue_out); - return ret; -} +#endif static enum AVPixelFormat get_format(AVCodecContext *s, const enum AVPixelFormat *pix_fmts) { @@ -1121,12 +1015,5 @@ int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx) if (ret < 0) return ret; - ret = dec_thread_start(ist); - if (ret < 0) { - av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n", - av_err2str(ret)); - return ret; - } - return 0; } -- 2.40.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".