* [FFmpeg-devel] [PATCH 1/2] fftools/ffmpeg_sched: move sch_stop() to the bottom of the file
@ 2024-03-27 10:21 Anton Khirnov
2024-03-27 10:21 ` [FFmpeg-devel] [PATCH 2/2] fftools/ffmpeg_sched: make sure to always run task cleanup Anton Khirnov
0 siblings, 1 reply; 2+ messages in thread
From: Anton Khirnov @ 2024-03-27 10:21 UTC (permalink / raw)
To: ffmpeg-devel
Will allow avoiding forward declarations in following commits.
---
fftools/ffmpeg_sched.c | 138 ++++++++++++++++++++---------------------
1 file changed, 69 insertions(+), 69 deletions(-)
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index ec88017e21..67c32fb5a0 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -399,22 +399,6 @@ static int queue_alloc(ThreadQueue **ptq, unsigned nb_streams, unsigned queue_si
static void *task_wrapper(void *arg);
-static int task_stop(SchTask *task)
-{
- int ret;
- void *thread_ret;
-
- if (!task->thread_running)
- return 0;
-
- ret = pthread_join(task->thread, &thread_ret);
- av_assert0(ret == 0);
-
- task->thread_running = 0;
-
- return (intptr_t)thread_ret;
-}
-
static int task_start(SchTask *task)
{
int ret;
@@ -468,59 +452,6 @@ static int64_t trailing_dts(const Scheduler *sch, int count_finished)
return min_dts == INT64_MAX ? AV_NOPTS_VALUE : min_dts;
}
-int sch_stop(Scheduler *sch, int64_t *finish_ts)
-{
- int ret = 0, err;
-
- atomic_store(&sch->terminate, 1);
-
- for (unsigned type = 0; type < 2; type++)
- for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
- SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
- waiter_set(w, 1);
- }
-
- for (unsigned i = 0; i < sch->nb_demux; i++) {
- SchDemux *d = &sch->demux[i];
-
- err = task_stop(&d->task);
- ret = err_merge(ret, err);
- }
-
- for (unsigned i = 0; i < sch->nb_dec; i++) {
- SchDec *dec = &sch->dec[i];
-
- err = task_stop(&dec->task);
- ret = err_merge(ret, err);
- }
-
- for (unsigned i = 0; i < sch->nb_filters; i++) {
- SchFilterGraph *fg = &sch->filters[i];
-
- err = task_stop(&fg->task);
- ret = err_merge(ret, err);
- }
-
- for (unsigned i = 0; i < sch->nb_enc; i++) {
- SchEnc *enc = &sch->enc[i];
-
- err = task_stop(&enc->task);
- ret = err_merge(ret, err);
- }
-
- for (unsigned i = 0; i < sch->nb_mux; i++) {
- SchMux *mux = &sch->mux[i];
-
- err = task_stop(&mux->task);
- ret = err_merge(ret, err);
- }
-
- if (finish_ts)
- *finish_ts = trailing_dts(sch, 1);
-
- return ret;
-}
-
void sch_free(Scheduler **psch)
{
Scheduler *sch = *psch;
@@ -2518,3 +2449,72 @@ static void *task_wrapper(void *arg)
return (void*)(intptr_t)ret;
}
+
+static int task_stop(SchTask *task)
+{
+ int ret;
+ void *thread_ret;
+
+ if (!task->thread_running)
+ return 0;
+
+ ret = pthread_join(task->thread, &thread_ret);
+ av_assert0(ret == 0);
+
+ task->thread_running = 0;
+
+ return (intptr_t)thread_ret;
+}
+
+int sch_stop(Scheduler *sch, int64_t *finish_ts)
+{
+ int ret = 0, err;
+
+ atomic_store(&sch->terminate, 1);
+
+ for (unsigned type = 0; type < 2; type++)
+ for (unsigned i = 0; i < (type ? sch->nb_demux : sch->nb_filters); i++) {
+ SchWaiter *w = type ? &sch->demux[i].waiter : &sch->filters[i].waiter;
+ waiter_set(w, 1);
+ }
+
+ for (unsigned i = 0; i < sch->nb_demux; i++) {
+ SchDemux *d = &sch->demux[i];
+
+ err = task_stop(&d->task);
+ ret = err_merge(ret, err);
+ }
+
+ for (unsigned i = 0; i < sch->nb_dec; i++) {
+ SchDec *dec = &sch->dec[i];
+
+ err = task_stop(&dec->task);
+ ret = err_merge(ret, err);
+ }
+
+ for (unsigned i = 0; i < sch->nb_filters; i++) {
+ SchFilterGraph *fg = &sch->filters[i];
+
+ err = task_stop(&fg->task);
+ ret = err_merge(ret, err);
+ }
+
+ for (unsigned i = 0; i < sch->nb_enc; i++) {
+ SchEnc *enc = &sch->enc[i];
+
+ err = task_stop(&enc->task);
+ ret = err_merge(ret, err);
+ }
+
+ for (unsigned i = 0; i < sch->nb_mux; i++) {
+ SchMux *mux = &sch->mux[i];
+
+ err = task_stop(&mux->task);
+ ret = err_merge(ret, err);
+ }
+
+ if (finish_ts)
+ *finish_ts = trailing_dts(sch, 1);
+
+ return ret;
+}
--
2.43.0
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 2+ messages in thread
* [FFmpeg-devel] [PATCH 2/2] fftools/ffmpeg_sched: make sure to always run task cleanup
2024-03-27 10:21 [FFmpeg-devel] [PATCH 1/2] fftools/ffmpeg_sched: move sch_stop() to the bottom of the file Anton Khirnov
@ 2024-03-27 10:21 ` Anton Khirnov
0 siblings, 0 replies; 2+ messages in thread
From: Anton Khirnov @ 2024-03-27 10:21 UTC (permalink / raw)
To: ffmpeg-devel
Even in cases where the task failed to start due to pthread_create()
failing.
---
fftools/ffmpeg_sched.c | 68 +++++++++++++++++++++++++++---------------
1 file changed, 44 insertions(+), 24 deletions(-)
diff --git a/fftools/ffmpeg_sched.c b/fftools/ffmpeg_sched.c
index 67c32fb5a0..ee3af45908 100644
--- a/fftools/ffmpeg_sched.c
+++ b/fftools/ffmpeg_sched.c
@@ -260,6 +260,12 @@ typedef struct SchFilterGraph {
int task_exited;
} SchFilterGraph;
+enum SchedulerState {
+ SCH_STATE_UNINIT,
+ SCH_STATE_STARTED,
+ SCH_STATE_STOPPED,
+};
+
struct Scheduler {
const AVClass *class;
@@ -292,7 +298,7 @@ struct Scheduler {
char *sdp_filename;
int sdp_auto;
- int transcode_started;
+ enum SchedulerState state;
atomic_int terminate;
atomic_int task_failed;
@@ -1144,7 +1150,8 @@ int sch_mux_stream_ready(Scheduler *sch, unsigned mux_idx, unsigned stream_idx)
// this may be called during initialization - do not start
// threads before sch_start() is called
- if (++mux->nb_streams_ready == mux->nb_streams && sch->transcode_started)
+ if (++mux->nb_streams_ready == mux->nb_streams &&
+ sch->state >= SCH_STATE_STARTED)
ret = mux_init(sch, mux);
pthread_mutex_unlock(&sch->mux_ready_lock);
@@ -1514,7 +1521,8 @@ int sch_start(Scheduler *sch)
if (ret < 0)
return ret;
- sch->transcode_started = 1;
+ av_assert0(sch->state == SCH_STATE_UNINIT);
+ sch->state = SCH_STATE_STARTED;
for (unsigned i = 0; i < sch->nb_mux; i++) {
SchMux *mux = &sch->mux[i];
@@ -1522,7 +1530,7 @@ int sch_start(Scheduler *sch)
if (mux->nb_streams_ready == mux->nb_streams) {
ret = mux_init(sch, mux);
if (ret < 0)
- return ret;
+ goto fail;
}
}
@@ -1531,7 +1539,7 @@ int sch_start(Scheduler *sch)
ret = task_start(&enc->task);
if (ret < 0)
- return ret;
+ goto fail;
}
for (unsigned i = 0; i < sch->nb_filters; i++) {
@@ -1539,7 +1547,7 @@ int sch_start(Scheduler *sch)
ret = task_start(&fg->task);
if (ret < 0)
- return ret;
+ goto fail;
}
for (unsigned i = 0; i < sch->nb_dec; i++) {
@@ -1547,7 +1555,7 @@ int sch_start(Scheduler *sch)
ret = task_start(&dec->task);
if (ret < 0)
- return ret;
+ goto fail;
}
for (unsigned i = 0; i < sch->nb_demux; i++) {
@@ -1558,7 +1566,7 @@ int sch_start(Scheduler *sch)
ret = task_start(&d->task);
if (ret < 0)
- return ret;
+ goto fail;
}
pthread_mutex_lock(&sch->schedule_lock);
@@ -1566,6 +1574,9 @@ int sch_start(Scheduler *sch)
pthread_mutex_unlock(&sch->schedule_lock);
return 0;
+fail:
+ sch_stop(sch, NULL);
+ return ret;
}
int sch_wait(Scheduler *sch, uint64_t timeout_us, int64_t *transcode_ts)
@@ -2414,6 +2425,18 @@ int sch_filter_command(Scheduler *sch, unsigned fg_idx, AVFrame *frame)
return send_to_filter(sch, fg, fg->nb_inputs, frame);
}
+static int task_cleanup(Scheduler *sch, SchedulerNode node)
+{
+ switch (node.type) {
+ case SCH_NODE_TYPE_DEMUX: return demux_done (sch, node.idx);
+ case SCH_NODE_TYPE_MUX: return mux_done (sch, node.idx);
+ case SCH_NODE_TYPE_DEC: return dec_done (sch, node.idx);
+ case SCH_NODE_TYPE_ENC: return enc_done (sch, node.idx);
+ case SCH_NODE_TYPE_FILTER_IN: return filter_done(sch, node.idx);
+ default: av_assert0(0);
+ }
+}
+
static void *task_wrapper(void *arg)
{
SchTask *task = arg;
@@ -2426,15 +2449,7 @@ static void *task_wrapper(void *arg)
av_log(task->func_arg, AV_LOG_ERROR,
"Task finished with error code: %d (%s)\n", ret, av_err2str(ret));
- switch (task->node.type) {
- case SCH_NODE_TYPE_DEMUX: err = demux_done (sch, task->node.idx); break;
- case SCH_NODE_TYPE_MUX: err = mux_done (sch, task->node.idx); break;
- case SCH_NODE_TYPE_DEC: err = dec_done (sch, task->node.idx); break;
- case SCH_NODE_TYPE_ENC: err = enc_done (sch, task->node.idx); break;
- case SCH_NODE_TYPE_FILTER_IN: err = filter_done(sch, task->node.idx); break;
- default: av_assert0(0);
- }
-
+ err = task_cleanup(sch, task->node);
ret = err_merge(ret, err);
// EOF is considered normal termination
@@ -2450,13 +2465,13 @@ static void *task_wrapper(void *arg)
return (void*)(intptr_t)ret;
}
-static int task_stop(SchTask *task)
+static int task_stop(Scheduler *sch, SchTask *task)
{
int ret;
void *thread_ret;
if (!task->thread_running)
- return 0;
+ return task_cleanup(sch, task->node);
ret = pthread_join(task->thread, &thread_ret);
av_assert0(ret == 0);
@@ -2470,6 +2485,9 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
{
int ret = 0, err;
+ if (sch->state != SCH_STATE_STARTED)
+ return 0;
+
atomic_store(&sch->terminate, 1);
for (unsigned type = 0; type < 2; type++)
@@ -2481,40 +2499,42 @@ int sch_stop(Scheduler *sch, int64_t *finish_ts)
for (unsigned i = 0; i < sch->nb_demux; i++) {
SchDemux *d = &sch->demux[i];
- err = task_stop(&d->task);
+ err = task_stop(sch, &d->task);
ret = err_merge(ret, err);
}
for (unsigned i = 0; i < sch->nb_dec; i++) {
SchDec *dec = &sch->dec[i];
- err = task_stop(&dec->task);
+ err = task_stop(sch, &dec->task);
ret = err_merge(ret, err);
}
for (unsigned i = 0; i < sch->nb_filters; i++) {
SchFilterGraph *fg = &sch->filters[i];
- err = task_stop(&fg->task);
+ err = task_stop(sch, &fg->task);
ret = err_merge(ret, err);
}
for (unsigned i = 0; i < sch->nb_enc; i++) {
SchEnc *enc = &sch->enc[i];
- err = task_stop(&enc->task);
+ err = task_stop(sch, &enc->task);
ret = err_merge(ret, err);
}
for (unsigned i = 0; i < sch->nb_mux; i++) {
SchMux *mux = &sch->mux[i];
- err = task_stop(&mux->task);
+ err = task_stop(sch, &mux->task);
ret = err_merge(ret, err);
}
if (finish_ts)
*finish_ts = trailing_dts(sch, 1);
+ sch->state = SCH_STATE_STOPPED;
+
return ret;
}
--
2.43.0
_______________________________________________
ffmpeg-devel mailing list
ffmpeg-devel@ffmpeg.org
https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
To unsubscribe, visit link above, or email
ffmpeg-devel-request@ffmpeg.org with subject "unsubscribe".
^ permalink raw reply [flat|nested] 2+ messages in thread
end of thread, other threads:[~2024-03-27 10:21 UTC | newest]
Thread overview: 2+ messages (download: mbox.gz / follow: Atom feed)
-- links below jump to the message on this page --
2024-03-27 10:21 [FFmpeg-devel] [PATCH 1/2] fftools/ffmpeg_sched: move sch_stop() to the bottom of the file Anton Khirnov
2024-03-27 10:21 ` [FFmpeg-devel] [PATCH 2/2] fftools/ffmpeg_sched: make sure to always run task cleanup Anton Khirnov
Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
This inbox may be cloned and mirrored by anyone:
git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git
# If you have public-inbox 1.1+ installed, you may
# initialize and index your mirror using the following commands:
public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
ffmpegdev@gitmailbox.com
public-inbox-index ffmpegdev
Example config snippet for mirrors.
AGPL code for this site: git clone https://public-inbox.org/public-inbox.git