[vlc-devel] [RFC v2 2/2] preparser: use the new executor API

Romain Vimont rom1v at videolabs.io
Tue Sep 1 18:13:47 CEST 2020


Replace the background_worker with an executor.
---
 src/preparser/preparser.c | 399 ++++++++++++++++++++++++--------------
 1 file changed, 251 insertions(+), 148 deletions(-)

diff --git a/src/preparser/preparser.c b/src/preparser/preparser.c
index 723feb96dc..c78e956985 100644
--- a/src/preparser/preparser.c
+++ b/src/preparser/preparser.c
@@ -24,8 +24,9 @@
 
 #include <vlc_common.h>
 #include <vlc_atomic.h>
+#include <vlc_executor.h>
+#include <vlc_tick.h>
 
-#include "misc/background_worker.h"
 #include "input/input_interface.h"
 #include "input/input_internal.h"
 #include "preparser.h"
@@ -35,222 +36,307 @@ struct input_preparser_t
 {
     vlc_object_t* owner;
     input_fetcher_t* fetcher;
-    struct background_worker* worker;
+    vlc_executor_t *executor;
+    vlc_tick_t default_timeout;
     atomic_bool deactivated;
+
+    vlc_mutex_t lock;
+    struct vlc_list submitted_tasks; /**< list of struct input_preparser_task */
 };
 
-typedef struct input_preparser_req_t
+struct input_preparser_task
 {
+    input_preparser_t *preparser;
     input_item_t *item;
     input_item_meta_request_option_t options;
     const input_preparser_callbacks_t *cbs;
     void *userdata;
-    vlc_atomic_rc_t rc;
-} input_preparser_req_t;
+    void *id;
+    vlc_tick_t timeout;
 
-typedef struct input_preparser_task_t
-{
-    input_preparser_req_t *req;
-    input_preparser_t* preparser;
-    int preparse_status;
     input_item_parser_id_t *parser;
-    atomic_int state;
-    atomic_bool done;
-} input_preparser_task_t;
-
-static input_preparser_req_t *ReqCreate(input_item_t *item,
-                                        input_item_meta_request_option_t options,
-                                        const input_preparser_callbacks_t *cbs,
-                                        void *userdata)
+
+    vlc_mutex_t lock;
+    vlc_cond_t cond_ended;
+    bool preparse_ended;
+    int preparse_status;
+    bool fetch_ended;
+
+    atomic_bool interrupted;
+
+    struct vlc_runnable runnable; /**< to be passed to the executor */
+
+    struct vlc_list node; /**< node of input_preparser_t.submitted_tasks */
+};
+
+static void RunnableRun(void *);
+
+static struct input_preparser_task *
+TaskNew(input_preparser_t *preparser, input_item_t *item,
+        input_item_meta_request_option_t options,
+        const input_preparser_callbacks_t *cbs, void *userdata,
+        void *id, vlc_tick_t timeout)
 {
-    input_preparser_req_t *req = malloc(sizeof(*req));
-    if (unlikely(!req))
+    assert(timeout >= 0);
+
+    struct input_preparser_task *task = malloc(sizeof(*task));
+    if (!task)
         return NULL;
 
-    req->item = item;
-    req->options = options;
-    req->cbs = cbs;
-    req->userdata = userdata;
-    vlc_atomic_rc_init(&req->rc);
+    task->preparser = preparser;
+    task->item = item;
+    task->options = options;
+    task->cbs = cbs;
+    task->userdata = userdata;
+    task->id = id;
+    task->timeout = timeout;
 
     input_item_Hold(item);
 
-    return req;
+    task->parser = NULL;
+    vlc_mutex_init(&task->lock);
+    vlc_cond_init(&task->cond_ended);
+    task->preparse_ended = false;
+    task->preparse_status = ITEM_PREPARSE_SKIPPED;
+    task->fetch_ended = false;
+
+    atomic_init(&task->interrupted, false);
+
+    task->runnable.run = RunnableRun;
+    task->runnable.userdata = task;
+
+    return task;
 }
 
-static void ReqHold(input_preparser_req_t *req)
+static void
+TaskDelete(struct input_preparser_task *task)
 {
-    vlc_atomic_rc_inc(&req->rc);
+    input_item_Release(task->item);
+    free(task);
 }
 
-static void ReqRelease(input_preparser_req_t *req)
+static void
+PreparserAddTask(input_preparser_t *preparser,
+                 struct input_preparser_task *task)
 {
-    if (vlc_atomic_rc_dec(&req->rc))
-    {
-        input_item_Release(req->item);
-        free(req);
-    }
+    vlc_mutex_lock(&preparser->lock);
+    vlc_list_append(&task->node, &preparser->submitted_tasks);
+    vlc_mutex_unlock(&preparser->lock);
 }
 
-static void OnParserEnded(input_item_t *item, int status, void *task_)
+static void
+PreparserRemoveTask(input_preparser_t *preparser,
+                    struct input_preparser_task *task)
 {
-    VLC_UNUSED(item);
-    input_preparser_task_t* task = task_;
-
-    atomic_store( &task->state, status );
-    atomic_store( &task->done, true );
-    background_worker_RequestProbe( task->preparser->worker );
+    vlc_mutex_lock(&preparser->lock);
+    vlc_list_remove(&task->node);
+    vlc_mutex_unlock(&preparser->lock);
 }
 
-static void OnParserSubtreeAdded(input_item_t *item, input_item_node_t *subtree,
-                                 void *task_)
+static void
+FinishTask(struct input_preparser_task *task)
 {
-    VLC_UNUSED(item);
-    input_preparser_task_t* task = task_;
-    input_preparser_req_t *req = task->req;
+    if (task->cbs && task->cbs->on_preparse_ended)
+        task->cbs->on_preparse_ended(task->item, task->preparse_status,
+                                     task->userdata);
 
-    if (req->cbs && req->cbs->on_subtree_added)
-        req->cbs->on_subtree_added(req->item, subtree, req->userdata);
+    input_preparser_t *preparser = task->preparser;
+    PreparserRemoveTask(preparser, task);
+    TaskDelete(task);
 }
 
-static int PreparserOpenInput( void* preparser_, void* req_, void** out )
+static void
+OnParserEnded(input_item_t *item, int status, void *task_)
 {
-    input_preparser_t* preparser = preparser_;
-    input_preparser_req_t *req = req_;
-    input_preparser_task_t* task = malloc( sizeof *task );
-
-    if( unlikely( !task ) )
-        goto error;
-
-    static const input_item_parser_cbs_t cbs = {
-        .on_ended = OnParserEnded,
-        .on_subtree_added = OnParserSubtreeAdded,
-    };
-
-    atomic_init( &task->state, VLC_ETIMEOUT );
-    atomic_init( &task->done, false );
-
-    task->preparser = preparser_;
-    task->req = req;
-    task->preparse_status = -1;
-    task->parser = input_item_Parse( req->item, preparser->owner, &cbs,
-                                     task );
-    if( !task->parser )
-        goto error;
+    VLC_UNUSED(item);
+    struct input_preparser_task *task = task_;
 
-    *out = task;
+    if (atomic_load(&task->interrupted))
+        /*
+         * On interruption, the call to input_item_parser_id_Release() may
+         * trigger this "parser ended" callback. Ignore it.
+         */
+        return;
 
-    return VLC_SUCCESS;
+    vlc_mutex_lock(&task->lock);
+    assert(!task->preparse_ended);
+    task->preparse_status = status == VLC_SUCCESS ? ITEM_PREPARSE_DONE
+                                                  : ITEM_PREPARSE_FAILED;
+    task->preparse_ended = true;
+    vlc_mutex_unlock(&task->lock);
 
-error:
-    free( task );
-    if (req->cbs && req->cbs->on_preparse_ended)
-        req->cbs->on_preparse_ended(req->item, ITEM_PREPARSE_FAILED, req->userdata);
-    return VLC_EGENERIC;
+    vlc_cond_signal(&task->cond_ended);
 }
 
-static int PreparserProbeInput( void* preparser_, void* task_ )
+static void
+OnParserSubtreeAdded(input_item_t *item, input_item_node_t *subtree,
+                     void *task_)
 {
-    input_preparser_task_t* task = task_;
-    return atomic_load( &task->done );
-    VLC_UNUSED( preparser_ );
+    VLC_UNUSED(item);
+    struct input_preparser_task *task = task_;
+
+    if (task->cbs && task->cbs->on_subtree_added)
+        task->cbs->on_subtree_added(task->item, subtree, task->userdata);
 }
 
-static void on_art_fetch_ended(input_item_t *item, bool fetched, void *userdata)
+static void
+OnArtFetchEnded(input_item_t *item, bool fetched, void *userdata)
 {
     VLC_UNUSED(item);
     VLC_UNUSED(fetched);
-    input_preparser_task_t *task = userdata;
-    input_preparser_req_t *req = task->req;
 
-    input_item_SetPreparsed(req->item, true);
+    struct input_preparser_task *task = userdata;
 
-    if (req->cbs && req->cbs->on_preparse_ended)
-        req->cbs->on_preparse_ended(req->item, task->preparse_status, req->userdata);
+    vlc_mutex_lock(&task->lock);
+    assert(!task->fetch_ended);
+    task->fetch_ended = true;
+    vlc_mutex_unlock(&task->lock);
 
-    ReqRelease(req);
-    free(task);
+    vlc_cond_signal(&task->cond_ended);
 }
 
 static const input_fetcher_callbacks_t input_fetcher_callbacks = {
-    .on_art_fetch_ended = on_art_fetch_ended,
+    .on_art_fetch_ended = OnArtFetchEnded,
 };
 
-static void PreparserCloseInput( void* preparser_, void* task_ )
+static void
+Parse(struct input_preparser_task *task, vlc_tick_t deadline, bool *timed_out)
 {
-    input_preparser_task_t* task = task_;
-    input_preparser_req_t *req = task->req;
+    *timed_out = false;
 
-    input_preparser_t* preparser = preparser_;
-    input_item_t* item = req->item;
+    static const input_item_parser_cbs_t cbs = {
+        .on_ended = OnParserEnded,
+        .on_subtree_added = OnParserSubtreeAdded,
+    };
 
-    int status;
-    switch( atomic_load( &task->state ) )
+    vlc_object_t *obj = task->preparser->owner;
+    task->parser = input_item_Parse(task->item, obj, &cbs, task);
+    if (!task->parser)
     {
-        case VLC_SUCCESS:
-            status = ITEM_PREPARSE_DONE;
-            break;
-        case VLC_ETIMEOUT:
-            status = ITEM_PREPARSE_TIMEOUT;
-            break;
-        default:
-            status = ITEM_PREPARSE_FAILED;
-            break;
+        task->preparse_status = ITEM_PREPARSE_FAILED;
+        return;
     }
 
-    input_item_parser_id_Release( task->parser );
-
-    if( preparser->fetcher && (req->options & META_REQUEST_OPTION_FETCH_ANY) )
+    /* Wait until the end of parsing */
+    vlc_mutex_lock(&task->lock);
+    if (deadline == VLC_TICK_INVALID)
     {
-        task->preparse_status = status;
-        ReqHold(task->req);
-        if (!input_fetcher_Push(preparser->fetcher, item,
-                                req->options & META_REQUEST_OPTION_FETCH_ANY,
-                                &input_fetcher_callbacks, task))
+        while (!task->preparse_ended)
+            vlc_cond_wait(&task->cond_ended, &task->lock);
+    }
+    else
+    {
+        bool timeout = false;
+        while (!task->preparse_ended && !timeout)
+            timeout =
+                vlc_cond_timedwait(&task->cond_ended, &task->lock, deadline);
+
+        if (timeout)
         {
-            return;
+            task->preparse_status = ITEM_PREPARSE_TIMEOUT;
+            *timed_out = true;
         }
-        ReqRelease(task->req);
     }
+    vlc_mutex_unlock(&task->lock);
 
-    free(task);
+    /* This call also interrupts the parsing if it is still running */
+    input_item_parser_id_Release(task->parser);
+}
+
+static void
+Fetch(struct input_preparser_task *task)
+{
+    input_fetcher_t *fetcher = task->preparser->fetcher;
+    if (!fetcher || !(task->options & META_REQUEST_OPTION_FETCH_ANY))
+        return;
+
+    int ret =
+        input_fetcher_Push(fetcher, task->item,
+                           task->options & META_REQUEST_OPTION_FETCH_ANY,
+                           &input_fetcher_callbacks, task);
+    if (ret != VLC_SUCCESS)
+        return;
 
-    input_item_SetPreparsed( item, true );
-    if (req->cbs && req->cbs->on_preparse_ended)
-        req->cbs->on_preparse_ended(req->item, status, req->userdata);
+    /* Wait until the end of fetching (fetching is not interruptible) */
+    vlc_mutex_lock(&task->lock);
+    while (!task->fetch_ended)
+        vlc_cond_wait(&task->cond_ended, &task->lock);
+    vlc_mutex_unlock(&task->lock);
 }
 
-static void ReqHoldVoid(void *item) { ReqHold(item); }
-static void ReqReleaseVoid(void *item) { ReqRelease(item); }
+static void
+RunnableRun(void *userdata)
+{
+    struct input_preparser_task *task = userdata;
+
+    vlc_tick_t deadline = task->timeout ? vlc_tick_now() + task->timeout
+                                        : VLC_TICK_INVALID;
+
+    if (atomic_load(&task->interrupted))
+        goto end;
+
+    bool timed_out;
+    Parse(task, deadline, &timed_out);
+
+    if (timed_out)
+        goto end;
+
+    if (atomic_load(&task->interrupted))
+        goto end;
+
+    Fetch(task);
+
+    if (atomic_load(&task->interrupted))
+        goto end;
+
+    input_item_SetPreparsed(task->item, true);
+
+end:
+    FinishTask(task);
+}
+
+static void
+Interrupt(struct input_preparser_task *task)
+{
+    assert(!atomic_load(&task->interrupted));
+    atomic_store(&task->interrupted, true);
+
+    /* Wake up the preparser cond_wait */
+    vlc_mutex_lock(&task->lock);
+    task->preparse_ended = true;
+    vlc_mutex_unlock(&task->lock);
+    vlc_cond_signal(&task->cond_ended);
+}
 
 input_preparser_t* input_preparser_New( vlc_object_t *parent )
 {
     input_preparser_t* preparser = malloc( sizeof *preparser );
+    if (!preparser)
+        return NULL;
 
-    struct background_worker_config conf = {
-        .default_timeout = VLC_TICK_FROM_MS(var_InheritInteger( parent, "preparse-timeout" )),
-        .max_threads = var_InheritInteger( parent, "preparse-threads" ),
-        .pf_start = PreparserOpenInput,
-        .pf_probe = PreparserProbeInput,
-        .pf_stop = PreparserCloseInput,
-        .pf_release = ReqReleaseVoid,
-        .pf_hold = ReqHoldVoid
-    };
-
-
-    if( likely( preparser ) )
-        preparser->worker = background_worker_New( preparser, &conf );
+    int max_threads = var_InheritInteger(parent, "preparse-threads");
+    if (max_threads < 1)
+        max_threads = 1;
 
-    if( unlikely( !preparser || !preparser->worker ) )
+    preparser->executor = vlc_executor_New(max_threads);
+    if (!preparser->executor)
     {
-        free( preparser );
+        free(preparser);
         return NULL;
     }
 
+    preparser->default_timeout =
+        VLC_TICK_FROM_MS(var_InheritInteger(parent, "preparse-timeout"));
+    if (preparser->default_timeout < 0)
+        preparser->default_timeout = 0;
+
     preparser->owner = parent;
     preparser->fetcher = input_fetcher_New( parent );
     atomic_init( &preparser->deactivated, false );
 
+    vlc_mutex_init(&preparser->lock);
+    vlc_list_init(&preparser->submitted_tasks);
+
     if( unlikely( !preparser->fetcher ) )
         msg_Warn( parent, "unable to create art fetcher" );
 
@@ -260,7 +346,7 @@ input_preparser_t* input_preparser_New( vlc_object_t *parent )
 void input_preparser_Push( input_preparser_t *preparser,
     input_item_t *item, input_item_meta_request_option_t i_options,
     const input_preparser_callbacks_t *cbs, void *cbs_userdata,
-    int timeout, void *id )
+    int timeout_ms, void *id )
 {
     if( atomic_load( &preparser->deactivated ) )
         return;
@@ -287,14 +373,14 @@ void input_preparser_Push( input_preparser_t *preparser,
             return;
     }
 
-    struct input_preparser_req_t *req = ReqCreate(item, i_options,
-                                                  cbs, cbs_userdata);
+    vlc_tick_t timeout = timeout_ms == -1 ? preparser->default_timeout
+                                          : VLC_TICK_FROM_MS(timeout_ms);
+    struct input_preparser_task *task =
+        TaskNew(preparser, item, i_options, cbs, cbs_userdata, id, timeout);
 
-    if (background_worker_Push(preparser->worker, req, id, timeout))
-        if (req->cbs && cbs->on_preparse_ended)
-            cbs->on_preparse_ended(item, ITEM_PREPARSE_FAILED, cbs_userdata);
+    PreparserAddTask(preparser, task);
 
-    ReqRelease(req);
+    vlc_executor_Submit(preparser->executor, &task->runnable);
 }
 
 void input_preparser_fetcher_Push( input_preparser_t *preparser,
@@ -308,18 +394,59 @@ void input_preparser_fetcher_Push( input_preparser_t *preparser,
 
+/**
+ * Cancels the submitted tasks whose id matches, or every task if id is NULL.
+ *
+ * When vlc_executor_Cancel() reports a task as canceled, the task is
+ * finished here: on_preparse_ended is called with the task's current
+ * status, then the task is unlinked from the list and deleted. Otherwise
+ * the task is flagged as interrupted and is finished after run().
+ */
 void input_preparser_Cancel( input_preparser_t *preparser, void *id )
 {
-    background_worker_Cancel( preparser->worker, id );
+    vlc_mutex_lock(&preparser->lock);
+
+    struct input_preparser_task *task;
+    vlc_list_foreach(task, &preparser->submitted_tasks, node)
+    {
+        if (id && task->id != id)
+            continue;
+
+        if (vlc_executor_Cancel(preparser->executor, &task->runnable))
+        {
+            /*
+             * Do not call FinishTask() here: it calls
+             * PreparserRemoveTask(), which locks preparser->lock again
+             * while this function already holds it.
+             */
+            if (task->cbs && task->cbs->on_preparse_ended)
+                task->cbs->on_preparse_ended(task->item,
+                                             task->preparse_status,
+                                             task->userdata);
+            vlc_list_remove(&task->node);
+            TaskDelete(task);
+        }
+        else if (!atomic_load(&task->interrupted))
+        {
+            /*
+             * The task stays in the list until it is finished after run(),
+             * so a later cancel request can reach it again; Interrupt()
+             * asserts that the task was not interrupted yet.
+             */
+            Interrupt(task);
+        }
+    }
+
+    vlc_mutex_unlock(&preparser->lock);
 }
 
 void input_preparser_Deactivate( input_preparser_t* preparser )
 {
     atomic_store( &preparser->deactivated, true );
-    background_worker_Cancel( preparser->worker, NULL );
+    input_preparser_Cancel(preparser, NULL);
 }
 
 void input_preparser_Delete( input_preparser_t *preparser )
 {
-    background_worker_Delete( preparser->worker );
+    vlc_executor_Delete(preparser->executor);
 
     if( preparser->fetcher )
         input_fetcher_Delete( preparser->fetcher );
-- 
2.28.0



More information about the vlc-devel mailing list