From: Raja-89 via ffmpeg-devel <ffmpeg-devel@ffmpeg.org>
To: ffmpeg-devel@ffmpeg.org
Cc: Raja-89 <code@ffmpeg.org>
Subject: [FFmpeg-devel] [PR] avfilter/dnn: implement asynchronous execution for LibTorch backend (PR #21747)
Date: Fri, 13 Feb 2026 17:17:23 -0000
Message-ID: <177100304402.25.3463696960615278331@009cbcb3d8cd> (raw)
PR #21747 opened by Raja-89
URL: https://code.ffmpeg.org/FFmpeg/FFmpeg/pulls/21747
Patch URL: https://code.ffmpeg.org/FFmpeg/FFmpeg/pulls/21747.patch
### Overview
This patch implements the asynchronous execution path for the LibTorch backend using FFmpeg's common DNN framework (DNNAsyncExecModule). This allows the dnn_processing filter to run inference without blocking the main filtergraph execution.
### Key Changes
- Asynchronous Integration: Properly utilizes ff_dnn_start_inference_async to offload inference to the common DNN thread pool.
- Persistent Buffer Management: Implemented persistent input/output buffers in THInferRequest. This eliminates per-frame memory allocation/deallocation overhead, significantly improving performance.
- Device Management: Added explicit checks and synchronization to ensure tensors are moved to the correct device (CPU/XPU) before and after inference.
- Refined Error Handling: Standardized return codes using AVERROR and simplified logic flow by removing redundant goto statements.
### Style & Cleanliness
- Strict Formatting: Adheres to FFmpeg’s 4-space indentation standard and avoids redundant braces.
- Minimal Diff Noise: Re-submitted from a clean branch to ensure only essential changes are included, addressing previous reviewer concerns regarding formatting noise.
- Tool Verification: Successfully passed tools/patcheck.
### Testing Conducted
- Verified with testsrc using both async=1 (asynchronous) and async=0 (synchronous) modes.
- Build confirmed on Ubuntu with gcc 15 and LibTorch v2.x.
- Verified backend registration via ./ffmpeg -h filter=dnn_processing.
From 18aa887cdc38b888b7b1e9c5a8887e534e9c8de3 Mon Sep 17 00:00:00 2001
From: Raja Rathour <imraja729@gmail.com>
Date: Fri, 13 Feb 2026 22:43:40 +0530
Subject: [PATCH] avfilter/dnn: implement async execution for libtorch backend
---
Changelog | 2 +
libavfilter/dnn/dnn_backend_common.h | 1 +
libavfilter/dnn/dnn_backend_torch.cpp | 334 ++++++++++----------------
libavfilter/vf_dnn_processing.c | 3 +
4 files changed, 132 insertions(+), 208 deletions(-)
diff --git a/Changelog b/Changelog
index a9d68b369e..f24c7d4b9e 100644
--- a/Changelog
+++ b/Changelog
@@ -2256,3 +2256,5 @@ version 0.3.1: added avi/divx support
version 0.3: initial public release
+
+- libavfilter/dnn: asynchronous execution support for LibTorch backend
diff --git a/libavfilter/dnn/dnn_backend_common.h b/libavfilter/dnn/dnn_backend_common.h
index 9f5d37b3e0..803a66a5f6 100644
--- a/libavfilter/dnn/dnn_backend_common.h
+++ b/libavfilter/dnn/dnn_backend_common.h
@@ -156,5 +156,6 @@ DNNAsyncStatusType ff_dnn_get_result_common(Queue *task_queue, AVFrame **in, AVF
* @returns 0 if successful or error code otherwise.
*/
int ff_dnn_fill_gettingoutput_task(TaskItem *task, DNNExecBaseParams *exec_params, void *backend_model, int input_height, int input_width, void *ctx);
+int ff_dnn_async_module_submit(DNNAsyncExecModule *async_module);
#endif
diff --git a/libavfilter/dnn/dnn_backend_torch.cpp b/libavfilter/dnn/dnn_backend_torch.cpp
index d3c4966c09..6c005353b0 100644
--- a/libavfilter/dnn/dnn_backend_torch.cpp
+++ b/libavfilter/dnn/dnn_backend_torch.cpp
@@ -25,20 +25,24 @@
#include <torch/torch.h>
#include <torch/script.h>
-#include <thread>
-#include <mutex>
-#include <condition_variable>
-#include <atomic>
extern "C" {
-#include "dnn_io_proc.h"
-#include "dnn_backend_common.h"
#include "libavutil/opt.h"
#include "libavutil/mem.h"
+#include "libavutil/avassert.h"
+#include "../dnn_interface.h"
+#include "dnn_backend_common.h"
+#include "dnn_io_proc.h"
#include "queue.h"
#include "safe_queue.h"
}
+static int get_input_th(DNNModel *model, DNNData *input, const char *input_name);
+static int get_output_th(DNNModel *model, const char *input_name, int input_width, int input_height, const char *output_name, int *output_width, int *output_height);
+static void dnn_free_model_th(DNNModel **model);
+static int th_start_inference(void *args);
+static void infer_completion_callback(void *args);
+
typedef struct THModel {
DNNModel model;
DnnContext *ctx;
@@ -46,16 +50,13 @@ typedef struct THModel {
SafeQueue *request_queue;
Queue *task_queue;
Queue *lltask_queue;
- SafeQueue *pending_queue; ///< requests waiting for inference
- std::thread *worker_thread; ///< background worker thread
- std::mutex *mutex; ///< mutex for the condition variable
- std::condition_variable *cond; ///< condition variable for worker wakeup
- std::atomic<bool> worker_stop; ///< signal for thread exit
} THModel;
typedef struct THInferRequest {
torch::Tensor *output;
torch::Tensor *input_tensor;
+ uint8_t *input_data;
+ size_t input_data_size;
} THInferRequest;
typedef struct THRequestItem {
@@ -104,7 +105,10 @@ static void th_free_request(THInferRequest *request)
delete(request->input_tensor);
request->input_tensor = NULL;
}
- return;
+ if (request->input_data) {
+ av_freep(&request->input_data);
+ }
+ request->input_data_size = 0;
}
static inline void destroy_request_item(THRequestItem **arg)
@@ -129,38 +133,6 @@ static void dnn_free_model_th(DNNModel **model)
th_model = (THModel *)(*model);
- /* 1. Stop and join the worker thread if it exists */
- if (th_model->worker_thread) {
- {
- std::lock_guard<std::mutex> lock(*th_model->mutex);
- th_model->worker_stop = true;
- }
- th_model->cond->notify_all();
- th_model->worker_thread->join();
- delete th_model->worker_thread;
- th_model->worker_thread = NULL;
- }
-
- /* 2. Safely delete C++ synchronization objects */
- if (th_model->mutex) {
- delete th_model->mutex;
- th_model->mutex = NULL;
- }
- if (th_model->cond) {
- delete th_model->cond;
- th_model->cond = NULL;
- }
-
- /* 3. Clean up the pending queue */
- if (th_model->pending_queue) {
- while (ff_safe_queue_size(th_model->pending_queue) > 0) {
- THRequestItem *item = (THRequestItem *)ff_safe_queue_pop_front(th_model->pending_queue);
- destroy_request_item(&item);
- }
- ff_safe_queue_destroy(th_model->pending_queue);
- }
-
- /* 4. Clean up standard backend queues */
if (th_model->request_queue) {
while (ff_safe_queue_size(th_model->request_queue) != 0) {
THRequestItem *item = (THRequestItem *)ff_safe_queue_pop_front(th_model->request_queue);
@@ -187,7 +159,6 @@ static void dnn_free_model_th(DNNModel **model)
ff_queue_destroy(th_model->task_queue);
}
- /* 5. Final model cleanup */
if (th_model->jit_model)
delete th_model->jit_model;
@@ -207,44 +178,60 @@ static int get_input_th(DNNModel *model, DNNData *input, const char *input_name)
return 0;
}
-static void deleter(void *arg)
-{
- av_freep(&arg);
-}
-
static int fill_model_input_th(THModel *th_model, THRequestItem *request)
{
- LastLevelTaskItem *lltask = NULL;
- TaskItem *task = NULL;
THInferRequest *infer_request = NULL;
+ TaskItem *task = NULL;
+ LastLevelTaskItem *lltask = NULL;
DNNData input = { 0 };
DnnContext *ctx = th_model->ctx;
int ret, width_idx, height_idx, channel_idx;
+ size_t cur_size;
lltask = (LastLevelTaskItem *)ff_queue_pop_front(th_model->lltask_queue);
- if (!lltask) {
- ret = AVERROR(EINVAL);
- goto err;
- }
+ if (!lltask)
+ return AVERROR(EINVAL);
+
request->lltask = lltask;
task = lltask->task;
infer_request = request->infer_request;
ret = get_input_th(&th_model->model, &input, NULL);
- if ( ret != 0) {
+ if (ret != 0)
goto err;
- }
+
width_idx = dnn_get_width_idx_by_layout(input.layout);
height_idx = dnn_get_height_idx_by_layout(input.layout);
channel_idx = dnn_get_channel_idx_by_layout(input.layout);
+
input.dims[height_idx] = task->in_frame->height;
input.dims[width_idx] = task->in_frame->width;
- input.data = av_malloc(input.dims[height_idx] * input.dims[width_idx] *
- input.dims[channel_idx] * sizeof(float));
- if (!input.data)
- return AVERROR(ENOMEM);
- infer_request->input_tensor = new torch::Tensor();
- infer_request->output = new torch::Tensor();
+
+ // Calculate required size for the current frame
+ cur_size = (size_t)input.dims[height_idx] * input.dims[width_idx] *
+ input.dims[channel_idx] * sizeof(float);
+
+ /**
+ * Reuse the persistent buffer.
+ * Only reallocate if the existing buffer is too small or doesn't exist.
+ */
+ if (!infer_request->input_data || infer_request->input_data_size < cur_size) {
+ av_freep(&infer_request->input_data);
+ infer_request->input_data = (uint8_t *)av_malloc(cur_size);
+ if (!infer_request->input_data) {
+ ret = AVERROR(ENOMEM);
+ goto err;
+ }
+ infer_request->input_data_size = cur_size;
+ }
+
+ input.data = infer_request->input_data;
+
+ // Initialize tensors if they don't exist
+ if (!infer_request->input_tensor)
+ infer_request->input_tensor = new torch::Tensor();
+ if (!infer_request->output)
+ infer_request->output = new torch::Tensor();
switch (th_model->model.func_type) {
case DFT_PROCESS_FRAME:
@@ -258,12 +245,20 @@ static int fill_model_input_th(THModel *th_model, THRequestItem *request)
}
break;
default:
- avpriv_report_missing_feature(NULL, "model function type %d", th_model->model.func_type);
- break;
+ avpriv_report_missing_feature(ctx, "model function type %d", th_model->model.func_type);
+ ret = AVERROR(ENOSYS);
+ goto err;
}
+
+ /**
+ * Map the buffer to a Torch tensor.
+ * Note: We do NOT pass 'deleter' here because 'input_data' is owned
+ * by THInferRequest and will be freed in dnn_free_model_th.
+ */
*infer_request->input_tensor = torch::from_blob(input.data,
{1, input.dims[channel_idx], input.dims[height_idx], input.dims[width_idx]},
- deleter, torch::kFloat32);
+ torch::kFloat32);
+
return 0;
err:
@@ -312,83 +307,57 @@ static int th_start_inference(void *args)
return 0;
}
-static void infer_completion_callback(void *args) {
- THRequestItem *request = (THRequestItem*)args;
+static void infer_completion_callback(void *args)
+{
+ THRequestItem *request = (THRequestItem *)args;
LastLevelTaskItem *lltask = request->lltask;
TaskItem *task = lltask->task;
- DNNData outputs = { 0 };
- THInferRequest *infer_request = request->infer_request;
THModel *th_model = (THModel *)task->model;
+ THInferRequest *infer_request = request->infer_request;
torch::Tensor *output = infer_request->output;
+ DNNData outputs = { 0 };
+ c10::IntArrayRef sizes;
- c10::IntArrayRef sizes = output->sizes();
- outputs.order = DCO_RGB;
- outputs.layout = DL_NCHW;
- outputs.dt = DNN_FLOAT;
- if (sizes.size() == 4) {
- // 4 dimensions: [batch_size, channel, height, width]
- // this format of data is normally used for video frame SR
- outputs.dims[0] = sizes.at(0); // N
- outputs.dims[1] = sizes.at(1); // C
- outputs.dims[2] = sizes.at(2); // H
- outputs.dims[3] = sizes.at(3); // W
- } else {
- avpriv_report_missing_feature(th_model->ctx, "Support of this kind of model");
+ if (!output || output->ndimension() != 4) {
+ avpriv_report_missing_feature(th_model->ctx, "torch model output dimensions != 4");
goto err;
}
- switch (th_model->model.func_type) {
- case DFT_PROCESS_FRAME:
+ sizes = output->sizes();
+ outputs.order = DCO_RGB;
+ outputs.layout = DL_NCHW;
+ outputs.dt = DNN_FLOAT;
+ outputs.dims[0] = sizes.at(0);
+ outputs.dims[1] = sizes.at(1);
+ outputs.dims[2] = sizes.at(2);
+ outputs.dims[3] = sizes.at(3);
+
+ if (th_model->model.func_type == DFT_PROCESS_FRAME) {
if (task->do_ioproc) {
- // Post process can only deal with CPU memory.
if (output->device() != torch::kCPU)
*output = output->to(torch::kCPU);
outputs.scale = 255;
outputs.data = output->data_ptr();
- if (th_model->model.frame_post_proc != NULL) {
+ if (th_model->model.frame_post_proc)
th_model->model.frame_post_proc(task->out_frame, &outputs, th_model->model.filter_ctx);
- } else {
+ else
ff_proc_from_dnn_to_frame(task->out_frame, &outputs, th_model->ctx);
- }
} else {
- task->out_frame->width = outputs.dims[dnn_get_width_idx_by_layout(outputs.layout)];
+ task->out_frame->width = outputs.dims[dnn_get_width_idx_by_layout(outputs.layout)];
task->out_frame->height = outputs.dims[dnn_get_height_idx_by_layout(outputs.layout)];
}
- break;
- default:
+ } else {
avpriv_report_missing_feature(th_model->ctx, "model function type %d", th_model->model.func_type);
goto err;
}
+
task->inference_done++;
av_freep(&request->lltask);
+
err:
th_free_request(infer_request);
-
if (ff_safe_queue_push_back(th_model->request_queue, request) < 0) {
destroy_request_item(&request);
- av_log(th_model->ctx, AV_LOG_ERROR, "Unable to push back request_queue when failed to start inference.\n");
- }
-}
-
-static void th_worker_thread(THModel *th_model) {
- while (true) {
- THRequestItem *request = NULL;
- {
- std::unique_lock<std::mutex> lock(*th_model->mutex);
- th_model->cond->wait(lock, [&]{
- return th_model->worker_stop || ff_safe_queue_size(th_model->pending_queue) > 0;
- });
-
- if (th_model->worker_stop && ff_safe_queue_size(th_model->pending_queue) == 0)
- break;
-
- request = (THRequestItem *)ff_safe_queue_pop_front(th_model->pending_queue);
- }
-
- if (request) {
- th_start_inference(request);
- infer_completion_callback(request);
- }
}
}
@@ -405,31 +374,21 @@ static int execute_model_th(THRequestItem *request, Queue *lltask_queue)
}
lltask = (LastLevelTaskItem *)ff_queue_peek_front(lltask_queue);
- if (lltask == NULL) {
- av_log(NULL, AV_LOG_ERROR, "Failed to get LastLevelTaskItem\n");
- ret = AVERROR(EINVAL);
- goto err;
+ if (!lltask) {
+ destroy_request_item(&request);
+ return AVERROR(EINVAL);
}
task = lltask->task;
th_model = (THModel *)task->model;
ret = fill_model_input_th(th_model, request);
- if ( ret != 0) {
- goto err;
- }
+ if (ret != 0) goto err;
+
if (task->async) {
- std::lock_guard<std::mutex> lock(*th_model->mutex);
- if (ff_safe_queue_push_back(th_model->pending_queue, request) < 0) {
- return AVERROR(ENOMEM);
- }
- th_model->cond->notify_one();
- return 0;
+ return ff_dnn_start_inference_async(th_model->ctx, &request->exec_module);
} else {
- // Synchronous execution path
- ret = th_start_inference((void *)(request));
- if (ret != 0) {
- goto err;
- }
+ ret = th_start_inference(request);
+ if (ret != 0) goto err;
infer_completion_callback(request);
return (task->inference_done == task->inference_todo) ? 0 : DNN_GENERIC_ERROR;
}
@@ -442,11 +401,20 @@ err:
return ret;
}
+static THInferRequest *th_create_inference_request(void)
+{
+ THInferRequest *request = (THInferRequest *)av_mallocz(sizeof(THInferRequest));
+ if (!request) {
+ return NULL;
+ }
+ return request;
+}
+
static int get_output_th(DNNModel *model, const char *input_name, int input_width, int input_height,
const char *output_name, int *output_width, int *output_height)
{
int ret = 0;
- THModel *th_model = (THModel*) model;
+ THModel *th_model = (THModel *) model;
DnnContext *ctx = th_model->ctx;
TaskItem task = { 0 };
THRequestItem *request = NULL;
@@ -457,20 +425,17 @@ static int get_output_th(DNNModel *model, const char *input_name, int input_widt
.in_frame = NULL,
.out_frame = NULL,
};
+
ret = ff_dnn_fill_gettingoutput_task(&task, &exec_params, th_model, input_height, input_width, ctx);
- if ( ret != 0) {
+ if (ret != 0)
goto err;
- }
ret = extract_lltask_from_task(&task, th_model->lltask_queue);
- if ( ret != 0) {
- av_log(ctx, AV_LOG_ERROR, "unable to extract last level task from task.\n");
+ if (ret != 0)
goto err;
- }
request = (THRequestItem*) ff_safe_queue_pop_front(th_model->request_queue);
if (!request) {
- av_log(ctx, AV_LOG_ERROR, "unable to get infer request.\n");
ret = AVERROR(EINVAL);
goto err;
}
@@ -485,44 +450,21 @@ err:
return ret;
}
-static THInferRequest *th_create_inference_request(void)
-{
- THInferRequest *request = (THInferRequest *)av_malloc(sizeof(THInferRequest));
- if (!request) {
- return NULL;
- }
- request->input_tensor = NULL;
- request->output = NULL;
- return request;
-}
-
static DNNModel *dnn_load_model_th(DnnContext *ctx, DNNFunctionType func_type, AVFilterContext *filter_ctx)
{
- DNNModel *model = NULL;
- THModel *th_model = NULL;
+ THModel *th_model = (THModel *)av_mallocz(sizeof(THModel));
THRequestItem *item = NULL;
const char *device_name = ctx->device ? ctx->device : "cpu";
- th_model = (THModel *)av_mallocz(sizeof(THModel));
if (!th_model)
return NULL;
- model = &th_model->model;
+
th_model->ctx = ctx;
+ // Device and XPU Initialization
c10::Device device = c10::Device(device_name);
if (device.is_xpu()) {
- if (!at::hasXPU()) {
- av_log(ctx, AV_LOG_ERROR, "No XPU device found\n");
- goto fail;
- }
-#if TORCH_VERSION_MAJOR > 2 || (TORCH_VERSION_MAJOR == 2 && TORCH_VERSION_MINOR >= 6)
- at::detail::getXPUHooks().init();
-#else
at::detail::getXPUHooks().initXPU();
-#endif
- } else if (!device.is_cpu()) {
- av_log(ctx, AV_LOG_ERROR, "Not supported device:\"%s\"\n", device_name);
- goto fail;
}
try {
@@ -535,61 +477,37 @@ static DNNModel *dnn_load_model_th(DnnContext *ctx, DNNFunctionType func_type, A
}
th_model->request_queue = ff_safe_queue_create();
- if (!th_model->request_queue) {
+ th_model->task_queue = ff_queue_create();
+ th_model->lltask_queue = ff_queue_create();
+
+ if (!th_model->request_queue || !th_model->task_queue || !th_model->lltask_queue)
goto fail;
- }
item = (THRequestItem *)av_mallocz(sizeof(THRequestItem));
- if (!item) {
+ if (!item)
goto fail;
- }
- item->lltask = NULL;
+
item->infer_request = th_create_inference_request();
- if (!item->infer_request) {
- av_log(NULL, AV_LOG_ERROR, "Failed to allocate memory for Torch inference request\n");
+ if (!item->infer_request)
goto fail;
- }
+
+ // Setup the async module callbacks for the common infrastructure
item->exec_module.start_inference = &th_start_inference;
item->exec_module.callback = &infer_completion_callback;
item->exec_module.args = item;
- if (ff_safe_queue_push_back(th_model->request_queue, item) < 0) {
+ if (ff_safe_queue_push_back(th_model->request_queue, item) < 0)
goto fail;
- }
- item = NULL;
- th_model->task_queue = ff_queue_create();
- if (!th_model->task_queue) {
- goto fail;
- }
+ th_model->model.get_input = &get_input_th;
+ th_model->model.get_output = &get_output_th;
+ th_model->model.filter_ctx = filter_ctx;
+ th_model->model.func_type = func_type;
- th_model->lltask_queue = ff_queue_create();
- if (!th_model->lltask_queue) {
- goto fail;
- }
-
- th_model->pending_queue = ff_safe_queue_create();
- if (!th_model->pending_queue) {
- goto fail;
- }
-
- th_model->mutex = new std::mutex();
- th_model->cond = new std::condition_variable();
- th_model->worker_stop = false;
- th_model->worker_thread = new std::thread(th_worker_thread, th_model);
-
- model->get_input = &get_input_th;
- model->get_output = &get_output_th;
- model->filter_ctx = filter_ctx;
- model->func_type = func_type;
- return model;
+ return &th_model->model;
fail:
- if (item) {
- destroy_request_item(&item);
- av_freep(&item);
- }
- dnn_free_model_th(&model);
+ dnn_free_model_th((DNNModel**)&th_model);
return NULL;
}
diff --git a/libavfilter/vf_dnn_processing.c b/libavfilter/vf_dnn_processing.c
index 0771ceb5fc..27d4e088cb 100644
--- a/libavfilter/vf_dnn_processing.c
+++ b/libavfilter/vf_dnn_processing.c
@@ -52,6 +52,9 @@ static const AVOption dnn_processing_options[] = {
#endif
#if (CONFIG_LIBTORCH == 1)
{ "torch", "torch backend flag", 0, AV_OPT_TYPE_CONST, { .i64 = DNN_TH }, 0, 0, FLAGS, "backend" },
+#endif
+#if (CONFIG_LIBTORCH)
+ { "torch", "torch backend", 0, AV_OPT_TYPE_CONST, { .i64 = DNN_TH }, 0, 0, FLAGS, .unit = "backend" },
#endif
{ NULL }
};
--
2.52.0
_______________________________________________
ffmpeg-devel mailing list -- ffmpeg-devel@ffmpeg.org
To unsubscribe send an email to ffmpeg-devel-leave@ffmpeg.org
reply other threads:[~2026-02-13 17:18 UTC|newest]
Thread overview: [no followups] expand[flat|nested] mbox.gz Atom feed
Reply instructions:
You may reply publicly to this message via plain-text email
using any one of the following methods:
* Save the following mbox file, import it into your mail client,
and reply-to-all from there: mbox
Avoid top-posting and favor interleaved quoting:
https://en.wikipedia.org/wiki/Posting_style#Interleaved_style
* Reply using the --to, --cc, and --in-reply-to
switches of git-send-email(1):
git send-email \
--in-reply-to=177100304402.25.3463696960615278331@009cbcb3d8cd \
--to=ffmpeg-devel@ffmpeg.org \
--cc=code@ffmpeg.org \
/path/to/YOUR_REPLY
https://kernel.org/pub/software/scm/git/docs/git-send-email.html
* If your mail client supports setting the In-Reply-To header
via mailto: links, try the mailto: link
Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
This inbox may be cloned and mirrored by anyone:
git clone --mirror https://master.gitmailbox.com/ffmpegdev/0 ffmpegdev/git/0.git
# If you have public-inbox 1.1+ installed, you may
# initialize and index your mirror using the following commands:
public-inbox-init -V2 ffmpegdev ffmpegdev/ https://master.gitmailbox.com/ffmpegdev \
ffmpegdev@gitmailbox.com
public-inbox-index ffmpegdev
Example config snippet for mirrors.
AGPL code for this site: git clone https://public-inbox.org/public-inbox.git