[vlc-devel] [RFC 1/2] executor: introduce new executor API
Romain Vimont
rom1v at videolabs.io
Wed Aug 26 19:19:56 CEST 2020
Introduce a new API to execute "runnables". The execution can be
automatically interrupted on timeout or via an explicit cancelation
request.
---
include/vlc_executor.h | 194 +++++++++++++++
src/Makefile.am | 1 +
src/libvlccore.sym | 4 +
src/misc/executor.c | 545 +++++++++++++++++++++++++++++++++++++++++
4 files changed, 744 insertions(+)
create mode 100644 include/vlc_executor.h
create mode 100644 src/misc/executor.c
diff --git a/include/vlc_executor.h b/include/vlc_executor.h
new file mode 100644
index 0000000000..4dd8b93628
--- /dev/null
+++ b/include/vlc_executor.h
@@ -0,0 +1,194 @@
+/*****************************************************************************
+ * vlc_executor.h
+ *****************************************************************************
+ * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifndef VLC_EXECUTOR_H
+#define VLC_EXECUTOR_H
+
+#include <vlc_common.h>
+#include <vlc_tick.h>
+
+# ifdef __cplusplus
+extern "C" {
+# endif
+
+/** Executor type (opaque) */
+typedef struct vlc_executor vlc_executor_t;
+
+/**
+ * A Runnable is intended to be run from another thread by an executor.
+ */
+struct vlc_runnable {
+
+ /**
+ * This function is to be executed by a vlc_executor_t.
+ *
+ * It must implement the actions (arbitrarily long) to execute from an
+ * executor thread, synchronously. As soon as run() returns, the execution
+ * of this runnable is complete.
+ *
+ * After the runnable is submitted to an executor via
+ * vlc_executor_Submit(), the run() function is executed at most once (zero
+ * if the execution is canceled before it was started).
+ *
+ * It may not be NULL.
+ *
+ * \param userdata the userdata provided to vlc_executor_Submit()
+ */
+ void
+ (*run)(void *userdata);
+
+ /**
+ * This function attempts to interrupt the execution of run().
+ *
+ * If not NULL, it may be called on vlc_executor_Cancel() or on timeout.
+ *
+ * It is called from a thread different from the one executing run(). It is
+ * free to do any actions to "interrupt" the execution of run() (set a
+ * flag, close a file descriptor, etc.).
+ *
+ * The runnable will be considered "finished" once run() actually
+ * terminates.
+ *
+ * It should be quick, not to block the interruption of other runnables.
+ *
+ * \param userdata the userdata provided to vlc_executor_Submit()
+ */
+ void
+ (*interrupt)(void *userdata);
+
+ /**
+ * This function notifies the end of the execution.
+ *
+ * If not NULL, it is called either:
+ * - when run() terminates;
+ * - when the task is canceled before run() is called.
+ *
+ * In other words, it is always called in the end if vlc_executor_Submit()
+ * returns VLC_SUCCESS.
+ *
+ * \param userdata the userdata provided to vlc_executor_Submit()
+ */
+ void
+ (*on_finished)(void *userdata);
+};
+
+/**
+ * Create a new executor.
+ *
+ * \param parent a VLC object
+ * \param max_threads the maximum number of threads used to execute runnables
+ * \return a pointer to a new executor, or NULL if an error occurred
+ */
+VLC_API vlc_executor_t *
+vlc_executor_New(vlc_object_t *parent, unsigned max_threads);
+
+/**
+ * Delete an executor.
+ *
+ * Cancels all the pending an running task (interrupting them if necessary),
+ * waits for all the thread to complete, and delete the executor instance.
+ *
+ * \param executor the executor
+ */
+VLC_API void
+vlc_executor_Delete(vlc_executor_t *executor);
+
+/**
+ * Submit a runnable for execution.
+ *
+ * The struct vlc_runnable is not copied, it must exist until the end of the
+ * execution (the user is expected to pass a pointer to a static const
+ * structure).
+ *
+ * An id may optionally be provided in order to explicitly cancel the task
+ * later with vlc_executor_Cancel().
+ *
+ * A timeout may optionally be provided to interrupt the execution after some
+ * delay. The way the interruption is actually handled must be provided by the
+ * user via the runnable callback "interrupt". It is an error to provide a
+ * timeout without an interrupt callback.
+ *
+ * Here is a simple example (without interruption):
+ *
+ * \code{c}
+ * static void Run(void *userdata)
+ * {
+ * char *str = userdata;
+ * printf("start of %s\n", str);
+ * sleep(3);
+ * printf("end of %s\n", str);
+ * }
+ *
+ * static void OnFinished(void *userdata)
+ * {
+ * free(userdata);
+ * }
+ *
+ * static const struct vlc_runnable runnable = {
+ * .run = Run,
+ * .on_finished = OnFinished,
+ * };
+ *
+ * void foo(vlc_executor_t *executor, const char *name)
+ * {
+ * // error handling replaced by assertions for brevity
+ * char *str = strdup(name);
+ * assert(str);
+ * int ret = vlc_executor_Submit(executor, &runnable, str, NULL, 0);
+ * assert(ret == VLC_SUCCESS);
+ * }
+ * \endcode
+ *
+ * \param executor the executor
+ * \param runnable the task to run
+ * \param userdata the userdata to pass to all runnable callbacks
+ * \param id a user-provided id to associate to the task, used to cancel
+ * the execution (may be NULL)
+ * \param timeout the delay before the task is automatically interrupted
+ * (0 for no timeout, negative values are illegal)
+ * \return VLC_SUCCESS on success, another value on error
+ */
+VLC_API int
+vlc_executor_Submit(vlc_executor_t *executor,
+ const struct vlc_runnable *runnable, void *userdata,
+ void *id, vlc_tick_t timeout);
+
+/**
+ * Cancel all submitted tasks having the specified id.
+ *
+ * If id is NULL, then cancel all tasks.
+ *
+ * If may_interrupt_if_running is true, then the current running tasks may be
+ * interrupted using the interrupt() callback.
+ *
+ * \param executor the executor
+ * \param id the id of the tasks to cancel (NULL for all tasks)
+ * \param may_interrupt_if_running indicate if the tasks currently running may
+ * be interrupted
+ */
+VLC_API void
+vlc_executor_Cancel(vlc_executor_t *executor, void *id,
+ bool may_interrupt_if_running);
+
+# ifdef __cplusplus
+}
+# endif
+
+ #endif
diff --git a/src/Makefile.am b/src/Makefile.am
index 9e7c2931d2..456df7a6b2 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -350,6 +350,7 @@ libvlccore_la_SOURCES = \
misc/actions.c \
misc/background_worker.c \
misc/background_worker.h \
+ misc/executor.c \
misc/md5.c \
misc/probe.c \
misc/rand.c \
diff --git a/src/libvlccore.sym b/src/libvlccore.sym
index ed43c17715..5ab0eec6ed 100644
--- a/src/libvlccore.sym
+++ b/src/libvlccore.sym
@@ -958,3 +958,7 @@ vlc_video_context_GetType
vlc_video_context_GetPrivate
vlc_video_context_Hold
vlc_video_context_HoldDevice
+vlc_executor_New
+vlc_executor_Delete
+vlc_executor_Execute
+vlc_executor_Cancel
diff --git a/src/misc/executor.c b/src/misc/executor.c
new file mode 100644
index 0000000000..88c7064b72
--- /dev/null
+++ b/src/misc/executor.c
@@ -0,0 +1,545 @@
+/*****************************************************************************
+ * misc/executor.c
+ *****************************************************************************
+ * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
+ *
+ * This program is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published by
+ * the Free Software Foundation; either version 2.1 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this program; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
+ *****************************************************************************/
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include <vlc_executor.h>
+
+#include <vlc_atomic.h>
+#include <vlc_list.h>
+#include <vlc_threads.h>
+#include "libvlc.h"
+
+/**
+ * An executor task is created on every call to vlc_executor_Submit().
+ *
+ * It contains the user-provided data (runnable, userdata, timeout) and the
+ * current execution state.
+ */
+struct vlc_executor_task {
+
+ /** Node of vlc_executor.queue list */
+ struct vlc_list node;
+
+ /** Task id, may be NULL (provided by the user) */
+ void *id;
+
+ /** Timeout, 0 for no timeout, negative is illegal (provided by the user) */
+ vlc_tick_t timeout;
+
+ /**
+ * The task is "finished" if either:
+ * - the run() callback has completed, or
+ * - the run() callback has not been called and the task is canceled.
+ */
+ bool finished;
+
+ /**
+ * Set to true if the task has been canceled.
+ *
+ * It is possible that the task is canceled but not finished ("cancel" has
+ * been requested while run() was running, but run() is not completed yet).
+ */
+ bool canceled;
+
+ /** Date when run() is called, VLC_TICK_INVALID if not started. */
+ vlc_tick_t start_date;
+
+ /** The associated runnable */
+ const struct vlc_runnable *runnable;
+ /** The user data passed to all runnable callbacks */
+ void *userdata;
+
+ /** Refcount */
+ vlc_atomic_rc_t rc;
+};
+
+/**
+ * An executor can spawn several threads.
+ *
+ * This structure contains the data specific to one thread.
+ */
+struct vlc_executor_thread {
+ /** Node of vlc_executor.threads list */
+ struct vlc_list node;
+
+ /** The executor owning the thread */
+ vlc_executor_t *owner;
+
+ /** The current task executed by the thread, NULL if none */
+ struct vlc_executor_task *current_task;
+};
+
+/**
+ * The executor (also vlc_executor_t, exposed as opaque type in the public
+ * header).
+ */
+struct vlc_executor {
+ vlc_object_t obj;
+ vlc_mutex_t lock;
+
+ /** Maximum number of threads to run the tasks */
+ unsigned max_threads;
+
+ /** List of active vlc_executor_thread instances */
+ struct vlc_list threads;
+
+ /** Thread count (in a separate field to quickly compare to max_threads) */
+ unsigned nthreads;
+
+ /* Number of tasks requested but not finished (neither completed or
+ * canceled) */
+ unsigned unfinished;
+
+ /** Queue of vlc_executor_task_t */
+ struct vlc_list queue;
+
+ /** Wait for nthreads == 0 (used for executor deletion) */
+ vlc_cond_t nothreads_wait;
+
+ /** Wait for the queue to be non-empty */
+ vlc_cond_t queue_wait;
+
+ /** Wait for the next task interruption deadline */
+ vlc_cond_t interrupt_wait;
+
+ /** Indicate is the interrupt thread is running (to know if we need to
+ * spawn one) */
+ bool interrupt_thread_running;
+
+ /** True if executor deletion is requested */
+ bool closing;
+};
+
+vlc_executor_t *
+vlc_executor_New(vlc_object_t *parent, unsigned max_threads)
+{
+ assert(max_threads);
+ vlc_executor_t *executor =
+ vlc_custom_create(parent, sizeof(*executor), "executor");
+ if (!executor)
+ return NULL;
+
+ vlc_mutex_init(&executor->lock);
+
+ executor->max_threads = max_threads;
+ executor->nthreads = 0;
+ executor->unfinished = 0;
+
+ vlc_list_init(&executor->threads);
+ vlc_list_init(&executor->queue);
+
+ vlc_cond_init(&executor->nothreads_wait);
+ vlc_cond_init(&executor->queue_wait);
+ vlc_cond_init(&executor->interrupt_wait);
+
+ executor->closing = false;
+
+ return executor;
+}
+
+static struct vlc_executor_task *
+TaskNew(const struct vlc_runnable *runnable, void *userdata, void *id,
+ vlc_tick_t timeout)
+{
+ struct vlc_executor_task *task = malloc(sizeof(*task));
+ if (!task)
+ return NULL;
+
+ task->id = id;
+ task->timeout = timeout;
+ task->finished = false;
+ task->canceled = false;
+ task->start_date = VLC_TICK_INVALID;
+
+ task->runnable = runnable;
+ task->userdata = userdata;
+
+ vlc_atomic_rc_init(&task->rc);
+ return task;
+}
+
+static void
+TaskHold(struct vlc_executor_task *task)
+{
+ vlc_atomic_rc_inc(&task->rc);
+}
+
+static void
+TaskRelease(struct vlc_executor_task *task)
+{
+ if (vlc_atomic_rc_dec(&task->rc))
+ free(task);
+}
+
+static void
+TerminateTask(vlc_executor_t *executor, struct vlc_executor_task *task)
+{
+ vlc_mutex_assert(&executor->lock);
+
+ assert(!task->finished);
+ task->finished = true;
+
+ if (task->runnable->on_finished)
+ task->runnable->on_finished(task->userdata);
+
+ assert(executor->unfinished > 0);
+ --executor->unfinished;
+
+ TaskRelease(task);
+}
+
+static struct vlc_executor_task *
+FindFirstTaskToInterrupt(vlc_executor_t *executor)
+{
+ vlc_mutex_assert(&executor->lock);
+
+ /* XXX To improve execution complexity, we could keep the interruptible
+ * tasks in a priority queue. But there are typically only few threads, and
+ * this is not called more than once per task, so keep things simple, even
+ * if it's not optimal. */
+
+ struct vlc_executor_task *first = NULL;
+ vlc_tick_t first_deadline = VLC_TICK_INVALID;
+
+ struct vlc_executor_thread *thread;
+ vlc_list_foreach(thread, &executor->threads, node)
+ {
+ struct vlc_executor_task *task = thread->current_task;
+ /* Only consider tasks with a timeout */
+ if (task && task->timeout)
+ {
+ /* If there is a current task, then it is necessarily started */
+ assert(task->start_date != VLC_TICK_INVALID);
+
+ vlc_tick_t deadline = task->start_date + task->timeout;
+ if (!first || deadline < first_deadline)
+ {
+ first = task;
+ first_deadline = deadline;
+ }
+ }
+ }
+
+ return first;
+}
+
+static void *
+InterruptThreadRun(void *userdata)
+{
+ vlc_executor_t *executor = userdata;
+
+ vlc_mutex_lock(&executor->lock);
+ for (;;)
+ {
+ struct vlc_executor_task *task = FindFirstTaskToInterrupt(executor);
+ if (!task)
+ /* There is no task to interrupt, terminate the interrupt thread */
+ break;
+
+ assert(task->start_date != VLC_TICK_INVALID);
+ assert(task->timeout);
+ vlc_tick_t deadline = task->start_date + task->timeout;
+
+ /* The task may be released during the timedwait */
+ TaskHold(task);
+
+ bool timed_out = vlc_cond_timedwait(&executor->interrupt_wait,
+ &executor->lock, deadline) != 0;
+
+ if (timed_out && !task->finished && !task->canceled)
+ {
+ task->canceled = true;
+ /* This request the task to stop itself. The interrupt() itself
+ * should be immediate, but the task could terminate later (once is
+ * run() callback is completed). */
+ task->runnable->interrupt(task->userdata);
+ }
+
+ TaskRelease(task);
+ }
+
+ executor->interrupt_thread_running = false;
+ vlc_mutex_unlock(&executor->lock);
+
+ return NULL;
+}
+
+/**
+ * Make sure that an interrupt thread is running and will consider the
+ * interruption for a new task just added.
+ */
+static int
+RequireInterruptThread(vlc_executor_t *executor)
+{
+ vlc_mutex_assert(&executor->lock);
+
+ if (executor->interrupt_thread_running)
+ vlc_cond_signal(&executor->interrupt_wait);
+ else
+ {
+ if (vlc_clone_detach(NULL, InterruptThreadRun, executor,
+ VLC_THREAD_PRIORITY_LOW))
+ {
+ vlc_mutex_unlock(&executor->lock);
+ return VLC_EGENERIC;
+ }
+
+ executor->interrupt_thread_running = true;
+ }
+
+ return VLC_SUCCESS;
+}
+
+static void
+QueuePush(vlc_executor_t *executor, struct vlc_executor_task *task)
+{
+ vlc_mutex_assert(&executor->lock);
+
+ vlc_list_append(&task->node, &executor->queue);
+ vlc_cond_signal(&executor->queue_wait);
+}
+
+static struct vlc_executor_task *
+QueueTake(vlc_executor_t *executor, vlc_tick_t timeout)
+{
+ vlc_mutex_assert(&executor->lock);
+
+ vlc_tick_t deadline = vlc_tick_now() + timeout;
+ bool timed_out = false;
+ while (!timed_out && !executor->closing
+ && vlc_list_is_empty(&executor->queue))
+ timed_out = vlc_cond_timedwait(&executor->queue_wait,
+ &executor->lock, deadline) != 0;
+
+ if (executor->closing || timed_out)
+ return NULL;
+
+ struct vlc_executor_task *task =
+ vlc_list_first_entry_or_null(&executor->queue, struct vlc_executor_task,
+ node);
+ assert(task);
+ vlc_list_remove(&task->node);
+
+ return task;
+}
+
+static void
+QueueRemoveAll(vlc_executor_t *executor, void *id)
+{
+ vlc_mutex_assert(&executor->lock);
+
+ struct vlc_executor_task *task;
+ vlc_list_foreach(task, &executor->queue, node)
+ {
+ if (!id || task->id == id)
+ {
+ task->canceled = true;
+ vlc_list_remove(&task->node);
+ TerminateTask(executor, task);
+ }
+ }
+}
+
+static struct vlc_executor_thread *
+ExecutorThreadNew(vlc_executor_t *executor)
+{
+ struct vlc_executor_thread *thread = malloc(sizeof(*thread));
+ if (!thread)
+ return NULL;
+
+ thread->owner = executor;
+ thread->current_task = NULL;
+ return thread;
+}
+
+static void
+ExecutorThreadDelete(struct vlc_executor_thread *thread)
+{
+ free(thread);
+}
+
+static void
+RemoveThread(struct vlc_executor_thread *thread)
+{
+ vlc_executor_t *executor = thread->owner;
+
+ vlc_mutex_lock(&executor->lock);
+
+ vlc_list_remove(&thread->node);
+ assert(executor->nthreads > 0);
+ --executor->nthreads;
+ bool nothreads = !executor->nthreads;
+ vlc_mutex_unlock(&executor->lock);
+
+ if (nothreads)
+ vlc_cond_signal(&executor->nothreads_wait);
+
+ /* After vlc_cond_signal(), the executor instance may be deleted by
+ * vlc_executor_Delete() */
+
+ ExecutorThreadDelete(thread);
+}
+
+static void *
+ThreadRun(void *userdata)
+{
+ struct vlc_executor_thread *thread = userdata;
+ vlc_executor_t *executor = thread->owner;
+
+ vlc_mutex_lock(&executor->lock);
+
+ for (;;)
+ {
+ struct vlc_executor_task *task =
+ QueueTake(executor, VLC_TICK_FROM_SEC(5));
+ if (!task)
+ /* Terminate this thread */
+ break;
+
+ thread->current_task = task;
+ task->start_date = vlc_tick_now();
+ if (task->timeout)
+ RequireInterruptThread(executor);
+ vlc_mutex_unlock(&executor->lock);
+
+ /* Execute the user-provided runnable, without the executor lock */
+ task->runnable->run(task->userdata);
+
+ vlc_mutex_lock(&executor->lock);
+ thread->current_task = NULL;
+ TerminateTask(executor, task);
+ }
+
+ vlc_mutex_unlock(&executor->lock);
+
+ RemoveThread(thread);
+ return NULL;
+}
+
+static int
+SpawnThread(vlc_executor_t *executor)
+{
+ vlc_mutex_assert(&executor->lock);
+ assert(executor->nthreads < executor->max_threads);
+
+ struct vlc_executor_thread *thread = ExecutorThreadNew(executor);
+ if (!thread)
+ return VLC_ENOMEM;
+
+ if (vlc_clone_detach(NULL, ThreadRun, thread, VLC_THREAD_PRIORITY_LOW))
+ {
+ ExecutorThreadDelete(thread);
+ return VLC_EGENERIC;
+ }
+
+ executor->nthreads++;
+ vlc_list_append(&thread->node, &executor->threads);
+
+ return VLC_SUCCESS;
+}
+
+int
+vlc_executor_Submit(vlc_executor_t *executor,
+ const struct vlc_runnable *runnable, void *userdata,
+ void *id, vlc_tick_t timeout)
+{
+ assert(timeout >= 0);
+ /* An interrupt callback must be provided if a timeout is requested.
+ * Note that an interrupt callback may also be provided without timeout,
+ * for explicit calls to vlc_executor_Cancel(). */
+ assert(timeout == 0 || runnable->interrupt);
+ assert(runnable->run);
+
+ struct vlc_executor_task *task = TaskNew(runnable, userdata, id, timeout);
+ if (!task)
+ return VLC_ENOMEM;
+
+ vlc_mutex_lock(&executor->lock);
+ QueuePush(executor, task);
+ if (++executor->unfinished > executor->nthreads
+ && executor->nthreads < executor->max_threads)
+ SpawnThread(executor);
+ vlc_mutex_unlock(&executor->lock);
+
+ return VLC_SUCCESS;
+}
+
+static void
+CancelLocked(vlc_executor_t *executor, void *id, bool may_interrupt_if_running)
+{
+ vlc_mutex_assert(&executor->lock);
+
+ QueueRemoveAll(executor, id);
+
+ struct vlc_executor_thread *thread;
+ vlc_list_foreach(thread, &executor->threads, node)
+ {
+ struct vlc_executor_task *task = thread->current_task;
+ if (task && (!id || task->id == id) && !task->canceled)
+ {
+ /* If the task was not started or finished, it would not be stored
+ * as a current task for the thread */
+ assert(task->start_date != VLC_TICK_INVALID);
+ assert(!task->finished);
+
+ if (may_interrupt_if_running && task->runnable->interrupt)
+ {
+ /* Mark the task as canceled only if we can interrupt it,
+ * otherwise it will complete normally. */
+ task->canceled = true;
+ task->runnable->interrupt(task->userdata);
+ }
+ }
+ }
+}
+
+void
+vlc_executor_Cancel(vlc_executor_t *executor, void *id,
+ bool may_interrupt_if_running)
+{
+ vlc_mutex_lock(&executor->lock);
+ CancelLocked(executor, id, may_interrupt_if_running);
+ vlc_mutex_unlock(&executor->lock);
+}
+
+void
+vlc_executor_Delete(vlc_executor_t *executor)
+{
+ vlc_mutex_lock(&executor->lock);
+
+ executor->closing = true;
+
+ /* Cancel all tasks */
+ CancelLocked(executor, NULL, true);
+
+ /* "closing" is now true, this will wake up any QueueTake() */
+ vlc_cond_broadcast(&executor->queue_wait);
+
+ /* Wait until all threads are terminated */
+ while (executor->nthreads)
+ vlc_cond_wait(&executor->nothreads_wait, &executor->lock);
+
+ vlc_mutex_unlock(&executor->lock);
+
+ vlc_object_delete(executor);
+}
--
2.28.0
More information about the vlc-devel
mailing list