[vlc-devel] [PATCH v2 1/8] executor: introduce new executor API

Romain Vimont rom1v at videolabs.io
Tue Oct 6 10:08:18 CEST 2020


On Sat, Sep 19, 2020 at 12:24:34PM +0200, Alexandre Janniaux wrote:
> Hi,
> 
> The patchset seems to need rebasing. Does it need any
> additional changes?
> 
> A few nits:
> 
> On Sun, Sep 13, 2020 at 09:11:14PM +0200, Romain Vimont wrote:
> > Introduce a new API to execute "runnables" from background threads.
> >
> > The final design is a result of discussions on the mailing-list:
> >  - <https://mailman.videolan.org/pipermail/vlc-devel/2020-August/136696.html>
> >  - <https://mailman.videolan.org/pipermail/vlc-devel/2020-September/136944.html>
> > ---
> >  include/vlc_executor.h | 183 ++++++++++++++++++++++++++
> >  src/Makefile.am        |   2 +
> >  src/libvlccore.sym     |   5 +
> >  src/misc/executor.c    | 288 +++++++++++++++++++++++++++++++++++++++++
> >  4 files changed, 478 insertions(+)
> >  create mode 100644 include/vlc_executor.h
> >  create mode 100644 src/misc/executor.c
> >
> > diff --git a/include/vlc_executor.h b/include/vlc_executor.h
> > new file mode 100644
> > index 0000000000..08a7f00472
> > --- /dev/null
> > +++ b/include/vlc_executor.h
> > @@ -0,0 +1,183 @@
> > +/*****************************************************************************
> > + * vlc_executor.h
> > + *****************************************************************************
> > + * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
> > + *
> > + * This program is free software; you can redistribute it and/or modify it
> > + * under the terms of the GNU Lesser General Public License as published by
> > + * the Free Software Foundation; either version 2.1 of the License, or
> > + * (at your option) any later version.
> > + *
> > + * This program is distributed in the hope that it will be useful,
> > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> > + * GNU Lesser General Public License for more details.
> > + *
> > + * You should have received a copy of the GNU Lesser General Public License
> > + * along with this program; if not, write to the Free Software Foundation,
> > + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
> > + *****************************************************************************/
> > +
> > +#ifndef VLC_EXECUTOR_H
> > +#define VLC_EXECUTOR_H
> > +
> > +#include <vlc_common.h>
> > +#include <vlc_list.h>
> > +
> > +# ifdef __cplusplus
> > +extern "C" {
> > +# endif
> > +
> > +/** Executor type (opaque) */
> > +typedef struct vlc_executor vlc_executor_t;
> > +
> > +/**
> > + * A Runnable is intended to be run from another thread by an executor.
> > + */
> 
> I'm not sure the wording is correct. Technically if we submit a task
> from another task within a single threaded executor, it would be
> executed from the same thread.
> 
> I'd suggest
> 
>     /**
>      * A Runnable encapsulate a sequential task that will be ran into
>      * an async execution environment, typically executed from another
>      * thread.
>      */
> 
> Or something simpler of the same taste.

OK, I simplified to:

> A Runnable encapsulates a task to be run from an executor thread.

> > +struct vlc_runnable {
> > +
> > +    /**
> > +     * This function is to be executed by a vlc_executor_t.
> > +     *
> > +     * It must implement the actions (arbitrarily long) to execute from an
> > +     * executor thread, synchronously. As soon as run() returns, the execution
> > +     * of this runnable is complete.
> > +     *
> > +     * After the runnable is submitted to an executor via
> > +     * vlc_executor_Submit(), the run() function is executed at most once (zero
> > +     * if the execution is canceled before it was started).
> > +     *
> > +     * It must not be NULL.
> > +     *
> > +     * \param userdata the userdata provided to vlc_executor_Submit()
> > +     */
> > +    void (*run)(void *userdata);
> 
> Did we find a consensus wrt. whether we exposes the userdata
> or the task itself here?

I think the majority was in favor of userdata here. IMO it would be
useless to expose the runnable itself, since the only thing the user
could do from that runnable is access the userata.

> > +
> > +    /**
> > +     * Userdata passed back to run().
> > +     */
> > +    void *userdata;
> > +
> > +    /* Private data used by the vlc_executor_t (do not touch) */
> > +    struct vlc_list node;
> > +};
> > +
> > +/**
> > + * Create a new executor.
> > + *
> > + * \param max_threads the maximum number of threads used to execute runnables
> > + * \return a pointer to a new executor, or NULL if an error occurred
> > + */
> > +VLC_API vlc_executor_t *
> > +vlc_executor_New(unsigned max_threads);
> > +
> > +/**
> > + * Delete an executor.
> > + *
> > + * Wait for all the threads to complete, and delete the executor instance.
> > + *
> > + * All submitted tasks must be either started or explicitly canceled. To wait
> > + * for all tasks to complete, use vlc_executor_WaitIdle().
> > + *
> > + * It is an error to submit a new runnable after vlc_executor_Delete() is
> > + * called. In particular, a running task must not submit a new runnable once
> > + * deletion has been requested.
> 
> Here we can probably say that it's even undefined and not just
> an error since we access deallocated values.

Here, "it is an error to" means "it is illegal to do it" (it is a
programmer error).

> > + *
> > + * \param executor the executor
> > + */
> > +VLC_API void
> > +vlc_executor_Delete(vlc_executor_t *executor);
> > +
> > +/**
> > + * Submit a runnable for execution.
> > + *
> > + * The struct vlc_runnable is not copied, it must exist until the end of the
> > + * execution (the user is expected to embed it in its own task structure).
> > + *
> > + * Here is a simple example:
> > + *
> > + * \code{c}
> > + *  struct my_task {
> > + *      char *str;
> > + *      struct vlc_runnable runnable;
> > + *  };
> > + *
> > + *  static void Run(void *userdata)
> > + *  {
> > + *      struct my_task *task = userdata;
> > + *
> > + *      printf("start of %s\n", task->str);
> > + *      vlc_tick_sleep(VLC_TICK_FROM_SEC(3)); // long action
> > + *      printf("end of %s\n", task->str);
> > + *
> > + *      free(task->str);
> > + *      free(task);
> > + *  }
> > + *
> > + *  void foo(vlc_executor_t *executor, const char *str)
> > + *  {
> > + *      // no error handling for brevity
> > + *      struct my_task *task = malloc(sizeof(*task));
> > + *      task->str = strdup(str);
> > + *      task->runnable.run = Run;
> > + *      task->runnable.userdata = task;
> > + *      vlc_executor_Submit(executor, &task->runnable);
> > + *  }
> > + * \endcode
> > + *
> > + * A runnable instance is intended to be submitted at most once. The caller is
> > + * expected to allocate a new task structure (embedding the runnable) for every
> > + * submission.
> > + *
> > + * More precisely, it is incorrect to submit a runnable already submitted that
> > + * is still in the pending queue (i.e. not canceled or started). This is due to
> > + * the intrusive linked list of runnables.
> > + *
> > + * It is strongly discouraged to submit a runnable that its currently running
> 
> s/its/is/

Done.

> > + * on the executor (unless you are prepared for the run() callback to be run
> > + * several times in parallel).
> 
> It could quickly mention how to duplicate a task and gotcha,
> except if you really want to avoid encouraging this kind of
> construction.

Just discouraging to submit a runnable previously submitted is
sufficient IMO (the user just have to submit another runnable).

> > + *
> > + * For simplicity, it is discouraged to submit a runnable previously submitted.
> > + *
> > + * \param executor the executor
> > + * \param runnable the task to run
> > + */
> > +VLC_API void
> > +vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable);
> > +
> > +/**
> > + * Cancel a runnable previously submitted.
> > + *
> > + * If this runnable is still queued (i.e. it has not be run yet), then dequeue
> > + * it so that it will never be run, and return true.
> > + *
> > + * Otherwise, this runnable has already been taken by an executor thread (it is
> > + * still running or is complete). In that case, do nothing, and return false.
> > + *
> > + * This is an error to pass a runnable not submitted to this executor (the
> > + * result is undefined in that case).
> > + *
> > + * Note that the runnable instance is owned by the caller, so the executor will
> > + * never attempt to free it.
> > + *
> > + * \param executor the executor
> > + * \param runnable the task to cancel
> > + * \retval true if the runnable has been canceled before execution
> > + * \retval false if the runnable has not been canceled
> > + */
> > +VLC_API bool
> > +vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable);
> > +
> > +/**
> > + * Wait until all submitted tasks are completed or canceled.
> > + *
> > + * \param executor the executor
> > + */
> > +VLC_API void
> > +vlc_executor_WaitIdle(vlc_executor_t *executor);
> > +
> > +# ifdef __cplusplus
> > +}
> > +# endif
> > +
> > + #endif
> > diff --git a/src/Makefile.am b/src/Makefile.am
> > index 6dc46ab3d6..722292e6d1 100644
> > --- a/src/Makefile.am
> > +++ b/src/Makefile.am
> > @@ -49,6 +49,7 @@ pluginsinclude_HEADERS = \
> >  	../include/vlc_es.h \
> >  	../include/vlc_es_out.h \
> >  	../include/vlc_events.h \
> > +	../include/vlc_executor.h \
> >  	../include/vlc_filter.h \
> >  	../include/vlc_fourcc.h \
> >  	../include/vlc_fs.h \
> > @@ -352,6 +353,7 @@ libvlccore_la_SOURCES = \
> >  	misc/actions.c \
> >  	misc/background_worker.c \
> >  	misc/background_worker.h \
> > +	misc/executor.c \
> >  	misc/md5.c \
> >  	misc/probe.c \
> >  	misc/rand.c \
> > diff --git a/src/libvlccore.sym b/src/libvlccore.sym
> > index 31fcd37cba..38bab22184 100644
> > --- a/src/libvlccore.sym
> > +++ b/src/libvlccore.sym
> > @@ -966,3 +966,8 @@ vlc_video_context_GetType
> >  vlc_video_context_GetPrivate
> >  vlc_video_context_Hold
> >  vlc_video_context_HoldDevice
> > +vlc_executor_New
> > +vlc_executor_Delete
> > +vlc_executor_Submit
> > +vlc_executor_Cancel
> > +vlc_executor_WaitIdle
> > diff --git a/src/misc/executor.c b/src/misc/executor.c
> > new file mode 100644
> > index 0000000000..3b79f9ba8d
> > --- /dev/null
> > +++ b/src/misc/executor.c
> > @@ -0,0 +1,288 @@
> > +/*****************************************************************************
> > + * misc/executor.c
> > + *****************************************************************************
> > + * Copyright (C) 2020 Videolabs, VLC authors and VideoLAN
> > + *
> > + * This program is free software; you can redistribute it and/or modify it
> > + * under the terms of the GNU Lesser General Public License as published by
> > + * the Free Software Foundation; either version 2.1 of the License, or
> > + * (at your option) any later version.
> > + *
> > + * This program is distributed in the hope that it will be useful,
> > + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> > + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> > + * GNU Lesser General Public License for more details.
> > + *
> > + * You should have received a copy of the GNU Lesser General Public License
> > + * along with this program; if not, write to the Free Software Foundation,
> > + * Inc., 51 Franklin Street, Fifth Floor, Boston MA 02110-1301, USA.
> > + *****************************************************************************/
> > +
> > +#ifdef HAVE_CONFIG_H
> > +# include "config.h"
> > +#endif
> > +
> > +#include <vlc_executor.h>
> > +
> > +#include <vlc_atomic.h>
> > +#include <vlc_list.h>
> > +#include <vlc_threads.h>
> > +#include "libvlc.h"
> > +
> > +/**
> > + * An executor can spawn several threads.
> > + *
> > + * This structure contains the data specific to one thread.
> > + */
> > +struct vlc_executor_thread {
> > +    /** Node of vlc_executor.threads list */
> > +    struct vlc_list node;
> > +
> > +    /** The executor owning the thread */
> > +    vlc_executor_t *owner;
> > +
> > +    /** The system thread */
> > +    vlc_thread_t thread;
> > +
> > +    /** The current task executed by the thread, NULL if none */
> > +    struct vlc_runnable *current_task;
> > +};
> > +
> > +/**
> > + * The executor (also vlc_executor_t, exposed as opaque type in the public
> > + * header).
> > + */
> > +struct vlc_executor {
> > +    vlc_mutex_t lock;
> > +
> > +    /** Maximum number of threads to run the tasks */
> > +    unsigned max_threads;
> > +
> > +    /** List of active vlc_executor_thread */
> > +    struct vlc_list threads;
> > +
> > +    /** Thread count (in a separate field to quickly compare to max_threads) */
> > +    unsigned nthreads;
> > +
> > +    /* Number of tasks requested but not finished. */
> > +    unsigned unfinished;
> > +
> > +    /** Wait for the executor to be idle (i.e. unfinished == 0) */
> > +    vlc_cond_t idle_wait;
> > +
> > +    /** Queue of vlc_runnable */
> > +    struct vlc_list queue;
> > +
> > +    /** Wait for the queue to be non-empty */
> > +    vlc_cond_t queue_wait;
> > +
> > +    /** True if executor deletion is requested */
> > +    bool closing;
> > +};
> > +
> > +static void
> > +QueuePush(vlc_executor_t *executor, struct vlc_runnable *runnable)
> > +{
> > +    vlc_mutex_assert(&executor->lock);
> > +
> > +    vlc_list_append(&runnable->node, &executor->queue);
> > +    vlc_cond_signal(&executor->queue_wait);
> > +}
> > +
> > +static struct vlc_runnable *
> > +QueueTake(vlc_executor_t *executor)
> > +{
> > +    vlc_mutex_assert(&executor->lock);
> > +
> > +    while (!executor->closing && vlc_list_is_empty(&executor->queue))
> > +        vlc_cond_wait(&executor->queue_wait, &executor->lock);
> > +
> > +    if (executor->closing)
> > +        return NULL;
> > +
> > +    struct vlc_runnable *runnable =
> > +        vlc_list_first_entry_or_null(&executor->queue, struct vlc_runnable,
> > +                                     node);
> > +    assert(runnable);
> > +    vlc_list_remove(&runnable->node);
> > +
> > +    /* Set links to NULL to know that it has been taken by a thread in
> > +     * vlc_executor_Cancel() */
> > +    runnable->node.prev = runnable->node.next = NULL;
> > +
> > +    return runnable;
> > +}
> > +
> > +static void *
> > +ThreadRun(void *userdata)
> > +{
> > +    struct vlc_executor_thread *thread = userdata;
> > +    vlc_executor_t *executor = thread->owner;
> > +
> > +    vlc_mutex_lock(&executor->lock);
> > +
> > +    struct vlc_runnable *runnable;
> > +    /* When the executor is closing, QueueTake() returns NULL */
> > +    while ((runnable = QueueTake(executor)))
> > +    {
> > +        thread->current_task = runnable;
> > +        vlc_mutex_unlock(&executor->lock);
> > +
> > +        /* Execute the user-provided runnable, without the executor lock */
> > +        runnable->run(runnable->userdata);
> > +
> > +        vlc_mutex_lock(&executor->lock);
> > +        thread->current_task = NULL;
> > +
> > +        assert(executor->unfinished > 0);
> > +        --executor->unfinished;
> > +        if (!executor->unfinished)
> > +            vlc_cond_signal(&executor->idle_wait);
> > +    }
> > +
> > +    vlc_mutex_unlock(&executor->lock);
> > +
> > +    return NULL;
> > +}
> > +
> > +static int
> > +SpawnThread(vlc_executor_t *executor)
> > +{
> > +    assert(executor->nthreads < executor->max_threads);
> > +
> > +    struct vlc_executor_thread *thread = malloc(sizeof(*thread));
> > +    if (!thread)
> > +        return VLC_ENOMEM;
> > +
> > +    thread->owner = executor;
> > +    thread->current_task = NULL;
> > +
> > +    if (vlc_clone(&thread->thread, ThreadRun, thread, VLC_THREAD_PRIORITY_LOW))
> > +    {
> > +        free(thread);
> > +        return VLC_EGENERIC;
> > +    }
> > +
> > +    executor->nthreads++;
> > +    vlc_list_append(&thread->node, &executor->threads);
> > +
> > +    return VLC_SUCCESS;
> > +}
> > +
> > +vlc_executor_t *
> > +vlc_executor_New(unsigned max_threads)
> > +{
> > +    assert(max_threads);
> > +    vlc_executor_t *executor = malloc(sizeof(*executor));
> > +    if (!executor)
> > +        return NULL;
> > +
> > +    vlc_mutex_init(&executor->lock);
> > +
> > +    executor->max_threads = max_threads;
> > +    executor->nthreads = 0;
> > +    executor->unfinished = 0;
> > +
> > +    vlc_list_init(&executor->threads);
> > +    vlc_list_init(&executor->queue);
> > +
> > +    vlc_cond_init(&executor->idle_wait);
> > +    vlc_cond_init(&executor->queue_wait);
> > +
> > +    executor->closing = false;
> > +
> > +    /* Create one thread on init so that vlc_executor_Submit() may never fail */
> > +    int ret = SpawnThread(executor);
> > +    if (ret != VLC_SUCCESS)
> > +    {
> > +        free(executor);
> > +        return NULL;
> > +    }
> > +
> > +    return executor;
> > +}
> > +
> > +void
> > +vlc_executor_Submit(vlc_executor_t *executor, struct vlc_runnable *runnable)
> > +{
> > +    vlc_mutex_lock(&executor->lock);
> > +
> > +    assert(!executor->closing);
> > +
> > +    QueuePush(executor, runnable);
> > +
> > +    if (++executor->unfinished > executor->nthreads
> > +            && executor->nthreads < executor->max_threads)
> > +        /* If it fails, this is not an error, there is at least one thread */
> > +        SpawnThread(executor);
> > +
> > +    vlc_mutex_unlock(&executor->lock);
> > +}
> > +
> > +bool
> > +vlc_executor_Cancel(vlc_executor_t *executor, struct vlc_runnable *runnable)
> > +{
> > +    vlc_mutex_lock(&executor->lock);
> > +
> > +    /* Either both prev and next are set, either both are NULL */
> > +    assert(!runnable->node.prev == !runnable->node.next);
> > +
> > +    bool in_queue = runnable->node.prev;
> > +    if (in_queue)
> > +    {
> > +        vlc_list_remove(&runnable->node);
> > +
> > +        assert(executor->unfinished > 0);
> > +        --executor->unfinished;
> > +        if (!executor->unfinished)
> > +            vlc_cond_signal(&executor->idle_wait);
> > +    }
> > +
> > +    vlc_mutex_unlock(&executor->lock);
> > +
> > +    return in_queue;
> > +}
> > +
> > +void
> > +vlc_executor_WaitIdle(vlc_executor_t *executor)
> > +{
> > +    vlc_mutex_lock(&executor->lock);
> > +    while (executor->unfinished)
> > +        vlc_cond_wait(&executor->idle_wait, &executor->lock);
> > +    vlc_mutex_unlock(&executor->lock);
> > +}
> > +
> > +void
> > +vlc_executor_Delete(vlc_executor_t *executor)
> > +{
> > +    vlc_mutex_lock(&executor->lock);
> > +
> > +    executor->closing = true;
> > +
> > +    /* All the tasks must be canceled on delete */
> > +    assert(vlc_list_is_empty(&executor->queue));
> > +
> > +    vlc_mutex_unlock(&executor->lock);
> > +
> > +    /* "closing" is now true, this will wake up threads */
> > +    vlc_cond_broadcast(&executor->queue_wait);
> > +
> > +    /* The threads list may not be written at this point, so it is safe to read
> > +     * it without mutex locked (the mutex must be released to join the
> > +     * threads). */
> > +
> > +    struct vlc_executor_thread *thread;
> > +    vlc_list_foreach(thread, &executor->threads, node)
> > +    {
> > +        vlc_join(thread->thread, NULL);
> > +        free(thread);
> > +    }
> > +
> > +    /* The queue must still be empty (no runnable submitted a new runnable) */
> > +    assert(vlc_list_is_empty(&executor->queue));
> > +
> > +    /* There are no tasks anymore */
> > +    assert(!executor->unfinished);
> > +
> > +    free(executor);
> > +}
> > --
> > 2.28.0
> >
> > _______________________________________________
> > vlc-devel mailing list
> > To unsubscribe or modify your subscription options:
> > https://mailman.videolan.org/listinfo/vlc-devel
> _______________________________________________
> vlc-devel mailing list
> To unsubscribe or modify your subscription options:
> https://mailman.videolan.org/listinfo/vlc-devel


More information about the vlc-devel mailing list