From mboxrd@z Thu Jan 1 00:00:00 1970 Return-Path: Received: from ffbox0-bg.mplayerhq.hu (ffbox0-bg.ffmpeg.org [79.124.17.100]) by master.gitmailbox.com (Postfix) with ESMTP id 0580547676 for ; Tue, 19 Sep 2023 19:25:19 +0000 (UTC) Received: from [127.0.1.1] (localhost [127.0.0.1]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTP id 64FDB68C9D5; Tue, 19 Sep 2023 22:21:58 +0300 (EEST) Received: from mail1.khirnov.net (quelana.khirnov.net [94.230.150.81]) by ffbox0-bg.mplayerhq.hu (Postfix) with ESMTPS id 4777F68C90E for ; Tue, 19 Sep 2023 22:21:56 +0300 (EEST) Received: from localhost (mail1.khirnov.net [IPv6:::1]) by mail1.khirnov.net (Postfix) with ESMTP id B55095101 for ; Tue, 19 Sep 2023 21:21:55 +0200 (CEST) Received: from mail1.khirnov.net ([IPv6:::1]) by localhost (mail1.khirnov.net [IPv6:::1]) (amavis, port 10024) with ESMTP id 2T3lmpaAjhwu for ; Tue, 19 Sep 2023 21:21:50 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:2a00:c500:561:201::7]) (using TLSv1.3 with cipher TLS_AES_256_GCM_SHA384 (256/256 bits) key-exchange X25519 server-signature RSA-PSS (2048 bits) server-digest SHA256 client-signature RSA-PSS (2048 bits) client-digest SHA256) (Client CN "libav.khirnov.net", Issuer "smtp.khirnov.net SMTP CA" (verified OK)) by mail1.khirnov.net (Postfix) with ESMTPS id 51504523A for ; Tue, 19 Sep 2023 21:20:43 +0200 (CEST) Received: from libav.khirnov.net (libav.khirnov.net [IPv6:::1]) by libav.khirnov.net (Postfix) with ESMTP id 450AC3A0D2D for ; Tue, 19 Sep 2023 21:20:43 +0200 (CEST) From: Anton Khirnov To: ffmpeg-devel@ffmpeg.org Date: Tue, 19 Sep 2023 21:10:53 +0200 Message-Id: <20230919191044.18873-27-anton@khirnov.net> X-Mailer: git-send-email 2.40.1 In-Reply-To: <20230919191044.18873-1-anton@khirnov.net> References: <20230919191044.18873-1-anton@khirnov.net> MIME-Version: 1.0 Subject: [FFmpeg-devel] [PATCH 26/27] WIP fftools/ffmpeg_mux: convert to the scheduler X-BeenThere: ffmpeg-devel@ffmpeg.org X-Mailman-Version: 2.1.29 Precedence: list List-Id: FFmpeg development discussions and patches List-Unsubscribe: , List-Archive: List-Post: List-Help: List-Subscribe: , Reply-To: FFmpeg development discussions and patches Content-Type: text/plain; charset="us-ascii" Content-Transfer-Encoding: 7bit Errors-To: ffmpeg-devel-bounces@ffmpeg.org Sender: "ffmpeg-devel" Archived-At: List-Archive: List-Post: --- fftools/ffmpeg.c | 26 ------ fftools/ffmpeg.h | 8 -- fftools/ffmpeg_mux.c | 184 ++++++++++++-------------------------- fftools/ffmpeg_mux.h | 14 ++- fftools/ffmpeg_mux_init.c | 33 +++---- fftools/ffmpeg_opt.c | 6 +- 6 files changed, 80 insertions(+), 191 deletions(-) diff --git a/fftools/ffmpeg.c b/fftools/ffmpeg.c index 69e73b84ed..e82d88b3e0 100644 --- a/fftools/ffmpeg.c +++ b/fftools/ffmpeg.c @@ -806,38 +806,12 @@ int trigger_fix_sub_duration_heartbeat(OutputStream *ost, const AVPacket *pkt) static int process_input_packet(InputStream *ist, const AVPacket *pkt, int no_eof) { InputFile *f = input_files[ist->file_index]; - int64_t dts_est = AV_NOPTS_VALUE; int ret = 0; int eof_reached = 0; if (ret == AVERROR_EOF || (!pkt && !ist->decoding_needed)) eof_reached = 1; - if (pkt && pkt->opaque_ref) { - DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data; - dts_est = pd->dts_est; - } - - if (f->recording_time != INT64_MAX) { - int64_t start_time = 0; - if (copy_ts) { - start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0; - start_time += start_at_zero ? 0 : f->start_time_effective; - } - if (dts_est >= f->recording_time + start_time) - pkt = NULL; - } - - for (int oidx = 0; oidx < ist->nb_outputs; oidx++) { - OutputStream *ost = ist->outputs[oidx]; - if (ost->enc || (!pkt && no_eof)) - continue; - - ret = of_streamcopy(ost, pkt, dts_est); - if (ret < 0) - return ret; - } - return !eof_reached; } diff --git a/fftools/ffmpeg.h b/fftools/ffmpeg.h index 66b5e398bc..e3aea75415 100644 --- a/fftools/ffmpeg.h +++ b/fftools/ffmpeg.h @@ -665,7 +665,6 @@ extern FilterGraph **filtergraphs; extern int nb_filtergraphs; extern char *vstats_filename; -extern char *sdp_filename; extern float dts_delta_threshold; extern float dts_error_threshold; @@ -813,13 +812,6 @@ void of_free(OutputFile **pof); void of_enc_stats_close(void); -int of_output_packet(OutputFile *of, OutputStream *ost, AVPacket *pkt); - -/** - * @param dts predicted packet dts in AV_TIME_BASE_Q - */ -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts); - int64_t of_filesize(OutputFile *of); int ifile_open(const OptionsContext *o, const char *filename, Scheduler *sch); diff --git a/fftools/ffmpeg_mux.c b/fftools/ffmpeg_mux.c index 9628728d95..ba90079266 100644 --- a/fftools/ffmpeg_mux.c +++ b/fftools/ffmpeg_mux.c @@ -22,16 +22,13 @@ #include "ffmpeg.h" #include "ffmpeg_mux.h" -#include "objpool.h" #include "sync_queue.h" -#include "thread_queue.h" #include "libavutil/fifo.h" #include "libavutil/intreadwrite.h" #include "libavutil/log.h" #include "libavutil/mem.h" #include "libavutil/timestamp.h" -#include "libavutil/thread.h" #include "libavcodec/packet.h" @@ -42,8 +39,6 @@ typedef struct MuxThreadContext { AVPacket *pkt; } MuxThreadContext; -int want_sdp = 1; - static Muxer *mux_from_of(OutputFile *of) { return (Muxer*)of; @@ -254,19 +249,22 @@ void *muxer_thread(void *arg) OutputStream *ost; int stream_idx, stream_eof = 0; - ret = tq_receive(mux->tq, &stream_idx, mt.pkt); + ret = sch_mux_receive(mux->sch, of->index, mt.pkt); + stream_idx = mt.pkt->stream_index; if (stream_idx < 0) { av_log(mux, AV_LOG_VERBOSE, "All streams finished\n"); ret = 0; break; } - ost = of->streams[stream_idx]; + ost = of->streams[mux->sch_stream_idx[stream_idx]]; + mt.pkt->stream_index = ost->index; + ret = sync_queue_process(mux, ost, ret < 0 ? NULL : mt.pkt, &stream_eof); av_packet_unref(mt.pkt); if (ret == AVERROR_EOF) { if (stream_eof) { - tq_receive_finish(mux->tq, stream_idx); + sch_mux_receive_finish(mux->sch, of->index, stream_idx); } else { av_log(mux, AV_LOG_VERBOSE, "Muxer returned EOF\n"); ret = 0; @@ -278,17 +276,19 @@ void *muxer_thread(void *arg) } } + // XXX move av_write_trailer() here? + finish: mux_thread_uninit(&mt); - for (unsigned int i = 0; i < mux->fc->nb_streams; i++) - tq_receive_finish(mux->tq, i); + sch_mux_receive_finish(mux->sch, of->index, -1); av_log(mux, AV_LOG_VERBOSE, "Terminating muxer thread\n"); return (void*)(intptr_t)ret; } +#if 0 static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt) { int ret = 0; @@ -296,7 +296,8 @@ static int thread_submit_packet(Muxer *mux, OutputStream *ost, AVPacket *pkt) if (!pkt || ost->finished & MUXER_FINISHED) goto finish; - ret = tq_send(mux->tq, ost->index, pkt); + // XXX + //ret = tq_send(mux->tq, ost->index, pkt); if (ret < 0) goto finish; @@ -306,8 +307,9 @@ finish: if (pkt) av_packet_unref(pkt); + // XXX ost->finished |= MUXER_FINISHED; - tq_send_finish(mux->tq, ost->index); + //tq_send_finish(mux->tq, ost->index); return ret == AVERROR_EOF ? 0 : ret; } @@ -428,57 +430,48 @@ fail: av_log(ost, AV_LOG_ERROR, "Error %s\n", err_msg); return exit_on_error ? ret : 0; } +#endif -int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts) +static int of_streamcopy(OutputStream *ost, AVPacket *pkt, int64_t dts) { OutputFile *of = output_files[ost->file_index]; MuxStream *ms = ms_from_ost(ost); int64_t start_time = (of->start_time == AV_NOPTS_VALUE) ? 0 : of->start_time; int64_t ts_offset; - AVPacket *opkt = ms->pkt; - int ret; - - av_packet_unref(opkt); if (of->recording_time != INT64_MAX && dts >= of->recording_time + start_time) - pkt = NULL; - - // EOF: flush output bitstream filters. - if (!pkt) - return of_output_packet(of, ost, NULL); + return AVERROR_EOF; if (!ms->streamcopy_started && !(pkt->flags & AV_PKT_FLAG_KEY) && !ms->copy_initial_nonkeyframes) - return 0; + return AVERROR(EAGAIN); if (!ms->streamcopy_started) { if (!ms->copy_prior_start && (pkt->pts == AV_NOPTS_VALUE ? dts < ms->ts_copy_start : pkt->pts < av_rescale_q(ms->ts_copy_start, AV_TIME_BASE_Q, pkt->time_base))) - return 0; + return AVERROR(EAGAIN); if (of->start_time != AV_NOPTS_VALUE && dts < of->start_time) - return 0; + return AVERROR(EAGAIN); } - ret = av_packet_ref(opkt, pkt); - if (ret < 0) - return ret; - - ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, opkt->time_base); + ts_offset = av_rescale_q(start_time, AV_TIME_BASE_Q, pkt->time_base); if (pkt->pts != AV_NOPTS_VALUE) - opkt->pts -= ts_offset; + pkt->pts -= ts_offset; if (pkt->dts == AV_NOPTS_VALUE) { - opkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, opkt->time_base); + pkt->dts = av_rescale_q(dts, AV_TIME_BASE_Q, pkt->time_base); } else if (ost->st->codecpar->codec_type == AVMEDIA_TYPE_AUDIO) { - opkt->pts = opkt->dts - ts_offset; + pkt->pts = pkt->dts - ts_offset; } - opkt->dts -= ts_offset; + pkt->dts -= ts_offset; + // XXX +#if 0 { int ret = trigger_fix_sub_duration_heartbeat(ost, pkt); if (ret < 0) { @@ -488,73 +481,42 @@ int of_streamcopy(OutputStream *ost, const AVPacket *pkt, int64_t dts) return ret; } } - - ret = of_output_packet(of, ost, opkt); - if (ret < 0) - return ret; +#endif ms->streamcopy_started = 1; return 0; } -static int thread_stop(Muxer *mux) +int of_streamcopy_hook(void *opaque, void *item) { - void *ret; + OutputStream *ost = opaque; + const InputStream *ist = ost->ist; + const InputFile *f = input_files[ist->file_index]; - if (!mux || !mux->tq) - return 0; + AVPacket *pkt = item; + int64_t dts_est = AV_NOPTS_VALUE; - for (unsigned int i = 0; i < mux->fc->nb_streams; i++) - tq_send_finish(mux->tq, i); + // nothing for us to do about seek signals + if ((intptr_t)pkt->opaque == PKT_OPAQUE_SEEK) + return AVERROR(EAGAIN); - pthread_join(mux->thread, &ret); - - tq_free(&mux->tq); - - return (int)(intptr_t)ret; -} - -static int thread_start(Muxer *mux) -{ - AVFormatContext *fc = mux->fc; - ObjPool *op; - int ret; - - op = objpool_alloc_packets(); - if (!op) - return AVERROR(ENOMEM); - - mux->tq = tq_alloc(fc->nb_streams, mux->thread_queue_size, op, pkt_move); - if (!mux->tq) { - objpool_free(&op); - return AVERROR(ENOMEM); + if (pkt->opaque_ref) { + DemuxPktData *pd = (DemuxPktData*)pkt->opaque_ref->data; + dts_est = pd->dts_est; } - ret = pthread_create(&mux->thread, NULL, muxer_thread, (void*)mux); - if (ret) { - tq_free(&mux->tq); - return AVERROR(ret); - } - - /* flush the muxing queues */ - for (int i = 0; i < fc->nb_streams; i++) { - OutputStream *ost = mux->of.streams[i]; - MuxStream *ms = ms_from_ost(ost); - AVPacket *pkt; - - while (av_fifo_read(ms->muxing_queue, &pkt, 1) >= 0) { - ret = thread_submit_packet(mux, ost, pkt); - if (pkt) { - ms->muxing_queue_data_size -= pkt->size; - av_packet_free(&pkt); - } - if (ret < 0) - return ret; + if (f->recording_time != INT64_MAX) { + int64_t start_time = 0; + if (copy_ts) { + start_time += f->start_time != AV_NOPTS_VALUE ? f->start_time : 0; + start_time += start_at_zero ? 0 : f->start_time_effective; } + if (dts_est >= f->recording_time + start_time) + return AVERROR_EOF; } - return 0; + return of_streamcopy(ost, pkt, dts_est); } int print_sdp(const char *filename) @@ -565,11 +527,6 @@ int print_sdp(const char *filename) AVIOContext *sdp_pb; AVFormatContext **avc; - for (i = 0; i < nb_output_files; i++) { - if (!mux_from_of(output_files[i])->header_written) - return 0; - } - avc = av_malloc_array(nb_output_files, sizeof(*avc)); if (!avc) return AVERROR(ENOMEM); @@ -604,25 +561,17 @@ int print_sdp(const char *filename) avio_closep(&sdp_pb); } - // SDP successfully written, allow muxer threads to start - ret = 1; - fail: av_freep(&avc); return ret; } -int mux_check_init(Muxer *mux) +int mux_check_init(void *arg) { + Muxer *mux = arg; OutputFile *of = &mux->of; AVFormatContext *fc = mux->fc; - int ret, i; - - for (i = 0; i < fc->nb_streams; i++) { - OutputStream *ost = of->streams[i]; - if (!ost->initialized) - return 0; - } + int ret; ret = avformat_write_header(fc, &mux->opts); if (ret < 0) { @@ -636,26 +585,6 @@ int mux_check_init(Muxer *mux) av_dump_format(fc, of->index, fc->url, 1); nb_output_dumped++; - if (sdp_filename || want_sdp) { - ret = print_sdp(sdp_filename); - if (ret < 0) { - av_log(NULL, AV_LOG_ERROR, "Error writing the SDP.\n"); - return ret; - } else if (ret == 1) { - /* SDP is written only after all the muxers are ready, so now we - * start ALL the threads */ - for (i = 0; i < nb_output_files; i++) { - ret = thread_start(mux_from_of(output_files[i])); - if (ret < 0) - return ret; - } - } - } else { - ret = thread_start(mux_from_of(of)); - if (ret < 0) - return ret; - } - return 0; } @@ -711,9 +640,10 @@ int of_stream_init(OutputFile *of, OutputStream *ost) ost->st->time_base); } - ost->initialized = 1; + if (ms->sched_idx >= 0) + return sch_mux_stream_ready(mux->sch, of->index, ms->sched_idx); - return mux_check_init(mux); + return 0; } static int check_written(OutputFile *of) @@ -827,15 +757,13 @@ int of_write_trailer(OutputFile *of) AVFormatContext *fc = mux->fc; int ret, mux_result = 0; - if (!mux->tq) { + if (!mux->header_written) { av_log(mux, AV_LOG_ERROR, "Nothing was written into output file, because " "at least one of its streams received no packets.\n"); return AVERROR(EINVAL); } - mux_result = thread_stop(mux); - ret = av_write_trailer(fc); if (ret < 0) { av_log(mux, AV_LOG_ERROR, "Error writing trailer: %s\n", av_err2str(ret)); @@ -951,8 +879,6 @@ void of_free(OutputFile **pof) return; mux = mux_from_of(of); - thread_stop(mux); - sq_free(&of->sq_encode); sq_free(&mux->sq_mux); diff --git a/fftools/ffmpeg_mux.h b/fftools/ffmpeg_mux.h index d5aba6db36..311b612018 100644 --- a/fftools/ffmpeg_mux.h +++ b/fftools/ffmpeg_mux.h @@ -25,7 +25,6 @@ #include #include "ffmpeg_sched.h" -#include "thread_queue.h" #include "libavformat/avformat.h" @@ -33,7 +32,6 @@ #include "libavutil/dict.h" #include "libavutil/fifo.h" -#include "libavutil/thread.h" typedef struct MuxStream { OutputStream ost; @@ -104,9 +102,6 @@ typedef struct Muxer { int *sch_stream_idx; int nb_sch_stream_idx; - pthread_t thread; - ThreadQueue *tq; - AVDictionary *opts; int thread_queue_size; @@ -120,14 +115,15 @@ typedef struct Muxer { AVPacket *sq_pkt; } Muxer; -/* whether we want to print an SDP, set in of_open() */ -extern int want_sdp; - -int mux_check_init(Muxer *mux); +int mux_check_init(void *arg); static MuxStream *ms_from_ost(OutputStream *ost) { return (MuxStream*)ost; } +/* XXX explain why this is needed + * (so it's called in the sending thread rather than the muxer) */ +int of_streamcopy_hook(void *opaque, void *item); + #endif /* FFTOOLS_FFMPEG_MUX_H */ diff --git a/fftools/ffmpeg_mux_init.c b/fftools/ffmpeg_mux_init.c index 9fb6a74393..122a7344e4 100644 --- a/fftools/ffmpeg_mux_init.c +++ b/fftools/ffmpeg_mux_init.c @@ -1375,7 +1375,7 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, MATCH_PER_STREAM_OPT(bitstream_filters, str, bsfs, oc, st); if (bsfs && *bsfs) { - ret = av_bsf_list_parse_str(bsfs, &ms->bsf_ctx); + ret = sch_add_mux_stream_bsf(mux->sch, mux->of.index, ost->index, bsfs); if (ret < 0) { av_log(ost, AV_LOG_ERROR, "Error parsing bitstream filter sequence '%s': %s\n", bsfs, av_err2str(ret)); return ret; @@ -1486,7 +1486,8 @@ static int ost_add(Muxer *mux, const OptionsContext *o, enum AVMediaType type, return ret; } else { ret = sch_connect(mux->sch, SCH_DSTREAM(ost->ist->file_index, sched_idx), - SCH_MSTREAM(ost->file_index, ms->sched_idx), NULL, NULL); + SCH_MSTREAM(ost->file_index, ms->sched_idx), + of_streamcopy_hook, ost); if (ret < 0) return ret; } @@ -1931,11 +1932,17 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u * - at least one encoded audio/video stream is frame-limited, since * that has similar semantics to 'shortest' * - at least one audio encoder requires constant frame sizes + * + * Note that encoding sync queues are handled in the scheduler, because + * different encoders run in different threads and need external + * synchronization, while muxer sync queues can be handled inside the muxer */ if ((of->shortest && nb_av_enc > 1) || limit_frames_av_enc || nb_audio_fs) { - of->sq_encode = sq_alloc(SYNC_QUEUE_FRAMES, buf_size_us, mux); - if (!of->sq_encode) - return AVERROR(ENOMEM); + int sq_idx, ret; + + sq_idx = sch_add_sq_enc(mux->sch, buf_size_us, mux); + if (sq_idx < 0) + return sq_idx; for (int i = 0; i < oc->nb_streams; i++) { OutputStream *ost = of->streams[i]; @@ -1945,13 +1952,11 @@ static int setup_sync_queues(Muxer *mux, AVFormatContext *oc, int64_t buf_size_u if (!IS_AV_ENC(ost, type)) continue; - ost->sq_idx_encode = sq_add_stream(of->sq_encode, - of->shortest || ms->max_frames < INT64_MAX); - if (ost->sq_idx_encode < 0) - return ost->sq_idx_encode; - - if (ms->max_frames != INT64_MAX) - sq_limit_frames(of->sq_encode, ost->sq_idx_encode, ms->max_frames); + ret = sch_sq_add_enc(mux->sch, sq_idx, ms->sched_idx_enc, + of->shortest || ms->max_frames < INT64_MAX, + ms->max_frames); + if (ret < 0) + return ret; } } @@ -2704,8 +2709,6 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) av_strlcat(mux->log_name, "/", sizeof(mux->log_name)); av_strlcat(mux->log_name, oc->oformat->name, sizeof(mux->log_name)); - if (strcmp(oc->oformat->name, "rtp")) - want_sdp = 0; of->format = oc->oformat; if (recording_time != INT64_MAX) @@ -2721,7 +2724,7 @@ int of_open(const OptionsContext *o, const char *filename, Scheduler *sch) AVFMT_FLAG_BITEXACT); } - err = sch_add_mux(sch, muxer_thread, NULL, mux, + err = sch_add_mux(sch, muxer_thread, mux_check_init, mux, !strcmp(oc->oformat->name, "rtp")); if (err < 0) return err; diff --git a/fftools/ffmpeg_opt.c b/fftools/ffmpeg_opt.c index d463306546..6177a96a4e 100644 --- a/fftools/ffmpeg_opt.c +++ b/fftools/ffmpeg_opt.c @@ -64,7 +64,6 @@ const char *const opt_name_top_field_first[] = {"top", NULL}; HWDevice *filter_hw_device; char *vstats_filename; -char *sdp_filename; float audio_drift_threshold = 0.1; float dts_delta_threshold = 10; @@ -580,9 +579,8 @@ fail: static int opt_sdp_file(void *optctx, const char *opt, const char *arg) { - av_free(sdp_filename); - sdp_filename = av_strdup(arg); - return 0; + Scheduler *sch = optctx; + return sch_sdp_filename(sch, arg); } #if CONFIG_VAAPI -- 2.40.1 _______________________________________________ ffmpeg-devel mailing list ffmpeg-devel@ffmpeg.org https://ffmpeg.org/mailman/listinfo/ffmpeg-devel To unsubscribe, visit link above, or email ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".