[vlc-devel] [PATCH 02/14] misc: background_worker: make the background worker multithreaded
Romain Vimont
rom1v at videolabs.io
Thu Aug 16 16:01:59 CEST 2018
One way to speed up preparsing is to preparse several inputs
in parallel.
To that end, make the background worker (used by the preparser and
the fetcher) execute tasks on (possibly) several threads.
Apart from adding a new field "max_threads" in the
background_worker_config structure, the background worker API is kept
unchanged.
Two new options are added to configure the maximum number of threads
used for preparsing and fetching:
- preparse-threads
- fetch-art-threads
---
src/libvlc-module.c | 14 ++
src/misc/background_worker.c | 437 +++++++++++++++++++++--------------
src/misc/background_worker.h | 20 ++
src/preparser/fetcher.c | 1 +
src/preparser/preparser.c | 1 +
5 files changed, 304 insertions(+), 169 deletions(-)
diff --git a/src/libvlc-module.c b/src/libvlc-module.c
index 82862ab474..cac63239c6 100644
--- a/src/libvlc-module.c
+++ b/src/libvlc-module.c
@@ -1125,6 +1125,14 @@ static const char *const ppsz_prefres[] = {
#define PREPARSE_TIMEOUT_LONGTEXT N_( \
"Maximum time allowed to preparse an item, in milliseconds" )
+#define PREPARSE_THREADS_TEXT N_( "Preparsing threads" )
+#define PREPARSE_THREADS_LONGTEXT N_( \
+ "Maximum number of threads used to preparse items" )
+
+#define FETCH_ART_THREADS_TEXT N_( "Fetch-art threads" )
+#define FETCH_ART_THREADS_LONGTEXT N_( \
+ "Maximum number of threads used to fetch art" )
+
#define METADATA_NETWORK_TEXT N_( "Allow metadata network access" )
static const char *const psz_recursive_list[] = {
@@ -2110,6 +2118,12 @@ vlc_module_begin ()
add_integer( "preparse-timeout", 5000, PREPARSE_TIMEOUT_TEXT,
PREPARSE_TIMEOUT_LONGTEXT, false )
+ add_integer( "preparse-threads", 2, PREPARSE_THREADS_TEXT,
+ PREPARSE_THREADS_LONGTEXT, false )
+
+ add_integer( "fetch-art-threads", 2, FETCH_ART_THREADS_TEXT,
+ FETCH_ART_THREADS_LONGTEXT, false )
+
add_obsolete_integer( "album-art" )
add_bool( "metadata-network-access", false, METADATA_NETWORK_TEXT,
METADATA_NETWORK_TEXT, false )
diff --git a/src/misc/background_worker.c b/src/misc/background_worker.c
index 2f21e5006a..1f2796e5d3 100644
--- a/src/misc/background_worker.c
+++ b/src/misc/background_worker.c
@@ -22,246 +22,345 @@
#include <assert.h>
#include <vlc_common.h>
+#include <vlc_atomic.h>
+#include <vlc_list.h>
#include <vlc_threads.h>
-#include <vlc_arrays.h>
#include "libvlc.h"
#include "background_worker.h"
-struct bg_queued_item {
+struct task {
+ struct vlc_list node;
void* id; /**< id associated with entity */
void* entity; /**< the entity to process */
- int timeout; /**< timeout duration in microseconds */
+ int timeout; /**< timeout duration in milliseconds */
+};
+
+struct background_worker;
+
+struct background_thread {
+ struct background_worker *owner;
+ vlc_cond_t probe_cancel_wait; /**< wait for probe request or cancelation */
+ bool probe; /**< true if a probe is requested */
+ bool cancel; /**< true if a cancel is requested */
+ struct task *task; /**< current task */
+ struct vlc_list node;
};
struct background_worker {
void* owner;
struct background_worker_config conf;
- vlc_mutex_t lock; /**< acquire to inspect members that follow */
- struct {
- bool probe_request; /**< true if a probe is requested */
- vlc_cond_t wait; /**< wait for update in terms of head */
- vlc_cond_t worker_wait; /**< wait for probe request or cancelation */
- vlc_tick_t deadline; /**< deadline of the current task */
- void* id; /**< id of the current task */
- bool active; /**< true if there is an active thread */
- } head;
-
- struct {
- vlc_cond_t wait; /**< wait for update in terms of tail */
- vlc_array_t data; /**< queue of pending entities to process */
- } tail;
+ vlc_mutex_t lock;
+
+ int uncompleted; /**< number of tasks requested but not completed */
+ int nthreads; /**< number of threads in the threads list */
+ struct vlc_list threads; /**< list of active background_thread instances */
+
+ struct vlc_list queue; /**< queue of tasks */
+ vlc_cond_t queue_wait; /**< wait for the queue to be non-empty */
+
+ vlc_cond_t nothreads_wait; /**< wait for nthreads == 0 */
+ bool closing; /**< true if background worker deletion is requested */
};
-static void* Thread( void* data )
+static struct task *task_Create(struct background_worker *worker, void *id,
+ void *entity, int timeout)
{
- struct background_worker* worker = data;
+ struct task *task = malloc(sizeof(*task));
+ if (unlikely(!task))
+ return NULL;
- for( ;; )
- {
- struct bg_queued_item* item = NULL;
- void* handle;
+ task->id = id;
+ task->entity = entity;
+ task->timeout = timeout < 0 ? worker->conf.default_timeout : timeout;
+ worker->conf.pf_hold(task->entity);
+ return task;
+}
- vlc_mutex_lock( &worker->lock );
- for( ;; )
- {
- if( vlc_array_count( &worker->tail.data ) )
- {
- item = vlc_array_item_at_index( &worker->tail.data, 0 );
- handle = NULL;
+static void task_Destroy(struct background_worker *worker, struct task *task)
+{
+ worker->conf.pf_release(task->entity);
+ free(task);
+}
- vlc_array_remove( &worker->tail.data, 0 );
- }
+static struct task *QueueTake(struct background_worker *worker, int timeout_ms)
+{
+ vlc_assert_locked(&worker->lock);
- if( worker->head.deadline == VLC_TICK_INVALID && item == NULL )
- worker->head.active = false;
- worker->head.id = item ? item->id : NULL;
- vlc_cond_broadcast( &worker->head.wait );
+ vlc_tick_t deadline = vlc_tick_now() + VLC_TICK_FROM_MS(timeout_ms);
+ bool timeout = false;
+ while (!timeout && !worker->closing && vlc_list_is_empty(&worker->queue))
+ timeout = vlc_cond_timedwait(&worker->queue_wait,
+ &worker->lock, deadline) != 0;
- if( item )
- {
- if( item->timeout > 0 )
- worker->head.deadline = vlc_tick_now() + item->timeout * 1000;
- else
- worker->head.deadline = INT64_MAX;
- }
- else if( worker->head.deadline != VLC_TICK_INVALID )
- {
- /* Wait 1 seconds for new inputs before terminating */
- vlc_tick_t deadline = vlc_tick_now() + VLC_TICK_FROM_SEC(1);
- int ret = vlc_cond_timedwait( &worker->tail.wait,
- &worker->lock, deadline );
- if( ret != 0 )
- {
- /* Timeout: if there is still no items, the thread will be
- * terminated at next loop iteration (active = false). */
- worker->head.deadline = VLC_TICK_INVALID;
- }
- continue;
- }
- break;
+ if (worker->closing || timeout)
+ return NULL;
+
+ struct task *task = vlc_list_first_entry_or_null(&worker->queue,
+ struct task, node);
+ assert(task);
+ vlc_list_remove(&task->node);
+
+ return task;
+}
+
+static void QueuePush(struct background_worker *worker, struct task *task)
+{
+ vlc_assert_locked(&worker->lock);
+ vlc_list_append(&task->node, &worker->queue);
+ vlc_cond_signal(&worker->queue_wait);
+}
+
+static void QueueRemoveAll(struct background_worker *worker, void *id)
+{
+ vlc_assert_locked(&worker->lock);
+ struct task *task;
+ vlc_list_foreach(task, &worker->queue, node)
+ {
+ if (!id || task->id == id)
+ {
+ vlc_list_remove(&task->node);
+ task_Destroy(worker, task);
}
+ }
+}
+
+static struct background_thread *
+background_thread_Create(struct background_worker *owner)
+{
+ struct background_thread *thread = malloc(sizeof(*thread));
+ if (!thread)
+ return NULL;
+
+ vlc_cond_init(&thread->probe_cancel_wait);
+ thread->probe = false;
+ thread->cancel = false;
+ thread->task = NULL;
+ thread->owner = owner;
+ return thread;
+}
+
+static void background_thread_Destroy(struct background_thread *thread)
+{
+ vlc_cond_destroy(&thread->probe_cancel_wait);
+ free(thread);
+}
+
+static struct background_worker *background_worker_Create(void *owner,
+ struct background_worker_config *conf)
+{
+ struct background_worker* worker = malloc(sizeof(*worker));
+ if (unlikely(!worker))
+ return NULL;
+
+ worker->conf = *conf;
+ worker->owner = owner;
+
+ vlc_mutex_init(&worker->lock);
+ worker->uncompleted = 0;
+ worker->nthreads = 0;
+ vlc_list_init(&worker->threads);
+ vlc_list_init(&worker->queue);
+ vlc_cond_init(&worker->queue_wait);
+ vlc_cond_init(&worker->nothreads_wait);
+ worker->closing = false;
+ return worker;
+}
+
+static void background_worker_Destroy(struct background_worker *worker)
+{
+ vlc_cond_destroy(&worker->queue_wait);
+ vlc_mutex_destroy(&worker->lock);
+ free(worker);
+}
+
+static void EndTask(struct background_thread *thread, struct task *task)
+{
+ struct background_worker *worker = thread->owner;
+ task_Destroy(worker, task);
+
+ vlc_mutex_lock(&worker->lock);
+ thread->task = NULL;
+ worker->uncompleted--;
+ assert(worker->uncompleted >= 0);
+ vlc_mutex_unlock(&worker->lock);
+}
+
+static void RemoveThread(struct background_thread *thread)
+{
+ struct background_worker *worker = thread->owner;
+
+ vlc_mutex_lock(&worker->lock);
+
+ vlc_list_remove(&thread->node);
+ worker->nthreads--;
+ assert(worker->nthreads >= 0);
+ if (!worker->nthreads)
+ vlc_cond_signal(&worker->nothreads_wait);
+
+ vlc_mutex_unlock(&worker->lock);
- if( !worker->head.active )
+ background_thread_Destroy(thread);
+}
+
+static void* Thread( void* data )
+{
+ struct background_thread *thread = data;
+ struct background_worker *worker = thread->owner;
+
+ for (;;)
+ {
+ vlc_mutex_lock(&worker->lock);
+ struct task *task = QueueTake(worker, 5000);
+ if (!task)
{
- vlc_mutex_unlock( &worker->lock );
+ vlc_mutex_unlock(&worker->lock);
+ /* terminate this thread */
break;
}
- vlc_mutex_unlock( &worker->lock );
-
- assert( item != NULL );
- if( worker->conf.pf_start( worker->owner, item->entity, &handle ) )
+ thread->task = task;
+ thread->cancel = false;
+ thread->probe = false;
+ vlc_tick_t deadline;
+ if (task->timeout > 0)
+ deadline = vlc_tick_now() + VLC_TICK_FROM_MS(task->timeout);
+ else
+ deadline = INT64_MAX; /* no deadline */
+ vlc_mutex_unlock(&worker->lock);
+
+ void *handle;
+ if (worker->conf.pf_start(worker->owner, task->entity, &handle))
{
- worker->conf.pf_release( item->entity );
- free( item );
+ EndTask(thread, task);
continue;
}
- for( ;; )
+ for (;;)
{
- vlc_mutex_lock( &worker->lock );
-
- bool const b_timeout = worker->head.deadline <= vlc_tick_now();
- worker->head.probe_request = false;
-
- vlc_mutex_unlock( &worker->lock );
-
- if( b_timeout ||
- worker->conf.pf_probe( worker->owner, handle ) )
+ vlc_mutex_lock(&worker->lock);
+ bool timeout = false;
+ while (!timeout && !thread->probe && !thread->cancel)
+ /* any non-zero return value means timeout */
+ timeout = vlc_cond_timedwait(&thread->probe_cancel_wait,
+ &worker->lock, deadline) != 0;
+
+ bool cancel = thread->cancel;
+ thread->cancel = false;
+ thread->probe = false;
+ vlc_mutex_unlock(&worker->lock);
+
+ if (timeout || cancel
+ || worker->conf.pf_probe(worker->owner, handle))
{
- worker->conf.pf_stop( worker->owner, handle );
- worker->conf.pf_release( item->entity );
- free( item );
+ worker->conf.pf_stop(worker->owner, handle);
+ EndTask(thread, task);
break;
}
-
- vlc_mutex_lock( &worker->lock );
- if( worker->head.probe_request == false &&
- worker->head.deadline > vlc_tick_now() )
- {
- vlc_cond_timedwait( &worker->head.worker_wait, &worker->lock,
- worker->head.deadline );
- }
- vlc_mutex_unlock( &worker->lock );
}
}
+ RemoveThread(thread);
+
return NULL;
}
-static void BackgroundWorkerCancel( struct background_worker* worker, void* id)
+static bool SpawnThread(struct background_worker *worker)
{
- vlc_mutex_lock( &worker->lock );
- for( size_t i = 0; i < vlc_array_count( &worker->tail.data ); )
- {
- struct bg_queued_item* item =
- vlc_array_item_at_index( &worker->tail.data, i );
+ vlc_assert_locked(&worker->lock);
- if( id == NULL || item->id == id )
- {
- vlc_array_remove( &worker->tail.data, i );
- worker->conf.pf_release( item->entity );
- free( item );
- continue;
- }
+ struct background_thread *thread = background_thread_Create(worker);
+ if (!thread)
+ return false;
- ++i;
- }
-
- while( ( id == NULL && worker->head.active )
- || ( id != NULL && worker->head.id == id ) )
+ if (vlc_clone_detach(NULL, Thread, thread, VLC_THREAD_PRIORITY_LOW))
{
- worker->head.deadline = VLC_TICK_INVALID;
- vlc_cond_signal( &worker->head.worker_wait );
- vlc_cond_signal( &worker->tail.wait );
- vlc_cond_wait( &worker->head.wait, &worker->lock );
+ free(thread);
+ return false;
}
- vlc_mutex_unlock( &worker->lock );
+ worker->nthreads++;
+ vlc_list_append(&thread->node, &worker->threads);
+
+ return true;
}
struct background_worker* background_worker_New( void* owner,
struct background_worker_config* conf )
{
- struct background_worker* worker = malloc( sizeof *worker );
-
- if( unlikely( !worker ) )
- return NULL;
-
- worker->conf = *conf;
- worker->owner = owner;
- worker->head.id = NULL;
- worker->head.active = false;
- worker->head.deadline = VLC_TICK_INVALID;
-
- vlc_mutex_init( &worker->lock );
- vlc_cond_init( &worker->head.wait );
- vlc_cond_init( &worker->head.worker_wait );
-
- vlc_array_init( &worker->tail.data );
- vlc_cond_init( &worker->tail.wait );
-
- return worker;
+ return background_worker_Create(owner, conf);
}
int background_worker_Push( struct background_worker* worker, void* entity,
void* id, int timeout )
{
- struct bg_queued_item* item = malloc( sizeof( *item ) );
-
- if( unlikely( !item ) )
- return VLC_EGENERIC;
+ struct task *task = task_Create(worker, id, entity, timeout);
+ if (unlikely(!task))
+ return VLC_ENOMEM;
+
+ vlc_mutex_lock(&worker->lock);
+ QueuePush(worker, task);
+ if (++worker->uncompleted > worker->nthreads
+ && worker->nthreads < worker->conf.max_threads)
+ SpawnThread(worker);
+ vlc_mutex_unlock(&worker->lock);
+
+ return VLC_SUCCESS;
+}
- item->id = id;
- item->entity = entity;
- item->timeout = timeout < 0 ? worker->conf.default_timeout : timeout;
+static void BackgroundWorkerCancelLocked(struct background_worker *worker,
+ void *id)
+{
+ vlc_assert_locked(&worker->lock);
- vlc_mutex_lock( &worker->lock );
- int i_ret = vlc_array_append( &worker->tail.data, item );
- vlc_cond_signal( &worker->tail.wait );
- if( i_ret != 0 )
- {
- free( item );
- return VLC_EGENERIC;
- }
+ QueueRemoveAll(worker, id);
- if( worker->head.active == false )
+ struct background_thread *thread;
+ vlc_list_foreach(thread, &worker->threads, node)
{
- worker->head.probe_request = false;
- worker->head.active =
- !vlc_clone_detach( NULL, Thread, worker, VLC_THREAD_PRIORITY_LOW );
+ if (!id || (thread->task && thread->task->id == id && !thread->cancel))
+ {
+ thread->cancel = true;
+ vlc_cond_signal(&thread->probe_cancel_wait);
+ }
}
-
- if( worker->head.active )
- worker->conf.pf_hold( item->entity );
-
- int ret = worker->head.active ? VLC_SUCCESS : VLC_EGENERIC;
- vlc_mutex_unlock( &worker->lock );
-
- return ret;
}
void background_worker_Cancel( struct background_worker* worker, void* id )
{
- BackgroundWorkerCancel( worker, id );
+ vlc_mutex_lock(&worker->lock);
+ BackgroundWorkerCancelLocked(worker, id);
+ vlc_mutex_unlock(&worker->lock);
}
void background_worker_RequestProbe( struct background_worker* worker )
{
- vlc_mutex_lock( &worker->lock );
- worker->head.probe_request = true;
- vlc_cond_signal( &worker->head.worker_wait );
- vlc_mutex_unlock( &worker->lock );
+ vlc_mutex_lock(&worker->lock);
+
+ struct background_thread *thread;
+ vlc_list_foreach(thread, &worker->threads, node)
+ {
+ thread->probe = true;
+ vlc_cond_signal(&thread->probe_cancel_wait);
+ }
+
+ vlc_mutex_unlock(&worker->lock);
}
void background_worker_Delete( struct background_worker* worker )
{
- BackgroundWorkerCancel( worker, NULL );
- vlc_array_clear( &worker->tail.data );
- vlc_mutex_destroy( &worker->lock );
- vlc_cond_destroy( &worker->head.wait );
- vlc_cond_destroy( &worker->head.worker_wait );
- vlc_cond_destroy( &worker->tail.wait );
- free( worker );
+ vlc_mutex_lock(&worker->lock);
+
+ worker->closing = true;
+ BackgroundWorkerCancelLocked(worker, NULL);
+ /* closing is now true, this will wake up any QueueTake() */
+ vlc_cond_broadcast(&worker->queue_wait);
+
+ while (worker->nthreads)
+ vlc_cond_wait(&worker->nothreads_wait, &worker->lock);
+
+ vlc_mutex_unlock(&worker->lock);
+
+ /* no threads use the worker anymore, we can destroy it */
+ background_worker_Destroy(worker);
}
diff --git a/src/misc/background_worker.h b/src/misc/background_worker.h
index 049e47d543..abe65159ff 100644
--- a/src/misc/background_worker.h
+++ b/src/misc/background_worker.h
@@ -29,6 +29,11 @@ struct background_worker_config {
**/
vlc_tick_t default_timeout;
+ /**
+ * Maximum number of threads used to execute tasks.
+ */
+ int max_threads;
+
/**
* Release an entity
*
@@ -37,6 +42,9 @@ struct background_worker_config {
* pf_stop has finished executing, or if the entity is removed from the
* queue (through \ref background_worker_Cancel)
*
+ * \warning As each task might be executed in parallel by different threads,
+ * this callback must be thread-safe.
+ *
* \param entity the entity to release
**/
void( *pf_release )( void* entity );
@@ -48,6 +56,9 @@ struct background_worker_config {
* entity. It will happen when the entity is pushed into the queue of
* pending tasks as part of \ref background_worker_Push.
*
+ * \warning As each task might be executed in parallel by different threads,
+ * this callback must be thread-safe.
+ *
* \param entity the entity to hold
**/
void( *pf_hold )( void* entity );
@@ -63,6 +74,9 @@ struct background_worker_config {
* The value of `*out` will then be the value of the argument named `handle`
* in terms of \ref pf_probe and \ref pf_stop.
*
+ * \warning As each task might be executed in parallel by different threads,
+ * this callback must be thread-safe.
+ *
* \param owner the owner of the background-worker
* \param entity the entity for which a task is to be created
* \param out [out] `*out` shall, on success, refer to the handle associated
@@ -78,6 +92,9 @@ struct background_worker_config {
* finished or not. It can be called anytime between a successful call to
* \ref pf_start, and the corresponding call to \ref pf_stop.
*
+ * \warning As each task might be executed in parallel by different threads,
+ * this callback must be thread-safe.
+ *
* \param owner the owner of the background-worker
* \param handle the handle associated with the running task
* \return 0 if the task is still running, any other value if finished.
@@ -95,6 +112,9 @@ struct background_worker_config {
* that the task has finished, or if the timeout (if any) for the
* task has been reached.
*
+ * \warning As each task might be executed in parallel by different threads,
+ * this callback must be thread-safe.
+ *
* \param owner the owner of the background-worker
* \parma handle the handle associated with the task to be stopped
**/
diff --git a/src/preparser/fetcher.c b/src/preparser/fetcher.c
index 16af948551..6d095963af 100644
--- a/src/preparser/fetcher.c
+++ b/src/preparser/fetcher.c
@@ -404,6 +404,7 @@ static void WorkerInit( input_fetcher_t* fetcher,
{
struct background_worker_config conf = {
.default_timeout = 0,
+ .max_threads = var_InheritInteger( fetcher->owner, "fetch-art-threads" ),
.pf_start = starter,
.pf_probe = ProbeWorker,
.pf_stop = CloseWorker,
diff --git a/src/preparser/preparser.c b/src/preparser/preparser.c
index 3d292b7c5e..42ab9d7694 100644
--- a/src/preparser/preparser.c
+++ b/src/preparser/preparser.c
@@ -153,6 +153,7 @@ input_preparser_t* input_preparser_New( vlc_object_t *parent )
struct background_worker_config conf = {
.default_timeout = var_InheritInteger( parent, "preparse-timeout" ),
+ .max_threads = var_InheritInteger( parent, "preparse-threads" ),
.pf_start = PreparserOpenInput,
.pf_probe = PreparserProbeInput,
.pf_stop = PreparserCloseInput,
--
2.18.0
More information about the vlc-devel
mailing list