[vlc-devel] [WiP] core: make the background worker multithreaded

Romain Vimont rom1v at videolabs.io
Tue Jul 10 19:11:44 CEST 2018


A way to speed up the preparsing consists in preparsing several inputs
in parallel.

For this purpose, make the background worker (used by the preparser and
fetcher) execute tasks from (possibly) several threads.

Apart from adding a new field "nthreads" in the background_worker_config
structure, the background worker API is kept unchanged.

The implementation now uses 1 queue (splitted into a separate
"task_queue") consumed by n threads.

For now, the "nthreads" threads are created on worker construction and
destroyed on worker destruction. They might be created "on demand"
instead in the future.

A test using 2 threads for preparsing a big video folder gave a speed up
of 1.8x. More performance tests are needed.
---
 src/misc/background_worker.c | 425 ++++++++++++++++++++---------------
 src/misc/background_worker.h |   2 +
 src/preparser/fetcher.c      |   1 +
 src/preparser/preparser.c    |   1 +
 4 files changed, 254 insertions(+), 175 deletions(-)

diff --git a/src/misc/background_worker.c b/src/misc/background_worker.c
index 2f21e5006a3..395a5587db9 100644
--- a/src/misc/background_worker.c
+++ b/src/misc/background_worker.c
@@ -22,185 +22,260 @@
 
 #include <assert.h>
 #include <vlc_common.h>
+#include <vlc_list.h>
 #include <vlc_threads.h>
-#include <vlc_arrays.h>
 
 #include "libvlc.h"
 #include "background_worker.h"
 
-struct bg_queued_item {
+struct task {
+    struct vlc_list node;
     void* id; /**< id associated with entity */
     void* entity; /**< the entity to process */
-    int timeout; /**< timeout duration in microseconds */
+    int timeout; /**< timeout duration in milliseconds */
+};
+
+struct task_queue {
+    vlc_mutex_t lock;
+    vlc_cond_t wait;
+    struct vlc_list list;
+    bool interrupted;
+};
+
+struct background_thread {
+    vlc_thread_t thread;
+    vlc_mutex_t lock; /**< acquire to inspect members that follow */
+    vlc_cond_t probe_cancel_wait; /**< wait for probe request or cancelation */
+    vlc_tick_t deadline; /**< deadline of the current task */
+    bool probe_request; /**< true if a probe is requested */
+    bool cancel_request; /**< true if a cancel is requested */
+    struct task *task; /**< current task */
 };
 
 struct background_worker {
     void* owner;
     struct background_worker_config conf;
 
-    vlc_mutex_t lock; /**< acquire to inspect members that follow */
-    struct {
-        bool probe_request; /**< true if a probe is requested */
-        vlc_cond_t wait; /**< wait for update in terms of head */
-        vlc_cond_t worker_wait; /**< wait for probe request or cancelation */
-        vlc_tick_t deadline; /**< deadline of the current task */
-        void* id; /**< id of the current task */
-        bool active; /**< true if there is an active thread */
-    } head;
-
-    struct {
-        vlc_cond_t wait; /**< wait for update in terms of tail */
-        vlc_array_t data; /**< queue of pending entities to process */
-    } tail;
+    struct background_thread *threads;
+    struct task_queue queue;
+};
+
+static struct task *task_Create(struct background_worker *worker, void *id, void *entity, int timeout)
+{
+    struct task *task = malloc(sizeof(*task));
+    if (unlikely(!task))
+        return NULL;
+
+    task->id = id;
+    task->entity = entity;
+    task->timeout = timeout < 0 ? worker->conf.default_timeout : timeout;
+    worker->conf.pf_hold(task->entity);
+    return task;
+}
+
+static void task_Destroy(struct background_worker *worker, struct task *task)
+{
+    worker->conf.pf_release(task->entity);
+    free(task);
+}
+
+static void task_queue_Init(struct task_queue *queue)
+{
+    vlc_mutex_init(&queue->lock);
+    vlc_cond_init(&queue->wait);
+    vlc_list_init(&queue->list);
+    queue->interrupted = false;
+}
+
+static void task_queue_Destroy(struct task_queue *queue)
+{
+    vlc_cond_destroy(&queue->wait);
+    vlc_mutex_destroy(&queue->lock);
+}
+
+/* Once interrupted, calls to task_queue_Take() will be unblocked.
+ * This is used to shutdown the worker properly. */
+static void task_queue_Interrupt(struct task_queue *queue)
+{
+    vlc_mutex_lock(&queue->lock);
+    queue->interrupted = true;
+    vlc_cond_broadcast(&queue->wait);
+    vlc_mutex_unlock(&queue->lock);
+}
+
+static struct task *task_queue_Take(struct task_queue *queue)
+{
+    vlc_mutex_lock(&queue->lock);
+    while (!queue->interrupted && vlc_list_is_empty(&queue->list))
+        vlc_cond_wait(&queue->wait, &queue->lock);
+
+    if (queue->interrupted)
+    {
+        vlc_mutex_unlock(&queue->lock);
+        return NULL;
+    }
+
+    struct task *task = vlc_list_first_entry_or_null(&queue->list, struct task, node);
+    assert(task);
+    vlc_list_remove(&task->node);
+    vlc_mutex_unlock(&queue->lock);
+
+    return task;
+}
+
+static void task_queue_Push(struct task_queue *queue, struct task *task)
+{
+    vlc_mutex_lock(&queue->lock);
+    vlc_list_append(&task->node, &queue->list);
+    vlc_mutex_unlock(&queue->lock);
+    vlc_cond_signal(&queue->wait);
+}
+
+static void task_queue_RemoveAll(struct task_queue *queue, void *id, struct vlc_list *removed)
+{
+    vlc_mutex_lock(&queue->lock);
+    struct task *task;
+    vlc_list_foreach(task, &queue->list, node)
+    {
+        if (!id || task->id == id)
+        {
+            vlc_list_remove(&task->node);
+            vlc_list_append(&task->node, removed);
+        }
+    }
+    vlc_mutex_unlock(&queue->lock);
+}
+
+static void background_thread_Init(struct background_thread *thread)
+{
+    vlc_mutex_init(&thread->lock);
+    vlc_cond_init(&thread->probe_cancel_wait);
+    thread->deadline = VLC_TICK_INVALID;
+    thread->probe_request = false;
+    thread->cancel_request = false;
+    thread->task = NULL;
+}
+
+static void background_thread_Destroy(struct background_thread *thread)
+{
+    vlc_cond_destroy(&thread->probe_cancel_wait);
+    vlc_mutex_destroy(&thread->lock);
+}
+
+/* only used for the Thread() parameter */
+struct thread_data {
+    struct background_worker *worker;
+    struct background_thread *thread;
 };
 
 static void* Thread( void* data )
 {
-    struct background_worker* worker = data;
+    struct thread_data *thread_data = data;
+    struct background_worker *worker = thread_data->worker;
+    struct background_thread *thread = thread_data->thread;
+    free(thread_data);
 
-    for( ;; )
+    for (;;)
     {
-        struct bg_queued_item* item = NULL;
-        void* handle;
-
-        vlc_mutex_lock( &worker->lock );
-        for( ;; )
-        {
-            if( vlc_array_count( &worker->tail.data ) )
-            {
-                item = vlc_array_item_at_index( &worker->tail.data, 0 );
-                handle = NULL;
-
-                vlc_array_remove( &worker->tail.data, 0 );
-            }
-
-            if( worker->head.deadline == VLC_TICK_INVALID && item == NULL )
-                worker->head.active = false;
-            worker->head.id = item ? item->id : NULL;
-            vlc_cond_broadcast( &worker->head.wait );
-
-            if( item )
-            {
-                if( item->timeout > 0 )
-                    worker->head.deadline = vlc_tick_now() + item->timeout * 1000;
-                else
-                    worker->head.deadline = INT64_MAX;
-            }
-            else if( worker->head.deadline != VLC_TICK_INVALID )
-            {
-                /* Wait 1 seconds for new inputs before terminating */
-                vlc_tick_t deadline = vlc_tick_now() + VLC_TICK_FROM_SEC(1);
-                int ret = vlc_cond_timedwait( &worker->tail.wait,
-                                              &worker->lock, deadline );
-                if( ret != 0 )
-                {
-                    /* Timeout: if there is still no items, the thread will be
-                     * terminated at next loop iteration (active = false). */
-                    worker->head.deadline = VLC_TICK_INVALID;
-                }
-                continue;
-            }
-            break;
-        }
-
-        if( !worker->head.active )
-        {
-            vlc_mutex_unlock( &worker->lock );
+        struct task *task = task_queue_Take(&worker->queue);
+        if (task == NULL)
+            /* queue is interrupted, the worker is stopping, so leave */
             break;
-        }
-        vlc_mutex_unlock( &worker->lock );
 
-        assert( item != NULL );
+        vlc_mutex_lock(&thread->lock);
+        thread->task = task;
+        thread->cancel_request = false;
+        thread->probe_request = false;
+        if (task->timeout > 0)
+            thread->deadline = vlc_tick_now() + VLC_TICK_FROM_MS(task->timeout);
+        else
+            thread->deadline = INT64_MAX; /* no deadline */
+        vlc_mutex_unlock(&thread->lock);
 
-        if( worker->conf.pf_start( worker->owner, item->entity, &handle ) )
+        void *handle;
+        if (worker->conf.pf_start(worker->owner, task->entity, &handle))
         {
-            worker->conf.pf_release( item->entity );
-            free( item );
+            vlc_mutex_lock(&thread->lock);
+            thread->task = NULL;
+            vlc_mutex_unlock(&thread->lock);
+            task_Destroy(worker, task);
             continue;
         }
 
-        for( ;; )
+        for (;;)
         {
-            vlc_mutex_lock( &worker->lock );
+            vlc_mutex_lock(&thread->lock);
+            bool timeout = false;
+            while (!timeout && !thread->probe_request && !thread->cancel_request)
+                /* any non-zero return value means timeout */
+                timeout = vlc_cond_timedwait(&thread->probe_cancel_wait, &thread->lock, thread->deadline) != 0;
 
-            bool const b_timeout = worker->head.deadline <= vlc_tick_now();
-            worker->head.probe_request = false;
+            bool cancel = thread->cancel_request;
+            thread->cancel_request = false;
+            thread->probe_request = false;
+            vlc_mutex_unlock(&thread->lock);
 
-            vlc_mutex_unlock( &worker->lock );
-
-            if( b_timeout ||
-                worker->conf.pf_probe( worker->owner, handle ) )
+            if (timeout || cancel || worker->conf.pf_probe(worker->owner, handle))
             {
-                worker->conf.pf_stop( worker->owner, handle );
-                worker->conf.pf_release( item->entity );
-                free( item );
+                worker->conf.pf_stop(worker->owner, handle);
+                vlc_mutex_lock(&thread->lock);
+                thread->task = NULL;
+                vlc_mutex_unlock(&thread->lock);
+                task_Destroy(worker, task);
                 break;
             }
-
-            vlc_mutex_lock( &worker->lock );
-            if( worker->head.probe_request == false &&
-                worker->head.deadline > vlc_tick_now() )
-            {
-                vlc_cond_timedwait( &worker->head.worker_wait, &worker->lock,
-                                     worker->head.deadline );
-            }
-            vlc_mutex_unlock( &worker->lock );
         }
     }
 
     return NULL;
 }
 
-static void BackgroundWorkerCancel( struct background_worker* worker, void* id)
-{
-    vlc_mutex_lock( &worker->lock );
-    for( size_t i = 0; i < vlc_array_count( &worker->tail.data ); )
-    {
-        struct bg_queued_item* item =
-            vlc_array_item_at_index( &worker->tail.data, i );
-
-        if( id == NULL || item->id == id )
-        {
-            vlc_array_remove( &worker->tail.data, i );
-            worker->conf.pf_release( item->entity );
-            free( item );
-            continue;
-        }
-
-        ++i;
-    }
-
-    while( ( id == NULL && worker->head.active )
-        || ( id != NULL && worker->head.id == id ) )
-    {
-        worker->head.deadline = VLC_TICK_INVALID;
-        vlc_cond_signal( &worker->head.worker_wait );
-        vlc_cond_signal( &worker->tail.wait );
-        vlc_cond_wait( &worker->head.wait, &worker->lock );
-    }
-    vlc_mutex_unlock( &worker->lock );
-}
-
 struct background_worker* background_worker_New( void* owner,
     struct background_worker_config* conf )
 {
-    struct background_worker* worker = malloc( sizeof *worker );
+    struct background_worker* worker = malloc(sizeof(*worker));
 
-    if( unlikely( !worker ) )
+    if (unlikely(!worker))
         return NULL;
 
+    int nthreads = conf->nthreads;
+    worker->threads = vlc_alloc(nthreads, sizeof(*worker));
+    if (unlikely(!worker->threads))
+    {
+        free(worker);
+        return NULL;
+    }
+
     worker->conf = *conf;
     worker->owner = owner;
-    worker->head.id = NULL;
-    worker->head.active = false;
-    worker->head.deadline = VLC_TICK_INVALID;
+    task_queue_Init(&worker->queue);
 
-    vlc_mutex_init( &worker->lock );
-    vlc_cond_init( &worker->head.wait );
-    vlc_cond_init( &worker->head.worker_wait );
-
-    vlc_array_init( &worker->tail.data );
-    vlc_cond_init( &worker->tail.wait );
+    int i;
+    for (i = 0; i < nthreads; ++i)
+    {
+        struct background_thread *thread = &worker->threads[i];
+        background_thread_Init(thread);
+        struct thread_data *data = malloc(sizeof(*data));
+        if (!data)
+            break;
+        data->worker = worker;
+        data->thread = thread;
+        // TODO create/clean the threads on demand
+        if (vlc_clone(&thread->thread, Thread, data, VLC_THREAD_PRIORITY_LOW))
+        {
+            free(data);
+            break;
+        }
+    }
+    if (i < nthreads)
+    {
+        /* something has failed, cleanup */
+        task_queue_Interrupt(&worker->queue); /* so that Take() returns NULL */
+        while (--i >= 0)
+            vlc_join(worker->threads[i].thread, NULL);
+        free(worker);
+        return NULL;
+    }
 
     return worker;
 }
@@ -208,60 +283,60 @@ struct background_worker* background_worker_New( void* owner,
 int background_worker_Push( struct background_worker* worker, void* entity,
                         void* id, int timeout )
 {
-    struct bg_queued_item* item = malloc( sizeof( *item ) );
+    struct task *task = task_Create(worker, id, entity, timeout);
+    if (unlikely(!task))
+        return VLC_ENOMEM;
 
-    if( unlikely( !item ) )
-        return VLC_EGENERIC;
-
-    item->id = id;
-    item->entity = entity;
-    item->timeout = timeout < 0 ? worker->conf.default_timeout : timeout;
-
-    vlc_mutex_lock( &worker->lock );
-    int i_ret = vlc_array_append( &worker->tail.data, item );
-    vlc_cond_signal( &worker->tail.wait );
-    if( i_ret != 0 )
-    {
-        free( item );
-        return VLC_EGENERIC;
-    }
-
-    if( worker->head.active == false )
-    {
-        worker->head.probe_request = false;
-        worker->head.active =
-            !vlc_clone_detach( NULL, Thread, worker, VLC_THREAD_PRIORITY_LOW );
-    }
-
-    if( worker->head.active )
-        worker->conf.pf_hold( item->entity );
-
-    int ret = worker->head.active ? VLC_SUCCESS : VLC_EGENERIC;
-    vlc_mutex_unlock( &worker->lock );
-
-    return ret;
+    task_queue_Push(&worker->queue, task);
+    return VLC_SUCCESS;
 }
 
 void background_worker_Cancel( struct background_worker* worker, void* id )
 {
-    BackgroundWorkerCancel( worker, id );
+    struct vlc_list removed_tasks;
+    vlc_list_init(&removed_tasks);
+    task_queue_RemoveAll(&worker->queue, id, &removed_tasks);
+
+    struct task *task;
+    vlc_list_foreach(task, &removed_tasks, node)
+        task_Destroy(worker, task);
+
+    for (int i = 0; i < worker->conf.nthreads; ++i)
+    {
+        struct background_thread *thread = &worker->threads[i];
+        vlc_mutex_lock(&thread->lock);
+        if (!id || (thread->task && thread->task->id == id && !thread->cancel_request))
+        {
+            thread->cancel_request = true;
+            vlc_cond_signal(&thread->probe_cancel_wait);
+        }
+        vlc_mutex_unlock(&thread->lock);
+    }
 }
 
 void background_worker_RequestProbe( struct background_worker* worker )
 {
-    vlc_mutex_lock( &worker->lock );
-    worker->head.probe_request = true;
-    vlc_cond_signal( &worker->head.worker_wait );
-    vlc_mutex_unlock( &worker->lock );
+    for (int i = 0; i < worker->conf.nthreads; ++i)
+    {
+        struct background_thread *thread = &worker->threads[i];
+        vlc_mutex_lock(&thread->lock);
+        thread->probe_request = true;
+        vlc_cond_signal(&thread->probe_cancel_wait);
+        vlc_mutex_unlock(&thread->lock);
+    }
 }
 
 void background_worker_Delete( struct background_worker* worker )
 {
-    BackgroundWorkerCancel( worker, NULL );
-    vlc_array_clear( &worker->tail.data );
-    vlc_mutex_destroy( &worker->lock );
-    vlc_cond_destroy( &worker->head.wait );
-    vlc_cond_destroy( &worker->head.worker_wait );
-    vlc_cond_destroy( &worker->tail.wait );
-    free( worker );
+    task_queue_Interrupt(&worker->queue);
+    background_worker_Cancel(worker, NULL);
+    for (int i = 0; i < worker->conf.nthreads; ++i)
+    {
+        struct background_thread *thread = &worker->threads[i];
+        vlc_join(thread->thread, NULL);
+        background_thread_Destroy(thread);
+    }
+
+    task_queue_Destroy(&worker->queue);
+    free(worker);
 }
diff --git a/src/misc/background_worker.h b/src/misc/background_worker.h
index 049e47d543d..a426b54dd8e 100644
--- a/src/misc/background_worker.h
+++ b/src/misc/background_worker.h
@@ -29,6 +29,8 @@ struct background_worker_config {
      **/
     vlc_tick_t default_timeout;
 
+    int nthreads;
+
     /**
      * Release an entity
      *
diff --git a/src/preparser/fetcher.c b/src/preparser/fetcher.c
index 16af948551c..12bf4d37112 100644
--- a/src/preparser/fetcher.c
+++ b/src/preparser/fetcher.c
@@ -404,6 +404,7 @@ static void WorkerInit( input_fetcher_t* fetcher,
 {
     struct background_worker_config conf = {
         .default_timeout = 0,
+        .nthreads = 2, // TODO configurable by a variable
         .pf_start = starter,
         .pf_probe = ProbeWorker,
         .pf_stop = CloseWorker,
diff --git a/src/preparser/preparser.c b/src/preparser/preparser.c
index 4331aae4411..9811e9147c9 100644
--- a/src/preparser/preparser.c
+++ b/src/preparser/preparser.c
@@ -124,6 +124,7 @@ input_preparser_t* input_preparser_New( vlc_object_t *parent )
 
     struct background_worker_config conf = {
         .default_timeout = var_InheritInteger( parent, "preparse-timeout" ),
+        .nthreads = 2, // TODO configurable by a variable
         .pf_start = PreparserOpenInput,
         .pf_probe = PreparserProbeInput,
         .pf_stop = PreparserCloseInput,
-- 
2.18.0



More information about the vlc-devel mailing list