[vlc-devel] [PATCH v2 1/8] executor: introduce new executor API

Alexandre Janniaux ajanni at videolabs.io
Sat Sep 19 12:24:34 CEST 2020


Hi,

The patchset seems to need rebasing. Does it need any
additional changes?

A few nits:

On Sun, Sep 13, 2020 at 09:11:14PM +0200, Romain Vimont wrote:
> Introduce a new API to execute "runnables" from background threads.
>
> The final design is a result of discussions on the mailing-list:
>  - <https://mailman.videolan.org/pipermail/vlc-devel/2020-August/136696.html>
>  - <https://mailman.videolan.org/pipermail/vlc-devel/2020-September/136944.html>
> ---
>  include/vlc_executor.h | 183 ++++++++++++++++++++++++++
>  src/Makefile.am        |   2 +
>  src/libvlccore.sym     |   5 +
>  src/misc/executor.c    | 288 +++++++++++++++++++++++++++++++++++++++++
>  4 files changed, 478 insertions(+)
>  create mode 100644 include/vlc_executor.h
>  create mode 100644 src/misc/executor.c
>
> diff --git a/include/vlc_executor.h b/include/vlc_executor.h
> new file mode 100644
> index 0000000000..08a7f00472
> --- /dev/null
> +++ b/include/vlc_executor.h
> @@ -0,0 +1,183 @@
> +/*****************************************************************************
> + * vlc_executor.h
> + *****************************************************************************
> + * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
> + *
> + * This program is free software; you can redistribute it and/or modify it
> + * under the terms of the GNU Lesser General Public License as published by
> + * the Free Software Foundation; either version 2.1 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public License
> + * along with this program; if not, write to the Free Software Foundation,
> + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
> + *****************************************************************************/
> +
> +#ifndef VLC_EXECUTOR_H
> +#define VLC_EXECUTOR_H
> +
> +#include <vlc_common.h>
> +#include <vlc_list.h>
> +
> +# ifdef __cplusplus
> +extern "C" {
> +# endif
> +
> +/** Executor type (opaque) */
> +typedef struct vlc_executor vlc_executor_t;
> +
> +/**
> + * A Runnable is intended to be run from another thread by an executor.
> + */

I'm not sure the wording is correct. Technically if we submit a task
from another task within a single threaded executor, it would be
executed from the same thread.

I'd suggest

    /**
     * A Runnable encapsulate a sequential task that will be ran into
     * an async execution environment, typically executed from another
     * thread.
     */

Or something simpler of the same taste.

> +struct vlc_runnable {
> +
> +    /**
> +     * This function is to be executed by a vlc_executor_t.
> +     *
> +     * It must implement the actions (arbitrarily long) to execute from an
> +     * executor thread, synchronously. As soon as run() returns, the execution
> +     * of this runnable is complete.
> +     *
> +     * After the runnable is submitted to an executor via
> +     * vlc_executor_Submit(), the run() function is executed at most once (zero
> +     * if the execution is canceled before it was started).
> +     *
> +     * It must not be NULL.
> +     *
> +     * \param userdata the userdata provided to vlc_executor_Submit()
> +     */
> +    void (*run)(void *userdata);

Did we find a consensus wrt. whether we exposes the userdata
or the task itself here?

> +
> +    /**
> +     * Userdata passed back to run().
> +     */
> +    void *userdata;
> +
> +    /* Private data used by the vlc_executor_t (do not touch) */
> +    struct vlc_list node;
> +};
> +
> +/**
> + * Create a new executor.
> + *
> + * \param max_threads the maximum number of threads used to execute runnables
> + * \return a pointer to a new executor, or NULL if an error occurred
> + */
> +VLC_API vlc_executor_t *
> +vlc_executor_New(unsigned max_threads);
> +
> +/**
> + * Delete an executor.
> + *
> + * Wait for all the threads to complete, and delete the executor instance.
> + *
> + * All submitted tasks must be either started or explicitly canceled. To wait
> + * for all tasks to complete, use vlc_executor_WaitIdle().
> + *
> + * It is an error to submit a new runnable after vlc_executor_Delete() is
> + * called. In particular, a running task must not submit a new runnable once
> + * deletion has been requested.

Here we can probably say that it's even undefined and not just
an error since we access deallocated values.

> + *
> + * \param executor the executor
> + */
> +VLC_API void
> +vlc_executor_Delete(vlc_executor_t *executor);
> +
> +/**
> + * Submit a runnable for execution.
> + *
> + * The struct vlc_runnable is not copied, it must exist until the end of the
> + * execution (the user is expected to embed it in its own task structure).
> + *
> + * Here is a simple example:
> + *
> + * \code{c}
> + *  struct my_task {
> + *      char *str;
> + *      struct vlc_runnable runnable;
> + *  };
> + *
> + *  static void Run(void *userdata)
> + *  {
> + *      struct my_task *task = userdata;
> + *
> + *      printf("start of %s\n", task->str);
> + *      vlc_tick_sleep(VLC_TICK_FROM_SEC(3)); // long action
> + *      printf("end of %s\n", task->str);
> + *
> + *      free(task->str);
> + *      free(task);
> + *  }
> + *
> + *  void foo(vlc_executor_t *executor, const char *str)
> + *  {
> + *      // no error handling for brevity
> + *      struct my_task *task = malloc(sizeof(*task));
> + *      task->str = strdup(str);
> + *      task->runnable.run = Run;
> + *      task->runnable.userdata = task;
> + *      vlc_executor_Submit(executor, &task->runnable);
> + *  }
> + * \endcode
> + *
> + * A runnable instance is intended to be submitted at most once. The caller is
> + * expected to allocate a new task structure (embedding the runnable) for every
> + * submission.
> + *
> + * More precisely, it is incorrect to submit a runnable already submitted that
> + * is still in the pending queue (i.e. not canceled or started). This is due to
> + * the intrusive linked list of runnables.
> + *
> + * It is strongly discouraged to submit a runnable that its currently running

s/its/is/

> + * on the executor (unless you are prepared for the run() callback to be run
> + * several times in parallel).

It could quickly mention how to duplicate a task and gotcha,
except if you really want to avoid encouraging this kind of
construction.

> + *
> + * For simplicity, it is discouraged to submit a runnable previously submitted.
> + *
> + * \param executor the executor
> + * \param runnable the task to run
> + */
> +VLC_API void
> +vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable);
> +
> +/**
> + * Cancel a runnable previously submitted.
> + *
> + * If this runnable is still queued (i.e. it has not be run yet), then dequeue
> + * it so that it will never be run, and return true.
> + *
> + * Otherwise, this runnable has already been taken by an executor thread (it is
> + * still running or is complete). In that case, do nothing, and return false.
> + *
> + * This is an error to pass a runnable not submitted to this executor (the
> + * result is undefined in that case).
> + *
> + * Note that the runnable instance is owned by the caller, so the executor will
> + * never attempt to free it.
> + *
> + * \param executor the executor
> + * \param runnable the task to cancel
> + * \retval true if the runnable has been canceled before execution
> + * \retval false if the runnable has not been canceled
> + */
> +VLC_API bool
> +vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable);
> +
> +/**
> + * Wait until all submitted tasks are completed or canceled.
> + *
> + * \param executor the executor
> + */
> +VLC_API void
> +vlc_executor_WaitIdle(vlc_executor_t *executor);
> +
> +# ifdef __cplusplus
> +}
> +# endif
> +
> + #endif
> diff --git a/src/Makefile.am b/src/Makefile.am
> index 6dc46ab3d6..722292e6d1 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -49,6 +49,7 @@ pluginsinclude_HEADERS = \
>  	../include/vlc_es.h \
>  	../include/vlc_es_out.h \
>  	../include/vlc_events.h \
> +	../include/vlc_executor.h \
>  	../include/vlc_filter.h \
>  	../include/vlc_fourcc.h \
>  	../include/vlc_fs.h \
> @@ -352,6 +353,7 @@ libvlccore_la_SOURCES = \
>  	misc/actions.c \
>  	misc/background_worker.c \
>  	misc/background_worker.h \
> +	misc/executor.c \
>  	misc/md5.c \
>  	misc/probe.c \
>  	misc/rand.c \
> diff --git a/src/libvlccore.sym b/src/libvlccore.sym
> index 31fcd37cba..38bab22184 100644
> --- a/src/libvlccore.sym
> +++ b/src/libvlccore.sym
> @@ -966,3 +966,8 @@ vlc_video_context_GetType
>  vlc_video_context_GetPrivate
>  vlc_video_context_Hold
>  vlc_video_context_HoldDevice
> +vlc_executor_New
> +vlc_executor_Delete
> +vlc_executor_Submit
> +vlc_executor_Cancel
> +vlc_executor_WaitIdle
> diff --git a/src/misc/executor.c b/src/misc/executor.c
> new file mode 100644
> index 0000000000..3b79f9ba8d
> --- /dev/null
> +++ b/src/misc/executor.c
> @@ -0,0 +1,288 @@
> +/*****************************************************************************
> + * misc/executor.c
> + *****************************************************************************
> + * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
> + *
> + * This program is free software; you can redistribute it and/or modify it
> + * under the terms of the GNU Lesser General Public License as published by
> + * the Free Software Foundation; either version 2.1 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public License
> + * along with this program; if not, write to the Free Software Foundation,
> + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
> + *****************************************************************************/
> +
> +#ifdef HAVE_CONFIG_H
> +# include "config.h"
> +#endif
> +
> +#include <vlc_executor.h>
> +
> +#include <vlc_atomic.h>
> +#include <vlc_list.h>
> +#include <vlc_threads.h>
> +#include "libvlc.h"
> +
> +/**
> + * An executor can spawn several threads.
> + *
> + * This structure contains the data specific to one thread.
> + */
> +struct vlc_executor_thread {
> +    /** Node of vlc_executor.threads list */
> +    struct vlc_list node;
> +
> +    /** The executor owning the thread */
> +    vlc_executor_t *owner;
> +
> +    /** The system thread */
> +    vlc_thread_t thread;
> +
> +    /** The current task executed by the thread, NULL if none */
> +    struct vlc_runnable *current_task;
> +};
> +
> +/**
> + * The executor (also vlc_executor_t, exposed as opaque type in the public
> + * header).
> + */
> +struct vlc_executor {
> +    vlc_mutex_t lock;
> +
> +    /** Maximum number of threads to run the tasks */
> +    unsigned max_threads;
> +
> +    /** List of active vlc_executor_thread */
> +    struct vlc_list threads;
> +
> +    /** Thread count (in a separate field to quickly compare to max_threads) */
> +    unsigned nthreads;
> +
> +    /* Number of tasks requested but not finished. */
> +    unsigned unfinished;
> +
> +    /** Wait for the executor to be idle (i.e. unfinished == 0) */
> +    vlc_cond_t idle_wait;
> +
> +    /** Queue of vlc_runnable */
> +    struct vlc_list queue;
> +
> +    /** Wait for the queue to be non-empty */
> +    vlc_cond_t queue_wait;
> +
> +    /** True if executor deletion is requested */
> +    bool closing;
> +};
> +
> +static void
> +QueuePush(vlc_executor_t *executor, struct vlc_runnable *runnable)
> +{
> +    vlc_mutex_assert(&executor->lock);
> +
> +    vlc_list_append(&runnable->node, &executor->queue);
> +    vlc_cond_signal(&executor->queue_wait);
> +}
> +
> +static struct vlc_runnable *
> +QueueTake(vlc_executor_t *executor)
> +{
> +    vlc_mutex_assert(&executor->lock);
> +
> +    while (!executor->closing && vlc_list_is_empty(&executor->queue))
> +        vlc_cond_wait(&executor->queue_wait, &executor->lock);
> +
> +    if (executor->closing)
> +        return NULL;
> +
> +    struct vlc_runnable *runnable =
> +        vlc_list_first_entry_or_null(&executor->queue, struct vlc_runnable,
> +                                     node);
> +    assert(runnable);
> +    vlc_list_remove(&runnable->node);
> +
> +    /* Set links to NULL to know that it has been taken by a thread in
> +     * vlc_executor_Cancel() */
> +    runnable->node.prev = runnable->node.next = NULL;
> +
> +    return runnable;
> +}
> +
> +static void *
> +ThreadRun(void *userdata)
> +{
> +    struct vlc_executor_thread *thread = userdata;
> +    vlc_executor_t *executor = thread->owner;
> +
> +    vlc_mutex_lock(&executor->lock);
> +
> +    struct vlc_runnable *runnable;
> +    /* When the executor is closing, QueueTake() returns NULL */
> +    while ((runnable = QueueTake(executor)))
> +    {
> +        thread->current_task = runnable;
> +        vlc_mutex_unlock(&executor->lock);
> +
> +        /* Execute the user-provided runnable, without the executor lock */
> +        runnable->run(runnable->userdata);
> +
> +        vlc_mutex_lock(&executor->lock);
> +        thread->current_task = NULL;
> +
> +        assert(executor->unfinished > 0);
> +        --executor->unfinished;
> +        if (!executor->unfinished)
> +            vlc_cond_signal(&executor->idle_wait);
> +    }
> +
> +    vlc_mutex_unlock(&executor->lock);
> +
> +    return NULL;
> +}
> +
> +static int
> +SpawnThread(vlc_executor_t *executor)
> +{
> +    assert(executor->nthreads < executor->max_threads);
> +
> +    struct vlc_executor_thread *thread = malloc(sizeof(*thread));
> +    if (!thread)
> +        return VLC_ENOMEM;
> +
> +    thread->owner = executor;
> +    thread->current_task = NULL;
> +
> +    if (vlc_clone(&thread->thread, ThreadRun, thread, VLC_THREAD_PRIORITY_LOW))
> +    {
> +        free(thread);
> +        return VLC_EGENERIC;
> +    }
> +
> +    executor->nthreads++;
> +    vlc_list_append(&thread->node, &executor->threads);
> +
> +    return VLC_SUCCESS;
> +}
> +
> +vlc_executor_t *
> +vlc_executor_New(unsigned max_threads)
> +{
> +    assert(max_threads);
> +    vlc_executor_t *executor = malloc(sizeof(*executor));
> +    if (!executor)
> +        return NULL;
> +
> +    vlc_mutex_init(&executor->lock);
> +
> +    executor->max_threads = max_threads;
> +    executor->nthreads = 0;
> +    executor->unfinished = 0;
> +
> +    vlc_list_init(&executor->threads);
> +    vlc_list_init(&executor->queue);
> +
> +    vlc_cond_init(&executor->idle_wait);
> +    vlc_cond_init(&executor->queue_wait);
> +
> +    executor->closing = false;
> +
> +    /* Create one thread on init so that vlc_executor_Submit() may never fail */
> +    int ret = SpawnThread(executor);
> +    if (ret != VLC_SUCCESS)
> +    {
> +        free(executor);
> +        return NULL;
> +    }
> +
> +    return executor;
> +}
> +
> +void
> +vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable)
> +{
> +    vlc_mutex_lock(&executor->lock);
> +
> +    assert(!executor->closing);
> +
> +    QueuePush(executor, runnable);
> +
> +    if (++executor->unfinished > executor->nthreads
> +            && executor->nthreads < executor->max_threads)
> +        /* If it fails, this is not an error, there is at least one thread */
> +        SpawnThread(executor);
> +
> +    vlc_mutex_unlock(&executor->lock);
> +}
> +
> +bool
> +vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable)
> +{
> +    vlc_mutex_lock(&executor->lock);
> +
> +    /* Either both prev and next are set, either both are NULL */
> +    assert(!runnable->node.prev == !runnable->node.next);
> +
> +    bool in_queue = runnable->node.prev;
> +    if (in_queue)
> +    {
> +        vlc_list_remove(&runnable->node);
> +
> +        assert(executor->unfinished > 0);
> +        --executor->unfinished;
> +        if (!executor->unfinished)
> +            vlc_cond_signal(&executor->idle_wait);
> +    }
> +
> +    vlc_mutex_unlock(&executor->lock);
> +
> +    return in_queue;
> +}
> +
> +void
> +vlc_executor_WaitIdle(vlc_executor_t *executor)
> +{
> +    vlc_mutex_lock(&executor->lock);
> +    while (executor->unfinished)
> +        vlc_cond_wait(&executor->idle_wait, &executor->lock);
> +    vlc_mutex_unlock(&executor->lock);
> +}
> +
> +void
> +vlc_executor_Delete(vlc_executor_t *executor)
> +{
> +    vlc_mutex_lock(&executor->lock);
> +
> +    executor->closing = true;
> +
> +    /* All the tasks must be canceled on delete */
> +    assert(vlc_list_is_empty(&executor->queue));
> +
> +    vlc_mutex_unlock(&executor->lock);
> +
> +    /* "closing" is now true, this will wake up threads */
> +    vlc_cond_broadcast(&executor->queue_wait);
> +
> +    /* The threads list may not be written at this point, so it is safe to read
> +     * it without mutex locked (the mutex must be released to join the
> +     * threads). */
> +
> +    struct vlc_executor_thread *thread;
> +    vlc_list_foreach(thread, &executor->threads, node)
> +    {
> +        vlc_join(thread->thread, NULL);
> +        free(thread);
> +    }
> +
> +    /* The queue must still be empty (no runnable submitted a new runnable) */
> +    assert(vlc_list_is_empty(&executor->queue));
> +
> +    /* There are no tasks anymore */
> +    assert(!executor->unfinished);
> +
> +    free(executor);
> +}
> --
> 2.28.0
>
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> https://mailman.videolan.org/listinfo/vlc-devel


More information about the vlc-devel mailing list