From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100]) by master.gitmailbox.com (Postfix) with ESMTP id CB12647683 for ; Tue, 19 Sep 2023 19:24:59 +0000 (UTC) Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 8C28B68C9C2; Tue, 19 Sep 2023 22:21:52 +0300 (EEST) Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id C3C0168C92B for ; Tue, 19 Sep 2023 22:21:50 +0300 (EEST) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id 6D0125101 for ; Tue, 19 Sep 2023 21:21:50 +0200 (CEST) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id IVtA_HGyEKvT for ; Tue, 19 Sep 2023 21:21:45 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id 39CC8521E for ; Tue, 19 Sep 2023 21:20:43 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id 2D8CF3A150F for ; Tue, 19 Sep 2023 21:20:43 +0200 (CEST) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Tue, 19 Sep 2023 21:10:51 +0200 Message-Id: <20230919191044.18873-25-anton@khirnov.net> X-Mailer: git-send-email 2.40.1 In-Reply-To: <20230919191044.18873-1-anton@khirnov.net> References: <20230919191044.18873-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 24/27] WIP fftools/ffmpeg_filter: convert to the scheduler X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Archived-At: List-Archive: List-Post: --- fftools/ffmpeg.h | 22 -- fftools/ffmpeg_filter.c | 533 ++++++++++++---------------------------- 2 files changed, 152 insertions(+), 403 deletions(-) diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 841f8d0d68..eb4e8e27a9 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -312,9 +312,6 @@ typedef struct OutputFilter { enum AVMediaType type; - /* pts of the last frame received from this filter, in AV_TIME_BASE_Q */ - int64_t last_pts; - uint64_t nb_frames_dup; uint64_t nb_frames_drop; } OutputFilter; @@ -745,8 +742,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle *subtitle, int copy); */ FrameData *frame_data(AVFrame *frame); -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference); -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb); void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb); /** @@ -769,26 +764,9 @@ int fg_create(FilterGraph **pfg, char *graph_desc, Scheduler *sch); void fg_free(FilterGraph **pfg); -/** - * Perform a step of transcoding for the specified filter graph. - * - * @param[in] graph filter graph to consider - * @param[out] best_ist input stream where a frame would allow to continue - * @return 0 for success, <0 for error - */ -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist); - void fg_send_command(FilterGraph *fg, double time, const char *target, const char *command, const char *arg, int all_filters); -/** - * Get and encode new output from specified filtergraph, without causing - * activity. - * - * @return 0 for success, <0 for severe errors - */ -int reap_filters(FilterGraph *fg, int flush); - int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch); void enc_stats_write(OutputStream *ost, EncStats *es, diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c index e8e78f5454..c588dde452 100644 --- a/fftools/ffmpeg_filter.c +++ b/fftools/ffmpeg_filter.c @@ -21,7 +21,6 @@ #include #include "ffmpeg.h" -#include "thread_queue.h" #include "libavfilter/avfilter.h" #include "libavfilter/buffersink.h" @@ -46,7 +45,6 @@ enum FrameOpaque { FRAME_OPAQUE_REAP_FILTERS = 1, FRAME_OPAQUE_CHOOSE_INPUT, FRAME_OPAQUE_SUB_HEARTBEAT, - FRAME_OPAQUE_EOF, FRAME_OPAQUE_SEND_COMMAND, }; @@ -62,8 +60,7 @@ typedef struct FilterGraphPriv { int is_meta; int disable_conversions; - int nb_inputs_bound; - int nb_outputs_bound; + unsigned nb_outputs_done; const char *graph_desc; @@ -75,14 +72,10 @@ typedef struct FilterGraphPriv { Scheduler *sch; unsigned sch_idx; - pthread_t thread; /** * Queue for sending frames from the main thread to the filtergraph. Has * nb_inputs+1 streams - the first nb_inputs stream correspond to * filtergraph inputs. Frames on those streams may have their opaque set to - * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the - * EOF event for the correspondint stream. Will be immediately followed by - * this stream being send-closed. * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but pts+timebase of * a subtitle heartbeat event. Will only be sent for sub2video streams. * @@ -96,7 +89,7 @@ typedef struct FilterGraphPriv { * available the terminating empty frame's opaque will contain the index+1 * of the filtergraph input to which more input frames should be supplied. */ - ThreadQueue *queue_in; + /** * Queue for sending frames from the filtergraph back to the main thread. * Has nb_outputs+1 streams - the first nb_outputs stream correspond to @@ -105,7 +98,7 @@ typedef struct FilterGraphPriv { * The last stream is "control" - see documentation for queue_in for more * details. */ - ThreadQueue *queue_out; + //ThreadQueue *queue_out; // submitting frames to filter thread returned EOF // this only happens on thread exit, so is not per-input int eof_in; @@ -130,6 +123,7 @@ typedef struct FilterGraphThread { // The output index is stored in frame opaque. AVFifo *frame_queue_out; + // set to 1 after at least one frame passed through this output int got_frame; // EOF status of each input/output, as received by the thread @@ -260,9 +254,6 @@ typedef struct OutputFilterPriv { int64_t ts_offset; int64_t next_pts; FPSConvContext fps; - - // set to 1 after at least one frame passed through this output - int got_frame; } OutputFilterPriv; static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter) @@ -660,57 +651,6 @@ static int ifilter_has_all_input_formats(FilterGraph *fg) static void *filter_thread(void *arg); -// start the filtering thread once all inputs and outputs are bound -static int fg_thread_try_start(FilterGraphPriv *fgp) -{ - FilterGraph *fg = &fgp->fg; - ObjPool *op; - int ret = 0; - - if (fgp->nb_inputs_bound < fg->nb_inputs || - fgp->nb_outputs_bound < fg->nb_outputs) - return 0; - - op = objpool_alloc_frames(); - if (!op) - return AVERROR(ENOMEM); - - fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move); - if (!fgp->queue_in) { - objpool_free(&op); - return AVERROR(ENOMEM); - } - - // at least one output is mandatory - op = objpool_alloc_frames(); - if (!op) - goto fail; - - fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move); - if (!fgp->queue_out) { - objpool_free(&op); - goto fail; - } - - ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp); - if (ret) { - ret = AVERROR(ret); - av_log(NULL, AV_LOG_ERROR, "pthread_create() for filtergraph %d failed: %s\n", - fg->index, av_err2str(ret)); - goto fail; - } - - return 0; -fail: - if (ret >= 0) - ret = AVERROR(ENOMEM); - - tq_free(&fgp->queue_in); - tq_free(&fgp->queue_out); - - return ret; -} - static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, int in) { AVFilterContext *ctx = inout->filter_ctx; @@ -736,7 +676,6 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg) ofilter->graph = fg; ofp->format = -1; ofp->index = fg->nb_outputs - 1; - ofilter->last_pts = AV_NOPTS_VALUE; return ofilter; } @@ -768,10 +707,7 @@ static int ifilter_bind_ist(InputFilter *ifilter, InputStream *ist) return AVERROR(ENOMEM); } - fgp->nb_inputs_bound++; - av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs); - - return fg_thread_try_start(fgp); + return 0; } static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost) @@ -910,10 +846,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, OutputStream *ost, if (ret < 0) return ret; - fgp->nb_outputs_bound++; - av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs); - - return fg_thread_try_start(fgp); + return 0; } static InputFilter *ifilter_alloc(FilterGraph *fg) @@ -943,34 +876,6 @@ static InputFilter *ifilter_alloc(FilterGraph *fg) return ifilter; } -static int fg_thread_stop(FilterGraphPriv *fgp) -{ - void *ret; - - if (!fgp->queue_in) - return 0; - - for (int i = 0; i <= fgp->fg.nb_inputs; i++) { - InputFilterPriv *ifp = i < fgp->fg.nb_inputs ? - ifp_from_ifilter(fgp->fg.inputs[i]) : NULL; - - if (ifp) - ifp->eof = 1; - - tq_send_finish(fgp->queue_in, i); - } - - for (int i = 0; i <= fgp->fg.nb_outputs; i++) - tq_receive_finish(fgp->queue_out, i); - - pthread_join(fgp->thread, &ret); - - tq_free(&fgp->queue_in); - tq_free(&fgp->queue_out); - - return (int)(intptr_t)ret; -} - void fg_free(FilterGraph **pfg) { FilterGraph *fg = *pfg; @@ -980,8 +885,6 @@ void fg_free(FilterGraph **pfg) return; fgp = fgp_from_fg(fg); - fg_thread_stop(fgp); - avfilter_graph_free(&fg->graph); for (int j = 0; j < fg->nb_inputs; j++) { InputFilter *ifilter = fg->inputs[j]; @@ -2253,8 +2156,56 @@ finish: fps->dropped_keyframe |= fps->last_dropped && (frame->flags & AV_FRAME_FLAG_KEY); } +static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt) +{ + FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); + int ret; + + // we are finished and no frames were ever seen at this output, + // at least initialize the encoder with a dummy frame + if (!fgt->got_frame) { + AVFrame *frame = fgt->frame; + FrameData *fd; + + frame->time_base = ofp->tb_out; + frame->format = ofp->format; + + frame->width = ofp->width; + frame->height = ofp->height; + frame->sample_aspect_ratio = ofp->sample_aspect_ratio; + + frame->sample_rate = ofp->sample_rate; + if (ofp->ch_layout.nb_channels) { + ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout); + if (ret < 0) + return ret; + } + + fd = frame_data(frame); + if (!fd) + return AVERROR(ENOMEM); + + fd->frame_rate_filter = ofp->fps.framerate; + + av_assert0(!frame->buf[0]); + + av_log(ofp->ofilter.ost, AV_LOG_WARNING, + "No filtered frames for output stream, trying to " + "initialize anyway.\n"); + + ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame); + av_frame_unref(frame); + if (ret < 0) + return ret; + } + + fgt->eof_out[ofp->index] = 1; + + return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL); +} + static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, - AVFrame *frame, int buffer) + AVFrame *frame) { FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); AVFrame *frame_prev = ofp->fps.last_frame; @@ -2301,28 +2252,17 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, frame_out = frame; } - if (buffer) { - AVFrame *f = av_frame_alloc(); - - if (!f) { - av_frame_unref(frame_out); - return AVERROR(ENOMEM); - } - - av_frame_move_ref(f, frame_out); - f->opaque = (void*)(intptr_t)ofp->index; - - ret = av_fifo_write(fgt->frame_queue_out, &f, 1); - if (ret < 0) { - av_frame_free(&f); - return AVERROR(ENOMEM); - } - } else { - // return the frame to the main thread - ret = tq_send(fgp->queue_out, ofp->index, frame_out); + { + // send the frame to consumers + ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame_out); if (ret < 0) { av_frame_unref(frame_out); - fgt->eof_out[ofp->index] = 1; + + if (!fgt->eof_out[ofp->index]) { + fgt->eof_out[ofp->index] = 1; + fgp->nb_outputs_done++; + } + return ret == AVERROR_EOF ? 0 : ret; } } @@ -2343,16 +2283,14 @@ static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, av_frame_move_ref(frame_prev, frame); } - if (!frame) { - tq_send_finish(fgp->queue_out, ofp->index); - fgt->eof_out[ofp->index] = 1; - } + if (!frame) + return close_output(ofp, fgt); return 0; } static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, - AVFrame *frame, int buffer) + AVFrame *frame) { FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); OutputStream *ost = ofp->ofilter.ost; @@ -2362,8 +2300,8 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, ret = av_buffersink_get_frame_flags(filter, frame, AV_BUFFERSINK_FLAG_NO_REQUEST); - if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) { - ret = fg_output_frame(ofp, fgt, NULL, buffer); + if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) { + ret = fg_output_frame(ofp, fgt, NULL); return (ret < 0) ? ret : 1; } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { return 1; @@ -2417,7 +2355,7 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, fd->frame_rate_filter = ofp->fps.framerate; } - ret = fg_output_frame(ofp, fgt, frame, buffer); + ret = fg_output_frame(ofp, fgt, frame); av_frame_unref(frame); if (ret < 0) return ret; @@ -2425,44 +2363,29 @@ static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, return 0; } -/* retrieve all frames available at filtergraph outputs and either send them to - * the main thread (buffer=0) or buffer them for later (buffer=1) */ +/* retrieve all frames available at filtergraph outputs + * and send them to consumers */ static int read_frames(FilterGraph *fg, FilterGraphThread *fgt, - AVFrame *frame, int buffer) + AVFrame *frame) { FilterGraphPriv *fgp = fgp_from_fg(fg); - int ret = 0; if (!fg->graph) return 0; - // process buffered frames - if (!buffer) { - AVFrame *f; - - while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) { - int out_idx = (intptr_t)f->opaque; - f->opaque = NULL; - ret = tq_send(fgp->queue_out, out_idx, f); - av_frame_free(&f); - if (ret < 0 && ret != AVERROR_EOF) - return ret; - } - } - /* Reap all buffers present in the buffer sinks */ for (int i = 0; i < fg->nb_outputs; i++) { OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); int ret = 0; while (!ret) { - ret = fg_output_step(ofp, fgt, frame, buffer); + ret = fg_output_step(ofp, fgt, frame); if (ret < 0) return ret; } } - return 0; + return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0; } static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) @@ -2536,6 +2459,9 @@ static int send_eof(FilterGraphThread *fgt, InputFilter *ifilter, InputFilterPriv *ifp = ifp_from_ifilter(ifilter); int ret; + if (fgt->eof_in[ifp->index]) + return 0; + fgt->eof_in[ifp->index] = 1; if (ifp->filter) { @@ -2637,7 +2563,7 @@ static int send_frame(FilterGraph *fg, FilterGraphThread *fgt, return ret; } - ret = read_frames(fg, fgt, tmp, 1); + ret = read_frames(fg, fgt, tmp); av_frame_free(&tmp); if (ret < 0) return ret; @@ -2674,6 +2600,29 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) { int nb_requests, nb_requests_max = 0; int best_input = -1; + int ret; + + if (!fg->graph) { + for (int i = 0; i < fg->nb_inputs; i++) { + InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]); + if (ifp->format < 0 && !fgt->eof_in[i]) + return i; + } + + // This state - graph is not configured, but all inputs are either + // initialized or EOF - should be unreachable because sending EOF to a + // filter without even a fallback format should fail + av_assert0(0); + return AVERROR_BUG; + } + + ret = avfilter_graph_request_oldest(fg->graph); + if (ret < 0 && ret != AVERROR(EAGAIN)) + return ret; + + // we drain all frames from the filtergraph after each input frame, so + // we should never get success here + av_assert0(ret < 0); for (int i = 0; i < fg->nb_inputs; i++) { InputFilter *ifilter = fg->inputs[i]; @@ -2690,9 +2639,12 @@ static int choose_input(const FilterGraph *fg, const FilterGraphThread *fgt) } } + av_assert0(best_input >= 0); + return best_input; } +#if 0 static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt, AVFrame *frame) { @@ -2766,6 +2718,7 @@ done: return 0; } +#endif static void fg_thread_set_name(const FilterGraph *fg) { @@ -2852,28 +2805,53 @@ static void *filter_thread(void *arg) while (1) { InputFilter *ifilter; InputFilterPriv *ifp; - int input_idx, eof_frame; + int input_idx; - input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); - if (input_idx < 0 || - (input_idx == fg->nb_inputs && input_status < 0)) { + // XXX + if (!fg->nb_inputs) { + abort(); + goto no_inputs; + } + + input_idx = choose_input(fg, &fgt); + // XXX + if (input_idx < 0) + goto finish; + + input_status = sch_filter_receive(fgp->sch, fgp->sch_idx, + &input_idx, fgt.frame); + if (input_idx < 0) { av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n"); break; } + // XXX +#if 0 // message on the control stream if (input_idx == fg->nb_inputs) { ret = msg_process(fgp, &fgt, fgt.frame); if (ret < 0) goto finish; +#elif 0 + enum FrameOpaque msg = (intptr_t)fgt.frame->opaque; + + fgt.frame->opaque = NULL; + av_assert0(msg > 0); + av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND); + + { + FilterCommand *fc = (FilterCommand*)frame->buf[0]->data; + send_command(fg, fc->time, fc->target, fc->command, fc->arg, fc->all_filters); + av_frame_unref(frame); + } + continue; } +#endif - // we received an input frame or EOF ifilter = fg->inputs[input_idx]; ifp = ifp_from_ifilter(ifilter); - eof_frame = input_status >= 0 && (intptr_t)fgt.frame->opaque == FRAME_OPAQUE_EOF; if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) { if (input_status >= 0 && (intptr_t)fgt.frame->opaque == FRAME_OPAQUE_SUB_HEARTBEAT) sub2video_heartbeat(ifilter, fgt.frame->pts, fgt.frame->time_base); @@ -2890,20 +2868,17 @@ static void *filter_thread(void *arg) if (ret < 0) break; - if (eof_frame) { - // an EOF frame is immediately followed by sender closing - // the corresponding stream, so retrieve that event - input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); - av_assert0(input_status == AVERROR_EOF && input_idx == ifp->index); - } - - // signal to the main thread that we are done - ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame); +no_inputs: + // retrieve all newly avalable frames + // XXX: handle filtergraphs connected to an unlimited source + ret = read_frames(fg, &fgt, fgt.frame); if (ret < 0) { if (ret == AVERROR_EOF) - break; + av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n"); + else + av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: %s\n", + av_err2str(ret)); - av_log(fg, AV_LOG_ERROR, "Error communicating with the main thread\n"); goto finish; } } @@ -2913,10 +2888,9 @@ finish: if (ret == AVERROR_EOF) ret = 0; - for (int i = 0; i <= fg->nb_inputs; i++) - tq_receive_finish(fgp->queue_in, i); - for (int i = 0; i <= fg->nb_outputs; i++) - tq_send_finish(fgp->queue_out, i); + // XXX close_output() here? + + sch_filter_send(fgp->sch, fgp->sch_idx, -1, NULL); fg_thread_uninit(&fgt); @@ -2925,64 +2899,6 @@ finish: return (void*)(intptr_t)ret; } -static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter, - AVFrame *frame, enum FrameOpaque type) -{ - InputFilterPriv *ifp = ifp_from_ifilter(ifilter); - int output_idx, ret; - - if (ifp->eof) { - av_frame_unref(frame); - return AVERROR_EOF; - } - - frame->opaque = (void*)(intptr_t)type; - - ret = tq_send(fgp->queue_in, ifp->index, frame); - if (ret < 0) { - ifp->eof = 1; - av_frame_unref(frame); - return ret; - } - - if (type == FRAME_OPAQUE_EOF) - tq_send_finish(fgp->queue_in, ifp->index); - - // wait for the frame to be processed - ret = tq_receive(fgp->queue_out, &output_idx, frame); - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); - - return ret; -} - -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int keep_reference) -{ - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); - int ret; - - if (keep_reference) { - ret = av_frame_ref(fgp->frame, frame); - if (ret < 0) - return ret; - } else - av_frame_move_ref(fgp->frame, frame); - - return thread_send_frame(fgp, ifilter, fgp->frame, 0); -} - -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) -{ - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); - int ret; - - fgp->frame->pts = pts; - fgp->frame->time_base = tb; - - ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF); - - return ret == AVERROR_EOF ? 0 : ret; -} - void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational tb) { FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); @@ -2990,144 +2906,10 @@ void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, AVRational t fgp->frame->pts = pts; fgp->frame->time_base = tb; + // XXX +#if 0 thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_SUB_HEARTBEAT); -} - -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) -{ - FilterGraphPriv *fgp = fgp_from_fg(graph); - int ret, got_frames = 0; - - if (fgp->eof_in) - return AVERROR_EOF; - - // signal to the filtering thread to return all frames it can - av_assert0(!fgp->frame->buf[0]); - fgp->frame->opaque = (void*)(intptr_t)(best_ist ? - FRAME_OPAQUE_CHOOSE_INPUT : - FRAME_OPAQUE_REAP_FILTERS); - - ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame); - if (ret < 0) { - fgp->eof_in = 1; - goto finish; - } - - while (1) { - OutputFilter *ofilter; - OutputFilterPriv *ofp; - OutputStream *ost; - int output_idx; - - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); - - // EOF on the whole queue or the control stream - if (output_idx < 0 || - (ret < 0 && output_idx == graph->nb_outputs)) - goto finish; - - // EOF for a specific stream - if (ret < 0) { - ofilter = graph->outputs[output_idx]; - ofp = ofp_from_ofilter(ofilter); - - // we are finished and no frames were ever seen at this output, - // at least initialize the encoder with a dummy frame - if (!ofp->got_frame) { - AVFrame *frame = fgp->frame; - FrameData *fd; - - frame->time_base = ofp->tb_out; - frame->format = ofp->format; - - frame->width = ofp->width; - frame->height = ofp->height; - frame->sample_aspect_ratio = ofp->sample_aspect_ratio; - - frame->sample_rate = ofp->sample_rate; - if (ofp->ch_layout.nb_channels) { - ret = av_channel_layout_copy(&frame->ch_layout, &ofp->ch_layout); - if (ret < 0) - return ret; - } - - fd = frame_data(frame); - if (!fd) - return AVERROR(ENOMEM); - - fd->frame_rate_filter = ofp->fps.framerate; - - av_assert0(!frame->buf[0]); - - av_log(ofilter->ost, AV_LOG_WARNING, - "No filtered frames for output stream, trying to " - "initialize anyway.\n"); - - enc_open(ofilter->ost, frame); - av_frame_unref(frame); - } - - close_output_stream(graph->outputs[output_idx]->ost); - continue; - } - - // request was fully processed by the filtering thread, - // return the input stream to read from, if needed - if (output_idx == graph->nb_outputs) { - int input_idx = (intptr_t)fgp->frame->opaque - 1; - av_assert0(input_idx <= graph->nb_inputs); - - if (best_ist) { - *best_ist = (input_idx >= 0 && input_idx < graph->nb_inputs) ? - ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL; - - if (input_idx < 0 && !got_frames) { - for (int i = 0; i < graph->nb_outputs; i++) - graph->outputs[i]->ost->unavailable = 1; - } - } - break; - } - - // got a frame from the filtering thread, send it for encoding - ofilter = graph->outputs[output_idx]; - ost = ofilter->ost; - ofp = ofp_from_ofilter(ofilter); - - if (ost->finished) { - av_frame_unref(fgp->frame); - tq_receive_finish(fgp->queue_out, output_idx); - continue; - } - - if (fgp->frame->pts != AV_NOPTS_VALUE) { - ofilter->last_pts = av_rescale_q(fgp->frame->pts, - fgp->frame->time_base, - AV_TIME_BASE_Q); - } - - ret = enc_frame(ost, fgp->frame); - av_frame_unref(fgp->frame); - if (ret < 0) - goto finish; - - ofp->got_frame = 1; - got_frames = 1; - } - -finish: - if (ret < 0) { - fgp->eof_in = 1; - for (int i = 0; i < graph->nb_outputs; i++) - close_output_stream(graph->outputs[i]->ost); - } - - return ret; -} - -int reap_filters(FilterGraph *fg, int flush) -{ - return fg_transcode_step(fg, NULL); +#endif } void fg_send_command(FilterGraph *fg, double time, const char *target, @@ -3138,9 +2920,6 @@ void fg_send_command(FilterGraph *fg, double time, const char *target, FilterCommand *fc; int output_idx, ret; - if (!fgp->queue_in) - return; - fc = av_mallocz(sizeof(*fc)); if (!fc) return; @@ -3165,13 +2944,5 @@ void fg_send_command(FilterGraph *fg, double time, const char *target, fgp->frame->buf[0] = buf; fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND; - ret = tq_send(fgp->queue_in, fg->nb_inputs + 1, fgp->frame); - if (ret < 0) { - av_frame_unref(fgp->frame); - return; - } - - // wait for the frame to be processed - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); + // XXX actually implement the command } -- 2.40.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".