[vlc-devel] [RFC 1/2] executor: introduce new executor API

Romain Vimont rom1v at videolabs.io
Wed Aug 26 19:19:56 CEST 2020


Introduce a new API to execute "runnables". The execution can be
automatically interrupted on timeout or via an explicit cancelation
request.
---
 include/vlc_executor.h | 194 +++++++++++++++
 src/Makefile.am        |   1 +
 src/libvlccore.sym     |   4 +
 src/misc/executor.c    | 545 +++++++++++++++++++++++++++++++++++++++++
 4 files changed, 744 insertions(+)
 create mode 100644 include/vlc_executor.h
 create mode 100644 src/misc/executor.c

diff --git a/include/vlc_executor.h b/include/vlc_executor.h
new file mode 100644
index 0000000000..4dd8b93628
--- /dev/null
+++ b/include/vlc_executor.h
@@ -0,0 +1,194 @@
+/*****************************************************************************
+ * vlc_executor.h
+ *****************************************************************************
+ * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifndef VLC_EXECUTOR_H
+#define VLC_EXECUTOR_H
+
+#include <vlc_common.h>
+#include <vlc_tick.h>
+
+# ifdef __cplusplus
+extern "C" {
+# endif
+
+/** Executor type (opaque) */
+typedef struct vlc_executor vlc_executor_t;
+
+/**
+ * A Runnable is intended to be run from another thread by an executor.
+ */
+struct vlc_runnable {
+
+    /**
+     * This function is to be executed by a vlc_executor_t.
+     *
+     * It must implement the actions (arbitrarily long) to execute from an
+     * executor thread, synchronously. As soon as run() returns, the execution
+     * of this runnable is complete.
+     *
+     * After the runnable is submitted to an executor via
+     * vlc_executor_Submit(), the run() function is executed at most once (zero
+     * if the execution is canceled before it was started).
+     *
+     * It may not be NULL.
+     *
+     * \param userdata the userdata provided to vlc_executor_Submit()
+     */
+    void
+    (*run)(void *userdata);
+
+    /**
+     * This function attempts to interrupt the execution of run().
+     *
+     * If not NULL, it may be called on vlc_executor_Cancel() or on timeout.
+     *
+     * It is called from a thread different from the one executing run(). It is
+     * free to do any actions to "interrupt" the execution of run() (set a
+     * flag, close a file descriptor, etc.).
+     *
+     * The runnable will be considered "finished" once run() actually
+     * terminates.
+     *
+     * It should be quick, not to block the interruption of other runnables.
+     *
+     * \param userdata the userdata provided to vlc_executor_Submit()
+     */
+    void
+    (*interrupt)(void *userdata);
+
+    /**
+     * This function notifies the end of the execution.
+     *
+     * If not NULL, it is called either:
+     *  - when run() terminates;
+     *  - when the task is canceled before run() is called.
+     *
+     * In other words, it is always called in the end if vlc_executor_Submit()
+     * returns VLC_SUCCESS.
+     *
+     * \param userdata the userdata provided to vlc_executor_Submit()
+     */
+    void
+    (*on_finished)(void *userdata);
+};
+
+/**
+ * Create a new executor.
+ *
+ * \param parent      a VLC object
+ * \param max_threads the maximum number of threads used to execute runnables
+ * \return a pointer to a new executor, or NULL if an error occurred
+ */
+VLC_API vlc_executor_t *
+vlc_executor_New(vlc_object_t *parent, unsigned max_threads);
+
+/**
+ * Delete an executor.
+ *
+ * Cancels all the pending and running tasks (interrupting them if necessary),
+ * waits for all the threads to complete, and deletes the executor instance.
+ *
+ * \param executor the executor
+ */
+VLC_API void
+vlc_executor_Delete(vlc_executor_t *executor);
+
+/**
+ * Submit a runnable for execution.
+ *
+ * The struct vlc_runnable is not copied, it must exist until the end of the
+ * execution (the user is expected to pass a pointer to a static const
+ * structure).
+ *
+ * An id may optionally be provided in order to explicitly cancel the task
+ * later with vlc_executor_Cancel().
+ *
+ * A timeout may optionally be provided to interrupt the execution after some
+ * delay. The way the interruption is actually handled must be provided by the
+ * user via the runnable callback "interrupt". It is an error to provide a
+ * timeout without an interrupt callback.
+ *
+ * Here is a simple example (without interruption):
+ *
+ * \code{c}
+ *  static void Run(void *userdata)
+ *  {
+ *      char *str = userdata;
+ *      printf("start of %s\n", str);
+ *      sleep(3);
+ *      printf("end of %s\n", str);
+ *  }
+ *
+ *  static void OnFinished(void *userdata)
+ *  {
+ *      free(userdata);
+ *  }
+ *
+ *  static const struct vlc_runnable runnable = {
+ *      .run = Run,
+ *      .on_finished = OnFinished,
+ *  };
+ *
+ *  void foo(vlc_executor_t *executor, const char *name)
+ *  {
+ *      // error handling replaced by assertions for brevity
+ *      char *str = strdup(name);
+ *      assert(str);
+ *      int ret = vlc_executor_Submit(executor, &runnable, str, NULL, 0);
+ *      assert(ret == VLC_SUCCESS);
+ *  }
+ * \endcode
+ *
+ * \param executor the executor
+ * \param runnable the task to run
+ * \param userdata the userdata to pass to all runnable callbacks
+ * \param id       a user-provided id to associate to the task, used to cancel
+ *                 the execution (may be NULL)
+ * \param timeout  the delay before the task is automatically interrupted
+ *                 (0 for no timeout, negative values are illegal)
+ * \return VLC_SUCCESS on success, another value on error
+ */
+VLC_API int
+vlc_executor_Submit(vlc_executor_t *executor,
+                    const struct vlc_runnable *runnable, void *userdata,
+                    void *id, vlc_tick_t timeout);
+
+/**
+ * Cancel all submitted tasks having the specified id.
+ *
+ * If id is NULL, then cancel all tasks.
+ *
+ * If may_interrupt_if_running is true, then the current running tasks may be
+ * interrupted using the interrupt() callback.
+ *
+ * \param executor the executor
+ * \param id       the id of the tasks to cancel (NULL for all tasks)
+ * \param may_interrupt_if_running indicate if the tasks currently running may
+ *                                 be interrupted
+ */
+VLC_API void
+vlc_executor_Cancel(vlc_executor_t *executor, void *id,
+                    bool may_interrupt_if_running);
+
+# ifdef __cplusplus
+}
+# endif
+
+ #endif
diff --git a/src/Makefile.am b/src/Makefile.am
index 9e7c2931d2..456df7a6b2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -350,6 +350,7 @@ libvlccore_la_SOURCES = \
 	misc/actions.c \
 	misc/background_worker.c \
 	misc/background_worker.h \
+	misc/executor.c \
 	misc/md5.c \
 	misc/probe.c \
 	misc/rand.c \
diff --git a/src/libvlccore.sym b/src/libvlccore.sym
index ed43c17715..5ab0eec6ed 100644
--- a/src/libvlccore.sym
+++ b/src/libvlccore.sym
@@ -958,3 +958,7 @@ vlc_video_context_GetType
 vlc_video_context_GetPrivate
 vlc_video_context_Hold
 vlc_video_context_HoldDevice
+vlc_executor_New
+vlc_executor_Delete
+vlc_executor_Submit
+vlc_executor_Cancel
diff --git a/src/misc/executor.c b/src/misc/executor.c
new file mode 100644
index 0000000000..88c7064b72
--- /dev/null
+++ b/src/misc/executor.c
@@ -0,0 +1,545 @@
+/*****************************************************************************
+ * misc/executor.c
+ *****************************************************************************
+ * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <vlc_executor.h>
+
+#include <vlc_atomic.h>
+#include <vlc_list.h>
+#include <vlc_threads.h>
+#include "libvlc.h"
+
+/**
+ * An executor task is created on every call to vlc_executor_Submit().
+ *
+ * It contains the user-provided data (runnable, userdata, timeout) and the
+ * current execution state.
+ */
+struct vlc_executor_task {
+
+    /** Node of vlc_executor.queue list */
+    struct vlc_list node;
+
+    /** Task id, may be NULL (provided by the user) */
+    void *id;
+
+    /** Timeout, 0 for no timeout, negative is illegal (provided by the user) */
+    vlc_tick_t timeout;
+
+    /**
+     * The task is "finished" if either:
+     *   - the run() callback has completed, or
+     *   - the run() callback has not been called and the task is canceled.
+     */
+    bool finished;
+
+    /**
+     * Set to true if the task has been canceled.
+     *
+     * It is possible that the task is canceled but not finished ("cancel" has
+     * been requested while run() was running, but run() is not completed yet).
+     */
+    bool canceled;
+
+    /** Date when run() is called, VLC_TICK_INVALID if not started. */
+    vlc_tick_t start_date;
+
+    /** The associated runnable */
+    const struct vlc_runnable *runnable;
+    /** The user data passed to all runnable callbacks */
+    void *userdata;
+
+    /** Refcount */
+    vlc_atomic_rc_t rc;
+};
+
+/**
+ * An executor can spawn several threads.
+ *
+ * This structure contains the data specific to one thread.
+ */
+struct vlc_executor_thread {
+    /** Node of vlc_executor.threads list */
+    struct vlc_list node;
+
+    /** The executor owning the thread */
+    vlc_executor_t *owner;
+
+    /** The current task executed by the thread, NULL if none */
+    struct vlc_executor_task *current_task;
+};
+
+/**
+ * The executor (also vlc_executor_t, exposed as opaque type in the public
+ * header).
+ */
+struct vlc_executor {
+    vlc_object_t obj;
+    vlc_mutex_t lock;
+
+    /** Maximum number of threads to run the tasks */
+    unsigned max_threads;
+
+    /** List of active vlc_executor_thread instances */
+    struct vlc_list threads;
+
+    /** Thread count (in a separate field to quickly compare to max_threads) */
+    unsigned nthreads;
+
+    /** Number of tasks requested but not finished (neither completed nor
+     * canceled) */
+    unsigned unfinished;
+
+    /** Queue of struct vlc_executor_task instances */
+    struct vlc_list queue;
+
+    /** Wait for nthreads == 0 (used for executor deletion) */
+    vlc_cond_t nothreads_wait;
+
+    /** Wait for the queue to be non-empty */
+    vlc_cond_t queue_wait;
+
+    /** Wait for the next task interruption deadline */
+    vlc_cond_t interrupt_wait;
+
+    /** Indicate if the interrupt thread is running (to know if we need to
+     * spawn one) */
+    bool interrupt_thread_running;
+
+    /** True if executor deletion is requested */
+    bool closing;
+};
+
+vlc_executor_t *
+vlc_executor_New(vlc_object_t *parent, unsigned max_threads)
+{
+    assert(max_threads);
+    vlc_executor_t *executor =
+        vlc_custom_create(parent, sizeof(*executor), "executor");
+    if (!executor)
+        return NULL;
+
+    vlc_mutex_init(&executor->lock);
+    vlc_cond_init(&executor->nothreads_wait);
+    vlc_cond_init(&executor->queue_wait);
+    vlc_cond_init(&executor->interrupt_wait);
+
+    vlc_list_init(&executor->threads);
+    vlc_list_init(&executor->queue);
+
+    executor->max_threads = max_threads;
+    executor->nthreads = 0;
+    executor->unfinished = 0;
+
+    /* No interrupt thread exists until a task with a timeout is started */
+    executor->interrupt_thread_running = false;
+    executor->closing = false;
+    return executor;
+}
+
+static struct vlc_executor_task *
+TaskNew(const struct vlc_runnable *runnable, void *userdata, void *id,
+        vlc_tick_t timeout)
+{
+    struct vlc_executor_task *task = malloc(sizeof(*task));
+    if (!task)
+        return NULL;
+
+    task->id = id;
+    task->timeout = timeout;
+    task->finished = false;
+    task->canceled = false;
+    task->start_date = VLC_TICK_INVALID;
+
+    task->runnable = runnable;
+    task->userdata = userdata;
+
+    vlc_atomic_rc_init(&task->rc);
+    return task;
+}
+
+static void
+TaskHold(struct vlc_executor_task *task)
+{
+    vlc_atomic_rc_inc(&task->rc);
+}
+
+static void
+TaskRelease(struct vlc_executor_task *task)
+{
+    if (vlc_atomic_rc_dec(&task->rc))
+        free(task);
+}
+
+static void
+TerminateTask(vlc_executor_t *executor, struct vlc_executor_task *task)
+{
+    vlc_mutex_assert(&executor->lock);
+
+    assert(!task->finished);
+    task->finished = true;
+
+    if (task->runnable->on_finished)
+        task->runnable->on_finished(task->userdata);
+
+    assert(executor->unfinished > 0);
+    --executor->unfinished;
+
+    TaskRelease(task);
+}
+
+static struct vlc_executor_task *
+FindFirstTaskToInterrupt(vlc_executor_t *executor)
+{
+    vlc_mutex_assert(&executor->lock);
+
+    /* XXX To improve execution complexity, we could keep the interruptible
+     * tasks in a priority queue. But there are typically only few threads, and
+     * this is not called more than once per task, so keep things simple, even
+     * if it's not optimal. */
+
+    struct vlc_executor_task *first = NULL;
+    vlc_tick_t first_deadline = VLC_TICK_INVALID;
+
+    struct vlc_executor_thread *thread;
+    vlc_list_foreach(thread, &executor->threads, node)
+    {
+        struct vlc_executor_task *task = thread->current_task;
+        /* Skip tasks without timeout, and tasks already canceled: returning
+         * them again would make the interrupt thread spin on their deadline */
+        if (task && task->timeout && !task->canceled)
+        {
+            /* If there is a current task, then it is necessarily started */
+            assert(task->start_date != VLC_TICK_INVALID);
+            vlc_tick_t deadline = task->start_date + task->timeout;
+            if (!first || deadline < first_deadline)
+            {
+                first = task;
+                first_deadline = deadline;
+            }
+        }
+    }
+
+    return first;
+}
+
+static void *
+InterruptThreadRun(void *userdata)
+{
+    vlc_executor_t *executor = userdata;
+
+    vlc_mutex_lock(&executor->lock);
+    for (;;)
+    {
+        struct vlc_executor_task *task = FindFirstTaskToInterrupt(executor);
+        if (!task)
+            /* There is no task to interrupt, terminate the interrupt thread */
+            break;
+
+        assert(task->start_date != VLC_TICK_INVALID);
+        assert(task->timeout);
+        vlc_tick_t deadline = task->start_date + task->timeout;
+
+        /* The task may be released during the timedwait */
+        TaskHold(task);
+
+        bool timed_out = vlc_cond_timedwait(&executor->interrupt_wait,
+                                            &executor->lock, deadline) != 0;
+
+        if (timed_out && !task->finished && !task->canceled)
+        {
+            task->canceled = true;
+            /* This requests the task to stop itself. The interrupt() itself
+             * should be immediate, but the task could terminate later (once
+             * its run() callback is completed). */
+            task->runnable->interrupt(task->userdata);
+        }
+
+        TaskRelease(task);
+    }
+
+    executor->interrupt_thread_running = false;
+    vlc_mutex_unlock(&executor->lock);
+
+    return NULL;
+}
+
+/**
+ * Make sure that an interrupt thread is running and will consider the
+ * interruption for a new task just added.
+ *
+ * The executor lock must be held by the caller, and is never released here.
+ *
+ * \return VLC_SUCCESS, or VLC_EGENERIC if the thread could not be spawned
+ */
+static int
+RequireInterruptThread(vlc_executor_t *executor)
+{
+    vlc_mutex_assert(&executor->lock);
+
+    if (executor->interrupt_thread_running)
+    {
+        vlc_cond_signal(&executor->interrupt_wait);
+        return VLC_SUCCESS;
+    }
+
+    if (vlc_clone_detach(NULL, InterruptThreadRun, executor,
+                         VLC_THREAD_PRIORITY_LOW))
+        return VLC_EGENERIC;
+    executor->interrupt_thread_running = true;
+    return VLC_SUCCESS;
+}
+
+static void
+QueuePush(vlc_executor_t *executor, struct vlc_executor_task *task)
+{
+    vlc_mutex_assert(&executor->lock);
+
+    vlc_list_append(&task->node, &executor->queue);
+    vlc_cond_signal(&executor->queue_wait);
+}
+
+static struct vlc_executor_task *
+QueueTake(vlc_executor_t *executor, vlc_tick_t timeout)
+{
+    vlc_mutex_assert(&executor->lock);
+
+    vlc_tick_t deadline = vlc_tick_now() + timeout;
+    while (!executor->closing && vlc_list_is_empty(&executor->queue))
+        if (vlc_cond_timedwait(&executor->queue_wait, &executor->lock,
+                               deadline) != 0)
+            break; /* timed out */
+
+    /* Test the queue rather than the timeout: a task may have been pushed
+     * while the timed out wait was reacquiring the lock */
+    if (executor->closing || vlc_list_is_empty(&executor->queue))
+        return NULL;
+
+    struct vlc_executor_task *task =
+        vlc_list_first_entry_or_null(&executor->queue, struct vlc_executor_task,
+                                     node);
+    assert(task);
+    vlc_list_remove(&task->node);
+    return task;
+}
+
+static void
+QueueRemoveAll(vlc_executor_t *executor, void *id)
+{
+    vlc_mutex_assert(&executor->lock);
+
+    struct vlc_executor_task *task;
+    vlc_list_foreach(task, &executor->queue, node)
+    {
+        if (!id || task->id == id)
+        {
+            task->canceled = true;
+            vlc_list_remove(&task->node);
+            TerminateTask(executor, task);
+        }
+    }
+}
+
+static struct vlc_executor_thread *
+ExecutorThreadNew(vlc_executor_t *executor)
+{
+    struct vlc_executor_thread *thread = malloc(sizeof(*thread));
+    if (!thread)
+        return NULL;
+
+    thread->owner = executor;
+    thread->current_task = NULL;
+    return thread;
+}
+
+static void
+ExecutorThreadDelete(struct vlc_executor_thread *thread)
+{
+    free(thread);
+}
+
+/** Unregister a thread from its executor (the executor lock must be held) */
+static void
+RemoveThread(struct vlc_executor_thread *thread)
+{
+    vlc_executor_t *executor = thread->owner;
+    vlc_mutex_assert(&executor->lock);
+
+    vlc_list_remove(&thread->node);
+    assert(executor->nthreads > 0);
+    --executor->nthreads;
+
+    /* Signal with the lock held: vlc_executor_Delete() may destroy the
+     * executor as soon as it can observe nthreads == 0 */
+    if (!executor->nthreads)
+        vlc_cond_signal(&executor->nothreads_wait);
+}
+
+/** Thread entry point: run the queued tasks until closing or idle timeout */
+static void *
+ThreadRun(void *userdata)
+{
+    struct vlc_executor_thread *thread = userdata;
+    vlc_executor_t *executor = thread->owner;
+
+    vlc_mutex_lock(&executor->lock);
+
+    for (;;)
+    {
+        struct vlc_executor_task *task =
+            QueueTake(executor, VLC_TICK_FROM_SEC(5));
+        if (!task)
+            /* Terminate this thread */
+            break;
+
+        thread->current_task = task;
+        task->start_date = vlc_tick_now();
+        if (task->timeout)
+            RequireInterruptThread(executor);
+        vlc_mutex_unlock(&executor->lock);
+
+        /* Execute the user-provided runnable, without the executor lock */
+        task->runnable->run(task->userdata);
+
+        vlc_mutex_lock(&executor->lock);
+        thread->current_task = NULL;
+        TerminateTask(executor, task);
+    }
+
+    /* Keep the lock until the thread is unregistered, otherwise
+     * vlc_executor_Submit() could rely on this exiting thread to run a task */
+    RemoveThread(thread);
+    vlc_mutex_unlock(&executor->lock);
+
+    /* From here, the executor may be deleted by vlc_executor_Delete() */
+    ExecutorThreadDelete(thread);
+    return NULL;
+}
+
+static int
+SpawnThread(vlc_executor_t *executor)
+{
+    vlc_mutex_assert(&executor->lock);
+    assert(executor->nthreads < executor->max_threads);
+
+    struct vlc_executor_thread *thread = ExecutorThreadNew(executor);
+    if (!thread)
+        return VLC_ENOMEM;
+
+    if (vlc_clone_detach(NULL, ThreadRun, thread, VLC_THREAD_PRIORITY_LOW))
+    {
+        ExecutorThreadDelete(thread);
+        return VLC_EGENERIC;
+    }
+
+    executor->nthreads++;
+    vlc_list_append(&thread->node, &executor->threads);
+
+    return VLC_SUCCESS;
+}
+
+int
+vlc_executor_Submit(vlc_executor_t *executor,
+                    const struct vlc_runnable *runnable, void *userdata,
+                    void *id, vlc_tick_t timeout)
+{
+    assert(timeout >= 0);
+    /* An interrupt callback must be provided if a timeout is requested.
+     * Note that an interrupt callback may also be provided without timeout,
+     * for explicit calls to vlc_executor_Cancel(). */
+    assert(timeout == 0 || runnable->interrupt);
+    assert(runnable->run);
+
+    struct vlc_executor_task *task = TaskNew(runnable, userdata, id, timeout);
+    if (!task)
+        return VLC_ENOMEM;
+
+    vlc_mutex_lock(&executor->lock);
+    QueuePush(executor, task);
+    if (++executor->unfinished > executor->nthreads
+            && executor->nthreads < executor->max_threads)
+        SpawnThread(executor);
+    vlc_mutex_unlock(&executor->lock);
+
+    return VLC_SUCCESS;
+}
+
+static void
+CancelLocked(vlc_executor_t *executor, void *id, bool may_interrupt_if_running)
+{
+    vlc_mutex_assert(&executor->lock);
+
+    QueueRemoveAll(executor, id);
+
+    struct vlc_executor_thread *thread;
+    vlc_list_foreach(thread, &executor->threads, node)
+    {
+        struct vlc_executor_task *task = thread->current_task;
+        if (task && (!id || task->id == id) && !task->canceled)
+        {
+            /* If the task was not started or finished, it would not be stored
+             * as a current task for the thread */
+            assert(task->start_date != VLC_TICK_INVALID);
+            assert(!task->finished);
+
+            if (may_interrupt_if_running && task->runnable->interrupt)
+            {
+                /* Mark the task as canceled only if we can interrupt it,
+                 * otherwise it will complete normally. */
+                task->canceled = true;
+                task->runnable->interrupt(task->userdata);
+            }
+        }
+    }
+}
+
+void
+vlc_executor_Cancel(vlc_executor_t *executor, void *id,
+                    bool may_interrupt_if_running)
+{
+    vlc_mutex_lock(&executor->lock);
+    CancelLocked(executor, id, may_interrupt_if_running);
+    vlc_mutex_unlock(&executor->lock);
+}
+
+void
+vlc_executor_Delete(vlc_executor_t *executor)
+{
+    vlc_mutex_lock(&executor->lock);
+
+    executor->closing = true;
+
+    /* Cancel all tasks */
+    CancelLocked(executor, NULL, true);
+
+    /* "closing" is now true, this will wake up any QueueTake() */
+    vlc_cond_broadcast(&executor->queue_wait);
+
+    /* Wait until all threads are terminated */
+    while (executor->nthreads)
+        vlc_cond_wait(&executor->nothreads_wait, &executor->lock);
+
+    vlc_mutex_unlock(&executor->lock);
+
+    vlc_object_delete(executor);
+}
-- 
2.28.0



More information about the vlc-devel mailing list