From: Paul B Mahol <onemda@gmail.com>
To: FFmpeg development discussions and patches <ffmpeg-devel@ffmpeg.org>
Subject: Re: [FFmpeg-devel] [PATCH 10/10] fftools/ffmpeg: convert to a threaded architecture
Date: Wed, 6 Dec 2023 12:22:48 +0100
Message-ID: <CAPYw7P6ps2PGpA-JWgaVwPVPTaYbTPMJcMRdBGhTwv9XrEA_TQ@mail.gmail.com>
In-Reply-To: <20231206103002.30084-11-anton@khirnov.net>

On Wed, Dec 6, 2023 at 11:32 AM Anton Khirnov <anton@khirnov.net> wrote:

> Change the main loop and every component (demuxers, decoders, filters,
> encoders, muxers) to use the previously added transcode scheduler. Every
> instance of every such component was already running in a separate
> thread, but now they can actually run in parallel.
>
> Changes the results of ffmpeg-fix_sub_duration_heartbeat - tested by
> JEEB to be more correct and deterministic.
>

Un-reviewable, please split it.

> ---
>  Changelog                                     |   2 +
>  fftools/ffmpeg.c                              | 374 +--------
>  fftools/ffmpeg.h                              |  97 +--
>  fftools/ffmpeg_dec.c                          | 321 ++------
>  fftools/ffmpeg_demux.c                        | 268 ++++---
>  fftools/ffmpeg_enc.c                          | 368 ++-------
>  fftools/ffmpeg_filter.c                       | 722 +++++-------------
>  fftools/ffmpeg_mux.c                          | 324 ++------
>  fftools/ffmpeg_mux.h                          |  24 +-
>  fftools/ffmpeg_mux_init.c                     |  88 +--
>  fftools/ffmpeg_opt.c                          |   6 +-
>  .../fate/ffmpeg-fix_sub_duration_heartbeat    |  36 +-
>  12 files changed, 600 insertions(+), 2030 deletions(-)
>
> diff --git a/Changelog b/Changelog
> index f00bc27ca4..67ef92eb02 100644
> --- a/Changelog
> +++ b/Changelog
> @@ -7,6 +7,8 @@ version <next>:
>  - EVC encoding using external library libxeve
>  - QOA decoder and demuxer
>  - aap filter
> +- demuxing, decoding, filtering, encoding, and muxing in the
> +  ffmpeg CLI now all run in parallel
>
>  version 6.1:
>  - libaribcaption decoder
> diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c
> index b8a97258a0..30b594fd97 100644
> --- a/fftools/ffmpeg.c
> +++ b/fftools/ffmpeg.c
> @@ -117,7 +117,7 @@ typedef struct BenchmarkTimeStamps {
>  static BenchmarkTimeStamps get_benchmark_time_stamps(void);
>  static int64_t getmaxrss(void);
>
> -unsigned nb_output_dumped = 0;
> +atomic_uint nb_output_dumped = 0;
>
>  static BenchmarkTimeStamps current_time;
>  AVIOContext *progress_avio = NULL;
> @@ -138,30 +138,6 @@ static struct termios oldtty;
>  static int restore_tty;
>  #endif
>
> -/* sub2video hack:
> -   Convert subtitles to video with alpha to insert them in filter graphs.
> -   This is a temporary solution until libavfilter gets real subtitles
> support.
> - */
> -
> -static void sub2video_heartbeat(InputFile *infile, int64_t pts,
> AVRational tb)
> -{
> -    /* When a frame is read from a file, examine all sub2video streams in
> -       the same file and send the sub2video frame again. Otherwise,
> decoded
> -       video frames could be accumulating in the filter graph while a
> filter
> -       (possibly overlay) is desperately waiting for a subtitle frame. */
> -    for (int i = 0; i < infile->nb_streams; i++) {
> -        InputStream *ist = infile->streams[i];
> -
> -        if (ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
> -            continue;
> -
> -        for (int j = 0; j < ist->nb_filters; j++)
> -            ifilter_sub2video_heartbeat(ist->filters[j], pts, tb);
> -    }
> -}
> -
> -/* end of sub2video hack */
> -
>  static void term_exit_sigsafe(void)
>  {
>  #if HAVE_TERMIOS_H
> @@ -499,23 +475,13 @@ void update_benchmark(const char *fmt, ...)
>      }
>  }
>
> -void close_output_stream(OutputStream *ost)
> -{
> -    OutputFile *of = output_files[ost->file_index];
> -    ost->finished |= ENCODER_FINISHED;
> -
> -    if (ost->sq_idx_encode >= 0)
> -        sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL));
> -}
> -
> -static void print_report(int is_last_report, int64_t timer_start, int64_t
> cur_time)
> +static void print_report(int is_last_report, int64_t timer_start, int64_t
> cur_time, int64_t pts)
>  {
>      AVBPrint buf, buf_script;
>      int64_t total_size = of_filesize(output_files[0]);
>      int vid;
>      double bitrate;
>      double speed;
> -    int64_t pts = AV_NOPTS_VALUE;
>      static int64_t last_time = -1;
>      static int first_report = 1;
>      uint64_t nb_frames_dup = 0, nb_frames_drop = 0;
> @@ -533,7 +499,7 @@ static void print_report(int is_last_report, int64_t
> timer_start, int64_t cur_ti
>              last_time = cur_time;
>          }
>          if (((cur_time - last_time) < stats_period && !first_report) ||
> -            (first_report && nb_output_dumped < nb_output_files))
> +            (first_report && atomic_load(&nb_output_dumped) <
> nb_output_files))
>              return;
>          last_time = cur_time;
>      }
> @@ -544,7 +510,7 @@ static void print_report(int is_last_report, int64_t
> timer_start, int64_t cur_ti
>      av_bprint_init(&buf, 0, AV_BPRINT_SIZE_AUTOMATIC);
>      av_bprint_init(&buf_script, 0, AV_BPRINT_SIZE_AUTOMATIC);
>      for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
> -        const float q = ost->enc ? ost->quality / (float) FF_QP2LAMBDA :
> -1;
> +        const float q = ost->enc ? atomic_load(&ost->quality) / (float)
> FF_QP2LAMBDA : -1;
>
>          if (vid && ost->type == AVMEDIA_TYPE_VIDEO) {
>              av_bprintf(&buf, "q=%2.1f ", q);
> @@ -565,22 +531,18 @@ static void print_report(int is_last_report, int64_t
> timer_start, int64_t cur_ti
>              if (is_last_report)
>                  av_bprintf(&buf, "L");
>
> -            nb_frames_dup = ost->filter->nb_frames_dup;
> -            nb_frames_drop = ost->filter->nb_frames_drop;
> +            nb_frames_dup = atomic_load(&ost->filter->nb_frames_dup);
> +            nb_frames_drop = atomic_load(&ost->filter->nb_frames_drop);
>
>              vid = 1;
>          }
> -        /* compute min output value */
> -        if (ost->last_mux_dts != AV_NOPTS_VALUE) {
> -            if (pts == AV_NOPTS_VALUE || ost->last_mux_dts > pts)
> -                pts = ost->last_mux_dts;
> -            if (copy_ts) {
> -                if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
> -                    copy_ts_first_pts = pts;
> -                if (copy_ts_first_pts != AV_NOPTS_VALUE)
> -                    pts -= copy_ts_first_pts;
> -            }
> -        }
> +    }
> +
> +    if (copy_ts) {
> +        if (copy_ts_first_pts == AV_NOPTS_VALUE && pts > 1)
> +            copy_ts_first_pts = pts;
> +        if (copy_ts_first_pts != AV_NOPTS_VALUE)
> +            pts -= copy_ts_first_pts;
>      }
>
>      us = FFABS64U(pts) % AV_TIME_BASE;
> @@ -783,81 +745,6 @@ int subtitle_wrap_frame(AVFrame *frame, AVSubtitle
> *subtitle, int copy)
>      return 0;
>  }
>
> -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket
> *pkt)
> -{
> -    OutputFile *of = output_files[ost->file_index];
> -    int64_t signal_pts = av_rescale_q(pkt->pts, pkt->time_base,
> -                                      AV_TIME_BASE_Q);
> -
> -    if (!ost->fix_sub_duration_heartbeat || !(pkt->flags &
> AV_PKT_FLAG_KEY))
> -        // we are only interested in heartbeats on streams configured, and
> -        // only on random access points.
> -        return 0;
> -
> -    for (int i = 0; i < of->nb_streams; i++) {
> -        OutputStream *iter_ost = of->streams[i];
> -        InputStream *ist = iter_ost->ist;
> -        int ret = AVERROR_BUG;
> -
> -        if (iter_ost == ost || !ist || !ist->decoding_needed ||
> -            ist->dec_ctx->codec_type != AVMEDIA_TYPE_SUBTITLE)
> -            // We wish to skip the stream that causes the heartbeat,
> -            // output streams without an input stream, streams not decoded
> -            // (as fix_sub_duration is only done for decoded subtitles) as
> -            // well as non-subtitle streams.
> -            continue;
> -
> -        if ((ret = fix_sub_duration_heartbeat(ist, signal_pts)) < 0)
> -            return ret;
> -    }
> -
> -    return 0;
> -}
> -
> -/* pkt = NULL means EOF (needed to flush decoder buffers) */
> -static int process_input_packet(InputStream *ist, const AVPacket *pkt,
> int no_eof)
> -{
> -    InputFile *f = input_files[ist->file_index];
> -    int64_t dts_est = AV_NOPTS_VALUE;
> -    int ret = 0;
> -    int eof_reached = 0;
> -
> -    if (ist->decoding_needed) {
> -        ret = dec_packet(ist, pkt, no_eof);
> -        if (ret < 0 && ret != AVERROR_EOF)
> -            return ret;
> -    }
> -    if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed))
> -        eof_reached = 1;
> -
> -    if (pkt && pkt->opaque_ref) {
> -        DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data;
> -        dts_est = pd->dts_est;
> -    }
> -
> -    if (f->recording_time != INT64_MAX) {
> -        int64_t start_time = 0;
> -        if (copy_ts) {
> -            start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time
> : 0;
> -            start_time += start_at_zero ? 0 : f->start_time_effective;
> -        }
> -        if (dts_est >= f->recording_time + start_time)
> -            pkt = NULL;
> -    }
> -
> -    for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
> -        OutputStream *ost = ist->outputs[oidx];
> -        if (ost->enc || (!pkt && no_eof))
> -            continue;
> -
> -        ret = of_streamcopy(ost, pkt, dts_est);
> -        if (ret < 0)
> -            return ret;
> -    }
> -
> -    return !eof_reached;
> -}
> -
>  static void print_stream_maps(void)
>  {
>      av_log(NULL, AV_LOG_INFO, "Stream mapping:\n");
> @@ -934,43 +821,6 @@ static void print_stream_maps(void)
>      }
>  }
>
> -/**
> - * Select the output stream to process.
> - *
> - * @retval 0 an output stream was selected
> - * @retval AVERROR(EAGAIN) need to wait until more input is available
> - * @retval AVERROR_EOF no more streams need output
> - */
> -static int choose_output(OutputStream **post)
> -{
> -    int64_t opts_min = INT64_MAX;
> -    OutputStream *ost_min = NULL;
> -
> -    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) {
> -        int64_t opts;
> -
> -        if (ost->filter && ost->filter->last_pts != AV_NOPTS_VALUE) {
> -            opts = ost->filter->last_pts;
> -        } else {
> -            opts = ost->last_mux_dts == AV_NOPTS_VALUE ?
> -                   INT64_MIN : ost->last_mux_dts;
> -        }
> -
> -        if (!ost->initialized && !ost->finished) {
> -            ost_min = ost;
> -            break;
> -        }
> -        if (!ost->finished && opts < opts_min) {
> -            opts_min = opts;
> -            ost_min = ost;
> -        }
> -    }
> -    if (!ost_min)
> -        return AVERROR_EOF;
> -    *post = ost_min;
> -    return ost_min->unavailable ? AVERROR(EAGAIN) : 0;
> -}
> -
>  static void set_tty_echo(int on)
>  {
>  #if HAVE_TERMIOS_H
> @@ -1042,149 +892,21 @@ static int check_keyboard_interaction(int64_t
> cur_time)
>      return 0;
>  }
>
> -static void reset_eagain(void)
> -{
> -    for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost))
> -        ost->unavailable = 0;
> -}
> -
> -static void decode_flush(InputFile *ifile)
> -{
> -    for (int i = 0; i < ifile->nb_streams; i++) {
> -        InputStream *ist = ifile->streams[i];
> -
> -        if (ist->discard || !ist->decoding_needed)
> -            continue;
> -
> -        dec_packet(ist, NULL, 1);
> -    }
> -}
> -
> -/*
> - * Return
> - * - 0 -- one packet was read and processed
> - * - AVERROR(EAGAIN) -- no packets were available for selected file,
> - *   this function should be called again
> - * - AVERROR_EOF -- this function should not be called again
> - */
> -static int process_input(int file_index, AVPacket *pkt)
> -{
> -    InputFile *ifile = input_files[file_index];
> -    InputStream *ist;
> -    int ret, i;
> -
> -    ret = ifile_get_packet(ifile, pkt);
> -
> -    if (ret == 1) {
> -        /* the input file is looped: flush the decoders */
> -        decode_flush(ifile);
> -        return AVERROR(EAGAIN);
> -    }
> -    if (ret < 0) {
> -        if (ret != AVERROR_EOF) {
> -            av_log(ifile, AV_LOG_ERROR,
> -                   "Error retrieving a packet from demuxer: %s\n",
> av_err2str(ret));
> -            if (exit_on_error)
> -                return ret;
> -        }
> -
> -        for (i = 0; i < ifile->nb_streams; i++) {
> -            ist = ifile->streams[i];
> -            if (!ist->discard) {
> -                ret = process_input_packet(ist, NULL, 0);
> -                if (ret>0)
> -                    return 0;
> -                else if (ret < 0)
> -                    return ret;
> -            }
> -
> -            /* mark all outputs that don't go through lavfi as finished */
> -            for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
> -                OutputStream *ost = ist->outputs[oidx];
> -                OutputFile *of = output_files[ost->file_index];
> -
> -                ret = of_output_packet(of, ost, NULL);
> -                if (ret < 0)
> -                    return ret;
> -            }
> -        }
> -
> -        ifile->eof_reached = 1;
> -        return AVERROR(EAGAIN);
> -    }
> -
> -    reset_eagain();
> -
> -    ist = ifile->streams[pkt->stream_index];
> -
> -    sub2video_heartbeat(ifile, pkt->pts, pkt->time_base);
> -
> -    ret = process_input_packet(ist, pkt, 0);
> -
> -    av_packet_unref(pkt);
> -
> -    return ret < 0 ? ret : 0;
> -}
> -
> -/**
> - * Run a single step of transcoding.
> - *
> - * @return 0 for success, <0 for error
> - */
> -static int transcode_step(OutputStream *ost, AVPacket *demux_pkt)
> -{
> -    InputStream *ist = NULL;
> -    int ret;
> -
> -    if (ost->filter) {
> -        if ((ret = fg_transcode_step(ost->filter->graph, &ist)) < 0)
> -            return ret;
> -        if (!ist)
> -            return 0;
> -    } else {
> -        ist = ost->ist;
> -        av_assert0(ist);
> -    }
> -
> -    ret = process_input(ist->file_index, demux_pkt);
> -    if (ret == AVERROR(EAGAIN)) {
> -        return 0;
> -    }
> -
> -    if (ret < 0)
> -        return ret == AVERROR_EOF ? 0 : ret;
> -
> -    // process_input() above might have caused output to become available
> -    // in multiple filtergraphs, so we process all of them
> -    for (int i = 0; i < nb_filtergraphs; i++) {
> -        ret = reap_filters(filtergraphs[i], 0);
> -        if (ret < 0)
> -            return ret;
> -    }
> -
> -    return 0;
> -}
> -
>  /*
>   * The following code is the main loop of the file converter
>   */
> -static int transcode(Scheduler *sch, int *err_rate_exceeded)
> +static int transcode(Scheduler *sch)
>  {
>      int ret = 0, i;
> -    InputStream *ist;
> -    int64_t timer_start;
> -    AVPacket *demux_pkt = NULL;
> +    int64_t timer_start, transcode_ts = 0;
>
>      print_stream_maps();
>
> -    *err_rate_exceeded = 0;
>      atomic_store(&transcode_init_done, 1);
>
> -    demux_pkt = av_packet_alloc();
> -    if (!demux_pkt) {
> -        ret = AVERROR(ENOMEM);
> -        goto fail;
> -    }
> +    ret = sch_start(sch);
> +    if (ret < 0)
> +        return ret;
>
>      if (stdin_interaction) {
>          av_log(NULL, AV_LOG_INFO, "Press [q] to stop, [?] for help\n");
> @@ -1192,8 +914,7 @@ static int transcode(Scheduler *sch, int
> *err_rate_exceeded)
>
>      timer_start = av_gettime_relative();
>
> -    while (!received_sigterm) {
> -        OutputStream *ost;
> +    while (!sch_wait(sch, stats_period, &transcode_ts)) {
>          int64_t cur_time= av_gettime_relative();
>
>          /* if 'q' pressed, exits */
> @@ -1201,49 +922,11 @@ static int transcode(Scheduler *sch, int
> *err_rate_exceeded)
>              if (check_keyboard_interaction(cur_time) < 0)
>                  break;
>
> -        ret = choose_output(&ost);
> -        if (ret == AVERROR(EAGAIN)) {
> -            reset_eagain();
> -            av_usleep(10000);
> -            ret = 0;
> -            continue;
> -        } else if (ret < 0) {
> -            av_log(NULL, AV_LOG_VERBOSE, "No more output streams to write
> to, finishing.\n");
> -            ret = 0;
> -            break;
> -        }
> -
> -        ret = transcode_step(ost, demux_pkt);
> -        if (ret < 0 && ret != AVERROR_EOF) {
> -            av_log(NULL, AV_LOG_ERROR, "Error while filtering: %s\n",
> av_err2str(ret));
> -            break;
> -        }
> -
>          /* dump report by using the output first video and audio streams
> */
> -        print_report(0, timer_start, cur_time);
> +        print_report(0, timer_start, cur_time, transcode_ts);
>      }
>
> -    /* at the end of stream, we must flush the decoder buffers */
> -    for (ist = ist_iter(NULL); ist; ist = ist_iter(ist)) {
> -        float err_rate;
> -
> -        if (!input_files[ist->file_index]->eof_reached) {
> -            int err = process_input_packet(ist, NULL, 0);
> -            ret = err_merge(ret, err);
> -        }
> -
> -        err_rate = (ist->frames_decoded || ist->decode_errors) ?
> -                   ist->decode_errors / (ist->frames_decoded +
> ist->decode_errors) : 0.f;
> -        if (err_rate > max_error_rate) {
> -            av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds
> maximum %g\n",
> -                   err_rate, max_error_rate);
> -            *err_rate_exceeded = 1;
> -        } else if (err_rate)
> -            av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n",
> err_rate);
> -    }
> -    ret = err_merge(ret, enc_flush());
> -
> -    term_exit();
> +    ret = sch_stop(sch);
>
>      /* write the trailer if needed */
>      for (i = 0; i < nb_output_files; i++) {
> @@ -1251,11 +934,10 @@ static int transcode(Scheduler *sch, int
> *err_rate_exceeded)
>          ret = err_merge(ret, err);
>      }
>
> -    /* dump report by using the first video and audio streams */
> -    print_report(1, timer_start, av_gettime_relative());
> +    term_exit();
>
> -fail:
> -    av_packet_free(&demux_pkt);
> +    /* dump report by using the first video and audio streams */
> +    print_report(1, timer_start, av_gettime_relative(), transcode_ts);
>
>      return ret;
>  }
> @@ -1308,7 +990,7 @@ int main(int argc, char **argv)
>  {
>      Scheduler *sch = NULL;
>
> -    int ret, err_rate_exceeded;
> +    int ret;
>      BenchmarkTimeStamps ti;
>
>      init_dynload();
> @@ -1350,7 +1032,7 @@ int main(int argc, char **argv)
>      }
>
>      current_time = ti = get_benchmark_time_stamps();
> -    ret = transcode(sch, &err_rate_exceeded);
> +    ret = transcode(sch);
>      if (ret >= 0 && do_benchmark) {
>          int64_t utime, stime, rtime;
>          current_time = get_benchmark_time_stamps();
> @@ -1362,8 +1044,8 @@ int main(int argc, char **argv)
>                 utime / 1000000.0, stime / 1000000.0, rtime / 1000000.0);
>      }
>
> -    ret = received_nb_signals ? 255 :
> -          err_rate_exceeded ? 69 : ret;
> +    ret = received_nb_signals ? 255 :
> +          (ret == FFMPEG_ERROR_RATE_EXCEEDED) ? 69 : ret;
>
>  finish:
>      if (ret == AVERROR_EXIT)
> diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h
> index a89038b765..ba82b7490d 100644
> --- a/fftools/ffmpeg.h
> +++ b/fftools/ffmpeg.h
> @@ -61,6 +61,8 @@
>  #define FFMPEG_OPT_TOP 1
>  #define FFMPEG_OPT_FORCE_KF_SOURCE_NO_DROP 1
>
> +#define FFMPEG_ERROR_RATE_EXCEEDED FFERRTAG('E', 'R', 'E', 'D')
> +
>  enum VideoSyncMethod {
>      VSYNC_AUTO = -1,
>      VSYNC_PASSTHROUGH,
> @@ -82,13 +84,16 @@ enum HWAccelID {
>  };
>
>  enum FrameOpaque {
> -    FRAME_OPAQUE_REAP_FILTERS = 1,
> -    FRAME_OPAQUE_CHOOSE_INPUT,
> -    FRAME_OPAQUE_SUB_HEARTBEAT,
> +    FRAME_OPAQUE_SUB_HEARTBEAT = 1,
>      FRAME_OPAQUE_EOF,
>      FRAME_OPAQUE_SEND_COMMAND,
>  };
>
> +enum PacketOpaque {
> +    PKT_OPAQUE_SUB_HEARTBEAT = 1,
> +    PKT_OPAQUE_FIX_SUB_DURATION,
> +};
> +
>  typedef struct HWDevice {
>      const char *name;
>      enum AVHWDeviceType type;
> @@ -309,11 +314,8 @@ typedef struct OutputFilter {
>
>      enum AVMediaType type;
>
> -    /* pts of the last frame received from this filter, in AV_TIME_BASE_Q
> */
> -    int64_t last_pts;
> -
> -    uint64_t nb_frames_dup;
> -    uint64_t nb_frames_drop;
> +    atomic_uint_least64_t nb_frames_dup;
> +    atomic_uint_least64_t nb_frames_drop;
>  } OutputFilter;
>
>  typedef struct FilterGraph {
> @@ -426,11 +428,6 @@ typedef struct InputFile {
>
>      float readrate;
>      int accurate_seek;
> -
> -    /* when looping the input file, this queue is used by decoders to
> report
> -     * the last frame timestamp back to the demuxer thread */
> -    AVThreadMessageQueue *audio_ts_queue;
> -    int audio_ts_queue_size;
>  } InputFile;
>
>  enum forced_keyframes_const {
> @@ -532,8 +529,6 @@ typedef struct OutputStream {
>      InputStream *ist;
>
>      AVStream *st; /* stream in the output file */
> -    /* dts of the last packet sent to the muxing queue, in AV_TIME_BASE_Q
> */
> -    int64_t last_mux_dts;
>
>      AVRational enc_timebase;
>
> @@ -578,13 +573,6 @@ typedef struct OutputStream {
>      AVDictionary *sws_dict;
>      AVDictionary *swr_opts;
>      char *apad;
> -    OSTFinished finished; /* no more packets should be written for
> this stream */
> -    int unavailable; /* true if the steram is
> unavailable (possibly temporarily) */
> -
> -    // init_output_stream() has been called for this stream
> -    // The encoder and the bitstream filters have been initialized and
> the stream
> -    // parameters are set in the AVStream.
> -    int initialized;
>
>      const char *attachment_filename;
>
> @@ -598,9 +586,8 @@ typedef struct OutputStream {
>      uint64_t samples_encoded;
>
>      /* packet quality factor */
> -    int quality;
> +    atomic_int quality;
>
> -    int sq_idx_encode;
>      int sq_idx_mux;
>
>      EncStats enc_stats_pre;
> @@ -658,7 +645,6 @@ extern FilterGraph **filtergraphs;
>  extern int nb_filtergraphs;
>
>  extern char *vstats_filename;
> -extern char *sdp_filename;
>
>  extern float dts_delta_threshold;
>  extern float dts_error_threshold;
> @@ -691,7 +677,7 @@ extern const AVIOInterruptCB int_cb;
>  extern const OptionDef options[];
>  extern HWDevice *filter_hw_device;
>
> -extern unsigned nb_output_dumped;
> +extern atomic_uint nb_output_dumped;
>
>  extern int ignore_unknown_streams;
>  extern int copy_unknown_streams;
> @@ -737,10 +723,6 @@ FrameData *frame_data(AVFrame *frame);
>
>  const FrameData *frame_data_c(AVFrame *frame);
>
> -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int
> keep_reference);
> -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb);
> -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts,
> AVRational tb);
> -
>  /**
>   * Set up fallback filtering parameters from a decoder context. They will
> only
>   * be used if no frames are ever sent on this input, otherwise the actual
> @@ -761,26 +743,9 @@ int fg_create(FilterGraph **pfg, char *graph_desc,
> Scheduler *sch);
>
>  void fg_free(FilterGraph **pfg);
>
> -/**
> - * Perform a step of transcoding for the specified filter graph.
> - *
> - * @param[in] graph filter graph to consider
> - * @param[out] best_ist input stream where a frame would allow to
> continue
> - * @return 0 for success, <0 for error
> - */
> -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist);
> -
>  void fg_send_command(FilterGraph *fg, double time, const char *target,
>                       const char *command, const char *arg, int
> all_filters);
>
> -/**
> - * Get and encode new output from specified filtergraph, without causing
> - * activity.
> - *
> - * @return 0 for success, <0 for severe errors
> - */
> -int reap_filters(FilterGraph *fg, int flush);
> -
>  int ffmpeg_parse_options(int argc, char **argv, Scheduler *sch);
>
>  void enc_stats_write(OutputStream *ost, EncStats *es,
> @@ -807,25 +772,11 @@ int hwaccel_retrieve_data(AVCodecContext *avctx,
> AVFrame *input);
>  int dec_open(InputStream *ist, Scheduler *sch, unsigned sch_idx);
>  void dec_free(Decoder **pdec);
>
> -/**
> - * Submit a packet for decoding
> - *
> - * When pkt==NULL and no_eof=0, there will be no more input. Flush
> decoders and
> - * mark all downstreams as finished.
> - *
> - * When pkt==NULL and no_eof=1, the stream was reset (e.g. after a seek).
> Flush
> - * decoders and await further input.
> - */
> -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof);
> -
>  int enc_alloc(Encoder **penc, const AVCodec *codec,
>                Scheduler *sch, unsigned sch_idx);
>  void enc_free(Encoder **penc);
>
> -int enc_open(OutputStream *ost, const AVFrame *frame);
> -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle
> *sub);
> -int enc_frame(OutputStream *ost, AVFrame *frame);
> -int enc_flush(void);
> +int enc_open(void *opaque, const AVFrame *frame);
>
>  /*
>   * Initialize muxing state for the given stream, should be called
> @@ -840,30 +791,11 @@ void of_free(OutputFile **pof);
>
>  void of_enc_stats_close(void);
>
> -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt);
> -
> -/**
> - * @param dts predicted packet dts in AV_TIME_BASE_Q
> - */
> -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts);
> -
>  int64_t of_filesize(OutputFile *of);
>
>  int ifile_open(const OptionsContext *o, const char *filename, Scheduler
> *sch);
>  void ifile_close(InputFile **f);
>
> -/**
> - * Get next input packet from the demuxer.
> - *
> - * @param pkt the packet is written here when this function returns 0
> - * @return
> - * - 0 when a packet has been read successfully
> - * - 1 when stream end was reached, but the stream is looped;
> - *   caller should flush decoders and read from this demuxer again
> - * - a negative error code on failure
> - */
> -int ifile_get_packet(InputFile *f, AVPacket *pkt);
> -
>  int ist_output_add(InputStream *ist, OutputStream *ost);
>  int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple);
>
> @@ -880,9 +812,6 @@ InputStream *ist_iter(InputStream *prev);
>   * pass NULL to start iteration */
>  OutputStream *ost_iter(OutputStream *prev);
>
> -void close_output_stream(OutputStream *ost);
> -int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket
> *pkt);
> -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts);
>  void update_benchmark(const char *fmt, ...);
>
>  #define SPECIFIER_OPT_FMT_str "%s"
> diff --git a/fftools/ffmpeg_dec.c b/fftools/ffmpeg_dec.c
> index 90ea0d6d93..5dde82a276 100644
> --- a/fftools/ffmpeg_dec.c
> +++ b/fftools/ffmpeg_dec.c
> @@ -54,24 +54,6 @@ struct Decoder {
>
>      Scheduler *sch;
>      unsigned sch_idx;
> -
> -    pthread_t thread;
> -    /**
> -     * Queue for sending coded packets from the main thread to
> -     * the decoder thread.
> -     *
> -     * An empty packet is sent to flush the decoder without terminating
> -     * decoding.
> -     */
> -    ThreadQueue *queue_in;
> -    /**
> -     * Queue for sending decoded frames from the decoder thread
> -     * to the main thread.
> -     *
> -     * An empty frame is sent to signal that a single packet has been
> fully
> -     * processed.
> -     */
> -    ThreadQueue *queue_out;
>  };
>
>  // data that is local to the decoder thread and not visible outside of it
> @@ -80,24 +62,6 @@ typedef struct DecThreadContext {
>      AVPacket *pkt;
>  } DecThreadContext;
>
> -static int dec_thread_stop(Decoder *d)
> -{
> -    void *ret;
> -
> -    if (!d->queue_in)
> -        return 0;
> -
> -    tq_send_finish(d->queue_in, 0);
> -    tq_receive_finish(d->queue_out, 0);
> -
> -    pthread_join(d->thread, &ret);
> -
> -    tq_free(&d->queue_in);
> -    tq_free(&d->queue_out);
> -
> -    return (intptr_t)ret;
> -}
> -
>  void dec_free(Decoder **pdec)
>  {
>      Decoder *dec = *pdec;
> @@ -105,8 +69,6 @@ void dec_free(Decoder **pdec)
>      if (!dec)
>          return;
>
> -    dec_thread_stop(dec);
> -
>      av_frame_free(&dec->frame);
>      av_packet_free(&dec->pkt);
>
> @@ -148,25 +110,6 @@ fail:
>      return AVERROR(ENOMEM);
>  }
>
> -static int send_frame_to_filters(InputStream *ist, AVFrame *decoded_frame)
> -{
> -    int i, ret = 0;
> -
> -    for (i = 0; i < ist->nb_filters; i++) {
> -        ret = ifilter_send_frame(ist->filters[i], decoded_frame,
> -                                 i < ist->nb_filters - 1 ||
> -                                 ist->dec->type == AVMEDIA_TYPE_SUBTITLE);
> -        if (ret == AVERROR_EOF)
> -            ret = 0; /* ignore */
> -        if (ret < 0) {
> -            av_log(NULL, AV_LOG_ERROR,
> -                   "Failed to inject frame into filter network: %s\n",
> av_err2str(ret));
> -            break;
> -        }
> -    }
> -    return ret;
> -}
> -
>  static AVRational audio_samplerate_update(void *logctx, Decoder *d,
>                                            const AVFrame *frame)
>  {
> @@ -421,28 +364,14 @@ static int process_subtitle(InputStream *ist,
> AVFrame *frame)
>      if (!subtitle)
>          return 0;
>
> -    ret = send_frame_to_filters(ist, frame);
> +    ret = sch_dec_send(d->sch, d->sch_idx, frame);
>      if (ret < 0)
> -        return ret;
> +        av_frame_unref(frame);
>
> -    subtitle = (AVSubtitle*)frame->buf[0]->data;
> -    if (!subtitle->num_rects)
> -        return 0;
> -
> -    for (int oidx = 0; oidx < ist->nb_outputs; oidx++) {
> -        OutputStream *ost = ist->outputs[oidx];
> -        if (!ost->enc || ost->type != AVMEDIA_TYPE_SUBTITLE)
> -            continue;
> -
> -        ret = enc_subtitle(output_files[ost->file_index], ost, subtitle);
> -        if (ret < 0)
> -            return ret;
> -    }
> -
> -    return 0;
> +    return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
>  }
>
> -int fix_sub_duration_heartbeat(InputStream *ist, int64_t signal_pts)
> +static int fix_sub_duration_heartbeat(InputStream *ist, int64_t
> signal_pts)
>  {
>      Decoder *d = ist->decoder;
>      int ret = AVERROR_BUG;
> @@ -468,12 +397,24 @@ int fix_sub_duration_heartbeat(InputStream *ist,
> int64_t signal_pts)
>  static int transcode_subtitles(InputStream *ist, const AVPacket *pkt,
>                                 AVFrame *frame)
>  {
> -    Decoder *d = ist->decoder;
> +    Decoder *d = ist->decoder;
>      AVPacket *flush_pkt = NULL;
>      AVSubtitle subtitle;
>      int got_output;
>      int ret;
>
> +    if (pkt && (intptr_t)pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT) {
> +        frame->pts = pkt->pts;
> +        frame->time_base = pkt->time_base;
> +        frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SUB_HEARTBEAT;
> +
> +        ret = sch_dec_send(d->sch, d->sch_idx, frame);
> +        return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
> +    } else if (pkt && (intptr_t)pkt->opaque ==
> PKT_OPAQUE_FIX_SUB_DURATION) {
> +        return fix_sub_duration_heartbeat(ist, av_rescale_q(pkt->pts,
> pkt->time_base,
> +
> AV_TIME_BASE_Q));
> +    }
> +
>      if (!pkt) {
>          flush_pkt = av_packet_alloc();
>          if (!flush_pkt)
> @@ -496,7 +437,7 @@ static int transcode_subtitles(InputStream *ist, const
> AVPacket *pkt,
>
>      ist->frames_decoded++;
>
> -    // XXX the queue for transferring data back to the main thread runs
> +    // XXX the queue for transferring data to consumers runs
>      // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put that
>      // inside the frame
>      // eventually, subtitles should be switched to use AVFrames natively
> @@ -509,26 +450,7 @@ static int transcode_subtitles(InputStream *ist,
> const AVPacket *pkt,
>      frame->width = ist->dec_ctx->width;
>      frame->height = ist->dec_ctx->height;
>
> -    ret = tq_send(d->queue_out, 0, frame);
> -    if (ret < 0)
> -        av_frame_unref(frame);
> -
> -    return ret;
> -}
> -
> -static int send_filter_eof(InputStream *ist)
> -{
> -    Decoder *d = ist->decoder;
> -    int i, ret;
> -
> -    for (i = 0; i < ist->nb_filters; i++) {
> -        int64_t end_pts = d->last_frame_pts == AV_NOPTS_VALUE ?
> AV_NOPTS_VALUE :
> -                          d->last_frame_pts + d->last_frame_duration_est;
> -        ret = ifilter_send_eof(ist->filters[i], end_pts,
> d->last_frame_tb);
> -        if (ret < 0)
> -            return ret;
> -    }
> -    return 0;
> +    return process_subtitle(ist, frame);
>  }
>
>  static int packet_decode(InputStream *ist, AVPacket *pkt, AVFrame *frame)
> @@ -635,9 +557,11 @@ static int packet_decode(InputStream *ist, AVPacket
> *pkt, AVFrame *frame)
>
>          ist->frames_decoded++;
>
> -        ret = tq_send(d->queue_out, 0, frame);
> -        if (ret < 0)
> -            return ret;
> +        ret = sch_dec_send(d->sch, d->sch_idx, frame);
> +        if (ret < 0) {
> +            av_frame_unref(frame);
> +            return ret == AVERROR_EOF ? AVERROR_EXIT : ret;
> +        }
>      }
>  }
>
> @@ -679,7 +603,6 @@ fail:
>  void *decoder_thread(void *arg)
>  {
>      InputStream *ist = arg;
> -    InputFile *ifile = input_files[ist->file_index];
>      Decoder *d = ist->decoder;
>      DecThreadContext dt;
>      int ret = 0, input_status = 0;
> @@ -691,19 +614,31 @@ void *decoder_thread(void *arg)
>      dec_thread_set_name(ist);
>
>      while (!input_status) {
> -        int dummy, flush_buffers;
> +        int flush_buffers, have_data;
>
> -        input_status = tq_receive(d->queue_in, &dummy, dt.pkt);
> -        flush_buffers = input_status >= 0 && !dt.pkt->buf;
> -        if (!dt.pkt->buf)
> +        input_status = sch_dec_receive(d->sch, d->sch_idx, dt.pkt);
> +        have_data = input_status >= 0 &&
> +            (dt.pkt->buf || dt.pkt->side_data_elems ||
> +             (intptr_t)dt.pkt->opaque == PKT_OPAQUE_SUB_HEARTBEAT ||
> +             (intptr_t)dt.pkt->opaque == PKT_OPAQUE_FIX_SUB_DURATION);
> +        flush_buffers = input_status >= 0 && !have_data;
> +        if (!have_data)
>              av_log(ist, AV_LOG_VERBOSE, "Decoder thread received %s
> packet\n",
>                     flush_buffers ? "flush" : "EOF");
>
> -        ret = packet_decode(ist, dt.pkt->buf ? dt.pkt : NULL, dt.frame);
> +        ret = packet_decode(ist, have_data ? dt.pkt : NULL, dt.frame);
>
>          av_packet_unref(dt.pkt);
>          av_frame_unref(dt.frame);
>
> +        // AVERROR_EOF - EOF from the decoder
> +        // AVERROR_EXIT - EOF from the scheduler
> +        // we treat them differently when flushing
> +        if (ret == AVERROR_EXIT) {
> +            ret = AVERROR_EOF;
> +            flush_buffers = 0;
> +        }
> +
>          if (ret == AVERROR_EOF) {
>              av_log(ist, AV_LOG_VERBOSE, "Decoder returned EOF, %s\n",
>                     flush_buffers ? "resetting" : "finishing");
> @@ -711,11 +646,10 @@ void *decoder_thread(void *arg)
>              if (!flush_buffers)
>                  break;
>
> -            /* report last frame duration to the demuxer thread */
> +            /* report last frame duration to the scheduler */
>              if (ist->dec->type == AVMEDIA_TYPE_AUDIO) {
> -                Timestamp ts = { .ts = d->last_frame_pts +
> d->last_frame_duration_est,
> -                                 .tb = d->last_frame_tb };
> -                av_thread_message_queue_send(ifile->audio_ts_queue, &ts,
> 0);
> +                dt.pkt->pts = d->last_frame_pts +
> d->last_frame_duration_est;
> +                dt.pkt->time_base = d->last_frame_tb;
>              }
>
>              avcodec_flush_buffers(ist->dec_ctx);
> @@ -724,149 +658,47 @@ void *decoder_thread(void *arg)
>                     av_err2str(ret));
>              break;
>          }
> -
> -        // signal to the consumer thread that the entire packet was
> processed
> -        ret = tq_send(d->queue_out, 0, dt.frame);
> -        if (ret < 0) {
> -            if (ret != AVERROR_EOF)
> -                av_log(ist, AV_LOG_ERROR, "Error communicating with the
> main thread\n");
> -            break;
> -        }
>      }
>
>      // EOF is normal thread termination
>      if (ret == AVERROR_EOF)
>          ret = 0;
>
> +    // on success send EOF timestamp to our downstreams
> +    if (ret >= 0) {
> +        float err_rate;
> +
> +        av_frame_unref(dt.frame);
> +
> +        dt.frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_EOF;
> +        dt.frame->pts = d->last_frame_pts == AV_NOPTS_VALUE ?
> AV_NOPTS_VALUE :
> +                        d->last_frame_pts +
> d->last_frame_duration_est;
> +        dt.frame->time_base = d->last_frame_tb;
> +
> +        ret = sch_dec_send(d->sch, d->sch_idx, dt.frame);
> +        if (ret < 0 && ret != AVERROR_EOF) {
> +            av_log(NULL, AV_LOG_FATAL,
> +                   "Error signalling EOF timestamp: %s\n",
> av_err2str(ret));
> +            goto finish;
> +        }
> +        ret = 0;
> +
> +        err_rate = (ist->frames_decoded || ist->decode_errors) ?
> +                   ist->decode_errors / (ist->frames_decoded +
> ist->decode_errors) : 0.f;
> +        if (err_rate > max_error_rate) {
> +            av_log(ist, AV_LOG_FATAL, "Decode error rate %g exceeds
> maximum %g\n",
> +                   err_rate, max_error_rate);
> +            ret = FFMPEG_ERROR_RATE_EXCEEDED;
> +        } else if (err_rate)
> +            av_log(ist, AV_LOG_VERBOSE, "Decode error rate %g\n",
> err_rate);
> +    }
> +
>  finish:
> -    tq_receive_finish(d->queue_in, 0);
> -    tq_send_finish (d->queue_out, 0);
> -
> -    // make sure the demuxer does not get stuck waiting for audio
> durations
> -    // that will never arrive
> -    if (ifile->audio_ts_queue && ist->dec->type == AVMEDIA_TYPE_AUDIO)
> -        av_thread_message_queue_set_err_recv(ifile->audio_ts_queue,
> AVERROR_EOF);
> -
>      dec_thread_uninit(&dt);
>
> -    av_log(ist, AV_LOG_VERBOSE, "Terminating decoder thread\n");
> -
>      return (void*)(intptr_t)ret;
>  }
>
> -int dec_packet(InputStream *ist, const AVPacket *pkt, int no_eof)
> -{
> -    Decoder *d = ist->decoder;
> -    int ret = 0, thread_ret;
> -
> -    // thread already joined
> -    if (!d->queue_in)
> -        return AVERROR_EOF;
> -
> -    // send the packet/flush request/EOF to the decoder thread
> -    if (pkt || no_eof) {
> -        av_packet_unref(d->pkt);
> -
> -        if (pkt) {
> -            ret = av_packet_ref(d->pkt, pkt);
> -            if (ret < 0)
> -                goto finish;
> -        }
> -
> -        ret = tq_send(d->queue_in, 0, d->pkt);
> -        if (ret < 0)
> -            goto finish;
> -    } else
> -        tq_send_finish(d->queue_in, 0);
> -
> -    // retrieve all decoded data for the packet
> -    while (1) {
> -        int dummy;
> -
> -        ret = tq_receive(d->queue_out, &dummy, d->frame);
> -        if (ret < 0)
> -            goto finish;
> -
> -        // packet fully processed
> -        if (!d->frame->buf[0])
> -            return 0;
> -
> -        // process the decoded frame
> -        if (ist->dec->type == AVMEDIA_TYPE_SUBTITLE) {
> -            ret = process_subtitle(ist, d->frame);
> -        } else {
> -            ret = send_frame_to_filters(ist, d->frame);
> -        }
> -        av_frame_unref(d->frame);
> -        if (ret < 0)
> -            goto finish;
> -    }
> -
> -finish:
> -    thread_ret = dec_thread_stop(d);
> -    if (thread_ret < 0) {
> -        av_log(ist, AV_LOG_ERROR, "Decoder thread returned error: %s\n",
> -               av_err2str(thread_ret));
> -        ret = err_merge(ret, thread_ret);
> -    }
> -    // non-EOF errors here are all fatal
> -    if (ret < 0 && ret != AVERROR_EOF)
> -        return ret;
> -
> -    // signal EOF to our downstreams
> -    ret = send_filter_eof(ist);
> -    if (ret < 0) {
> -        av_log(NULL, AV_LOG_FATAL, "Error marking filters as finished\n");
> -        return ret;
> -    }
> -
> -    return AVERROR_EOF;
> -}
> -
> -static int dec_thread_start(InputStream *ist)
> -{
> -    Decoder *d = ist->decoder;
> -    ObjPool *op;
> -    int ret = 0;
> -
> -    op = objpool_alloc_packets();
> -    if (!op)
> -        return AVERROR(ENOMEM);
> -
> -    d->queue_in = tq_alloc(1, 1, op, pkt_move);
> -    if (!d->queue_in) {
> -        objpool_free(&op);
> -        return AVERROR(ENOMEM);
> -    }
> -
> -    op = objpool_alloc_frames();
> -    if (!op)
> -        goto fail;
> -
> -    d->queue_out = tq_alloc(1, 4, op, frame_move);
> -    if (!d->queue_out) {
> -        objpool_free(&op);
> -        goto fail;
> -    }
> -
> -    ret = pthread_create(&d->thread, NULL, decoder_thread, ist);
> -    if (ret) {
> -        ret = AVERROR(ret);
> -        av_log(ist, AV_LOG_ERROR, "pthread_create() failed: %s\n",
> -               av_err2str(ret));
> -        goto fail;
> -    }
> -
> -    return 0;
> -fail:
> -    if (ret >= 0)
> -        ret = AVERROR(ENOMEM);
> -
> -    tq_free(&d->queue_in);
> -    tq_free(&d->queue_out);
> -    return ret;
> -}
> -
>  static enum AVPixelFormat get_format(AVCodecContext *s, const enum
> AVPixelFormat *pix_fmts)
>  {
>      InputStream *ist = s->opaque;
> @@ -1118,12 +950,5 @@ int dec_open(InputStream *ist, Scheduler *sch,
> unsigned sch_idx)
>      if (ret < 0)
>          return ret;
>
> -    ret = dec_thread_start(ist);
> -    if (ret < 0) {
> -        av_log(ist, AV_LOG_ERROR, "Error starting decoder thread: %s\n",
> -               av_err2str(ret));
> -        return ret;
> -    }
> -
>      return 0;
>  }
> diff --git a/fftools/ffmpeg_demux.c b/fftools/ffmpeg_demux.c
> index 2234dbe076..91cd7a1125 100644
> --- a/fftools/ffmpeg_demux.c
> +++ b/fftools/ffmpeg_demux.c
> @@ -22,8 +22,6 @@
>
#include "ffmpeg.h" > #include "ffmpeg_sched.h" > #include "ffmpeg_utils.h" > -#include "objpool.h" > -#include "thread_queue.h" > > #include "libavutil/avassert.h" > #include "libavutil/avstring.h" > @@ -35,7 +33,6 @@ > #include "libavutil/pixdesc.h" > #include "libavutil/time.h" > #include "libavutil/timestamp.h" > -#include "libavutil/thread.h" > > #include "libavcodec/packet.h" > > @@ -66,7 +63,11 @@ typedef struct DemuxStream { > > double ts_scale; > > + // scheduler returned EOF for this stream > + int finished; > + > int streamcopy_needed; > + int have_sub2video; > > int wrap_correction_done; > int saw_first_ts; > @@ -101,6 +102,7 @@ typedef struct Demuxer { > > /* number of times input stream should be looped */ > int loop; > + int have_audio_dec; > /* duration of the looped segment of the input file */ > Timestamp duration; > /* pts with the smallest/largest values ever seen */ > @@ -113,11 +115,12 @@ typedef struct Demuxer { > double readrate_initial_burst; > > Scheduler *sch; > - ThreadQueue *thread_queue; > - int thread_queue_size; > - pthread_t thread; > + > + AVPacket *pkt_heartbeat; > > int read_started; > + int nb_streams_used; > + int nb_streams_finished; > } Demuxer; > > static DemuxStream *ds_from_ist(InputStream *ist) > @@ -153,7 +156,7 @@ static void report_new_stream(Demuxer *d, const > AVPacket *pkt) > d->nb_streams_warn = pkt->stream_index + 1; > } > > -static int seek_to_start(Demuxer *d) > +static int seek_to_start(Demuxer *d, Timestamp end_pts) > { > InputFile *ifile = &d->f; > AVFormatContext *is = ifile->ctx; > @@ -163,21 +166,10 @@ static int seek_to_start(Demuxer *d) > if (ret < 0) > return ret; > > - if (ifile->audio_ts_queue_size) { > - int got_ts = 0; > - > - while (got_ts < ifile->audio_ts_queue_size) { > - Timestamp ts; > - ret = av_thread_message_queue_recv(ifile->audio_ts_queue, > &ts, 0); > - if (ret < 0) > - return ret; > - got_ts++; > - > - if (d->max_pts.ts == AV_NOPTS_VALUE || > - av_compare_ts(d->max_pts.ts, 
d->max_pts.tb, ts.ts, ts.tb) > < 0) > - d->max_pts = ts; > - } > - } > + if (end_pts.ts != AV_NOPTS_VALUE && > + (d->max_pts.ts == AV_NOPTS_VALUE || > + av_compare_ts(d->max_pts.ts, d->max_pts.tb, end_pts.ts, > end_pts.tb) < 0)) > + d->max_pts = end_pts; > > if (d->max_pts.ts != AV_NOPTS_VALUE) { > int64_t min_pts = d->min_pts.ts == AV_NOPTS_VALUE ? 0 : > d->min_pts.ts; > @@ -404,7 +396,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) > duration = av_rescale_q(d->duration.ts, d->duration.tb, > pkt->time_base); > if (pkt->pts != AV_NOPTS_VALUE) { > // audio decoders take precedence for estimating total file > duration > - int64_t pkt_duration = ifile->audio_ts_queue_size ? 0 : > pkt->duration; > + int64_t pkt_duration = d->have_audio_dec ? 0 : pkt->duration; > > pkt->pts += duration; > > @@ -440,7 +432,7 @@ static int ts_fixup(Demuxer *d, AVPacket *pkt) > return 0; > } > > -static int input_packet_process(Demuxer *d, AVPacket *pkt) > +static int input_packet_process(Demuxer *d, AVPacket *pkt, unsigned > *send_flags) > { > InputFile *f = &d->f; > InputStream *ist = f->streams[pkt->stream_index]; > @@ -451,6 +443,16 @@ static int input_packet_process(Demuxer *d, AVPacket > *pkt) > if (ret < 0) > return ret; > > + if (f->recording_time != INT64_MAX) { > + int64_t start_time = 0; > + if (copy_ts) { > + start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time > : 0; > + start_time += start_at_zero ? 
0 : f->start_time_effective; > + } > + if (ds->dts >= f->recording_time + start_time) > + *send_flags |= DEMUX_SEND_STREAMCOPY_EOF; > + } > + > ds->data_size += pkt->size; > ds->nb_packets++; > > @@ -465,6 +467,8 @@ static int input_packet_process(Demuxer *d, AVPacket > *pkt) > av_ts2timestr(input_files[ist->file_index]->ts_offset, > &AV_TIME_BASE_Q)); > } > > + pkt->stream_index = ds->sch_idx_stream; > + > return 0; > } > > @@ -488,6 +492,65 @@ static void readrate_sleep(Demuxer *d) > } > } > > +static int do_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, unsigned > flags, > + const char *pkt_desc) > +{ > + int ret; > + > + ret = sch_demux_send(d->sch, d->f.index, pkt, flags); > + if (ret == AVERROR_EOF) { > + av_packet_unref(pkt); > + > + av_log(ds, AV_LOG_VERBOSE, "All consumers of this stream are > done\n"); > + ds->finished = 1; > + > + if (++d->nb_streams_finished == d->nb_streams_used) { > + av_log(d, AV_LOG_VERBOSE, "All consumers are done\n"); > + return AVERROR_EOF; > + } > + } else if (ret < 0) { > + if (ret != AVERROR_EXIT) > + av_log(d, AV_LOG_ERROR, > + "Unable to send %s packet to consumers: %s\n", > + pkt_desc, av_err2str(ret)); > + return ret; > + } > + > + return 0; > +} > + > +static int demux_send(Demuxer *d, DemuxStream *ds, AVPacket *pkt, > unsigned flags) > +{ > + InputFile *f = &d->f; > + int ret; > + > + // send heartbeat for sub2video streams > + if (d->pkt_heartbeat && pkt->pts != AV_NOPTS_VALUE) { > + for (int i = 0; i < f->nb_streams; i++) { > + DemuxStream *ds1 = ds_from_ist(f->streams[i]); > + > + if (ds1->finished || !ds1->have_sub2video) > + continue; > + > + d->pkt_heartbeat->pts = pkt->pts; > + d->pkt_heartbeat->time_base = pkt->time_base; > + d->pkt_heartbeat->stream_index = ds1->sch_idx_stream; > + d->pkt_heartbeat->opaque = > (void*)(intptr_t)PKT_OPAQUE_SUB_HEARTBEAT; > + > + ret = do_send(d, ds1, d->pkt_heartbeat, 0, "heartbeat"); > + if (ret < 0) > + return ret; > + } > + } > + > + ret = do_send(d, ds, pkt, flags, 
"demuxed"); > + if (ret < 0) > + return ret; > + > + > + return 0; > +} > + > static void discard_unused_programs(InputFile *ifile) > { > for (int j = 0; j < ifile->ctx->nb_programs; j++) { > @@ -527,9 +590,13 @@ static void *input_thread(void *arg) > > discard_unused_programs(f); > > + d->read_started = 1; > d->wallclock_start = av_gettime_relative(); > > while (1) { > + DemuxStream *ds; > + unsigned send_flags = 0; > + > ret = av_read_frame(f->ctx, pkt); > > if (ret == AVERROR(EAGAIN)) { > @@ -538,11 +605,13 @@ static void *input_thread(void *arg) > } > if (ret < 0) { > if (d->loop) { > - /* signal looping to the consumer thread */ > + /* signal looping to our consumers */ > pkt->stream_index = -1; > - ret = tq_send(d->thread_queue, 0, pkt); > + > + ret = sch_demux_send(d->sch, f->index, pkt, 0); > if (ret >= 0) > - ret = seek_to_start(d); > + ret = seek_to_start(d, (Timestamp){ .ts = pkt->pts, > + .tb = > pkt->time_base }); > if (ret >= 0) > continue; > > @@ -551,9 +620,11 @@ static void *input_thread(void *arg) > > if (ret == AVERROR_EOF) > av_log(d, AV_LOG_VERBOSE, "EOF while reading input\n"); > - else > + else { > av_log(d, AV_LOG_ERROR, "Error during demuxing: %s\n", > av_err2str(ret)); > + ret = exit_on_error ? ret : 0; > + } > > break; > } > @@ -565,8 +636,9 @@ static void *input_thread(void *arg) > > /* the following test is needed in case new streams appear > dynamically in stream : we ignore them */ > - if (pkt->stream_index >= f->nb_streams || > - f->streams[pkt->stream_index]->discard) { > + ds = pkt->stream_index < f->nb_streams ? 
> + ds_from_ist(f->streams[pkt->stream_index]) : NULL; > + if (!ds || ds->ist.discard || ds->finished) { > report_new_stream(d, pkt); > av_packet_unref(pkt); > continue; > @@ -583,122 +655,26 @@ static void *input_thread(void *arg) > } > } > > - ret = input_packet_process(d, pkt); > + ret = input_packet_process(d, pkt, &send_flags); > if (ret < 0) > break; > > if (f->readrate) > readrate_sleep(d); > > - ret = tq_send(d->thread_queue, 0, pkt); > - if (ret < 0) { > - if (ret != AVERROR_EOF) > - av_log(f, AV_LOG_ERROR, > - "Unable to send packet to main thread: %s\n", > - av_err2str(ret)); > + ret = demux_send(d, ds, pkt, send_flags); > + if (ret < 0) > break; > - } > } > > + // EOF/EXIT is normal termination > + if (ret == AVERROR_EOF || ret == AVERROR_EXIT) > + ret = 0; > + > finish: > - av_assert0(ret < 0); > - tq_send_finish(d->thread_queue, 0); > - > av_packet_free(&pkt); > > - av_log(d, AV_LOG_VERBOSE, "Terminating demuxer thread\n"); > - > - return NULL; > -} > - > -static void thread_stop(Demuxer *d) > -{ > - InputFile *f = &d->f; > - > - if (!d->thread_queue) > - return; > - > - tq_receive_finish(d->thread_queue, 0); > - > - pthread_join(d->thread, NULL); > - > - tq_free(&d->thread_queue); > - > - av_thread_message_queue_free(&f->audio_ts_queue); > -} > - > -static int thread_start(Demuxer *d) > -{ > - int ret; > - InputFile *f = &d->f; > - ObjPool *op; > - > - if (d->thread_queue_size <= 0) > - d->thread_queue_size = (nb_input_files > 1 ? 
8 : 1); > - > - op = objpool_alloc_packets(); > - if (!op) > - return AVERROR(ENOMEM); > - > - d->thread_queue = tq_alloc(1, d->thread_queue_size, op, pkt_move); > - if (!d->thread_queue) { > - objpool_free(&op); > - return AVERROR(ENOMEM); > - } > - > - if (d->loop) { > - int nb_audio_dec = 0; > - > - for (int i = 0; i < f->nb_streams; i++) { > - InputStream *ist = f->streams[i]; > - nb_audio_dec += !!(ist->decoding_needed && > - ist->st->codecpar->codec_type == > AVMEDIA_TYPE_AUDIO); > - } > - > - if (nb_audio_dec) { > - ret = av_thread_message_queue_alloc(&f->audio_ts_queue, > - nb_audio_dec, > sizeof(Timestamp)); > - if (ret < 0) > - goto fail; > - f->audio_ts_queue_size = nb_audio_dec; > - } > - } > - > - if ((ret = pthread_create(&d->thread, NULL, input_thread, d))) { > - av_log(d, AV_LOG_ERROR, "pthread_create failed: %s. Try to > increase `ulimit -v` or decrease `ulimit -s`.\n", strerror(ret)); > - ret = AVERROR(ret); > - goto fail; > - } > - > - d->read_started = 1; > - > - return 0; > -fail: > - tq_free(&d->thread_queue); > - return ret; > -} > - > -int ifile_get_packet(InputFile *f, AVPacket *pkt) > -{ > - Demuxer *d = demuxer_from_ifile(f); > - int ret, dummy; > - > - if (!d->thread_queue) { > - ret = thread_start(d); > - if (ret < 0) > - return ret; > - } > - > - ret = tq_receive(d->thread_queue, &dummy, pkt); > - if (ret < 0) > - return ret; > - > - if (pkt->stream_index == -1) { > - av_assert0(!pkt->data && !pkt->side_data_elems); > - return 1; > - } > - > - return 0; > + return (void*)(intptr_t)ret; > } > > static void demux_final_stats(Demuxer *d) > @@ -769,8 +745,6 @@ void ifile_close(InputFile **pf) > if (!f) > return; > > - thread_stop(d); > - > if (d->read_started) > demux_final_stats(d); > > @@ -780,6 +754,8 @@ void ifile_close(InputFile **pf) > > avformat_close_input(&f->ctx); > > + av_packet_free(&d->pkt_heartbeat); > + > av_freep(pf); > } > > @@ -802,7 +778,11 @@ static int ist_use(InputStream *ist, int > decoding_needed) > 
ds->sch_idx_stream = ret; > } > > - ist->discard = 0; > + if (ist->discard) { > + ist->discard = 0; > + d->nb_streams_used++; > + } > + > ist->st->discard = ist->user_set_discard; > ist->decoding_needed |= decoding_needed; > ds->streamcopy_needed |= !decoding_needed; > @@ -823,6 +803,8 @@ static int ist_use(InputStream *ist, int > decoding_needed) > ret = dec_open(ist, d->sch, ds->sch_idx_dec); > if (ret < 0) > return ret; > + > + d->have_audio_dec |= is_audio; > } > > return 0; > @@ -848,6 +830,7 @@ int ist_output_add(InputStream *ist, OutputStream *ost) > > int ist_filter_add(InputStream *ist, InputFilter *ifilter, int is_simple) > { > + Demuxer *d = demuxer_from_ifile(input_files[ist->file_index]); > DemuxStream *ds = ds_from_ist(ist); > int ret; > > @@ -866,6 +849,15 @@ int ist_filter_add(InputStream *ist, InputFilter > *ifilter, int is_simple) > if (ret < 0) > return ret; > > + if (ist->dec_ctx->codec_type == AVMEDIA_TYPE_SUBTITLE) { > + if (!d->pkt_heartbeat) { > + d->pkt_heartbeat = av_packet_alloc(); > + if (!d->pkt_heartbeat) > + return AVERROR(ENOMEM); > + } > + ds->have_sub2video = 1; > + } > + > return ds->sch_idx_dec; > } > > @@ -1607,8 +1599,6 @@ int ifile_open(const OptionsContext *o, const char > *filename, Scheduler *sch) > "since neither -readrate nor -re were given\n"); > } > > - d->thread_queue_size = o->thread_queue_size; > - > /* Add all the streams from the given input file to the demuxer */ > for (int i = 0; i < ic->nb_streams; i++) { > ret = ist_add(o, d, ic->streams[i]); > diff --git a/fftools/ffmpeg_enc.c b/fftools/ffmpeg_enc.c > index 9871381c0e..9383b167f7 100644 > --- a/fftools/ffmpeg_enc.c > +++ b/fftools/ffmpeg_enc.c > @@ -41,12 +41,6 @@ > #include "libavformat/avformat.h" > > struct Encoder { > - AVFrame *sq_frame; > - > - // packet for receiving encoded output > - AVPacket *pkt; > - AVFrame *sub_frame; > - > // combined size of all the packets received from the encoder > uint64_t data_size; > > @@ -54,25 +48,9 @@ struct Encoder { > 
uint64_t packets_encoded; > > int opened; > - int finished; > > Scheduler *sch; > unsigned sch_idx; > - > - pthread_t thread; > - /** > - * Queue for sending frames from the main thread to > - * the encoder thread. > - */ > - ThreadQueue *queue_in; > - /** > - * Queue for sending encoded packets from the encoder thread > - * to the main thread. > - * > - * An empty packet is sent to signal that a previously sent > - * frame has been fully processed. > - */ > - ThreadQueue *queue_out; > }; > > // data that is local to the decoder thread and not visible outside of it > @@ -81,24 +59,6 @@ typedef struct EncoderThread { > AVPacket *pkt; > } EncoderThread; > > -static int enc_thread_stop(Encoder *e) > -{ > - void *ret; > - > - if (!e->queue_in) > - return 0; > - > - tq_send_finish(e->queue_in, 0); > - tq_receive_finish(e->queue_out, 0); > - > - pthread_join(e->thread, &ret); > - > - tq_free(&e->queue_in); > - tq_free(&e->queue_out); > - > - return (int)(intptr_t)ret; > -} > - > void enc_free(Encoder **penc) > { > Encoder *enc = *penc; > @@ -106,13 +66,6 @@ void enc_free(Encoder **penc) > if (!enc) > return; > > - enc_thread_stop(enc); > - > - av_frame_free(&enc->sq_frame); > - av_frame_free(&enc->sub_frame); > - > - av_packet_free(&enc->pkt); > - > av_freep(penc); > } > > @@ -127,25 +80,12 @@ int enc_alloc(Encoder **penc, const AVCodec *codec, > if (!enc) > return AVERROR(ENOMEM); > > - if (codec->type == AVMEDIA_TYPE_SUBTITLE) { > - enc->sub_frame = av_frame_alloc(); > - if (!enc->sub_frame) > - goto fail; > - } > - > - enc->pkt = av_packet_alloc(); > - if (!enc->pkt) > - goto fail; > - > enc->sch = sch; > enc->sch_idx = sch_idx; > > *penc = enc; > > return 0; > -fail: > - enc_free(&enc); > - return AVERROR(ENOMEM); > } > > static int hw_device_setup_for_encode(OutputStream *ost, AVBufferRef > *frames_ref) > @@ -224,52 +164,9 @@ static int set_encoder_id(OutputFile *of, > OutputStream *ost) > return 0; > } > > -static int enc_thread_start(OutputStream *ost) > -{ > - 
Encoder *e = ost->enc; > - ObjPool *op; > - int ret = 0; > - > - op = objpool_alloc_frames(); > - if (!op) > - return AVERROR(ENOMEM); > - > - e->queue_in = tq_alloc(1, 1, op, frame_move); > - if (!e->queue_in) { > - objpool_free(&op); > - return AVERROR(ENOMEM); > - } > - > - op = objpool_alloc_packets(); > - if (!op) > - goto fail; > - > - e->queue_out = tq_alloc(1, 4, op, pkt_move); > - if (!e->queue_out) { > - objpool_free(&op); > - goto fail; > - } > - > - ret = pthread_create(&e->thread, NULL, encoder_thread, ost); > - if (ret) { > - ret = AVERROR(ret); > - av_log(ost, AV_LOG_ERROR, "pthread_create() failed: %s\n", > - av_err2str(ret)); > - goto fail; > - } > - > - return 0; > -fail: > - if (ret >= 0) > - ret = AVERROR(ENOMEM); > - > - tq_free(&e->queue_in); > - tq_free(&e->queue_out); > - return ret; > -} > - > -int enc_open(OutputStream *ost, const AVFrame *frame) > +int enc_open(void *opaque, const AVFrame *frame) > { > + OutputStream *ost = opaque; > InputStream *ist = ost->ist; > Encoder *e = ost->enc; > AVCodecContext *enc_ctx = ost->enc_ctx; > @@ -277,6 +174,7 @@ int enc_open(OutputStream *ost, const AVFrame *frame) > const AVCodec *enc = enc_ctx->codec; > OutputFile *of = output_files[ost->file_index]; > FrameData *fd; > + int frame_samples = 0; > int ret; > > if (e->opened) > @@ -420,17 +318,8 @@ int enc_open(OutputStream *ost, const AVFrame *frame) > > e->opened = 1; > > - if (ost->sq_idx_encode >= 0) { > - e->sq_frame = av_frame_alloc(); > - if (!e->sq_frame) > - return AVERROR(ENOMEM); > - } > - > - if (ost->enc_ctx->frame_size) { > - av_assert0(ost->sq_idx_encode >= 0); > - sq_frame_samples(output_files[ost->file_index]->sq_encode, > - ost->sq_idx_encode, ost->enc_ctx->frame_size); > - } > + if (ost->enc_ctx->frame_size) > + frame_samples = ost->enc_ctx->frame_size; > > ret = check_avoptions(ost->encoder_opts); > if (ret < 0) > @@ -476,18 +365,11 @@ int enc_open(OutputStream *ost, const AVFrame *frame) > if (ost->st->time_base.num <= 0 || 
ost->st->time_base.den <= 0) > ost->st->time_base = av_add_q(ost->enc_ctx->time_base, > (AVRational){0, 1}); > > - ret = enc_thread_start(ost); > - if (ret < 0) { > - av_log(ost, AV_LOG_ERROR, "Error starting encoder thread: %s\n", > - av_err2str(ret)); > - return ret; > - } > - > ret = of_stream_init(of, ost); > if (ret < 0) > return ret; > > - return 0; > + return frame_samples; > } > > static int check_recording_time(OutputStream *ost, int64_t ts, AVRational > tb) > @@ -514,8 +396,7 @@ static int do_subtitle_out(OutputFile *of, > OutputStream *ost, const AVSubtitle * > av_log(ost, AV_LOG_ERROR, "Subtitle packets must have a pts\n"); > return exit_on_error ? AVERROR(EINVAL) : 0; > } > - if (ost->finished || > - (of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time)) > + if ((of->start_time != AV_NOPTS_VALUE && sub->pts < of->start_time)) > return 0; > > enc = ost->enc_ctx; > @@ -579,7 +460,7 @@ static int do_subtitle_out(OutputFile *of, > OutputStream *ost, const AVSubtitle * > } > pkt->dts = pkt->pts; > > - ret = tq_send(e->queue_out, 0, pkt); > + ret = sch_enc_send(e->sch, e->sch_idx, pkt); > if (ret < 0) { > av_packet_unref(pkt); > return ret; > @@ -671,10 +552,13 @@ static int update_video_stats(OutputStream *ost, > const AVPacket *pkt, int write_ > int64_t frame_number; > double ti1, bitrate, avg_bitrate; > double psnr_val = -1; > + int quality; > > - ost->quality = sd ? AV_RL32(sd) : -1; > + quality = sd ? AV_RL32(sd) : -1; > pict_type = sd ? 
sd[4] : AV_PICTURE_TYPE_NONE; > > + atomic_store(&ost->quality, quality); > + > if ((enc->flags & AV_CODEC_FLAG_PSNR) && sd && sd[5]) { > // FIXME the scaling assumes 8bit > double error = AV_RL64(sd + 8) / (enc->width * enc->height * > 255.0 * 255.0); > @@ -697,10 +581,10 @@ static int update_video_stats(OutputStream *ost, > const AVPacket *pkt, int write_ > frame_number = e->packets_encoded; > if (vstats_version <= 1) { > fprintf(vstats_file, "frame= %5"PRId64" q= %2.1f ", frame_number, > - ost->quality / (float)FF_QP2LAMBDA); > + quality / (float)FF_QP2LAMBDA); > } else { > fprintf(vstats_file, "out= %2d st= %2d frame= %5"PRId64" q= %2.1f > ", ost->file_index, ost->index, frame_number, > - ost->quality / (float)FF_QP2LAMBDA); > + quality / (float)FF_QP2LAMBDA); > } > > if (psnr_val >= 0) > @@ -801,18 +685,11 @@ static int encode_frame(OutputFile *of, OutputStream > *ost, AVFrame *frame, > av_ts2str(pkt->duration), av_ts2timestr(pkt->duration, > &enc->time_base)); > } > > - if ((ret = trigger_fix_sub_duration_heartbeat(ost, pkt)) < 0) { > - av_log(NULL, AV_LOG_ERROR, > - "Subtitle heartbeat logic failed in %s! 
(%s)\n", > - __func__, av_err2str(ret)); > - return ret; > - } > - > e->data_size += pkt->size; > > e->packets_encoded++; > > - ret = tq_send(e->queue_out, 0, pkt); > + ret = sch_enc_send(e->sch, e->sch_idx, pkt); > if (ret < 0) { > av_packet_unref(pkt); > return ret; > @@ -822,50 +699,6 @@ static int encode_frame(OutputFile *of, OutputStream > *ost, AVFrame *frame, > av_assert0(0); > } > > -static int submit_encode_frame(OutputFile *of, OutputStream *ost, > - AVFrame *frame, AVPacket *pkt) > -{ > - Encoder *e = ost->enc; > - int ret; > - > - if (ost->sq_idx_encode < 0) > - return encode_frame(of, ost, frame, pkt); > - > - if (frame) { > - ret = av_frame_ref(e->sq_frame, frame); > - if (ret < 0) > - return ret; > - frame = e->sq_frame; > - } > - > - ret = sq_send(of->sq_encode, ost->sq_idx_encode, > - SQFRAME(frame)); > - if (ret < 0) { > - if (frame) > - av_frame_unref(frame); > - if (ret != AVERROR_EOF) > - return ret; > - } > - > - while (1) { > - AVFrame *enc_frame = e->sq_frame; > - > - ret = sq_receive(of->sq_encode, ost->sq_idx_encode, > - SQFRAME(enc_frame)); > - if (ret == AVERROR_EOF) { > - enc_frame = NULL; > - } else if (ret < 0) { > - return (ret == AVERROR(EAGAIN)) ? 
0 : ret; > - } > - > - ret = encode_frame(of, ost, enc_frame, pkt); > - if (enc_frame) > - av_frame_unref(enc_frame); > - if (ret < 0) > - return ret; > - } > -} > - > static int do_audio_out(OutputFile *of, OutputStream *ost, > AVFrame *frame, AVPacket *pkt) > { > @@ -881,7 +714,7 @@ static int do_audio_out(OutputFile *of, OutputStream > *ost, > if (!check_recording_time(ost, frame->pts, frame->time_base)) > return AVERROR_EOF; > > - return submit_encode_frame(of, ost, frame, pkt); > + return encode_frame(of, ost, frame, pkt); > } > > static enum AVPictureType forced_kf_apply(void *logctx, KeyframeForceCtx > *kf, > @@ -949,7 +782,7 @@ static int do_video_out(OutputFile *of, OutputStream > *ost, > } > #endif > > - return submit_encode_frame(of, ost, in_picture, pkt); > + return encode_frame(of, ost, in_picture, pkt); > } > > static int frame_encode(OutputStream *ost, AVFrame *frame, AVPacket *pkt) > @@ -958,9 +791,12 @@ static int frame_encode(OutputStream *ost, AVFrame > *frame, AVPacket *pkt) > enum AVMediaType type = ost->type; > > if (type == AVMEDIA_TYPE_SUBTITLE) { > + const AVSubtitle *subtitle = frame && frame->buf[0] ? > + (AVSubtitle*)frame->buf[0]->data : > NULL; > + > // no flushing for subtitles > - return frame ? > - do_subtitle_out(of, ost, (AVSubtitle*)frame->buf[0]->data, > pkt) : 0; > + return subtitle && subtitle->num_rects ? 
> + do_subtitle_out(of, ost, subtitle, pkt) : 0; > } > > if (frame) { > @@ -968,7 +804,7 @@ static int frame_encode(OutputStream *ost, AVFrame > *frame, AVPacket *pkt) > do_audio_out(of, ost, > frame, pkt); > } > > - return submit_encode_frame(of, ost, NULL, pkt); > + return encode_frame(of, ost, NULL, pkt); > } > > static void enc_thread_set_name(const OutputStream *ost) > @@ -1009,24 +845,50 @@ fail: > void *encoder_thread(void *arg) > { > OutputStream *ost = arg; > - OutputFile *of = output_files[ost->file_index]; > Encoder *e = ost->enc; > EncoderThread et; > int ret = 0, input_status = 0; > + int name_set = 0; > > ret = enc_thread_init(&et); > if (ret < 0) > goto finish; > > - enc_thread_set_name(ost); > + /* Open the subtitle encoders immediately. AVFrame-based encoders > + * are opened through a callback from the scheduler once they get > + * their first frame > + * > + * N.B.: because the callback is called from a different thread, > + * enc_ctx MUST NOT be accessed before sch_enc_receive() returns > + * for the first time for audio/video. */ > + if (ost->type != AVMEDIA_TYPE_VIDEO && ost->type != > AVMEDIA_TYPE_AUDIO) { > + ret = enc_open(ost, NULL); > + if (ret < 0) > + goto finish; > + } > > while (!input_status) { > - int dummy; > - > - input_status = tq_receive(e->queue_in, &dummy, et.frame); > - if (input_status < 0) > + input_status = sch_enc_receive(e->sch, e->sch_idx, et.frame); > + if (input_status == AVERROR_EOF) { > av_log(ost, AV_LOG_VERBOSE, "Encoder thread received EOF\n"); > > + if (!e->opened) { > + av_log(ost, AV_LOG_ERROR, "Could not open encoder before > EOF\n"); > + ret = AVERROR(EINVAL); > + goto finish; > + } > + } else if (input_status < 0) { > + ret = input_status; > + av_log(ost, AV_LOG_ERROR, "Error receiving a frame for > encoding: %s\n", > + av_err2str(ret)); > + goto finish; > + } > + > + if (!name_set) { > + enc_thread_set_name(ost); > + name_set = 1; > + } > + > ret = frame_encode(ost, input_status >= 0 ? 
et.frame : NULL, > et.pkt); > > av_packet_unref(et.pkt); > @@ -1040,15 +902,6 @@ void *encoder_thread(void *arg) > av_err2str(ret)); > break; > } > - > - // signal to the consumer thread that the frame was encoded > - ret = tq_send(e->queue_out, 0, et.pkt); > - if (ret < 0) { > - if (ret != AVERROR_EOF) > - av_log(ost, AV_LOG_ERROR, > - "Error communicating with the main thread\n"); > - break; > - } > } > > // EOF is normal thread termination > @@ -1056,118 +909,7 @@ void *encoder_thread(void *arg) > ret = 0; > > finish: > - if (ost->sq_idx_encode >= 0) > - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); > - > - tq_receive_finish(e->queue_in, 0); > - tq_send_finish (e->queue_out, 0); > - > enc_thread_uninit(&et); > > - av_log(ost, AV_LOG_VERBOSE, "Terminating encoder thread\n"); > - > return (void*)(intptr_t)ret; > } > - > -int enc_frame(OutputStream *ost, AVFrame *frame) > -{ > - OutputFile *of = output_files[ost->file_index]; > - Encoder *e = ost->enc; > - int ret, thread_ret; > - > - ret = enc_open(ost, frame); > - if (ret < 0) > - return ret; > - > - if (!e->queue_in) > - return AVERROR_EOF; > - > - // send the frame/EOF to the encoder thread > - if (frame) { > - ret = tq_send(e->queue_in, 0, frame); > - if (ret < 0) > - goto finish; > - } else > - tq_send_finish(e->queue_in, 0); > - > - // retrieve all encoded data for the frame > - while (1) { > - int dummy; > - > - ret = tq_receive(e->queue_out, &dummy, e->pkt); > - if (ret < 0) > - break; > - > - // frame fully encoded > - if (!e->pkt->data && !e->pkt->side_data_elems) > - return 0; > - > - // process the encoded packet > - ret = of_output_packet(of, ost, e->pkt); > - if (ret < 0) > - goto finish; > - } > - > -finish: > - thread_ret = enc_thread_stop(e); > - if (thread_ret < 0) { > - av_log(ost, AV_LOG_ERROR, "Encoder thread returned error: %s\n", > - av_err2str(thread_ret)); > - ret = err_merge(ret, thread_ret); > - } > - > - if (ret < 0 && ret != AVERROR_EOF) > - return ret; > - > - // signal 
EOF to the muxer > - return of_output_packet(of, ost, NULL); > -} > - > -int enc_subtitle(OutputFile *of, OutputStream *ost, const AVSubtitle *sub) > -{ > - Encoder *e = ost->enc; > - AVFrame *f = e->sub_frame; > - int ret; > - > - // XXX the queue for transferring data to the encoder thread runs > - // on AVFrames, so we wrap AVSubtitle in an AVBufferRef and put > - // that inside the frame > - // eventually, subtitles should be switched to use AVFrames natively > - ret = subtitle_wrap_frame(f, sub, 1); > - if (ret < 0) > - return ret; > - > - ret = enc_frame(ost, f); > - av_frame_unref(f); > - > - return ret; > -} > - > -int enc_flush(void) > -{ > - int ret = 0; > - > - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { > - OutputFile *of = output_files[ost->file_index]; > - if (ost->sq_idx_encode >= 0) > - sq_send(of->sq_encode, ost->sq_idx_encode, SQFRAME(NULL)); > - } > - > - for (OutputStream *ost = ost_iter(NULL); ost; ost = ost_iter(ost)) { > - Encoder *e = ost->enc; > - AVCodecContext *enc = ost->enc_ctx; > - int err; > - > - if (!enc || !e->opened || > - (enc->codec_type != AVMEDIA_TYPE_VIDEO && enc->codec_type != > AVMEDIA_TYPE_AUDIO)) > - continue; > - > - err = enc_frame(ost, NULL); > - if (err != AVERROR_EOF && ret < 0) > - ret = err_merge(ret, err); > - > - av_assert0(!e->queue_in); > - } > - > - return ret; > -} > diff --git a/fftools/ffmpeg_filter.c b/fftools/ffmpeg_filter.c > index 635b1b0b6e..ada235b084 100644 > --- a/fftools/ffmpeg_filter.c > +++ b/fftools/ffmpeg_filter.c > @@ -21,8 +21,6 @@ > #include <stdint.h> > > #include "ffmpeg.h" > -#include "ffmpeg_utils.h" > -#include "thread_queue.h" > > #include "libavfilter/avfilter.h" > #include "libavfilter/buffersink.h" > @@ -53,10 +51,11 @@ typedef struct FilterGraphPriv { > // true when the filtergraph contains only meta filters > // that do not modify the frame data > int is_meta; > + // source filters are present in the graph > + int have_sources; > int disable_conversions; > 
> - int nb_inputs_bound; > - int nb_outputs_bound; > + unsigned nb_outputs_done; > > const char *graph_desc; > > @@ -67,41 +66,6 @@ typedef struct FilterGraphPriv { > > Scheduler *sch; > unsigned sch_idx; > - > - pthread_t thread; > - /** > - * Queue for sending frames from the main thread to the filtergraph. > Has > - * nb_inputs+1 streams - the first nb_inputs stream correspond to > - * filtergraph inputs. Frames on those streams may have their opaque > set to > - * - FRAME_OPAQUE_EOF: frame contains no data, but pts+timebase of the > - * EOF event for the correspondint stream. Will be immediately > followed by > - * this stream being send-closed. > - * - FRAME_OPAQUE_SUB_HEARTBEAT: frame contains no data, but > pts+timebase of > - * a subtitle heartbeat event. Will only be sent for sub2video > streams. > - * > - * The last stream is "control" - the main thread sends empty > AVFrames with > - * opaque set to > - * - FRAME_OPAQUE_REAP_FILTERS: a request to retrieve all frame > available > - * from filtergraph outputs. These frames are sent to corresponding > - * streams in queue_out. Finally an empty frame is sent to the > control > - * stream in queue_out. > - * - FRAME_OPAQUE_CHOOSE_INPUT: same as above, but in case no frames > are > - * available the terminating empty frame's opaque will contain the > index+1 > - * of the filtergraph input to which more input frames should be > supplied. > - */ > - ThreadQueue *queue_in; > - /** > - * Queue for sending frames from the filtergraph back to the main > thread. > - * Has nb_outputs+1 streams - the first nb_outputs stream correspond > to > - * filtergraph outputs. > - * > - * The last stream is "control" - see documentation for queue_in for > more > - * details. 
> - */ > - ThreadQueue *queue_out; > - // submitting frames to filter thread returned EOF > - // this only happens on thread exit, so is not per-input > - int eof_in; > } FilterGraphPriv; > > static FilterGraphPriv *fgp_from_fg(FilterGraph *fg) > @@ -123,6 +87,9 @@ typedef struct FilterGraphThread { > // The output index is stored in frame opaque. > AVFifo *frame_queue_out; > > + // index of the next input to request from the scheduler > + unsigned next_in; > + // set to 1 after at least one frame passed through this output > int got_frame; > > // EOF status of each input/output, as received by the thread > @@ -253,9 +220,6 @@ typedef struct OutputFilterPriv { > int64_t ts_offset; > int64_t next_pts; > FPSConvContext fps; > - > - // set to 1 after at least one frame passed through this output > - int got_frame; > } OutputFilterPriv; > > static OutputFilterPriv *ofp_from_ofilter(OutputFilter *ofilter) > @@ -653,57 +617,6 @@ static int ifilter_has_all_input_formats(FilterGraph > *fg) > > static void *filter_thread(void *arg); > > -// start the filtering thread once all inputs and outputs are bound > -static int fg_thread_try_start(FilterGraphPriv *fgp) > -{ > - FilterGraph *fg = &fgp->fg; > - ObjPool *op; > - int ret = 0; > - > - if (fgp->nb_inputs_bound < fg->nb_inputs || > - fgp->nb_outputs_bound < fg->nb_outputs) > - return 0; > - > - op = objpool_alloc_frames(); > - if (!op) > - return AVERROR(ENOMEM); > - > - fgp->queue_in = tq_alloc(fg->nb_inputs + 1, 1, op, frame_move); > - if (!fgp->queue_in) { > - objpool_free(&op); > - return AVERROR(ENOMEM); > - } > - > - // at least one output is mandatory > - op = objpool_alloc_frames(); > - if (!op) > - goto fail; > - > - fgp->queue_out = tq_alloc(fg->nb_outputs + 1, 1, op, frame_move); > - if (!fgp->queue_out) { > - objpool_free(&op); > - goto fail; > - } > - > - ret = pthread_create(&fgp->thread, NULL, filter_thread, fgp); > - if (ret) { > - ret = AVERROR(ret); > - av_log(NULL, AV_LOG_ERROR, "pthread_create() for 
filtergraph %d > failed: %s\n", > - fg->index, av_err2str(ret)); > - goto fail; > - } > - > - return 0; > -fail: > - if (ret >= 0) > - ret = AVERROR(ENOMEM); > - > - tq_free(&fgp->queue_in); > - tq_free(&fgp->queue_out); > - > - return ret; > -} > - > static char *describe_filter_link(FilterGraph *fg, AVFilterInOut *inout, > int in) > { > AVFilterContext *ctx = inout->filter_ctx; > @@ -729,7 +642,6 @@ static OutputFilter *ofilter_alloc(FilterGraph *fg) > ofilter->graph = fg; > ofp->format = -1; > ofp->index = fg->nb_outputs - 1; > - ofilter->last_pts = AV_NOPTS_VALUE; > > return ofilter; > } > @@ -760,10 +672,7 @@ static int ifilter_bind_ist(InputFilter *ifilter, > InputStream *ist) > return AVERROR(ENOMEM); > } > > - fgp->nb_inputs_bound++; > - av_assert0(fgp->nb_inputs_bound <= ifilter->graph->nb_inputs); > - > - return fg_thread_try_start(fgp); > + return 0; > } > > static int set_channel_layout(OutputFilterPriv *f, OutputStream *ost) > @@ -902,10 +811,7 @@ int ofilter_bind_ost(OutputFilter *ofilter, > OutputStream *ost, > if (ret < 0) > return ret; > > - fgp->nb_outputs_bound++; > - av_assert0(fgp->nb_outputs_bound <= fg->nb_outputs); > - > - return fg_thread_try_start(fgp); > + return 0; > } > > static InputFilter *ifilter_alloc(FilterGraph *fg) > @@ -935,34 +841,6 @@ static InputFilter *ifilter_alloc(FilterGraph *fg) > return ifilter; > } > > -static int fg_thread_stop(FilterGraphPriv *fgp) > -{ > - void *ret; > - > - if (!fgp->queue_in) > - return 0; > - > - for (int i = 0; i <= fgp->fg.nb_inputs; i++) { > - InputFilterPriv *ifp = i < fgp->fg.nb_inputs ? 
> - ifp_from_ifilter(fgp->fg.inputs[i]) : NULL; > - > - if (ifp) > - ifp->eof = 1; > - > - tq_send_finish(fgp->queue_in, i); > - } > - > - for (int i = 0; i <= fgp->fg.nb_outputs; i++) > - tq_receive_finish(fgp->queue_out, i); > - > - pthread_join(fgp->thread, &ret); > - > - tq_free(&fgp->queue_in); > - tq_free(&fgp->queue_out); > - > - return (int)(intptr_t)ret; > -} > - > void fg_free(FilterGraph **pfg) > { > FilterGraph *fg = *pfg; > @@ -972,8 +850,6 @@ void fg_free(FilterGraph **pfg) > return; > fgp = fgp_from_fg(fg); > > - fg_thread_stop(fgp); > - > avfilter_graph_free(&fg->graph); > for (int j = 0; j < fg->nb_inputs; j++) { > InputFilter *ifilter = fg->inputs[j]; > @@ -1072,6 +948,15 @@ int fg_create(FilterGraph **pfg, char *graph_desc, > Scheduler *sch) > if (ret < 0) > goto fail; > > + for (unsigned i = 0; i < graph->nb_filters; i++) { > + const AVFilter *f = graph->filters[i]->filter; > + if (!avfilter_filter_pad_count(f, 0) && > + !(f->flags & AVFILTER_FLAG_DYNAMIC_INPUTS)) { > + fgp->have_sources = 1; > + break; > + } > + } > + > for (AVFilterInOut *cur = inputs; cur; cur = cur->next) { > InputFilter *const ifilter = ifilter_alloc(fg); > InputFilterPriv *ifp; > @@ -1800,6 +1685,7 @@ static int configure_filtergraph(FilterGraph *fg, > const FilterGraphThread *fgt) > AVBufferRef *hw_device; > AVFilterInOut *inputs, *outputs, *cur; > int ret, i, simple = filtergraph_is_simple(fg); > + int have_input_eof = 0; > const char *graph_desc = fgp->graph_desc; > > cleanup_filtergraph(fg); > @@ -1922,11 +1808,18 @@ static int configure_filtergraph(FilterGraph *fg, > const FilterGraphThread *fgt) > ret = av_buffersrc_add_frame(ifp->filter, NULL); > if (ret < 0) > goto fail; > + have_input_eof = 1; > } > } > > - return 0; > + if (have_input_eof) { > + // make sure the EOF propagates to the end of the graph > + ret = avfilter_graph_request_oldest(fg->graph); > + if (ret < 0 && ret != AVERROR(EAGAIN) && ret != AVERROR_EOF) > + goto fail; > + } > > + return 0; > fail: > 
cleanup_filtergraph(fg); > return ret; > @@ -2182,7 +2075,7 @@ static void video_sync_process(OutputFilterPriv > *ofp, AVFrame *frame, > fps->frames_prev_hist[2]); > > if (!*nb_frames && fps->last_dropped) { > - ofilter->nb_frames_drop++; > + atomic_fetch_add(&ofilter->nb_frames_drop, 1); > fps->last_dropped++; > } > > @@ -2260,21 +2153,23 @@ finish: > fps->frames_prev_hist[0] = *nb_frames_prev; > > if (*nb_frames_prev == 0 && fps->last_dropped) { > - ofilter->nb_frames_drop++; > + atomic_fetch_add(&ofilter->nb_frames_drop, 1); > av_log(ost, AV_LOG_VERBOSE, > "*** dropping frame %"PRId64" at ts %"PRId64"\n", > fps->frame_number, fps->last_frame->pts); > } > if (*nb_frames > (*nb_frames_prev && fps->last_dropped) + (*nb_frames > > *nb_frames_prev)) { > + uint64_t nb_frames_dup; > if (*nb_frames > dts_error_threshold * 30) { > av_log(ost, AV_LOG_ERROR, "%"PRId64" frame duplication too > large, skipping\n", *nb_frames - 1); > - ofilter->nb_frames_drop++; > + atomic_fetch_add(&ofilter->nb_frames_drop, 1); > *nb_frames = 0; > return; > } > - ofilter->nb_frames_dup += *nb_frames - (*nb_frames_prev && > fps->last_dropped) - (*nb_frames > *nb_frames_prev); > + nb_frames_dup = atomic_fetch_add(&ofilter->nb_frames_dup, > + *nb_frames - (*nb_frames_prev && > fps->last_dropped) - (*nb_frames > *nb_frames_prev)); > av_log(ost, AV_LOG_VERBOSE, "*** %"PRId64" dup!\n", *nb_frames - > 1); > - if (ofilter->nb_frames_dup > fps->dup_warning) { > + if (nb_frames_dup > fps->dup_warning) { > av_log(ost, AV_LOG_WARNING, "More than %"PRIu64" frames > duplicated\n", fps->dup_warning); > fps->dup_warning *= 10; > } > @@ -2284,8 +2179,57 @@ finish: > fps->dropped_keyframe |= fps->last_dropped && (frame->flags & > AV_FRAME_FLAG_KEY); > } > > +static int close_output(OutputFilterPriv *ofp, FilterGraphThread *fgt) > +{ > + FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); > + int ret; > + > + // we are finished and no frames were ever seen at this output, > + // at least initialize the 
encoder with a dummy frame > + if (!fgt->got_frame) { > + AVFrame *frame = fgt->frame; > + FrameData *fd; > + > + frame->time_base = ofp->tb_out; > + frame->format = ofp->format; > + > + frame->width = ofp->width; > + frame->height = ofp->height; > + frame->sample_aspect_ratio = ofp->sample_aspect_ratio; > + > + frame->sample_rate = ofp->sample_rate; > + if (ofp->ch_layout.nb_channels) { > + ret = av_channel_layout_copy(&frame->ch_layout, > &ofp->ch_layout); > + if (ret < 0) > + return ret; > + } > + > + fd = frame_data(frame); > + if (!fd) > + return AVERROR(ENOMEM); > + > + fd->frame_rate_filter = ofp->fps.framerate; > + > + av_assert0(!frame->buf[0]); > + > + av_log(ofp->ofilter.ost, AV_LOG_WARNING, > + "No filtered frames for output stream, trying to " > + "initialize anyway.\n"); > + > + ret = sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, frame); > + if (ret < 0) { > + av_frame_unref(frame); > + return ret; > + } > + } > + > + fgt->eof_out[ofp->index] = 1; > + > + return sch_filter_send(fgp->sch, fgp->sch_idx, ofp->index, NULL); > +} > + > static int fg_output_frame(OutputFilterPriv *ofp, FilterGraphThread *fgt, > - AVFrame *frame, int buffer) > + AVFrame *frame) > { > FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); > AVFrame *frame_prev = ofp->fps.last_frame; > @@ -2332,28 +2276,17 @@ static int fg_output_frame(OutputFilterPriv *ofp, > FilterGraphThread *fgt, > frame_out = frame; > } > > - if (buffer) { > - AVFrame *f = av_frame_alloc(); > - > - if (!f) { > - av_frame_unref(frame_out); > - return AVERROR(ENOMEM); > - } > - > - av_frame_move_ref(f, frame_out); > - f->opaque = (void*)(intptr_t)ofp->index; > - > - ret = av_fifo_write(fgt->frame_queue_out, &f, 1); > - if (ret < 0) { > - av_frame_free(&f); > - return AVERROR(ENOMEM); > - } > - } else { > - // return the frame to the main thread > - ret = tq_send(fgp->queue_out, ofp->index, frame_out); > + { > + // send the frame to consumers > + ret = sch_filter_send(fgp->sch, fgp->sch_idx, 
ofp->index, > frame_out); > if (ret < 0) { > av_frame_unref(frame_out); > - fgt->eof_out[ofp->index] = 1; > + > + if (!fgt->eof_out[ofp->index]) { > + fgt->eof_out[ofp->index] = 1; > + fgp->nb_outputs_done++; > + } > + > return ret == AVERROR_EOF ? 0 : ret; > } > } > @@ -2374,16 +2307,14 @@ static int fg_output_frame(OutputFilterPriv *ofp, > FilterGraphThread *fgt, > av_frame_move_ref(frame_prev, frame); > } > > - if (!frame) { > - tq_send_finish(fgp->queue_out, ofp->index); > - fgt->eof_out[ofp->index] = 1; > - } > + if (!frame) > + return close_output(ofp, fgt); > > return 0; > } > > static int fg_output_step(OutputFilterPriv *ofp, FilterGraphThread *fgt, > - AVFrame *frame, int buffer) > + AVFrame *frame) > { > FilterGraphPriv *fgp = fgp_from_fg(ofp->ofilter.graph); > OutputStream *ost = ofp->ofilter.ost; > @@ -2393,8 +2324,8 @@ static int fg_output_step(OutputFilterPriv *ofp, > FilterGraphThread *fgt, > > ret = av_buffersink_get_frame_flags(filter, frame, > AV_BUFFERSINK_FLAG_NO_REQUEST); > - if (ret == AVERROR_EOF && !buffer && !fgt->eof_out[ofp->index]) { > - ret = fg_output_frame(ofp, fgt, NULL, buffer); > + if (ret == AVERROR_EOF && !fgt->eof_out[ofp->index]) { > + ret = fg_output_frame(ofp, fgt, NULL); > return (ret < 0) ? 
ret : 1; > } else if (ret == AVERROR(EAGAIN) || ret == AVERROR_EOF) { > return 1; > @@ -2448,7 +2379,7 @@ static int fg_output_step(OutputFilterPriv *ofp, > FilterGraphThread *fgt, > fd->frame_rate_filter = ofp->fps.framerate; > } > > - ret = fg_output_frame(ofp, fgt, frame, buffer); > + ret = fg_output_frame(ofp, fgt, frame); > av_frame_unref(frame); > if (ret < 0) > return ret; > @@ -2456,44 +2387,68 @@ static int fg_output_step(OutputFilterPriv *ofp, > FilterGraphThread *fgt, > return 0; > } > > -/* retrieve all frames available at filtergraph outputs and either send > them to > - * the main thread (buffer=0) or buffer them for later (buffer=1) */ > +/* retrieve all frames available at filtergraph outputs > + * and send them to consumers */ > static int read_frames(FilterGraph *fg, FilterGraphThread *fgt, > - AVFrame *frame, int buffer) > + AVFrame *frame) > { > FilterGraphPriv *fgp = fgp_from_fg(fg); > - int ret = 0; > + int did_step = 0; > > - if (!fg->graph) > - return 0; > - > - // process buffered frames > - if (!buffer) { > - AVFrame *f; > - > - while (av_fifo_read(fgt->frame_queue_out, &f, 1) >= 0) { > - int out_idx = (intptr_t)f->opaque; > - f->opaque = NULL; > - ret = tq_send(fgp->queue_out, out_idx, f); > - av_frame_free(&f); > - if (ret < 0 && ret != AVERROR_EOF) > - return ret; > + // graph not configured, just select the input to request > + if (!fg->graph) { > + for (int i = 0; i < fg->nb_inputs; i++) { > + InputFilterPriv *ifp = ifp_from_ifilter(fg->inputs[i]); > + if (ifp->format < 0 && !fgt->eof_in[i]) { > + fgt->next_in = i; > + return 0; > + } > } > + > + // This state - graph is not configured, but all inputs are either > + // initialized or EOF - should be unreachable because sending EOF > to a > + // filter without even a fallback format should fail > + av_assert0(0); > + return AVERROR_BUG; > } > > - /* Reap all buffers present in the buffer sinks */ > - for (int i = 0; i < fg->nb_outputs; i++) { > - OutputFilterPriv *ofp = 
ofp_from_ofilter(fg->outputs[i]); > - int ret = 0; > + while (fgp->nb_outputs_done < fg->nb_outputs) { > + int ret; > > - while (!ret) { > - ret = fg_output_step(ofp, fgt, frame, buffer); > - if (ret < 0) > - return ret; > + ret = avfilter_graph_request_oldest(fg->graph); > + if (ret == AVERROR(EAGAIN)) { > + fgt->next_in = choose_input(fg, fgt); > + break; > + } else if (ret < 0) { > + if (ret == AVERROR_EOF) > + av_log(fg, AV_LOG_VERBOSE, "Filtergraph returned EOF, > finishing\n"); > + else > + av_log(fg, AV_LOG_ERROR, > + "Error requesting a frame from the filtergraph: > %s\n", > + av_err2str(ret)); > + return ret; > } > - } > + fgt->next_in = fg->nb_inputs; > > - return 0; > + // return after one iteration, so that scheduler can rate-control > us > + if (did_step && fgp->have_sources) > + return 0; > + > + /* Reap all buffers present in the buffer sinks */ > + for (int i = 0; i < fg->nb_outputs; i++) { > + OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); > + > + ret = 0; > + while (!ret) { > + ret = fg_output_step(ofp, fgt, frame); > + if (ret < 0) > + return ret; > + } > + } > + did_step = 1; > + }; > + > + return (fgp->nb_outputs_done == fg->nb_outputs) ? AVERROR_EOF : 0; > } > > static void sub2video_heartbeat(InputFilter *ifilter, int64_t pts, > AVRational tb) > @@ -2571,6 +2526,9 @@ static int send_eof(FilterGraphThread *fgt, > InputFilter *ifilter, > InputFilterPriv *ifp = ifp_from_ifilter(ifilter); > int ret; > > + if (fgt->eof_in[ifp->index]) > + return 0; > + > fgt->eof_in[ifp->index] = 1; > > if (ifp->filter) { > @@ -2672,7 +2630,7 @@ static int send_frame(FilterGraph *fg, > FilterGraphThread *fgt, > return ret; > } > > - ret = fg->graph ? read_frames(fg, fgt, tmp, 1) : 0; > + ret = fg->graph ? 
read_frames(fg, fgt, tmp) : 0; > av_frame_free(&tmp); > if (ret < 0) > return ret; > @@ -2705,82 +2663,6 @@ static int send_frame(FilterGraph *fg, > FilterGraphThread *fgt, > return 0; > } > > -static int msg_process(FilterGraphPriv *fgp, FilterGraphThread *fgt, > - AVFrame *frame) > -{ > - const enum FrameOpaque msg = (intptr_t)frame->opaque; > - FilterGraph *fg = &fgp->fg; > - int graph_eof = 0; > - int ret; > - > - frame->opaque = NULL; > - av_assert0(msg > 0); > - av_assert0(msg == FRAME_OPAQUE_SEND_COMMAND || !frame->buf[0]); > - > - if (!fg->graph) { > - // graph not configured yet, ignore all messages other than > choosing > - // the input to read from > - if (msg != FRAME_OPAQUE_CHOOSE_INPUT) { > - av_frame_unref(frame); > - goto done; > - } > - > - for (int i = 0; i < fg->nb_inputs; i++) { > - InputFilter *ifilter = fg->inputs[i]; > - InputFilterPriv *ifp = ifp_from_ifilter(ifilter); > - if (ifp->format < 0 && !fgt->eof_in[i]) { > - frame->opaque = (void*)(intptr_t)(i + 1); > - goto done; > - } > - } > - > - // This state - graph is not configured, but all inputs are either > - // initialized or EOF - should be unreachable because sending EOF > to a > - // filter without even a fallback format should fail > - av_assert0(0); > - return AVERROR_BUG; > - } > - > - if (msg == FRAME_OPAQUE_SEND_COMMAND) { > - FilterCommand *fc = (FilterCommand*)frame->buf[0]->data; > - send_command(fg, fc->time, fc->target, fc->command, fc->arg, > fc->all_filters); > - av_frame_unref(frame); > - goto done; > - } > - > - if (msg == FRAME_OPAQUE_CHOOSE_INPUT) { > - ret = avfilter_graph_request_oldest(fg->graph); > - > - graph_eof = ret == AVERROR_EOF; > - > - if (ret == AVERROR(EAGAIN)) { > - frame->opaque = (void*)(intptr_t)(choose_input(fg, fgt) + 1); > - goto done; > - } else if (ret < 0 && !graph_eof) > - return ret; > - } > - > - ret = read_frames(fg, fgt, frame, 0); > - if (ret < 0) { > - av_log(fg, AV_LOG_ERROR, "Error sending filtered frames for > encoding\n"); > - return 
ret; > - } > - > - if (graph_eof) > - return AVERROR_EOF; > - > - // signal to the main thread that we are done processing the message > -done: > - ret = tq_send(fgp->queue_out, fg->nb_outputs, frame); > - if (ret < 0) { > - if (ret != AVERROR_EOF) > - av_log(fg, AV_LOG_ERROR, "Error communicating with the main > thread\n"); > - return ret; > - } > - > - return 0; > -} > - > static void fg_thread_set_name(const FilterGraph *fg) > { > char name[16]; > @@ -2867,294 +2749,94 @@ static void *filter_thread(void *arg) > InputFilter *ifilter; > InputFilterPriv *ifp; > enum FrameOpaque o; > - int input_idx, eof_frame; > + unsigned input_idx = fgt.next_in; > > - input_status = tq_receive(fgp->queue_in, &input_idx, fgt.frame); > - if (input_idx < 0 || > - (input_idx == fg->nb_inputs && input_status < 0)) { > + input_status = sch_filter_receive(fgp->sch, fgp->sch_idx, > + &input_idx, fgt.frame); > + if (input_status == AVERROR_EOF) { > av_log(fg, AV_LOG_VERBOSE, "Filtering thread received EOF\n"); > break; > + } else if (input_status == AVERROR(EAGAIN)) { > + // should only happen when we didn't request any input > + av_assert0(input_idx == fg->nb_inputs); > + goto read_frames; > } > + av_assert0(input_status >= 0); > + > + o = (intptr_t)fgt.frame->opaque; > > o = (intptr_t)fgt.frame->opaque; > > // message on the control stream > if (input_idx == fg->nb_inputs) { > - ret = msg_process(fgp, &fgt, fgt.frame); > - if (ret < 0) > - goto finish; > + FilterCommand *fc; > > + av_assert0(o == FRAME_OPAQUE_SEND_COMMAND && > fgt.frame->buf[0]); > + > + fc = (FilterCommand*)fgt.frame->buf[0]->data; > + send_command(fg, fc->time, fc->target, fc->command, fc->arg, > + fc->all_filters); > + av_frame_unref(fgt.frame); > continue; > } > > // we received an input frame or EOF > ifilter = fg->inputs[input_idx]; > ifp = ifp_from_ifilter(ifilter); > - eof_frame = input_status >= 0 && o == FRAME_OPAQUE_EOF; > + > if (ifp->type_src == AVMEDIA_TYPE_SUBTITLE) { > int hb_frame = input_status >= 0 && 
o == > FRAME_OPAQUE_SUB_HEARTBEAT; > ret = sub2video_frame(ifilter, (fgt.frame->buf[0] || > hb_frame) ? fgt.frame : NULL); > - } else if (input_status >= 0 && fgt.frame->buf[0]) { > + } else if (fgt.frame->buf[0]) { > ret = send_frame(fg, &fgt, ifilter, fgt.frame); > } else { > - int64_t pts = input_status >= 0 ? fgt.frame->pts : > AV_NOPTS_VALUE; > - AVRational tb = input_status >= 0 ? fgt.frame->time_base : > (AVRational){ 1, 1 }; > - ret = send_eof(&fgt, ifilter, pts, tb); > + av_assert1(o == FRAME_OPAQUE_EOF); > + ret = send_eof(&fgt, ifilter, fgt.frame->pts, > fgt.frame->time_base); > } > av_frame_unref(fgt.frame); > if (ret < 0) > + goto finish; > + > +read_frames: > + // retrieve all newly avalable frames > + ret = read_frames(fg, &fgt, fgt.frame); > + if (ret == AVERROR_EOF) { > + av_log(fg, AV_LOG_VERBOSE, "All consumers returned EOF\n"); > break; > - > - if (eof_frame) { > - // an EOF frame is immediately followed by sender closing > - // the corresponding stream, so retrieve that event > - input_status = tq_receive(fgp->queue_in, &input_idx, > fgt.frame); > - av_assert0(input_status == AVERROR_EOF && input_idx == > ifp->index); > - } > - > - // signal to the main thread that we are done > - ret = tq_send(fgp->queue_out, fg->nb_outputs, fgt.frame); > - if (ret < 0) { > - if (ret == AVERROR_EOF) > - break; > - > - av_log(fg, AV_LOG_ERROR, "Error communicating with the main > thread\n"); > + } else if (ret < 0) { > + av_log(fg, AV_LOG_ERROR, "Error sending frames to consumers: > %s\n", > + av_err2str(ret)); > goto finish; > } > } > > + for (unsigned i = 0; i < fg->nb_outputs; i++) { > + OutputFilterPriv *ofp = ofp_from_ofilter(fg->outputs[i]); > + > + if (fgt.eof_out[i]) > + continue; > + > + ret = fg_output_frame(ofp, &fgt, NULL); > + if (ret < 0) > + goto finish; > + } > + > finish: > // EOF is normal termination > if (ret == AVERROR_EOF) > ret = 0; > > - for (int i = 0; i <= fg->nb_inputs; i++) > - tq_receive_finish(fgp->queue_in, i); > - for (int i = 0; 
i <= fg->nb_outputs; i++) > - tq_send_finish(fgp->queue_out, i); > - > fg_thread_uninit(&fgt); > > - av_log(fg, AV_LOG_VERBOSE, "Terminating filtering thread\n"); > - > return (void*)(intptr_t)ret; > } > > -static int thread_send_frame(FilterGraphPriv *fgp, InputFilter *ifilter, > - AVFrame *frame, enum FrameOpaque type) > -{ > - InputFilterPriv *ifp = ifp_from_ifilter(ifilter); > - int output_idx, ret; > - > - if (ifp->eof) { > - av_frame_unref(frame); > - return AVERROR_EOF; > - } > - > - frame->opaque = (void*)(intptr_t)type; > - > - ret = tq_send(fgp->queue_in, ifp->index, frame); > - if (ret < 0) { > - ifp->eof = 1; > - av_frame_unref(frame); > - return ret; > - } > - > - if (type == FRAME_OPAQUE_EOF) > - tq_send_finish(fgp->queue_in, ifp->index); > - > - // wait for the frame to be processed > - ret = tq_receive(fgp->queue_out, &output_idx, frame); > - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); > - > - return ret; > -} > - > -int ifilter_send_frame(InputFilter *ifilter, AVFrame *frame, int > keep_reference) > -{ > - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); > - int ret; > - > - if (keep_reference) { > - ret = av_frame_ref(fgp->frame, frame); > - if (ret < 0) > - return ret; > - } else > - av_frame_move_ref(fgp->frame, frame); > - > - return thread_send_frame(fgp, ifilter, fgp->frame, 0); > -} > - > -int ifilter_send_eof(InputFilter *ifilter, int64_t pts, AVRational tb) > -{ > - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); > - int ret; > - > - fgp->frame->pts = pts; > - fgp->frame->time_base = tb; > - > - ret = thread_send_frame(fgp, ifilter, fgp->frame, FRAME_OPAQUE_EOF); > - > - return ret == AVERROR_EOF ? 
0 : ret; > -} > - > -void ifilter_sub2video_heartbeat(InputFilter *ifilter, int64_t pts, > AVRational tb) > -{ > - FilterGraphPriv *fgp = fgp_from_fg(ifilter->graph); > - > - fgp->frame->pts = pts; > - fgp->frame->time_base = tb; > - > - thread_send_frame(fgp, ifilter, fgp->frame, > FRAME_OPAQUE_SUB_HEARTBEAT); > -} > - > -int fg_transcode_step(FilterGraph *graph, InputStream **best_ist) > -{ > - FilterGraphPriv *fgp = fgp_from_fg(graph); > - int ret, got_frames = 0; > - > - if (fgp->eof_in) > - return AVERROR_EOF; > - > - // signal to the filtering thread to return all frames it can > - av_assert0(!fgp->frame->buf[0]); > - fgp->frame->opaque = (void*)(intptr_t)(best_ist ? > - FRAME_OPAQUE_CHOOSE_INPUT : > - FRAME_OPAQUE_REAP_FILTERS); > - > - ret = tq_send(fgp->queue_in, graph->nb_inputs, fgp->frame); > - if (ret < 0) { > - fgp->eof_in = 1; > - goto finish; > - } > - > - while (1) { > - OutputFilter *ofilter; > - OutputFilterPriv *ofp; > - OutputStream *ost; > - int output_idx; > - > - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); > - > - // EOF on the whole queue or the control stream > - if (output_idx < 0 || > - (ret < 0 && output_idx == graph->nb_outputs)) > - goto finish; > - > - // EOF for a specific stream > - if (ret < 0) { > - ofilter = graph->outputs[output_idx]; > - ofp = ofp_from_ofilter(ofilter); > - > - // we are finished and no frames were ever seen at this > output, > - // at least initialize the encoder with a dummy frame > - if (!ofp->got_frame) { > - AVFrame *frame = fgp->frame; > - FrameData *fd; > - > - frame->time_base = ofp->tb_out; > - frame->format = ofp->format; > - > - frame->width = ofp->width; > - frame->height = ofp->height; > - frame->sample_aspect_ratio = ofp->sample_aspect_ratio; > - > - frame->sample_rate = ofp->sample_rate; > - if (ofp->ch_layout.nb_channels) { > - ret = av_channel_layout_copy(&frame->ch_layout, > &ofp->ch_layout); > - if (ret < 0) > - return ret; > - } > - > - fd = frame_data(frame); > - if (!fd) > 
- return AVERROR(ENOMEM); > - > - fd->frame_rate_filter = ofp->fps.framerate; > - > - av_assert0(!frame->buf[0]); > - > - av_log(ofilter->ost, AV_LOG_WARNING, > - "No filtered frames for output stream, trying to " > - "initialize anyway.\n"); > - > - enc_open(ofilter->ost, frame); > - av_frame_unref(frame); > - } > - > - close_output_stream(graph->outputs[output_idx]->ost); > - continue; > - } > - > - // request was fully processed by the filtering thread, > - // return the input stream to read from, if needed > - if (output_idx == graph->nb_outputs) { > - int input_idx = (intptr_t)fgp->frame->opaque - 1; > - av_assert0(input_idx <= graph->nb_inputs); > - > - if (best_ist) { > - *best_ist = (input_idx >= 0 && input_idx < > graph->nb_inputs) ? > - > ifp_from_ifilter(graph->inputs[input_idx])->ist : NULL; > - > - if (input_idx < 0 && !got_frames) { > - for (int i = 0; i < graph->nb_outputs; i++) > - graph->outputs[i]->ost->unavailable = 1; > - } > - } > - break; > - } > - > - // got a frame from the filtering thread, send it for encoding > - ofilter = graph->outputs[output_idx]; > - ost = ofilter->ost; > - ofp = ofp_from_ofilter(ofilter); > - > - if (ost->finished) { > - av_frame_unref(fgp->frame); > - tq_receive_finish(fgp->queue_out, output_idx); > - continue; > - } > - > - if (fgp->frame->pts != AV_NOPTS_VALUE) { > - ofilter->last_pts = av_rescale_q(fgp->frame->pts, > - fgp->frame->time_base, > - AV_TIME_BASE_Q); > - } > - > - ret = enc_frame(ost, fgp->frame); > - av_frame_unref(fgp->frame); > - if (ret < 0) > - goto finish; > - > - ofp->got_frame = 1; > - got_frames = 1; > - } > - > -finish: > - if (ret < 0) { > - fgp->eof_in = 1; > - for (int i = 0; i < graph->nb_outputs; i++) > - close_output_stream(graph->outputs[i]->ost); > - } > - > - return ret; > -} > - > -int reap_filters(FilterGraph *fg, int flush) > -{ > - return fg_transcode_step(fg, NULL); > -} > - > void fg_send_command(FilterGraph *fg, double time, const char *target, > const char *command, const 
char *arg, int > all_filters) > { > FilterGraphPriv *fgp = fgp_from_fg(fg); > AVBufferRef *buf; > FilterCommand *fc; > - int output_idx, ret; > - > - if (!fgp->queue_in) > - return; > > fc = av_mallocz(sizeof(*fc)); > if (!fc) > @@ -3180,13 +2862,5 @@ void fg_send_command(FilterGraph *fg, double time, > const char *target, > fgp->frame->buf[0] = buf; > fgp->frame->opaque = (void*)(intptr_t)FRAME_OPAQUE_SEND_COMMAND; > > - ret = tq_send(fgp->queue_in, fg->nb_inputs, fgp->frame); > - if (ret < 0) { > - av_frame_unref(fgp->frame); > - return; > - } > - > - // wait for the frame to be processed > - ret = tq_receive(fgp->queue_out, &output_idx, fgp->frame); > - av_assert0(output_idx == fgp->fg.nb_outputs || ret == AVERROR_EOF); > + sch_filter_command(fgp->sch, fgp->sch_idx, fgp->frame); > } > diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c > index ef5c2f60e0..067dc65d4e 100644 > --- a/fftools/ffmpeg_mux.c > +++ b/fftools/ffmpeg_mux.c > @@ -23,16 +23,13 @@ > #include "ffmpeg.h" > #include "ffmpeg_mux.h" > #include "ffmpeg_utils.h" > -#include "objpool.h" > #include "sync_queue.h" > -#include "thread_queue.h" > > #include "libavutil/fifo.h" > #include "libavutil/intreadwrite.h" > #include "libavutil/log.h" > #include "libavutil/mem.h" > #include "libavutil/timestamp.h" > -#include "libavutil/thread.h" > > #include "libavcodec/packet.h" > > @@ -41,10 +38,9 @@ > > typedef struct MuxThreadContext { > AVPacket *pkt; > + AVPacket *fix_sub_duration_pkt; > } MuxThreadContext; > > -int want_sdp = 1; > - > static Muxer *mux_from_of(OutputFile *of) > { > return (Muxer*)of; > @@ -207,14 +203,41 @@ static int sync_queue_process(Muxer *mux, > OutputStream *ost, AVPacket *pkt, int > return 0; > } > > +static int of_streamcopy(OutputStream *ost, AVPacket *pkt); > + > /* apply the output bitstream filters */ > -static int mux_packet_filter(Muxer *mux, OutputStream *ost, > - AVPacket *pkt, int *stream_eof) > +static int mux_packet_filter(Muxer *mux, MuxThreadContext *mt, > + 
OutputStream *ost, AVPacket *pkt, int > *stream_eof) > { > MuxStream *ms = ms_from_ost(ost); > const char *err_msg; > int ret = 0; > > + if (pkt && !ost->enc) { > + ret = of_streamcopy(ost, pkt); > + if (ret == AVERROR(EAGAIN)) > + return 0; > + else if (ret == AVERROR_EOF) { > + av_packet_unref(pkt); > + pkt = NULL; > + ret = 0; > + } else if (ret < 0) > + goto fail; > + } > + > + // emit heartbeat for -fix_sub_duration; > + // we are only interested in heartbeats on on random access points. > + if (pkt && (pkt->flags & AV_PKT_FLAG_KEY)) { > + mt->fix_sub_duration_pkt->opaque = > (void*)(intptr_t)PKT_OPAQUE_FIX_SUB_DURATION; > + mt->fix_sub_duration_pkt->pts = pkt->pts; > + mt->fix_sub_duration_pkt->time_base = pkt->time_base; > + > + ret = sch_mux_sub_heartbeat(mux->sch, mux->sch_idx, ms->sch_idx, > + mt->fix_sub_duration_pkt); > + if (ret < 0) > + goto fail; > + } > + > if (ms->bsf_ctx) { > int bsf_eof = 0; > > @@ -278,6 +301,7 @@ static void thread_set_name(OutputFile *of) > static void mux_thread_uninit(MuxThreadContext *mt) > { > av_packet_free(&mt->pkt); > + av_packet_free(&mt->fix_sub_duration_pkt); > > memset(mt, 0, sizeof(*mt)); > } > @@ -290,6 +314,10 @@ static int mux_thread_init(MuxThreadContext *mt) > if (!mt->pkt) > goto fail; > > + mt->fix_sub_duration_pkt = av_packet_alloc(); > + if (!mt->fix_sub_duration_pkt) > + goto fail; > + > return 0; > > fail: > @@ -316,19 +344,22 @@ void *muxer_thread(void *arg) > OutputStream *ost; > int stream_idx, stream_eof = 0; > > - ret = tq_receive(mux->tq, &stream_idx, mt.pkt); > + ret = sch_mux_receive(mux->sch, of->index, mt.pkt); > + stream_idx = mt.pkt->stream_index; > if (stream_idx < 0) { > av_log(mux, AV_LOG_VERBOSE, "All streams finished\n"); > ret = 0; > break; > } > > - ost = of->streams[stream_idx]; > - ret = mux_packet_filter(mux, ost, ret < 0 ? 
NULL : mt.pkt, > &stream_eof); > + ost = of->streams[mux->sch_stream_idx[stream_idx]]; > + mt.pkt->stream_index = ost->index; > + > + ret = mux_packet_filter(mux, &mt, ost, ret < 0 ? NULL : mt.pkt, > &stream_eof); > av_packet_unref(mt.pkt); > if (ret == AVERROR_EOF) { > if (stream_eof) { > - tq_receive_finish(mux->tq, stream_idx); > + sch_mux_receive_finish(mux->sch, of->index, stream_idx); > } else { > av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n"); > ret = 0; > @@ -343,243 +374,55 @@ void *muxer_thread(void *arg) > finish: > mux_thread_uninit(&mt); > > - for (unsigned int i = 0; i < mux->fc->nb_streams; i++) > - tq_receive_finish(mux->tq, i); > - > - av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n"); > - > return (void*)(intptr_t)ret; > } > > -static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket > *pkt) > -{ > - int ret = 0; > - > - if (!pkt || ost->finished & MUXER_FINISHED) > - goto finish; > - > - ret = tq_send(mux->tq, ost->index, pkt); > - if (ret < 0) > - goto finish; > - > - return 0; > - > -finish: > - if (pkt) > - av_packet_unref(pkt); > - > - ost->finished |= MUXER_FINISHED; > - tq_send_finish(mux->tq, ost->index); > - return ret == AVERROR_EOF ? 0 : ret; > -} > - > -static int queue_packet(OutputStream *ost, AVPacket *pkt) > -{ > - MuxStream *ms = ms_from_ost(ost); > - AVPacket *tmp_pkt = NULL; > - int ret; > - > - if (!av_fifo_can_write(ms->muxing_queue)) { > - size_t cur_size = av_fifo_can_read(ms->muxing_queue); > - size_t pkt_size = pkt ? pkt->size : 0; > - unsigned int are_we_over_size = > - (ms->muxing_queue_data_size + pkt_size) > > ms->muxing_queue_data_threshold; > - size_t limit = are_we_over_size ? 
ms->max_muxing_queue_size : > SIZE_MAX; > - size_t new_size = FFMIN(2 * cur_size, limit); > - > - if (new_size <= cur_size) { > - av_log(ost, AV_LOG_ERROR, > - "Too many packets buffered for output stream %d:%d.\n", > - ost->file_index, ost->st->index); > - return AVERROR(ENOSPC); > - } > - ret = av_fifo_grow2(ms->muxing_queue, new_size - cur_size); > - if (ret < 0) > - return ret; > - } > - > - if (pkt) { > - ret = av_packet_make_refcounted(pkt); > - if (ret < 0) > - return ret; > - > - tmp_pkt = av_packet_alloc(); > - if (!tmp_pkt) > - return AVERROR(ENOMEM); > - > - av_packet_move_ref(tmp_pkt, pkt); > - ms->muxing_queue_data_size += tmp_pkt->size; > - } > - av_fifo_write(ms->muxing_queue, &tmp_pkt, 1); > - > - return 0; > -} > - > -static int submit_packet(Muxer *mux, AVPacket *pkt, OutputStream *ost) > -{ > - int ret; > - > - if (mux->tq) { > - return thread_submit_packet(mux, ost, pkt); > - } else { > - /* the muxer is not initialized yet, buffer the packet */ > - ret = queue_packet(ost, pkt); > - if (ret < 0) { > - if (pkt) > - av_packet_unref(pkt); > - return ret; > - } > - } > - > - return 0; > -} > - > -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt) > -{ > - Muxer *mux = mux_from_of(of); > - int ret = 0; > - > - if (pkt && pkt->dts != AV_NOPTS_VALUE) > - ost->last_mux_dts = av_rescale_q(pkt->dts, pkt->time_base, > AV_TIME_BASE_Q); > - > - ret = submit_packet(mux, pkt, ost); > - if (ret < 0) { > - av_log(ost, AV_LOG_ERROR, "Error submitting a packet to the > muxer: %s", > - av_err2str(ret)); > - return ret; > - } > - > - return 0; > -} > - > -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts) > +static int of_streamcopy(OutputStream *ost, AVPacket *pkt) > { > OutputFile *of = output_files[ost->file_index]; > MuxStream *ms = ms_from_ost(ost); > + DemuxPktData *pd = pkt->opaque_ref ? > (DemuxPktData*)pkt->opaque_ref->data : NULL; > + int64_t dts = pd ? 
pd->dts_est : AV_NOPTS_VALUE; > int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : > of->start_time; > int64_t ts_offset; > - AVPacket *opkt = ms->pkt; > - int ret; > - > - av_packet_unref(opkt); > > if (of->recording_time != INT64_MAX && > dts >= of->recording_time + start_time) > - pkt = NULL; > - > - // EOF: flush output bitstream filters. > - if (!pkt) > - return of_output_packet(of, ost, NULL); > + return AVERROR_EOF; > > if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) && > !ms->copy_initial_nonkeyframes) > - return 0; > + return AVERROR(EAGAIN); > > if (!ms->streamcopy_started) { > if (!ms->copy_prior_start && > (pkt->pts == AV_NOPTS_VALUE ? > dts < ms->ts_copy_start : > pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, > pkt->time_base))) > - return 0; > + return AVERROR(EAGAIN); > > if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time) > - return 0; > + return AVERROR(EAGAIN); > } > > - ret = av_packet_ref(opkt, pkt); > - if (ret < 0) > - return ret; > - > - ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base); > + ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base); > > if (pkt->pts != AV_NOPTS_VALUE) > - opkt->pts -= ts_offset; > + pkt->pts -= ts_offset; > > if (pkt->dts == AV_NOPTS_VALUE) { > - opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base); > + pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base); > } else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) { > - opkt->pts = opkt->dts - ts_offset; > - } > - opkt->dts -= ts_offset; > - > - { > - int ret = trigger_fix_sub_duration_heartbeat(ost, pkt); > - if (ret < 0) { > - av_log(NULL, AV_LOG_ERROR, > - "Subtitle heartbeat logic failed in %s! 
(%s)\n", > - __func__, av_err2str(ret)); > - return ret; > - } > + pkt->pts = pkt->dts - ts_offset; > } > > - ret = of_output_packet(of, ost, opkt); > - if (ret < 0) > - return ret; > + pkt->dts -= ts_offset; > > ms->streamcopy_started = 1; > > return 0; > } > > -static int thread_stop(Muxer *mux) > -{ > - void *ret; > - > - if (!mux || !mux->tq) > - return 0; > - > - for (unsigned int i = 0; i < mux->fc->nb_streams; i++) > - tq_send_finish(mux->tq, i); > - > - pthread_join(mux->thread, &ret); > - > - tq_free(&mux->tq); > - > - return (int)(intptr_t)ret; > -} > - > -static int thread_start(Muxer *mux) > -{ > - AVFormatContext *fc = mux->fc; > - ObjPool *op; > - int ret; > - > - op = objpool_alloc_packets(); > - if (!op) > - return AVERROR(ENOMEM); > - > - mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, > pkt_move); > - if (!mux->tq) { > - objpool_free(&op); > - return AVERROR(ENOMEM); > - } > - > - ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux); > - if (ret) { > - tq_free(&mux->tq); > - return AVERROR(ret); > - } > - > - /* flush the muxing queues */ > - for (int i = 0; i < fc->nb_streams; i++) { > - OutputStream *ost = mux->of.streams[i]; > - MuxStream *ms = ms_from_ost(ost); > - AVPacket *pkt; > - > - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { > - ret = thread_submit_packet(mux, ost, pkt); > - if (pkt) { > - ms->muxing_queue_data_size -= pkt->size; > - av_packet_free(&pkt); > - } > - if (ret < 0) > - return ret; > - } > - } > - > - return 0; > -} > - > int print_sdp(const char *filename); > > int print_sdp(const char *filename) > @@ -590,11 +433,6 @@ int print_sdp(const char *filename) > AVIOContext *sdp_pb; > AVFormatContext **avc; > > - for (i = 0; i < nb_output_files; i++) { > - if (!mux_from_of(output_files[i])->header_written) > - return 0; > - } > - > avc = av_malloc_array(nb_output_files, sizeof(*avc)); > if (!avc) > return AVERROR(ENOMEM); > @@ -629,25 +467,17 @@ int print_sdp(const char *filename) > 
avio_closep(&sdp_pb); > } > > - // SDP successfully written, allow muxer threads to start > - ret = 1; > - > fail: > av_freep(&avc); > return ret; > } > > -int mux_check_init(Muxer *mux) > +int mux_check_init(void *arg) > { > + Muxer *mux = arg; > OutputFile *of = &mux->of; > AVFormatContext *fc = mux->fc; > - int ret, i; > - > - for (i = 0; i < fc->nb_streams; i++) { > - OutputStream *ost = of->streams[i]; > - if (!ost->initialized) > - return 0; > - } > + int ret; > > ret = avformat_write_header(fc, &mux->opts); > if (ret < 0) { > @@ -659,27 +489,7 @@ int mux_check_init(Muxer *mux) > mux->header_written = 1; > > av_dump_format(fc, of->index, fc->url, 1); > - nb_output_dumped++; > - > - if (sdp_filename || want_sdp) { > - ret = print_sdp(sdp_filename); > - if (ret < 0) { > - av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); > - return ret; > - } else if (ret == 1) { > - /* SDP is written only after all the muxers are ready, so now > we > - * start ALL the threads */ > - for (i = 0; i < nb_output_files; i++) { > - ret = thread_start(mux_from_of(output_files[i])); > - if (ret < 0) > - return ret; > - } > - } > - } else { > - ret = thread_start(mux_from_of(of)); > - if (ret < 0) > - return ret; > - } > + atomic_fetch_add(&nb_output_dumped, 1); > > return 0; > } > @@ -736,9 +546,10 @@ int of_stream_init(OutputFile *of, OutputStream *ost) > ost->st->time_base); > } > > - ost->initialized = 1; > + if (ms->sch_idx >= 0) > + return sch_mux_stream_ready(mux->sch, of->index, ms->sch_idx); > > - return mux_check_init(mux); > + return 0; > } > > static int check_written(OutputFile *of) > @@ -852,15 +663,13 @@ int of_write_trailer(OutputFile *of) > AVFormatContext *fc = mux->fc; > int ret, mux_result = 0; > > - if (!mux->tq) { > + if (!mux->header_written) { > av_log(mux, AV_LOG_ERROR, > "Nothing was written into output file, because " > "at least one of its streams received no packets.\n"); > return AVERROR(EINVAL); > } > > - mux_result = thread_stop(mux); > - > ret = 
av_write_trailer(fc); > if (ret < 0) { > av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", > av_err2str(ret)); > @@ -905,13 +714,6 @@ static void ost_free(OutputStream **post) > ost->logfile = NULL; > } > > - if (ms->muxing_queue) { > - AVPacket *pkt; > - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) > - av_packet_free(&pkt); > - av_fifo_freep2(&ms->muxing_queue); > - } > - > avcodec_parameters_free(&ost->par_in); > > av_bsf_free(&ms->bsf_ctx); > @@ -976,8 +778,6 @@ void of_free(OutputFile **pof) > return; > mux = mux_from_of(of); > > - thread_stop(mux); > - > sq_free(&of->sq_encode); > sq_free(&mux->sq_mux); > > diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h > index eee2b2cb07..5d7cf3fa76 100644 > --- a/fftools/ffmpeg_mux.h > +++ b/fftools/ffmpeg_mux.h > @@ -25,7 +25,6 @@ > #include <stdint.h> > > #include "ffmpeg_sched.h" > -#include "thread_queue.h" > > #include "libavformat/avformat.h" > > @@ -33,7 +32,6 @@ > > #include "libavutil/dict.h" > #include "libavutil/fifo.h" > -#include "libavutil/thread.h" > > typedef struct MuxStream { > OutputStream ost; > @@ -41,9 +39,6 @@ typedef struct MuxStream { > // name used for logging > char log_name[32]; > > - /* the packets are buffered here until the muxer is ready to be > initialized */ > - AVFifo *muxing_queue; > - > AVBSFContext *bsf_ctx; > AVPacket *bsf_pkt; > > @@ -57,17 +52,6 @@ typedef struct MuxStream { > > int64_t max_frames; > > - /* > - * The size of the AVPackets' buffers in queue. > - * Updated when a packet is either pushed or pulled from the queue. 
> - */ > - size_t muxing_queue_data_size; > - > - int max_muxing_queue_size; > - > - /* Threshold after which max_muxing_queue_size will be in effect */ > - size_t muxing_queue_data_threshold; > - > // timestamp from which the streamcopied streams should start, > // in AV_TIME_BASE_Q; > // everything before it should be discarded > @@ -106,9 +90,6 @@ typedef struct Muxer { > int *sch_stream_idx; > int nb_sch_stream_idx; > > - pthread_t thread; > - ThreadQueue *tq; > - > AVDictionary *opts; > > int thread_queue_size; > @@ -122,10 +103,7 @@ typedef struct Muxer { > AVPacket *sq_pkt; > } Muxer; > > -/* whether we want to print an SDP, set in of_open() */ > -extern int want_sdp; > - > -int mux_check_init(Muxer *mux); > +int mux_check_init(void *arg); > > static MuxStream *ms_from_ost(OutputStream *ost) > { > diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c > index 534b4379c7..6459296ab0 100644 > --- a/fftools/ffmpeg_mux_init.c > +++ b/fftools/ffmpeg_mux_init.c > @@ -924,13 +924,6 @@ static int new_stream_audio(Muxer *mux, const > OptionsContext *o, > return 0; > } > > -static int new_stream_attachment(Muxer *mux, const OptionsContext *o, > - OutputStream *ost) > -{ > - ost->finished = 1; > - return 0; > -} > - > static int new_stream_subtitle(Muxer *mux, const OptionsContext *o, > OutputStream *ost) > { > @@ -1168,9 +1161,6 @@ static int ost_add(Muxer *mux, const OptionsContext > *o, enum AVMediaType type, > if (!ost->par_in) > return AVERROR(ENOMEM); > > - ms->muxing_queue = av_fifo_alloc2(8, sizeof(AVPacket*), 0); > - if (!ms->muxing_queue) > - return AVERROR(ENOMEM); > ms->last_mux_dts = AV_NOPTS_VALUE; > > ost->st = st; > @@ -1190,7 +1180,8 @@ static int ost_add(Muxer *mux, const OptionsContext > *o, enum AVMediaType type, > if (!ost->enc_ctx) > return AVERROR(ENOMEM); > > - ret = sch_add_enc(mux->sch, encoder_thread, ost, NULL); > + ret = sch_add_enc(mux->sch, encoder_thread, ost, > + ost->type == AVMEDIA_TYPE_SUBTITLE ? 
NULL : > enc_open); > if (ret < 0) > return ret; > ms->sch_idx_enc = ret; > @@ -1414,9 +1405,6 @@ static int ost_add(Muxer *mux, const OptionsContext > *o, enum AVMediaType type, > > sch_mux_stream_buffering(mux->sch, mux->sch_idx, ms->sch_idx, > max_muxing_queue_size, > muxing_queue_data_threshold); > - > - ms->max_muxing_queue_size = max_muxing_queue_size; > - ms->muxing_queue_data_threshold = muxing_queue_data_threshold; > } > > MATCH_PER_STREAM_OPT(bits_per_raw_sample, i, ost->bits_per_raw_sample, > @@ -1434,8 +1422,6 @@ static int ost_add(Muxer *mux, const OptionsContext > *o, enum AVMediaType type, > if (ost->enc_ctx && > av_get_exact_bits_per_sample(ost->enc_ctx->codec_id) == 24) > av_dict_set(&ost->swr_opts, "output_sample_bits", "24", 0); > > - ost->last_mux_dts = AV_NOPTS_VALUE; > - > MATCH_PER_STREAM_OPT(copy_initial_nonkeyframes, i, > ms->copy_initial_nonkeyframes, oc, st); > > @@ -1443,7 +1429,6 @@ static int ost_add(Muxer *mux, const OptionsContext > *o, enum AVMediaType type, > case AVMEDIA_TYPE_VIDEO: ret = new_stream_video (mux, o, > ost); break; > case AVMEDIA_TYPE_AUDIO: ret = new_stream_audio (mux, o, > ost); break; > case AVMEDIA_TYPE_SUBTITLE: ret = new_stream_subtitle (mux, o, > ost); break; > - case AVMEDIA_TYPE_ATTACHMENT: ret = new_stream_attachment(mux, o, > ost); break; > } > if (ret < 0) > return ret; > @@ -1938,7 +1923,6 @@ static int setup_sync_queues(Muxer *mux, > AVFormatContext *oc, int64_t buf_size_u > MuxStream *ms = ms_from_ost(ost); > enum AVMediaType type = ost->type; > > - ost->sq_idx_encode = -1; > ost->sq_idx_mux = -1; > > nb_interleaved += IS_INTERLEAVED(type); > @@ -1961,11 +1945,17 @@ static int setup_sync_queues(Muxer *mux, > AVFormatContext *oc, int64_t buf_size_u > * - at least one encoded audio/video stream is frame-limited, since > * that has similar semantics to 'shortest' > * - at least one audio encoder requires constant frame sizes > + * > + * Note that encoding sync queues are handled in the scheduler, > 
because > + * different encoders run in different threads and need external > + * synchronization, while muxer sync queues can be handled inside the > muxer > */ > if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || > nb_audio_fs) { > - of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux); > - if (!of->sq_encode) > - return AVERROR(ENOMEM); > + int sq_idx, ret; > + > + sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux); > + if (sq_idx < 0) > + return sq_idx; > > for (int i = 0; i < oc->nb_streams; i++) { > OutputStream *ost = of->streams[i]; > @@ -1975,13 +1965,11 @@ static int setup_sync_queues(Muxer *mux, > AVFormatContext *oc, int64_t buf_size_u > if (!IS_AV_ENC(ost, type)) > continue; > > - ost->sq_idx_encode = sq_add_stream(of->sq_encode, > - of->shortest || > ms->max_frames < INT64_MAX); > - if (ost->sq_idx_encode < 0) > - return ost->sq_idx_encode; > - > - if (ms->max_frames != INT64_MAX) > - sq_limit_frames(of->sq_encode, ost->sq_idx_encode, > ms->max_frames); > + ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sch_idx_enc, > + of->shortest || ms->max_frames < > INT64_MAX, > + ms->max_frames); > + if (ret < 0) > + return ret; > } > } > > @@ -2652,23 +2640,6 @@ static int validate_enc_avopt(Muxer *mux, const > AVDictionary *codec_avopt) > return 0; > } > > -static int init_output_stream_nofilter(OutputStream *ost) > -{ > - int ret = 0; > - > - if (ost->enc_ctx) { > - ret = enc_open(ost, NULL); > - if (ret < 0) > - return ret; > - } else { > - ret = of_stream_init(output_files[ost->file_index], ost); > - if (ret < 0) > - return ret; > - } > - > - return ret; > -} > - > static const char *output_file_item_name(void *obj) > { > const Muxer *mux = obj; > @@ -2751,8 +2722,6 @@ int of_open(const OptionsContext *o, const char > *filename, Scheduler *sch) > av_strlcat(mux->log_name, "/", sizeof(mux->log_name)); > av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name)); > > - if (strcmp(oc->oformat->name, "rtp")) > - want_sdp = 0; > > 
of->format = oc->oformat; > if (recording_time != INT64_MAX) > @@ -2768,7 +2737,7 @@ int of_open(const OptionsContext *o, const char > *filename, Scheduler *sch) > AVFMT_FLAG_BITEXACT); > } > > - err = sch_add_mux(sch, muxer_thread, NULL, mux, > + err = sch_add_mux(sch, muxer_thread, mux_check_init, mux, > !strcmp(oc->oformat->name, "rtp")); > if (err < 0) > return err; > @@ -2854,26 +2823,15 @@ int of_open(const OptionsContext *o, const char > *filename, Scheduler *sch) > > of->url = filename; > > - /* initialize stream copy and subtitle/data streams. > - * Encoded AVFrame based streams will get initialized when the first > AVFrame > - * is received in do_video_out > - */ > + /* initialize streamcopy streams. */ > for (int i = 0; i < of->nb_streams; i++) { > OutputStream *ost = of->streams[i]; > > - if (ost->filter) > - continue; > - > - err = init_output_stream_nofilter(ost); > - if (err < 0) > - return err; > - } > - > - /* write the header for files with no streams */ > - if (of->format->flags & AVFMT_NOSTREAMS && oc->nb_streams == 0) { > - int ret = mux_check_init(mux); > - if (ret < 0) > - return ret; > + if (!ost->enc) { > + err = of_stream_init(of, ost); > + if (err < 0) > + return err; > + } > } > > return 0; > diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c > index d463306546..6177a96a4e 100644 > --- a/fftools/ffmpeg_opt.c > +++ b/fftools/ffmpeg_opt.c > @@ -64,7 +64,6 @@ const char *const opt_name_top_field_first[] > = {"top", NULL}; > HWDevice *filter_hw_device; > > char *vstats_filename; > -char *sdp_filename; > > float audio_drift_threshold = 0.1; > float dts_delta_threshold = 10; > @@ -580,9 +579,8 @@ fail: > > static int opt_sdp_file(void *optctx, const char *opt, const char *arg) > { > - av_free(sdp_filename); > - sdp_filename = av_strdup(arg); > - return 0; > + Scheduler *sch = optctx; > + return sch_sdp_filename(sch, arg); > } > > #if CONFIG_VAAPI > diff --git a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > 
b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > index 957a410921..bc9b833799 100644 > --- a/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > +++ b/tests/ref/fate/ffmpeg-fix_sub_duration_heartbeat > @@ -1,48 +1,40 @@ > 1 > -00:00:00,968 --> 00:00:01,001 > +00:00:00,968 --> 00:00:01,168 > <font face="Monospace">{\an7}(</font> > > 2 > -00:00:01,001 --> 00:00:01,168 > -<font face="Monospace">{\an7}(</font> > - > -3 > 00:00:01,168 --> 00:00:01,368 > <font face="Monospace">{\an7}(<i> inaudibl</i></font> > > -4 > +3 > 00:00:01,368 --> 00:00:01,568 > <font face="Monospace">{\an7}(<i> inaudible radio chat</i></font> > > -5 > +4 > 00:00:01,568 --> 00:00:02,002 > <font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> > > +5 > +00:00:02,002 --> 00:00:03,103 > +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> > + > 6 > -00:00:02,002 --> 00:00:03,003 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> > - > -7 > -00:00:03,003 --> 00:00:03,103 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> )</font> > - > -8 > 00:00:03,103 --> 00:00:03,303 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > >></font> > > -9 > +7 > 00:00:03,303 --> 00:00:03,503 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > >> Safety rema</font> > > -10 > +8 > 00:00:03,504 --> 00:00:03,704 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > >> Safety remains our numb</font> > > -11 > +9 > 00:00:03,704 --> 00:00:04,004 > -<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > +<font face="Monospace">{\an7}(<i> inaudible radio chatter</i> ) > >> Safety remains our number one</font> > > -- > 2.42.0 > > _______________________________________________ > ffmpeg-devel mailing 
list > ffmpeg-devel@ffmpeg.org > https://ffmpeg.org/mailman/listinfo/ffmpeg-devel > > To unsubscribe, visit link above, or email > ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".