[vlc-devel] [RFC v2 1/2] executor: introduce new executor API

Steve Lhomme robux4 at ycbcr.xyz
Wed Sep 2 08:13:15 CEST 2020


On 2020-09-01 18:13, Romain Vimont wrote:
> Introduce a new API to execute "runnables".
> ---
>   include/vlc_executor.h | 149 +++++++++++++++++++++++
>   src/Makefile.am        |   1 +
>   src/libvlccore.sym     |   4 +
>   src/misc/executor.c    | 263 +++++++++++++++++++++++++++++++++++++++++
>   4 files changed, 417 insertions(+)
>   create mode 100644 include/vlc_executor.h
>   create mode 100644 src/misc/executor.c
> 
> diff --git a/include/vlc_executor.h b/include/vlc_executor.h
> new file mode 100644
> index 0000000000..63dd187387
> --- /dev/null
> +++ b/include/vlc_executor.h
> @@ -0,0 +1,149 @@
> +/*****************************************************************************
> + * vlc_executor.h
> + *****************************************************************************
> + * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
> + *
> + * This program is free software; you can redistribute it and/or modify it
> + * under the terms of the GNU Lesser General Public License as published by
> + * the Free Software Foundation; either version 2.1 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public License
> + * along with this program; if not, write to the Free Software Foundation,
> + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
> + *****************************************************************************/
> +
> +#ifndef VLC_EXECUTOR_H
> +#define VLC_EXECUTOR_H
> +
> +#include <vlc_common.h>
> +#include <vlc_list.h>
> +
> +# ifdef __cplusplus
> +extern "C" {
> +# endif
> +
> +/** Executor type (opaque) */
> +typedef struct vlc_executor vlc_executor_t;
> +
> +/**
> + * A Runnable is intended to be run from another thread by an executor.
> + */
> +struct vlc_runnable {
> +
> +    /**
> +     * This function is to be executed by a vlc_executor_t.
> +     *
> +     * It must implement the actions (arbitrarily long) to execute from an
> +     * executor thread, synchronously. As soon as run() returns, the execution
> +     * of this runnable is complete.
> +     *
> +     * After the runnable is submitted to an executor via
> +     * vlc_executor_Submit(), the run() function is executed at most once (zero
> +     * if the execution is canceled before it was started).
> +     *
> +     * It may not be NULL.

It MUST not be NULL. Otherwise there's no point in a runnable at all.

> +     *
> +     * \param userdata the userdata provided to vlc_executor_Submit()
> +     */
> +    void (*run)(void *userdata);

Since the userdata is also part of the vlc_runnable, you may pass the 
vlc_runnable instead of the userdata. It allows more flexibilty in the 
future (for example if you want to do actions on the runnable, like run 
it again).

> +
> +    /**
> +     * Userdata passed back to run().
> +     */
> +    void *userdata;
> +
> +    /* Private data used by the vlc_executor_t (do not touch) */
> +    struct vlc_list node;
> +};
> +
> +/**
> + * Create a new executor.
> + *
> + * \param max_threads the maximum number of threads used to execute runnables
> + * \return a pointer to a new executor, or NULL if an error occurred
> + */
> +VLC_API vlc_executor_t *
> +vlc_executor_New(unsigned max_threads);
> +
> +/**
> + * Delete an executor.
> + *
> + * Wait for all the threads to complete, and delete the executor instance.
> + *
> + * \param executor the executor
> + */
> +VLC_API void
> +vlc_executor_Delete(vlc_executor_t *executor);
> +
> +/**
> + * Submit a runnable for execution.
> + *
> + * The struct vlc_runnable is not copied, it must exist until the end of the
> + * execution (the user is expected to embed it in its own task structure).
> + *
> + * Here is a simple example:
> + *
> + * \code{c}
> + *  struct my_task {
> + *      char *str;
> + *      struct vlc_runnable runnable;
> + *  };
> + *
> + *  static void Run(void *userdata)
> + *  {
> + *      struct my_task *task = userdata;

Here it would become a container_of(), which is still a cast but cleaner 
IMO.

> + *
> + *      printf("start of %s\n", task->str);
> + *      vlc_tick_sleep(VLC_TICK_FROM_SEC(3)); // long action
> + *      printf("end of %s\n", task->str);
> + *
> + *      free(task->str);
> + *      free(task);
> + *  }
> + *
> + *  void foo(vlc_executor_t *executor, const char *str)
> + *  {
> + *      // no error handling for brevity
> + *      struct my_task *task = malloc(sizeof(*task));
> + *      task->str = strdup(str);
> + *      task->runnable.run = Run;
> + *      task->runnable.userdata = task;

And in fact you don't need any userdata at all. You just wrap your 
runnable in a structure with the data you want. (as an object that would 
support the runnable interface)

> + *      vlc_executor_Submit(executor, &task->runnable);
> + *  }
> + * \endcode
> + *
> + * \param executor the executor
> + * \param runnable the task to run
> + */
> +VLC_API void
> +vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable);
> +
> +/**
> + * Cancel a runnable previously submitted.
> + *
> + * If this runnable is still queued (i.e. it has not be run yet), then dequeue
> + * it so that it will never be run, and return true.
> + *
> + * Otherwise, this runnable has already been taken by an executor thread (it is
> + * still running or is complete). In that case, do nothing, and return false.
> + *
> + * The result is undefined if the runnable has not been submitted.
> + *
> + * \param executor the executor
> + * \param runnable the task to cancel
> + * \return true if the runnable has been canceled before execution
> + */
> +VLC_API bool
> +vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable);

It might be worth writing somewhere that the executor will not free the 
vlc_runnable, it's the responsibility of the caller to do so (or the 
runnable itself, as in your example).

> +
> +# ifdef __cplusplus
> +}
> +# endif
> +
> + #endif
> diff --git a/src/Makefile.am b/src/Makefile.am
> index 6dc46ab3d6..5336fd7ac0 100644
> --- a/src/Makefile.am
> +++ b/src/Makefile.am
> @@ -352,6 +352,7 @@ libvlccore_la_SOURCES = \
>   	misc/actions.c \
>   	misc/background_worker.c \
>   	misc/background_worker.h \
> +	misc/executor.c \
>   	misc/md5.c \
>   	misc/probe.c \
>   	misc/rand.c \
> diff --git a/src/libvlccore.sym b/src/libvlccore.sym
> index 2b750873fc..6d72ee03e5 100644
> --- a/src/libvlccore.sym
> +++ b/src/libvlccore.sym
> @@ -967,3 +967,7 @@ vlc_video_context_GetType
>   vlc_video_context_GetPrivate
>   vlc_video_context_Hold
>   vlc_video_context_HoldDevice
> +vlc_executor_New
> +vlc_executor_Delete
> +vlc_executor_Submit
> +vlc_executor_Cancel
> diff --git a/src/misc/executor.c b/src/misc/executor.c
> new file mode 100644
> index 0000000000..274ae3eb84
> --- /dev/null
> +++ b/src/misc/executor.c
> @@ -0,0 +1,263 @@
> +/*****************************************************************************
> + * misc/executor.c
> + *****************************************************************************
> + * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
> + *
> + * This program is free software; you can redistribute it and/or modify it
> + * under the terms of the GNU Lesser General Public License as published by
> + * the Free Software Foundation; either version 2.1 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU Lesser General Public License for more details.
> + *
> + * You should have received a copy of the GNU Lesser General Public License
> + * along with this program; if not, write to the Free Software Foundation,
> + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
> + *****************************************************************************/
> +
> +#ifdef HAVE_CONFIG_H
> +# include "config.h"
> +#endif
> +
> +#include <vlc_executor.h>
> +
> +#include <vlc_atomic.h>
> +#include <vlc_list.h>
> +#include <vlc_threads.h>
> +#include "libvlc.h"
> +
> +/**
> + * An executor can spawn several threads.
> + *
> + * This structure contains the data specific to one thread.
> + */
> +struct vlc_executor_thread {
> +    /** Node of vlc_executor.threads list */
> +    struct vlc_list node;
> +
> +    /** The executor owning the thread */
> +    vlc_executor_t *owner;
> +
> +    /** The system thread */
> +    vlc_thread_t thread;
> +
> +    /** The current task executed by the thread, NULL if none */
> +    struct vlc_runnable *current_task;
> +};
> +
> +/**
> + * The executor (also vlc_executor_t, exposed as opaque type in the public
> + * header).
> + */
> +struct vlc_executor {
> +    vlc_mutex_t lock;
> +
> +    /** Maximum number of threads to run the tasks */
> +    unsigned max_threads;
> +
> +    /** List of active vlc_executor_thread */
> +    struct vlc_list threads;
> +
> +    /** Thread count (in a separate field to quickly compare to max_threads) */
> +    unsigned nthreads;
> +
> +    /* Number of tasks requested but not finished. */
> +    unsigned unfinished;
> +
> +    /** Queue of vlc_runnable */
> +    struct vlc_list queue;
> +
> +    /** Wait for the queue to be non-empty */
> +    vlc_cond_t queue_wait;
> +
> +    /** True if executor deletion is requested */
> +    bool closing;
> +};
> +
> +static void
> +QueuePush(vlc_executor_t *executor, struct vlc_runnable *runnable)
> +{
> +    vlc_mutex_assert(&executor->lock);
> +
> +    vlc_list_append(&runnable->node, &executor->queue);
> +    vlc_cond_signal(&executor->queue_wait);
> +}
> +
> +static struct vlc_runnable *
> +QueueTake(vlc_executor_t *executor)
> +{
> +    vlc_mutex_assert(&executor->lock);
> +
> +    while (!executor->closing && vlc_list_is_empty(&executor->queue))
> +        vlc_cond_wait(&executor->queue_wait, &executor->lock);
> +
> +    if (executor->closing)
> +        return NULL;
> +
> +    struct vlc_runnable *runnable =
> +        vlc_list_first_entry_or_null(&executor->queue, struct vlc_runnable,
> +                                     node);
> +    assert(runnable);
> +    vlc_list_remove(&runnable->node);
> +
> +    /* Set links to NULL to know that it has been taken by a thread in
> +     * vlc_executor_Cancel() */
> +    runnable->node.prev = runnable->node.next = NULL;
> +
> +    return runnable;
> +}
> +
> +static void *
> +ThreadRun(void *userdata)
> +{
> +    struct vlc_executor_thread *thread = userdata;
> +    vlc_executor_t *executor = thread->owner;
> +
> +    vlc_mutex_lock(&executor->lock);
> +
> +    for (;;)
> +    {
> +        struct vlc_runnable *runnable = QueueTake(executor);
> +        if (!runnable)
> +            /* The executor is closing, terminate this thread */
> +            break;
> +
> +        thread->current_task = runnable;
> +        vlc_mutex_unlock(&executor->lock);
> +
> +        /* Execute the user-provided runnable, without the executor lock */
> +        runnable->run(runnable->userdata);
> +
> +        vlc_mutex_lock(&executor->lock);
> +        thread->current_task = NULL;
> +
> +        assert(executor->unfinished > 0);
> +        --executor->unfinished;
> +    }
> +
> +    vlc_mutex_unlock(&executor->lock);
> +
> +    return NULL;
> +}
> +
> +static int
> +SpawnThread(vlc_executor_t *executor)
> +{
> +    assert(executor->nthreads < executor->max_threads);
> +
> +    struct vlc_executor_thread *thread = malloc(sizeof(*thread));
> +    if (!thread)
> +        return VLC_ENOMEM;
> +
> +    thread->owner = executor;
> +    thread->current_task = NULL;
> +
> +    if (vlc_clone(&thread->thread, ThreadRun, thread, VLC_THREAD_PRIORITY_LOW))
> +    {
> +        free(thread);
> +        return VLC_EGENERIC;
> +    }
> +
> +    executor->nthreads++;
> +    vlc_list_append(&thread->node, &executor->threads);
> +
> +    return VLC_SUCCESS;
> +}
> +
> +vlc_executor_t *
> +vlc_executor_New(unsigned max_threads)
> +{
> +    assert(max_threads);
> +    vlc_executor_t *executor = malloc(sizeof(*executor));
> +    if (!executor)
> +        return NULL;
> +
> +    vlc_mutex_init(&executor->lock);
> +
> +    executor->max_threads = max_threads;
> +    executor->nthreads = 0;
> +    executor->unfinished = 0;
> +
> +    vlc_list_init(&executor->threads);
> +    vlc_list_init(&executor->queue);
> +
> +    vlc_cond_init(&executor->queue_wait);
> +
> +    executor->closing = false;
> +
> +    /* Create one thread on init so that vlc_executor_Submit() may never fail */
> +    int ret = SpawnThread(executor);
> +    if (ret != VLC_SUCCESS)
> +    {
> +        free(executor);
> +        return NULL;
> +    }
> +
> +    return executor;
> +}
> +
> +void
> +vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable)
> +{
> +    vlc_mutex_lock(&executor->lock);
> +
> +    assert(!executor->closing);

It might be legitimate to try to submit a task even if the executor is 
closing. For example if a runnable is trying to submit another runnable. 
It should have to check with some higher power if it's allowed to do so 
or not (atomically). So at least you should just return here.

> +
> +    QueuePush(executor, runnable);
> +
> +    if (++executor->unfinished > executor->nthreads
> +            && executor->nthreads < executor->max_threads)
> +        /* If it fails, this is not an error, there is at least one thread */
> +        SpawnThread(executor);
> +
> +    vlc_mutex_unlock(&executor->lock);
> +}
> +
> +bool
> +vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable)
> +{
> +    vlc_mutex_lock(&executor->lock);
> +
> +    /* Either both prev and next are set, either both are NULL */
> +    assert(!!runnable->node.prev == !!runnable->node.next);
> +
> +    bool in_queue = runnable->node.prev;
> +    if (in_queue)
> +        vlc_list_remove(&runnable->node);
> +
> +    vlc_mutex_unlock(&executor->lock);
> +
> +    return in_queue;
> +}
> +
> +void
> +vlc_executor_Delete(vlc_executor_t *executor)
> +{
> +    vlc_mutex_lock(&executor->lock);
> +
> +    executor->closing = true;
> +
> +    /* All the tasks must be canceled on delete */
> +    assert(vlc_list_is_empty(&executor->queue));
> +
> +    vlc_mutex_unlock(&executor->lock);
> +
> +    /* "closing" is now true, this will wake up threads */
> +    vlc_cond_broadcast(&executor->queue_wait);
> +
> +    /* The threads list may not be written at this point, so it is safe to read
> +     * it without mutex locked (the mutex must be released to join the
> +     * threads). */
> +
> +    struct vlc_executor_thread *thread;
> +    vlc_list_foreach(thread, &executor->threads, node)
> +    {
> +        vlc_join(thread->thread, NULL);
> +        free(thread);
> +    }
> +
> +    free(executor);
> +}
> -- 
> 2.28.0
> 
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> https://mailman.videolan.org/listinfo/vlc-devel
> 


More information about the vlc-devel mailing list