[vlc-devel] [PATCH] misc: background_worker: make the background worker multithreaded

Rémi Denis-Courmont remi at remlab.net
Tue Jul 17 18:06:19 CEST 2018


Any default limit larger than one will, with large enough user base, cause failures and performance problems. Of course, a default limit of one makes the feature mostly useless. A rock and a hard place...

However, the first problem is the lack of timeout IIRC - not the lack of parallelism. Spawn however many threads, and without timeout, you might not make any progress anyway.

Le 17 juillet 2018 14:24:56 GMT+03:00, Romain Vimont <rom1v at videolabs.io> a écrit :
>A way to speed up the preparsing consists in preparsing several inputs
>in parallel.
>For this purpose, make the background worker (used by the preparser and
>fetcher) execute tasks from (possibly) several threads.
>Apart from adding a new field "max_threads" in the
>background_worker_config structure, the background worker API is kept
>Two new options are added to configure the maximum number of threads
>used for preparsing and fetching:
> - preparse-threads
> - fetch-art-threads
>This patch depends on:
>    input: preparser: prepare for multithreaded background worker
> src/libvlc-module.c          |  14 ++
> src/misc/background_worker.c | 435 +++++++++++++++++++++--------------
> src/misc/background_worker.h |  20 ++
> src/preparser/fetcher.c      |   1 +
> src/preparser/preparser.c    |   1 +
> 5 files changed, 303 insertions(+), 168 deletions(-)
>diff --git a/src/libvlc-module.c b/src/libvlc-module.c
>index 82862ab4745..cac63239c65 100644
>--- a/src/libvlc-module.c
>+++ b/src/libvlc-module.c
>@@ -1125,6 +1125,14 @@ static const char *const ppsz_prefres[] = {
>     "Maximum time allowed to preparse an item, in milliseconds" )
>+#define PREPARSE_THREADS_TEXT N_( "Preparsing threads" )
>+    "Maximum number of threads used to preparse items" )
>+#define FETCH_ART_THREADS_TEXT N_( "Fetch-art threads" )
>+    "Maximum number of threads used to fetch art" )
> #define METADATA_NETWORK_TEXT N_( "Allow metadata network access" )
> static const char *const psz_recursive_list[] = {
>@@ -2110,6 +2118,12 @@ vlc_module_begin ()
>     add_integer( "preparse-timeout", 5000, PREPARSE_TIMEOUT_TEXT,
>                  PREPARSE_TIMEOUT_LONGTEXT, false )
>+    add_integer( "preparse-threads", 2, PREPARSE_THREADS_TEXT,
>+                 PREPARSE_THREADS_LONGTEXT, false )
>+    add_integer( "fetch-art-threads", 2, FETCH_ART_THREADS_TEXT,
>+                 FETCH_ART_THREADS_LONGTEXT, false )
>     add_obsolete_integer( "album-art" )
>     add_bool( "metadata-network-access", false, METADATA_NETWORK_TEXT,
>                  METADATA_NETWORK_TEXT, false )
>diff --git a/src/misc/background_worker.c
>index 2f21e5006a3..1f2796e5d39 100644
>--- a/src/misc/background_worker.c
>+++ b/src/misc/background_worker.c
>@@ -22,246 +22,345 @@
> #include <assert.h>
> #include <vlc_common.h>
>+#include <vlc_atomic.h>
>+#include <vlc_list.h>
> #include <vlc_threads.h>
>-#include <vlc_arrays.h>
> #include "libvlc.h"
> #include "background_worker.h"
>-struct bg_queued_item {
>+struct task {
>+    struct vlc_list node;
>     void* id; /**< id associated with entity */
>     void* entity; /**< the entity to process */
>-    int timeout; /**< timeout duration in microseconds */
>+    int timeout; /**< timeout duration in milliseconds */
>+struct background_worker;
>+struct background_thread {
>+    struct background_worker *owner;
>+    vlc_cond_t probe_cancel_wait; /**< wait for probe request or
>cancelation */
>+    bool probe; /**< true if a probe is requested */
>+    bool cancel; /**< true if a cancel is requested */
>+    struct task *task; /**< current task */
>+    struct vlc_list node;
> };
> struct background_worker {
>     void* owner;
>     struct background_worker_config conf;
>-    vlc_mutex_t lock; /**< acquire to inspect members that follow */
>-    struct {
>-        bool probe_request; /**< true if a probe is requested */
>-        vlc_cond_t wait; /**< wait for update in terms of head */
>-        vlc_cond_t worker_wait; /**< wait for probe request or
>cancelation */
>-        vlc_tick_t deadline; /**< deadline of the current task */
>-        void* id; /**< id of the current task */
>-        bool active; /**< true if there is an active thread */
>-    } head;
>+    vlc_mutex_t lock;
>-    struct {
>-        vlc_cond_t wait; /**< wait for update in terms of tail */
>-        vlc_array_t data; /**< queue of pending entities to process */
>-    } tail;
>+    int uncompleted; /**< number of tasks requested but not completed
>+    int nthreads; /**< number of threads in the threads list */
>+    struct vlc_list threads; /**< list of active background_thread
>instances */
>+    struct vlc_list queue; /**< queue of tasks */
>+    vlc_cond_t queue_wait; /**< wait for the queue to be non-empty */
>+    vlc_cond_t nothreads_wait; /**< wait for nthreads == 0 */
>+    bool closing; /**< true if background worker deletion is requested
> };
>+static struct task *task_Create(struct background_worker *worker, void
>+                                void *entity, int timeout)
>+    struct task *task = malloc(sizeof(*task));
>+    if (unlikely(!task))
>+        return NULL;
>+    task->id = id;
>+    task->entity = entity;
>+    task->timeout = timeout < 0 ? worker->conf.default_timeout :
>+    worker->conf.pf_hold(task->entity);
>+    return task;
>+static void task_Destroy(struct background_worker *worker, struct task
>+    worker->conf.pf_release(task->entity);
>+    free(task);
>+static struct task *QueueTake(struct background_worker *worker, int
>+    vlc_assert_locked(&worker->lock);
>+    vlc_tick_t deadline = vlc_tick_now() +
>+    bool timeout = false;
>+    while (!timeout && !worker->closing &&
>+        timeout = vlc_cond_timedwait(&worker->queue_wait,
>+                                     &worker->lock, deadline) != 0;
>+    if (worker->closing || timeout)
>+        return NULL;
>+    struct task *task = vlc_list_first_entry_or_null(&worker->queue,
>+                                                     struct task,
>+    assert(task);
>+    vlc_list_remove(&task->node);
>+    return task;
>+static void QueuePush(struct background_worker *worker, struct task
>+    vlc_assert_locked(&worker->lock);
>+    vlc_list_append(&task->node, &worker->queue);
>+    vlc_cond_signal(&worker->queue_wait);
>+static void QueueRemoveAll(struct background_worker *worker, void *id)
>+    vlc_assert_locked(&worker->lock);
>+    struct task *task;
>+    vlc_list_foreach(task, &worker->queue, node)
>+    {
>+        if (!id || task->id == id)
>+        {
>+            vlc_list_remove(&task->node);
>+            task_Destroy(worker, task);
>+        }
>+    }
>+static struct background_thread *
>+background_thread_Create(struct background_worker *owner)
>+    struct background_thread *thread = malloc(sizeof(*thread));
>+    if (!thread)
>+        return NULL;
>+    vlc_cond_init(&thread->probe_cancel_wait);
>+    thread->probe = false;
>+    thread->cancel = false;
>+    thread->task = NULL;
>+    thread->owner = owner;
>+    return thread;
>+static void background_thread_Destroy(struct background_thread
>+    vlc_cond_destroy(&thread->probe_cancel_wait);
>+    free(thread);
>+static struct background_worker *background_worker_Create(void *owner,
>+                                         struct
>background_worker_config *conf)
>+    struct background_worker* worker = malloc(sizeof(*worker));
>+    if (unlikely(!worker))
>+        return NULL;
>+    worker->conf = *conf;
>+    worker->owner = owner;
>+    vlc_mutex_init(&worker->lock);
>+    worker->uncompleted = 0;
>+    worker->nthreads = 0;
>+    vlc_list_init(&worker->threads);
>+    vlc_list_init(&worker->queue);
>+    vlc_cond_init(&worker->queue_wait);
>+    vlc_cond_init(&worker->nothreads_wait);
>+    worker->closing = false;
>+    return worker;
>+static void background_worker_Destroy(struct background_worker
>+    vlc_cond_destroy(&worker->queue_wait);
>+    vlc_mutex_destroy(&worker->lock);
>+    free(worker);
>+static void EndTask(struct background_thread *thread, struct task
>+    struct background_worker *worker = thread->owner;
>+    task_Destroy(worker, task);
>+    vlc_mutex_lock(&worker->lock);
>+    thread->task = NULL;
>+    worker->uncompleted--;
>+    assert(worker->uncompleted >= 0);
>+    vlc_mutex_unlock(&worker->lock);
>+static void RemoveThread(struct background_thread *thread)
>+    struct background_worker *worker = thread->owner;
>+    vlc_mutex_lock(&worker->lock);
>+    vlc_list_remove(&thread->node);
>+    worker->nthreads--;
>+    assert(worker->nthreads >= 0);
>+    if (!worker->nthreads)
>+        vlc_cond_signal(&worker->nothreads_wait);
>+    vlc_mutex_unlock(&worker->lock);
>+    background_thread_Destroy(thread);
> static void* Thread( void* data )
> {
>-    struct background_worker* worker = data;
>+    struct background_thread *thread = data;
>+    struct background_worker *worker = thread->owner;
>-    for( ;; )
>+    for (;;)
>     {
>-        struct bg_queued_item* item = NULL;
>-        void* handle;
>-        vlc_mutex_lock( &worker->lock );
>-        for( ;; )
>-        {
>-            if( vlc_array_count( &worker->tail.data ) )
>-            {
>-                item = vlc_array_item_at_index( &worker->tail.data, 0
>-                handle = NULL;
>-                vlc_array_remove( &worker->tail.data, 0 );
>-            }
>-            if( worker->head.deadline == VLC_TICK_INVALID && item ==
>-                worker->head.active = false;
>-            worker->head.id = item ? item->id : NULL;
>-            vlc_cond_broadcast( &worker->head.wait );
>-            if( item )
>-            {
>-                if( item->timeout > 0 )
>-                    worker->head.deadline = vlc_tick_now() +
>item->timeout * 1000;
>-                else
>-                    worker->head.deadline = INT64_MAX;
>-            }
>-            else if( worker->head.deadline != VLC_TICK_INVALID )
>-            {
>-                /* Wait 1 seconds for new inputs before terminating */
>-                vlc_tick_t deadline = vlc_tick_now() +
>-                int ret = vlc_cond_timedwait( &worker->tail.wait,
>-                                              &worker->lock, deadline
>-                if( ret != 0 )
>-                {
>-                    /* Timeout: if there is still no items, the thread
>will be
>-                     * terminated at next loop iteration (active =
>false). */
>-                    worker->head.deadline = VLC_TICK_INVALID;
>-                }
>-                continue;
>-            }
>-            break;
>-        }
>-        if( !worker->head.active )
>+        vlc_mutex_lock(&worker->lock);
>+        struct task *task = QueueTake(worker, 5000);
>+        if (!task)
>         {
>-            vlc_mutex_unlock( &worker->lock );
>+            vlc_mutex_unlock(&worker->lock);
>+            /* terminate this thread */
>             break;
>         }
>-        vlc_mutex_unlock( &worker->lock );
>-        assert( item != NULL );
>+        thread->task = task;
>+        thread->cancel = false;
>+        thread->probe = false;
>+        vlc_tick_t deadline;
>+        if (task->timeout > 0)
>+            deadline = vlc_tick_now() +
>+        else
>+            deadline = INT64_MAX; /* no deadline */
>+        vlc_mutex_unlock(&worker->lock);
>-        if( worker->conf.pf_start( worker->owner, item->entity,
>&handle ) )
>+        void *handle;
>+        if (worker->conf.pf_start(worker->owner, task->entity,
>         {
>-            worker->conf.pf_release( item->entity );
>-            free( item );
>+            EndTask(thread, task);
>             continue;
>         }
>-        for( ;; )
>+        for (;;)
>         {
>-            vlc_mutex_lock( &worker->lock );
>+            vlc_mutex_lock(&worker->lock);
>+            bool timeout = false;
>+            while (!timeout && !thread->probe && !thread->cancel)
>+                /* any non-zero return value means timeout */
>+                timeout =
>+                                             &worker->lock, deadline)
>!= 0;
>-            bool const b_timeout = worker->head.deadline <=
>-            worker->head.probe_request = false;
>+            bool cancel = thread->cancel;
>+            thread->cancel = false;
>+            thread->probe = false;
>+            vlc_mutex_unlock(&worker->lock);
>-            vlc_mutex_unlock( &worker->lock );
>-            if( b_timeout ||
>-                worker->conf.pf_probe( worker->owner, handle ) )
>+            if (timeout || cancel
>+                    || worker->conf.pf_probe(worker->owner, handle))
>             {
>-                worker->conf.pf_stop( worker->owner, handle );
>-                worker->conf.pf_release( item->entity );
>-                free( item );
>+                worker->conf.pf_stop(worker->owner, handle);
>+                EndTask(thread, task);
>                 break;
>             }
>-            vlc_mutex_lock( &worker->lock );
>-            if( worker->head.probe_request == false &&
>-                worker->head.deadline > vlc_tick_now() )
>-            {
>-                vlc_cond_timedwait( &worker->head.worker_wait,
>-                                     worker->head.deadline );
>-            }
>-            vlc_mutex_unlock( &worker->lock );
>         }
>     }
>+    RemoveThread(thread);
>     return NULL;
> }
>-static void BackgroundWorkerCancel( struct background_worker* worker,
>void* id)
>+static bool SpawnThread(struct background_worker *worker)
> {
>-    vlc_mutex_lock( &worker->lock );
>-    for( size_t i = 0; i < vlc_array_count( &worker->tail.data ); )
>-    {
>-        struct bg_queued_item* item =
>-            vlc_array_item_at_index( &worker->tail.data, i );
>-        if( id == NULL || item->id == id )
>-        {
>-            vlc_array_remove( &worker->tail.data, i );
>-            worker->conf.pf_release( item->entity );
>-            free( item );
>-            continue;
>-        }
>+    vlc_assert_locked(&worker->lock);
>-        ++i;
>-    }
>+    struct background_thread *thread =
>+    if (!thread)
>+        return false;
>-    while( ( id == NULL && worker->head.active )
>-        || ( id != NULL && worker->head.id == id ) )
>+    if (vlc_clone_detach(NULL, Thread, thread,
>     {
>-        worker->head.deadline = VLC_TICK_INVALID;
>-        vlc_cond_signal( &worker->head.worker_wait );
>-        vlc_cond_signal( &worker->tail.wait );
>-        vlc_cond_wait( &worker->head.wait, &worker->lock );
>+        free(thread);
>+        return false;
>     }
>-    vlc_mutex_unlock( &worker->lock );
>+    worker->nthreads++;
>+    vlc_list_append(&thread->node, &worker->threads);
>+    return true;
> }
> struct background_worker* background_worker_New( void* owner,
>     struct background_worker_config* conf )
> {
>-    struct background_worker* worker = malloc( sizeof *worker );
>-    if( unlikely( !worker ) )
>-        return NULL;
>-    worker->conf = *conf;
>-    worker->owner = owner;
>-    worker->head.id = NULL;
>-    worker->head.active = false;
>-    worker->head.deadline = VLC_TICK_INVALID;
>-    vlc_mutex_init( &worker->lock );
>-    vlc_cond_init( &worker->head.wait );
>-    vlc_cond_init( &worker->head.worker_wait );
>-    vlc_array_init( &worker->tail.data );
>-    vlc_cond_init( &worker->tail.wait );
>-    return worker;
>+    return background_worker_Create(owner, conf);
> }
>int background_worker_Push( struct background_worker* worker, void*
>                         void* id, int timeout )
> {
>-    struct bg_queued_item* item = malloc( sizeof( *item ) );
>+    struct task *task = task_Create(worker, id, entity, timeout);
>+    if (unlikely(!task))
>+        return VLC_ENOMEM;
>-    if( unlikely( !item ) )
>-        return VLC_EGENERIC;
>+    vlc_mutex_lock(&worker->lock);
>+    QueuePush(worker, task);
>+    if (++worker->uncompleted > worker->nthreads
>+            && worker->nthreads < worker->conf.max_threads)
>+        SpawnThread(worker);
>+    vlc_mutex_unlock(&worker->lock);
>-    item->id = id;
>-    item->entity = entity;
>-    item->timeout = timeout < 0 ? worker->conf.default_timeout :
>+    return VLC_SUCCESS;
>-    vlc_mutex_lock( &worker->lock );
>-    int i_ret = vlc_array_append( &worker->tail.data, item );
>-    vlc_cond_signal( &worker->tail.wait );
>-    if( i_ret != 0 )
>-    {
>-        free( item );
>-        return VLC_EGENERIC;
>-    }
>+static void BackgroundWorkerCancelLocked(struct background_worker
>+                                         void *id)
>+    vlc_assert_locked(&worker->lock);
>+    QueueRemoveAll(worker, id);
>-    if( worker->head.active == false )
>+    struct background_thread *thread;
>+    vlc_list_foreach(thread, &worker->threads, node)
>     {
>-        worker->head.probe_request = false;
>-        worker->head.active =
>-            !vlc_clone_detach( NULL, Thread, worker,
>+        if (!id || (thread->task && thread->task->id == id &&
>+        {
>+            thread->cancel = true;
>+            vlc_cond_signal(&thread->probe_cancel_wait);
>+        }
>     }
>-    if( worker->head.active )
>-        worker->conf.pf_hold( item->entity );
>-    int ret = worker->head.active ? VLC_SUCCESS : VLC_EGENERIC;
>-    vlc_mutex_unlock( &worker->lock );
>-    return ret;
> }
>void background_worker_Cancel( struct background_worker* worker, void*
>id )
> {
>-    BackgroundWorkerCancel( worker, id );
>+    vlc_mutex_lock(&worker->lock);
>+    BackgroundWorkerCancelLocked(worker, id);
>+    vlc_mutex_unlock(&worker->lock);
> }
>void background_worker_RequestProbe( struct background_worker* worker )
> {
>-    vlc_mutex_lock( &worker->lock );
>-    worker->head.probe_request = true;
>-    vlc_cond_signal( &worker->head.worker_wait );
>-    vlc_mutex_unlock( &worker->lock );
>+    vlc_mutex_lock(&worker->lock);
>+    struct background_thread *thread;
>+    vlc_list_foreach(thread, &worker->threads, node)
>+    {
>+        thread->probe = true;
>+        vlc_cond_signal(&thread->probe_cancel_wait);
>+    }
>+    vlc_mutex_unlock(&worker->lock);
> }
> void background_worker_Delete( struct background_worker* worker )
> {
>-    BackgroundWorkerCancel( worker, NULL );
>-    vlc_array_clear( &worker->tail.data );
>-    vlc_mutex_destroy( &worker->lock );
>-    vlc_cond_destroy( &worker->head.wait );
>-    vlc_cond_destroy( &worker->head.worker_wait );
>-    vlc_cond_destroy( &worker->tail.wait );
>-    free( worker );
>+    vlc_mutex_lock(&worker->lock);
>+    worker->closing = true;
>+    BackgroundWorkerCancelLocked(worker, NULL);
>+    /* closing is now true, this will wake up any QueueTake() */
>+    vlc_cond_broadcast(&worker->queue_wait);
>+    while (worker->nthreads)
>+        vlc_cond_wait(&worker->nothreads_wait, &worker->lock);
>+    vlc_mutex_unlock(&worker->lock);
>+    /* no threads use the worker anymore, we can destroy it */
>+    background_worker_Destroy(worker);
> }
>diff --git a/src/misc/background_worker.h
>index 049e47d543d..abe65159ff2 100644
>--- a/src/misc/background_worker.h
>+++ b/src/misc/background_worker.h
>@@ -29,6 +29,11 @@ struct background_worker_config {
>      **/
>     vlc_tick_t default_timeout;
>+    /**
>+     * Maximum number of threads used to execute tasks.
>+     */
>+    int max_threads;
>     /**
>      * Release an entity
>      *
>@@ -37,6 +42,9 @@ struct background_worker_config {
> * pf_stop has finished executing, or if the entity is removed from the
>      * queue (through \ref background_worker_Cancel)
>      *
>+     * \warning As each task might be executed in parallel by
>different threads,
>+     *          this callback must be thread-safe.
>+     *
>      * \param entity the entity to release
>      **/
>     void( *pf_release )( void* entity );
>@@ -48,6 +56,9 @@ struct background_worker_config {
>   * entity. It will happen when the entity is pushed into the queue of
>      * pending tasks as part of \ref background_worker_Push.
>      *
>+     * \warning As each task might be executed in parallel by
>different threads,
>+     *          this callback must be thread-safe.
>+     *
>      * \param entity the entity to hold
>      **/
>     void( *pf_hold )( void* entity );
>@@ -63,6 +74,9 @@ struct background_worker_config {
>* The value of `*out` will then be the value of the argument named
>      * in terms of \ref pf_probe and \ref pf_stop.
>      *
>+     * \warning As each task might be executed in parallel by
>different threads,
>+     *          this callback must be thread-safe.
>+     *
>      * \param owner the owner of the background-worker
>      * \param entity the entity for which a task is to be created
>* \param out [out] `*out` shall, on success, refer to the handle
>@@ -78,6 +92,9 @@ struct background_worker_config {
>* finished or not. It can be called anytime between a successful call
>      * \ref pf_start, and the corresponding call to \ref pf_stop.
>      *
>+     * \warning As each task might be executed in parallel by
>different threads,
>+     *          this callback must be thread-safe.
>+     *
>      * \param owner the owner of the background-worker
>      * \param handle the handle associated with the running task
> * \return 0 if the task is still running, any other value if finished.
>@@ -95,6 +112,9 @@ struct background_worker_config {
>*          that the task has finished, or if the timeout (if any) for
>      *          task has been reached.
>      *
>+     * \warning As each task might be executed in parallel by
>different threads,
>+     *          this callback must be thread-safe.
>+     *
>      * \param owner the owner of the background-worker
>      * \parma handle the handle associated with the task to be stopped
>      **/
>diff --git a/src/preparser/fetcher.c b/src/preparser/fetcher.c
>index 16af948551c..6d095963afb 100644
>--- a/src/preparser/fetcher.c
>+++ b/src/preparser/fetcher.c
>@@ -404,6 +404,7 @@ static void WorkerInit( input_fetcher_t* fetcher,
> {
>     struct background_worker_config conf = {
>         .default_timeout = 0,
>+        .max_threads = var_InheritInteger( fetcher->owner,
>"fetch-art-threads" ),
>         .pf_start = starter,
>         .pf_probe = ProbeWorker,
>         .pf_stop = CloseWorker,
>diff --git a/src/preparser/preparser.c b/src/preparser/preparser.c
>index 529464e4b89..1f3833f5bc5 100644
>--- a/src/preparser/preparser.c
>+++ b/src/preparser/preparser.c
>@@ -153,6 +153,7 @@ input_preparser_t* input_preparser_New(
>vlc_object_t *parent )
>     struct background_worker_config conf = {
>   .default_timeout = var_InheritInteger( parent, "preparse-timeout" ),
>+        .max_threads = var_InheritInteger( parent, "preparse-threads"
>         .pf_start = PreparserOpenInput,
>         .pf_probe = PreparserProbeInput,
>         .pf_stop = PreparserCloseInput,
>vlc-devel mailing list
>To unsubscribe or modify your subscription options:

Envoyé de mon appareil Android avec Courriel K-9 Mail. Veuillez excuser ma brièveté.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://mailman.videolan.org/pipermail/vlc-devel/attachments/20180717/b512d970/attachment-0001.html>

More information about the vlc-devel mailing list