[vlc-devel] [PATCH 1/7] playlist/background_worker: introduce background-worker utility

Filip Roséen filip at atch.se
Fri Mar 24 03:28:28 CET 2017


This added utility will make it easier to handle a queue of tasks that
are to be finished in the order received. It allows users of the
utility to focus on the end goal instead of having to deal with
synchronization issues in task-queue handling.

--

Changes since previous submission:

 - added documentation
 - moved from src/playlist/ to src/misc
 - background-workers are now constructed through a single call
 - background-worker internals are now fully private
 - rewrote thread-synchronization logic to be more robust (and easier
   to prove correct)
---
 src/Makefile.am              |   2 +
 src/misc/background_worker.c | 235 +++++++++++++++++++++++++++++++++++++++++++
 src/misc/background_worker.h | 182 +++++++++++++++++++++++++++++++++
 3 files changed, 419 insertions(+)
 create mode 100644 src/misc/background_worker.c
 create mode 100644 src/misc/background_worker.h

diff --git a/src/Makefile.am b/src/Makefile.am
index 1e0e70fa86..208128d076 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -310,6 +310,8 @@ libvlccore_la_SOURCES = \
 	text/filesystem.c \
 	text/iso_lang.c \
 	text/iso-639_def.h \
+	misc/background_worker.c \
+	misc/background_worker.h \
 	misc/md5.c \
 	misc/probe.c \
 	misc/rand.c \
diff --git a/src/misc/background_worker.c b/src/misc/background_worker.c
new file mode 100644
index 0000000000..527509fb98
--- /dev/null
+++ b/src/misc/background_worker.c
@@ -0,0 +1,235 @@
+/*****************************************************************************
+ * Copyright (C) 2017 VLC authors and VideoLAN
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+#  include "config.h"
+#endif
+
+#include <vlc_common.h>
+#include <vlc_threads.h>
+#include <vlc_arrays.h>
+
+#include "libvlc.h"
+#include "background_worker.h"
+
+/**
+ * A single unit of work waiting in (or just taken from) the queue.
+ *
+ * Items are allocated by background_worker_Push and freed either by the
+ * worker thread once the associated task is over, or by
+ * BackgroundWorkerCancel if the item is removed while still pending.
+ **/
+struct bg_queued_item {
+    void* id; /**< id associated with entity */
+    void* entity; /**< the entity to process */
+    /* Added to mdate() by the worker thread to compute the deadline; a value
+     * less than or equal to zero leaves the task without a deadline. */
+    int timeout; /**< timeout duration in microseconds */
+};
+
+/**
+ * Private state of a background-worker.
+ *
+ * `head` describes the task currently handled by the worker thread, whereas
+ * `tail` stores the entities that are still waiting for their turn.
+ *
+ * When both locks are needed at once, the worker thread acquires `tail.lock`
+ * first and `head.lock` second.
+ **/
+struct background_worker {
+    /* passed as first argument to pf_start, pf_probe and pf_stop */
+    void* owner;
+    /* copy of the configuration handed to background_worker_New */
+    struct background_worker_config conf;
+
+    struct {
+        /* NOTE: although declared before `lock`, every access to
+         * probe_request in this file is made with `lock` held. */
+        bool probe_request; /**< true if a probe is requested */
+        vlc_mutex_t lock; /**< acquire to inspect members that follow */
+        vlc_cond_t wait; /**< wait for update in terms of head */
+        mtime_t deadline; /**< deadline of the current task */
+        void* id; /**< id of the current task */
+        bool active; /**< true if there is an active thread */
+    } head;
+
+    struct {
+        vlc_mutex_t lock; /**< acquire to inspect members that follow */
+        vlc_array_t data; /**< queue of pending entities to process */
+    } tail;
+};
+
+/**
+ * Entry point of the worker thread.
+ *
+ * The thread repeatedly takes the first pending item, starts a task for it
+ * through pf_start, and then alternates between probing the task (pf_probe)
+ * and sleeping until the task reports completion or its deadline has passed.
+ * The task is then stopped (pf_stop), the entity released (pf_release) and
+ * the item freed. The thread returns as soon as it finds the queue empty.
+ *
+ * \param data the `struct background_worker` this thread serves
+ * \return always NULL
+ **/
+static void* Thread( void* data )
+{
+    struct background_worker* worker = data;
+
+    for( ;; )
+    {
+        struct bg_queued_item* item = NULL;
+        void* handle;
+
+        /* tail.lock stays held while head is updated, so taking an item out
+         * of the queue and publishing it as the current task (or publishing
+         * that the thread is going away) happen under both locks. */
+        vlc_mutex_lock( &worker->tail.lock );
+        {
+            if( vlc_array_count( &worker->tail.data ) )
+            {
+                item = vlc_array_item_at_index( &worker->tail.data, 0 );
+                handle = NULL;
+
+                vlc_array_remove( &worker->tail.data, 0 );
+            }
+
+            vlc_mutex_lock( &worker->head.lock );
+            {
+                /* INT64_MAX means "no deadline" unless a positive timeout
+                 * overrides it below */
+                worker->head.deadline = INT64_MAX;
+                worker->head.active = item != NULL;
+                worker->head.id = item ? item->id : NULL;
+
+                if( item && item->timeout > 0 )
+                    worker->head.deadline = mdate() + item->timeout;
+            }
+            /* wake up anyone blocked in BackgroundWorkerCancel waiting for
+             * the previous task (or the thread itself) to go away */
+            vlc_cond_broadcast( &worker->head.wait );
+            vlc_mutex_unlock( &worker->head.lock );
+        }
+        vlc_mutex_unlock( &worker->tail.lock );
+
+        /* empty queue: head.active has been set to false above, terminate */
+        if( item == NULL )
+            break;
+
+        /* a non-zero return value means that no task was started: drop the
+         * item without calling pf_stop and move on to the next one */
+        if( worker->conf.pf_start( worker->owner, item->entity, &handle ) )
+        {
+            worker->conf.pf_release( item->entity );
+            free( item );
+            continue;
+        }
+
+        for( ;; )
+        {
+            vlc_mutex_lock( &worker->head.lock );
+
+            /* BackgroundWorkerCancel moves the deadline to VLC_TS_0 in order
+             * to force this check to succeed */
+            bool const b_timeout = worker->head.deadline <= mdate();
+            /* reset before probing so that a request issued while pf_probe
+             * runs is still visible when the flag is checked again below */
+            worker->head.probe_request = false;
+
+            vlc_mutex_unlock( &worker->head.lock );
+
+            /* pf_probe and pf_stop are invoked without any lock held */
+            if( b_timeout ||
+                worker->conf.pf_probe( worker->owner, handle ) )
+            {
+                worker->conf.pf_stop( worker->owner, handle );
+                worker->conf.pf_release( item->entity );
+                free( item );
+                break;
+            }
+
+            /* sleep until the deadline, unless a probe was requested or the
+             * deadline was reached in the meantime; the wait is interrupted
+             * by the broadcasts in background_worker_RequestProbe and
+             * BackgroundWorkerCancel */
+            vlc_mutex_lock( &worker->head.lock );
+            if( worker->head.probe_request == false &&
+                worker->head.deadline > mdate() )
+            {
+                vlc_cond_timedwait( &worker->head.wait, &worker->head.lock,
+                                     worker->head.deadline );
+            }
+            vlc_mutex_unlock( &worker->head.lock );
+        }
+    }
+
+    return NULL;
+}
+
+/**
+ * Drop pending items and interrupt the running task.
+ *
+ * Pending items matching `id` (or all of them if `id` is NULL) are removed
+ * from the queue, released through pf_release and freed. Afterwards, if the
+ * task being processed matches, its deadline is moved into the past and the
+ * caller blocks until the worker thread has moved on.
+ *
+ * \param worker the background-worker
+ * \param id id of the items to cancel, or NULL to cancel everything
+ **/
+static void BackgroundWorkerCancel( struct background_worker* worker, void* id)
+{
+    bool const cancel_all = id == NULL;
+
+    /* step 1: purge matching entries that have not been started yet */
+    vlc_mutex_lock( &worker->tail.lock );
+    size_t pos = 0;
+    while( pos < vlc_array_count( &worker->tail.data ) )
+    {
+        struct bg_queued_item* pending =
+            vlc_array_item_at_index( &worker->tail.data, pos );
+
+        if( !cancel_all && pending->id != id )
+        {
+            /* not ours, keep it and look at the next slot */
+            ++pos;
+            continue;
+        }
+
+        /* removal shifts the following entries down, so `pos` stays put */
+        vlc_array_remove( &worker->tail.data, pos );
+        worker->conf.pf_release( pending->entity );
+        free( pending );
+    }
+    vlc_mutex_unlock( &worker->tail.lock );
+
+    /* step 2: expire the running task and wait for the thread to react */
+    vlc_mutex_lock( &worker->head.lock );
+    for( ;; )
+    {
+        bool const must_wait = cancel_all ? worker->head.active
+                                          : worker->head.id == id;
+        if( !must_wait )
+            break;
+
+        worker->head.deadline = VLC_TS_0;
+        vlc_cond_broadcast( &worker->head.wait );
+        vlc_cond_wait( &worker->head.wait, &worker->head.lock );
+    }
+    vlc_mutex_unlock( &worker->head.lock );
+}
+
+/**
+ * Allocate and initialise a background-worker.
+ *
+ * The configuration is copied, so `conf` does not need to outlive the call.
+ * No thread is created here; background_worker_Push spawns one on demand.
+ *
+ * \param owner opaque pointer handed back to pf_start, pf_probe and pf_stop
+ * \param conf the configuration to copy into the worker
+ * \return the new background-worker, or NULL if the allocation failed
+ **/
+struct background_worker* background_worker_New( void* owner,
+    struct background_worker_config* conf )
+{
+    struct background_worker* worker = malloc( sizeof *worker );
+
+    if( unlikely( !worker ) )
+        return NULL;
+
+    worker->conf = *conf;
+    worker->owner = owner;
+
+    /* malloc does not zero the memory: give every member of `head` a
+     * well-defined value, including probe_request */
+    worker->head.probe_request = false;
+    worker->head.id = NULL;
+    worker->head.active = false;
+    worker->head.deadline = VLC_TS_INVALID;
+
+    vlc_mutex_init( &worker->head.lock );
+    vlc_cond_init( &worker->head.wait );
+
+    vlc_array_init( &worker->tail.data );
+    vlc_mutex_init( &worker->tail.lock );
+
+    return worker;
+}
+
+/**
+ * Queue an entity for processing, starting the worker thread if needed.
+ *
+ * \param worker the background-worker
+ * \param entity the entity to queue; a reference is taken through pf_hold
+ * \param id value identifying the entity in background_worker_Cancel
+ * \param timeout timeout in milliseconds, `0` to use the worker's
+ *        default_timeout
+ * \return VLC_SUCCESS if a worker thread is (or was already) running,
+ *         VLC_EGENERIC if the allocation failed or if no thread could be
+ *         started. In the latter case the entity remains queued and held; it
+ *         is processed once a later push manages to start the thread, or
+ *         released when it is cancelled.
+ **/
+int background_worker_Push( struct background_worker* worker, void* entity,
+                        void* id, int timeout )
+{
+    struct bg_queued_item* item = malloc( sizeof( *item ) );
+
+    if( unlikely( !item ) )
+        return VLC_EGENERIC;
+
+    item->id = id;
+    item->entity = entity;
+    /* both `timeout` and default_timeout are expressed in milliseconds,
+     * whereas bg_queued_item.timeout is in microseconds */
+    item->timeout =
+        ( timeout ? timeout : worker->conf.default_timeout ) * 1000;
+
+    /* Take the reference BEFORE the item becomes visible to the worker
+     * thread: once it is in the queue the thread may process it, release the
+     * entity and free the item at any time, so neither `item` nor the
+     * reference count may be touched after the append below. Holding
+     * unconditionally also keeps the count balanced with the pf_release
+     * done for every queued item, even if no thread can be started. */
+    worker->conf.pf_hold( entity );
+
+    vlc_mutex_lock( &worker->tail.lock );
+    vlc_array_append( &worker->tail.data, item );
+    vlc_mutex_unlock( &worker->tail.lock );
+
+    vlc_mutex_lock( &worker->head.lock );
+    if( worker->head.active == false )
+    {
+        worker->head.probe_request = false;
+        worker->head.active =
+            !vlc_clone_detach( NULL, Thread, worker, VLC_THREAD_PRIORITY_LOW );
+    }
+
+    int ret = worker->head.active ? VLC_SUCCESS : VLC_EGENERIC;
+    vlc_mutex_unlock( &worker->head.lock );
+
+    return ret;
+}
+
+/**
+ * Public entry point for cancellation; forwards to BackgroundWorkerCancel.
+ *
+ * \param worker the background-worker
+ * \param id id of the entities to cancel, or NULL to cancel everything
+ **/
+void background_worker_Cancel( struct background_worker* worker, void* id )
+{
+    BackgroundWorkerCancel( worker, id );
+}
+
+/**
+ * Ask the worker thread to probe the current task again.
+ *
+ * The request is recorded in head.probe_request and the condition variable
+ * is broadcast so that a worker thread sleeping between two probes wakes up.
+ * The function does not wait for the probe to happen.
+ *
+ * \param worker the background-worker
+ **/
+void background_worker_RequestProbe( struct background_worker* worker )
+{
+    vlc_mutex_lock( &worker->head.lock );
+    /* set under head.lock so the worker thread cannot miss the request
+     * between checking the flag and going to sleep */
+    worker->head.probe_request = true;
+    vlc_cond_broadcast( &worker->head.wait );
+    vlc_mutex_unlock( &worker->head.lock );
+}
+
+/**
+ * Destroy a background-worker.
+ *
+ * Every pending entity is released, the running task (if any) is stopped,
+ * and all resources owned by the worker are freed. The call blocks until the
+ * worker thread no longer uses `worker`.
+ *
+ * \param worker the background-worker to destroy
+ **/
+void background_worker_Delete( struct background_worker* worker )
+{
+    BackgroundWorkerCancel( worker, NULL );
+
+    /* The worker thread announces its termination (head.active = false)
+     * while still holding tail.lock, and unlocks it as its very last access
+     * to `worker`. Acquiring tail.lock here therefore guarantees that the
+     * thread is done with the object before it is torn down. */
+    vlc_mutex_lock( &worker->tail.lock );
+    vlc_array_clear( &worker->tail.data );
+    vlc_mutex_unlock( &worker->tail.lock );
+
+    /* release the synchronization primitives set up in
+     * background_worker_New */
+    vlc_cond_destroy( &worker->head.wait );
+    vlc_mutex_destroy( &worker->head.lock );
+    vlc_mutex_destroy( &worker->tail.lock );
+
+    free( worker );
+}
diff --git a/src/misc/background_worker.h b/src/misc/background_worker.h
new file mode 100644
index 0000000000..8232c1d80a
--- /dev/null
+++ b/src/misc/background_worker.h
@@ -0,0 +1,182 @@
+/*****************************************************************************
+ * Copyright (C) 2017 VLC authors and VideoLAN
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifndef BACKGROUND_WORKER_H__
+#define BACKGROUND_WORKER_H__
+
+struct background_worker_config {
+    /**
+     * Default timeout for completing a task
+     *
+     * If `0` a task can run indefinitely without being killed, whereas a value
+     * other than `0` denotes the maximum number of milliseconds a task can run
+     * before \ref pf_stop is called to kill it.
+     **/
+    mtime_t default_timeout;
+
+    /**
+     * Release an entity
+     *
+     * This callback will be called in order to decrement the ref-count of an
+     * entity within the background-worker. It will happen either when \ref
+     * pf_stop has finished executing, if \ref pf_start fails, or if the entity
+     * is removed from the queue (through \ref background_worker_Cancel)
+     *
+     * \param entity the entity to release
+     **/
+    void( *pf_release )( void* entity );
+
+    /**
+     * Hold a queued item
+     *
+     * This callback will be called in order to increment the ref-count of an
+     * entity. It will happen when the entity is pushed into the queue of
+     * pending tasks as part of \ref background_worker_Push.
+     *
+     * \param entity the entity to hold
+     **/
+    void( *pf_hold )( void* entity );
+
+    /**
+     * Start a new task
+     *
+     * This callback is called in order to construct a new background task. In
+     * order for the background-worker to be able to continue processing
+     * incoming requests, \ref pf_start is meant to start a task (such as a
+     * thread), and then store the associated handle in `*out`.
+     *
+     * The value of `*out` will then be the value of the argument named `handle`
+     * in terms of \ref pf_probe and \ref pf_stop.
+     *
+     * \param owner the owner of the background-worker
+     * \param entity the entity for which a task is to be created
+     * \param out [out] `*out` shall, on success, refer to the handle associated
+     *                   with the running task.
+     * \return VLC_SUCCESS if a task was created, an error-code on failure.
+     **/
+    int( *pf_start )( void* owner, void* entity, void** out );
+
+    /**
+     * Probe a running task
+     *
+     * This callback is called in order to see whether a running task has
+     * finished. It can be called anytime between a successful call to
+     * \ref pf_start, and the corresponding call to \ref pf_stop.
+     *
+     * \param owner the owner of the background-worker
+     * \param handle the handle associated with the running task
+     * \return 0 if the task is still running, any other value if finished.
+     **/
+    int( *pf_probe )( void* owner, void* handle );
+
+    /**
+     * Stop a running task
+     *
+     * This callback is called in order to stop a running task. If \ref pf_start
+     * has created a non-detached thread, \ref pf_stop is where you would
+     * interrupt and then join it.
+     *
+     * \warning This function is called either after \ref pf_probe has stated
+     *          that the task has finished, or if the timeout (if any) for the
+     *          task has been reached.
+     *
+     * \param owner the owner of the background-worker
+     * \param handle the handle associated with the task to be stopped
+     **/
+    void( *pf_stop )( void* owner, void* handle );
+};
+
+/**
+ * Create a background-worker
+ *
+ * This function creates a new background-worker using the passed configuration.
+ *
+ * \warning all members of `config` shall have been set by the caller.
+ * \warning the returned resource must be destroyed using \ref
+ *          background_worker_Delete on success.
+ *
+ * \param owner the owner of the background-worker
+ * \param config the background-worker's configuration
+ * \return a pointer-to the created background-worker on success,
+ *         `NULL` on failure.
+ **/
+struct background_worker* background_worker_New( void* owner,
+    struct background_worker_config* config );
+
+/**
+ * Request the background-worker to probe the current task
+ *
+ * This function is used to signal the background-worker that it should do
+ * another probe to see whether the current task is still alive.
+ *
+ * \warning Note that the function will not wait for the probing to finish, it
+ *          will simply ask the background worker to recheck it as soon as
+ *          possible.
+ *
+ * \param worker the background-worker
+ **/
+void background_worker_RequestProbe( struct background_worker* worker );
+
+/**
+ * Push an entity into the background-worker
+ *
+ * This function is used to push an entity into the queue of pending work. The
+ * entities will be processed in the order in which they are received (in terms
+ * of the order of invocations in a single-threaded environment).
+ *
+ * \param worker the background-worker
+ * \param entity the entity which is to be queued
+ * \param id a value suitable for identifying the entity, or `NULL`
+ * \param timeout the timeout of the entity in milliseconds, if `0` the
+ *        default-timeout of the background-worker will be used.
+ * \return VLC_SUCCESS if the entity was successfully queued, an error-code on
+ *         failure.
+ **/
+int background_worker_Push( struct background_worker* worker, void* entity,
+    void* id, int timeout );
+
+/**
+ * Remove entities from the background-worker
+ *
+ * This function is used to remove processing of a certain entity given its
+ * associated id, or to remove all queued (including currently running)
+ * entities.
+ *
+ * \warning if the `id` passed refers to an entity that is currently being
+ *          processed, the call will block until the task has been terminated.
+ *
+ * \param worker the background-worker
+ * \param id id of the entities to remove, or `NULL` to remove every queued
+ *        entity and cancel the currently running task (if any).
+ **/
+void background_worker_Cancel( struct background_worker* worker, void* id );
+
+/**
+ * Delete a background-worker
+ *
+ * This function will destroy a background-worker created through \ref
+ * background_worker_New. It will effectively stop the currently running task,
+ * if any, and empty the queue of pending entities.
+ *
+ * \warning If there is a currently running task, the function will block until
+ *          it has been stopped.
+ *
+ * \param worker the background-worker
+ **/
+void background_worker_Delete( struct background_worker* worker );
+#endif
-- 
2.12.1


More information about the vlc-devel mailing list