Git Inbox Mirror of the ffmpeg-devel mailing list - see https://ffmpeg.org/mailman/listinfo/ffmpeg-devel
* [FFmpeg-devel] [PATCH v2] avfilter/dnn_backend_torch: enable async execution, memory safety & dynamic shapes
@ 2026-01-14 10:07 Raja Rathour via ffmpeg-devel
From: Raja Rathour via ffmpeg-devel @ 2026-01-14 10:07 UTC
  To: ffmpeg-devel; +Cc: imraja729

This patch overhauls the LibTorch backend so that it follows the common FFmpeg DNN infrastructure:

1. Async Execution: Implements non-blocking inference through the shared ff_dnn_start_inference_async path instead of a backend-private worker thread.
2. Memory Safety: Fixes a critical memory leak by introducing a persistent, per-request input buffer in THInferRequest (a simplified sketch of the reuse logic follows this list).
3. Dynamic Shapes: Supports changing input resolutions by growing that buffer on the fly when a larger frame arrives.
4. Robustness: Fixes a device-selection crash on parameter-less models by taking the target device from the context's device option instead of the model's first parameter.
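
For reviewers, items 2 and 3 boil down to the reuse pattern below. This is a
simplified, standalone sketch, not the literal hunk: plain malloc/free stand in
for av_malloc/av_freep, and the struct and helper names are illustrative rather
than taken from the patch.

    #include <cstdlib>

    /* Sketch only: grow the per-request buffer when the incoming frame needs
     * more space; otherwise keep reusing the existing allocation across
     * frames instead of allocating and leaking one buffer per frame. */
    struct SketchRequest {
        float       *input_data;
        std::size_t  input_data_size;
    };

    static int ensure_input_buffer(SketchRequest *req, std::size_t cur_size)
    {
        if (!req->input_data || req->input_data_size < cur_size) {
            std::free(req->input_data);
            req->input_data = static_cast<float *>(std::malloc(cur_size));
            if (!req->input_data) {
                req->input_data_size = 0;
                return -1;  /* AVERROR(ENOMEM) in the real code */
            }
            req->input_data_size = cur_size;
        }
        return 0;
    }

With this change the buffer has a single owner (the request) and is released in
th_free_request(); it is no longer freed through the tensor deleter passed to
torch::from_blob().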

Signed-off-by: Raja Rathour <imraja729@gmail.com>
---
 libavfilter/dnn/dnn_backend_torch.cpp | 293 +++++++++-----------------
 1 file changed, 101 insertions(+), 192 deletions(-)

diff --git a/libavfilter/dnn/dnn_backend_torch.cpp b/libavfilter/dnn/dnn_backend_torch.cpp
index 33809bf983..4e9a08c75a 100644
--- a/libavfilter/dnn/dnn_backend_torch.cpp
+++ b/libavfilter/dnn/dnn_backend_torch.cpp
@@ -25,10 +25,6 @@
 
 #include <torch/torch.h>
 #include <torch/script.h>
-#include <thread>
-#include <mutex>
-#include <condition_variable>
-#include <atomic>
 
 extern "C" {
 #include "dnn_io_proc.h"
@@ -46,16 +42,13 @@ typedef struct THModel {
     SafeQueue *request_queue;
     Queue *task_queue;
     Queue *lltask_queue;
-    SafeQueue *pending_queue;       ///< requests waiting for inference
-    std::thread *worker_thread;     ///< background worker thread
-    std::mutex *mutex;              ///< mutex for the condition variable
-    std::condition_variable *cond;  ///< condition variable for worker wakeup
-    std::atomic<bool> worker_stop;  ///< signal for thread exit
 } THModel;
 
 typedef struct THInferRequest {
     torch::Tensor *output;
     torch::Tensor *input_tensor;
+    float *input_data;      // Persistent buffer to prevent leaks
+    size_t input_data_size; // Track size for dynamic resizing
 } THInferRequest;
 
 typedef struct THRequestItem {
@@ -104,7 +97,10 @@ static void th_free_request(THInferRequest *request)
         delete(request->input_tensor);
         request->input_tensor = NULL;
     }
-    return;
+    if (request->input_data) {
+        av_freep(&request->input_data);
+        request->input_data_size = 0;
+    }
 }
 
 static inline void destroy_request_item(THRequestItem **arg)
@@ -124,75 +120,24 @@ static inline void destroy_request_item(THRequestItem **arg)
 static void dnn_free_model_th(DNNModel **model)
 {
     THModel *th_model;
-    if (!model || !*model)
-        return;
-
-    th_model = (THModel *)(*model);
-
-    /* 1. Stop and join the worker thread if it exists */
-    if (th_model->worker_thread) {
-        {
-            std::lock_guard<std::mutex> lock(*th_model->mutex);
-            th_model->worker_stop = true;
-        }
-        th_model->cond->notify_all();
-        th_model->worker_thread->join();
-        delete th_model->worker_thread;
-        th_model->worker_thread = NULL;
-    }
-
-    /* 2. Safely delete C++ synchronization objects */
-    if (th_model->mutex) {
-        delete th_model->mutex;
-        th_model->mutex = NULL;
-    }
-    if (th_model->cond) {
-        delete th_model->cond;
-        th_model->cond = NULL;
-    }
-
-    /* 3. Clean up the pending queue */
-    if (th_model->pending_queue) {
-        while (ff_safe_queue_size(th_model->pending_queue) > 0) {
-            THRequestItem *item = (THRequestItem *)ff_safe_queue_pop_front(th_model->pending_queue);
-            destroy_request_item(&item);
-        }
-        ff_safe_queue_destroy(th_model->pending_queue);
-    }
-
-    /* 4. Clean up standard backend queues */
-    if (th_model->request_queue) {
+    if (*model) {
+        th_model = (THModel *)*model;
         while (ff_safe_queue_size(th_model->request_queue) != 0) {
             THRequestItem *item = (THRequestItem *)ff_safe_queue_pop_front(th_model->request_queue);
             destroy_request_item(&item);
-        }
-        ff_safe_queue_destroy(th_model->request_queue);
-    }
-
-    if (th_model->lltask_queue) {
-        while (ff_queue_size(th_model->lltask_queue) != 0) {
-            LastLevelTaskItem *item = (LastLevelTaskItem *)ff_queue_pop_front(th_model->lltask_queue);
             av_freep(&item);
         }
-        ff_queue_destroy(th_model->lltask_queue);
-    }
-
-    if (th_model->task_queue) {
+        ff_safe_queue_destroy(th_model->request_queue);
         while (ff_queue_size(th_model->task_queue) != 0) {
             TaskItem *item = (TaskItem *)ff_queue_pop_front(th_model->task_queue);
-            av_frame_free(&item->in_frame);
-            av_frame_free(&item->out_frame);
             av_freep(&item);
         }
         ff_queue_destroy(th_model->task_queue);
+        if (th_model->jit_model)
+            delete th_model->jit_model;
+        av_freep(&th_model);
+        *model = NULL;
     }
-
-    /* 5. Final model cleanup */
-    if (th_model->jit_model)
-        delete th_model->jit_model;
-
-    av_freep(&th_model);
-    *model = NULL;
 }
 
 static int get_input_th(DNNModel *model, DNNData *input, const char *input_name)
@@ -207,131 +152,133 @@ static int get_input_th(DNNModel *model, DNNData *input, const char *input_name)
     return 0;
 }
 
-static void deleter(void *arg)
-{
-    av_freep(&arg);
-}
-
 static int fill_model_input_th(THModel *th_model, THRequestItem *request)
 {
-    LastLevelTaskItem *lltask = NULL;
-    TaskItem *task = NULL;
-    THInferRequest *infer_request = NULL;
     DNNData input = { 0 };
-    DnnContext *ctx = th_model->ctx;
+    THInferRequest *infer_request = NULL;
+    TaskItem *task = NULL;
+    LastLevelTaskItem *lltask = NULL;
     int ret, width_idx, height_idx, channel_idx;
+    size_t cur_size;
 
     lltask = (LastLevelTaskItem *)ff_queue_pop_front(th_model->lltask_queue);
     if (!lltask) {
-        ret = AVERROR(EINVAL);
-        goto err;
+        return AVERROR(EINVAL);
     }
     request->lltask = lltask;
     task = lltask->task;
     infer_request = request->infer_request;
 
     ret = get_input_th(&th_model->model, &input, NULL);
-    if ( ret != 0) {
-        goto err;
+    if (ret != 0) {
+        return ret;
     }
     width_idx = dnn_get_width_idx_by_layout(input.layout);
     height_idx = dnn_get_height_idx_by_layout(input.layout);
     channel_idx = dnn_get_channel_idx_by_layout(input.layout);
     input.dims[height_idx] = task->in_frame->height;
     input.dims[width_idx] = task->in_frame->width;
-    input.data = av_malloc(input.dims[height_idx] * input.dims[width_idx] *
-                           input.dims[channel_idx] * sizeof(float));
-    if (!input.data)
-        return AVERROR(ENOMEM);
-    infer_request->input_tensor = new torch::Tensor();
-    infer_request->output = new torch::Tensor();
+
+    cur_size = input.dims[height_idx] * input.dims[width_idx] *
+               input.dims[channel_idx] * sizeof(float);
+
+    /* FIX: Reuse memory instead of allocating every time (Fixes Leak) */
+    if (!infer_request->input_data || infer_request->input_data_size < cur_size) {
+        av_freep(&infer_request->input_data);
+        infer_request->input_data = (float *)av_malloc(cur_size);
+        if (!infer_request->input_data)
+            return AVERROR(ENOMEM);
+        infer_request->input_data_size = cur_size;
+    }
+
+    input.data = infer_request->input_data;
+
+    if (!infer_request->input_tensor)
+        infer_request->input_tensor = new torch::Tensor();
+    if (!infer_request->output)
+        infer_request->output = new torch::Tensor();
 
     switch (th_model->model.func_type) {
     case DFT_PROCESS_FRAME:
-        input.scale = 255;
         if (task->do_ioproc) {
-            if (th_model->model.frame_pre_proc != NULL) {
-                th_model->model.frame_pre_proc(task->in_frame, &input, th_model->model.filter_ctx);
+            if (th_model->model.frame_to_dnn_data) {
+                ret = th_model->model.frame_to_dnn_data(task->in_frame, &input, th_model->model.model_data);
             } else {
-                ff_proc_from_frame_to_dnn(task->in_frame, &input, ctx);
+                ret = ff_dnn_frame_to_dnn_data(task->in_frame, &input, th_model->ctx);
             }
         }
+        if (ret != 0) {
+            return ret;
+        }
         break;
     default:
-        avpriv_report_missing_feature(NULL, "model function type %d", th_model->model.func_type);
+        avpriv_report_missing_feature(th_model->ctx, "model function type %d", th_model->model.func_type);
         break;
     }
     *infer_request->input_tensor = torch::from_blob(input.data,
         {1, input.dims[channel_idx], input.dims[height_idx], input.dims[width_idx]},
-        deleter, torch::kFloat32);
-    return 0;
+        nullptr, torch::kFloat32);
 
-err:
-    th_free_request(infer_request);
-    return ret;
+    return 0;
 }
 
 static int th_start_inference(void *args)
 {
     THRequestItem *request = (THRequestItem *)args;
-    THInferRequest *infer_request = NULL;
-    LastLevelTaskItem *lltask = NULL;
-    TaskItem *task = NULL;
-    THModel *th_model = NULL;
-    DnnContext *ctx = NULL;
+    THInferRequest *infer_request = request->infer_request;
+    LastLevelTaskItem *lltask = request->lltask;
+    TaskItem *task = lltask->task;
+    THModel *th_model = (THModel *)task->model;
+    DnnContext *ctx = th_model->ctx;
     std::vector<torch::jit::IValue> inputs;
-    torch::NoGradGuard no_grad;
-
-    if (!request) {
-        av_log(NULL, AV_LOG_ERROR, "THRequestItem is NULL\n");
-        return AVERROR(EINVAL);
-    }
-    infer_request = request->infer_request;
-    lltask = request->lltask;
-    task = lltask->task;
-    th_model = (THModel *)task->model;
-    ctx = th_model->ctx;
 
-    if (ctx->torch_option.optimize)
-        torch::jit::setGraphExecutorOptimize(true);
-    else
-        torch::jit::setGraphExecutorOptimize(false);
+    torch::jit::setGraphExecutorOptimize(!!ctx->torch_option.optimize);
 
     if (!infer_request->input_tensor || !infer_request->output) {
         av_log(ctx, AV_LOG_ERROR, "input or output tensor is NULL\n");
         return DNN_GENERIC_ERROR;
     }
-    // Transfer tensor to the same device as model
-    c10::Device device = (*th_model->jit_model->parameters().begin()).device();
+
+    /* FIX: Use the context device directly instead of querying model parameters */
+    const char *device_name = ctx->device ? ctx->device : "cpu";
+    c10::Device device = c10::Device(device_name);
+
     if (infer_request->input_tensor->device() != device)
         *infer_request->input_tensor = infer_request->input_tensor->to(device);
+
     inputs.push_back(*infer_request->input_tensor);
-
-    *infer_request->output = th_model->jit_model->forward(inputs).toTensor();
+
+    try {
+        *infer_request->output = th_model->jit_model->forward(inputs).toTensor();
+    } catch (const c10::Error& e) {
+        av_log(ctx, AV_LOG_ERROR, "Torch forward pass failed: %s\n", e.what());
+        return DNN_GENERIC_ERROR;
+    }
 
     return 0;
 }
 
 static void infer_completion_callback(void *args) {
-    THRequestItem *request = (THRequestItem*)args;
+    THRequestItem *request = (THRequestItem *)args;
     LastLevelTaskItem *lltask = request->lltask;
     TaskItem *task = lltask->task;
-    DNNData outputs = { 0 };
+    DNNData *outputs = &task->output;
     THInferRequest *infer_request = request->infer_request;
     THModel *th_model = (THModel *)task->model;
     torch::Tensor *output = infer_request->output;
 
     c10::IntArrayRef sizes = output->sizes();
-    outputs.order = DCO_RGB;
-    outputs.layout = DL_NCHW;
-    outputs.dt = DNN_FLOAT;
+    outputs->order = DCO_RGB;
+    outputs->layout = DL_NCHW;
+    outputs->dt = DNN_FLOAT;
+
     if (sizes.size() == 4) {
         // 4 dimensions: [batch_size, channel, height, width]
         // this format of data is normally used for video frame SR
-        outputs.dims[0] = sizes.at(0); // N
-        outputs.dims[1] = sizes.at(1); // C
-        outputs.dims[2] = sizes.at(2); // H
-        outputs.dims[3] = sizes.at(3); // W
+        outputs->dims[0] = sizes.at(0); // N
+        outputs->dims[1] = sizes.at(1); // C
+        outputs->dims[2] = sizes.at(2); // H
+        outputs->dims[3] = sizes.at(3); // W
     } else {
         avpriv_report_missing_feature(th_model->ctx, "Support of this kind of model");
         goto err;
@@ -343,70 +290,40 @@ static void infer_completion_callback(void *args) {
             // Post process can only deal with CPU memory.
             if (output->device() != torch::kCPU)
                 *output = output->to(torch::kCPU);
-            outputs.scale = 255;
-            outputs.data = output->data_ptr();
-            if (th_model->model.frame_post_proc != NULL) {
-                th_model->model.frame_post_proc(task->out_frame, &outputs, th_model->model.filter_ctx);
+            outputs->scale = 255;
+            outputs->data = output->data_ptr();
+            if (th_model->model.dnn_data_to_frame) {
+                th_model->model.dnn_data_to_frame(task->out_frame, outputs, th_model->model.model_data);
             } else {
-                ff_proc_from_dnn_to_frame(task->out_frame, &outputs, th_model->ctx);
+                ff_dnn_dnn_data_to_frame(task->out_frame, outputs, th_model->ctx);
             }
-        } else {
-            task->out_frame->width = outputs.dims[dnn_get_width_idx_by_layout(outputs.layout)];
-            task->out_frame->height = outputs.dims[dnn_get_height_idx_by_layout(outputs.layout)];
         }
         break;
     default:
         avpriv_report_missing_feature(th_model->ctx, "model function type %d", th_model->model.func_type);
         goto err;
     }
+
     task->inference_done++;
     av_freep(&request->lltask);
+
 err:
     th_free_request(infer_request);
 
     if (ff_safe_queue_push_back(th_model->request_queue, request) < 0) {
         destroy_request_item(&request);
-        av_log(th_model->ctx, AV_LOG_ERROR, "Unable to push back request_queue when failed to start inference.\n");
-    }
-}
-
-static void th_worker_thread(THModel *th_model) {
-    while (true) {
-        THRequestItem *request = NULL;
-        {
-            std::unique_lock<std::mutex> lock(*th_model->mutex);
-            th_model->cond->wait(lock, [&]{
-                return th_model->worker_stop || ff_safe_queue_size(th_model->pending_queue) > 0;
-            });
-
-            if (th_model->worker_stop && ff_safe_queue_size(th_model->pending_queue) == 0)
-                break;
-
-            request = (THRequestItem *)ff_safe_queue_pop_front(th_model->pending_queue);
-        }
-
-        if (request) {
-            th_start_inference(request);
-            infer_completion_callback(request);
-        }
     }
 }
 
 static int execute_model_th(THRequestItem *request, Queue *lltask_queue)
 {
-    THModel *th_model = NULL;
+    THModel *th_model;
     LastLevelTaskItem *lltask;
-    TaskItem *task = NULL;
+    TaskItem *task;
     int ret = 0;
 
-    if (ff_queue_size(lltask_queue) == 0) {
-        destroy_request_item(&request);
-        return 0;
-    }
-
     lltask = (LastLevelTaskItem *)ff_queue_peek_front(lltask_queue);
     if (lltask == NULL) {
-        av_log(NULL, AV_LOG_ERROR, "Failed to get LastLevelTaskItem\n");
         ret = AVERROR(EINVAL);
         goto err;
     }
@@ -414,20 +331,25 @@ static int execute_model_th(THRequestItem *request, Queue *lltask_queue)
     th_model = (THModel *)task->model;
 
     ret = fill_model_input_th(th_model, request);
-    if ( ret != 0) {
+    if (ret != 0) {
         goto err;
     }
+
+    /* FIX: Use modern FFmpeg Async Infrastructure */
     if (task->async) {
-        std::lock_guard<std::mutex> lock(*th_model->mutex);
-        if (ff_safe_queue_push_back(th_model->pending_queue, request) < 0) {
-            return AVERROR(ENOMEM);
-        }
-        th_model->cond->notify_one();
+        ret = ff_dnn_start_inference_async(th_model->ctx, &request->exec_module);
+        if (ret < 0)
+            goto err;
         return 0;
+    } else {
+        ret = th_start_inference((void *)(request));
+        if (ret != 0)
+            goto err;
+        infer_completion_callback(request);
+        return (task->inference_done == task->inference_todo) ? 0 : DNN_GENERIC_ERROR;
     }
 
 err:
-    th_free_request(request->infer_request);
     if (ff_safe_queue_push_back(th_model->request_queue, request) < 0) {
         destroy_request_item(&request);
     }
@@ -479,12 +401,9 @@ err:
 
 static THInferRequest *th_create_inference_request(void)
 {
-    THInferRequest *request = (THInferRequest *)av_malloc(sizeof(THInferRequest));
-    if (!request) {
+    THInferRequest *request = (THInferRequest *)av_mallocz(sizeof(THInferRequest));
+    if (!request)
         return NULL;
-    }
-    request->input_tensor = NULL;
-    request->output = NULL;
     return request;
 }
 
@@ -556,16 +475,6 @@ static DNNModel *dnn_load_model_th(DnnContext *ctx, DNNFunctionType func_type, A
         goto fail;
     }
 
-    th_model->pending_queue = ff_safe_queue_create();
-    if (!th_model->pending_queue) {
-        goto fail;
-    }
-
-    th_model->mutex = new std::mutex();
-    th_model->cond = new std::condition_variable();
-    th_model->worker_stop = false;
-    th_model->worker_thread = new std::thread(th_worker_thread, th_model);
-
     model->get_input = &get_input_th;
     model->get_output = &get_output_th;
     model->filter_ctx = filter_ctx;
@@ -662,4 +571,4 @@ extern const DNNModule ff_dnn_backend_torch = {
     .get_result     = dnn_get_result_th,
     .flush          = dnn_flush_th,
     .free_model     = dnn_free_model_th,
-};
+};
\ No newline at end of file
-- 
2.51.0


* [FFmpeg-devel] Re: [PATCH v2] avfilter/dnn_backend_torch: enable async execution, memory safety & dynamic shapes
@ 2026-01-14 10:18 Patchwork via ffmpeg-devel
From: Patchwork via ffmpeg-devel @ 2026-01-14 10:18 UTC
  To: ffmpeg-devel; +Cc: yinshiyou-hf, Patchwork

Hello,

Thank you for submitting a patch to ffmpeg-devel.
An error occurred during an automated build/fate test. Please review the following link for more details:
https://patchwork.ffmpeg.org/project/ffmpeg/patch/20260114100745.447204-1-imraja729@gmail.com/

Thank you,
ffmpeg-devel
