[vlc-devel] [RFC v2 1/2] executor: introduce new executor API

Romain Vimont rom1v at videolabs.io
Tue Sep 1 18:13:46 CEST 2020


Introduce a new API to execute "runnables".
---
 include/vlc_executor.h | 149 +++++++++++++++++++++++
 src/Makefile.am        |   1 +
 src/libvlccore.sym     |   4 +
 src/misc/executor.c    | 263 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 417 insertions(+)
 create mode 100644 include/vlc_executor.h
 create mode 100644 src/misc/executor.c

diff --git a/include/vlc_executor.h b/include/vlc_executor.h
new file mode 100644
index 0000000000..63dd187387
--- /dev/null
+++ b/include/vlc_executor.h
@@ -0,0 +1,149 @@
+/*****************************************************************************
+ * vlc_executor.h
+ *****************************************************************************
+ * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifndef VLC_EXECUTOR_H
+#define VLC_EXECUTOR_H
+
+#include <vlc_common.h>
+#include <vlc_list.h>
+
+# ifdef __cplusplus
+extern "C" {
+# endif
+
+/** Executor type (opaque) */
+typedef struct vlc_executor vlc_executor_t;
+
+/**
+ * A Runnable is intended to be run from another thread by an executor.
+ */
+struct vlc_runnable {
+
+    /**
+     * This function is to be executed by a vlc_executor_t.
+     *
+     * It must implement the actions (arbitrarily long) to execute from an
+     * executor thread, synchronously. As soon as run() returns, the execution
+     * of this runnable is complete.
+     *
+     * After the runnable is submitted to an executor via
+     * vlc_executor_Submit(), the run() function is executed at most once (zero
+     * if the execution is canceled before it was started).
+     *
+     * It may not be NULL.
+     *
+     * \param userdata the userdata provided to vlc_executor_Submit()
+     */
+    void (*run)(void *userdata);
+
+    /**
+     * Userdata passed back to run().
+     */
+    void *userdata;
+
+    /* Private data used by the vlc_executor_t (do not touch) */
+    struct vlc_list node;
+};
+
+/**
+ * Create a new executor.
+ *
+ * \param max_threads the maximum number of threads used to execute runnables
+ * \return a pointer to a new executor, or NULL if an error occurred
+ */
+VLC_API vlc_executor_t *
+vlc_executor_New(unsigned max_threads);
+
+/**
+ * Delete an executor.
+ *
+ * Wait for all the threads to complete, and delete the executor instance.
+ *
+ * \param executor the executor
+ */
+VLC_API void
+vlc_executor_Delete(vlc_executor_t *executor);
+
+/**
+ * Submit a runnable for execution.
+ *
+ * The struct vlc_runnable is not copied, it must exist until the end of the
+ * execution (the user is expected to embed it in its own task structure).
+ *
+ * Here is a simple example:
+ *
+ * \code{c}
+ *  struct my_task {
+ *      char *str;
+ *      struct vlc_runnable runnable;
+ *  };
+ *
+ *  static void Run(void *userdata)
+ *  {
+ *      struct my_task *task = userdata;
+ *
+ *      printf("start of %s\n", task->str);
+ *      vlc_tick_sleep(VLC_TICK_FROM_SEC(3)); // long action
+ *      printf("end of %s\n", task->str);
+ *
+ *      free(task->str);
+ *      free(task);
+ *  }
+ *
+ *  void foo(vlc_executor_t *executor, const char *str)
+ *  {
+ *      // no error handling for brevity
+ *      struct my_task *task = malloc(sizeof(*task));
+ *      task->str = strdup(str);
+ *      task->runnable.run = Run;
+ *      task->runnable.userdata = task;
+ *      vlc_executor_Submit(executor, &task->runnable);
+ *  }
+ * \endcode
+ *
+ * \param executor the executor
+ * \param runnable the task to run
+ */
+VLC_API void
+vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable);
+
+/**
+ * Cancel a runnable previously submitted.
+ *
+ * If this runnable is still queued (i.e. it has not be run yet), then dequeue
+ * it so that it will never be run, and return true.
+ *
+ * Otherwise, this runnable has already been taken by an executor thread (it is
+ * still running or is complete). In that case, do nothing, and return false.
+ *
+ * The result is undefined if the runnable has not been submitted.
+ *
+ * \param executor the executor
+ * \param runnable the task to cancel
+ * \return true if the runnable has been canceled before execution
+ */
+VLC_API bool
+vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable);
+
+# ifdef __cplusplus
+}
+# endif
+
+ #endif
diff --git a/src/Makefile.am b/src/Makefile.am
index 6dc46ab3d6..5336fd7ac0 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -352,6 +352,7 @@ libvlccore_la_SOURCES = \
 	misc/actions.c \
 	misc/background_worker.c \
 	misc/background_worker.h \
+	misc/executor.c \
 	misc/md5.c \
 	misc/probe.c \
 	misc/rand.c \
diff --git a/src/libvlccore.sym b/src/libvlccore.sym
index 2b750873fc..6d72ee03e5 100644
--- a/src/libvlccore.sym
+++ b/src/libvlccore.sym
@@ -967,3 +967,7 @@ vlc_video_context_GetType
 vlc_video_context_GetPrivate
 vlc_video_context_Hold
 vlc_video_context_HoldDevice
+vlc_executor_New
+vlc_executor_Delete
+vlc_executor_Submit
+vlc_executor_Cancel
diff --git a/src/misc/executor.c b/src/misc/executor.c
new file mode 100644
index 0000000000..274ae3eb84
--- /dev/null
+++ b/src/misc/executor.c
@@ -0,0 +1,263 @@
+/*****************************************************************************
+ * misc/executor.c
+ *****************************************************************************
+ * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <vlc_executor.h>
+
+#include <vlc_atomic.h>
+#include <vlc_list.h>
+#include <vlc_threads.h>
+#include "libvlc.h"
+
+/**
+ * An executor can spawn several threads.
+ *
+ * This structure contains the data specific to one thread.
+ */
+struct vlc_executor_thread {
+    /** Node of vlc_executor.threads list */
+    struct vlc_list node;
+
+    /** The executor owning the thread */
+    vlc_executor_t *owner;
+
+    /** The system thread */
+    vlc_thread_t thread;
+
+    /** The current task executed by the thread, NULL if none */
+    struct vlc_runnable *current_task;
+};
+
+/**
+ * The executor (also vlc_executor_t, exposed as opaque type in the public
+ * header).
+ */
+struct vlc_executor {
+    vlc_mutex_t lock;
+
+    /** Maximum number of threads to run the tasks */
+    unsigned max_threads;
+
+    /** List of active vlc_executor_thread */
+    struct vlc_list threads;
+
+    /** Thread count (in a separate field to quickly compare to max_threads) */
+    unsigned nthreads;
+
+    /* Number of tasks requested but not finished. */
+    unsigned unfinished;
+
+    /** Queue of vlc_runnable */
+    struct vlc_list queue;
+
+    /** Wait for the queue to be non-empty */
+    vlc_cond_t queue_wait;
+
+    /** True if executor deletion is requested */
+    bool closing;
+};
+
+static void
+QueuePush(vlc_executor_t *executor, struct vlc_runnable *runnable)
+{
+    vlc_mutex_assert(&executor->lock);
+
+    vlc_list_append(&runnable->node, &executor->queue);
+    vlc_cond_signal(&executor->queue_wait);
+}
+
+static struct vlc_runnable *
+QueueTake(vlc_executor_t *executor)
+{
+    vlc_mutex_assert(&executor->lock);
+
+    while (!executor->closing && vlc_list_is_empty(&executor->queue))
+        vlc_cond_wait(&executor->queue_wait, &executor->lock);
+
+    if (executor->closing)
+        return NULL;
+
+    struct vlc_runnable *runnable =
+        vlc_list_first_entry_or_null(&executor->queue, struct vlc_runnable,
+                                     node);
+    assert(runnable);
+    vlc_list_remove(&runnable->node);
+
+    /* Set links to NULL to know that it has been taken by a thread in
+     * vlc_executor_Cancel() */
+    runnable->node.prev = runnable->node.next = NULL;
+
+    return runnable;
+}
+
+static void *
+ThreadRun(void *userdata)
+{
+    struct vlc_executor_thread *thread = userdata;
+    vlc_executor_t *executor = thread->owner;
+
+    vlc_mutex_lock(&executor->lock);
+
+    for (;;)
+    {
+        struct vlc_runnable *runnable = QueueTake(executor);
+        if (!runnable)
+            /* The executor is closing, terminate this thread */
+            break;
+
+        thread->current_task = runnable;
+        vlc_mutex_unlock(&executor->lock);
+
+        /* Execute the user-provided runnable, without the executor lock */
+        runnable->run(runnable->userdata);
+
+        vlc_mutex_lock(&executor->lock);
+        thread->current_task = NULL;
+
+        assert(executor->unfinished > 0);
+        --executor->unfinished;
+    }
+
+    vlc_mutex_unlock(&executor->lock);
+
+    return NULL;
+}
+
+static int
+SpawnThread(vlc_executor_t *executor)
+{
+    assert(executor->nthreads < executor->max_threads);
+
+    struct vlc_executor_thread *thread = malloc(sizeof(*thread));
+    if (!thread)
+        return VLC_ENOMEM;
+
+    thread->owner = executor;
+    thread->current_task = NULL;
+
+    if (vlc_clone(&thread->thread, ThreadRun, thread, VLC_THREAD_PRIORITY_LOW))
+    {
+        free(thread);
+        return VLC_EGENERIC;
+    }
+
+    executor->nthreads++;
+    vlc_list_append(&thread->node, &executor->threads);
+
+    return VLC_SUCCESS;
+}
+
+vlc_executor_t *
+vlc_executor_New(unsigned max_threads)
+{
+    assert(max_threads);
+    vlc_executor_t *executor = malloc(sizeof(*executor));
+    if (!executor)
+        return NULL;
+
+    vlc_mutex_init(&executor->lock);
+
+    executor->max_threads = max_threads;
+    executor->nthreads = 0;
+    executor->unfinished = 0;
+
+    vlc_list_init(&executor->threads);
+    vlc_list_init(&executor->queue);
+
+    vlc_cond_init(&executor->queue_wait);
+
+    executor->closing = false;
+
+    /* Create one thread on init so that vlc_executor_Submit() may never fail */
+    int ret = SpawnThread(executor);
+    if (ret != VLC_SUCCESS)
+    {
+        free(executor);
+        return NULL;
+    }
+
+    return executor;
+}
+
+void
+vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable)
+{
+    vlc_mutex_lock(&executor->lock);
+
+    assert(!executor->closing);
+
+    QueuePush(executor, runnable);
+
+    if (++executor->unfinished > executor->nthreads
+            && executor->nthreads < executor->max_threads)
+        /* If it fails, this is not an error, there is at least one thread */
+        SpawnThread(executor);
+
+    vlc_mutex_unlock(&executor->lock);
+}
+
+bool
+vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable)
+{
+    vlc_mutex_lock(&executor->lock);
+
+    /* Either both prev and next are set, either both are NULL */
+    assert(!!runnable->node.prev == !!runnable->node.next);
+
+    bool in_queue = runnable->node.prev;
+    if (in_queue)
+        vlc_list_remove(&runnable->node);
+
+    vlc_mutex_unlock(&executor->lock);
+
+    return in_queue;
+}
+
+void
+vlc_executor_Delete(vlc_executor_t *executor)
+{
+    vlc_mutex_lock(&executor->lock);
+
+    executor->closing = true;
+
+    /* All the tasks must be canceled on delete */
+    assert(vlc_list_is_empty(&executor->queue));
+
+    vlc_mutex_unlock(&executor->lock);
+
+    /* "closing" is now true, this will wake up threads */
+    vlc_cond_broadcast(&executor->queue_wait);
+
+    /* The threads list may not be written at this point, so it is safe to read
+     * it without mutex locked (the mutex must be released to join the
+     * threads). */
+
+    struct vlc_executor_thread *thread;
+    vlc_list_foreach(thread, &executor->threads, node)
+    {
+        vlc_join(thread->thread, NULL);
+        free(thread);
+    }
+
+    free(executor);
+}
-- 
2.28.0



More information about the vlc-devel mailing list